import os
import logging
+
# import tarfile
from http import HTTPStatus
from shutil import rmtree
class FsLocal(FsBase):
-
- def __init__(self, logger_name='fs', lock=False):
+ def __init__(self, logger_name="fs", lock=False):
super().__init__(logger_name, lock)
self.path = None
if not self.path.endswith("/"):
self.path += "/"
if not os.path.exists(self.path):
- raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
- config["path"]))
+ raise FsException(
+ "Invalid configuration param at '[storage]': path '{}' does not exist".format(
+ config["path"]
+ )
+ )
except FsException:
raise
except Exception as e: # TODO refine
f = "/".join(storage)
return open(self.path + f, mode)
except FileNotFoundError:
- raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+ raise FsException(
+ "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
+ )
except IOError:
- raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+ raise FsException(
+ "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
+ )
def dir_ls(self, storage):
"""
f = "/".join(storage)
return os.listdir(self.path + f)
except NotADirectoryError:
- raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+ raise FsException(
+ "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
+ )
except IOError:
- raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+ raise FsException(
+ "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
+ )
def file_delete(self, storage, ignore_non_exist=False):
"""
if os.path.exists(f):
rmtree(f)
elif not ignore_non_exist:
- raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
+ raise FsException(
+ "File {} does not exist".format(storage),
+ http_code=HTTPStatus.NOT_FOUND,
+ )
except (IOError, PermissionError) as e:
- raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise FsException(
+ "File {} cannot be deleted: {}".format(f, e),
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+ )
def sync(self, from_path=None):
pass # Not needed in fslocal