# Increase Logging, fix directory delete
# [osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
19 import errno
20 from http import HTTPStatus
21 from io import BytesIO, StringIO
22 import logging
23 import os
24 import datetime
25 import tarfile
26 import zipfile
27
28 from gridfs import GridFSBucket, errors
29 from osm_common.fsbase import FsBase, FsException
30 from pymongo import MongoClient
31
32
33 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
34
35
class GridByteStream(BytesIO):
    """In-memory binary stream backed by a GridFS entry.

    On creation, the GridFS file matching ``filename`` (type "file" or
    "sym") is downloaded into the buffer.  On close(), write modes flush
    the buffer back to GridFS; read modes simply discard the buffer.
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        # Until GridFS says otherwise, assume a plain file.
        self.file_type = "file"
        self.__initialize__()

    def __initialize__(self):
        """Download the GridFS entry matching self.filename, if any."""
        found = None
        cursor = self.fs.find({"filename": self.filename})
        for candidate in cursor:
            # A second hit for the same name means the bucket holds duplicates.
            if next(cursor, None):
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )
            entry_type = candidate.metadata["type"]
            if entry_type not in ("file", "sym"):
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )
            found = candidate
            self.file_type = entry_type

        if found:
            self._id = found._id
            self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            # Rewind so reads start at the beginning of the content.
            self.seek(0, 0)

    def close(self):
        """Persist the buffer to GridFS (write modes only) and close."""
        # Read-only streams have nothing to write back.
        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        # Replace semantics: drop the previous revision before re-uploading.
        if self._id:
            self.fs.delete(self._id)

        # If the first path component has no "dir" entry, assume the stored
        # name drops that component's trailing character and adjust.
        lookup = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )
        if not next(lookup, None):
            top = self.filename.split("/")[0]
            self.filename = self.filename.replace(top, top[:-1], 1)

        self.seek(0, 0)
        if self._id:
            # Keep the original _id so existing references stay valid.
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, self, metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
111
112
class GridStringStream(StringIO):
    """In-memory text stream backed by a GridFS entry (UTF-8 on the wire).

    Mirrors GridByteStream but decodes/encodes the GridFS byte content as
    UTF-8 so callers work with str.
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Download the GridFS entry matching self.filename into the buffer."""
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second match for the same filename means duplicates in GridFS.
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            # NOTE(review): this accepts "dir" while GridByteStream accepts
            # "sym", and the error message says "isn't file" even though
            # "dir" passes — confirm this asymmetry is intentional.
            if requested_file.metadata["type"] in ("file", "dir"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            # GridFS yields bytes; decode to text for the StringIO buffer.
            stream = BytesIO()
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, stream)
            stream.seek(0)
            self.write(stream.read().decode("utf-8"))
            stream.close()

        if "r" in self.mode:
            # Rewind so reads start at the beginning of the content.
            self.seek(0, 0)

    def close(self):
        """Persist the buffer to GridFS (write modes only) and close."""
        if "r" in self.mode:
            # Read-only: nothing to write back.
            super(GridStringStream, self).close()
            return

        # Replace semantics: delete the previous revision before uploading.
        if self._id:
            self.fs.delete(self._id)

        # Check whether the first path component exists as a "dir" entry.
        cursor = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            # presumably compensates for a trailing-character variant of the
            # directory name — TODO confirm against mkdir/file_delete usage
            parent_dir_name = self.filename.split("/")[0]
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        # Re-encode the text content to bytes for the GridFS upload.
        stream = BytesIO()
        stream.write(self.read().encode("utf-8"))
        stream.seek(0, 0)
        if self._id:
            # Keep the original _id so existing references stay valid.
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, stream, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, stream, metadata={"type": self.file_type}
            )
        stream.close()
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
196
197
class FsMongo(FsBase):
    """FsBase implementation that stores files in a MongoDB GridFS bucket.

    A local directory (self.path) acts as a working copy: sync() pulls
    GridFS content to the local filesystem and reverse_sync() pushes local
    content back into GridFS.  Entries carry metadata {"type": "file" |
    "dir" | "sym"} and optionally "permissions".
    """

    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None    # local root directory; fs_connect ensures trailing "/"
        self.client = None  # MongoClient, set by fs_connect
        self.fs = None      # GridFSBucket, set by fs_connect

    def __update_local_fs(self, from_path=None):
        """Mirror GridFS content into the local filesystem under self.path.

        :param from_path: optional relative path prefix; when given, only
            entries whose filename starts with it are materialized.
        """
        # Pass 1: recreate every directory entry so files have a place to land.
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)

        valid_paths = []

        for directory in dir_cursor:
            if from_path and not directory.filename.startswith(from_path):
                continue
            self.logger.debug("Making dir {}".format(self.path + directory.filename))
            os.makedirs(self.path + directory.filename, exist_ok=True)
            valid_paths.append(self.path + directory.filename)

        # Pass 2: download regular files and recreate symlinks.
        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )

        for writing_file in file_cursor:
            if from_path and not writing_file.filename.startswith(from_path):
                continue
            file_path = self.path + writing_file.filename

            if writing_file.metadata["type"] == "sym":
                # Symlink targets are stored as the entry's byte content.
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    b.seek(0)
                    link = b.read().decode("utf-8")

                # Remove any stale file so os.symlink does not fail on EEXIST.
                try:
                    self.logger.debug("Sync removing {}".format(file_path))
                    os.remove(file_path)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        # This is probably permission denied or worse
                        raise
                os.symlink(link, file_path)
            else:
                # The parent may have no "dir" entry in GridFS; create it anyway.
                folder = os.path.dirname(file_path)
                if folder not in valid_paths:
                    self.logger.debug("Sync local directory {}".format(file_path))
                    os.makedirs(folder, exist_ok=True)
                with open(file_path, "wb+") as file_stream:
                    self.logger.debug("Sync download {}".format(file_path))
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        """Return the driver identification and its local root path."""
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """Connect to MongoDB GridFS and validate the local working path.

        :param config: dict with "path" (local working dir), "collection"
            (database name) and either "uri" or "host"+"port"; optional
            "logger_name".
        :raises FsException: on missing parameters or an unusable path.
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            # "uri" wins over "host"+"port" when both are supplied.
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config.keys() for key in ["host", "port", "collection"]):
                # NOTE(review): "port" is passed straight to MongoClient —
                # presumably already an int; confirm config parsing upstream.
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri" or "host" + "port"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        folder = folder.rstrip("/")
        try:
            # A "dir" entry is a zero-length GridFS file with type metadata.
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        dst = dst.rstrip("/")
        src = src.rstrip("/")

        try:
            # NOTE(review): src/dst are interpolated into regexes unescaped —
            # names containing regex metacharacters would misbehave; confirm
            # callers only pass plain path names.
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(dst)}}, no_cursor_timeout=True
            )

            # Replace semantics: remove any existing destination tree first.
            for dst_file in dst_cursor:
                self.fs.delete(dst_file._id)

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(src)}}, no_cursor_timeout=True
            )

            # Rename every entry under src, replacing only the first occurrence.
            for src_file in src_cursor:
                self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})

        # The loop body runs at most once: a second cursor hit means the
        # bucket holds duplicate entries for the same name.
        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))

            # if no special mode is required just check it does exists
            if not mode:
                return True

            if requested_file.metadata["type"] == mode:
                return True

            # Symlinks count as files for mode="file" checks.
            if requested_file.metadata["type"] == "sym" and mode == "file":
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})

        # At most one iteration; duplicates are an internal error.
        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            return requested_file.length
        # Falls through (returns None implicitly) when no entry matches.

    def file_extract(self, compressed_object, path):
        """
        extract a tar file
        :param compressed_object: object of type tar or zip
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)
        f = f.rstrip("/")

        if type(compressed_object) is tarfile.TarFile:
            for member in compressed_object.getmembers():
                # Pick the byte source per member kind: file content, the
                # symlink target as text, or empty for directories.
                if member.isfile():
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    stream = BytesIO()

                if member.isfile():
                    file_type = "file"
                elif member.issym():
                    file_type = "sym"
                else:
                    file_type = "dir"

                # Tar preserves unix permissions; keep them in metadata.
                metadata = {"type": file_type, "permissions": member.mode}
                member.name = member.name.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.name))
                self.fs.upload_from_stream(
                    f + "/" + member.name, stream, metadata=metadata
                )

                stream.close()
        elif type(compressed_object) is zipfile.ZipFile:
            for member in compressed_object.infolist():
                if member.is_dir():
                    stream = BytesIO()
                else:
                    # ZipFile.read returns bytes, which GridFS accepts directly.
                    stream = compressed_object.read(member)

                if member.is_dir():
                    file_type = "dir"
                else:
                    file_type = "file"

                # Zip entries carry no unix permissions here.
                metadata = {"type": file_type}
                member.filename = member.filename.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.filename))
                self.fs.upload_from_stream(
                    f + "/" + member.filename, stream, metadata=metadata
                )

                # Only the BytesIO used for dirs needs closing; file members
                # are plain bytes objects.
                if member.is_dir():
                    stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            # Binary modes get a bytes-backed stream, text modes a str one.
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                # Duplicate entries for the same name are an internal error.
                exception_dir = next(dir_cursor, None)

                if exception_dir:
                    raise FsException(
                        "Multiple directories found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_dir.metadata["type"] != "dir":
                    raise FsException(
                        "File {} does not exist".format(f),
                        http_code=HTTPStatus.NOT_FOUND,
                    )

                # NOTE(review): dead check — f was already rstrip("/")'d above,
                # so it can never end with "/" here.
                if f.endswith("/"):
                    f = f[:-1]

                # Direct children only: at least one path segment after f/.
                files_cursor = self.fs.find(
                    {"filename": {"$regex": "^{}/([^/])*".format(f)}}
                )
                for children_file in files_cursor:
                    files += [children_file.filename.replace(f + "/", "", 1)]

            return files
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                # Duplicates would make the delete ambiguous; refuse.
                exception_file = next(file_cursor, None)

                if exception_file:
                    self.logger.error(
                        "Cannot delete duplicate file: {} and {}".format(
                            requested_file.filename, exception_file.filename
                        )
                    )
                    raise FsException(
                        "Multiple files found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                # For a directory, delete all entries under it first.
                if requested_file.metadata["type"] == "dir":
                    dir_cursor = self.fs.find(
                        {"filename": {"$regex": "^{}/".format(f)}}
                    )

                    for tmp in dir_cursor:
                        self.logger.debug("Deleting {}".format(tmp.filename))
                        self.fs.delete(tmp._id)

                self.logger.debug("Deleting {}".format(requested_file.filename))
                self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException(
                    "File {} does not exist".format(storage),
                    http_code=HTTPStatus.NOT_FOUND,
                )
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path:
            if os.path.isabs(from_path):
                # GridFS filenames are relative to self.path.
                from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """Upload local content under from_path into GridFS, replacing
        outdated entries and removing entries that no longer exist locally.

        :param from_path: path relative to self.path to push to GridFS.
        """

        os_path = self.path + from_path

        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                member = {"filename": os.path.join(root, folder), "type": "dir"}
                if os.path.islink(member["filename"]):
                    member["type"] = "sym"
                members.append(member)
            for file in files:
                filename = os.path.join(root, file)
                if os.path.islink(filename):
                    file_type = "sym"
                else:
                    file_type = "file"
                member = {"filename": os.path.join(root, file), "type": file_type}
                members.append(member)

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            # obtain permission
            mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)

            # convert to relative path
            rel_filename = os.path.relpath(member["filename"], self.path)
            last_modified_date = datetime.datetime.fromtimestamp(
                os.path.getmtime(member["filename"])
            )

            remote_file = remote_files.get(rel_filename)
            # datetime.min makes any local file newer than a missing remote.
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )
            # remove processed files from dict
            remote_files.pop(rel_filename, None)

            if last_modified_date >= upload_date:

                stream = None
                fh = None
                try:
                    file_type = member["type"]
                    if file_type == "dir":
                        stream = BytesIO()
                    elif file_type == "sym":
                        # Store the symlink target as the entry content.
                        stream = BytesIO(
                            os.readlink(member["filename"]).encode("utf-8")
                        )
                    else:
                        fh = open(member["filename"], "rb")
                        stream = BytesIO(fh.read())

                    metadata = {"type": file_type, "permissions": mask}

                    self.logger.debug("Sync upload {}".format(rel_filename))
                    self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

                    # delete old files
                    if remote_file:
                        for file in remote_file:
                            self.logger.debug("Sync deleting {}".format(file.filename))
                            self.fs.delete(file._id)
                finally:
                    if fh:
                        fh.close()
                    if stream:
                        stream.close()

        # delete files that are not any more in local fs
        for remote_file in remote_files.values():
            for file in remote_file:
                self.fs.delete(file._id)

    def _get_mongo_files(self, from_path=None):
        """Return GridFS entries grouped by filename.

        :param from_path: optional filename prefix filter.
        :return: dict filename -> list of grid files, newest first (sorted
            by uploadDate descending).
        """

        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        for file in file_cursor:
            if from_path and not file.filename.startswith(from_path):
                continue
            if file.filename in file_dict:
                file_dict[file.filename].append(file)
            else:
                file_dict[file.filename] = [file]
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            # GridFS filenames are relative to self.path.
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)