7fb071ac71790fb50c9edf35347cfeec49206bfc
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re
import datetime
import tarfile
import zipfile

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
31
32
33 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
34
35
class GridByteStream(BytesIO):
    """Binary in-memory file whose content lives in a GridFS bucket.

    On construction the current GridFS content of ``filename`` (if any) is
    loaded, except in "w" modes, which truncate like the built-in ``open``.
    On ``close()`` the buffer is written back to GridFS unless the mode
    contains "r".
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename (path relative to the storage root)
        :param fs: GridFS bucket providing find/download/upload/delete
        :param mode: mode string as given to FsMongo.file_open (e.g. "rb", "wb", "ab")
        """
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Locate ``self.filename`` in GridFS and load its content if needed.

        :raises FsException: if several documents share the filename, or the
            document type is neither "file" nor "sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second match means the store holds duplicates for this name
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            # Keep the id so close() can replace the document in place
            self._id = grid_file._id
            # "w" truncates: the previous content must not be loaded, otherwise
            # new writes would be appended to it
            if "w" not in self.mode:
                self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            self.seek(0, 0)
        # NOTE(review): "r+" modes never persist writes, since close() returns
        # early for any mode containing "r" — confirm this is intended.

    def close(self):
        """Write the buffer back to GridFS (non-"r" modes) and close it.

        Calling close() on an already closed stream does nothing.
        """
        if self.closed:
            # A repeated close must not delete the stored document again
            return

        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        cursor = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No "dir" document whose metadata is exactly {"type": "dir"} is
            # named like the first path component: drop that component's last
            # character. NOTE(review): presumably maps a temporary "<name>_"
            # folder back to "<name>" — confirm with callers.
            parent_dir_name = self.filename.split("/")[0]
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, self, metadata={"type": self.file_type}
            )
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
111
112
class GridStringStream(StringIO):
    """Text (UTF-8) in-memory file whose content lives in a GridFS bucket.

    On construction the current GridFS content of ``filename`` (if any) is
    decoded and loaded, except in "w" modes, which truncate like the built-in
    ``open``. On ``close()`` the text is UTF-8 encoded and written back to
    GridFS unless the mode contains "r".
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename (path relative to the storage root)
        :param fs: GridFS bucket providing find/download/upload/delete
        :param mode: mode string as given to FsMongo.file_open (e.g. "r", "w", "a")
        """
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Locate ``self.filename`` in GridFS and load its decoded content if needed.

        :raises FsException: if several documents share the filename, or the
            document type is not "file", "dir" or "sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second match means the store holds duplicates for this name
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            # "sym" is accepted as in GridByteStream (its content is the link
            # target); "dir" stays accepted for backward compatibility
            if requested_file.metadata["type"] in ("file", "dir", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            # Keep the id so close() can replace the document in place
            self._id = grid_file._id
            # "w" truncates: the previous content must not be loaded, otherwise
            # new writes would be appended to it
            if "w" not in self.mode:
                with BytesIO() as stream:
                    self.fs.download_to_stream(self._id, stream)
                    self.write(stream.getvalue().decode("utf-8"))

        if "r" in self.mode:
            self.seek(0, 0)
        # NOTE(review): "r+" modes never persist writes, since close() returns
        # early for any mode containing "r" — confirm this is intended.

    def close(self):
        """Encode the text as UTF-8, write it back to GridFS (non-"r" modes), close.

        Calling close() on an already closed stream does nothing.
        """
        if self.closed:
            # A repeated close must not delete the stored document again
            return

        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        cursor = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No "dir" document whose metadata is exactly {"type": "dir"} is
            # named like the first path component: drop that component's last
            # character. NOTE(review): presumably maps a temporary "<name>_"
            # folder back to "<name>" — confirm with callers.
            parent_dir_name = self.filename.split("/")[0]
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        with BytesIO(self.getvalue().encode("utf-8")) as stream:
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id, self.filename, stream, metadata={"type": self.file_type}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename, stream, metadata={"type": self.file_type}
                )
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
196
197
class FsMongo(FsBase):
    """FsBase implementation that stores content in a MongoDB GridFS bucket.

    Each GridFS document represents one path relative to the storage root.
    ``metadata.type`` is "file", "dir" or "sym" (a symlink whose content is
    the link target), and ``metadata.permissions`` optionally holds mode bits.
    ``self.path`` is a local directory kept in step via ``sync`` (GridFS ->
    local) and ``reverse_sync`` (local -> GridFS).
    """

    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None
        self.client = None
        self.fs = None

    @staticmethod
    def _path_matches(filename, from_path):
        """Return True if ``filename`` is ``from_path`` itself or lies below it.

        A falsy ``from_path`` matches everything. Unlike a bare ``startswith``,
        "pkg1" does not match "pkg10" or "pkg10/...".
        """
        if not from_path:
            return True
        base = from_path.rstrip("/")
        return filename == base or filename.startswith(base + "/")

    def __update_local_fs(self, from_path=None):
        """Write GridFS directories, files and symlinks under ``self.path``.

        :param from_path: if given, only entries at or below this relative path
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)

        for directory in dir_cursor:
            if not self._path_matches(directory.filename, from_path):
                continue
            os.makedirs(self.path + directory.filename, exist_ok=True)

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )

        for writing_file in file_cursor:
            if not self._path_matches(writing_file.filename, from_path):
                continue
            file_path = self.path + writing_file.filename
            # The parent folder may have no "dir" document (e.g. zip archives
            # without explicit directory entries), so create it if missing
            os.makedirs(os.path.dirname(file_path), exist_ok=True)

            if writing_file.metadata["type"] == "sym":
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    b.seek(0)
                    link = b.read().decode("utf-8")

                try:
                    os.remove(file_path)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        # This is probably permission denied or worse
                        raise
                os.symlink(link, file_path)
            else:
                with open(file_path, "wb+") as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """Configure the local mirror path and connect to MongoDB.

        :param config: dict with "path" and "collection", plus either "uri" or
            "host" + "port"; "logger_name" is optional
        :raises FsException: on missing/invalid parameters or connection errors
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config.keys() for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri" or "host" + "port"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        """Close the MongoDB client opened by fs_connect, if any."""
        if self.client is not None:
            self.client.close()
        self.client = None
        self.fs = None

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder: GridFS path of the folder
        :return: None or raises an exception
        """
        try:
            # GridFS does not enforce unique filenames (upload_from_stream never
            # raises FileExists), so check explicitly; a duplicate entry would
            # later make lookups fail with "Multiple files found". Like FsLocal,
            # an existing entry of any type is accepted silently.
            if next(self.fs.find({"filename": folder}), None) is not None:
                return
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            # re.escape: names may contain regex metacharacters such as "."
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(dst))}},
                no_cursor_timeout=True,
            )

            for dst_file in dst_cursor:
                self.fs.delete(dst_file._id)

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(src))}},
                no_cursor_timeout=True,
            )

            for src_file in src_cursor:
                self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
            (a symlink counts as 'file')
        :return: True, False
        :raises FsException: if several documents share the name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            # if no special mode is required just check it does exists
            if not mode:
                return True

            if requested_file.metadata["type"] == mode:
                return True

            if requested_file.metadata["type"] == "sym" and mode == "file":
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size (GridFS length), or None if the file does not exist
        :raises FsException: if several documents share the name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            return requested_file.length

    def file_extract(self, compressed_object, path):
        """
        extract a tar or zip file into GridFS
        :param compressed_object: object of type tarfile.TarFile or zipfile.ZipFile
        :param path: can be a str or a str list, the GridFS folder where to extract
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)

        if type(compressed_object) is tarfile.TarFile:
            for member in compressed_object.getmembers():
                if member.isfile():
                    file_type = "file"
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    file_type = "sym"
                    # a symlink is stored as the bytes of its target path
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    # directories (and any other member kind) are stored as "dir"
                    file_type = "dir"
                    stream = BytesIO()

                metadata = {"type": file_type, "permissions": member.mode}
                try:
                    self.fs.upload_from_stream(
                        f + "/" + member.name, stream, metadata=metadata
                    )
                finally:
                    stream.close()
        elif type(compressed_object) is zipfile.ZipFile:
            for member in compressed_object.infolist():
                if member.is_dir():
                    file_type = "dir"
                    stream = BytesIO()
                else:
                    file_type = "file"
                    stream = BytesIO(compressed_object.read(member))

                metadata = {"type": file_type}
                try:
                    self.fs.upload_from_stream(
                        f + "/" + member.filename, stream, metadata=metadata
                    )
                finally:
                    stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; "b" selects a binary stream, otherwise text
        :return: file object
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: names of the direct children of the folder (like os.listdir)
        :raises FsException: if the name exists but is not a directory, or is duplicated
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                exception_dir = next(dir_cursor, None)

                if exception_dir:
                    raise FsException(
                        "Multiple directories found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_dir.metadata["type"] != "dir":
                    raise FsException(
                        "File {} does not exist".format(f),
                        http_code=HTTPStatus.NOT_FOUND,
                    )

            if f.endswith("/"):
                f = f[:-1]

            # "<f>/<name>" with no further "/": direct children only
            files_cursor = self.fs.find(
                {"filename": {"$regex": "^{}/[^/]+$".format(re.escape(f))}}
            )
            for children_file in files_cursor:
                files.append(children_file.filename.replace(f + "/", "", 1))

            return files
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                exception_file = next(file_cursor, None)

                if exception_file:
                    raise FsException(
                        "Multiple files found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_file.metadata["type"] == "dir":
                    # The directory entry itself and everything below it, but
                    # not siblings sharing the prefix ("pkg1" vs "pkg10")
                    dir_cursor = self.fs.find(
                        {"filename": {"$regex": "^{}(/|$)".format(re.escape(f))}}
                    )

                    for tmp in dir_cursor:
                        self.fs.delete(tmp._id)
                else:
                    self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException(
                    "File {} does not exist".format(storage),
                    http_code=HTTPStatus.NOT_FOUND,
                )
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path:
            if os.path.isabs(from_path):
                from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """Upload local content below ``self.path + from_path`` to GridFS.

        New or locally newer entries are uploaded and their older GridFS
        versions deleted; GridFS entries below ``from_path`` that no longer
        exist locally are deleted. The ``from_path`` entry itself is kept.
        """
        os_path = self.path + from_path

        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                dir_name = os.path.join(root, folder)
                # os.walk lists symlinks to directories in "dirs" without
                # following them; store those as symlinks, not directories
                dir_type = "sym" if os.path.islink(dir_name) else "dir"
                members.append({"filename": dir_name, "type": dir_type})
            for file in files:
                filename = os.path.join(root, file)
                file_type = "sym" if os.path.islink(filename) else "file"
                members.append({"filename": filename, "type": file_type})

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)
        # The sync root is never one of the walked members: do not delete it
        remote_files.pop(from_path.rstrip("/"), None)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            # lstat: do not follow symlinks, so a dangling link cannot crash
            member_stat = os.lstat(member["filename"])
            mask = member_stat.st_mode & 0o777

            # convert to relative path
            rel_filename = os.path.relpath(member["filename"], self.path)
            # GridFS uploadDate is a naive datetime in UTC (pymongo default),
            # so express the local mtime as naive UTC before comparing
            last_modified_date = datetime.datetime.fromtimestamp(
                member_stat.st_mtime, tz=datetime.timezone.utc
            ).replace(tzinfo=None)

            # remove processed files from dict
            remote_file = remote_files.pop(rel_filename, None)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )

            if last_modified_date < upload_date:
                continue  # GridFS already holds a newer copy

            file_type = member["type"]
            if file_type == "dir":
                content = b""
            elif file_type == "sym":
                content = os.readlink(member["filename"]).encode("utf-8")
            else:
                with open(member["filename"], "rb") as fh:
                    content = fh.read()

            metadata = {"type": file_type, "permissions": mask}
            with BytesIO(content) as stream:
                self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

            # delete old files
            if remote_file:
                for file in remote_file:
                    self.fs.delete(file._id)

        # delete files that are not any more in local fs
        for remote_file in remote_files.values():
            for file in remote_file:
                self.fs.delete(file._id)

    def _get_mongo_files(self, from_path=None):
        """Group GridFS documents by filename, newest upload first.

        :param from_path: if given, only entries at or below this path
        :return: dict filename -> list of GridFS files sorted by uploadDate desc
        """
        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        for file in file_cursor:
            if not self._path_matches(file.filename, from_path):
                continue
            file_dict.setdefault(file.filename, []).append(file)
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)