+ :param from_path: if supplied, only copy content from this path, not all
+ :return: None
+ """
+ if from_path:
+ if os.path.isabs(from_path):
+ from_path = os.path.relpath(from_path, self.path)
+ self.__update_local_fs(from_path=from_path)
+
+ def _update_mongo_fs(self, from_path):
+
+ os_path = self.path + from_path
+
+ # Obtain list of files and dirs in filesystem
+ members = []
+ for root, dirs, files in os.walk(os_path):
+ for folder in dirs:
+ member = {"filename": os.path.join(root, folder), "type": "dir"}
+ if os.path.islink(member["filename"]):
+ member["type"] = "sym"
+ members.append(member)
+ for file in files:
+ filename = os.path.join(root, file)
+ if os.path.islink(filename):
+ file_type = "sym"
+ else:
+ file_type = "file"
+ member = {"filename": os.path.join(root, file), "type": file_type}
+ members.append(member)
+
+ # Obtain files in mongo dict
+ remote_files = self._get_mongo_files(from_path)
+
+ # Upload members if they do not exists or have been modified
+ # We will do this for performance (avoid updating unmodified files) and to avoid
+ # updating a file with an older one in case there are two sources for synchronization
+ # in high availability scenarios
+ for member in members:
+ # obtain permission
+ mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)
+
+ # convert to relative path
+ rel_filename = os.path.relpath(member["filename"], self.path)
+ last_modified_date = datetime.datetime.fromtimestamp(
+ os.path.getmtime(member["filename"])
+ )
+
+ remote_file = remote_files.get(rel_filename)
+ upload_date = (
+ remote_file[0].uploadDate if remote_file else datetime.datetime.min
+ )
+ # remove processed files from dict
+ remote_files.pop(rel_filename, None)
+
+ if last_modified_date >= upload_date:
+
+ stream = None
+ fh = None
+ try:
+ file_type = member["type"]
+ if file_type == "dir":
+ stream = BytesIO()
+ elif file_type == "sym":
+ stream = BytesIO(
+ os.readlink(member["filename"]).encode("utf-8")
+ )
+ else:
+ fh = open(member["filename"], "rb")
+ stream = BytesIO(fh.read())
+
+ metadata = {"type": file_type, "permissions": mask}
+
+ self.logger.debug("Sync upload {}".format(rel_filename))
+ self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)
+
+ # delete old files
+ if remote_file:
+ for file in remote_file:
+ self.logger.debug("Sync deleting {}".format(file.filename))
+ self.fs.delete(file._id)
+ finally:
+ if fh:
+ fh.close()
+ if stream:
+ stream.close()
+
+ # delete files that are not any more in local fs
+ for remote_file in remote_files.values():
+ for file in remote_file:
+ self.fs.delete(file._id)
+
+ def _get_mongo_files(self, from_path=None):
+
+ file_dict = {}
+ file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
+ for file in file_cursor:
+ if from_path and not file.filename.startswith(from_path):
+ continue
+ if file.filename in file_dict:
+ file_dict[file.filename].append(file)
+ else:
+ file_dict[file.filename] = [file]
+ return file_dict
+
+ def reverse_sync(self, from_path: str):
+ """
+ Sync from local storage to FSMongo
+ :param from_path: base directory to upload content to mongo fs
+ :return: None