osm_common/fsmongo.py
# Copyright 2019 Canonical
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
##
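"""MongoDB GridFS implementation of the OSM common filesystem interface.

Files, directories and symlinks are stored as GridFS objects whose
``metadata.type`` is "file", "dir" or "sym", and can be mirrored to and
from a local cache directory. The snippet below is an illustrative sketch
only (the config values and paths are placeholders, not defaults of this
module):

    fs = FsMongo()
    fs.fs_connect(config)  # see fs_connect() for the expected keys
    fs.mkdir("packages")
    with fs.file_open(["packages", "readme.txt"], "w") as readme:
        readme.write("hello")
    fs.sync()  # mirror GridFS content into the local cache directory
"""
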
import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import tarfile
import zipfile

from gridfs import errors, GridFSBucket
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient


__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"


class GridByteStream(BytesIO):
    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        cursor = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            parent_dir_name = self.filename.split("/")[0]
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, self, metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class GridStringStream(StringIO):
    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "dir"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            stream = BytesIO()
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, stream)
            stream.seek(0)
            self.write(stream.read().decode("utf-8"))
            stream.close()

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        cursor = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            parent_dir_name = self.filename.split("/")[0]
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        stream = BytesIO()
        stream.write(self.read().encode("utf-8"))
        stream.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, stream, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, stream, metadata={"type": self.file_type}
            )
        stream.close()
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class FsMongo(FsBase):
    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None
        self.client = None
        self.fs = None

    def __update_local_fs(self, from_path=None):
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)

        valid_paths = []

        for directory in dir_cursor:
            if from_path and not directory.filename.startswith(from_path):
                continue
            self.logger.debug("Making dir {}".format(self.path + directory.filename))
            os.makedirs(self.path + directory.filename, exist_ok=True)
            valid_paths.append(self.path + directory.filename)

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )

        for writing_file in file_cursor:
            if from_path and not writing_file.filename.startswith(from_path):
                continue
            file_path = self.path + writing_file.filename

            if writing_file.metadata["type"] == "sym":
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    b.seek(0)
                    link = b.read().decode("utf-8")

                try:
                    self.logger.debug("Sync removing {}".format(file_path))
                    os.remove(file_path)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        # This is probably permission denied or worse
                        raise
                os.symlink(link, file_path)
            else:
                folder = os.path.dirname(file_path)
                if folder not in valid_paths:
                    self.logger.debug("Sync local directory {}".format(file_path))
                    os.makedirs(folder, exist_ok=True)
                with open(file_path, "wb+") as file_stream:
                    self.logger.debug("Sync download {}".format(file_path))
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

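    # Illustrative only: fs_connect() requires "path" (an existing, writable
    # local directory) plus "uri" and "collection" for MongoDB; "logger_name"
    # is optional. The values below are placeholders for a typical deployment,
    # not defaults defined by this module:
    #
    #     fs = FsMongo()
    #     fs.fs_connect(
    #         {
    #             "logger_name": "fs.mongo",
    #             "path": "/app/storage",
    #             "uri": "mongodb://mongo:27017",
    #             "collection": "files",
    #         }
    #     )
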
    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        folder = folder.rstrip("/")
        try:
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename a directory. If dst exists, the existing directory is replaced (deleted)
        :param src: source directory
        :param dst: destination directory
        :return: None or raises an exception
        """
        dst = dst.rstrip("/")
        src = src.rstrip("/")

        try:
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(dst)}}, no_cursor_timeout=True
            )

            for dst_file in dst_cursor:
                self.fs.delete(dst_file._id)

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(src)}}, no_cursor_timeout=True
            )

            for src_file in src_cursor:
                self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exists
        :param storage: can be a str or a str list
        :param mode: 'file' to require a regular file, 'dir' to require a directory, or None to just check existence
        :return: True, False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))

            # if no special mode is required, just check that it exists
            if not mode:
                return True

            if requested_file.metadata["type"] == mode:
                return True

            if requested_file.metadata["type"] == "sym" and mode == "file":
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            return requested_file.length

    def file_extract(self, compressed_object, path):
        """
        Extract a tar or zip file
        :param compressed_object: an already opened tarfile.TarFile or zipfile.ZipFile object
        :param path: can be a str or a str list; destination folder where the content is extracted
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)
        f = f.rstrip("/")

        if type(compressed_object) is tarfile.TarFile:
            for member in compressed_object.getmembers():
                if member.isfile():
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    stream = BytesIO()

                if member.isfile():
                    file_type = "file"
                elif member.issym():
                    file_type = "sym"
                else:
                    file_type = "dir"

                metadata = {"type": file_type, "permissions": member.mode}
                member.name = member.name.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.name))
                self.fs.upload_from_stream(
                    f + "/" + member.name, stream, metadata=metadata
                )

                stream.close()
        elif type(compressed_object) is zipfile.ZipFile:
            for member in compressed_object.infolist():
                if member.is_dir():
                    stream = BytesIO()
                else:
                    stream = compressed_object.read(member)

                if member.is_dir():
                    file_type = "dir"
                else:
                    file_type = "file"

                metadata = {"type": file_type}
                member.filename = member.filename.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.filename))
                self.fs.upload_from_stream(
                    f + "/" + member.filename, stream, metadata=metadata
                )

                if member.is_dir():
                    stream.close()

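    # Illustrative sketch, assuming ``fs`` is a connected FsMongo instance and
    # "pkg.tar.gz" / "packages/pkg" are placeholder names: the archive must be
    # opened by the caller, and the destination folder is typically created
    # first with mkdir() so it exists as a "dir" entry in GridFS.
    #
    #     import tarfile
    #
    #     fs.mkdir("packages/pkg")
    #     with tarfile.open("pkg.tar.gz", "r:gz") as tar:
    #         fs.file_extract(tar, "packages/pkg")
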
    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

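    # Minimal sketch, assuming ``fs`` is a connected FsMongo instance and the
    # path "packages/pkg/values.yaml" already exists in GridFS: a "b" in the
    # mode selects the bytes-backed stream, anything else the text stream.
    #
    #     with fs.file_open(["packages", "pkg", "values.yaml"], "r") as text_f:
    #         content = text_f.read()
    #
    #     with fs.file_open("packages/pkg/image.bin", "rb") as bin_f:
    #         blob = bin_f.read()
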
    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                exception_dir = next(dir_cursor, None)

                if exception_dir:
                    raise FsException(
                        "Multiple directories found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_dir.metadata["type"] != "dir":
                    raise FsException(
                        "File {} does not exist".format(f),
                        http_code=HTTPStatus.NOT_FOUND,
                    )

                if f.endswith("/"):
                    f = f[:-1]

                files_cursor = self.fs.find(
                    {"filename": {"$regex": "^{}/([^/])*".format(f)}}
                )
                for children_file in files_cursor:
                    files += [children_file.filename.replace(f + "/", "", 1)]

            return files
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: do not raise an exception if storage does not exist
        :return: None
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                exception_file = next(file_cursor, None)

                if exception_file:
                    self.logger.error(
                        "Cannot delete duplicate file: {} and {}".format(
                            requested_file.filename, exception_file.filename
                        )
                    )
                    raise FsException(
                        "Multiple files found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_file.metadata["type"] == "dir":
                    dir_cursor = self.fs.find(
                        {"filename": {"$regex": "^{}/".format(f)}}
                    )

                    for tmp in dir_cursor:
                        self.logger.debug("Deleting {}".format(tmp.filename))
                        self.fs.delete(tmp._id)

                self.logger.debug("Deleting {}".format(requested_file.filename))
                self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException(
                    "File {} does not exist".format(storage),
                    http_code=HTTPStatus.NOT_FOUND,
                )
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path:
            if os.path.isabs(from_path):
                from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        os_path = self.path + from_path

        # Obtain list of files and dirs in the local filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                member = {"filename": os.path.join(root, folder), "type": "dir"}
                if os.path.islink(member["filename"]):
                    member["type"] = "sym"
                members.append(member)
            for file in files:
                filename = os.path.join(root, file)
                if os.path.islink(filename):
                    file_type = "sym"
                else:
                    file_type = "file"
                member = {"filename": os.path.join(root, file), "type": file_type}
                members.append(member)

        # Obtain the files already stored in mongo, as a dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members only if they do not exist or have been modified.
        # We do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for
        # synchronization in high availability scenarios
        for member in members:
            # obtain permissions
            mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)

            # convert to relative path
            rel_filename = os.path.relpath(member["filename"], self.path)
            last_modified_date = datetime.datetime.fromtimestamp(
                os.path.getmtime(member["filename"])
            )

            remote_file = remote_files.get(rel_filename)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )
            # remove processed files from dict
            remote_files.pop(rel_filename, None)

            if last_modified_date >= upload_date:
                stream = None
                fh = None
                try:
                    file_type = member["type"]
                    if file_type == "dir":
                        stream = BytesIO()
                    elif file_type == "sym":
                        stream = BytesIO(
                            os.readlink(member["filename"]).encode("utf-8")
                        )
                    else:
                        fh = open(member["filename"], "rb")
                        stream = BytesIO(fh.read())

                    metadata = {"type": file_type, "permissions": mask}

                    self.logger.debug("Sync upload {}".format(rel_filename))
                    self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

                    # delete old files
                    if remote_file:
                        for file in remote_file:
                            self.logger.debug("Sync deleting {}".format(file.filename))
                            self.fs.delete(file._id)
                finally:
                    if fh:
                        fh.close()
                    if stream:
                        stream.close()

        # delete files that are no longer in the local fs
        for remote_file in remote_files.values():
            for file in remote_file:
                self.fs.delete(file._id)

    def _get_mongo_files(self, from_path=None):
        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        for file in file_cursor:
            if from_path and not file.filename.startswith(from_path):
                continue
            if file.filename in file_dict:
                file_dict[file.filename].append(file)
            else:
                file_dict[file.filename] = [file]
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)
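
    # Minimal sketch of the two synchronization directions, assuming ``fs`` is a
    # connected FsMongo instance and the paths below are placeholders under the
    # configured local cache directory:
    #
    #     # pull GridFS content for one subtree into the local cache
    #     fs.sync(from_path="packages/pkg")
    #
    #     # push local changes back to GridFS; absolute paths are made relative
    #     # to the configured storage path before upload
    #     fs.reverse_sync(from_path="/app/storage/packages/pkg")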