4 from http
import HTTPStatus
5 from shutil
import rmtree
6 from osm_common
.fsbase
import FsBase
, FsException
8 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class FsLocal(FsBase):
    """Storage backend that keeps files under a directory of the local filesystem.

    Every "storage" argument accepted by the methods below is either a single
    relative path (str) or a list of path components (list of str) that are
    joined with "/". The result is appended to ``self.path``, which
    ``fs_connect`` normalises so that it always ends with "/".

    NOTE(review): this class was rebuilt from a lossy extraction of the source
    in which some lines were absent. Statements restored by inference carry a
    ``NOTE(review)`` comment and should be confirmed against the repository.
    """

    def __init__(self, logger_name='fs'):
        """Create an unconnected backend.

        :param logger_name: name of the logger used by this instance
        """
        self.logger = logging.getLogger(logger_name)
        # NOTE(review): initial value not visible in the extracted source; None
        # marks "not connected yet" so that reading self.path never fails.
        self.path = None

    # NOTE(review): the ``def`` line of this method was missing from the
    # extracted source (only its return statement was visible); the name is
    # assumed to be the FsBase hook for reporting parameters -- confirm.
    def get_params(self):
        """Return the parameters that describe this storage backend.

        :return: dict with the backend kind ("local") and the configured base path
        """
        return {"fs": "local", "path": self.path}

    @staticmethod
    def _join_storage(storage):
        """Return ``storage`` as one relative path string.

        :param storage: a str, or a list of str path components
        :return: ``storage`` itself when it is a str, otherwise its items joined with "/"
        """
        if isinstance(storage, str):
            return storage
        return "/".join(storage)

    def fs_connect(self, config):
        """Configure the backend from ``config``.

        :param config: mapping with a mandatory "path" entry (base directory) and an
            optional "logger_name" entry that replaces the logger set at construction
        :return: None
        :raises FsException: if the base path does not exist or ``config`` is invalid
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.path = config["path"]
            if not self.path.endswith("/"):
                # NOTE(review): body of this branch restored by inference; the
                # other methods concatenate self.path with relative names, which
                # only works when the base path ends with a separator.
                self.path += "/"
            if not os.path.exists(self.path):
                # NOTE(review): the format argument was not visible in the
                # extracted source; the configured path is reported.
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
        except FsException:
            # NOTE(review): pass-through restored by inference so the specific
            # error above is not re-wrapped by the generic handler below.
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        """Release the backend.

        NOTE(review): the body was not visible in the extracted source; no
        statements are attributed to it, so it is kept as a no-op.
        """
        pass

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder: folder name, relative to the base path
        :return: None or raises an exception
        :raises FsException: (INTERNAL_SERVER_ERROR) if the folder cannot be created
        """
        try:
            os.mkdir(self.path + folder)
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        target = self.path + self._join_storage(storage)
        if not os.path.exists(target):
            return False
        if not mode:
            # Documented contract: with no mode, mere existence is enough.
            return True
        if mode == "file":
            return os.path.isfile(target)
        if mode == "dir":
            return os.path.isdir(target)
        # Unrecognised mode: no condition can be satisfied.
        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, as reported by os.path.getsize
        """
        return os.path.getsize(self.path + self._join_storage(storage))

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list; location, relative to the base path, where to extract
        :return: None
        """
        destination = self.path + self._join_storage(path)
        # NOTE(review): extractall() trusts the member names stored in the
        # archive; if archives can come from untrusted users, members should be
        # validated (or an extraction filter used) to avoid path traversal.
        tar_object.extractall(path=destination)

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        :raises FsException: NOT_FOUND if the file does not exist, BAD_REQUEST if it cannot be opened
        """
        # Resolved before the try so the name is always bound in the handlers.
        f = self._join_storage(storage)
        try:
            return open(self.path + f, mode)
        except FileNotFoundError as e:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND) from e
        # NOTE(review): the exception type of this fallback handler was not
        # visible in the extracted source; OSError (alias IOError) is assumed.
        except OSError as e:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST) from e

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content
        :raises FsException: NOT_FOUND if storage is not a directory, BAD_REQUEST if it cannot be listed
        """
        # Resolved before the try so the name is always bound in the handlers.
        f = self._join_storage(storage)
        try:
            return os.listdir(self.path + f)
        except NotADirectoryError as e:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND) from e
        # NOTE(review): the exception type of this fallback handler was not
        # visible in the extracted source; OSError (alias IOError) is assumed.
        except OSError as e:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST) from e

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        :raises FsException: (NOT_FOUND) if storage does not exist and ignore_non_exist is False
        """
        f = self.path + self._join_storage(storage)
        if os.path.exists(f):
            # NOTE(review): the statement of this branch was missing from the
            # extracted source; rmtree is the file's only deletion helper in
            # scope and matches the documented recursive delete.
            rmtree(f)
        elif not ignore_non_exist:
            raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)