#!/usr/bin/env python
import os
from dpdispatcher.utils import run_cmd_with_all_output
class HDFS:
    """Fundamental class for HDFS basic manipulation.

    Each method shells out to the ``hadoop fs`` CLI via
    ``run_cmd_with_all_output`` and raises :class:`RuntimeError` on failure.

    .. warning::
        URIs and paths are interpolated into shell command strings without
        quoting/escaping — callers must not pass untrusted input.
    """

    @staticmethod
    def exists(uri):
        """Check existence of hdfs uri.

        Returns: True if the uri exists, False if it does not.
        Raises: RuntimeError on command failure or an unexpected exit code.
        """
        cmd = f"hadoop fs -test -e {uri}"
        # Keep the try body minimal: only the subprocess call may raise
        # unexpectedly. Raising our own detailed RuntimeError outside the
        # try prevents it from being swallowed and re-wrapped with a less
        # informative message (the bug in the original version).
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                "Cannot check existence of hdfs uri[{}] "
                "with cmd[{}]".format(uri, cmd)
            ) from e
        # `hadoop fs -test -e` exits 0 when the path exists and 1 when it
        # does not; any other code indicates a genuine hadoop failure.
        if ret == 0:
            return True
        if ret == 1:
            return False
        raise RuntimeError(
            "Cannot check existence of hdfs uri[{}] "
            "with cmd[{}]; ret[{}] stdout[{}] stderr[{}]".format(
                uri, cmd, ret, out, err
            )
        )

    @staticmethod
    def remove(uri):
        """Recursively remove hdfs uri.

        Returns: True on success.
        Raises: RuntimeError on failure.
        """
        cmd = f"hadoop fs -rm -r {uri}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                "Cannot remove hdfs uri[{}] " "with cmd[{}]".format(uri, cmd)
            ) from e
        if ret == 0:
            return True
        raise RuntimeError(
            "Cannot remove hdfs uri[{}] "
            "with cmd[{}]; ret[{}] output[{}] stderr[{}]".format(
                uri, cmd, ret, out, err
            )
        )

    @staticmethod
    def mkdir(uri):
        """Make new hdfs directory, creating parents as needed (``-p``).

        Returns: True on success.
        Raises: RuntimeError on failure.
        """
        cmd = f"hadoop fs -mkdir -p {uri}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                "Cannot mkdir of hdfs uri[{}] " "with cmd[{}]".format(uri, cmd)
            ) from e
        if ret == 0:
            return True
        raise RuntimeError(
            "Cannot mkdir of hdfs uri[{}] "
            "with cmd[{}]; ret[{}] output[{}] stderr[{}]".format(
                uri, cmd, ret, out, err
            )
        )

    @staticmethod
    def copy_from_local(local_path, to_uri):
        """Copy a local path to an hdfs uri, overwriting the target (``-f``).

        Returns: (True, stdout) on success.  NOTE: unlike the other methods
        this returns a tuple; kept for backward compatibility.
        Raises: RuntimeError if local_path is unreadable or the copy fails.
        """
        # Fail early with a clear message when the local path is missing
        # or not readable by the current user.
        if not os.path.exists(local_path) or not os.access(local_path, os.R_OK):
            raise RuntimeError(
                "try to access local_path[{}] " "but failed".format(local_path)
            )
        cmd = f"hadoop fs -copyFromLocal -f {local_path} {to_uri}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                f"Cannot copy local[{local_path}] to remote[{to_uri}] with cmd[{cmd}]"
            ) from e
        if ret == 0:
            return True, out
        raise RuntimeError(
            "Cannot copy local[{}] to remote[{}] with cmd[{}]; "
            "ret[{}] output[{}] stderr[{}]".format(
                local_path, to_uri, cmd, ret, out, err
            )
        )

    @staticmethod
    def copy_to_local(from_uri, local_path):
        """Copy one or more hdfs uris to a local path.

        Parameters: from_uri may be a single uri (str) or a list/tuple of
        uris (joined with spaces into one command).
        Returns: True on success.
        Raises: RuntimeError on failure.
        """
        remote = ""
        if isinstance(from_uri, str):
            remote = from_uri
        elif isinstance(from_uri, (list, tuple)):
            remote = " ".join(from_uri)
        # NOTE(review): any other from_uri type leaves `remote` empty and
        # produces a malformed hadoop command; behavior kept as-is.
        cmd = f"hadoop fs -copyToLocal {remote} {local_path}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                f"Cannot copy remote[{from_uri}] to local[{local_path}] with cmd[{cmd}]"
            ) from e
        if ret == 0:
            return True
        raise RuntimeError(
            "Cannot copy remote[{}] to local[{}] with cmd[{}]; "
            "ret[{}] output[{}] stderr[{}]".format(
                from_uri, local_path, cmd, ret, out, err
            )
        )

    @staticmethod
    def read_hdfs_file(uri):
        """Read the content of an hdfs uri via ``hadoop fs -text``.

        Returns: the command's stdout on success.
        Raises: RuntimeError on failure.
        """
        cmd = f"hadoop fs -text {uri}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                "Cannot read text from uri[{}]" "cmd [{}]".format(uri, cmd)
            ) from e
        if ret == 0:
            return out
        raise RuntimeError(
            "Cannot read text from uri[{}]"
            "cmd [{}] ret[{}] output[{}] stderr[{}]".format(
                uri, cmd, ret, out, err
            )
        )

    @staticmethod
    def move(from_uri, to_uri):
        """Move/rename an hdfs uri via ``hadoop fs -mv``.

        Returns: True on success.
        Raises: RuntimeError on failure.
        """
        cmd = f"hadoop fs -mv {from_uri} {to_uri}"
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError(
                "Cannot move from_uri[{}] to "
                "to_uri[{}] with cmd[{}]".format(from_uri, to_uri, cmd)
            ) from e
        if ret == 0:
            return True
        raise RuntimeError(
            "Cannot move from_uri[{}] to "
            "to_uri[{}] with cmd[{}]; "
            "ret[{}] output[{}] stderr[{}]".format(
                from_uri, to_uri, cmd, ret, out, err
            )
        )