X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_nbi%2Ffslocal.py;fp=osm_nbi%2Ffslocal.py;h=0000000000000000000000000000000000000000;hb=a8d63635eecb1c8debab5f674931a453ea39e78d;hp=b7dd8396ad1a147e1a57a03a3bd787a182e19a63;hpb=0ffaa99c277d76ca49fdbe4d4b6b9df4d7484857;p=osm%2FNBI.git diff --git a/osm_nbi/fslocal.py b/osm_nbi/fslocal.py deleted file mode 100644 index b7dd839..0000000 --- a/osm_nbi/fslocal.py +++ /dev/null @@ -1,142 +0,0 @@ -import os -import logging -import tarfile -from http import HTTPStatus -from shutil import rmtree -from fsbase import FsBase, FsException - -__author__ = "Alfonso Tierno " - - -class FsLocal(FsBase): - - def __init__(self, logger_name='fs'): - self.logger = logging.getLogger(logger_name) - self.path = None - - def get_params(self): - return {"fs": "local", "path": self.path} - - def fs_connect(self, config): - try: - if "logger_name" in config: - self.logger = logging.getLogger(config["logger_name"]) - self.path = config["path"] - if not self.path.endswith("/"): - self.path += "/" - if not os.path.exists(self.path): - raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format( - config["path"])) - except FsException: - raise - except Exception as e: # TODO refine - raise FsException(str(e)) - - def fs_disconnect(self): - pass # TODO - - def mkdir(self, folder): - """ - Creates a folder or parent object location - :param folder: - :return: None or raises and exception - """ - try: - os.mkdir(self.path + folder) - except Exception as e: - raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) - - def file_exists(self, storage, mode=None): - """ - Indicates if "storage" file exist - :param storage: can be a str or a str list - :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists - :return: True, False - """ - if isinstance(storage, str): - f = storage - else: - f = "/".join(storage) - if os.path.exists(self.path + f): - if mode == "file" and os.path.isfile(self.path + f): - return True - if mode == "dir" and os.path.isdir(self.path + f): - return True - return False - - def file_size(self, storage): - """ - return file size - :param storage: can be a str or a str list - :return: file size - """ - if isinstance(storage, str): - f = storage - else: - f = "/".join(storage) - return os.path.getsize(self.path + f) - - def file_extract(self, tar_object, path): - """ - extract a tar file - :param tar_object: object of type tar - :param path: can be a str or a str list, or a tar object where to extract the tar_object - :return: None - """ - if isinstance(path, str): - f = self.path + path - else: - f = self.path + "/".join(path) - tar_object.extractall(path=f) - - def file_open(self, storage, mode): - """ - Open a file - :param storage: can be a str or list of str - :param mode: file mode - :return: file object - """ - try: - if isinstance(storage, str): - f = storage - else: - f = "/".join(storage) - return open(self.path + f, mode) - except FileNotFoundError: - raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND) - except IOError: - raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST) - - def dir_ls(self, storage): - """ - return folder content - :param storage: can be a str or list of str - :return: folder content - """ - try: - if isinstance(storage, str): - f = storage - else: - f = "/".join(storage) - return os.listdir(self.path + f) - except NotADirectoryError: - raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND) - except IOError: - raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST) - - def file_delete(self, storage, ignore_non_exist=False): - """ - Delete storage content recursivelly - :param storage: can be a str or list of str - :param ignore_non_exist: not raise exception if storage does not exist - :return: None - """ - - if isinstance(storage, str): - f = self.path + storage - else: - f = self.path + "/".join(storage) - if os.path.exists(f): - rmtree(f) - elif not ignore_non_exist: - raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)