# 771790f6b58930c91b205e444a3f1ffd3555c1a4
# Copyright 2019 Canonical
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
import logging
import os
import re
from http import HTTPStatus
from io import BytesIO, StringIO

from gridfs import GridFSBucket, errors
from pymongo import MongoClient

from osm_common.fsbase import FsBase, FsException
# Module author / maintainer contact.
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
class GridByteStream(BytesIO):
    """In-memory binary stream backed by a single GridFS file.

    On construction the GridFS entry named ``filename`` (if any) is located
    and, unless the mode truncates ("w"), loaded into the buffer.  For modes
    without "r", close() writes the buffer back to GridFS, replacing the
    previous version (same ``_id``) or creating a new entry.

    :param filename: "/"-separated GridFS filename
    :param fs: GridFS bucket used for every read and write
    :param mode: file mode string, interpreted like the built-in open()
        ("rb", "wb", "ab", ...)
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None  # GridFS id of the existing entry, if one was found
        self.filename = filename
        self.fs = fs
        self.mode = mode

        try:
            self.__initialize__()
        except Exception:
            # Close the buffer directly: otherwise a later close() (e.g. from
            # the garbage collector) would upload an empty file for a stream
            # that never opened successfully.
            super(GridByteStream, self).close()
            raise

    def __initialize__(self):
        """Find the GridFS entry for ``self.filename`` and load its content.

        :raises FsException: if the filename is not unique, or the entry is
            not of type "file"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # Any second result means the filename is ambiguous.
            exception_file = next(cursor, None)

            if exception_file is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] == "file":
                grid_file = requested_file
            else:
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            # Remember the id so close() replaces the entry in place.
            self._id = grid_file._id

            # "w" truncates, as with open(): do not preload the old content,
            # otherwise new writes would be appended to it.
            if "w" not in self.mode:
                self.fs.download_to_stream(self._id, self)

            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """Write the buffer back to GridFS (non-"r" modes) and close the stream.

        Calling close() on an already closed stream has no effect, as the
        io.IOBase contract requires.  Without this guard a second call (for
        instance an explicit close() followed by __exit__) would delete the
        entry that the first call had just uploaded.
        """
        if self.closed:
            return

        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        top_level = self.filename.split("/")[0]
        # Match on metadata.type (like the rest of the module) rather than on
        # the whole metadata document, so dirs carrying extra metadata keys
        # (e.g. "permissions") are still recognised.
        cursor = self.fs.find({"filename": top_level, "metadata.type": "dir"})

        parent_dir = next(cursor, None)

        if not parent_dir:
            # The top-level folder is not a registered dir: its last character
            # is dropped from the stored name.  NOTE(review): presumably maps a
            # temporary "<name>_" folder back to "<name>" - confirm with callers.
            self.filename = self.filename.replace(top_level, top_level[:-1], 1)

        # Upload reads from the current position, so rewind first.
        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id,
                self.filename,
                self,
                metadata={"type": "file"}
            )
        else:
            self.fs.upload_from_stream(
                self.filename,
                self,
                metadata={"type": "file"}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
class GridStringStream(StringIO):
    """In-memory text stream backed by a single GridFS file (UTF-8 encoded).

    Mirrors GridByteStream, but decodes the GridFS content as UTF-8 on load
    and encodes the buffer as UTF-8 when writing it back on close().

    :param filename: "/"-separated GridFS filename
    :param fs: GridFS bucket used for every read and write
    :param mode: file mode string, interpreted like the built-in open()
        ("r", "w", "a", ...)
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None  # GridFS id of the existing entry, if one was found
        self.filename = filename
        self.fs = fs
        self.mode = mode

        try:
            self.__initialize__()
        except Exception:
            # Close the buffer directly so a later close() does not upload an
            # empty file for a stream that never opened successfully.
            super(GridStringStream, self).close()
            raise

    def __initialize__(self):
        """Find the GridFS entry for ``self.filename`` and load its text.

        :raises FsException: if the filename is not unique, or the entry is
            not of type "file"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # Any second result means the filename is ambiguous.
            exception_file = next(cursor, None)

            if exception_file is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] == "file":
                grid_file = requested_file
            else:
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            # Remember the id so close() replaces the entry in place.
            self._id = grid_file._id

            # "w" truncates, as with open(): do not preload the old content.
            if "w" not in self.mode:
                with BytesIO() as stream:
                    self.fs.download_to_stream(self._id, stream)
                    stream.seek(0, 0)
                    self.write(stream.read().decode("utf-8"))

            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """Write the buffer back to GridFS (non-"r" modes) and close the stream.

        Calling close() on an already closed stream has no effect, as the
        io.IOBase contract requires; a second call would otherwise delete the
        entry uploaded by the first one.
        """
        if self.closed:
            return

        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        top_level = self.filename.split("/")[0]
        # Match on metadata.type so dirs with extra metadata keys still count.
        cursor = self.fs.find({"filename": top_level, "metadata.type": "dir"})

        parent_dir = next(cursor, None)

        if not parent_dir:
            # The top-level folder is not a registered dir: its last character
            # is dropped from the stored name.  NOTE(review): presumably maps a
            # temporary "<name>_" folder back to "<name>" - confirm with callers.
            self.filename = self.filename.replace(top_level, top_level[:-1], 1)

        # Encode the whole text buffer into a temporary byte stream for GridFS.
        self.seek(0, 0)
        with BytesIO() as stream:
            stream.write(self.read().encode("utf-8"))
            stream.seek(0, 0)

            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id,
                    self.filename,
                    stream,
                    metadata={"type": "file"}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename,
                    stream,
                    metadata={"type": "file"}
                )
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
class FsMongo(FsBase):
    """FsBase implementation backed by a MongoDB GridFS bucket.

    Each file or directory is one GridFS entry: ``filename`` holds the
    "/"-separated path and ``metadata["type"]`` is "file" or "dir" (entries
    created by file_extract() also carry ``metadata["permissions"]``).
    ``self.path`` is a local directory that receives a copy of the GridFS
    content (see sync()).
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local mirror directory; ends with "/" once connected
        self.client = None  # MongoClient, set by fs_connect()
        self.fs = None  # GridFSBucket, set by fs_connect()

    @staticmethod
    def _join(storage):
        """Return *storage* (a str or a list of str) as a "/"-joined GridFS filename."""
        return storage if isinstance(storage, str) else "/".join(storage)

    @staticmethod
    def _subtree_regex(name):
        """Return a regex matching *name* itself and every entry below it ("name/...")."""
        # Paths may contain regex metacharacters such as "." or "+".
        return "^{}(/|$)".format(re.escape(name))

    def __update_local_fs(self):
        """Copy every GridFS directory and file into the local ``self.path`` tree.

        Local files are truncated and rewritten; a stored "permissions" value,
        when present, is applied with os.chmod.
        """
        # Cursors opened with no_cursor_timeout are never reaped by the server,
        # so close them explicitly even when the loop fails.
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            dir_cursor.close()

        file_cursor = self.fs.find({"metadata.type": "file"}, no_cursor_timeout=True)
        try:
            for writing_file in file_cursor:
                file_path = self.path + writing_file.filename
                # Close (and thereby flush) the local file before chmod.
                with open(file_path, 'wb+') as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)

                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])
        finally:
            file_cursor.close()

    def get_params(self):
        """Return the connection parameters of this filesystem."""
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """Validate *config*, connect to MongoDB and populate the local mirror.

        :param config: dict with "path" (an existing, writable local directory),
            "collection", and either "uri" or "host" + "port"; "logger_name"
            is optional
        :raises FsException: on a missing/invalid parameter or connection error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException("Missing parameter \"path\"")
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config.keys() for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException("Missing parameter \"collection\"")
                else:
                    raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")

            self.__update_local_fs()
        except FsException:
            # Already meaningful (and may carry an http_code): do not re-wrap.
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

    def fs_disconnect(self):
        """Nothing to release for this backend."""
        pass

    def mkdir(self, folder):
        """
        Creates a folder or parent object location. Idempotent: an existing
        folder of the same name is left untouched.
        :param folder: "/"-separated folder name
        :return: None or raises an exception
        """
        try:
            # upload_from_stream always creates a new entry (new _id) and never
            # raises FileExists, so check for an existing dir explicitly to
            # avoid duplicate entries that later trigger "Multiple ... found".
            existing = next(self.fs.find({"filename": folder, "metadata.type": "dir"}), None)
            if existing is not None:
                return
            self.fs.upload_from_stream(
                folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            if src == dst:
                # Deleting "dst" first would otherwise delete the source itself.
                return

            dst_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(dst)}},
                no_cursor_timeout=True)
            try:
                for dst_file in dst_cursor:
                    self.fs.delete(dst_file._id)
            finally:
                dst_cursor.close()

            src_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(src)}},
                no_cursor_timeout=True)
            try:
                for src_file in src_cursor:
                    # The regex guarantees the filename starts with src.
                    self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
            finally:
                src_cursor.close()
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True or False
        """
        f = self._join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # With no mode any entry type counts, as documented above.
            if mode is None or requested_file.metadata["type"] == mode:
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size in bytes
        :raises FsException: NOT_FOUND if no entry exists
        """
        f = self._join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            return requested_file.length

        raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = self._join(path)

        for member in tar_object.getmembers():
            if member.isfile():
                stream = tar_object.extractfile(member)
            else:
                # Non-regular members (dirs and anything else) become empty "dir" entries.
                stream = BytesIO()

            metadata = {
                "type": "file" if member.isfile() else "dir",
                "permissions": member.mode
            }

            # NOTE(review): member.name comes from the archive and is stored
            # verbatim; __update_local_fs later writes it under self.path, so
            # names containing ".." could escape that directory - consider
            # rejecting them.
            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name,
                    stream,
                    metadata=metadata
                )
            finally:
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; a "b" selects a binary stream, otherwise a UTF-8 text stream
        :return: file object
        """
        try:
            f = self._join(storage)

            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content: names of the direct children (like os.listdir)
        """
        try:
            f = self._join(storage)

            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                exception_dir = next(dir_cursor, None)

                if exception_dir is not None:
                    raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_dir.metadata["type"] != "dir":
                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

                # "f/<name>" with no further "/": the previous unanchored
                # "([^/])*" pattern also matched every nested descendant.
                files_cursor = self.fs.find(
                    {"filename": {"$regex": "^{}/[^/]+$".format(re.escape(f))}})
                prefix_len = len(f) + 1
                files = [children_file.filename[prefix_len:] for children_file in files_cursor]

            return files
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        try:
            f = self._join(storage)

            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                exception_file = next(file_cursor, None)

                if exception_file is not None:
                    raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_file.metadata["type"] == "dir":
                    # Only the dir and its descendants: a bare "^f" prefix
                    # would also delete siblings such as "f_" or "f2".
                    dir_cursor = self.fs.find({"filename": {"$regex": self._subtree_regex(f)}})

                    for tmp in dir_cursor:
                        self.fs.delete(tmp._id)
                else:
                    self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def sync(self):
        """
        Sync from FSMongo to local storage
        """
        self.__update_local_fs()