Enable parallel execution and output of tox env
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
29
30
# Module author attribution; not referenced elsewhere in this module.
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
32
33
class GridByteStream(BytesIO):
    """Binary in-memory file backed by one GridFS entry.

    On creation the content of the GridFS entry named ``filename`` (if any) is
    loaded into the buffer, except in "w" modes, which truncate like open().
    Closing a stream whose mode has no "r" uploads the buffer back to GridFS,
    replacing the previous entry under the same ``_id``.

    :param filename: GridFS filename (path relative to the storage root)
    :param fs: GridFS bucket used to read and write the entry
    :param mode: file mode string such as "rb", "wb" or "a+b"
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        try:
            self.__initialize__()
        except Exception:
            # io.IOBase calls close() when an unclosed object is collected;
            # mark the half-built stream closed so that nothing is uploaded.
            super(GridByteStream, self).close()
            raise

    def __initialize__(self):
        """Look up the GridFS entry and preload its content into the buffer.

        :raises FsException: if several entries share the filename, or the
            entry is neither a regular file nor a symlink
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            # Keep the id so that close() replaces the entry instead of
            # creating a second one with the same filename.
            self._id = grid_file._id
            # "w" modes truncate: the old content must not be preloaded,
            # otherwise new writes would be appended to it.
            if "w" not in self.mode:
                self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """Close the stream, uploading its content when it is writable.

        Streams whose mode contains "r" are only closed.  Otherwise the old
        GridFS entry (if any) is deleted and the buffer is uploaded, reusing
        its ``_id``.  Closing an already closed stream does nothing.
        """
        if self.closed:
            # A second close (e.g. close() inside a ``with`` block) must not
            # delete the entry that the first close has just uploaded.
            return

        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        if "/" in self.filename:
            parent_dir_name = self.filename.split("/")[0]
            # Match only on the type: folders created by file_extract or
            # reverse_sync also store "permissions" in their metadata.
            cursor = self.fs.find(
                {"filename": parent_dir_name, "metadata.type": "dir"}
            )
            if next(cursor, None) is None:
                # No such parent folder: drop the last character of its name
                # (presumably a temporary-folder suffix; NOTE(review): confirm
                # this convention with the callers).
                self.filename = self.filename.replace(
                    parent_dir_name, parent_dir_name[:-1], 1
                )

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, self, metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
109
110
class GridStringStream(StringIO):
    """Text (UTF-8) in-memory file backed by one GridFS entry.

    On creation the content of the GridFS entry named ``filename`` (if any) is
    decoded from UTF-8 into the buffer, except in "w" modes, which truncate
    like open().  Closing a stream whose mode has no "r" encodes the buffer as
    UTF-8 and uploads it, replacing the previous entry under the same ``_id``.

    :param filename: GridFS filename (path relative to the storage root)
    :param fs: GridFS bucket used to read and write the entry
    :param mode: file mode string such as "r", "w" or "a+"
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        try:
            self.__initialize__()
        except Exception:
            # io.IOBase calls close() when an unclosed object is collected;
            # mark the half-built stream closed so that nothing is uploaded.
            super(GridStringStream, self).close()
            raise

    def __initialize__(self):
        """Look up the GridFS entry and preload its decoded content.

        :raises FsException: if several entries share the filename, or the
            entry is neither a regular file nor a symlink
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            # Same accepted types as the binary stream: a "dir" entry has no
            # content to read or edit.
            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            # Keep the id so that close() replaces the entry instead of
            # creating a second one with the same filename.
            self._id = grid_file._id
            # "w" modes truncate: the old content must not be preloaded.
            if "w" not in self.mode:
                with BytesIO() as stream:
                    self.fs.download_to_stream(self._id, stream)
                    self.write(stream.getvalue().decode("utf-8"))

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """Close the stream, uploading its content when it is writable.

        Streams whose mode contains "r" are only closed.  Otherwise the old
        GridFS entry (if any) is deleted and the UTF-8 encoded buffer is
        uploaded, reusing its ``_id``.  Closing an already closed stream does
        nothing.
        """
        if self.closed:
            # A second close (e.g. close() inside a ``with`` block) must not
            # delete the entry that the first close has just uploaded.
            return

        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        if "/" in self.filename:
            parent_dir_name = self.filename.split("/")[0]
            # Match only on the type: folders created by file_extract or
            # reverse_sync also store "permissions" in their metadata.
            cursor = self.fs.find(
                {"filename": parent_dir_name, "metadata.type": "dir"}
            )
            if next(cursor, None) is None:
                # No such parent folder: drop the last character of its name
                # (presumably a temporary-folder suffix; NOTE(review): confirm
                # this convention with the callers).
                self.filename = self.filename.replace(
                    parent_dir_name, parent_dir_name[:-1], 1
                )

        with BytesIO(self.getvalue().encode("utf-8")) as stream:
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id, self.filename, stream, metadata={"type": self.file_type}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename, stream, metadata={"type": self.file_type}
                )
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
192 def __exit__(self, exc_type, exc_val, exc_tb):
193 self.close()
194
195
class FsMongo(FsBase):
    """File storage backend that keeps its content in a MongoDB GridFS bucket.

    Every stored path is one GridFS entry whose ``metadata["type"]`` is
    "file", "sym" (the content is the link target) or "dir" (empty content);
    entries may also carry ``metadata["permissions"]``.  ``self.path`` is a
    local folder that :meth:`sync` fills from GridFS and :meth:`reverse_sync`
    uploads back into GridFS.
    """

    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local mirror folder, set by fs_connect()
        self.client = None  # MongoClient, set by fs_connect()
        self.fs = None  # GridFSBucket, set by fs_connect()

    def _find_unique(self, filename, multiple_msg="Multiple files found"):
        """Return the only GridFS entry named ``filename``, or None if absent.

        :param filename: GridFS filename to look up
        :param multiple_msg: error message used when the name is duplicated
        :return: the GridOut entry, or None
        :raises FsException: if several entries share ``filename``
        """
        cursor = self.fs.find({"filename": filename})
        requested_file = next(cursor, None)
        if requested_file is not None and next(cursor, None) is not None:
            raise FsException(multiple_msg, http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
        return requested_file

    def _find_all(self, query):
        """Return every GridFS entry matching ``query`` as a list.

        The cursor is fully consumed and closed before the caller modifies the
        bucket, so deletes/renames cannot disturb the iteration and the
        ``no_cursor_timeout`` cursor is not left open on the server.
        """
        cursor = self.fs.find(query, no_cursor_timeout=True)
        try:
            return list(cursor)
        finally:
            cursor.close()

    def __update_local_fs(self, from_path=None):
        """Write the GridFS content into the local folder ``self.path``.

        Folders are created first; then regular files are downloaded and
        symlinks re-created.  When ``from_path`` is given, only entries whose
        filename starts with it are copied.
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                if from_path and not directory.filename.startswith(from_path):
                    continue
                os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            # no_cursor_timeout cursors must be closed explicitly
            dir_cursor.close()

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )
        try:
            for writing_file in file_cursor:
                if from_path and not writing_file.filename.startswith(from_path):
                    continue
                self._write_local_file(writing_file)
        finally:
            file_cursor.close()

    def _write_local_file(self, grid_file):
        """Materialize one "file" or "sym" GridFS entry below ``self.path``."""
        file_path = self.path + grid_file.filename
        # The parent folder may have no "dir" entry in GridFS (e.g. a file
        # written through file_open); create it so open()/symlink() work.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        if grid_file.metadata["type"] == "sym":
            with BytesIO() as b:
                self.fs.download_to_stream(grid_file._id, b)
                link = b.getvalue().decode("utf-8")

            try:
                os.remove(file_path)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    # This is probably permission denied or worse
                    raise
            os.symlink(link, file_path)
        else:
            if os.path.islink(file_path):
                # The path was a symlink before: replace it rather than
                # writing through it into the link target.
                os.remove(file_path)
            with open(file_path, "wb+") as file_stream:
                self.fs.download_to_stream(grid_file._id, file_stream)
            if "permissions" in grid_file.metadata:
                os.chmod(file_path, grid_file.metadata["permissions"])

    def get_params(self):
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """Configure the local path and connect to the GridFS bucket.

        :param config: dict with "path", "collection" and either "uri" or
            "host" + "port"; optionally "logger_name"
        :raises FsException: on missing/invalid parameters or connection errors
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config.keys() for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri" or "host" + "port"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder: folder path relative to the storage root
        :return: None or raises an exception
        """
        try:
            # GridFS does not enforce unique filenames (every upload gets a new
            # _id), so check first: a duplicate "dir" entry would later make
            # lookups fail with "Multiple files found".
            existing = next(
                self.fs.find({"filename": folder, "metadata.type": "dir"}), None
            )
            if existing is None:
                self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            # re.escape: folder names may contain regex metacharacters such as "."
            dst_regex = "^{}(/|$)".format(re.escape(dst))
            for dst_file in self._find_all({"filename": {"$regex": dst_regex}}):
                self.fs.delete(dst_file._id)

            src_regex = "^{}(/|$)".format(re.escape(src))
            for src_file in self._find_all({"filename": {"$regex": src_regex}}):
                # the regex guarantees that src is a prefix of the filename
                self.fs.rename(src_file._id, dst + src_file.filename[len(src):])
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False (a symlink counts as 'file')
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        requested_file = self._find_unique(f)
        if requested_file is None:
            return False

        file_type = requested_file.metadata["type"]
        # if no special mode is required just check it does exists
        if not mode or file_type == mode:
            return True
        return file_type == "sym" and mode == "file"

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None if the file does not exist
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        requested_file = self._find_unique(f)
        return requested_file.length if requested_file is not None else None

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list, destination folder
        :return: None
        :raises FsException: if a member name is absolute or contains ".."
        """
        f = path if isinstance(path, str) else "/".join(path)
        members = tar_object.getmembers()

        # The archive content is not validated elsewhere in this class: refuse
        # absolute names and ".." components (which could make sync() write
        # outside self.path) before anything is uploaded.
        for member in members:
            if member.name.startswith("/") or ".." in member.name.split("/"):
                raise FsException(
                    "Invalid member '{}' in archive".format(member.name),
                    http_code=HTTPStatus.BAD_REQUEST,
                )

        for member in members:
            if member.isfile():
                file_type = "file"
                stream = tar_object.extractfile(member)
            elif member.issym():
                file_type = "sym"
                stream = BytesIO(member.linkname.encode("utf-8"))
            else:
                # NOTE(review): hard links, devices, fifos... are also stored as
                # empty "dir" entries here
                file_type = "dir"
                stream = BytesIO()

            metadata = {"type": file_type, "permissions": member.mode}
            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name, stream, metadata=metadata
                )
            finally:
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode ("b" in mode selects a binary stream)
        :return: file object
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: names of the direct children of the folder (like os.listdir)
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            requested_dir = self._find_unique(f, "Multiple directories found")
            if requested_dir is not None and requested_dir.metadata["type"] != "dir":
                raise FsException(
                    "File {} does not exist".format(f),
                    http_code=HTTPStatus.NOT_FOUND,
                )

            # Keep the first path component of every descendant, so that only
            # direct children are listed even for sub-folders that have no
            # explicit "dir" entry.
            prefix = f + "/"
            files_cursor = self.fs.find(
                {"filename": {"$regex": "^{}".format(re.escape(prefix))}}
            )
            children = {}  # dict keeps first-seen order while de-duplicating
            for children_file in files_cursor:
                name = children_file.filename[len(prefix):].split("/", 1)[0]
                if name:
                    children.setdefault(name, None)
            return list(children)
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            requested_file = self._find_unique(f)
            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException(
                        "File {} does not exist".format(storage),
                        http_code=HTTPStatus.NOT_FOUND,
                    )
                return

            if requested_file.metadata["type"] == "dir":
                # The folder and everything below it; "(/|$)" keeps siblings
                # that merely share the prefix (e.g. "<f>_", "<f>:1") untouched.
                pattern = "^{}(/|$)".format(re.escape(f))
                for tmp in self._find_all({"filename": {"$regex": pattern}}):
                    self.fs.delete(tmp._id)
            else:
                self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path and os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """Upload the local folder ``self.path + from_path`` into GridFS.

        New or locally modified entries are uploaded (older remote copies are
        deleted afterwards) and remote entries below ``from_path`` that no
        longer exist locally are deleted.
        """
        os_path = self.path + from_path

        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                filename = os.path.join(root, folder)
                # os.walk lists symlinks to folders in "dirs" without following
                # them: store them as links, not as real (empty) folders
                file_type = "sym" if os.path.islink(filename) else "dir"
                members.append({"filename": filename, "type": file_type})
            for file in files:
                filename = os.path.join(root, file)
                file_type = "sym" if os.path.islink(filename) else "file"
                members.append({"filename": filename, "type": file_type})

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exist or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            filename = member["filename"]
            file_type = member["type"]
            # lstat for links: a dangling link has no target to stat
            st = os.lstat(filename) if file_type == "sym" else os.stat(filename)
            mask = st.st_mode & 0o777  # permission bits

            # convert to relative path
            rel_filename = os.path.relpath(filename, self.path)
            # PyMongo returns uploadDate as a naive UTC datetime (the client is
            # created without tz_aware), so compare against UTC, not local time
            last_modified_date = datetime.datetime.fromtimestamp(
                st.st_mtime, tz=datetime.timezone.utc
            ).replace(tzinfo=None)

            # remove processed files from dict
            remote_file = remote_files.pop(rel_filename, None)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )
            if last_modified_date < upload_date:
                continue  # the remote copy is newer: keep it

            self._upload_member(filename, rel_filename, file_type, mask)

            # delete old files
            for old_file in remote_file or []:
                self.fs.delete(old_file._id)

        # delete files that are not any more in local fs
        for remote_file in remote_files.values():
            for old_file in remote_file:
                self.fs.delete(old_file._id)

    def _upload_member(self, filename, rel_filename, file_type, mask):
        """Upload one local entry to GridFS under the name ``rel_filename``.

        Regular files are streamed from disk, symlinks store their target and
        folders an empty content.
        """
        metadata = {"type": file_type, "permissions": mask}
        if file_type == "file":
            stream = open(filename, "rb")
        elif file_type == "sym":
            stream = BytesIO(os.readlink(filename).encode("utf-8"))
        else:
            stream = BytesIO()
        with stream:
            self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

    def _get_mongo_files(self, from_path=None):
        """Group the GridFS entries located below ``from_path`` by filename.

        Only entries strictly inside the ``from_path`` folder are kept, matched
        on whole path components: "abc" selects "abc/x" but neither the "abc"
        entry itself nor "abcd/x".  _update_mongo_fs deletes every returned
        entry without a local counterpart, and os.walk never reports the root
        folder or its siblings, so including them would delete them.

        :param from_path: relative folder, or None/"" for the whole bucket
        :return: dict filename -> list of GridOut entries, newest first
        """
        prefix = from_path.rstrip("/") + "/" if from_path else None
        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        try:
            for grid_file in file_cursor:
                if prefix and not grid_file.filename.startswith(prefix):
                    continue
                file_dict.setdefault(grid_file.filename, []).append(grid_file)
        finally:
            # no_cursor_timeout cursors must be closed explicitly
            file_cursor.close()
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)