1ce59097b7110f39f13afee4f4c0a9c5e44bec54
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18 import datetime
19 import errno
20 from http import HTTPStatus
21 from io import BytesIO, StringIO
22 import logging
23 import os
24 import tarfile
25 import zipfile
26
27 from gridfs import errors, GridFSBucket
28 from osm_common.fsbase import FsBase, FsException
29 from pymongo import MongoClient
30
31
32 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
33
34
class GridByteStream(BytesIO):
    """In-memory binary stream backed by a GridFS entry.

    On construction the entry named *filename* (if present) is downloaded
    into the buffer; closing a writable stream uploads the buffer back,
    replacing any previous revision.
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Locate the GridFS entry for self.filename and load its content."""
        found = None
        results = self.fs.find({"filename": self.filename})

        for candidate in results:
            # A second hit for the same name means the bucket is inconsistent.
            if next(results, None) is not None:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if candidate.metadata["type"] not in ("file", "sym"):
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            found = candidate
            self.file_type = candidate.metadata["type"]

        if found:
            self._id = found._id
            self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """Persist the buffer to GridFS (write modes) and close the stream."""
        if "r" in self.mode:
            # Read-only stream: nothing to write back.
            super(GridByteStream, self).close()
            return

        # Replace the previous revision, if one was downloaded.
        if self._id:
            self.fs.delete(self._id)

        head = self.filename.split("/")[0]
        lookup = self.fs.find({"filename": head, "metadata": {"type": "dir"}})

        if next(lookup, None) is None:
            # No directory entry for the first path segment: drop its last
            # character before uploading (preserved legacy path adjustment).
            self.filename = self.filename.replace(head, head[:-1], 1)

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, self, metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
110
111
class GridStringStream(StringIO):
    """In-memory text stream backed by a GridFS entry (UTF-8 on the wire).

    Mirrors GridByteStream but decodes the stored bytes to text on load and
    re-encodes them on close.
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Locate the GridFS entry for self.filename and decode it into the buffer."""
        matched = None
        results = self.fs.find({"filename": self.filename})

        for entry in results:
            # A second hit for the same name means the bucket is inconsistent.
            if next(results, None) is not None:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            # NOTE(review): this variant accepts "dir" while the byte variant
            # accepts "sym" — kept as-is to preserve behavior; confirm intent.
            if entry.metadata["type"] not in ("file", "dir"):
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            matched = entry
            self.file_type = entry.metadata["type"]

        if matched:
            self._id = matched._id
            raw = BytesIO()
            self.fs.download_to_stream(self._id, raw)
            raw.seek(0)
            self.write(raw.read().decode("utf-8"))
            raw.close()

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """Persist the buffer to GridFS (write modes) and close the stream."""
        if "r" in self.mode:
            # Read-only stream: nothing to write back.
            super(GridStringStream, self).close()
            return

        # Replace the previous revision, if one was downloaded.
        if self._id:
            self.fs.delete(self._id)

        head = self.filename.split("/")[0]
        lookup = self.fs.find({"filename": head, "metadata": {"type": "dir"}})

        if next(lookup, None) is None:
            # No directory entry for the first path segment: drop its last
            # character before uploading (preserved legacy path adjustment).
            self.filename = self.filename.replace(head, head[:-1], 1)

        self.seek(0, 0)
        payload = BytesIO(self.read().encode("utf-8"))
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, payload, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, payload, metadata={"type": self.file_type}
            )
        payload.close()
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
195
196
class FsMongo(FsBase):
    """FsBase implementation storing the filesystem in a MongoDB GridFS bucket.

    Every GridFS entry carries metadata["type"] in {"file", "dir", "sym"} and
    optionally metadata["permissions"] (a POSIX mode mask). The bucket can be
    mirrored to/from a local directory rooted at self.path via sync() and
    reverse_sync().
    """

    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local mirror root; fs_connect() ensures trailing "/"
        self.client = None  # MongoClient, set by fs_connect()
        self.fs = None  # GridFSBucket, set by fs_connect()

    def __update_local_fs(self, from_path=None):
        """Materialize GridFS content under the local mirror self.path.

        :param from_path: when given, only entries whose filename starts with
            this relative prefix are copied.
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)

        valid_paths = []

        # Create directories first so files have a place to land.
        for directory in dir_cursor:
            if from_path and not directory.filename.startswith(from_path):
                continue
            self.logger.debug("Making dir {}".format(self.path + directory.filename))
            os.makedirs(self.path + directory.filename, exist_ok=True)
            valid_paths.append(self.path + directory.filename)

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )

        for writing_file in file_cursor:
            if from_path and not writing_file.filename.startswith(from_path):
                continue
            file_path = self.path + writing_file.filename

            if writing_file.metadata["type"] == "sym":
                # Symlinks are stored in GridFS as the UTF-8 link target text.
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    b.seek(0)
                    link = b.read().decode("utf-8")

                try:
                    # Remove any stale entry before recreating the link.
                    self.logger.debug("Sync removing {}".format(file_path))
                    os.remove(file_path)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        # This is probably permission denied or worse
                        raise
                os.symlink(link, file_path)
            else:
                folder = os.path.dirname(file_path)
                if folder not in valid_paths:
                    # Parent was not created above (no explicit dir entry).
                    self.logger.debug("Sync local directory {}".format(file_path))
                    os.makedirs(folder, exist_ok=True)
                with open(file_path, "wb+") as file_stream:
                    self.logger.debug("Sync download {}".format(file_path))
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        """Return the driver identification and its local mirror path."""
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """Validate *config* and connect to the GridFS bucket.

        :param config: dict with "path", "collection" and either "uri" or
            "host" + "port"; optional "logger_name".
        :raises FsException: on missing parameters, an invalid local path,
            or any connection error.
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            # "uri" takes precedence over "host"/"port" when both are present.
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config.keys() for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri" or "host" + "port"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        folder = folder.rstrip("/")
        try:
            # An empty stream tagged type "dir" represents a directory entry.
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        dst = dst.rstrip("/")
        src = src.rstrip("/")

        try:
            # NOTE(review): src/dst are interpolated into $regex unescaped;
            # names containing regex metacharacters would mis-match — confirm
            # callers only pass plain path names.
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(dst)}}, no_cursor_timeout=True
            )

            # Deleting any existing destination tree makes the rename a replace.
            for dst_file in dst_cursor:
                self.fs.delete(dst_file._id)

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(src)}}, no_cursor_timeout=True
            )

            for src_file in src_cursor:
                self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exists
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            # More than one entry with the same name is a bucket inconsistency.
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))

            # if no special mode is required just check it does exists
            if not mode:
                return True

            if requested_file.metadata["type"] == mode:
                return True

            # A symlink satisfies a 'file' existence check.
            if requested_file.metadata["type"] == "sym" and mode == "file":
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size in bytes; None if the entry does not exist
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            # More than one entry with the same name is a bucket inconsistency.
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            return requested_file.length

    def file_extract(self, compressed_object, path):
        """
        extract a tar file
        :param compressed_object: object of type tar or zip
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)
        f = f.rstrip("/")

        if type(compressed_object) is tarfile.TarFile:
            for member in compressed_object.getmembers():
                if member.isfile():
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    # Store the symlink target text as the entry content.
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    stream = BytesIO()

                if member.isfile():
                    file_type = "file"
                elif member.issym():
                    file_type = "sym"
                else:
                    file_type = "dir"

                metadata = {"type": file_type, "permissions": member.mode}
                member.name = member.name.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.name))
                self.fs.upload_from_stream(
                    f + "/" + member.name, stream, metadata=metadata
                )

                stream.close()
        elif type(compressed_object) is zipfile.ZipFile:
            for member in compressed_object.infolist():
                if member.is_dir():
                    stream = BytesIO()
                else:
                    # ZipFile.read returns bytes, not a stream; GridFS
                    # upload_from_stream accepts both.
                    stream = compressed_object.read(member)

                if member.is_dir():
                    file_type = "dir"
                else:
                    file_type = "file"

                metadata = {"type": file_type}
                member.filename = member.filename.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.filename))
                self.fs.upload_from_stream(
                    f + "/" + member.filename, stream, metadata=metadata
                )

                # Only the dir branch created a closable stream object.
                if member.is_dir():
                    stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; a "b" selects a binary stream, otherwise text
        :return: file object (GridByteStream or GridStringStream)
        :raises FsException: NOT_FOUND or BAD_REQUEST on open errors
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content (list of child names relative to the folder)
        :raises FsException: when the entry is duplicated or is not a dir
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                exception_dir = next(dir_cursor, None)

                if exception_dir:
                    raise FsException(
                        "Multiple directories found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_dir.metadata["type"] != "dir":
                    raise FsException(
                        "File {} does not exist".format(f),
                        http_code=HTTPStatus.NOT_FOUND,
                    )

                # NOTE(review): unreachable — f was already rstrip("/")-ed above.
                if f.endswith("/"):
                    f = f[:-1]

                # Direct children only: one path segment below f.
                files_cursor = self.fs.find(
                    {"filename": {"$regex": "^{}/([^/])*".format(f)}}
                )
                for children_file in files_cursor:
                    files += [children_file.filename.replace(f + "/", "", 1)]

            return files
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                exception_file = next(file_cursor, None)

                if exception_file:
                    self.logger.error(
                        "Cannot delete duplicate file: {} and {}".format(
                            requested_file.filename, exception_file.filename
                        )
                    )
                    raise FsException(
                        "Multiple files found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_file.metadata["type"] == "dir":
                    # NOTE(review): f is interpolated into $regex unescaped —
                    # confirm names never contain regex metacharacters.
                    dir_cursor = self.fs.find(
                        {"filename": {"$regex": "^{}/".format(f)}}
                    )

                    # Delete the whole subtree before the dir entry itself.
                    for tmp in dir_cursor:
                        self.logger.debug("Deleting {}".format(tmp.filename))
                        self.fs.delete(tmp._id)

                self.logger.debug("Deleting {}".format(requested_file.filename))
                self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException(
                    "File {} does not exist".format(storage),
                    http_code=HTTPStatus.NOT_FOUND,
                )
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path:
            if os.path.isabs(from_path):
                # GridFS filenames are relative to the mirror root.
                from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """Upload local files under *from_path* (relative to self.path) to
        GridFS, replacing remote entries that are older than the local copy
        and deleting remote entries no longer present locally.
        """

        os_path = self.path + from_path

        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                member = {"filename": os.path.join(root, folder), "type": "dir"}
                if os.path.islink(member["filename"]):
                    member["type"] = "sym"
                members.append(member)
            for file in files:
                filename = os.path.join(root, file)
                if os.path.islink(filename):
                    file_type = "sym"
                else:
                    file_type = "file"
                member = {"filename": os.path.join(root, file), "type": file_type}
                members.append(member)

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            # obtain permission
            mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)

            # convert to relative path
            rel_filename = os.path.relpath(member["filename"], self.path)
            last_modified_date = datetime.datetime.fromtimestamp(
                os.path.getmtime(member["filename"])
            )

            remote_file = remote_files.get(rel_filename)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )
            # remove processed files from dict
            remote_files.pop(rel_filename, None)

            if last_modified_date >= upload_date:

                stream = None
                fh = None
                try:
                    file_type = member["type"]
                    if file_type == "dir":
                        stream = BytesIO()
                    elif file_type == "sym":
                        # Store the symlink target text as the entry content.
                        stream = BytesIO(
                            os.readlink(member["filename"]).encode("utf-8")
                        )
                    else:
                        fh = open(member["filename"], "rb")
                        stream = BytesIO(fh.read())

                    metadata = {"type": file_type, "permissions": mask}

                    self.logger.debug("Sync upload {}".format(rel_filename))
                    self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

                    # delete old files
                    if remote_file:
                        for file in remote_file:
                            self.logger.debug("Sync deleting {}".format(file.filename))
                            self.fs.delete(file._id)
                finally:
                    if fh:
                        fh.close()
                    if stream:
                        stream.close()

        # delete files that are not any more in local fs
        for remote_file in remote_files.values():
            for file in remote_file:
                self.fs.delete(file._id)

    def _get_mongo_files(self, from_path=None):
        """Return {filename: [entries, newest first]} for the bucket,
        optionally restricted to filenames starting with *from_path*.
        """

        file_dict = {}
        # Sorted by uploadDate descending so index 0 is the newest revision.
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        for file in file_cursor:
            if from_path and not file.filename.startswith(from_path):
                continue
            if file.filename in file_dict:
                file_dict[file.filename].append(file)
            else:
                file_dict[file.filename] = [file]
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            # GridFS filenames are relative to the mirror root.
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)