# Copyright 2019 Canonical
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
##

import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient


__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"


class GridByteStream(BytesIO):
    """In-memory binary buffer bound to one GridFS file name.

    On construction, any existing GridFS content stored under ``filename`` is
    downloaded into the buffer. When the stream is closed and ``mode`` does not
    contain "r", the buffer content is uploaded back to GridFS, replacing the
    previously stored file.

    :param filename: GridFS file name
    :param fs: bucket object providing find/delete/download/upload operations
    :param mode: file mode string; only the presence of "r" is inspected
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current GridFS content (if any) into the buffer
        :return: None or raises FsException if several files share the name or
            the stored entry is neither a "file" nor a "sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second hit for the same name means the store is inconsistent
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

            # Only readers are rewound; otherwise the position stays after the
            # downloaded content
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Close the stream; when not opened for reading, upload the buffer first
        :return: None
        """
        # close() must be a no-op on an already closed stream. Without this
        # guard a second call in write mode would delete the file that was just
        # uploaded and then fail seeking on the closed buffer.
        if self.closed:
            return

        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        cursor = self.fs.find(
            {"filename": parent_dir_name, "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No directory entry for the first path component: store the file
            # under that component with its last character removed
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, self, metadata={"type": self.file_type}
            )
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class GridStringStream(StringIO):
    """In-memory text buffer bound to one GridFS file name.

    On construction, any existing GridFS content stored under ``filename`` is
    downloaded, decoded as UTF-8 and written into the buffer. When the stream
    is closed and ``mode`` does not contain "r", the buffer is encoded as UTF-8
    and uploaded back to GridFS, replacing the previously stored file.

    :param filename: GridFS file name
    :param fs: bucket object providing find/delete/download/upload operations
    :param mode: file mode string; only the presence of "r" is inspected
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current GridFS content (if any) into the buffer as text
        :return: None or raises FsException if several files share the name or
            the stored entry has an unsupported type
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second hit for the same name means the store is inconsistent
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            # NOTE(review): GridByteStream accepts ("file", "sym") here while
            # this class accepts "dir" and rejects "sym" - confirm whether the
            # difference is intended before changing it.
            if requested_file.metadata["type"] in ("file", "dir"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            self._id = grid_file._id
            # GridFS delivers bytes; go through a temporary binary buffer
            with BytesIO() as stream:
                self.fs.download_to_stream(self._id, stream)
                stream.seek(0)
                self.write(stream.read().decode("utf-8"))

            # Only readers are rewound; otherwise the position stays after the
            # downloaded content
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Close the stream; when not opened for reading, upload the buffer first
        :return: None
        """
        # close() must be a no-op on an already closed stream. Without this
        # guard a second call in write mode would delete the file that was just
        # uploaded and then fail seeking on the closed buffer.
        if self.closed:
            return

        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        cursor = self.fs.find(
            {"filename": parent_dir_name, "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No directory entry for the first path component: store the file
            # under that component with its last character removed
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        stream = BytesIO()
        stream.write(self.read().encode("utf-8"))
        stream.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, stream, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, stream, metadata={"type": self.file_type}
            )
        stream.close()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class FsMongo(FsBase):
    """File storage kept in a MongoDB GridFS bucket and mirrored under a local path.

    Every GridFS entry carries ``metadata["type"]`` set to "file", "dir" or
    "sym"; a "sym" entry stores the link target as UTF-8 content.
    """

    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None
        self.client = None
        self.fs = None

    @staticmethod
    def _storage_name(storage):
        """
        Build the GridFS file name for a storage reference
        :param storage: can be a str or a str list
        :return: the str itself, or the list items joined with "/"
        """
        return storage if isinstance(storage, str) else "/".join(storage)

    @staticmethod
    def _subtree_regex(name):
        """
        Build the regular expression matching "name" itself and everything below it
        :param name: file name, taken literally
        :return: regular expression text
        """
        # Escaping keeps characters such as "." from acting as wildcards; the
        # "(/|$)" tail stops "abc" from also matching a sibling like "abcd"
        return "^{}(/|$)".format(re.escape(name))

    def _find_unique(self, filename, error_message="Multiple files found"):
        """
        Look up the single GridFS entry stored under "filename"
        :param filename: exact file name
        :param error_message: exception text used when several entries match
        :return: the entry, None if there is none, or raises FsException
        """
        cursor = iter(self.fs.find({"filename": filename}))
        match = next(cursor, None)
        if match is not None and next(cursor, None) is not None:
            raise FsException(
                error_message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )
        return match

    def __update_local_fs(self, from_path=None):
        """
        Copy directories, files and symbolic links from GridFS to the local path
        :param from_path: if supplied, only entries whose name starts with it are copied
        :return: None
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)

        for directory in dir_cursor:
            if from_path and not directory.filename.startswith(from_path):
                continue
            os.makedirs(self.path + directory.filename, exist_ok=True)

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )

        for writing_file in file_cursor:
            if from_path and not writing_file.filename.startswith(from_path):
                continue
            file_path = self.path + writing_file.filename

            # The parent may have no "dir" entry of its own in GridFS; create
            # it so that writing the file or link does not fail
            parent = os.path.dirname(file_path)
            if parent:
                os.makedirs(parent, exist_ok=True)

            if writing_file.metadata["type"] == "sym":
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    b.seek(0)
                    link = b.read().decode("utf-8")

                try:
                    os.remove(file_path)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        # This is probably permission denied or worse
                        raise
                os.symlink(link, file_path)
            else:
                with open(file_path, "wb+") as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Validate the local path and open the GridFS bucket
        :param config: dictionary with "path", "collection" and either "uri" or
            "host" + "port"; optionally "logger_name"
        :return: None or raises FsException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if all(key in config for key in ("uri", "collection")):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config for key in ("host", "port", "collection")):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config:
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri" or "host" + "port"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        try:
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(
                str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            ) from e

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            dst_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(dst)}},
                no_cursor_timeout=True,
            )

            for dst_file in dst_cursor:
                self.fs.delete(dst_file._id)

            src_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(src)}},
                no_cursor_timeout=True,
            )

            for src_file in src_cursor:
                self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
        except Exception as e:
            raise FsException(
                str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            ) from e

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        requested_file = self._find_unique(self._storage_name(storage))
        if requested_file is None:
            return False

        # if no special mode is required just check it does exists
        if not mode:
            return True

        file_type = requested_file.metadata["type"]
        # a symbolic link also counts as a regular file
        return file_type == mode or (file_type == "sym" and mode == "file")

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None if the file does not exist
        """
        requested_file = self._find_unique(self._storage_name(storage))
        if requested_file is None:
            return None
        return requested_file.length

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = self._storage_name(path)

        for member in tar_object.getmembers():
            if member.isfile():
                file_type = "file"
                stream = tar_object.extractfile(member)
            elif member.issym():
                # a link is stored as an entry whose content is its target
                file_type = "sym"
                stream = BytesIO(member.linkname.encode("utf-8"))
            else:
                file_type = "dir"
                stream = BytesIO()

            metadata = {"type": file_type, "permissions": member.mode}

            self.fs.upload_from_stream(f + "/" + member.name, stream, metadata=metadata)

            stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        """
        f = self._storage_name(storage)
        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile as e:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            ) from e
        except IOError as e:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            ) from e

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content; an empty list if the folder does not exist
        """
        f = self._storage_name(storage)
        try:
            files = []
            requested_dir = self._find_unique(f, "Multiple directories found")

            if requested_dir is not None:
                if requested_dir.metadata["type"] != "dir":
                    raise FsException(
                        "File {} does not exist".format(f),
                        http_code=HTTPStatus.NOT_FOUND,
                    )

                # the folder name is escaped so it is matched literally
                files_cursor = self.fs.find(
                    {"filename": {"$regex": "^{}/([^/])*".format(re.escape(f))}}
                )
                for children_file in files_cursor:
                    files += [children_file.filename.replace(f + "/", "", 1)]

            return files
        except IOError as e:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            ) from e

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = self._storage_name(storage)
        try:
            requested_file = self._find_unique(f)

            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException(
                        "File {} does not exist".format(storage),
                        http_code=HTTPStatus.NOT_FOUND,
                    )
                return

            if requested_file.metadata["type"] == "dir":
                # Match the folder entry and its content only; a bare "^name"
                # prefix would also delete siblings such as "name2/..."
                dir_cursor = self.fs.find(
                    {"filename": {"$regex": self._subtree_regex(f)}}
                )

                for tmp in dir_cursor:
                    self.fs.delete(tmp._id)
            else:
                self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            ) from e

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path:
            if os.path.isabs(from_path):
                from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """
        Upload the local content found under "from_path" to GridFS
        :param from_path: directory to upload, relative to the local storage path
        :return: None
        """
        os_path = self.path + from_path

        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                members.append({"filename": os.path.join(root, folder), "type": "dir"})
            for file_name in files:
                filename = os.path.join(root, file_name)
                file_type = "sym" if os.path.islink(filename) else "file"
                members.append({"filename": filename, "type": file_type})

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            local_name = member["filename"]

            # obtain permission (the three low octal digits of the mode)
            mask = os.stat(local_name).st_mode & 0o777

            # convert to relative path
            rel_filename = os.path.relpath(local_name, self.path)
            # get timestamp in UTC because mongo stores upload date in UTC:
            # https://www.mongodb.com/docs/v4.0/tutorial/model-time-data/#overview
            # The value is kept naive so it can be compared with datetime.min
            last_modified_date = datetime.datetime.fromtimestamp(
                os.path.getmtime(local_name), tz=datetime.timezone.utc
            ).replace(tzinfo=None)

            # remove processed files from dict
            remote_file = remote_files.pop(rel_filename, None)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )

            if last_modified_date < upload_date:
                continue

            file_type = member["type"]
            metadata = {"type": file_type, "permissions": mask}

            if file_type == "dir":
                with BytesIO() as stream:
                    self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)
            elif file_type == "sym":
                with BytesIO(os.readlink(local_name).encode("utf-8")) as stream:
                    self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)
            else:
                # hand over the open file so it is not loaded whole in memory
                with open(local_name, "rb") as fh:
                    self.fs.upload_from_stream(rel_filename, fh, metadata=metadata)

            # delete old files
            for old_file in remote_file or ():
                self.fs.delete(old_file._id)

        # delete files that are not any more in local fs
        for remote_file in remote_files.values():
            for old_file in remote_file:
                self.fs.delete(old_file._id)

    def _get_mongo_files(self, from_path=None):
        """
        Group the GridFS entries by file name
        :param from_path: if supplied, only entries whose name starts with it are returned
        :return: dictionary of file name to list of entries, in the order
            requested from GridFS (uploadDate descending)
        """
        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        for grid_file in file_cursor:
            if from_path and not grid_file.filename.startswith(from_path):
                continue
            file_dict.setdefault(grid_file.filename, []).append(grid_file)
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)