# Copyright 2019 Canonical
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
##
18
import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
29
Eduardo Sousa0593aba2019-06-04 12:55:43 +010030
# Author and contact address for this module.
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
32
33
class GridByteStream(BytesIO):
    """
    In-memory binary buffer backed by one GridFS entry.

    On construction the current content of "filename" (if such an entry exists)
    is downloaded into the buffer. Unless the mode contains "r", closing the
    stream uploads the buffer back to GridFS.

    :param filename: GridFS filename (slash separated path)
    :param fs: bucket-like object offering find, download_to_stream, delete,
               upload_from_stream and upload_from_stream_with_id
    :param mode: file mode string; only the presence of "r" is inspected here
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the existing GridFS content of self.filename into the buffer, if any
        :return: None or raises FsException if several entries share the filename or
                 the entry is neither a regular file nor a symlink
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        # NOTE: the content is downloaded whatever the mode is, and the position is only
        # rewound for "r" modes, so later writes in any other mode land after the old content
        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the stream. For modes without "r" the buffer is first uploaded to GridFS,
        replacing the previous entry (same id) when there was one.
        Calling close() again on an already closed stream does nothing.
        :return: None
        """
        # io streams allow repeated close(); without this guard a second call would
        # seek on a closed buffer and raise ValueError
        if self.closed:
            return

        # any mode containing "r" is treated as read-only: nothing is written back
        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]

        # Match on the "type" field only: directory entries may carry extra metadata
        # (e.g. "permissions"), which an exact {"type": "dir"} sub-document match misses
        cursor = self.fs.find({
            "filename": parent_dir_name,
            "metadata.type": "dir"})

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No such top-level folder: store under the folder name minus its last character
            self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id,
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
108
109
class GridStringStream(StringIO):
    """
    In-memory text buffer backed by one GridFS entry (content stored as UTF-8).

    On construction the current content of "filename" (if such an entry exists)
    is downloaded and decoded into the buffer. Unless the mode contains "r",
    closing the stream encodes the buffer and uploads it back to GridFS.

    :param filename: GridFS filename (slash separated path)
    :param fs: bucket-like object offering find, download_to_stream, delete,
               upload_from_stream and upload_from_stream_with_id
    :param mode: file mode string; only the presence of "r" is inspected here
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the existing GridFS content of self.filename into the buffer, if any
        :return: None or raises FsException if several entries share the filename or
                 the entry is neither a regular file nor a symlink
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # Same accepted types as GridByteStream: a directory entry is not a file
            # (as the error message below states), while a symlink entry is readable
            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        # NOTE: the content is downloaded whatever the mode is, and the position is only
        # rewound for "r" modes, so later writes in any other mode land after the old content
        if grid_file:
            self._id = grid_file._id
            with BytesIO() as stream:
                self.fs.download_to_stream(self._id, stream)
                self.write(stream.getvalue().decode("utf-8"))

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the stream. For modes without "r" the buffer is first encoded as UTF-8 and
        uploaded to GridFS, replacing the previous entry (same id) when there was one.
        Calling close() again on an already closed stream does nothing.
        :return: None
        """
        # io streams allow repeated close(); without this guard a second call would
        # seek on a closed buffer and raise ValueError
        if self.closed:
            return

        # any mode containing "r" is treated as read-only: nothing is written back
        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]

        # Match on the "type" field only: directory entries may carry extra metadata
        # (e.g. "permissions"), which an exact {"type": "dir"} sub-document match misses
        cursor = self.fs.find({
            "filename": parent_dir_name,
            "metadata.type": "dir"})

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No such top-level folder: store under the folder name minus its last character
            self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

        # GridFS stores bytes, so hand it an encoded copy of the whole text buffer
        with BytesIO(self.getvalue().encode("utf-8")) as stream:
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id,
                    self.filename,
                    stream,
                    metadata={"type": self.file_type}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename,
                    stream,
                    metadata={"type": self.file_type}
                )
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
192
193
194class FsMongo(FsBase):
195
196 def __init__(self, logger_name='fs', lock=False):
197 super().__init__(logger_name, lock)
198 self.path = None
199 self.client = None
200 self.fs = None
201
tiernob07e4ef2020-05-06 14:22:48 +0000202 def __update_local_fs(self, from_path=None):
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100203 dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
204
205 for directory in dir_cursor:
tiernob07e4ef2020-05-06 14:22:48 +0000206 if from_path and not directory.filename.startswith(from_path):
207 continue
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100208 os.makedirs(self.path + directory.filename, exist_ok=True)
209
David Garcia7982b782020-05-20 12:09:37 +0200210 file_cursor = self.fs.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True)
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100211
212 for writing_file in file_cursor:
tiernob07e4ef2020-05-06 14:22:48 +0000213 if from_path and not writing_file.filename.startswith(from_path):
214 continue
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100215 file_path = self.path + writing_file.filename
David Garcia8ab6cc62020-06-26 17:04:37 +0200216
217 if writing_file.metadata["type"] == "sym":
218 with BytesIO() as b:
219 self.fs.download_to_stream(writing_file._id, b)
220 b.seek(0)
221 link = b.read().decode("utf-8")
beierlmff2e8262020-07-08 16:32:50 -0400222
223 try:
224 os.remove(file_path)
225 except OSError as e:
226 if e.errno != errno.ENOENT:
227 # This is probably permission denied or worse
228 raise
David Garcia8ab6cc62020-06-26 17:04:37 +0200229 os.symlink(link, file_path)
230 else:
231 with open(file_path, 'wb+') as file_stream:
232 self.fs.download_to_stream(writing_file._id, file_stream)
233 if "permissions" in writing_file.metadata:
sousaedub95cca62020-03-12 11:12:25 +0000234 os.chmod(file_path, writing_file.metadata["permissions"])
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100235
236 def get_params(self):
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100237 return {"fs": "mongo", "path": self.path}
238
239 def fs_connect(self, config):
240 try:
241 if "logger_name" in config:
242 self.logger = logging.getLogger(config["logger_name"])
243 if "path" in config:
244 self.path = config["path"]
245 else:
246 raise FsException("Missing parameter \"path\"")
247 if not self.path.endswith("/"):
248 self.path += "/"
249 if not os.path.exists(self.path):
250 raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
251 config["path"]))
252 elif not os.access(self.path, os.W_OK):
253 raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
254 config["path"]))
255 if all(key in config.keys() for key in ["uri", "collection"]):
256 self.client = MongoClient(config["uri"])
257 self.fs = GridFSBucket(self.client[config["collection"]])
258 elif all(key in config.keys() for key in ["host", "port", "collection"]):
259 self.client = MongoClient(config["host"], config["port"])
260 self.fs = GridFSBucket(self.client[config["collection"]])
261 else:
262 if "collection" not in config.keys():
263 raise FsException("Missing parameter \"collection\"")
264 else:
265 raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
266 except FsException:
267 raise
268 except Exception as e: # TODO refine
269 raise FsException(str(e))
270
271 def fs_disconnect(self):
272 pass # TODO
273
274 def mkdir(self, folder):
275 """
276 Creates a folder or parent object location
277 :param folder:
278 :return: None or raises an exception
279 """
280 try:
281 self.fs.upload_from_stream(
282 folder, BytesIO(), metadata={"type": "dir"})
283 except errors.FileExists: # make it idempotent
284 pass
285 except Exception as e:
286 raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
287
288 def dir_rename(self, src, dst):
289 """
290 Rename one directory name. If dst exist, it replaces (deletes) existing directory
291 :param src: source directory
292 :param dst: destination directory
293 :return: None or raises and exception
294 """
295 try:
296 dst_cursor = self.fs.find(
297 {"filename": {"$regex": "^{}(/|$)".format(dst)}},
298 no_cursor_timeout=True)
299
300 for dst_file in dst_cursor:
301 self.fs.delete(dst_file._id)
302
303 src_cursor = self.fs.find(
304 {"filename": {"$regex": "^{}(/|$)".format(src)}},
305 no_cursor_timeout=True)
306
307 for src_file in src_cursor:
308 self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
309 except Exception as e:
310 raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
311
312 def file_exists(self, storage, mode=None):
313 """
314 Indicates if "storage" file exist
315 :param storage: can be a str or a str list
316 :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
317 :return: True, False
318 """
319 f = storage if isinstance(storage, str) else "/".join(storage)
320
321 cursor = self.fs.find({"filename": f})
322
323 for requested_file in cursor:
324 exception_file = next(cursor, None)
325
326 if exception_file:
327 raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
328
lloretgallegf296d2a2020-09-02 09:36:24 +0000329 # if no special mode is required just check it does exists
330 if not mode:
331 return True
332
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100333 if requested_file.metadata["type"] == mode:
334 return True
beierlmff2e8262020-07-08 16:32:50 -0400335
sousaedub95cca62020-03-12 11:12:25 +0000336 if requested_file.metadata["type"] == "sym" and mode == "file":
337 return True
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100338
339 return False
340
341 def file_size(self, storage):
342 """
343 return file size
344 :param storage: can be a str or a str list
345 :return: file size
346 """
347 f = storage if isinstance(storage, str) else "/".join(storage)
348
349 cursor = self.fs.find({"filename": f})
350
351 for requested_file in cursor:
352 exception_file = next(cursor, None)
353
354 if exception_file:
355 raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
356
357 return requested_file.length
358
359 def file_extract(self, tar_object, path):
360 """
361 extract a tar file
362 :param tar_object: object of type tar
363 :param path: can be a str or a str list, or a tar object where to extract the tar_object
364 :return: None
365 """
366 f = path if isinstance(path, str) else "/".join(path)
367
368 for member in tar_object.getmembers():
369 if member.isfile():
370 stream = tar_object.extractfile(member)
David Garcia8ab6cc62020-06-26 17:04:37 +0200371 elif member.issym():
372 stream = BytesIO(member.linkname.encode("utf-8"))
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100373 else:
374 stream = BytesIO()
375
sousaedub95cca62020-03-12 11:12:25 +0000376 if member.isfile():
377 file_type = "file"
378 elif member.issym():
379 file_type = "sym"
380 else:
381 file_type = "dir"
382
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100383 metadata = {
sousaedub95cca62020-03-12 11:12:25 +0000384 "type": file_type,
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100385 "permissions": member.mode
386 }
387
388 self.fs.upload_from_stream(
389 f + "/" + member.name,
390 stream,
391 metadata=metadata
392 )
393
394 stream.close()
395
396 def file_open(self, storage, mode):
397 """
398 Open a file
399 :param storage: can be a str or list of str
400 :param mode: file mode
401 :return: file object
402 """
403 try:
404 f = storage if isinstance(storage, str) else "/".join(storage)
405
406 if "b" in mode:
407 return GridByteStream(f, self.fs, mode)
408 else:
409 return GridStringStream(f, self.fs, mode)
410 except errors.NoFile:
411 raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
412 except IOError:
413 raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
414
415 def dir_ls(self, storage):
416 """
417 return folder content
418 :param storage: can be a str or list of str
419 :return: folder content
420 """
421 try:
422 f = storage if isinstance(storage, str) else "/".join(storage)
423
424 files = []
425 dir_cursor = self.fs.find({"filename": f})
426 for requested_dir in dir_cursor:
427 exception_dir = next(dir_cursor, None)
428
429 if exception_dir:
430 raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
431
432 if requested_dir.metadata["type"] != "dir":
433 raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
434
435 files_cursor = self.fs.find({"filename": {"$regex": "^{}/([^/])*".format(f)}})
436 for children_file in files_cursor:
437 files += [children_file.filename.replace(f + '/', '', 1)]
438
439 return files
440 except IOError:
441 raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
442
443 def file_delete(self, storage, ignore_non_exist=False):
444 """
445 Delete storage content recursively
446 :param storage: can be a str or list of str
447 :param ignore_non_exist: not raise exception if storage does not exist
448 :return: None
449 """
450 try:
451 f = storage if isinstance(storage, str) else "/".join(storage)
452
453 file_cursor = self.fs.find({"filename": f})
454 found = False
455 for requested_file in file_cursor:
456 found = True
457 exception_file = next(file_cursor, None)
458
459 if exception_file:
460 raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
461
462 if requested_file.metadata["type"] == "dir":
463 dir_cursor = self.fs.find({"filename": {"$regex": "^{}".format(f)}})
464
465 for tmp in dir_cursor:
466 self.fs.delete(tmp._id)
467 else:
468 self.fs.delete(requested_file._id)
469 if not found and not ignore_non_exist:
beierlmff2e8262020-07-08 16:32:50 -0400470 raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
Eduardo Sousa0593aba2019-06-04 12:55:43 +0100471 except IOError as e:
472 raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
David Garcia788b9d62020-01-20 13:21:06 +0100473
tiernob07e4ef2020-05-06 14:22:48 +0000474 def sync(self, from_path=None):
David Garcia788b9d62020-01-20 13:21:06 +0100475 """
476 Sync from FSMongo to local storage
tiernob07e4ef2020-05-06 14:22:48 +0000477 :param from_path: if supplied, only copy content from this path, not all
478 :return: None
David Garcia788b9d62020-01-20 13:21:06 +0100479 """
lloretgallegf296d2a2020-09-02 09:36:24 +0000480 if from_path:
481 if os.path.isabs(from_path):
482 from_path = os.path.relpath(from_path, self.path)
tiernob07e4ef2020-05-06 14:22:48 +0000483 self.__update_local_fs(from_path=from_path)
lloretgallegf296d2a2020-09-02 09:36:24 +0000484
485 def _update_mongo_fs(self, from_path):
486
487 os_path = self.path + from_path
488
489 # Obtain list of files and dirs in filesystem
490 members = []
491 for root, dirs, files in os.walk(os_path):
492 for folder in dirs:
493 member = {
494 "filename": os.path.join(root, folder),
495 "type": "dir"
496 }
497 members.append(member)
498 for file in files:
499 filename = os.path.join(root, file)
500 if os.path.islink(filename):
501 file_type = "sym"
502 else:
503 file_type = "file"
504 member = {
505 "filename": os.path.join(root, file),
506 "type": file_type
507 }
508 members.append(member)
509
510 # Obtain files in mongo dict
511 remote_files = self._get_mongo_files(from_path)
512
513 # Upload members if they do not exists or have been modified
514 # We will do this for performance (avoid updating unmodified files) and to avoid
515 # updating a file with an older one in case there are two sources for synchronization
516 # in high availability scenarios
517 for member in members:
518 # obtain permission
519 mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)
520
521 # convert to relative path
522 rel_filename = os.path.relpath(member["filename"], self.path)
523 last_modified_date = datetime.datetime.fromtimestamp(os.path.getmtime(member["filename"]))
524
525 remote_file = remote_files.get(rel_filename)
526 upload_date = remote_file[0].uploadDate if remote_file else datetime.datetime.min
527 # remove processed files from dict
528 remote_files.pop(rel_filename, None)
529
530 if last_modified_date >= upload_date:
531
532 stream = None
533 fh = None
534 try:
535 file_type = member["type"]
536 if file_type == "dir":
537 stream = BytesIO()
538 elif file_type == "sym":
539 stream = BytesIO(os.readlink(member["filename"]).encode("utf-8"))
540 else:
541 fh = open(member["filename"], "rb")
542 stream = BytesIO(fh.read())
543
544 metadata = {
545 "type": file_type,
546 "permissions": mask
547 }
548
549 self.fs.upload_from_stream(
550 rel_filename,
551 stream,
552 metadata=metadata
553 )
554
555 # delete old files
556 if remote_file:
557 for file in remote_file:
558 self.fs.delete(file._id)
559 finally:
560 if fh:
561 fh.close()
562 if stream:
563 stream.close()
564
565 # delete files that are not any more in local fs
566 for remote_file in remote_files.values():
567 for file in remote_file:
568 self.fs.delete(file._id)
569
570 def _get_mongo_files(self, from_path=None):
571
572 file_dict = {}
573 file_cursor = self.fs.find(no_cursor_timeout=True, sort=[('uploadDate', -1)])
574 for file in file_cursor:
575 if from_path and not file.filename.startswith(from_path):
576 continue
577 if file.filename in file_dict:
578 file_dict[file.filename].append(file)
579 else:
580 file_dict[file.filename] = [file]
581 return file_dict
582
583 def reverse_sync(self, from_path: str):
584 """
585 Sync from local storage to FSMongo
586 :param from_path: base directory to upload content to mongo fs
587 :return: None
588 """
589 if os.path.isabs(from_path):
590 from_path = os.path.relpath(from_path, self.path)
591 self._update_mongo_fs(from_path=from_path)