Code Cleanup and adding unit tests
[osm/common.git] / osm_common / fsmongo.py
index 740d540..727410e 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: eduardo.sousa@canonical.com
 ##
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: eduardo.sousa@canonical.com
 ##
-
+import datetime
 import errno
 from http import HTTPStatus
 from io import BytesIO, StringIO
 import logging
 import os
 import errno
 from http import HTTPStatus
 from io import BytesIO, StringIO
 import logging
 import os
+import tarfile
+import zipfile
 
 
-from gridfs import GridFSBucket, errors
+from gridfs import errors, GridFSBucket
 from osm_common.fsbase import FsBase, FsException
 from pymongo import MongoClient
 
 from osm_common.fsbase import FsBase, FsException
 from pymongo import MongoClient
 
@@ -50,13 +52,17 @@ class GridByteStream(BytesIO):
             exception_file = next(cursor, None)
 
             if exception_file:
             exception_file = next(cursor, None)
 
             if exception_file:
-                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+                raise FsException(
+                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
+                )
 
             if requested_file.metadata["type"] in ("file", "sym"):
                 grid_file = requested_file
                 self.file_type = requested_file.metadata["type"]
             else:
 
             if requested_file.metadata["type"] in ("file", "sym"):
                 grid_file = requested_file
                 self.file_type = requested_file.metadata["type"]
             else:
-                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+                raise FsException(
+                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
+                )
 
         if grid_file:
             self._id = grid_file._id
 
         if grid_file:
             self._id = grid_file._id
@@ -73,29 +79,26 @@ class GridByteStream(BytesIO):
         if self._id:
             self.fs.delete(self._id)
 
         if self._id:
             self.fs.delete(self._id)
 
-        cursor = self.fs.find({
-            "filename": self.filename.split("/")[0],
-            "metadata": {"type": "dir"}})
+        cursor = self.fs.find(
+            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
+        )
 
         parent_dir = next(cursor, None)
 
         if not parent_dir:
             parent_dir_name = self.filename.split("/")[0]
 
         parent_dir = next(cursor, None)
 
         if not parent_dir:
             parent_dir_name = self.filename.split("/")[0]
-            self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)
+            self.filename = self.filename.replace(
+                parent_dir_name, parent_dir_name[:-1], 1
+            )
 
         self.seek(0, 0)
         if self._id:
             self.fs.upload_from_stream_with_id(
 
         self.seek(0, 0)
         if self._id:
             self.fs.upload_from_stream_with_id(
-                self._id,
-                self.filename,
-                self,
-                metadata={"type": self.file_type}
+                self._id, self.filename, self, metadata={"type": self.file_type}
             )
         else:
             self.fs.upload_from_stream(
             )
         else:
             self.fs.upload_from_stream(
-                self.filename,
-                self,
-                metadata={"type": self.file_type}
+                self.filename, self, metadata={"type": self.file_type}
             )
         super(GridByteStream, self).close()
 
             )
         super(GridByteStream, self).close()
 
@@ -126,13 +129,17 @@ class GridStringStream(StringIO):
             exception_file = next(cursor, None)
 
             if exception_file:
             exception_file = next(cursor, None)
 
             if exception_file:
-                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+                raise FsException(
+                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
+                )
 
             if requested_file.metadata["type"] in ("file", "dir"):
                 grid_file = requested_file
                 self.file_type = requested_file.metadata["type"]
             else:
 
             if requested_file.metadata["type"] in ("file", "dir"):
                 grid_file = requested_file
                 self.file_type = requested_file.metadata["type"]
             else:
-                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+                raise FsException(
+                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
+                )
 
         if grid_file:
             stream = BytesIO()
 
         if grid_file:
             stream = BytesIO()
@@ -153,15 +160,17 @@ class GridStringStream(StringIO):
         if self._id:
             self.fs.delete(self._id)
 
         if self._id:
             self.fs.delete(self._id)
 
-        cursor = self.fs.find({
-            "filename": self.filename.split("/")[0],
-            "metadata": {"type": "dir"}})
+        cursor = self.fs.find(
+            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
+        )
 
         parent_dir = next(cursor, None)
 
         if not parent_dir:
             parent_dir_name = self.filename.split("/")[0]
 
         parent_dir = next(cursor, None)
 
         if not parent_dir:
             parent_dir_name = self.filename.split("/")[0]
-            self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)
+            self.filename = self.filename.replace(
+                parent_dir_name, parent_dir_name[:-1], 1
+            )
 
         self.seek(0, 0)
         stream = BytesIO()
 
         self.seek(0, 0)
         stream = BytesIO()
@@ -169,16 +178,11 @@ class GridStringStream(StringIO):
         stream.seek(0, 0)
         if self._id:
             self.fs.upload_from_stream_with_id(
         stream.seek(0, 0)
         if self._id:
             self.fs.upload_from_stream_with_id(
-                self._id,
-                self.filename,
-                stream,
-                metadata={"type": self.file_type}
+                self._id, self.filename, stream, metadata={"type": self.file_type}
             )
         else:
             self.fs.upload_from_stream(
             )
         else:
             self.fs.upload_from_stream(
-                self.filename,
-                stream,
-                metadata={"type": self.file_type}
+                self.filename, stream, metadata={"type": self.file_type}
             )
         stream.close()
         super(GridStringStream, self).close()
             )
         stream.close()
         super(GridStringStream, self).close()
@@ -191,8 +195,7 @@ class GridStringStream(StringIO):
 
 
 class FsMongo(FsBase):
 
 
 class FsMongo(FsBase):
-
-    def __init__(self, logger_name='fs', lock=False):
+    def __init__(self, logger_name="fs", lock=False):
         super().__init__(logger_name, lock)
         self.path = None
         self.client = None
         super().__init__(logger_name, lock)
         self.path = None
         self.client = None
@@ -201,12 +204,18 @@ class FsMongo(FsBase):
     def __update_local_fs(self, from_path=None):
         dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
 
     def __update_local_fs(self, from_path=None):
         dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
 
+        valid_paths = []
+
         for directory in dir_cursor:
             if from_path and not directory.filename.startswith(from_path):
                 continue
         for directory in dir_cursor:
             if from_path and not directory.filename.startswith(from_path):
                 continue
+            self.logger.debug("Making dir {}".format(self.path + directory.filename))
             os.makedirs(self.path + directory.filename, exist_ok=True)
             os.makedirs(self.path + directory.filename, exist_ok=True)
+            valid_paths.append(self.path + directory.filename)
 
 
-        file_cursor = self.fs.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True)
+        file_cursor = self.fs.find(
+            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
+        )
 
         for writing_file in file_cursor:
             if from_path and not writing_file.filename.startswith(from_path):
 
         for writing_file in file_cursor:
             if from_path and not writing_file.filename.startswith(from_path):
@@ -220,6 +229,7 @@ class FsMongo(FsBase):
                     link = b.read().decode("utf-8")
 
                 try:
                     link = b.read().decode("utf-8")
 
                 try:
+                    self.logger.debug("Sync removing {}".format(file_path))
                     os.remove(file_path)
                 except OSError as e:
                     if e.errno != errno.ENOENT:
                     os.remove(file_path)
                 except OSError as e:
                     if e.errno != errno.ENOENT:
@@ -227,7 +237,12 @@ class FsMongo(FsBase):
                         raise
                 os.symlink(link, file_path)
             else:
                         raise
                 os.symlink(link, file_path)
             else:
-                with open(file_path, 'wb+') as file_stream:
+                folder = os.path.dirname(file_path)
+                if folder not in valid_paths:
+                    self.logger.debug("Sync local directory {}".format(file_path))
+                    os.makedirs(folder, exist_ok=True)
+                with open(file_path, "wb+") as file_stream:
+                    self.logger.debug("Sync download {}".format(file_path))
                     self.fs.download_to_stream(writing_file._id, file_stream)
                 if "permissions" in writing_file.metadata:
                     os.chmod(file_path, writing_file.metadata["permissions"])
                     self.fs.download_to_stream(writing_file._id, file_stream)
                 if "permissions" in writing_file.metadata:
                     os.chmod(file_path, writing_file.metadata["permissions"])
@@ -242,26 +257,29 @@ class FsMongo(FsBase):
             if "path" in config:
                 self.path = config["path"]
             else:
             if "path" in config:
                 self.path = config["path"]
             else:
-                raise FsException("Missing parameter \"path\"")
+                raise FsException('Missing parameter "path"')
             if not self.path.endswith("/"):
                 self.path += "/"
             if not os.path.exists(self.path):
             if not self.path.endswith("/"):
                 self.path += "/"
             if not os.path.exists(self.path):
-                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
-                    config["path"]))
+                raise FsException(
+                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
+                        config["path"]
+                    )
+                )
             elif not os.access(self.path, os.W_OK):
             elif not os.access(self.path, os.W_OK):
-                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
-                    config["path"]))
+                raise FsException(
+                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
+                        config["path"]
+                    )
+                )
             if all(key in config.keys() for key in ["uri", "collection"]):
                 self.client = MongoClient(config["uri"])
                 self.fs = GridFSBucket(self.client[config["collection"]])
             if all(key in config.keys() for key in ["uri", "collection"]):
                 self.client = MongoClient(config["uri"])
                 self.fs = GridFSBucket(self.client[config["collection"]])
-            elif all(key in config.keys() for key in ["host", "port", "collection"]):
-                self.client = MongoClient(config["host"], config["port"])
-                self.fs = GridFSBucket(self.client[config["collection"]])
             else:
                 if "collection" not in config.keys():
             else:
                 if "collection" not in config.keys():
-                    raise FsException("Missing parameter \"collection\"")
+                    raise FsException('Missing parameter "collection"')
                 else:
                 else:
-                    raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
+                    raise FsException('Missing parameters: "uri"')
         except FsException:
             raise
         except Exception as e:  # TODO refine
         except FsException:
             raise
         except Exception as e:  # TODO refine
@@ -276,9 +294,9 @@ class FsMongo(FsBase):
         :param folder:
         :return: None or raises an exception
         """
         :param folder:
         :return: None or raises an exception
         """
+        folder = folder.rstrip("/")
         try:
         try:
-            self.fs.upload_from_stream(
-                folder, BytesIO(), metadata={"type": "dir"})
+            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
         except errors.FileExists:  # make it idempotent
             pass
         except Exception as e:
         except errors.FileExists:  # make it idempotent
             pass
         except Exception as e:
@@ -291,17 +309,20 @@ class FsMongo(FsBase):
         :param dst: destination directory
         :return: None or raises and exception
         """
         :param dst: destination directory
         :return: None or raises and exception
         """
+        dst = dst.rstrip("/")
+        src = src.rstrip("/")
+
         try:
             dst_cursor = self.fs.find(
         try:
             dst_cursor = self.fs.find(
-                {"filename": {"$regex": "^{}(/|$)".format(dst)}},
-                no_cursor_timeout=True)
+                {"filename": {"$regex": "^{}(/|$)".format(dst)}}, no_cursor_timeout=True
+            )
 
             for dst_file in dst_cursor:
                 self.fs.delete(dst_file._id)
 
             src_cursor = self.fs.find(
 
             for dst_file in dst_cursor:
                 self.fs.delete(dst_file._id)
 
             src_cursor = self.fs.find(
-                {"filename": {"$regex": "^{}(/|$)".format(src)}},
-                no_cursor_timeout=True)
+                {"filename": {"$regex": "^{}(/|$)".format(src)}}, no_cursor_timeout=True
+            )
 
             for src_file in src_cursor:
                 self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
 
             for src_file in src_cursor:
                 self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
@@ -316,6 +337,7 @@ class FsMongo(FsBase):
         :return: True, False
         """
         f = storage if isinstance(storage, str) else "/".join(storage)
         :return: True, False
         """
         f = storage if isinstance(storage, str) else "/".join(storage)
+        f = f.rstrip("/")
 
         cursor = self.fs.find({"filename": f})
 
 
         cursor = self.fs.find({"filename": f})
 
@@ -323,7 +345,15 @@ class FsMongo(FsBase):
             exception_file = next(cursor, None)
 
             if exception_file:
             exception_file = next(cursor, None)
 
             if exception_file:
-                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+                raise FsException(
+                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
+                )
+
+            self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))
+
+            # if no special mode is required just check it does exists
+            if not mode:
+                return True
 
             if requested_file.metadata["type"] == mode:
                 return True
 
             if requested_file.metadata["type"] == mode:
                 return True
@@ -340,6 +370,7 @@ class FsMongo(FsBase):
         :return: file size
         """
         f = storage if isinstance(storage, str) else "/".join(storage)
         :return: file size
         """
         f = storage if isinstance(storage, str) else "/".join(storage)
+        f = f.rstrip("/")
 
         cursor = self.fs.find({"filename": f})
 
 
         cursor = self.fs.find({"filename": f})
 
@@ -347,46 +378,69 @@ class FsMongo(FsBase):
             exception_file = next(cursor, None)
 
             if exception_file:
             exception_file = next(cursor, None)
 
             if exception_file:
-                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+                raise FsException(
+                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
+                )
 
             return requested_file.length
 
 
             return requested_file.length
 
-    def file_extract(self, tar_object, path):
+    def file_extract(self, compressed_object, path):
         """
         extract a tar file
         """
         extract a tar file
-        :param tar_object: object of type tar
+        :param compressed_object: object of type tar or zip
         :param path: can be a str or a str list, or a tar object where to extract the tar_object
         :return: None
         """
         f = path if isinstance(path, str) else "/".join(path)
         :param path: can be a str or a str list, or a tar object where to extract the tar_object
         :return: None
         """
         f = path if isinstance(path, str) else "/".join(path)
+        f = f.rstrip("/")
+
+        if type(compressed_object) is tarfile.TarFile:
+            for member in compressed_object.getmembers():
+                if member.isfile():
+                    stream = compressed_object.extractfile(member)
+                elif member.issym():
+                    stream = BytesIO(member.linkname.encode("utf-8"))
+                else:
+                    stream = BytesIO()
 
 
-        for member in tar_object.getmembers():
-            if member.isfile():
-                stream = tar_object.extractfile(member)
-            elif member.issym():
-                stream = BytesIO(member.linkname.encode("utf-8"))
-            else:
-                stream = BytesIO()
+                if member.isfile():
+                    file_type = "file"
+                elif member.issym():
+                    file_type = "sym"
+                else:
+                    file_type = "dir"
 
 
-            if member.isfile():
-                file_type = "file"
-            elif member.issym():
-                file_type = "sym"
-            else:
-                file_type = "dir"
+                metadata = {"type": file_type, "permissions": member.mode}
+                member.name = member.name.rstrip("/")
 
 
-            metadata = {
-                "type": file_type,
-                "permissions": member.mode
-            }
+                self.logger.debug("Uploading {}/{}".format(f, member.name))
+                self.fs.upload_from_stream(
+                    f + "/" + member.name, stream, metadata=metadata
+                )
 
 
-            self.fs.upload_from_stream(
-                f + "/" + member.name,
-                stream,
-                metadata=metadata
-            )
+                stream.close()
+        elif type(compressed_object) is zipfile.ZipFile:
+            for member in compressed_object.infolist():
+                if member.is_dir():
+                    stream = BytesIO()
+                else:
+                    stream = compressed_object.read(member)
 
 
-            stream.close()
+                if member.is_dir():
+                    file_type = "dir"
+                else:
+                    file_type = "file"
+
+                metadata = {"type": file_type}
+                member.filename = member.filename.rstrip("/")
+
+                self.logger.debug("Uploading {}/{}".format(f, member.filename))
+                self.fs.upload_from_stream(
+                    f + "/" + member.filename, stream, metadata=metadata
+                )
+
+                if member.is_dir():
+                    stream.close()
 
     def file_open(self, storage, mode):
         """
 
     def file_open(self, storage, mode):
         """
@@ -397,15 +451,20 @@ class FsMongo(FsBase):
         """
         try:
             f = storage if isinstance(storage, str) else "/".join(storage)
         """
         try:
             f = storage if isinstance(storage, str) else "/".join(storage)
+            f = f.rstrip("/")
 
             if "b" in mode:
                 return GridByteStream(f, self.fs, mode)
             else:
                 return GridStringStream(f, self.fs, mode)
         except errors.NoFile:
 
             if "b" in mode:
                 return GridByteStream(f, self.fs, mode)
             else:
                 return GridStringStream(f, self.fs, mode)
         except errors.NoFile:
-            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+            raise FsException(
+                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
+            )
         except IOError:
         except IOError:
-            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+            raise FsException(
+                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
+            )
 
     def dir_ls(self, storage):
         """
 
     def dir_ls(self, storage):
         """
@@ -415,6 +474,7 @@ class FsMongo(FsBase):
         """
         try:
             f = storage if isinstance(storage, str) else "/".join(storage)
         """
         try:
             f = storage if isinstance(storage, str) else "/".join(storage)
+            f = f.rstrip("/")
 
             files = []
             dir_cursor = self.fs.find({"filename": f})
 
             files = []
             dir_cursor = self.fs.find({"filename": f})
@@ -422,18 +482,31 @@ class FsMongo(FsBase):
                 exception_dir = next(dir_cursor, None)
 
                 if exception_dir:
                 exception_dir = next(dir_cursor, None)
 
                 if exception_dir:
-                    raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+                    raise FsException(
+                        "Multiple directories found",
+                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+                    )
 
                 if requested_dir.metadata["type"] != "dir":
 
                 if requested_dir.metadata["type"] != "dir":
-                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+                    raise FsException(
+                        "File {} does not exist".format(f),
+                        http_code=HTTPStatus.NOT_FOUND,
+                    )
+
+                if f.endswith("/"):
+                    f = f[:-1]
 
 
-                files_cursor = self.fs.find({"filename": {"$regex": "^{}/([^/])*".format(f)}})
+                files_cursor = self.fs.find(
+                    {"filename": {"$regex": "^{}/([^/])*".format(f)}}
+                )
                 for children_file in files_cursor:
                 for children_file in files_cursor:
-                    files += [children_file.filename.replace(f + '/', '', 1)]
+                    files += [children_file.filename.replace(f + "/", "", 1)]
 
             return files
         except IOError:
 
             return files
         except IOError:
-            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+            raise FsException(
+                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
+            )
 
     def file_delete(self, storage, ignore_non_exist=False):
         """
 
     def file_delete(self, storage, ignore_non_exist=False):
         """
@@ -444,6 +517,7 @@ class FsMongo(FsBase):
         """
         try:
             f = storage if isinstance(storage, str) else "/".join(storage)
         """
         try:
             f = storage if isinstance(storage, str) else "/".join(storage)
+            f = f.rstrip("/")
 
             file_cursor = self.fs.find({"filename": f})
             found = False
 
             file_cursor = self.fs.find({"filename": f})
             found = False
@@ -452,19 +526,37 @@ class FsMongo(FsBase):
                 exception_file = next(file_cursor, None)
 
                 if exception_file:
                 exception_file = next(file_cursor, None)
 
                 if exception_file:
-                    raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+                    self.logger.error(
+                        "Cannot delete duplicate file: {} and {}".format(
+                            requested_file.filename, exception_file.filename
+                        )
+                    )
+                    raise FsException(
+                        "Multiple files found",
+                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+                    )
 
                 if requested_file.metadata["type"] == "dir":
 
                 if requested_file.metadata["type"] == "dir":
-                    dir_cursor = self.fs.find({"filename": {"$regex": "^{}".format(f)}})
+                    dir_cursor = self.fs.find(
+                        {"filename": {"$regex": "^{}/".format(f)}}
+                    )
 
                     for tmp in dir_cursor:
 
                     for tmp in dir_cursor:
+                        self.logger.debug("Deleting {}".format(tmp.filename))
                         self.fs.delete(tmp._id)
                         self.fs.delete(tmp._id)
-                else:
-                    self.fs.delete(requested_file._id)
+
+                self.logger.debug("Deleting {}".format(requested_file.filename))
+                self.fs.delete(requested_file._id)
             if not found and not ignore_non_exist:
             if not found and not ignore_non_exist:
-                raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
+                raise FsException(
+                    "File {} does not exist".format(storage),
+                    http_code=HTTPStatus.NOT_FOUND,
+                )
         except IOError as e:
         except IOError as e:
-            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+            raise FsException(
+                "File {} cannot be deleted: {}".format(f, e),
+                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+            )
 
     def sync(self, from_path=None):
         """
 
     def sync(self, from_path=None):
         """
@@ -472,4 +564,112 @@ class FsMongo(FsBase):
         :param from_path: if supplied, only copy content from this path, not all
         :return: None
         """
         :param from_path: if supplied, only copy content from this path, not all
         :return: None
         """
+        if from_path:
+            if os.path.isabs(from_path):
+                from_path = os.path.relpath(from_path, self.path)
         self.__update_local_fs(from_path=from_path)
         self.__update_local_fs(from_path=from_path)
+
+    def _update_mongo_fs(self, from_path):
+
+        os_path = self.path + from_path
+
+        # Obtain list of files and dirs in filesystem
+        members = []
+        for root, dirs, files in os.walk(os_path):
+            for folder in dirs:
+                member = {"filename": os.path.join(root, folder), "type": "dir"}
+                if os.path.islink(member["filename"]):
+                    member["type"] = "sym"
+                members.append(member)
+            for file in files:
+                filename = os.path.join(root, file)
+                if os.path.islink(filename):
+                    file_type = "sym"
+                else:
+                    file_type = "file"
+                member = {"filename": os.path.join(root, file), "type": file_type}
+                members.append(member)
+
+        # Obtain files in mongo dict
+        remote_files = self._get_mongo_files(from_path)
+
+        # Upload members if they do not exists or have been modified
+        # We will do this for performance (avoid updating unmodified files) and to avoid
+        # updating a file with an older one in case there are two sources for synchronization
+        # in high availability scenarios
+        for member in members:
+            # obtain permission
+            mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)
+
+            # convert to relative path
+            rel_filename = os.path.relpath(member["filename"], self.path)
+            last_modified_date = datetime.datetime.fromtimestamp(
+                os.path.getmtime(member["filename"])
+            )
+
+            remote_file = remote_files.get(rel_filename)
+            upload_date = (
+                remote_file[0].uploadDate if remote_file else datetime.datetime.min
+            )
+            # remove processed files from dict
+            remote_files.pop(rel_filename, None)
+
+            if last_modified_date >= upload_date:
+
+                stream = None
+                fh = None
+                try:
+                    file_type = member["type"]
+                    if file_type == "dir":
+                        stream = BytesIO()
+                    elif file_type == "sym":
+                        stream = BytesIO(
+                            os.readlink(member["filename"]).encode("utf-8")
+                        )
+                    else:
+                        fh = open(member["filename"], "rb")
+                        stream = BytesIO(fh.read())
+
+                    metadata = {"type": file_type, "permissions": mask}
+
+                    self.logger.debug("Sync upload {}".format(rel_filename))
+                    self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)
+
+                    # delete old files
+                    if remote_file:
+                        for file in remote_file:
+                            self.logger.debug("Sync deleting {}".format(file.filename))
+                            self.fs.delete(file._id)
+                finally:
+                    if fh:
+                        fh.close()
+                    if stream:
+                        stream.close()
+
+        # delete files that are not any more in local fs
+        for remote_file in remote_files.values():
+            for file in remote_file:
+                self.fs.delete(file._id)
+
+    def _get_mongo_files(self, from_path=None):
+
+        file_dict = {}
+        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
+        for file in file_cursor:
+            if from_path and not file.filename.startswith(from_path):
+                continue
+            if file.filename in file_dict:
+                file_dict[file.filename].append(file)
+            else:
+                file_dict[file.filename] = [file]
+        return file_dict
+
+    def reverse_sync(self, from_path: str):
+        """
+        Sync from local storage to FSMongo
+        :param from_path: base directory to upload content to mongo fs
+        :return: None
+        """
+        if os.path.isabs(from_path):
+            from_path = os.path.relpath(from_path, self.path)
+        self._update_mongo_fs(from_path=from_path)