2e47039bbf3ff6af8d818c7a382660d5572c7bf6
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re
import tarfile
import zipfile

from gridfs import errors, GridFSBucket
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
30
31
# Original author of this module (name and contact address).
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
33
34
class GridByteStream(BytesIO):
    """Binary in-memory stream bound to a single GridFS entry.

    The current content of ``filename`` (if it exists) is loaded when the
    stream is created. For any mode without "r" the buffer is written back
    to GridFS on ``close()``, reusing the id of the existing entry if there
    was one.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename, e.g. "folder/sub/file"
        :param fs: GridFSBucket-like object used for every storage call
        :param mode: file mode string, e.g. "rb", "wb", "ab"
        """
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Load the existing GridFS entry, if there is one, into the buffer.

        :raises FsException: if several entries share the filename, or the
            entry type is neither "file" nor "sym"
        """
        cursor = self.fs.find({"filename": self.filename})
        grid_file = next(cursor, None)
        if grid_file is None:
            return  # new file: start from an empty buffer

        if next(cursor, None) is not None:
            raise FsException(
                "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )
        if grid_file.metadata["type"] not in ("file", "sym"):
            raise FsException(
                "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        self._id = grid_file._id
        self.file_type = grid_file.metadata["type"]
        # "w" modes truncate, like built-in open(): keep the id so close()
        # replaces the entry in place, but do not load the old content,
        # otherwise new writes would be appended to it.
        if "w" in self.mode:
            return
        self.fs.download_to_stream(self._id, self)
        if "r" in self.mode:
            self.seek(0, 0)
        # other modes (e.g. "ab") keep the position at the end, i.e. append

    def close(self):
        """Write the buffer back to GridFS (modes without "r") and close it.

        Calling close() on an already closed stream does nothing, so an
        explicit close() followed by __exit__ does not touch GridFS twice.
        """
        if self.closed:
            return
        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        # Filter on "metadata.type" instead of the whole "metadata"
        # sub-document: folders uploaded from tar archives or by reverse sync
        # also carry "permissions", which an exact sub-document match misses.
        cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
        if next(cursor, None) is None:
            # Top-level folder is not registered: store the file under that
            # folder name minus its last character.
            # NOTE(review): presumably meant for temporary upload folders
            # with a one-character suffix that were later renamed; confirm.
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        metadata = {"type": self.file_type}
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata=metadata
            )
        else:
            self.fs.upload_from_stream(self.filename, self, metadata=metadata)
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
110
111
class GridStringStream(StringIO):
    """Text (UTF-8) in-memory stream bound to a single GridFS entry.

    The current content of ``filename`` (if it exists) is decoded and loaded
    when the stream is created. For any mode without "r" the buffer is
    encoded and written back to GridFS on ``close()``, reusing the id of the
    existing entry if there was one.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename, e.g. "folder/sub/file"
        :param fs: GridFSBucket-like object used for every storage call
        :param mode: file mode string, e.g. "r", "w", "a"
        """
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Load and UTF-8 decode the existing GridFS entry, if there is one.

        :raises FsException: if several entries share the filename, or the
            entry type is neither "file" nor "sym"
        """
        cursor = self.fs.find({"filename": self.filename})
        grid_file = next(cursor, None)
        if grid_file is None:
            return  # new file: start from an empty buffer

        if next(cursor, None) is not None:
            raise FsException(
                "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )
        # Same accepted types as GridByteStream: a "dir" entry has no content
        # to edit, while a "sym" entry stores its link target as content.
        if grid_file.metadata["type"] not in ("file", "sym"):
            raise FsException(
                "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        self._id = grid_file._id
        self.file_type = grid_file.metadata["type"]
        # "w" modes truncate, like built-in open(): keep the id so close()
        # replaces the entry in place, but do not load the old content.
        if "w" in self.mode:
            return
        with BytesIO() as raw:
            self.fs.download_to_stream(self._id, raw)
            self.write(raw.getvalue().decode("utf-8"))
        if "r" in self.mode:
            self.seek(0, 0)
        # other modes (e.g. "a") keep the position at the end, i.e. append

    def close(self):
        """Write the buffer back to GridFS (modes without "r") and close it.

        Calling close() on an already closed stream does nothing, so an
        explicit close() followed by __exit__ does not touch GridFS twice.
        """
        if self.closed:
            return
        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        # Filter on "metadata.type" instead of the whole "metadata"
        # sub-document: folders uploaded from tar archives or by reverse sync
        # also carry "permissions", which an exact sub-document match misses.
        cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
        if next(cursor, None) is None:
            # Top-level folder is not registered: store the file under that
            # folder name minus its last character.
            # NOTE(review): presumably meant for temporary upload folders
            # with a one-character suffix that were later renamed; confirm.
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        metadata = {"type": self.file_type}
        with BytesIO(self.getvalue().encode("utf-8")) as stream:
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id, self.filename, stream, metadata=metadata
                )
            else:
                self.fs.upload_from_stream(self.filename, stream, metadata=metadata)
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
195
196
class FsMongo(FsBase):
    """FsBase implementation that stores files in a MongoDB GridFS bucket.

    Each path is one GridFS entry whose ``metadata["type"]`` is "file",
    "sym" (the content is the link target) or "dir" (empty content).
    ``sync``/``reverse_sync`` mirror entries between GridFS and the local
    folder configured as ``path`` in ``fs_connect``.
    """

    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local mirror folder; fs_connect makes it end in "/"
        self.client = None  # MongoClient opened by fs_connect
        self.fs = None  # GridFSBucket opened by fs_connect

    @staticmethod
    def _is_under(filename, from_path, include_root=True):
        """Tell whether a GridFS filename is from_path itself or lies below it.

        A falsy from_path matches every filename. Unlike a plain
        ``startswith`` test, siblings sharing a prefix do not match
        ("pkg10/x" is not under "pkg1").

        :param filename: GridFS filename to test
        :param from_path: relative folder; a trailing "/" is ignored
        :param include_root: whether from_path itself counts as a match
        :return: True or False
        """
        if not from_path:
            return True
        root = from_path.rstrip("/")
        if filename == root:
            return include_root
        return filename.startswith(root + "/")

    def _find_unique(self, filename, duplicate_error="Multiple files found"):
        """Return the only GridFS entry named filename, or None if absent.

        :param filename: exact GridFS filename
        :param duplicate_error: message of the exception raised on duplicates
        :return: the GridFS entry or None
        :raises FsException: (500) when several entries share that name
        """
        cursor = self.fs.find({"filename": filename})
        entry = next(cursor, None)
        if entry is not None and next(cursor, None) is not None:
            raise FsException(
                duplicate_error, http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )
        return entry

    @staticmethod
    def _check_member_names(names):
        """Reject archive member names that contain a ".." path component.

        Such names would resolve outside the target folder once sync()
        writes the entries to self.path + filename.

        :param names: iterable of member names
        :raises FsException: (400) on the first offending name
        """
        for name in names:
            if ".." in name.split("/"):
                raise FsException(
                    "Invalid member name in compressed file: {}".format(name),
                    http_code=HTTPStatus.BAD_REQUEST,
                )

    def __update_local_fs(self, from_path=None):
        """Write GridFS entries into the local mirror folder self.path.

        Folders are created first, then regular files and symlinks.

        :param from_path: if given, only entries equal to or below this
            relative path are written
        """
        valid_paths = set()  # local folders created by this run
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                if not self._is_under(directory.filename, from_path):
                    continue
                dir_path = self.path + directory.filename
                self.logger.debug("Making dir {}".format(dir_path))
                os.makedirs(dir_path, exist_ok=True)
                valid_paths.add(dir_path)
        finally:
            # the server does not time out no_cursor_timeout cursors, so
            # release them explicitly, also when an error interrupts the loop
            dir_cursor.close()

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )
        try:
            for writing_file in file_cursor:
                if not self._is_under(writing_file.filename, from_path):
                    continue
                file_path = self.path + writing_file.filename
                if writing_file.metadata["type"] == "sym":
                    self._write_local_symlink(writing_file, file_path)
                else:
                    self._write_local_file(writing_file, file_path, valid_paths)
        finally:
            file_cursor.close()

    def _write_local_symlink(self, grid_file, file_path):
        """Recreate a "sym" entry as a local symlink, replacing any old file."""
        with BytesIO() as b:
            self.fs.download_to_stream(grid_file._id, b)
            link = b.getvalue().decode("utf-8")

        try:
            self.logger.debug("Sync removing {}".format(file_path))
            os.remove(file_path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                # This is probably permission denied or worse
                raise
        os.symlink(link, os.path.realpath(os.path.normpath(os.path.abspath(file_path))))

    def _write_local_file(self, grid_file, file_path, valid_paths):
        """Download a "file" entry to file_path and apply stored permissions."""
        folder = os.path.dirname(file_path)
        if folder not in valid_paths:
            self.logger.debug("Sync local directory {}".format(file_path))
            os.makedirs(folder, exist_ok=True)
        with open(file_path, "wb+") as file_stream:
            self.logger.debug("Sync download {}".format(file_path))
            self.fs.download_to_stream(grid_file._id, file_stream)
        if "permissions" in grid_file.metadata:
            os.chmod(file_path, grid_file.metadata["permissions"])

    def get_params(self):
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Connect to GridFS and validate the local mirror folder
        :param config: dict with "path", "uri", "collection" and optionally "logger_name"
        :return: None
        :raises FsException: on a missing/invalid parameter or connection error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

    def fs_disconnect(self):
        """Close the MongoDB client opened by fs_connect, if any."""
        if self.client is not None:
            self.client.close()
        self.client = None
        self.fs = None

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder: relative folder name; a trailing "/" is ignored
        :return: None or raises an exception
        """
        folder = folder.rstrip("/")
        try:
            # GridFS accepts duplicate filenames, so upload_from_stream never
            # raises FileExists; check first to stay idempotent and avoid
            # duplicates that later fail with "Multiple files found".
            if next(self.fs.find({"filename": folder}, limit=1), None) is not None:
                return
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        dst = dst.rstrip("/")
        src = src.rstrip("/")

        try:
            # re.escape: paths may contain regex metacharacters such as "."
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(dst))}},
                no_cursor_timeout=True,
            )
            try:
                for dst_file in dst_cursor:
                    self.fs.delete(dst_file._id)
            finally:
                dst_cursor.close()

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(src))}},
                no_cursor_timeout=True,
            )
            try:
                for src_file in src_cursor:
                    # the regex guarantees src is a prefix: swap it for dst
                    self.fs.rename(src_file._id, dst + src_file.filename[len(src):])
            finally:
                src_cursor.close()
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        :raises FsException: if several entries share the name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        requested_file = self._find_unique(f)
        if requested_file is None:
            return False

        self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))

        # if no special mode is required just check it does exists
        if not mode:
            return True

        file_type = requested_file.metadata["type"]
        # a symlink counts as a regular file
        return file_type == mode or (file_type == "sym" and mode == "file")

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size in bytes, or None if it does not exist
        :raises FsException: if several entries share the name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        requested_file = self._find_unique(f)
        return requested_file.length if requested_file is not None else None

    def file_extract(self, compressed_object, path):
        """
        Upload the content of a tar or zip archive under a GridFS folder
        :param compressed_object: object of type tarfile.TarFile or zipfile.ZipFile
        :param path: can be a str or a str list; folder where the content is stored
        :return: None
        :raises FsException: (400) if a member name contains a ".." component;
            nothing is uploaded in that case
        """
        f = path if isinstance(path, str) else "/".join(path)
        f = f.rstrip("/")

        if type(compressed_object) is tarfile.TarFile:
            members = compressed_object.getmembers()
            self._check_member_names(member.name for member in members)
            for member in members:
                if member.isfile():
                    file_type = "file"
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    file_type = "sym"
                    # NOTE(review): link targets are stored unvalidated and
                    # recreated by sync(); an absolute or "../" target can
                    # point outside the local mirror folder.
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    # anything neither a regular file nor a symlink is stored
                    # as an empty "dir" entry
                    file_type = "dir"
                    stream = BytesIO()

                metadata = {"type": file_type, "permissions": member.mode}
                member.name = member.name.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.name))
                try:
                    self.fs.upload_from_stream(
                        f + "/" + member.name, stream, metadata=metadata
                    )
                finally:
                    stream.close()
        elif type(compressed_object) is zipfile.ZipFile:
            members = compressed_object.infolist()
            self._check_member_names(member.filename for member in members)
            for member in members:
                # is_dir() looks at the trailing "/", so test before stripping
                if member.is_dir():
                    file_type = "dir"
                    content = b""
                else:
                    file_type = "file"
                    content = compressed_object.read(member)

                member.filename = member.filename.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.filename))
                with BytesIO(content) as stream:
                    self.fs.upload_from_stream(
                        f + "/" + member.filename, stream, metadata={"type": file_type}
                    )

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; a mode containing "b" gives a binary stream
        :return: file object
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: names of the direct children of the folder
        :raises FsException: (404) if storage exists but is not a folder
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            requested_dir = self._find_unique(f, "Multiple directories found")
            if requested_dir is not None and requested_dir.metadata["type"] != "dir":
                raise FsException(
                    "File {} does not exist".format(f),
                    http_code=HTTPStatus.NOT_FOUND,
                )

            # direct children only: "<f>/<one path component>"
            files_cursor = self.fs.find(
                {"filename": {"$regex": "^{}/[^/]+$".format(re.escape(f))}}
            )
            return [child.filename[len(f) + 1:] for child in files_cursor]
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        :raises FsException: if it does not exist (unless ignored) or is duplicated
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            file_cursor = self.fs.find({"filename": f})
            requested_file = next(file_cursor, None)
            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException(
                        "File {} does not exist".format(storage),
                        http_code=HTTPStatus.NOT_FOUND,
                    )
                return

            exception_file = next(file_cursor, None)
            if exception_file is not None:
                self.logger.error(
                    "Cannot delete duplicate file: {} and {}".format(
                        requested_file.filename, exception_file.filename
                    )
                )
                raise FsException(
                    "Multiple files found",
                    http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                )

            if requested_file.metadata["type"] == "dir":
                dir_cursor = self.fs.find(
                    {"filename": {"$regex": "^{}/".format(re.escape(f))}}
                )
                for tmp in dir_cursor:
                    self.logger.debug("Deleting {}".format(tmp.filename))
                    self.fs.delete(tmp._id)

            self.logger.debug("Deleting {}".format(requested_file.filename))
            self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path:
            if os.path.isabs(from_path):
                from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """Upload new/modified local entries below from_path; drop stale ones.

        :param from_path: folder relative to self.path
        """
        os_path = self.path + from_path
        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                filename = os.path.join(root, folder)
                member_type = "sym" if os.path.islink(filename) else "dir"
                members.append({"filename": filename, "type": member_type})
            for file in files:
                filename = os.path.join(root, file)
                member_type = "sym" if os.path.islink(filename) else "file"
                members.append({"filename": filename, "type": member_type})

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            # lstat: a symlink itself is what gets uploaded, and stat() would
            # fail on a dangling link; for other entries both are the same
            st = os.lstat(member["filename"])
            mask = st.st_mode & 0o777  # permission bits

            # convert to relative path
            rel_filename = os.path.relpath(member["filename"], self.path)
            # get timestamp in UTC because mongo stores upload date in UTC:
            # https://www.mongodb.com/docs/v4.0/tutorial/model-time-data/#overview
            last_modified_date = datetime.datetime.utcfromtimestamp(st.st_mtime)

            # remove processed files from dict; newest version comes first
            remote_file = remote_files.pop(rel_filename, None)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )
            if last_modified_date < upload_date:
                continue

            self._upload_local_member(member, rel_filename, mask)

            # delete old files
            if remote_file:
                for file in remote_file:
                    self.logger.debug("Sync deleting {}".format(file.filename))
                    self.fs.delete(file._id)

        # delete files that are not anymore in local fs
        for remote_file in remote_files.values():
            for file in remote_file:
                self.fs.delete(file._id)

    def _upload_local_member(self, member, rel_filename, mask):
        """Upload one local entry (dir, sym or file) as a new GridFS entry."""
        file_type = member["type"]
        metadata = {"type": file_type, "permissions": mask}
        self.logger.debug("Sync upload {}".format(rel_filename))
        if file_type == "dir":
            content = b""
        elif file_type == "sym":
            content = os.readlink(member["filename"]).encode("utf-8")
        else:
            with open(member["filename"], "rb") as fh:
                self.fs.upload_from_stream(rel_filename, fh, metadata=metadata)
            return
        with BytesIO(content) as stream:
            self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

    def _get_mongo_files(self, from_path=None):
        """Group GridFS entries below from_path by filename, newest first.

        The from_path entry itself is excluded: os.walk never reports it as a
        local member, so including it would get it deleted as stale.

        :param from_path: relative folder; None/empty means every entry
        :return: dict filename -> list of entries sorted by uploadDate desc
        """
        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        try:
            for file in file_cursor:
                if self._is_under(file.filename, from_path, include_root=False):
                    file_dict.setdefault(file.filename, []).append(file)
        finally:
            file_cursor.close()
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)