Code Coverage

Cobertura Coverage Report > osm_common > fsmongo.py

Trend

Classes: 100%
 
Lines: 37%
   
Conditionals: 100%
 

File Coverage summary

Name | Classes | Lines | Conditionals
fsmongo.py | 100% (1/1) | 37% (142/379) | 100% (0/0)

Coverage Breakdown by Class

Name | Lines | Conditionals
fsmongo.py | 37% (142/379) | N/A

Source

osm_common/fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18 1 import datetime
19 1 import errno
20 1 from http import HTTPStatus
21 1 from io import BytesIO, StringIO
22 1 import logging
23 1 import os
24 1 import tarfile
25 1 import zipfile
26
27 1 from gridfs import errors, GridFSBucket
28 1 from osm_common.fsbase import FsBase, FsException
29 1 from pymongo import MongoClient
30
31
32 1 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
33
34
35 1 class GridByteStream(BytesIO):
36 1     def __init__(self, filename, fs, mode):
37 0         BytesIO.__init__(self)
38 0         self._id = None
39 0         self.filename = filename
40 0         self.fs = fs
41 0         self.mode = mode
42 0         self.file_type = "file"  # Set "file" as default file_type
43
44 0         self.__initialize__()
45
46 1     def __initialize__(self):
47 0         grid_file = None
48
49 0         cursor = self.fs.find({"filename": self.filename})
50
51 0         for requested_file in cursor:
52 0             exception_file = next(cursor, None)
53
54 0             if exception_file:
55 0                 raise FsException(
56                     "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
57                 )
58
59 0             if requested_file.metadata["type"] in ("file", "sym"):
60 0                 grid_file = requested_file
61 0                 self.file_type = requested_file.metadata["type"]
62             else:
63 0                 raise FsException(
64                     "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
65                 )
66
67 0         if grid_file:
68 0             self._id = grid_file._id
69 0             self.fs.download_to_stream(self._id, self)
70
71 0             if "r" in self.mode:
72 0                 self.seek(0, 0)
73
74 1     def close(self):
75 0         if "r" in self.mode:
76 0             super(GridByteStream, self).close()
77 0             return
78
79 0         if self._id:
80 0             self.fs.delete(self._id)
81
82 0         cursor = self.fs.find(
83             {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
84         )
85
86 0         parent_dir = next(cursor, None)
87
88 0         if not parent_dir:
89 0             parent_dir_name = self.filename.split("/")[0]
90 0             self.filename = self.filename.replace(
91                 parent_dir_name, parent_dir_name[:-1], 1
92             )
93
94 0         self.seek(0, 0)
95 0         if self._id:
96 0             self.fs.upload_from_stream_with_id(
97                 self._id, self.filename, self, metadata={"type": self.file_type}
98             )
99         else:
100 0             self.fs.upload_from_stream(
101                 self.filename, self, metadata={"type": self.file_type}
102             )
103 0         super(GridByteStream, self).close()
104
105 1     def __enter__(self):
106 0         return self
107
108 1     def __exit__(self, exc_type, exc_val, exc_tb):
109 0         self.close()
110
111
112 1 class GridStringStream(StringIO):
113 1     def __init__(self, filename, fs, mode):
114 0         StringIO.__init__(self)
115 0         self._id = None
116 0         self.filename = filename
117 0         self.fs = fs
118 0         self.mode = mode
119 0         self.file_type = "file"  # Set "file" as default file_type
120
121 0         self.__initialize__()
122
123 1     def __initialize__(self):
124 0         grid_file = None
125
126 0         cursor = self.fs.find({"filename": self.filename})
127
128 0         for requested_file in cursor:
129 0             exception_file = next(cursor, None)
130
131 0             if exception_file:
132 0                 raise FsException(
133                     "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
134                 )
135
136 0             if requested_file.metadata["type"] in ("file", "dir"):
137 0                 grid_file = requested_file
138 0                 self.file_type = requested_file.metadata["type"]
139             else:
140 0                 raise FsException(
141                     "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
142                 )
143
144 0         if grid_file:
145 0             stream = BytesIO()
146 0             self._id = grid_file._id
147 0             self.fs.download_to_stream(self._id, stream)
148 0             stream.seek(0)
149 0             self.write(stream.read().decode("utf-8"))
150 0             stream.close()
151
152 0             if "r" in self.mode:
153 0                 self.seek(0, 0)
154
155 1     def close(self):
156 0         if "r" in self.mode:
157 0             super(GridStringStream, self).close()
158 0             return
159
160 0         if self._id:
161 0             self.fs.delete(self._id)
162
163 0         cursor = self.fs.find(
164             {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
165         )
166
167 0         parent_dir = next(cursor, None)
168
169 0         if not parent_dir:
170 0             parent_dir_name = self.filename.split("/")[0]
171 0             self.filename = self.filename.replace(
172                 parent_dir_name, parent_dir_name[:-1], 1
173             )
174
175 0         self.seek(0, 0)
176 0         stream = BytesIO()
177 0         stream.write(self.read().encode("utf-8"))
178 0         stream.seek(0, 0)
179 0         if self._id:
180 0             self.fs.upload_from_stream_with_id(
181                 self._id, self.filename, stream, metadata={"type": self.file_type}
182             )
183         else:
184 0             self.fs.upload_from_stream(
185                 self.filename, stream, metadata={"type": self.file_type}
186             )
187 0         stream.close()
188 0         super(GridStringStream, self).close()
189
190 1     def __enter__(self):
191 0         return self
192
193 1     def __exit__(self, exc_type, exc_val, exc_tb):
194 0         self.close()
195
196
197 1 class FsMongo(FsBase):
198 1     def __init__(self, logger_name="fs", lock=False):
199 1         super().__init__(logger_name, lock)
200 1         self.path = None
201 1         self.client = None
202 1         self.fs = None
203
204 1     def __update_local_fs(self, from_path=None):
205 1         dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
206
207 1         valid_paths = []
208
209 1         for directory in dir_cursor:
210 1             if from_path and not directory.filename.startswith(from_path):
211 0                 continue
212 1             self.logger.debug("Making dir {}".format(self.path + directory.filename))
213 1             os.makedirs(self.path + directory.filename, exist_ok=True)
214 1             valid_paths.append(self.path + directory.filename)
215
216 1         file_cursor = self.fs.find(
217             {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
218         )
219
220 1         for writing_file in file_cursor:
221 1             if from_path and not writing_file.filename.startswith(from_path):
222 0                 continue
223 1             file_path = self.path + writing_file.filename
224
225 1             if writing_file.metadata["type"] == "sym":
226 1                 with BytesIO() as b:
227 1                     self.fs.download_to_stream(writing_file._id, b)
228 1                     b.seek(0)
229 1                     link = b.read().decode("utf-8")
230
231 1                 try:
232 1                     self.logger.debug("Sync removing {}".format(file_path))
233 1                     os.remove(file_path)
234 1                 except OSError as e:
235 1                     if e.errno != errno.ENOENT:
236                         # This is probably permission denied or worse
237 0                         raise
238 1                 os.symlink(link, file_path)
239             else:
240 1                 folder = os.path.dirname(file_path)
241 1                 if folder not in valid_paths:
242 0                     self.logger.debug("Sync local directory {}".format(file_path))
243 0                     os.makedirs(folder, exist_ok=True)
244 1                 with open(file_path, "wb+") as file_stream:
245 1                     self.logger.debug("Sync download {}".format(file_path))
246 1                     self.fs.download_to_stream(writing_file._id, file_stream)
247 1                 if "permissions" in writing_file.metadata:
248 1                     os.chmod(file_path, writing_file.metadata["permissions"])
249
250 1     def get_params(self):
251 0         return {"fs": "mongo", "path": self.path}
252
253 1     def fs_connect(self, config):
254 0         try:
255 0             if "logger_name" in config:
256 0                 self.logger = logging.getLogger(config["logger_name"])
257 0             if "path" in config:
258 0                 self.path = config["path"]
259             else:
260 0                 raise FsException('Missing parameter "path"')
261 0             if not self.path.endswith("/"):
262 0                 self.path += "/"
263 0             if not os.path.exists(self.path):
264 0                 raise FsException(
265                     "Invalid configuration param at '[storage]': path '{}' does not exist".format(
266                         config["path"]
267                     )
268                 )
269 0             elif not os.access(self.path, os.W_OK):
270 0                 raise FsException(
271                     "Invalid configuration param at '[storage]': path '{}' is not writable".format(
272                         config["path"]
273                     )
274                 )
275 0             if all(key in config.keys() for key in ["uri", "collection"]):
276 0                 self.client = MongoClient(config["uri"])
277 0                 self.fs = GridFSBucket(self.client[config["collection"]])
278             else:
279 0                 if "collection" not in config.keys():
280 0                     raise FsException('Missing parameter "collection"')
281                 else:
282 0                     raise FsException('Missing parameters: "uri"')
283 0         except FsException:
284 0             raise
285 0         except Exception as e:  # TODO refine
286 0             raise FsException(str(e))
287
288 1     def fs_disconnect(self):
289 0         pass  # TODO
290
291 1     def mkdir(self, folder):
292         """
293         Creates a folder or parent object location
294         :param folder:
295         :return: None or raises an exception
296         """
297 0         folder = folder.rstrip("/")
298 0         try:
299 0             self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
300 0         except errors.FileExists:  # make it idempotent
301 0             pass
302 0         except Exception as e:
303 0             raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
304
305 1     def dir_rename(self, src, dst):
306         """
307         Rename one directory name. If dst exist, it replaces (deletes) existing directory
308         :param src: source directory
309         :param dst: destination directory
310         :return: None or raises and exception
311         """
312 0         dst = dst.rstrip("/")
313 0         src = src.rstrip("/")
314
315 0         try:
316 0             dst_cursor = self.fs.find(
317                 {"filename": {"$regex": "^{}(/|$)".format(dst)}}, no_cursor_timeout=True
318             )
319
320 0             for dst_file in dst_cursor:
321 0                 self.fs.delete(dst_file._id)
322
323 0             src_cursor = self.fs.find(
324                 {"filename": {"$regex": "^{}(/|$)".format(src)}}, no_cursor_timeout=True
325             )
326
327 0             for src_file in src_cursor:
328 0                 self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
329 0         except Exception as e:
330 0             raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
331
332 1     def file_exists(self, storage, mode=None):
333         """
334         Indicates if "storage" file exist
335         :param storage: can be a str or a str list
336         :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
337         :return: True, False
338         """
339 0         f = storage if isinstance(storage, str) else "/".join(storage)
340 0         f = f.rstrip("/")
341
342 0         cursor = self.fs.find({"filename": f})
343
344 0         for requested_file in cursor:
345 0             exception_file = next(cursor, None)
346
347 0             if exception_file:
348 0                 raise FsException(
349                     "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
350                 )
351
352 0             self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))
353
354             # if no special mode is required just check it does exists
355 0             if not mode:
356 0                 return True
357
358 0             if requested_file.metadata["type"] == mode:
359 0                 return True
360
361 0             if requested_file.metadata["type"] == "sym" and mode == "file":
362 0                 return True
363
364 0         return False
365
366 1     def file_size(self, storage):
367         """
368         return file size
369         :param storage: can be a str or a str list
370         :return: file size
371         """
372 0         f = storage if isinstance(storage, str) else "/".join(storage)
373 0         f = f.rstrip("/")
374
375 0         cursor = self.fs.find({"filename": f})
376
377 0         for requested_file in cursor:
378 0             exception_file = next(cursor, None)
379
380 0             if exception_file:
381 0                 raise FsException(
382                     "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
383                 )
384
385 0             return requested_file.length
386
387 1     def file_extract(self, compressed_object, path):
388         """
389         extract a tar file
390         :param compressed_object: object of type tar or zip
391         :param path: can be a str or a str list, or a tar object where to extract the tar_object
392         :return: None
393         """
394 1         f = path if isinstance(path, str) else "/".join(path)
395 1         f = f.rstrip("/")
396
397 1         if type(compressed_object) is tarfile.TarFile:
398 1             for member in compressed_object.getmembers():
399 1                 if member.isfile():
400 1                     stream = compressed_object.extractfile(member)
401 1                 elif member.issym():
402 1                     stream = BytesIO(member.linkname.encode("utf-8"))
403                 else:
404 1                     stream = BytesIO()
405
406 1                 if member.isfile():
407 1                     file_type = "file"
408 1                 elif member.issym():
409 1                     file_type = "sym"
410                 else:
411 1                     file_type = "dir"
412
413 1                 metadata = {"type": file_type, "permissions": member.mode}
414 1                 member.name = member.name.rstrip("/")
415
416 1                 self.logger.debug("Uploading {}/{}".format(f, member.name))
417 1                 self.fs.upload_from_stream(
418                     f + "/" + member.name, stream, metadata=metadata
419                 )
420
421 1                 stream.close()
422 0         elif type(compressed_object) is zipfile.ZipFile:
423 0             for member in compressed_object.infolist():
424 0                 if member.is_dir():
425 0                     stream = BytesIO()
426                 else:
427 0                     stream = compressed_object.read(member)
428
429 0                 if member.is_dir():
430 0                     file_type = "dir"
431                 else:
432 0                     file_type = "file"
433
434 0                 metadata = {"type": file_type}
435 0                 member.filename = member.filename.rstrip("/")
436
437 0                 self.logger.debug("Uploading {}/{}".format(f, member.filename))
438 0                 self.fs.upload_from_stream(
439                     f + "/" + member.filename, stream, metadata=metadata
440                 )
441
442 0                 if member.is_dir():
443 0                     stream.close()
444
445 1     def file_open(self, storage, mode):
446         """
447         Open a file
448         :param storage: can be a str or list of str
449         :param mode: file mode
450         :return: file object
451         """
452 0         try:
453 0             f = storage if isinstance(storage, str) else "/".join(storage)
454 0             f = f.rstrip("/")
455
456 0             if "b" in mode:
457 0                 return GridByteStream(f, self.fs, mode)
458             else:
459 0                 return GridStringStream(f, self.fs, mode)
460 0         except errors.NoFile:
461 0             raise FsException(
462                 "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
463             )
464 0         except IOError:
465 0             raise FsException(
466                 "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
467             )
468
469 1     def dir_ls(self, storage):
470         """
471         return folder content
472         :param storage: can be a str or list of str
473         :return: folder content
474         """
475 0         try:
476 0             f = storage if isinstance(storage, str) else "/".join(storage)
477 0             f = f.rstrip("/")
478
479 0             files = []
480 0             dir_cursor = self.fs.find({"filename": f})
481 0             for requested_dir in dir_cursor:
482 0                 exception_dir = next(dir_cursor, None)
483
484 0                 if exception_dir:
485 0                     raise FsException(
486                         "Multiple directories found",
487                         http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
488                     )
489
490 0                 if requested_dir.metadata["type"] != "dir":
491 0                     raise FsException(
492                         "File {} does not exist".format(f),
493                         http_code=HTTPStatus.NOT_FOUND,
494                     )
495
496 0                 if f.endswith("/"):
497 0                     f = f[:-1]
498
499 0                 files_cursor = self.fs.find(
500                     {"filename": {"$regex": "^{}/([^/])*".format(f)}}
501                 )
502 0                 for children_file in files_cursor:
503 0                     files += [children_file.filename.replace(f + "/", "", 1)]
504
505 0             return files
506 0         except IOError:
507 0             raise FsException(
508                 "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
509             )
510
511 1     def file_delete(self, storage, ignore_non_exist=False):
512         """
513         Delete storage content recursively
514         :param storage: can be a str or list of str
515         :param ignore_non_exist: not raise exception if storage does not exist
516         :return: None
517         """
518 0         try:
519 0             f = storage if isinstance(storage, str) else "/".join(storage)
520 0             f = f.rstrip("/")
521
522 0             file_cursor = self.fs.find({"filename": f})
523 0             found = False
524 0             for requested_file in file_cursor:
525 0                 found = True
526 0                 exception_file = next(file_cursor, None)
527
528 0                 if exception_file:
529 0                     self.logger.error(
530                         "Cannot delete duplicate file: {} and {}".format(
531                             requested_file.filename, exception_file.filename
532                         )
533                     )
534 0                     raise FsException(
535                         "Multiple files found",
536                         http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
537                     )
538
539 0                 if requested_file.metadata["type"] == "dir":
540 0                     dir_cursor = self.fs.find(
541                         {"filename": {"$regex": "^{}/".format(f)}}
542                     )
543
544 0                     for tmp in dir_cursor:
545 0                         self.logger.debug("Deleting {}".format(tmp.filename))
546 0                         self.fs.delete(tmp._id)
547
548 0                 self.logger.debug("Deleting {}".format(requested_file.filename))
549 0                 self.fs.delete(requested_file._id)
550 0             if not found and not ignore_non_exist:
551 0                 raise FsException(
552                     "File {} does not exist".format(storage),
553                     http_code=HTTPStatus.NOT_FOUND,
554                 )
555 0         except IOError as e:
556 0             raise FsException(
557                 "File {} cannot be deleted: {}".format(f, e),
558                 http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
559             )
560
561 1     def sync(self, from_path=None):
562         """
563         Sync from FSMongo to local storage
564         :param from_path: if supplied, only copy content from this path, not all
565         :return: None
566         """
567 1         if from_path:
568 0             if os.path.isabs(from_path):
569 0                 from_path = os.path.relpath(from_path, self.path)
570 1         self.__update_local_fs(from_path=from_path)
571
572 1     def _update_mongo_fs(self, from_path):
573 1         os_path = self.path + from_path
574
575         # Obtain list of files and dirs in filesystem
576 1         members = []
577 1         for root, dirs, files in os.walk(os_path):
578 1             for folder in dirs:
579 1                 member = {"filename": os.path.join(root, folder), "type": "dir"}
580 1                 if os.path.islink(member["filename"]):
581 0                     member["type"] = "sym"
582 1                 members.append(member)
583 1             for file in files:
584 1                 filename = os.path.join(root, file)
585 1                 if os.path.islink(filename):
586 0                     file_type = "sym"
587                 else:
588 1                     file_type = "file"
589 1                 member = {"filename": os.path.join(root, file), "type": file_type}
590 1                 members.append(member)
591
592         # Obtain files in mongo dict
593 1         remote_files = self._get_mongo_files(from_path)
594
595         # Upload members if they do not exists or have been modified
596         # We will do this for performance (avoid updating unmodified files) and to avoid
597         # updating a file with an older one in case there are two sources for synchronization
598         # in high availability scenarios
599 1         for member in members:
600             # obtain permission
601 1             mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)
602
603             # convert to relative path
604 1             rel_filename = os.path.relpath(member["filename"], self.path)
605             # get timestamp in UTC because mongo stores upload date in UTC:
606             # https://www.mongodb.com/docs/v4.0/tutorial/model-time-data/#overview
607 1             last_modified_date = datetime.datetime.utcfromtimestamp(
608                 os.path.getmtime(member["filename"])
609             )
610
611 1             remote_file = remote_files.get(rel_filename)
612 1             upload_date = (
613                 remote_file[0].uploadDate if remote_file else datetime.datetime.min
614             )
615             # remove processed files from dict
616 1             remote_files.pop(rel_filename, None)
617
618 1             if last_modified_date >= upload_date:
619 1                 stream = None
620 1                 fh = None
621 1                 try:
622 1                     file_type = member["type"]
623 1                     if file_type == "dir":
624 1                         stream = BytesIO()
625 1                     elif file_type == "sym":
626 0                         stream = BytesIO(
627                             os.readlink(member["filename"]).encode("utf-8")
628                         )
629                     else:
630 1                         fh = open(member["filename"], "rb")
631 1                         stream = BytesIO(fh.read())
632
633 1                     metadata = {"type": file_type, "permissions": mask}
634
635 1                     self.logger.debug("Sync upload {}".format(rel_filename))
636 1                     self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)
637
638                     # delete old files
639 1                     if remote_file:
640 0                         for file in remote_file:
641 0                             self.logger.debug("Sync deleting {}".format(file.filename))
642 0                             self.fs.delete(file._id)
643                 finally:
644 1                     if fh:
645 1                         fh.close()
646 1                     if stream:
647 1                         stream.close()
648
649         # delete files that are not any more in local fs
650 1         for remote_file in remote_files.values():
651 0             for file in remote_file:
652 0                 self.fs.delete(file._id)
653
654 1     def _get_mongo_files(self, from_path=None):
655 1         file_dict = {}
656 1         file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
657 1         for file in file_cursor:
658 0             if from_path and not file.filename.startswith(from_path):
659 0                 continue
660 0             if file.filename in file_dict:
661 0                 file_dict[file.filename].append(file)
662             else:
663 0                 file_dict[file.filename] = [file]
664 1         return file_dict
665
666 1     def reverse_sync(self, from_path: str):
667         """
668         Sync from local storage to FSMongo
669         :param from_path: base directory to upload content to mongo fs
670         :return: None
671         """
672 1         if os.path.isabs(from_path):
673 0             from_path = os.path.relpath(from_path, self.path)
674 1         self._update_mongo_fs(from_path=from_path)