from io import BytesIO, StringIO
import logging
import os
+import datetime
from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
if exception_file:
raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+ # if no specific mode is required, just check that the file exists
+ if not mode:
+ return True
+
if requested_file.metadata["type"] == mode:
return True
:param from_path: if supplied, only copy content from this path, not all
:return: None
"""
+ if from_path:
+ if os.path.isabs(from_path):
+ from_path = os.path.relpath(from_path, self.path)
self.__update_local_fs(from_path=from_path)
+
+ def _update_mongo_fs(self, from_path):
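+ """
+ Upload the content of a local directory to the GridFS bucket, skipping
+ unmodified files and deleting remote files that no longer exist locally
+ :param from_path: local directory, relative to self.path, to synchronize
+ :return: None
+ """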
+
+ os_path = self.path + from_path
+
+ # Obtain list of files and dirs in filesystem
+ members = []
+ for root, dirs, files in os.walk(os_path):
+ for folder in dirs:
+ member = {
+ "filename": os.path.join(root, folder),
+ "type": "dir"
+ }
+ members.append(member)
+ for file in files:
+ filename = os.path.join(root, file)
+ if os.path.islink(filename):
+ file_type = "sym"
+ else:
+ file_type = "file"
+ member = {
+ "filename": os.path.join(root, file),
+ "type": file_type
+ }
+ members.append(member)
+
+ # Obtain files in mongo dict
+ remote_files = self._get_mongo_files(from_path)
+
+ # Upload members if they do not exist or have been modified
+ # This is done for performance (unmodified files are not re-uploaded) and to avoid
+ # overwriting a file with an older copy when there are two synchronization sources
+ # in high availability scenarios
+ for member in members:
+ # obtain the permission bits (e.g. 0o755) from the file mode
+ mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)
+
+ # convert to relative path
+ rel_filename = os.path.relpath(member["filename"], self.path)
+ last_modified_date = datetime.datetime.fromtimestamp(os.path.getmtime(member["filename"]))
+
+ remote_file = remote_files.get(rel_filename)
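+ # files never uploaded before default to datetime.min, so they are always uploaded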
+ upload_date = remote_file[0].uploadDate if remote_file else datetime.datetime.min
+ # remove processed files from dict
+ remote_files.pop(rel_filename, None)
+
+ if last_modified_date >= upload_date:
+
+ stream = None
+ fh = None
+ try:
+ file_type = member["type"]
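+ # directories are stored as empty objects; for symlinks the link
+ # target path is stored as the file content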
+ if file_type == "dir":
+ stream = BytesIO()
+ elif file_type == "sym":
+ stream = BytesIO(os.readlink(member["filename"]).encode("utf-8"))
+ else:
+ fh = open(member["filename"], "rb")
+ stream = BytesIO(fh.read())
+
+ metadata = {
+ "type": file_type,
+ "permissions": mask
+ }
+
+ self.fs.upload_from_stream(
+ rel_filename,
+ stream,
+ metadata=metadata
+ )
+
+ # delete old files
+ if remote_file:
+ for file in remote_file:
+ self.fs.delete(file._id)
+ finally:
+ if fh:
+ fh.close()
+ if stream:
+ stream.close()
+
+ # delete remote files that no longer exist in the local fs
+ for remote_file in remote_files.values():
+ for file in remote_file:
+ self.fs.delete(file._id)
+
+ def _get_mongo_files(self, from_path=None):
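+ """
+ Build a dict mapping each filename in the GridFS bucket to the list of its
+ stored versions, newest first
+ :param from_path: if supplied, only include files whose name starts with this path
+ :return: dict of filename -> list of file versions
+ """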
+
+ file_dict = {}
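+ # sort by uploadDate descending so the first entry for each filename is the newest version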
+ file_cursor = self.fs.find(no_cursor_timeout=True, sort=[('uploadDate', -1)])
+ for file in file_cursor:
+ if from_path and not file.filename.startswith(from_path):
+ continue
+ if file.filename in file_dict:
+ file_dict[file.filename].append(file)
+ else:
+ file_dict[file.filename] = [file]
+ return file_dict
+
+ def reverse_sync(self, from_path: str):
+ """
+ Sync from local storage to FSMongo
+ :param from_path: base directory to upload content to mongo fs
+ :return: None
+ """
+ if os.path.isabs(from_path):
+ from_path = os.path.relpath(from_path, self.path)
+ self._update_mongo_fs(from_path=from_path)
from io import BytesIO
+from unittest.mock import Mock
+
from osm_common.fsbase import FsException
from osm_common.fsmongo import FsMongo
from pathlib import Path
continue
assert found
- def find(self, type, no_cursor_timeout=True):
+ def find(self, type, no_cursor_timeout=True, sort=None):
list = []
for i, v in self.tar_info.items():
if type["metadata.type"] == "dir":
assert os.path.islink("{}example_tar/symlinks/directory_link".format(path))
finally:
subprocess.call(["rm", "-rf", path])
+
+
+def test_upload_mongo_fs():
+ path = "./tmp/"
+
+ subprocess.call(["rm", "-rf", path])
+ try:
+ fs = FsMongo()
+ fs.path = path
+ fs.fs = Mock()
+ fs.fs.find.return_value = {}
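+ # the mocked bucket reports no existing files, so every local member is uploaded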
+
+ file_content = "Test file content"
+
+ # Create local dir and upload content to fakefs
+ os.mkdir(path)
+ os.mkdir("{}example_local".format(path))
+ os.mkdir("{}example_local/directory".format(path))
+ with open("{}example_local/directory/test_file".format(path), "w+") as test_file:
+ test_file.write(file_content)
+ fs.reverse_sync("example_local")
+
+ assert fs.fs.upload_from_stream.call_count == 2
+
+ # first call to upload_from_stream: the directory
+ dir_name = "example_local/directory"
+ call_args_0 = fs.fs.upload_from_stream.call_args_list[0]
+ assert call_args_0[0][0] == dir_name
+ assert call_args_0[1].get("metadata").get("type") == "dir"
+
+ # second call to upload_from_stream: the file
+ file_name = "example_local/directory/test_file"
+ call_args_1 = fs.fs.upload_from_stream.call_args_list[1]
+ assert call_args_1[0][0] == file_name
+ assert call_args_1[1].get("metadata").get("type") == "file"
+
+ finally:
+ subprocess.call(["rm", "-rf", path])