Fixes Bug 1273 - aiokafka>0.7.0 needs extra dependency for python < 3.7.0
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
29
30
31 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
32
33
class GridByteStream(BytesIO):
    """
    Binary in-memory stream bound to one GridFS entry.

    When created, the content stored under "filename" (if any) is loaded into the buffer.
    When closed, the buffer is written back to GridFS unless the mode contains "r".
    A missing entry is not an error here: the stream simply starts empty.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: name of the GridFS entry
        :param fs: bucket object providing find, download_to_stream, delete, upload_from_stream
                   and upload_from_stream_with_id
        :param mode: file mode string; when it contains "r" nothing is written back on close
        """
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of the entry, if it exists.
        :return: None, or raises FsException if several entries share the name or the entry is
                 neither a "file" nor a "sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second hit means the name is ambiguous
            if next(cursor, None):
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] not in ("file", "sym"):
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            grid_file = requested_file
            self.file_type = requested_file.metadata["type"]

        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

        # Readers start at the beginning; otherwise the position stays after the loaded content
        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the stream; unless the mode contains "r" the buffer is stored in GridFS first.
        Calling it on an already closed stream has no effect.
        :return: None
        """
        # io streams must tolerate repeated close(); without this guard the second call would
        # try to seek a closed buffer and upload again
        if self.closed:
            return

        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        try:
            if self._id:
                self.fs.delete(self._id)

            # Only names that really have a parent folder are rewritten. If that folder has no
            # entry whose metadata is exactly {"type": "dir"}, its last character is dropped.
            # NOTE(review): presumably this follows a folder renamed from "<name>_" to "<name>"
            # while the stream was open - confirm against callers.
            if "/" in self.filename:
                parent_dir_name = self.filename.split("/")[0]
                cursor = self.fs.find({
                    "filename": parent_dir_name,
                    "metadata": {"type": "dir"}})

                if not next(cursor, None):
                    self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

            self.seek(0, 0)
            metadata = {"type": self.file_type}
            if self._id:
                # keep the identifier of the entry that has just been deleted
                self.fs.upload_from_stream_with_id(self._id, self.filename, self, metadata=metadata)
            else:
                self.fs.upload_from_stream(self.filename, self, metadata=metadata)
        finally:
            # release the buffer even when the upload fails
            super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
108
109
class GridStringStream(StringIO):
    """
    Text in-memory stream bound to one GridFS entry; content is stored UTF-8 encoded.

    When created, the content stored under "filename" (if any) is decoded into the buffer.
    When closed, the buffer is encoded and written back to GridFS unless the mode contains "r".
    A missing entry is not an error here: the stream simply starts empty.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: name of the GridFS entry
        :param fs: bucket object providing find, download_to_stream, delete, upload_from_stream
                   and upload_from_stream_with_id
        :param mode: file mode string; when it contains "r" nothing is written back on close
        """
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load and decode the current content of the entry, if it exists.
        :return: None, or raises FsException if several entries share the name or the entry has
                 an unsupported type
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second hit means the name is ambiguous
            if next(cursor, None):
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # "sym" is accepted so that symbolic links can be opened in text mode as well as in
            # binary mode. NOTE(review): "dir" is kept only because it was accepted before;
            # confirm whether any caller relies on opening a directory entry.
            if requested_file.metadata["type"] not in ("file", "sym", "dir"):
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            grid_file = requested_file
            self.file_type = requested_file.metadata["type"]

        if grid_file:
            self._id = grid_file._id
            # GridFS delivers bytes: collect them first, then decode in one go
            with BytesIO() as stream:
                self.fs.download_to_stream(self._id, stream)
                self.write(stream.getvalue().decode("utf-8"))

        # Readers start at the beginning; otherwise the position stays after the loaded content
        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the stream; unless the mode contains "r" the buffer is stored in GridFS first.
        Calling it on an already closed stream has no effect.
        :return: None
        """
        # io streams must tolerate repeated close(); without this guard the second call would
        # try to read a closed buffer and upload again
        if self.closed:
            return

        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        try:
            if self._id:
                self.fs.delete(self._id)

            # Only names that really have a parent folder are rewritten. If that folder has no
            # entry whose metadata is exactly {"type": "dir"}, its last character is dropped.
            # NOTE(review): presumably this follows a folder renamed from "<name>_" to "<name>"
            # while the stream was open - confirm against callers.
            if "/" in self.filename:
                parent_dir_name = self.filename.split("/")[0]
                cursor = self.fs.find({
                    "filename": parent_dir_name,
                    "metadata": {"type": "dir"}})

                if not next(cursor, None):
                    self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

            metadata = {"type": self.file_type}
            with BytesIO(self.getvalue().encode("utf-8")) as stream:
                if self._id:
                    # keep the identifier of the entry that has just been deleted
                    self.fs.upload_from_stream_with_id(self._id, self.filename, stream, metadata=metadata)
                else:
                    self.fs.upload_from_stream(self.filename, stream, metadata=metadata)
        finally:
            # release the buffer even when the upload fails
            super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
192
193
194 class FsMongo(FsBase):
195
def __init__(self, logger_name='fs', lock=False):
    """
    Create a driver that is not bound to any storage yet
    :param logger_name: forwarded to the base class constructor
    :param lock: forwarded to the base class constructor
    """
    super().__init__(logger_name, lock)
    # local path, mongo client and GridFS bucket all stay unset until fs_connect() fills them in
    self.path = self.client = self.fs = None
201
def __update_local_fs(self, from_path=None):
    """
    Copy the content stored in GridFS into the local directory self.path
    :param from_path: if supplied, only entries whose filename starts with this string are copied
    :return: None
    """
    # Cursors opened with no_cursor_timeout are not reaped by the server, so they are closed
    # explicitly even when a local filesystem error interrupts the loop
    dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
    try:
        for directory in dir_cursor:
            if from_path and not directory.filename.startswith(from_path):
                continue
            os.makedirs(self.path + directory.filename, exist_ok=True)
    finally:
        dir_cursor.close()

    file_cursor = self.fs.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True)
    try:
        for writing_file in file_cursor:
            if from_path and not writing_file.filename.startswith(from_path):
                continue
            file_path = self.path + writing_file.filename

            # A file can be stored without a "dir" entry for its folder; create the folder
            # instead of failing on open()/symlink()
            parent_dir = os.path.dirname(file_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            if writing_file.metadata["type"] == "sym":
                # the stored content of a "sym" entry is the link target
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    link = b.getvalue().decode("utf-8")

                try:
                    os.remove(file_path)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        # This is probably permission denied or worse
                        raise
                os.symlink(link, file_path)
            else:
                with open(file_path, 'wb+') as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])
    finally:
        file_cursor.close()
235
def get_params(self):
    """
    Describe this storage driver
    :return: dictionary with the driver type ("mongo") and the local path in use
    """
    return dict(fs="mongo", path=self.path)
238
def fs_connect(self, config):
    """
    Validate the local path and open the GridFS bucket
    :param config: dictionary with "path" and "collection" plus either "uri" or "host" and "port";
                   optionally "logger_name"
    :return: None or raises FsException
    """
    try:
        if "logger_name" in config:
            self.logger = logging.getLogger(config["logger_name"])

        if "path" not in config:
            raise FsException("Missing parameter \"path\"")
        self.path = config["path"]
        if not self.path.endswith("/"):
            self.path += "/"
        if not os.path.exists(self.path):
            raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                config["path"]))
        if not os.access(self.path, os.W_OK):
            raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                config["path"]))

        if "collection" not in config:
            raise FsException("Missing parameter \"collection\"")
        if "uri" in config:
            self.client = MongoClient(config["uri"])
        elif "host" in config and "port" in config:
            port = config["port"]
            # values read from a configuration file or the environment arrive as text, while
            # MongoClient only accepts an integer port
            if isinstance(port, str):
                port = int(port)
            self.client = MongoClient(config["host"], port)
        else:
            raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
        # config["collection"] selects the database that holds the GridFS bucket
        self.fs = GridFSBucket(self.client[config["collection"]])
    except FsException:
        raise
    except Exception as e:  # TODO refine
        # keep the original error attached as the cause
        raise FsException(str(e)) from e
270
def fs_disconnect(self):
    """
    Disconnect from the storage.
    Not implemented yet: this is a no-op and self.client is left untouched.
    :return: None
    """
    pass  # TODO
273
def mkdir(self, folder):
    """
    Creates a folder or parent object location
    :param folder: name of the folder entry to create
    :return: None or raises an exception
    """
    try:
        # A GridFS bucket accepts several entries with the same filename, so uploading again
        # would silently create a duplicate "dir" entry and the lookups that reject duplicates
        # would then fail. Check first to make the call really idempotent.
        existing = self.fs.find({"filename": folder, "metadata.type": "dir"})
        if next(existing, None):
            return

        self.fs.upload_from_stream(
            folder, BytesIO(), metadata={"type": "dir"})
    except errors.FileExists:  # make it idempotent
        pass
    except Exception as e:
        raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e
287
def dir_rename(self, src, dst):
    """
    Rename one directory name. If dst exist, it replaces (deletes) existing directory
    :param src: source directory
    :param dst: destination directory
    :return: None or raises and exception
    """
    try:
        # Names are escaped before being used as a pattern: an unescaped "." (or any other
        # regex metacharacter) would also match, and here delete, unrelated entries.
        # "(/|$)" restricts the match to the directory itself and what lies below it.
        dst_cursor = self.fs.find(
            {"filename": {"$regex": "^{}(/|$)".format(re.escape(dst))}},
            no_cursor_timeout=True)

        for dst_file in dst_cursor:
            self.fs.delete(dst_file._id)

        src_cursor = self.fs.find(
            {"filename": {"$regex": "^{}(/|$)".format(re.escape(src))}},
            no_cursor_timeout=True)

        for src_file in src_cursor:
            # the pattern is anchored, so the first occurrence of src is the leading one
            self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
    except Exception as e:
        raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e
311
def file_exists(self, storage, mode=None):
    """
    Tell whether an entry named "storage" is present
    :param storage: a str, or a list of str that is joined with "/"
    :param mode: 'file' to require a regular file (a symbolic link also counts), 'dir' to require a
                 directory, None to accept any entry
    :return: True, False; raises FsException when several entries share the name
    """
    name = "/".join(storage) if not isinstance(storage, str) else storage

    matches = self.fs.find({"filename": name})
    entry = next(iter(matches), None)
    if entry is None:
        return False

    # anything left in the cursor means the name is not unique
    if next(matches, None):
        raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    # without a requested mode, being there is enough
    if not mode:
        return True

    entry_type = entry.metadata["type"]
    return entry_type == mode or (entry_type == "sym" and mode == "file")
340
def file_size(self, storage):
    """
    Give the stored length of an entry
    :param storage: a str, or a list of str that is joined with "/"
    :return: the "length" attribute of the entry, None when no entry has that name;
             raises FsException when several entries share the name
    """
    name = "/".join(storage) if not isinstance(storage, str) else storage

    matches = self.fs.find({"filename": name})
    entry = next(iter(matches), None)
    if entry is None:
        return None

    # anything left in the cursor means the name is not unique
    if next(matches, None):
        raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    return entry.length
358
def file_extract(self, tar_object, path):
    """
    extract a tar file into GridFS, one entry per member
    :param tar_object: object of type tar
    :param path: can be a str or a str list, or a tar object where to extract the tar_object
    :return: None
    """
    f = path if isinstance(path, str) else "/".join(path)

    # NOTE(review): member names are used as stored in the archive; nothing here rejects
    # names containing ".." - confirm whether archives are validated by the caller.
    for member in tar_object.getmembers():
        if member.isfile():
            file_type = "file"
            stream = tar_object.extractfile(member)
        elif member.issym():
            # a symbolic link is stored as an entry whose content is the link target
            file_type = "sym"
            stream = BytesIO(member.linkname.encode("utf-8"))
        else:
            # every other member kind is stored as an empty "dir" entry
            file_type = "dir"
            stream = BytesIO()

        metadata = {
            "type": file_type,
            "permissions": member.mode
        }

        try:
            self.fs.upload_from_stream(
                f + "/" + member.name,
                stream,
                metadata=metadata
            )
        finally:
            # do not leak the member stream when the upload fails
            stream.close()
395
def file_open(self, storage, mode):
    """
    Build a stream object over an entry
    :param storage: a str, or a list of str that is joined with "/"
    :param mode: file mode; a "b" in it selects the binary stream, otherwise the text stream is used
    :return: file object
    """
    try:
        name = "/".join(storage) if not isinstance(storage, str) else storage

        # pick the stream flavour first, then build it
        stream_class = GridByteStream if "b" in mode else GridStringStream
        return stream_class(name, self.fs, mode)
    except errors.NoFile:
        raise FsException("File {} does not exist".format(name), http_code=HTTPStatus.NOT_FOUND)
    except IOError:
        raise FsException("File {} cannot be opened".format(name), http_code=HTTPStatus.BAD_REQUEST)
414
def dir_ls(self, storage):
    """
    return folder content
    :param storage: can be a str or list of str
    :return: list with the names, relative to the folder, of every entry stored below it (nested
             entries included); empty list when the folder has no entry
    """
    try:
        f = storage if isinstance(storage, str) else "/".join(storage)

        files = []
        dir_cursor = self.fs.find({"filename": f})
        for requested_dir in dir_cursor:
            exception_dir = next(dir_cursor, None)

            if exception_dir:
                raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_dir.metadata["type"] != "dir":
                raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

            # The folder name is escaped: an unescaped "." (or any other regex metacharacter)
            # would also list the content of unrelated folders
            prefix = f + "/"
            files_cursor = self.fs.find({"filename": {"$regex": "^{}".format(re.escape(prefix))}})
            files.extend(children_file.filename.replace(prefix, '', 1) for children_file in files_cursor)

        return files
    except IOError:
        raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
442
def file_delete(self, storage, ignore_non_exist=False):
    """
    Delete storage content recursively
    :param storage: can be a str or list of str
    :param ignore_non_exist: not raise exception if storage does not exist
    :return: None
    """
    try:
        f = storage if isinstance(storage, str) else "/".join(storage)

        file_cursor = self.fs.find({"filename": f})
        found = False
        for requested_file in file_cursor:
            found = True
            exception_file = next(file_cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] == "dir":
                # Only the folder itself and what lies below it: a bare "^name" pattern would
                # also remove siblings that merely share the prefix ("abc" -> "abcd"), and an
                # unescaped name would be read as a regular expression
                dir_cursor = self.fs.find({"filename": {"$regex": "^{}(/|$)".format(re.escape(f))}})

                for tmp in dir_cursor:
                    self.fs.delete(tmp._id)
            else:
                self.fs.delete(requested_file._id)
        if not found and not ignore_non_exist:
            raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
    except IOError as e:
        raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
473
def sync(self, from_path=None):
    """
    Bring the local storage up to date with the content held in FSMongo
    :param from_path: when given, restricts the copy to this path; an absolute path is first made
                      relative to self.path
    :return: None
    """
    if from_path and os.path.isabs(from_path):
        from_path = os.path.relpath(from_path, self.path)
    self.__update_local_fs(from_path=from_path)
484
def _update_mongo_fs(self, from_path):
    """
    Upload the local tree self.path + from_path to GridFS and drop the remote entries returned by
    _get_mongo_files(from_path) that have no local counterpart
    :param from_path: path, relative to self.path, of the tree to upload
    :return: None
    """
    os_path = self.path + from_path

    # Obtain list of files and dirs in filesystem
    members = []
    for root, dirs, files in os.walk(os_path):
        # os.walk reports a symbolic link to a directory among "dirs", so both lists need the
        # link check
        for entry, default_type in [(d, "dir") for d in dirs] + [(f, "file") for f in files]:
            filename = os.path.join(root, entry)
            members.append({
                "filename": filename,
                "type": "sym" if os.path.islink(filename) else default_type
            })

    # Obtain files in mongo dict
    remote_files = self._get_mongo_files(from_path)

    # Upload members if they do not exists or have been modified
    # We will do this for performance (avoid updating unmodified files) and to avoid
    # updating a file with an older one in case there are two sources for synchronization
    # in high availability scenarios
    for member in members:
        file_type = member["type"]

        # A link is inspected itself (lstat): following it would abort the whole
        # synchronization on a dangling link
        if file_type == "sym":
            info = os.lstat(member["filename"])
        else:
            info = os.stat(member["filename"])

        # obtain permission (the three lowest octal digits of the mode)
        mask = info.st_mode & 0o777

        # convert to relative path
        rel_filename = os.path.relpath(member["filename"], self.path)

        # GridFS keeps uploadDate in UTC and the client created by fs_connect returns it as a
        # naive datetime, so the local timestamp is expressed the same way before comparing
        last_modified_date = datetime.datetime.fromtimestamp(
            info.st_mtime, datetime.timezone.utc).replace(tzinfo=None)

        # remove processed files from dict
        remote_file = remote_files.pop(rel_filename, None)
        # index 0 is relied on to be the most recent upload
        upload_date = remote_file[0].uploadDate if remote_file else datetime.datetime.min

        if last_modified_date < upload_date:
            continue

        metadata = {
            "type": file_type,
            "permissions": mask
        }

        if file_type == "file":
            # hand the open file over directly instead of copying it into memory first
            with open(member["filename"], "rb") as stream:
                self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)
        else:
            # a link is stored as its target, a directory as an empty entry
            content = os.readlink(member["filename"]).encode("utf-8") if file_type == "sym" else b""
            with BytesIO(content) as stream:
                self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

        # delete old files
        for old_file in remote_file or ():
            self.fs.delete(old_file._id)

    # delete files that are not any more in local fs
    for remote_file in remote_files.values():
        for old_file in remote_file:
            self.fs.delete(old_file._id)
569
def _get_mongo_files(self, from_path=None):
    """
    Group the entries stored in GridFS by filename
    :param from_path: if supplied, only the entry named from_path and the entries below it are
                      returned
    :return: dictionary filename -> list of entries with that name, in the order delivered by a
             query sorted by descending uploadDate
    """
    # Match on a path boundary: a bare prefix test would also return sibling trees such as
    # "pkg2/..." for from_path "pkg". A from_path that already ends with "/" is used as is.
    prefix = None
    if from_path:
        prefix = from_path if from_path.endswith("/") else from_path + "/"

    file_dict = {}
    file_cursor = self.fs.find(no_cursor_timeout=True, sort=[('uploadDate', -1)])
    try:
        for grid_file in file_cursor:
            name = grid_file.filename
            if prefix and name != from_path and not name.startswith(prefix):
                continue
            file_dict.setdefault(name, []).append(grid_file)
    finally:
        # cursors opened with no_cursor_timeout have to be closed explicitly
        file_cursor.close()
    return file_dict
582
def reverse_sync(self, from_path: str):
    """
    Push the content of the local storage to FSMongo
    :param from_path: base directory whose content is uploaded; an absolute path is first made
                      relative to self.path
    :return: None
    """
    relative_path = os.path.relpath(from_path, self.path) if os.path.isabs(from_path) else from_path
    self._update_mongo_fs(from_path=relative_path)