1 # Copyright 2019 Canonical
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
from http import HTTPStatus
from io import BytesIO, StringIO
import datetime
import errno
import logging
import os
import tarfile
import zipfile

from gridfs import errors, GridFSBucket
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
32 __author__
= "Eduardo Sousa <eduardo.sousa@canonical.com>"
class GridByteStream(BytesIO):
    """In-memory byte stream bound to a single GridFS file.

    On creation, the GridFS entry matching ``filename`` (if any) is
    downloaded into this buffer.  On :meth:`close` in a writable mode the
    buffer content is uploaded back to GridFS, replacing the previous
    revision (keeping the same ``_id`` when one existed).
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename (slash-separated path-like key)
        :param fs: GridFSBucket instance used for all storage operations
        :param mode: file mode string; a mode containing "r" is read-only
        """
        BytesIO.__init__(self)
        self._id = None  # GridFS _id of the current revision, if any
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Load the current GridFS revision of the file into the buffer."""
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second hit for the same filename means the store is inconsistent.
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            # Read mode: leave the buffer positioned at the start.
            self.seek(0)

    def close(self):
        """Flush the buffer back to GridFS (write modes only) and close it."""
        if "r" in self.mode:
            # Read-only stream: nothing to upload.
            super(GridByteStream, self).close()
            return

        # Replace the previous revision: delete first, re-upload below.
        if self._id:
            self.fs.delete(self._id)

        cursor = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            # NOTE(review): when the leading path component is not a known dir,
            # its last character is dropped — presumably stripping a marker
            # suffix; confirm against callers.
            parent_dir_name = self.filename.split("/")[0]
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0)
        if self._id:
            # Keep the same GridFS _id across revisions.
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, self, metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
class GridStringStream(StringIO):
    """In-memory text stream bound to a single GridFS file.

    GridFS stores bytes, so content is transcoded through an intermediate
    ``BytesIO`` using UTF-8 on both download (``__initialize__``) and
    upload (:meth:`close`).
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename (slash-separated path-like key)
        :param fs: GridFSBucket instance used for all storage operations
        :param mode: file mode string; a mode containing "r" is read-only
        """
        StringIO.__init__(self)
        self._id = None  # GridFS _id of the current revision, if any
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Load and UTF-8-decode the current GridFS revision into the buffer."""
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second hit for the same filename means the store is inconsistent.
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "dir"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            stream = BytesIO()
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, stream)
            stream.seek(0)
            # Transcode bytes -> str into this StringIO.
            self.write(stream.read().decode("utf-8"))
            stream.close()

        if "r" in self.mode:
            # Read mode: leave the buffer positioned at the start.
            self.seek(0)

    def close(self):
        """UTF-8-encode and upload the buffer back to GridFS (write modes only)."""
        if "r" in self.mode:
            # Read-only stream: nothing to upload.
            super(GridStringStream, self).close()
            return

        # Replace the previous revision: delete first, re-upload below.
        if self._id:
            self.fs.delete(self._id)

        cursor = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            # NOTE(review): when the leading path component is not a known dir,
            # its last character is dropped — presumably stripping a marker
            # suffix; confirm against callers.
            parent_dir_name = self.filename.split("/")[0]
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0)
        stream = BytesIO()
        # Transcode str -> bytes for GridFS.
        stream.write(self.read().encode("utf-8"))
        stream.seek(0)
        if self._id:
            # Keep the same GridFS _id across revisions.
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, stream, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, stream, metadata={"type": self.file_type}
            )
        stream.close()
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
class FsMongo(FsBase):
    """Filesystem driver backed by MongoDB GridFS with a local cache directory.

    GridFS entries carry ``metadata.type`` in {"file", "dir", "sym"} and
    optionally ``metadata.permissions``.  ``sync``/``reverse_sync`` mirror
    content between GridFS and the local path ``self.path``.
    """

    def __init__(self, logger_name="fs", lock=False):
        """
        :param logger_name: name for the module logger
        :param lock: passed through to FsBase
        """
        super().__init__(logger_name, lock)
        self.path = None
        self.client = None
        self.fs = None

    def __update_local_fs(self, from_path=None):
        """Mirror GridFS content onto the local filesystem under self.path.

        :param from_path: if given, only entries whose filename starts with
            this (relative) prefix are mirrored.
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)

        valid_paths = []

        # First pass: materialize directories.
        for directory in dir_cursor:
            if from_path and not directory.filename.startswith(from_path):
                continue
            self.logger.debug("Making dir {}".format(self.path + directory.filename))
            os.makedirs(self.path + directory.filename, exist_ok=True)
            valid_paths.append(self.path + directory.filename)

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )

        # Second pass: download regular files and recreate symlinks.
        for writing_file in file_cursor:
            if from_path and not writing_file.filename.startswith(from_path):
                continue
            file_path = self.path + writing_file.filename

            if writing_file.metadata["type"] == "sym":
                # Symlink target is stored as the file content (UTF-8).
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    b.seek(0)
                    link = b.read().decode("utf-8")

                try:
                    self.logger.debug("Sync removing {}".format(file_path))
                    os.remove(file_path)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        # This is probably permission denied or worse
                        raise
                os.symlink(
                    link, os.path.realpath(os.path.normpath(os.path.abspath(file_path)))
                )
            else:
                folder = os.path.dirname(file_path)
                if folder not in valid_paths:
                    self.logger.debug("Sync local directory {}".format(file_path))
                    os.makedirs(folder, exist_ok=True)
                with open(file_path, "wb+") as file_stream:
                    self.logger.debug("Sync download {}".format(file_path))
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        """Return driver identification parameters."""
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """Validate configuration and connect to the GridFS bucket.

        :param config: dict with keys "path", "uri", "collection" and
            optionally "logger_name"
        :raises FsException: on missing/invalid configuration or connect error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

    def fs_disconnect(self):
        """Disconnect from storage (no persistent resources to release)."""
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder: folder path (trailing slashes are stripped)
        :return: None or raises an exception
        """
        folder = folder.rstrip("/")
        try:
            # A "dir" is an empty GridFS entry with metadata.type == "dir".
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        dst = dst.rstrip("/")
        src = src.rstrip("/")

        try:
            # Delete anything already under dst so the rename replaces it.
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(dst)}}, no_cursor_timeout=True
            )

            for dst_file in dst_cursor:
                self.fs.delete(dst_file._id)

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(src)}}, no_cursor_timeout=True
            )

            for src_file in src_cursor:
                self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True/False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))

            # if no special mode is required just check it does exists
            if not mode:
                return True

            if requested_file.metadata["type"] == mode:
                return True

            # A symlink satisfies a 'file' check.
            if requested_file.metadata["type"] == "sym" and mode == "file":
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size in bytes, or None if the entry does not exist
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            return requested_file.length

    def file_extract(self, compressed_object, path):
        """
        extract a tar/zip file into GridFS
        :param compressed_object: object of type tar or zip
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)

        if type(compressed_object) is tarfile.TarFile:
            for member in compressed_object.getmembers():
                if member.isfile():
                    stream = compressed_object.extractfile(member)
                    file_type = "file"
                elif member.issym():
                    # Symlink target is stored as the file content.
                    stream = BytesIO(member.linkname.encode("utf-8"))
                    file_type = "sym"
                else:
                    stream = BytesIO()
                    file_type = "dir"

                metadata = {"type": file_type, "permissions": member.mode}
                member.name = member.name.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.name))
                self.fs.upload_from_stream(
                    f + "/" + member.name, stream, metadata=metadata
                )
        elif type(compressed_object) is zipfile.ZipFile:
            for member in compressed_object.infolist():
                if member.is_dir():
                    stream = BytesIO()
                    file_type = "dir"
                else:
                    # upload_from_stream accepts raw bytes as source.
                    stream = compressed_object.read(member)
                    file_type = "file"

                metadata = {"type": file_type}
                member.filename = member.filename.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.filename))
                self.fs.upload_from_stream(
                    f + "/" + member.filename, stream, metadata=metadata
                )

    def file_open(self, storage, mode):
        """
        Open a GridFS-backed stream
        :param storage: can be a str or list of str
        :param mode: file mode; "b" selects a byte stream, otherwise text
        :return: GridByteStream or GridStringStream
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)

            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)

            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                exception_dir = next(dir_cursor, None)

                if exception_dir:
                    raise FsException(
                        "Multiple directories found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_dir.metadata["type"] != "dir":
                    raise FsException(
                        "File {} does not exist".format(f),
                        http_code=HTTPStatus.NOT_FOUND,
                    )

                # Only direct children (no further "/" after the prefix).
                files_cursor = self.fs.find(
                    {"filename": {"$regex": "^{}/([^/])*".format(f)}}
                )
                for children_file in files_cursor:
                    files += [children_file.filename.replace(f + "/", "", 1)]

            return files
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)

            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                exception_file = next(file_cursor, None)

                if exception_file:
                    self.logger.error(
                        "Cannot delete duplicate file: {} and {}".format(
                            requested_file.filename, exception_file.filename
                        )
                    )
                    raise FsException(
                        "Multiple files found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_file.metadata["type"] == "dir":
                    # Recursively delete everything under the directory.
                    dir_cursor = self.fs.find(
                        {"filename": {"$regex": "^{}/".format(f)}}
                    )

                    for tmp in dir_cursor:
                        self.logger.debug("Deleting {}".format(tmp.filename))
                        self.fs.delete(tmp._id)

                self.logger.debug("Deleting {}".format(requested_file.filename))
                self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException(
                    "File {} does not exist".format(storage),
                    http_code=HTTPStatus.NOT_FOUND,
                )
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path:
            if os.path.isabs(from_path):
                # Store filenames are relative to self.path.
                from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """Upload local filesystem content under from_path into GridFS."""
        os_path = self.path + from_path
        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                member = {"filename": os.path.join(root, folder), "type": "dir"}
                if os.path.islink(member["filename"]):
                    member["type"] = "sym"
                members.append(member)
            for name in files:
                filename = os.path.join(root, name)
                if os.path.islink(filename):
                    file_type = "sym"
                else:
                    file_type = "file"
                member = {"filename": os.path.join(root, name), "type": file_type}
                members.append(member)

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            # obtain permission mask (last three octal digits of st_mode)
            mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)

            # convert to relative path
            rel_filename = os.path.relpath(member["filename"], self.path)
            # get timestamp in UTC because mongo stores upload date in UTC:
            # https://www.mongodb.com/docs/v4.0/tutorial/model-time-data/#overview
            last_modified_date = datetime.datetime.utcfromtimestamp(
                os.path.getmtime(member["filename"])
            )

            remote_file = remote_files.get(rel_filename)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )
            # remove processed files from dict
            remote_files.pop(rel_filename, None)

            if last_modified_date >= upload_date:
                stream = None
                fh = None
                try:
                    file_type = member["type"]
                    if file_type == "dir":
                        stream = BytesIO()
                    elif file_type == "sym":
                        # Symlink target is stored as the file content.
                        stream = BytesIO(
                            os.readlink(member["filename"]).encode("utf-8")
                        )
                    else:
                        fh = open(member["filename"], "rb")
                        stream = BytesIO(fh.read())

                    metadata = {"type": file_type, "permissions": mask}

                    self.logger.debug("Sync upload {}".format(rel_filename))
                    self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

                    # delete old revisions, newest-first list excludes the new upload
                    if remote_file:
                        for old_file in remote_file:
                            self.logger.debug(
                                "Sync deleting {}".format(old_file.filename)
                            )
                            self.fs.delete(old_file._id)
                finally:
                    if fh:
                        fh.close()
                    if stream:
                        stream.close()

        # delete files that are not anymore in local fs
        for remote_file in remote_files.values():
            for old_file in remote_file:
                self.fs.delete(old_file._id)

    def _get_mongo_files(self, from_path=None):
        """Return {filename: [grid files, newest first]} for entries under from_path."""
        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        for grid_file in file_cursor:
            if from_path and not grid_file.filename.startswith(from_path):
                continue
            if grid_file.filename in file_dict:
                file_dict[grid_file.filename].append(grid_file)
            else:
                file_dict[grid_file.filename] = [grid_file]
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            # Store filenames are relative to self.path.
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)