Code Coverage

Cobertura Coverage Report > osm_common >

fsmongo.py

Trend

File Coverage summary

Name | Classes | Lines | Conditionals
fsmongo.py
100%
1/1
37%
142/379
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
fsmongo.py
37%
142/379
N/A

Source

osm_common/fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18 1 import datetime
19 1 import errno
20 1 from http import HTTPStatus
21 1 from io import BytesIO, StringIO
22 1 import logging
23 1 import os
24 1 import tarfile
25 1 import zipfile
26
27 1 from gridfs import errors, GridFSBucket
28 1 from osm_common.fsbase import FsBase, FsException
29 1 from pymongo import MongoClient
30
31
32 1 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
33
34
class GridByteStream(BytesIO):
    """
    In-memory binary buffer bound to one GridFS entry.

    On creation the content stored under ``filename`` (if any) is downloaded
    into the buffer. On ``close()`` in any mode that does not contain "r", the
    buffer is uploaded back, replacing the revision that was loaded.

    :param filename: name of the entry inside the GridFS bucket
    :param fs: bucket object providing find, download_to_stream, delete,
               upload_from_stream and upload_from_stream_with_id
    :param mode: file mode string; only the presence of "r" is inspected here
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of the entry, if it exists.
        :return: None or raises FsException when the name is duplicated or the
                 entry is neither a "file" nor a "sym"
        """
        cursor = self.fs.find({"filename": self.filename})

        grid_file = next(cursor, None)
        if grid_file is None:
            # nothing stored yet: start with an empty buffer
            return

        if next(cursor, None) is not None:
            raise FsException(
                "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        entry_type = grid_file.metadata["type"]
        if entry_type not in ("file", "sym"):
            raise FsException(
                "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        self.file_type = entry_type
        self._id = grid_file._id
        self.fs.download_to_stream(self._id, self)

        # NOTE(review): outside read mode the position stays at the end of the
        # downloaded content, so later writes append to it - confirm intended.
        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the buffer; outside read mode, store its content in GridFS first.
        Calling it more than once is harmless.
        :return: None
        """
        # A second call must be a no-op. Without this guard a repeated close in
        # write mode would delete the revision uploaded by the first call and
        # then fail on the already closed buffer.
        if self.closed:
            return

        if "r" in self.mode:
            super().close()
            return

        # The old revision has to go first so its id can be reused below
        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        cursor = self.fs.find(
            {"filename": parent_dir_name, "metadata": {"type": "dir"}}
        )

        # When the first path component has no directory entry whose metadata is
        # exactly {"type": "dir"}, its last character is dropped from the stored
        # name. NOTE(review): the reason is not visible in this file - confirm.
        if not next(cursor, None):
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        metadata = {"type": self.file_type}
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata=metadata
            )
        else:
            self.fs.upload_from_stream(self.filename, self, metadata=metadata)
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
110
111
class GridStringStream(StringIO):
    """
    In-memory text buffer bound to one GridFS entry.

    On creation the content stored under ``filename`` (if any) is downloaded
    and decoded as UTF-8 into the buffer. On ``close()`` in any mode that does
    not contain "r", the text is encoded as UTF-8 and uploaded back, replacing
    the revision that was loaded.

    :param filename: name of the entry inside the GridFS bucket
    :param fs: bucket object providing find, download_to_stream, delete,
               upload_from_stream and upload_from_stream_with_id
    :param mode: file mode string; only the presence of "r" is inspected here
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of the entry, if it exists.
        :return: None or raises FsException when the name is duplicated or the
                 entry type is not accepted
        """
        cursor = self.fs.find({"filename": self.filename})

        grid_file = next(cursor, None)
        if grid_file is None:
            # nothing stored yet: start with an empty buffer
            return

        if next(cursor, None) is not None:
            raise FsException(
                "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        entry_type = grid_file.metadata["type"]
        # NOTE(review): "dir" is accepted here while GridByteStream accepts
        # "sym" instead - confirm which set is the intended one.
        if entry_type not in ("file", "dir"):
            raise FsException(
                "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        self.file_type = entry_type
        self._id = grid_file._id

        # GridFS delivers bytes, so go through a temporary binary buffer
        with BytesIO() as raw:
            self.fs.download_to_stream(self._id, raw)
            self.write(raw.getvalue().decode("utf-8"))

        # NOTE(review): outside read mode the position stays at the end of the
        # downloaded content, so later writes append to it - confirm intended.
        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the buffer; outside read mode, store its content in GridFS first.
        Calling it more than once is harmless.
        :return: None
        """
        # A second call must be a no-op. Without this guard a repeated close in
        # write mode would delete the revision uploaded by the first call and
        # then fail on the already closed buffer.
        if self.closed:
            return

        if "r" in self.mode:
            super().close()
            return

        # The old revision has to go first so its id can be reused below
        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        cursor = self.fs.find(
            {"filename": parent_dir_name, "metadata": {"type": "dir"}}
        )

        # When the first path component has no directory entry whose metadata is
        # exactly {"type": "dir"}, its last character is dropped from the stored
        # name. NOTE(review): the reason is not visible in this file - confirm.
        if not next(cursor, None):
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        metadata = {"type": self.file_type}
        with BytesIO(self.read().encode("utf-8")) as raw:
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id, self.filename, raw, metadata=metadata
                )
            else:
                self.fs.upload_from_stream(self.filename, raw, metadata=metadata)
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
195
196
class FsMongo(FsBase):
    """
    FsBase implementation that keeps its content in a GridFS bucket and can
    copy it to, or from, a local directory (``self.path``).

    Every GridFS entry carries ``metadata["type"]`` with one of "dir", "file"
    or "sym"; for "sym" the stored content is the link target.
    """

    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local base directory, always ends with "/"
        self.client = None  # MongoClient, set by fs_connect
        self.fs = None  # GridFSBucket, set by fs_connect

    @staticmethod
    def _storage_path(storage):
        """
        Normalize a storage reference into an entry name.
        :param storage: can be a str or a str list (joined with "/")
        :return: the name without trailing "/"
        """
        joined = storage if isinstance(storage, str) else "/".join(storage)
        return joined.rstrip("/")

    @staticmethod
    def _anchored_regex(path, tail):
        """
        Build a "$regex" pattern matching names that start with the literal path.
        :param path: literal path; regex metacharacters in it are escaped
        :param tail: raw regex appended after the escaped path
        :return: pattern string
        """
        import re  # local import keeps the module import block untouched

        return "^{}{}".format(re.escape(path), tail)

    @staticmethod
    def _in_subtree(filename, from_path):
        """
        Tell whether filename is from_path itself or is located under it.
        A name that merely shares the prefix (e.g. "pkg10" for "pkg1") is not.
        :param filename: entry name to test
        :param from_path: base path; None or empty matches everything
        :return: True, False
        """
        if not from_path:
            return True
        if from_path.endswith("/"):
            return filename.startswith(from_path)
        return filename == from_path or filename.startswith(from_path + "/")

    def _find_single(self, filename, duplicate_message="Multiple files found"):
        """
        Fetch the only entry stored under filename.
        :param filename: exact entry name
        :param duplicate_message: text of the exception for duplicated names
        :return: the entry, or None if there is none. Raises FsException if the
                 name is stored more than once
        """
        cursor = self.fs.find({"filename": filename})
        found = next(cursor, None)
        if found is not None and next(cursor, None) is not None:
            raise FsException(
                duplicate_message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )
        return found

    def __update_local_fs(self, from_path=None):
        """
        Copy directories, files and symlinks from GridFS to the local path.
        :param from_path: if supplied, only entries at or under this path
        :return: None
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)

        valid_paths = set()

        for directory in dir_cursor:
            if not self._in_subtree(directory.filename, from_path):
                continue
            local_dir = self.path + directory.filename
            self.logger.debug("Making dir {}".format(local_dir))
            os.makedirs(local_dir, exist_ok=True)
            valid_paths.add(local_dir)

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )

        for writing_file in file_cursor:
            if not self._in_subtree(writing_file.filename, from_path):
                continue
            file_path = self.path + writing_file.filename

            if writing_file.metadata["type"] == "sym":
                # the stored content of a "sym" entry is the link target
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    b.seek(0)
                    link = b.read().decode("utf-8")

                try:
                    self.logger.debug("Sync removing {}".format(file_path))
                    os.remove(file_path)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        # This is probably permission denied or worse
                        raise
                os.symlink(
                    link, os.path.realpath(os.path.normpath(os.path.abspath(file_path)))
                )
            else:
                folder = os.path.dirname(file_path)
                if folder not in valid_paths:
                    # the file has no directory entry of its own in GridFS
                    self.logger.debug("Sync local directory {}".format(file_path))
                    os.makedirs(folder, exist_ok=True)
                with open(file_path, "wb+") as file_stream:
                    self.logger.debug("Sync download {}".format(file_path))
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        """
        :return: dict with the storage kind ("mongo") and the local path
        """
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Configure the local path and open the GridFS bucket.
        :param config: dict. "path" is mandatory and must exist and be writable;
                       "uri" and "collection" are mandatory; "logger_name" is
                       optional. config["collection"] is used to index the
                       MongoClient handed to GridFSBucket.
        :return: None or raises FsException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if "uri" in config and "collection" in config:
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif "collection" not in config:
                raise FsException('Missing parameter "collection"')
            else:
                raise FsException('Missing parameters: "uri"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        folder = folder.rstrip("/")
        try:
            # NOTE(review): every call uploads a new "dir" entry; whether the
            # driver ever raises FileExists for a repeated name should be
            # checked, otherwise repeated calls leave duplicated entries.
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(
                str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            ) from e

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        dst = dst.rstrip("/")
        src = src.rstrip("/")

        try:
            # paths are escaped so that "." or "+" in a name cannot make the
            # pattern match (and delete or rename) unrelated entries
            dst_cursor = self.fs.find(
                {"filename": {"$regex": self._anchored_regex(dst, "(/|$)")}},
                no_cursor_timeout=True,
            )

            for dst_file in dst_cursor:
                self.fs.delete(dst_file._id)

            src_cursor = self.fs.find(
                {"filename": {"$regex": self._anchored_regex(src, "(/|$)")}},
                no_cursor_timeout=True,
            )

            for src_file in src_cursor:
                # every match starts with src, so swap only that leading part
                self.fs.rename(src_file._id, dst + src_file.filename[len(src) :])
        except Exception as e:
            raise FsException(
                str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            ) from e

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False. Raises FsException if the name is stored more than once
        """
        f = self._storage_path(storage)

        requested_file = self._find_single(f)
        if requested_file is None:
            return False

        self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))

        # if no special mode is required just check it does exists
        if not mode:
            return True

        entry_type = requested_file.metadata["type"]
        # a symlink also counts as a regular file
        return entry_type == mode or (entry_type == "sym" and mode == "file")

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None if the entry does not exist
        """
        requested_file = self._find_single(self._storage_path(storage))
        if requested_file is None:
            return None
        return requested_file.length

    def file_extract(self, compressed_object, path):
        """
        extract a tar or zip object into GridFS; the archive members themselves
        are left unmodified
        :param compressed_object: object of type tar or zip; any other type is ignored
        :param path: can be a str or a str list, the location where to extract to
        :return: None
        """
        f = self._storage_path(path)

        if isinstance(compressed_object, tarfile.TarFile):
            for member in compressed_object.getmembers():
                if member.isfile():
                    file_type = "file"
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    file_type = "sym"
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    # anything else is stored as an empty "dir" entry
                    file_type = "dir"
                    stream = BytesIO()

                metadata = {"type": file_type, "permissions": member.mode}
                name = member.name.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, name))
                try:
                    self.fs.upload_from_stream(
                        f + "/" + name, stream, metadata=metadata
                    )
                finally:
                    stream.close()
        elif isinstance(compressed_object, zipfile.ZipFile):
            for member in compressed_object.infolist():
                if member.is_dir():
                    file_type = "dir"
                    stream = BytesIO()
                else:
                    file_type = "file"
                    stream = BytesIO(compressed_object.read(member))

                metadata = {"type": file_type}
                name = member.filename.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, name))
                try:
                    self.fs.upload_from_stream(
                        f + "/" + name, stream, metadata=metadata
                    )
                finally:
                    stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; a mode containing "b" gives a binary stream,
                     otherwise a text stream
        :return: file object
        """
        f = self._storage_path(storage)
        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            return GridStringStream(f, self.fs, mode)
        except errors.NoFile as e:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            ) from e
        except IOError as e:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            ) from e

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: list with the names, relative to the folder, of every entry
                 stored under it; empty list if the folder has no entry
        """
        f = self._storage_path(storage)
        try:
            requested_dir = self._find_single(f, "Multiple directories found")
            if requested_dir is None:
                return []

            if requested_dir.metadata["type"] != "dir":
                raise FsException(
                    "File {} does not exist".format(f),
                    http_code=HTTPStatus.NOT_FOUND,
                )

            # NOTE(review): this matches all descendants, not only the direct
            # children - confirm this is what callers expect.
            prefix = f + "/"
            files_cursor = self.fs.find(
                {"filename": {"$regex": self._anchored_regex(prefix, "")}}
            )
            return [
                children_file.filename[len(prefix) :] for children_file in files_cursor
            ]
        except IOError as e:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            ) from e

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = self._storage_path(storage)
        try:
            file_cursor = self.fs.find({"filename": f})
            requested_file = next(file_cursor, None)

            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException(
                        "File {} does not exist".format(storage),
                        http_code=HTTPStatus.NOT_FOUND,
                    )
                return

            exception_file = next(file_cursor, None)
            if exception_file is not None:
                self.logger.error(
                    "Cannot delete duplicate file: {} and {}".format(
                        requested_file.filename, exception_file.filename
                    )
                )
                raise FsException(
                    "Multiple files found",
                    http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                )

            if requested_file.metadata["type"] == "dir":
                # escaped so that only real descendants of f are removed
                dir_cursor = self.fs.find(
                    {"filename": {"$regex": self._anchored_regex(f, "/")}}
                )

                for tmp in dir_cursor:
                    self.logger.debug("Deleting {}".format(tmp.filename))
                    self.fs.delete(tmp._id)

            self.logger.debug("Deleting {}".format(requested_file.filename))
            self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            ) from e

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path and os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """
        Upload the local content under from_path and delete from GridFS the
        entries under from_path that are not present locally.
        :param from_path: path relative to self.path
        :return: None
        """
        os_path = self.path + from_path
        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                full_name = os.path.join(root, folder)
                entry_type = "sym" if os.path.islink(full_name) else "dir"
                members.append({"filename": full_name, "type": entry_type})
            for file in files:
                full_name = os.path.join(root, file)
                entry_type = "sym" if os.path.islink(full_name) else "file"
                members.append({"filename": full_name, "type": entry_type})

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            local_name = member["filename"]
            file_type = member["type"]

            # obtain permission (the three lowest octal digits of the mode)
            mask = os.stat(local_name).st_mode & 0o777

            # convert to relative path
            rel_filename = os.path.relpath(local_name, self.path)
            # get timestamp in UTC because mongo stores upload date in UTC:
            # https://www.mongodb.com/docs/v4.0/tutorial/model-time-data/#overview
            # (naive value, same result as the deprecated utcfromtimestamp)
            last_modified_date = datetime.datetime.fromtimestamp(
                os.path.getmtime(local_name), tz=datetime.timezone.utc
            ).replace(tzinfo=None)

            # fetch the stored revisions and remove processed files from dict
            remote_file = remote_files.pop(rel_filename, None)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )

            if last_modified_date < upload_date:
                continue

            if file_type == "dir":
                stream = BytesIO()
            elif file_type == "sym":
                stream = BytesIO(os.readlink(local_name).encode("utf-8"))
            else:
                # upload straight from the file, no in-memory copy needed
                stream = open(local_name, "rb")

            with stream:
                metadata = {"type": file_type, "permissions": mask}

                self.logger.debug("Sync upload {}".format(rel_filename))
                self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

            # delete old files, only reached once the upload succeeded
            for file in remote_file or ():
                self.logger.debug("Sync deleting {}".format(file.filename))
                self.fs.delete(file._id)

        # delete files that are not anymore in local fs
        for remote_file in remote_files.values():
            for file in remote_file:
                self.fs.delete(file._id)

    def _get_mongo_files(self, from_path=None):
        """
        Collect the GridFS entries grouped by name.
        :param from_path: if supplied, only entries at or under this path
        :return: dict of name -> list of entries, in the order returned by the
                 query (sorted by "uploadDate" descending)
        """
        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        for file in file_cursor:
            if not self._in_subtree(file.filename, from_path):
                continue
            file_dict.setdefault(file.filename, []).append(file)
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)