# Copyright 2019 Canonical
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
##
import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import tarfile
import zipfile

from gridfs import errors, GridFSBucket
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient


__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"


class GridByteStream(BytesIO):
    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        cursor = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            parent_dir_name = self.filename.split("/")[0]
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, self, metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class GridStringStream(StringIO):
    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "dir"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            stream = BytesIO()
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, stream)
            stream.seek(0)
            self.write(stream.read().decode("utf-8"))
            stream.close()

            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        cursor = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            parent_dir_name = self.filename.split("/")[0]
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        stream = BytesIO()
        stream.write(self.read().encode("utf-8"))
        stream.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, stream, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, stream, metadata={"type": self.file_type}
            )
        stream.close()
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class FsMongo(FsBase):
    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None
        self.client = None
        self.fs = None

    def __update_local_fs(self, from_path=None):
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)

        valid_paths = []

        for directory in dir_cursor:
            if from_path and not directory.filename.startswith(from_path):
                continue
            self.logger.debug("Making dir {}".format(self.path + directory.filename))
            os.makedirs(self.path + directory.filename, exist_ok=True)
            valid_paths.append(self.path + directory.filename)

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )

        for writing_file in file_cursor:
            if from_path and not writing_file.filename.startswith(from_path):
                continue
            file_path = self.path + writing_file.filename

            if writing_file.metadata["type"] == "sym":
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    b.seek(0)
                    link = b.read().decode("utf-8")

                try:
                    self.logger.debug("Sync removing {}".format(file_path))
                    os.remove(file_path)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        # This is probably permission denied or worse
                        raise
                os.symlink(link, file_path)
            else:
                folder = os.path.dirname(file_path)
                if folder not in valid_paths:
                    self.logger.debug("Sync local directory {}".format(file_path))
                    os.makedirs(folder, exist_ok=True)
                with open(file_path, "wb+") as file_stream:
                    self.logger.debug("Sync download {}".format(file_path))
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))
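        # Illustrative connection sketch (comment only, not executed). The key
        # names below are the ones this method reads; the values are made up:
        #
        #     fs = FsMongo()
        #     fs.fs_connect(
        #         {
        #             "path": "/app/storage/",
        #             "uri": "mongodb://mongo:27017",
        #             "collection": "files",
        #             "logger_name": "fs.mongo",
        #         }
        #     )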

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        folder = folder.rstrip("/")
        try:
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename a directory. If dst exists, the existing directory is replaced (deleted)
        :param src: source directory
        :param dst: destination directory
        :return: None or raises an exception
        """
        dst = dst.rstrip("/")
        src = src.rstrip("/")

        try:
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(dst)}}, no_cursor_timeout=True
            )

            for dst_file in dst_cursor:
                self.fs.delete(dst_file._id)

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(src)}}, no_cursor_timeout=True
            )

            for src_file in src_cursor:
                self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if the "storage" entry exists
        :param storage: can be a str or a list of str
        :param mode: 'file' to check it exists as a regular file; 'dir' to check it exists as a directory; None to just check existence
        :return: True, False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))

            # if no special mode is required, just check that it exists
            if not mode:
                return True

            if requested_file.metadata["type"] == mode:
                return True

            if requested_file.metadata["type"] == "sym" and mode == "file":
                return True

        return False
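        # Illustrative check (comment only); assumes a connected FsMongo instance
        # named "fs", and "pkg"/"vnfd.yaml" are made-up names:
        #
        #     fs.file_exists(["pkg", "vnfd.yaml"], mode="file")  # -> True or False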

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            return requested_file.length

    def file_extract(self, compressed_object, path):
        """
        Extract a tar or zip archive into the storage
        :param compressed_object: a tarfile.TarFile or zipfile.ZipFile object
        :param path: destination folder where the content is extracted; can be a str or a list of str
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)
        f = f.rstrip("/")

        if type(compressed_object) is tarfile.TarFile:
            for member in compressed_object.getmembers():
                if member.isfile():
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    stream = BytesIO()

                if member.isfile():
                    file_type = "file"
                elif member.issym():
                    file_type = "sym"
                else:
                    file_type = "dir"

                metadata = {"type": file_type, "permissions": member.mode}
                member.name = member.name.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.name))
                self.fs.upload_from_stream(
                    f + "/" + member.name, stream, metadata=metadata
                )

                stream.close()
        elif type(compressed_object) is zipfile.ZipFile:
            for member in compressed_object.infolist():
                if member.is_dir():
                    stream = BytesIO()
                else:
                    stream = compressed_object.read(member)

                if member.is_dir():
                    file_type = "dir"
                else:
                    file_type = "file"

                metadata = {"type": file_type}
                member.filename = member.filename.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.filename))
                self.fs.upload_from_stream(
                    f + "/" + member.filename, stream, metadata=metadata
                )

                if member.is_dir():
                    stream.close()
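        # Illustrative extraction sketch (comment only); the archive name and the
        # destination folder "package" are made up. The archive is opened by the
        # caller and passed in as a tarfile.TarFile or zipfile.ZipFile object:
        #
        #     with tarfile.open("/tmp/package.tar.gz", "r:gz") as tar:
        #         fs.file_extract(tar, "package")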

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )
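        # Illustrative usage sketch (comment only); the file names are made up.
        # The returned streams are context managers: a mode without "b" goes
        # through GridStringStream (text), a mode with "b" through
        # GridByteStream (binary):
        #
        #     with fs.file_open(["package", "vnfd.yaml"], "w") as f:
        #         f.write("content")
        #     with fs.file_open(["package", "icon.png"], "rb") as f:
        #         data = f.read()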

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                exception_dir = next(dir_cursor, None)

                if exception_dir:
                    raise FsException(
                        "Multiple directories found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_dir.metadata["type"] != "dir":
                    raise FsException(
                        "File {} does not exist".format(f),
                        http_code=HTTPStatus.NOT_FOUND,
                    )

                if f.endswith("/"):
                    f = f[:-1]

                files_cursor = self.fs.find(
                    {"filename": {"$regex": "^{}/([^/])*".format(f)}}
                )
                for children_file in files_cursor:
                    files += [children_file.filename.replace(f + "/", "", 1)]

            return files
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )
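        # Illustrative listing (comment only); the folder and entry names are made up:
        #
        #     fs.dir_ls("package")  # e.g. ["vnfd.yaml", "icons"]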

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: do not raise an exception if storage does not exist
        :return: None
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                exception_file = next(file_cursor, None)

                if exception_file:
                    self.logger.error(
                        "Cannot delete duplicate file: {} and {}".format(
                            requested_file.filename, exception_file.filename
                        )
                    )
                    raise FsException(
                        "Multiple files found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_file.metadata["type"] == "dir":
                    dir_cursor = self.fs.find(
                        {"filename": {"$regex": "^{}/".format(f)}}
                    )

                    for tmp in dir_cursor:
                        self.logger.debug("Deleting {}".format(tmp.filename))
                        self.fs.delete(tmp._id)

                self.logger.debug("Deleting {}".format(requested_file.filename))
                self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException(
                    "File {} does not exist".format(storage),
                    http_code=HTTPStatus.NOT_FOUND,
                )
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )
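        # Illustrative deletion (comment only); "package" is a made-up folder name:
        #
        #     fs.file_delete("package", ignore_non_exist=True)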

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path:
            if os.path.isabs(from_path):
                from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        os_path = self.path + from_path
        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                member = {"filename": os.path.join(root, folder), "type": "dir"}
                if os.path.islink(member["filename"]):
                    member["type"] = "sym"
                members.append(member)
            for file in files:
                filename = os.path.join(root, file)
                if os.path.islink(filename):
                    file_type = "sym"
                else:
                    file_type = "file"
                member = {"filename": os.path.join(root, file), "type": file_type}
                members.append(member)

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exist or have been modified.
        # This is done for performance (to avoid updating unmodified files) and to avoid
        # overwriting a file with an older one when there are two synchronization sources
        # in high availability scenarios
        for member in members:
            # obtain permission
            mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)

            # convert to relative path
            rel_filename = os.path.relpath(member["filename"], self.path)
            last_modified_date = datetime.datetime.fromtimestamp(
                os.path.getmtime(member["filename"])
            )

            remote_file = remote_files.get(rel_filename)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )
            # remove processed files from dict
            remote_files.pop(rel_filename, None)

            if last_modified_date >= upload_date:
                stream = None
                fh = None
                try:
                    file_type = member["type"]
                    if file_type == "dir":
                        stream = BytesIO()
                    elif file_type == "sym":
                        stream = BytesIO(
                            os.readlink(member["filename"]).encode("utf-8")
                        )
                    else:
                        fh = open(member["filename"], "rb")
                        stream = BytesIO(fh.read())

                    metadata = {"type": file_type, "permissions": mask}

                    self.logger.debug("Sync upload {}".format(rel_filename))
                    self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

                    # delete old files
                    if remote_file:
                        for file in remote_file:
                            self.logger.debug("Sync deleting {}".format(file.filename))
                            self.fs.delete(file._id)
                finally:
                    if fh:
                        fh.close()
                    if stream:
                        stream.close()

        # delete files that are no longer in the local fs
        for remote_file in remote_files.values():
            for file in remote_file:
                self.fs.delete(file._id)

    def _get_mongo_files(self, from_path=None):
        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        for file in file_cursor:
            if from_path and not file.filename.startswith(from_path):
                continue
            if file.filename in file_dict:
                file_dict[file.filename].append(file)
            else:
                file_dict[file.filename] = [file]
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)
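
    # Illustrative synchronization sketch (comment only); "/app/storage/package"
    # is a made-up path. reverse_sync() uploads local content to GridFS, while
    # sync() downloads GridFS content to the local filesystem:
    #
    #     fs.reverse_sync("/app/storage/package")
    #     fs.sync("package")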