Bug 1977: FSMongo Reverse Sync
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re
import tarfile
import zipfile

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
31
32
33 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
34
35
class GridByteStream(BytesIO):
    """Binary in-memory file whose content is loaded from / saved to GridFS.

    On creation the GridFS entry named ``filename`` (if any) is looked up and,
    unless the mode truncates, its content is loaded into the buffer. When the
    stream is closed in a writable mode the buffer is uploaded back, replacing
    the previous entry and reusing its ``_id``.

    :param filename: GridFS filename of the entry
    :param fs: GridFS bucket (used through find, download_to_stream, delete,
        upload_from_stream and upload_from_stream_with_id)
    :param mode: open mode interpreted like ``open()``: "r" reads, "w"
        truncates, "a" appends, and "+" makes an "r" mode writable
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type
        # Stays False if __initialize__ raises, so a half-built stream that is
        # later garbage collected (which calls close()) never uploads anything.
        self._initialized = False

        self.__initialize__()
        self._initialized = True

    def _is_read_only(self):
        """Return True for "r" modes without "+", whose buffer is never written back."""
        return "r" in self.mode and "+" not in self.mode

    def __initialize__(self):
        """Look up the GridFS entry and load its content into the buffer.

        :raises FsException: if several entries share the filename, or the
            entry is neither of type "file" nor "sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            self._id = grid_file._id
            # "w" truncates: skip the old content but remember the _id so that
            # close() replaces the same entry instead of creating a new one.
            if "w" not in self.mode:
                self.fs.download_to_stream(self._id, self)

            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """Upload the buffer to GridFS (writable modes only) and close it.

        Closing an already closed stream is a no-op.
        """
        if self.closed:
            return
        if not self._initialized or self._is_read_only():
            super(GridByteStream, self).close()
            return

        try:
            if self._id:
                self.fs.delete(self._id)

            parent_dir_name = self.filename.split("/")[0]
            cursor = self.fs.find(
                {"filename": parent_dir_name, "metadata": {"type": "dir"}}
            )
            if next(cursor, None) is None:
                # NOTE(review): with no "dir" entry for the top-level folder, the
                # last character of that folder name is dropped; presumably this
                # maps a temporary "<name>_" folder onto "<name>" — confirm.
                self.filename = self.filename.replace(
                    parent_dir_name, parent_dir_name[:-1], 1
                )

            self.seek(0, 0)
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id, self.filename, self, metadata={"type": self.file_type}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename, self, metadata={"type": self.file_type}
                )
        finally:
            # release the buffer even if the GridFS upload failed
            super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
111
112
class GridStringStream(StringIO):
    """Text (UTF-8) in-memory file whose content is loaded from / saved to GridFS.

    Text-mode counterpart of a GridFS-backed byte stream: on creation the
    entry named ``filename`` (if any) is looked up and, unless the mode
    truncates, decoded into the buffer. On close in a writable mode the buffer
    is encoded and uploaded back, reusing the previous entry's ``_id``.

    :param filename: GridFS filename of the entry
    :param fs: GridFS bucket (used through find, download_to_stream, delete,
        upload_from_stream and upload_from_stream_with_id)
    :param mode: open mode interpreted like ``open()``: "r" reads, "w"
        truncates, "a" appends, and "+" makes an "r" mode writable
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type
        # Stays False if __initialize__ raises, so a half-built stream that is
        # later garbage collected (which calls close()) never uploads anything.
        self._initialized = False

        self.__initialize__()
        self._initialized = True

    def _is_read_only(self):
        """Return True for "r" modes without "+", whose buffer is never written back."""
        return "r" in self.mode and "+" not in self.mode

    def __initialize__(self):
        """Look up the GridFS entry and load its UTF-8 content into the buffer.

        :raises FsException: if several entries share the filename, or the
            entry is neither of type "file" nor "sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            # Same accepted types as the binary stream: regular files and
            # symlinks (whose content is the link target). Directories are
            # not readable as files.
            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            self._id = grid_file._id
            # "w" truncates: skip the old content but remember the _id so that
            # close() replaces the same entry instead of creating a new one.
            if "w" not in self.mode:
                with BytesIO() as stream:
                    self.fs.download_to_stream(self._id, stream)
                    self.write(stream.getvalue().decode("utf-8"))

            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """Encode and upload the buffer to GridFS (writable modes only), then close it.

        Closing an already closed stream is a no-op.
        """
        if self.closed:
            return
        if not self._initialized or self._is_read_only():
            super(GridStringStream, self).close()
            return

        try:
            if self._id:
                self.fs.delete(self._id)

            parent_dir_name = self.filename.split("/")[0]
            cursor = self.fs.find(
                {"filename": parent_dir_name, "metadata": {"type": "dir"}}
            )
            if next(cursor, None) is None:
                # NOTE(review): with no "dir" entry for the top-level folder, the
                # last character of that folder name is dropped; presumably this
                # maps a temporary "<name>_" folder onto "<name>" — confirm.
                self.filename = self.filename.replace(
                    parent_dir_name, parent_dir_name[:-1], 1
                )

            self.seek(0, 0)
            with BytesIO(self.read().encode("utf-8")) as stream:
                if self._id:
                    self.fs.upload_from_stream_with_id(
                        self._id, self.filename, stream, metadata={"type": self.file_type}
                    )
                else:
                    self.fs.upload_from_stream(
                        self.filename, stream, metadata={"type": self.file_type}
                    )
        finally:
            # release the buffer even if the GridFS upload failed
            super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
196
197
class FsMongo(FsBase):
    """FsBase implementation backed by a MongoDB GridFS bucket.

    Every GridFS entry carries ``metadata["type"]`` set to ``"file"``,
    ``"dir"`` or ``"sym"``; for ``"sym"`` entries the stored content is the
    link target. Entries written by :meth:`file_extract` (tar archives) and
    :meth:`reverse_sync` also carry ``metadata["permissions"]``.
    ``self.path`` is a local directory that :meth:`sync` fills from GridFS and
    :meth:`reverse_sync` uploads back.
    """

    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local working directory; fs_connect ensures a trailing "/"
        self.client = None  # MongoClient created by fs_connect
        self.fs = None  # GridFSBucket created by fs_connect

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_prefix(from_path):
        """Return from_path without trailing "/", or None when it selects everything."""
        if not from_path:
            return None
        prefix = from_path.rstrip("/")
        return None if prefix in ("", ".") else prefix

    @staticmethod
    def _is_under(filename, prefix):
        """Return True if filename is prefix itself or lies below it (None matches all)."""
        if prefix is None:
            return True
        return filename == prefix or filename.startswith(prefix + "/")

    @staticmethod
    def _tree_regex(path):
        """Return a MongoDB regex matching ``path`` and every entry below it.

        The path is escaped so characters such as "." or "+" match literally,
        and the "(/|$)" tail keeps siblings that merely share the prefix
        (e.g. "<name>_") out of the match.
        """
        return "^{}(/|$)".format(re.escape(path.rstrip("/")))

    @staticmethod
    def _as_naive_utc(value):
        """Return value as a naive UTC datetime (naive input is assumed to be UTC)."""
        if value.tzinfo is not None:
            value = value.astimezone(datetime.timezone.utc).replace(tzinfo=None)
        return value

    def _find_all(self, query, **kwargs):
        """Return all GridFS entries matching query, fully materialized as a list.

        Reading the whole result up front lets callers delete/rename entries or
        run slow downloads without keeping a live server-side cursor around.
        """
        return list(self.fs.find(query, no_cursor_timeout=True, **kwargs))

    def _find_unique(self, filename, multiple_msg="Multiple files found"):
        """Return the only GridFS entry named filename, or None if there is none.

        :raises FsException: INTERNAL_SERVER_ERROR if several entries share the name
        """
        cursor = iter(self.fs.find({"filename": filename}))
        found = next(cursor, None)
        if found is not None and next(cursor, None) is not None:
            raise FsException(multiple_msg, http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
        return found

    def _write_local_symlink(self, grid_file, file_path):
        """Recreate at file_path the symlink stored in grid_file, replacing any old link."""
        with BytesIO() as b:
            self.fs.download_to_stream(grid_file._id, b)
            link = b.getvalue().decode("utf-8")

        try:
            os.remove(file_path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                # This is probably permission denied or worse
                raise
        os.symlink(link, file_path)

    def __update_local_fs(self, from_path=None):
        """Download GridFS content (only below from_path, if given) into self.path."""
        prefix = self._normalize_prefix(from_path)
        created_dirs = set()

        for directory in self._find_all({"metadata.type": "dir"}):
            if not self._is_under(directory.filename, prefix):
                continue
            dir_path = (self.path + directory.filename).rstrip("/")
            os.makedirs(dir_path, exist_ok=True)
            created_dirs.add(dir_path)

        for writing_file in self._find_all(
            {"metadata.type": {"$in": ["file", "sym"]}}
        ):
            if not self._is_under(writing_file.filename, prefix):
                continue
            file_path = self.path + writing_file.filename

            # The parent folder may have no "dir" entry of its own; create it
            # for symlinks as well as for regular files.
            folder = os.path.dirname(file_path)
            if folder not in created_dirs:
                os.makedirs(folder, exist_ok=True)
                created_dirs.add(folder)

            if writing_file.metadata["type"] == "sym":
                self._write_local_symlink(writing_file, file_path)
            else:
                # never write through a stale local symlink into its target
                if os.path.islink(file_path):
                    os.remove(file_path)
                with open(file_path, "wb+") as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    # ------------------------------------------------------------------
    # FsBase interface
    # ------------------------------------------------------------------

    def get_params(self):
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Configure the local path and connect to MongoDB
        :param config: dict with "path", "collection" and either "uri" or
            "host" + "port"; optional "logger_name"
        :return: None or raises FsException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config.keys() for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri" or "host" + "port"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

    def fs_disconnect(self):
        """Currently a no-op; the MongoClient is left open."""
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder: GridFS path of the folder
        :return: None or raises an exception
        """
        try:
            # GridFS stores any number of entries under the same filename
            # (FileExists is only raised for a duplicate _id), so check first to
            # keep mkdir idempotent instead of creating duplicate entries.
            if next(iter(self.fs.find({"filename": folder})), None) is not None:
                return
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            src = src.rstrip("/")
            dst = dst.rstrip("/")

            for dst_file in self._find_all({"filename": {"$regex": self._tree_regex(dst)}}):
                self.fs.delete(dst_file._id)

            # the list is materialized first, so renamed entries are never revisited
            for src_file in self._find_all({"filename": {"$regex": self._tree_regex(src)}}):
                self.fs.rename(src_file._id, dst + src_file.filename[len(src):])
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        :raises FsException: if several entries share the same name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        requested_file = self._find_unique(f)
        if requested_file is None:
            return False

        # if no special mode is required just check it does exists
        if not mode:
            return True

        file_type = requested_file.metadata["type"]
        # a symlink is reported as an existing 'file'
        return file_type == mode or (file_type == "sym" and mode == "file")

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size in bytes, or None if the entry does not exist
        :raises FsException: if several entries share the same name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        requested_file = self._find_unique(f)
        return requested_file.length if requested_file is not None else None

    def file_extract(self, compressed_object, path):
        """
        Upload every member of a tar or zip archive to GridFS under ``path``
        :param compressed_object: an open tarfile.TarFile or zipfile.ZipFile
        :param path: destination folder, as a str or a list of str joined with "/"
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)

        if isinstance(compressed_object, tarfile.TarFile):
            for member in compressed_object.getmembers():
                if member.isfile():
                    file_type = "file"
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    file_type = "sym"
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    # NOTE(review): every other member kind (directories, but also
                    # hard links, devices, fifos) is stored as an empty "dir" entry
                    file_type = "dir"
                    stream = BytesIO()

                metadata = {"type": file_type, "permissions": member.mode}
                try:
                    self.fs.upload_from_stream(
                        f + "/" + member.name, stream, metadata=metadata
                    )
                finally:
                    stream.close()
        elif isinstance(compressed_object, zipfile.ZipFile):
            for member in compressed_object.infolist():
                if member.is_dir():
                    file_type = "dir"
                    stream = BytesIO()
                else:
                    file_type = "file"
                    stream = compressed_object.read(member)  # raw bytes

                filename = f + "/" + member.filename
                self.logger.debug("Uploading %s", filename)
                self.fs.upload_from_stream(
                    filename, stream, metadata={"type": file_type}
                )

                if member.is_dir():
                    stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; a "b" in it selects a binary stream, otherwise text
        :return: file object
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)

            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: names of the direct children of the folder (relative to it);
            an empty list if the folder does not exist
        :raises FsException: NOT_FOUND if storage is not a directory
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)

            requested_dir = self._find_unique(f, "Multiple directories found")
            if requested_dir is None:
                return []

            if requested_dir.metadata["type"] != "dir":
                raise FsException(
                    "File {} does not exist".format(f),
                    http_code=HTTPStatus.NOT_FOUND,
                )

            if f.endswith("/"):
                f = f[:-1]

            # one extra path component, optionally followed by "/" (zip folders
            # are stored with a trailing slash); deeper entries are excluded
            children = self._find_all(
                {"filename": {"$regex": "^{}/[^/]+/?$".format(re.escape(f))}}
            )
            return [child.filename[len(f) + 1:] for child in children]
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)

            requested_file = self._find_unique(f)
            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException(
                        "File {} does not exist".format(storage),
                        http_code=HTTPStatus.NOT_FOUND,
                    )
                return

            if requested_file.metadata["type"] == "dir":
                # only the folder and its content; a bare "^f" prefix would also
                # remove sibling trees such as "<f>_tmp"
                for tmp in self._find_all({"filename": {"$regex": self._tree_regex(f)}}):
                    self.fs.delete(tmp._id)
            else:
                self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path and os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    @staticmethod
    def _list_local_members(os_path):
        """Walk os_path and return [{"filename": abs_path, "type": "dir"|"file"|"sym"}].

        The root os_path itself is not included.
        """
        members = []
        for root, dirs, files in os.walk(os_path):
            for name in dirs:
                member_path = os.path.join(root, name)
                member_type = "sym" if os.path.islink(member_path) else "dir"
                members.append({"filename": member_path, "type": member_type})
            for name in files:
                member_path = os.path.join(root, name)
                member_type = "sym" if os.path.islink(member_path) else "file"
                members.append({"filename": member_path, "type": member_type})
        return members

    def _upload_member(self, member, rel_filename, metadata):
        """Upload one local entry (dir, symlink or regular file) as rel_filename."""
        if member["type"] == "file":
            # stream straight from disk instead of buffering the whole file
            with open(member["filename"], "rb") as fh:
                self.fs.upload_from_stream(rel_filename, fh, metadata=metadata)
            return

        if member["type"] == "sym":
            content = os.readlink(member["filename"]).encode("utf-8")
        else:
            content = b""
        with BytesIO(content) as stream:
            self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

    def _update_mongo_fs(self, from_path):
        """Upload local content below from_path to GridFS and drop stale remote entries."""
        os_path = self.path + from_path

        members = self._list_local_members(os_path)
        remote_files = self._get_mongo_files(from_path)

        # os.walk never reports the root folder itself; keep its remote entry
        # while the local folder exists so the final cleanup does not delete it.
        prefix = self._normalize_prefix(from_path)
        if prefix is not None and os.path.isdir(os_path):
            remote_files.pop(prefix, None)
            remote_files.pop(prefix + "/", None)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            # lstat describes the entry itself, so dangling symlinks do not raise
            st = os.lstat(member["filename"])
            mask = st.st_mode & 0o777

            # convert to relative path
            rel_filename = os.path.relpath(member["filename"], self.path)
            # pymongo returns uploadDate as naive UTC; compare in UTC as well
            last_modified_date = datetime.datetime.fromtimestamp(
                st.st_mtime, tz=datetime.timezone.utc
            ).replace(tzinfo=None)

            # remove processed files from dict
            remote_file = remote_files.pop(rel_filename, None)
            upload_date = (
                self._as_naive_utc(remote_file[0].uploadDate)
                if remote_file
                else datetime.datetime.min
            )

            if last_modified_date < upload_date:
                continue

            self._upload_member(
                member, rel_filename, {"type": member["type"], "permissions": mask}
            )

            # delete old versions only after the new one is stored
            for old_file in remote_file or []:
                self.fs.delete(old_file._id)

        # delete files that are not any more in local fs
        for remote_file in remote_files.values():
            for old_file in remote_file:
                self.fs.delete(old_file._id)

    def _get_mongo_files(self, from_path=None):
        """Group GridFS entries below from_path (or all, if None) by filename.

        :return: dict filename -> list of GridFS entries, newest upload first
        """
        prefix = self._normalize_prefix(from_path)
        query = (
            {} if prefix is None else {"filename": {"$regex": self._tree_regex(prefix)}}
        )

        file_dict = {}
        for grid_file in self._find_all(query, sort=[("uploadDate", -1)]):
            file_dict.setdefault(grid_file.filename, []).append(grid_file)
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)