# 51dc11f6edb58992d819262e2433e339b43c12f0
# [osm/common.git] / osm_common / fsmongo.py
# Copyright 2019 Canonical
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
##
18
import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
29
30
# Module author and contact address.
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
32
33
class GridByteStream(BytesIO):
    """
    In-memory binary buffer tied to one GridFS filename.

    On construction the stored content of ``filename`` (if there is one) is
    downloaded into the buffer. When closed in a mode that does not contain
    "r", the whole buffer is uploaded back to GridFS, replacing the previous
    entry.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename this stream reads from / writes to
        :param fs: bucket-like object providing find, download_to_stream, delete,
            upload_from_stream and upload_from_stream_with_id
        :param mode: file mode string; a mode containing "r" is treated as read-only
        """
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the existing GridFS content (if any) into the buffer
        :return: None or raises FsException if the name is ambiguous or is not a file/symlink
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second entry with the same filename makes the target ambiguous
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

            # Only read modes rewind; in other modes the position stays at the
            # end of the downloaded content, so subsequent writes append.
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Close the stream; in non-read modes the buffer is first written back to GridFS
        :return: None
        """
        # Closing an already closed stream must be a no-op. Without this guard a
        # second close() in write mode would delete the entry uploaded by the
        # first close() and then fail on seek() of the closed buffer.
        if self.closed:
            return

        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        cursor = self.fs.find(
            {"filename": parent_dir_name, "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No directory entry for the first path component: drop its last
            # character before storing.
            # NOTE(review): presumably maps a temporary "<name>_" folder onto
            # "<name>" - confirm against callers.
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, self, metadata={"type": self.file_type}
            )
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
109
110
class GridStringStream(StringIO):
    """
    In-memory text buffer tied to one GridFS filename.

    On construction the stored content of ``filename`` (if there is one) is
    downloaded and decoded as UTF-8 into the buffer. When closed in a mode that
    does not contain "r", the buffer is encoded as UTF-8 and uploaded back to
    GridFS, replacing the previous entry.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename this stream reads from / writes to
        :param fs: bucket-like object providing find, download_to_stream, delete,
            upload_from_stream and upload_from_stream_with_id
        :param mode: file mode string; a mode containing "r" is treated as read-only
        """
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the existing GridFS content (if any) into the buffer
        :return: None or raises FsException if the name is ambiguous or has an unsupported type
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second entry with the same filename makes the target ambiguous
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            # NOTE(review): the binary stream accepts ("file", "sym") while this
            # one accepts ("file", "dir") - confirm whether "dir" is intended.
            if requested_file.metadata["type"] in ("file", "dir"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            self._id = grid_file._id
            # GridFS hands back bytes; stage them and decode into the text buffer
            with BytesIO() as stream:
                self.fs.download_to_stream(self._id, stream)
                self.write(stream.getvalue().decode("utf-8"))

            # Only read modes rewind; in other modes the position stays at the
            # end of the downloaded content, so subsequent writes append.
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Close the stream; in non-read modes the buffer is first written back to GridFS
        :return: None
        """
        # Closing an already closed stream must be a no-op. Without this guard a
        # second close() in write mode would delete the entry uploaded by the
        # first close() and then fail on seek() of the closed buffer.
        if self.closed:
            return

        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        cursor = self.fs.find(
            {"filename": parent_dir_name, "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No directory entry for the first path component: drop its last
            # character before storing.
            # NOTE(review): presumably maps a temporary "<name>_" folder onto
            # "<name>" - confirm against callers.
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        # GridFS stores bytes: upload a UTF-8 encoded copy of the whole buffer
        stream = BytesIO(self.getvalue().encode("utf-8"))
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, stream, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, stream, metadata={"type": self.file_type}
            )
        stream.close()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
194
195
class FsMongo(FsBase):
    """File storage backend that keeps its content in a MongoDB GridFS bucket."""

    def __init__(self, logger_name="fs", lock=False):
        """
        Create an unconnected instance; fs_connect() fills in the attributes
        :param logger_name: forwarded to the FsBase constructor
        :param lock: forwarded to the FsBase constructor
        """
        super(FsMongo, self).__init__(logger_name, lock)
        # Local base path, MongoDB client and GridFS bucket are all unset
        # until fs_connect() is called.
        self.path = self.client = self.fs = None
202
203 def __update_local_fs(self, from_path=None):
204 dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
205
206 valid_paths = []
207
208 for directory in dir_cursor:
209 if from_path and not directory.filename.startswith(from_path):
210 continue
211 os.makedirs(self.path + directory.filename, exist_ok=True)
212 valid_paths.append(self.path + directory.filename)
213
214 file_cursor = self.fs.find(
215 {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
216 )
217
218 for writing_file in file_cursor:
219 if from_path and not writing_file.filename.startswith(from_path):
220 continue
221 file_path = self.path + writing_file.filename
222
223 if writing_file.metadata["type"] == "sym":
224 with BytesIO() as b:
225 self.fs.download_to_stream(writing_file._id, b)
226 b.seek(0)
227 link = b.read().decode("utf-8")
228
229 try:
230 os.remove(file_path)
231 except OSError as e:
232 if e.errno != errno.ENOENT:
233 # This is probably permission denied or worse
234 raise
235 os.symlink(link, file_path)
236 else:
237 folder = os.path.dirname(file_path)
238 if folder not in valid_paths:
239 os.makedirs(folder, exist_ok=True)
240 with open(file_path, "wb+") as file_stream:
241 self.fs.download_to_stream(writing_file._id, file_stream)
242 if "permissions" in writing_file.metadata:
243 os.chmod(file_path, writing_file.metadata["permissions"])
244
245 def get_params(self):
246 return {"fs": "mongo", "path": self.path}
247
248 def fs_connect(self, config):
249 try:
250 if "logger_name" in config:
251 self.logger = logging.getLogger(config["logger_name"])
252 if "path" in config:
253 self.path = config["path"]
254 else:
255 raise FsException('Missing parameter "path"')
256 if not self.path.endswith("/"):
257 self.path += "/"
258 if not os.path.exists(self.path):
259 raise FsException(
260 "Invalid configuration param at '[storage]': path '{}' does not exist".format(
261 config["path"]
262 )
263 )
264 elif not os.access(self.path, os.W_OK):
265 raise FsException(
266 "Invalid configuration param at '[storage]': path '{}' is not writable".format(
267 config["path"]
268 )
269 )
270 if all(key in config.keys() for key in ["uri", "collection"]):
271 self.client = MongoClient(config["uri"])
272 self.fs = GridFSBucket(self.client[config["collection"]])
273 elif all(key in config.keys() for key in ["host", "port", "collection"]):
274 self.client = MongoClient(config["host"], config["port"])
275 self.fs = GridFSBucket(self.client[config["collection"]])
276 else:
277 if "collection" not in config.keys():
278 raise FsException('Missing parameter "collection"')
279 else:
280 raise FsException('Missing parameters: "uri" or "host" + "port"')
281 except FsException:
282 raise
283 except Exception as e: # TODO refine
284 raise FsException(str(e))
285
286 def fs_disconnect(self):
287 pass # TODO
288
289 def mkdir(self, folder):
290 """
291 Creates a folder or parent object location
292 :param folder:
293 :return: None or raises an exception
294 """
295 try:
296 self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
297 except errors.FileExists: # make it idempotent
298 pass
299 except Exception as e:
300 raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
301
302 def dir_rename(self, src, dst):
303 """
304 Rename one directory name. If dst exist, it replaces (deletes) existing directory
305 :param src: source directory
306 :param dst: destination directory
307 :return: None or raises and exception
308 """
309 try:
310 dst_cursor = self.fs.find(
311 {"filename": {"$regex": "^{}(/|$)".format(dst)}}, no_cursor_timeout=True
312 )
313
314 for dst_file in dst_cursor:
315 self.fs.delete(dst_file._id)
316
317 src_cursor = self.fs.find(
318 {"filename": {"$regex": "^{}(/|$)".format(src)}}, no_cursor_timeout=True
319 )
320
321 for src_file in src_cursor:
322 self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
323 except Exception as e:
324 raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
325
326 def file_exists(self, storage, mode=None):
327 """
328 Indicates if "storage" file exist
329 :param storage: can be a str or a str list
330 :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
331 :return: True, False
332 """
333 f = storage if isinstance(storage, str) else "/".join(storage)
334
335 cursor = self.fs.find({"filename": f})
336
337 for requested_file in cursor:
338 exception_file = next(cursor, None)
339
340 if exception_file:
341 raise FsException(
342 "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
343 )
344
345 # if no special mode is required just check it does exists
346 if not mode:
347 return True
348
349 if requested_file.metadata["type"] == mode:
350 return True
351
352 if requested_file.metadata["type"] == "sym" and mode == "file":
353 return True
354
355 return False
356
357 def file_size(self, storage):
358 """
359 return file size
360 :param storage: can be a str or a str list
361 :return: file size
362 """
363 f = storage if isinstance(storage, str) else "/".join(storage)
364
365 cursor = self.fs.find({"filename": f})
366
367 for requested_file in cursor:
368 exception_file = next(cursor, None)
369
370 if exception_file:
371 raise FsException(
372 "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
373 )
374
375 return requested_file.length
376
377 def file_extract(self, tar_object, path):
378 """
379 extract a tar file
380 :param tar_object: object of type tar
381 :param path: can be a str or a str list, or a tar object where to extract the tar_object
382 :return: None
383 """
384 f = path if isinstance(path, str) else "/".join(path)
385
386 for member in tar_object.getmembers():
387 if member.isfile():
388 stream = tar_object.extractfile(member)
389 elif member.issym():
390 stream = BytesIO(member.linkname.encode("utf-8"))
391 else:
392 stream = BytesIO()
393
394 if member.isfile():
395 file_type = "file"
396 elif member.issym():
397 file_type = "sym"
398 else:
399 file_type = "dir"
400
401 metadata = {"type": file_type, "permissions": member.mode}
402
403 self.fs.upload_from_stream(f + "/" + member.name, stream, metadata=metadata)
404
405 stream.close()
406
407 def file_open(self, storage, mode):
408 """
409 Open a file
410 :param storage: can be a str or list of str
411 :param mode: file mode
412 :return: file object
413 """
414 try:
415 f = storage if isinstance(storage, str) else "/".join(storage)
416
417 if "b" in mode:
418 return GridByteStream(f, self.fs, mode)
419 else:
420 return GridStringStream(f, self.fs, mode)
421 except errors.NoFile:
422 raise FsException(
423 "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
424 )
425 except IOError:
426 raise FsException(
427 "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
428 )
429
430 def dir_ls(self, storage):
431 """
432 return folder content
433 :param storage: can be a str or list of str
434 :return: folder content
435 """
436 try:
437 f = storage if isinstance(storage, str) else "/".join(storage)
438
439 files = []
440 dir_cursor = self.fs.find({"filename": f})
441 for requested_dir in dir_cursor:
442 exception_dir = next(dir_cursor, None)
443
444 if exception_dir:
445 raise FsException(
446 "Multiple directories found",
447 http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
448 )
449
450 if requested_dir.metadata["type"] != "dir":
451 raise FsException(
452 "File {} does not exist".format(f),
453 http_code=HTTPStatus.NOT_FOUND,
454 )
455
456 files_cursor = self.fs.find(
457 {"filename": {"$regex": "^{}/([^/])*".format(f)}}
458 )
459 for children_file in files_cursor:
460 files += [children_file.filename.replace(f + "/", "", 1)]
461
462 return files
463 except IOError:
464 raise FsException(
465 "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
466 )
467
468 def file_delete(self, storage, ignore_non_exist=False):
469 """
470 Delete storage content recursively
471 :param storage: can be a str or list of str
472 :param ignore_non_exist: not raise exception if storage does not exist
473 :return: None
474 """
475 try:
476 f = storage if isinstance(storage, str) else "/".join(storage)
477
478 file_cursor = self.fs.find({"filename": f})
479 found = False
480 for requested_file in file_cursor:
481 found = True
482 exception_file = next(file_cursor, None)
483
484 if exception_file:
485 raise FsException(
486 "Multiple files found",
487 http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
488 )
489
490 if requested_file.metadata["type"] == "dir":
491 dir_cursor = self.fs.find({"filename": {"$regex": "^{}".format(f)}})
492
493 for tmp in dir_cursor:
494 self.fs.delete(tmp._id)
495 else:
496 self.fs.delete(requested_file._id)
497 if not found and not ignore_non_exist:
498 raise FsException(
499 "File {} does not exist".format(storage),
500 http_code=HTTPStatus.NOT_FOUND,
501 )
502 except IOError as e:
503 raise FsException(
504 "File {} cannot be deleted: {}".format(f, e),
505 http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
506 )
507
508 def sync(self, from_path=None):
509 """
510 Sync from FSMongo to local storage
511 :param from_path: if supplied, only copy content from this path, not all
512 :return: None
513 """
514 if from_path:
515 if os.path.isabs(from_path):
516 from_path = os.path.relpath(from_path, self.path)
517 self.__update_local_fs(from_path=from_path)
518
519 def _update_mongo_fs(self, from_path):
520
521 os_path = self.path + from_path
522
523 # Obtain list of files and dirs in filesystem
524 members = []
525 for root, dirs, files in os.walk(os_path):
526 for folder in dirs:
527 member = {"filename": os.path.join(root, folder), "type": "dir"}
528 members.append(member)
529 for file in files:
530 filename = os.path.join(root, file)
531 if os.path.islink(filename):
532 file_type = "sym"
533 else:
534 file_type = "file"
535 member = {"filename": os.path.join(root, file), "type": file_type}
536 members.append(member)
537
538 # Obtain files in mongo dict
539 remote_files = self._get_mongo_files(from_path)
540
541 # Upload members if they do not exists or have been modified
542 # We will do this for performance (avoid updating unmodified files) and to avoid
543 # updating a file with an older one in case there are two sources for synchronization
544 # in high availability scenarios
545 for member in members:
546 # obtain permission
547 mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)
548
549 # convert to relative path
550 rel_filename = os.path.relpath(member["filename"], self.path)
551 last_modified_date = datetime.datetime.fromtimestamp(
552 os.path.getmtime(member["filename"])
553 )
554
555 remote_file = remote_files.get(rel_filename)
556 upload_date = (
557 remote_file[0].uploadDate if remote_file else datetime.datetime.min
558 )
559 # remove processed files from dict
560 remote_files.pop(rel_filename, None)
561
562 if last_modified_date >= upload_date:
563
564 stream = None
565 fh = None
566 try:
567 file_type = member["type"]
568 if file_type == "dir":
569 stream = BytesIO()
570 elif file_type == "sym":
571 stream = BytesIO(
572 os.readlink(member["filename"]).encode("utf-8")
573 )
574 else:
575 fh = open(member["filename"], "rb")
576 stream = BytesIO(fh.read())
577
578 metadata = {"type": file_type, "permissions": mask}
579
580 self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)
581
582 # delete old files
583 if remote_file:
584 for file in remote_file:
585 self.fs.delete(file._id)
586 finally:
587 if fh:
588 fh.close()
589 if stream:
590 stream.close()
591
592 # delete files that are not any more in local fs
593 for remote_file in remote_files.values():
594 for file in remote_file:
595 self.fs.delete(file._id)
596
597 def _get_mongo_files(self, from_path=None):
598
599 file_dict = {}
600 file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
601 for file in file_cursor:
602 if from_path and not file.filename.startswith(from_path):
603 continue
604 if file.filename in file_dict:
605 file_dict[file.filename].append(file)
606 else:
607 file_dict[file.filename] = [file]
608 return file_dict
609
610 def reverse_sync(self, from_path: str):
611 """
612 Sync from local storage to FSMongo
613 :param from_path: base directory to upload content to mongo fs
614 :return: None
615 """
616 if os.path.isabs(from_path):
617 from_path = os.path.relpath(from_path, self.path)
618 self._update_mongo_fs(from_path=from_path)