# Source code for dpdispatcher.hdfs_cli

# /usr/bin/python
# -*- encoding=utf-8 -*-

import os
import sys

from dpdispatcher.utils import run_cmd_with_all_output


class HDFS(object):
    """Fundamental class for HDFS basic manipulation.

    Every public method builds a ``hadoop fs`` command line, runs it through
    ``run_cmd_with_all_output`` and raises ``RuntimeError`` when the command
    cannot be run or exits with an unexpected return code.

    NOTE(review): arguments are interpolated into the command string without
    any quoting. Presumably the helper hands the string to a shell, in which
    case paths containing whitespace or shell metacharacters are unsafe --
    confirm against ``run_cmd_with_all_output`` before passing untrusted
    input.
    """

    @staticmethod
    def _run(cmd, what):
        """Run ``cmd`` and return its ``(ret, out, err)`` triple.

        Only the launch itself is guarded, so that the detailed errors the
        callers raise for a bad return code are not caught and replaced by
        this less informative one.

        Args:
            cmd: full command line to execute.
            what: human readable prefix used in the error message.

        Raises:
            RuntimeError: if the helper raises or does not yield a 3-tuple.
        """
        try:
            ret, out, err = run_cmd_with_all_output(cmd)
        except Exception as e:
            raise RuntimeError("{} with cmd[{}]".format(what, cmd)) from e
        return ret, out, err

    @staticmethod
    def exists(uri):
        """Check existence of hdfs uri.

        Returns:
            True if ``hadoop fs -test -e`` exits with 0, False if it exits
            with 1.

        Raises:
            RuntimeError: on any other return code, or if the command could
                not be run.
        """
        what = "Cannot check existence of hdfs uri[{}]".format(uri)
        cmd = "hadoop fs -test -e {uri}".format(uri=uri)
        ret, out, err = HDFS._run(cmd, what)
        if ret == 0:
            return True
        if ret == 1:
            return False
        raise RuntimeError(
            "{} with cmd[{}]; ret[{}] stdout[{}] stderr[{}]".format(
                what, cmd, ret, out, err
            )
        )

    @staticmethod
    def remove(uri):
        """Recursively remove hdfs uri (``hadoop fs -rm -r``).

        Returns:
            True on success.

        Raises:
            RuntimeError: on a non-zero return code, or if the command could
                not be run.
        """
        what = "Cannot remove hdfs uri[{}]".format(uri)
        cmd = "hadoop fs -rm -r {uri}".format(uri=uri)
        ret, out, err = HDFS._run(cmd, what)
        if ret != 0:
            raise RuntimeError(
                "{} with cmd[{}]; ret[{}] output[{}] stderr[{}]".format(
                    what, cmd, ret, out, err
                )
            )
        return True

    @staticmethod
    def mkdir(uri):
        """Make new hdfs directory, creating parents (``hadoop fs -mkdir -p``).

        Returns:
            True on success.

        Raises:
            RuntimeError: on a non-zero return code, or if the command could
                not be run.
        """
        what = "Cannot mkdir of hdfs uri[{}]".format(uri)
        cmd = "hadoop fs -mkdir -p {uri}".format(uri=uri)
        ret, out, err = HDFS._run(cmd, what)
        if ret != 0:
            raise RuntimeError(
                "{} with cmd[{}]; ret[{}] output[{}] stderr[{}]".format(
                    what, cmd, ret, out, err
                )
            )
        return True

    @staticmethod
    def copy_from_local(local_path, to_uri):
        """Copy a local path to hdfs, overwriting the destination (``-f``).

        Returns:
            The tuple ``(True, out)`` on success, where ``out`` is the
            captured standard output of the command. Note that this differs
            from the other methods, which return a bare ``True``.

        Raises:
            RuntimeError: if ``local_path`` does not exist or is not
                readable, on a non-zero return code, or if the command could
                not be run.
        """
        # Make sure local_path is accessible before spawning hadoop.
        if not os.path.exists(local_path) or not os.access(local_path, os.R_OK):
            raise RuntimeError(
                "try to access local_path[{}] " "but failed".format(local_path)
            )
        what = "Cannot copy local[{}] to remote[{}]".format(local_path, to_uri)
        cmd = "hadoop fs -copyFromLocal -f {local} {remote}".format(
            local=local_path, remote=to_uri
        )
        ret, out, err = HDFS._run(cmd, what)
        if ret != 0:
            raise RuntimeError(
                "{} with cmd[{}]; ret[{}] output[{}] stderr[{}]".format(
                    what, cmd, ret, out, err
                )
            )
        return True, out

    @staticmethod
    def copy_to_local(from_uri, local_path):
        """Copy one or several hdfs uris to a local path.

        Args:
            from_uri: a single uri as ``str``, or a ``list``/``tuple`` of
                uris which are joined with spaces on the command line.
            local_path: local destination.

        Returns:
            True on success.

        Raises:
            RuntimeError: if ``from_uri`` is empty or of an unsupported type,
                on a non-zero return code, or if the command could not be
                run.
        """
        remote = ""
        if isinstance(from_uri, str):
            remote = from_uri
        elif isinstance(from_uri, (list, tuple)):
            remote = " ".join(from_uri)
        what = "Cannot copy remote[{}] to local[{}]".format(from_uri, local_path)
        if not remote:
            # Without a source the command would be run with local_path as
            # its only operand, so fail here instead.
            raise RuntimeError(
                "{}: from_uri must be a non-empty str, list or tuple".format(what)
            )
        cmd = "hadoop fs -copyToLocal {remote} {local}".format(
            remote=remote, local=local_path
        )
        ret, out, err = HDFS._run(cmd, what)
        if ret != 0:
            raise RuntimeError(
                "{} with cmd[{}]; ret[{}] output[{}] stderr[{}]".format(
                    what, cmd, ret, out, err
                )
            )
        return True

    @staticmethod
    def read_hdfs_file(uri):
        """Read the content of hdfs uri with ``hadoop fs -text``.

        Returns:
            The captured standard output of the command, exactly as handed
            back by ``run_cmd_with_all_output``.

        Raises:
            RuntimeError: on a non-zero return code, or if the command could
                not be run.
        """
        what = "Cannot read text from uri[{}]".format(uri)
        cmd = "hadoop fs -text {uri}".format(uri=uri)
        ret, out, err = HDFS._run(cmd, what)
        if ret != 0:
            raise RuntimeError(
                "{} with cmd[{}]; ret[{}] output[{}] stderr[{}]".format(
                    what, cmd, ret, out, err
                )
            )
        return out

    @staticmethod
    def move(from_uri, to_uri):
        """Move hdfs ``from_uri`` to ``to_uri`` (``hadoop fs -mv``).

        Returns:
            True on success.

        Raises:
            RuntimeError: on a non-zero return code, or if the command could
                not be run.
        """
        what = "Cannot move from_uri[{}] to to_uri[{}]".format(from_uri, to_uri)
        cmd = "hadoop fs -mv {furi} {turi}".format(furi=from_uri, turi=to_uri)
        ret, out, err = HDFS._run(cmd, what)
        if ret != 0:
            raise RuntimeError(
                "{} with cmd[{}]; ret[{}] output[{}] stderr[{}]".format(
                    what, cmd, ret, out, err
                )
            )
        return True