a057e378d6da14bbd5a382972d079fa1a15bc12d
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
19 import errno
20 from http import HTTPStatus
21 from io import BytesIO, StringIO
22 import logging
23 import os
24 import datetime
25 import tarfile
26 import zipfile
27
28 from gridfs import GridFSBucket, errors
29 from osm_common.fsbase import FsBase, FsException
30 from pymongo import MongoClient
31
32
33 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
34
35
class GridByteStream(BytesIO):
    """
    In-memory binary buffer backed by one GridFS entry.

    On creation the current content of ``filename`` (if such an entry exists)
    is downloaded into the buffer. On ``close()`` the buffer is written back to
    GridFS, unless the mode contains "r", in which case nothing is uploaded.

    :param filename: name of the GridFS entry
    :param fs: bucket object providing find, download_to_stream, delete,
               upload_from_stream and upload_from_stream_with_id
    :param mode: file mode string; any mode containing "r" is handled as read-only
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        try:
            self.__initialize__()
        except Exception:
            # IOBase's finalizer calls close() on streams that are still open.
            # Release the buffer here so that a stream which failed to
            # initialise can never be uploaded when it is garbage collected.
            BytesIO.close(self)
            raise

    def __initialize__(self):
        """
        Load the existing GridFS content, if any, into the buffer
        :return: None or raises FsException when several entries share the name
                 or the entry is neither a "file" nor a "sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second match means the name is ambiguous
            if next(cursor, None):
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] not in ("file", "sym"):
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            grid_file = requested_file
            self.file_type = requested_file.metadata["type"]

        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

            # Readers start at the beginning; for any other mode the position
            # stays at the end of the downloaded content, so writes append.
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Write the buffer back to GridFS (unless opened with "r") and release it.
        Calling it again on an already closed stream has no effect.
        :return: None
        """
        if self.closed:
            return

        if "r" in self.mode:
            super().close()
            return

        try:
            if self._id:
                self.fs.delete(self._id)

            parent_dir_name = self.filename.split("/")[0]
            cursor = self.fs.find(
                {"filename": parent_dir_name, "metadata": {"type": "dir"}}
            )

            # When no entry named like the first path component has metadata
            # exactly {"type": "dir"}, the last character of that component is
            # dropped from the target filename.
            # NOTE(review): presumably this serves writers still pointing at a
            # temporary "<name>_" folder that was already renamed to "<name>";
            # confirm with the callers before changing it.
            if not next(cursor, None):
                self.filename = self.filename.replace(
                    parent_dir_name, parent_dir_name[:-1], 1
                )

            self.seek(0, 0)
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id, self.filename, self, metadata={"type": self.file_type}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename, self, metadata={"type": self.file_type}
                )
        finally:
            # Always release the buffer, even if the upload failed, so that the
            # stream is not flushed a second time by the finalizer.
            super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
111
112
class GridStringStream(StringIO):
    """
    In-memory text buffer backed by one GridFS entry, stored as UTF-8.

    On creation the current content of ``filename`` (if such an entry exists)
    is downloaded and decoded into the buffer. On ``close()`` the buffer is
    encoded and written back to GridFS, unless the mode contains "r", in which
    case nothing is uploaded.

    :param filename: name of the GridFS entry
    :param fs: bucket object providing find, download_to_stream, delete,
               upload_from_stream and upload_from_stream_with_id
    :param mode: file mode string; any mode containing "r" is handled as read-only
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        try:
            self.__initialize__()
        except Exception:
            # IOBase's finalizer calls close() on streams that are still open.
            # Release the buffer here so that a stream which failed to
            # initialise can never be uploaded when it is garbage collected.
            StringIO.close(self)
            raise

    def __initialize__(self):
        """
        Load the existing GridFS content, if any, into the buffer
        :return: None or raises FsException when several entries share the name
                 or the entry type is not accepted
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second match means the name is ambiguous
            if next(cursor, None):
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            # NOTE(review): the binary stream accepts ("file", "sym") here while
            # this one accepts ("file", "dir"); looks unintended, confirm before
            # aligning them.
            if requested_file.metadata["type"] not in ("file", "dir"):
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            grid_file = requested_file
            self.file_type = requested_file.metadata["type"]

        if grid_file:
            self._id = grid_file._id
            # GridFS delivers bytes; decode them once into the text buffer
            with BytesIO() as stream:
                self.fs.download_to_stream(self._id, stream)
                self.write(stream.getvalue().decode("utf-8"))

            # Readers start at the beginning; for any other mode the position
            # stays at the end of the downloaded content, so writes append.
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Write the buffer back to GridFS (unless opened with "r") and release it.
        Calling it again on an already closed stream has no effect.
        :return: None
        """
        if self.closed:
            return

        if "r" in self.mode:
            super().close()
            return

        try:
            if self._id:
                self.fs.delete(self._id)

            parent_dir_name = self.filename.split("/")[0]
            cursor = self.fs.find(
                {"filename": parent_dir_name, "metadata": {"type": "dir"}}
            )

            # When no entry named like the first path component has metadata
            # exactly {"type": "dir"}, the last character of that component is
            # dropped from the target filename.
            # NOTE(review): presumably this serves writers still pointing at a
            # temporary "<name>_" folder that was already renamed to "<name>";
            # confirm with the callers before changing it.
            if not next(cursor, None):
                self.filename = self.filename.replace(
                    parent_dir_name, parent_dir_name[:-1], 1
                )

            # The context manager closes the temporary byte buffer even when
            # the upload raises.
            with BytesIO(self.getvalue().encode("utf-8")) as stream:
                if self._id:
                    self.fs.upload_from_stream_with_id(
                        self._id,
                        self.filename,
                        stream,
                        metadata={"type": self.file_type},
                    )
                else:
                    self.fs.upload_from_stream(
                        self.filename, stream, metadata={"type": self.file_type}
                    )
        finally:
            # Always release the buffer, even if the upload failed, so that the
            # stream is not flushed a second time by the finalizer.
            super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
196
197
class FsMongo(FsBase):
    """
    File storage backend that keeps its content in a MongoDB GridFS bucket.

    Every entry carries a ``metadata.type`` of "file", "dir" or "sym" (for a
    "sym" entry the content is the link target). ``self.path`` is a local
    directory that ``sync`` fills from GridFS and ``reverse_sync`` uploads from.
    """

    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local directory, ends with "/" after fs_connect
        self.client = None  # MongoClient created by fs_connect
        self.fs = None  # GridFSBucket created by fs_connect

    @staticmethod
    def _escape_for_regex(path):
        """
        Escape a path so that it is matched literally inside a "$regex" filter
        :param path: plain path string
        :return: escaped string
        """
        # Function-scope import: "re" is not among the module level imports.
        import re

        return re.escape(path)

    def _find_all(self, *args, **kwargs):
        """
        Run self.fs.find and return every match as a list
        :param args: positional arguments passed to self.fs.find
        :param kwargs: keyword arguments passed to self.fs.find
        :return: list with the entries found
        """
        cursor = self.fs.find(*args, **kwargs)
        try:
            return list(cursor)
        finally:
            # Cursors opened with no_cursor_timeout must be closed explicitly
            # when iteration is interrupted by an exception.
            cursor.close()

    def _find_unique(self, filename, error_message="Multiple files found"):
        """
        Look up the single entry with the given name
        :param filename: exact entry name
        :param error_message: text of the exception raised when the name is ambiguous
        :return: the entry, or None if it does not exist. Raises FsException if
                 several entries share the name
        """
        matches = self._find_all({"filename": filename})
        if len(matches) > 1:
            raise FsException(
                error_message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )
        return matches[0] if matches else None

    def __update_local_fs(self, from_path=None):
        """
        Copy directories, files and symlinks from GridFS to the local path
        :param from_path: if supplied, only entries whose name starts with it are copied
        :return: None
        """
        valid_paths = set()

        for directory in self._find_all(
            {"metadata.type": "dir"}, no_cursor_timeout=True
        ):
            if from_path and not directory.filename.startswith(from_path):
                continue
            local_dir = self.path + directory.filename
            os.makedirs(local_dir, exist_ok=True)
            valid_paths.add(local_dir)

        for writing_file in self._find_all(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        ):
            if from_path and not writing_file.filename.startswith(from_path):
                continue
            file_path = self.path + writing_file.filename

            # Files and symlinks may live in a folder that has no "dir" entry
            folder = os.path.dirname(file_path)
            if folder not in valid_paths:
                os.makedirs(folder, exist_ok=True)
                valid_paths.add(folder)

            if writing_file.metadata["type"] == "sym":
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    link = b.getvalue().decode("utf-8")

                # Replace whatever is there; any error other than "does not
                # exist" (e.g. permission denied) is propagated.
                try:
                    os.remove(file_path)
                except FileNotFoundError:
                    pass
                os.symlink(link, file_path)
            else:
                with open(file_path, "wb+") as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        """
        :return: dictionary with the storage type and the local path
        """
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Validate the local path and connect to MongoDB
        :param config: dictionary with "path", "collection" and either "uri" or
                       "host" + "port"; optionally "logger_name"
        :return: None or raises FsException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" not in config:
                raise FsException('Missing parameter "path"')
            self.path = config["path"]
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            if not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if "collection" not in config:
                raise FsException('Missing parameter "collection"')
            if "uri" in config:
                self.client = MongoClient(config["uri"])
            elif "host" in config and "port" in config:
                self.client = MongoClient(config["host"], config["port"])
            else:
                raise FsException('Missing parameters: "uri" or "host" + "port"')
            # config["collection"] selects the database that holds the bucket
            self.fs = GridFSBucket(self.client[config["collection"]])
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        folder = folder.rstrip("/")
        try:
            # upload_from_stream always creates a new entry, so an existing
            # directory has to be detected up front to keep mkdir idempotent.
            if self._find_all(
                {"filename": folder, "metadata.type": "dir"}, limit=1
            ):
                return
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(
                str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            ) from e

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises an exception
        """
        dst = dst.rstrip("/")
        src = src.rstrip("/")

        # Renaming onto itself would otherwise delete the directory below
        if src == dst:
            return

        try:
            dst_filter = {
                "filename": {
                    "$regex": "^{}(/|$)".format(self._escape_for_regex(dst))
                }
            }
            for dst_file in self._find_all(dst_filter, no_cursor_timeout=True):
                self.fs.delete(dst_file._id)

            src_filter = {
                "filename": {
                    "$regex": "^{}(/|$)".format(self._escape_for_regex(src))
                }
            }
            for src_file in self._find_all(src_filter, no_cursor_timeout=True):
                # Every match starts with src, so only that prefix is swapped
                self.fs.rename(src_file._id, dst + src_file.filename[len(src):])
        except Exception as e:
            raise FsException(
                str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            ) from e

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        requested_file = self._find_unique(f)
        if requested_file is None:
            return False

        self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))

        # if no special mode is required just check it does exists
        if not mode:
            return True

        entry_type = requested_file.metadata["type"]
        # a symlink also counts as a file
        return entry_type == mode or (entry_type == "sym" and mode == "file")

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None if the entry does not exist
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        requested_file = self._find_unique(f)
        return requested_file.length if requested_file is not None else None

    def file_extract(self, compressed_object, path):
        """
        extract a tar or zip file into GridFS
        :param compressed_object: object of type tarfile.TarFile or zipfile.ZipFile;
                                  any other type is ignored
        :param path: can be a str or a str list; folder where the content is extracted
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)
        f = f.rstrip("/")

        if isinstance(compressed_object, tarfile.TarFile):
            for member in compressed_object.getmembers():
                if member.isfile():
                    file_type = "file"
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    # a symlink is stored as its target path
                    file_type = "sym"
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    file_type = "dir"
                    stream = BytesIO()

                metadata = {"type": file_type, "permissions": member.mode}
                # Work on a copy of the name: the archive member is left untouched
                name = member.name.rstrip("/")

                self.logger.debug("Uploading {}".format(name))
                try:
                    self.fs.upload_from_stream(
                        f + "/" + name, stream, metadata=metadata
                    )
                finally:
                    stream.close()
        elif isinstance(compressed_object, zipfile.ZipFile):
            for member in compressed_object.infolist():
                if member.is_dir():
                    file_type = "dir"
                    content = b""
                else:
                    file_type = "file"
                    content = compressed_object.read(member)

                metadata = {"type": file_type}
                # Work on a copy of the name: the archive member is left untouched
                name = member.filename.rstrip("/")

                self.logger.debug("Uploading {}".format(name))
                with BytesIO(content) as stream:
                    self.fs.upload_from_stream(
                        f + "/" + name, stream, metadata=metadata
                    )

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; a mode containing "b" returns a binary stream,
                     otherwise a text stream
        :return: file object
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: list of entry names relative to the folder; empty list if the
                 folder does not exist
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        try:
            requested_dir = self._find_unique(f, "Multiple directories found")
            if requested_dir is None:
                return []

            if requested_dir.metadata["type"] != "dir":
                raise FsException(
                    "File {} does not exist".format(f),
                    http_code=HTTPStatus.NOT_FOUND,
                )

            # NOTE(review): the pattern is not anchored at the end, so nested
            # entries ("sub/file") are listed too; confirm whether only direct
            # children were intended before tightening it.
            children_filter = {
                "filename": {
                    "$regex": "^{}/([^/])*".format(self._escape_for_regex(f))
                }
            }
            return [
                children_file.filename.replace(f + "/", "", 1)
                for children_file in self._find_all(children_filter)
            ]
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        try:
            requested_file = self._find_unique(f)

            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException(
                        "File {} does not exist".format(storage),
                        http_code=HTTPStatus.NOT_FOUND,
                    )
                return

            if requested_file.metadata["type"] == "dir":
                # Match the folder itself and what is below it, but not
                # siblings that merely share the prefix ("name_", "name2").
                subtree_filter = {
                    "filename": {
                        "$regex": "^{}(/|$)".format(self._escape_for_regex(f))
                    }
                }
                for tmp in self._find_all(subtree_filter):
                    self.fs.delete(tmp._id)
            else:
                self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path and os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """
        Upload the local content below from_path and drop remote entries that
        no longer exist locally
        :param from_path: path relative to self.path
        :return: None
        """
        os_path = self.path + from_path

        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                full_name = os.path.join(root, folder)
                folder_type = "sym" if os.path.islink(full_name) else "dir"
                members.append({"filename": full_name, "type": folder_type})
            for file in files:
                full_name = os.path.join(root, file)
                file_type = "sym" if os.path.islink(full_name) else "file"
                members.append({"filename": full_name, "type": file_type})

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            local_name = member["filename"]
            file_type = member["type"]

            # Symlinks are inspected themselves, so a dangling link does not
            # abort the whole synchronization.
            info = os.stat(local_name, follow_symlinks=file_type != "sym")

            # obtain permission
            mask = info.st_mode & 0o777

            # convert to relative path
            rel_filename = os.path.relpath(local_name, self.path)

            # uploadDate is assumed to be a naive UTC datetime (PyMongo's
            # default decoding; the client is created without tz_aware), so the
            # local modification time is expressed the same way.
            last_modified_date = datetime.datetime.fromtimestamp(
                info.st_mtime, datetime.timezone.utc
            ).replace(tzinfo=None)

            # remove processed files from dict
            remote_file = remote_files.pop(rel_filename, None)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )

            if last_modified_date < upload_date:
                continue

            metadata = {"type": file_type, "permissions": mask}

            if file_type == "file":
                # Stream from disk instead of loading the file into memory
                with open(local_name, "rb") as source:
                    self.fs.upload_from_stream(
                        rel_filename, source, metadata=metadata
                    )
            else:
                if file_type == "sym":
                    content = os.readlink(local_name).encode("utf-8")
                else:
                    content = b""
                with BytesIO(content) as source:
                    self.fs.upload_from_stream(
                        rel_filename, source, metadata=metadata
                    )

            # delete old files
            for file in remote_file or ():
                self.fs.delete(file._id)

        # delete files that are not any more in local fs
        for remote_file in remote_files.values():
            for file in remote_file:
                self.fs.delete(file._id)

    def _get_mongo_files(self, from_path=None):
        """
        Group the GridFS entries by name
        :param from_path: if supplied, only entries whose name starts with it are returned
        :return: dictionary name -> list of entries, newest upload first
        """
        file_dict = {}
        for file in self._find_all(
            no_cursor_timeout=True, sort=[("uploadDate", -1)]
        ):
            if from_path and not file.filename.startswith(from_path):
                continue
            file_dict.setdefault(file.filename, []).append(file)
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)