Adding FsMongo 03/7603/41
authorEduardo Sousa <eduardo.sousa@canonical.com>
Tue, 4 Jun 2019 11:55:43 +0000 (12:55 +0100)
committerEduardo Sousa <eduardo.sousa@canonical.com>
Fri, 29 Nov 2019 09:37:36 +0000 (09:37 +0000)
Change-Id: I2380bf7c916533199d686c7448c474ca64635077
Signed-off-by: Eduardo Sousa <eduardo.sousa@canonical.com>
osm_common/fsmongo.py [new file with mode: 0644]
osm_common/tests/test_fsmongo.py [new file with mode: 0644]

diff --git a/osm_common/fsmongo.py b/osm_common/fsmongo.py
new file mode 100644 (file)
index 0000000..c558d8e
--- /dev/null
@@ -0,0 +1,432 @@
+# Copyright 2019 Canonical
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: eduardo.sousa@canonical.com
+##
+
+from io import BytesIO, StringIO
+from pymongo import MongoClient
+from gridfs import GridFSBucket, errors
+import logging
+from http import HTTPStatus
+import os
+from osm_common.fsbase import FsBase, FsException
+
+__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
+
+
+class GridByteStream(BytesIO):
+    def __init__(self, filename, fs, mode):
+        BytesIO.__init__(self)
+        self._id = None
+        self.filename = filename
+        self.fs = fs
+        self.mode = mode
+
+        self.__initialize__()
+
+    def __initialize__(self):
+        grid_file = None
+
+        cursor = self.fs.find({"filename": self.filename})
+
+        for requested_file in cursor:
+            exception_file = next(cursor, None)
+
+            if exception_file:
+                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+            if requested_file.metadata["type"] == "file":
+                grid_file = requested_file
+            else:
+                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+        if grid_file:
+            self._id = grid_file._id
+            self.fs.download_to_stream(self._id, self)
+
+            if "r" in self.mode:
+                self.seek(0, 0)
+
+    def close(self):
+        if "r" in self.mode:
+            super(GridByteStream, self).close()
+            return
+
+        if self._id:
+            self.fs.delete(self._id)
+
+        cursor = self.fs.find({
+            "filename": self.filename.split("/")[0],
+            "metadata": {"type": "dir"}})
+
+        parent_dir = next(cursor, None)
+
+        if not parent_dir:
+            parent_dir_name = self.filename.split("/")[0]
+            self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)
+
+        self.seek(0, 0)
+        if self._id:
+            self.fs.upload_from_stream_with_id(
+                self._id,
+                self.filename,
+                self,
+                metadata={"type": "file"}
+            )
+        else:
+            self.fs.upload_from_stream(
+                self.filename,
+                self,
+                metadata={"type": "file"}
+            )
+        super(GridByteStream, self).close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+
+class GridStringStream(StringIO):
+    def __init__(self, filename, fs, mode):
+        StringIO.__init__(self)
+        self._id = None
+        self.filename = filename
+        self.fs = fs
+        self.mode = mode
+
+        self.__initialize__()
+
+    def __initialize__(self):
+        grid_file = None
+
+        cursor = self.fs.find({"filename": self.filename})
+
+        for requested_file in cursor:
+            exception_file = next(cursor, None)
+
+            if exception_file:
+                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+            if requested_file.metadata["type"] == "file":
+                grid_file = requested_file
+            else:
+                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+        if grid_file:
+            stream = BytesIO()
+            self._id = grid_file._id
+            self.fs.download_to_stream(self._id, stream)
+            stream.seek(0)
+            self.write(stream.read().decode("utf-8"))
+            stream.close()
+
+            if "r" in self.mode:
+                self.seek(0, 0)
+
+    def close(self):
+        if "r" in self.mode:
+            super(GridStringStream, self).close()
+            return
+
+        if self._id:
+            self.fs.delete(self._id)
+
+        cursor = self.fs.find({
+            "filename": self.filename.split("/")[0],
+            "metadata": {"type": "dir"}})
+
+        parent_dir = next(cursor, None)
+
+        if not parent_dir:
+            parent_dir_name = self.filename.split("/")[0]
+            self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)
+
+        self.seek(0, 0)
+        stream = BytesIO()
+        stream.write(self.read().encode("utf-8"))
+        stream.seek(0, 0)
+        if self._id:
+            self.fs.upload_from_stream_with_id(
+                self._id,
+                self.filename,
+                stream,
+                metadata={"type": "file"}
+            )
+        else:
+            self.fs.upload_from_stream(
+                self.filename,
+                stream,
+                metadata={"type": "file"}
+            )
+        stream.close()
+        super(GridStringStream, self).close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+
+class FsMongo(FsBase):
+
+    def __init__(self, logger_name='fs', lock=False):
+        super().__init__(logger_name, lock)
+        self.path = None
+        self.client = None
+        self.fs = None
+
+    def __update_local_fs(self):
+        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
+
+        for directory in dir_cursor:
+            os.makedirs(self.path + directory.filename, exist_ok=True)
+
+        file_cursor = self.fs.find({"metadata.type": "file"}, no_cursor_timeout=True)
+
+        for writing_file in file_cursor:
+            file_path = self.path + writing_file.filename
+            file_stream = open(file_path, 'wb+')
+            self.fs.download_to_stream(writing_file._id, file_stream)
+            file_stream.close()
+            if "permissions" in writing_file.metadata:
+                os.chmod(file_path, writing_file.metadata["permissions"])
+
+    def get_params(self):
+        self.__update_local_fs()
+
+        return {"fs": "mongo", "path": self.path}
+
+    def fs_connect(self, config):
+        try:
+            if "logger_name" in config:
+                self.logger = logging.getLogger(config["logger_name"])
+            if "path" in config:
+                self.path = config["path"]
+            else:
+                raise FsException("Missing parameter \"path\"")
+            if not self.path.endswith("/"):
+                self.path += "/"
+            if not os.path.exists(self.path):
+                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
+                    config["path"]))
+            elif not os.access(self.path, os.W_OK):
+                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
+                    config["path"]))
+            if all(key in config.keys() for key in ["uri", "collection"]):
+                self.client = MongoClient(config["uri"])
+                self.fs = GridFSBucket(self.client[config["collection"]])
+            elif all(key in config.keys() for key in ["host", "port", "collection"]):
+                self.client = MongoClient(config["host"], config["port"])
+                self.fs = GridFSBucket(self.client[config["collection"]])
+            else:
+                if "collection" not in config.keys():
+                    raise FsException("Missing parameter \"collection\"")
+                else:
+                    raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
+        except FsException:
+            raise
+        except Exception as e:  # TODO refine
+            raise FsException(str(e))
+
+    def fs_disconnect(self):
+        pass  # TODO
+
+    def mkdir(self, folder):
+        """
+        Creates a folder or parent object location
+        :param folder:
+        :return: None or raises an exception
+        """
+        try:
+            self.fs.upload_from_stream(
+                folder, BytesIO(), metadata={"type": "dir"})
+        except errors.FileExists:  # make it idempotent
+            pass
+        except Exception as e:
+            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+    def dir_rename(self, src, dst):
+        """
+        Rename one directory name. If dst exist, it replaces (deletes) existing directory
+        :param src: source directory
+        :param dst: destination directory
+        :return: None or raises and exception
+        """
+        try:
+            dst_cursor = self.fs.find(
+                {"filename": {"$regex": "^{}(/|$)".format(dst)}},
+                no_cursor_timeout=True)
+
+            for dst_file in dst_cursor:
+                self.fs.delete(dst_file._id)
+
+            src_cursor = self.fs.find(
+                {"filename": {"$regex": "^{}(/|$)".format(src)}},
+                no_cursor_timeout=True)
+
+            for src_file in src_cursor:
+                self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
+        except Exception as e:
+            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+    def file_exists(self, storage, mode=None):
+        """
+        Indicates if "storage" file exist
+        :param storage: can be a str or a str list
+        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
+        :return: True, False
+        """
+        f = storage if isinstance(storage, str) else "/".join(storage)
+
+        cursor = self.fs.find({"filename": f})
+
+        for requested_file in cursor:
+            exception_file = next(cursor, None)
+
+            if exception_file:
+                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+            if requested_file.metadata["type"] == mode:
+                return True
+
+        return False
+
+    def file_size(self, storage):
+        """
+        return file size
+        :param storage: can be a str or a str list
+        :return: file size
+        """
+        f = storage if isinstance(storage, str) else "/".join(storage)
+
+        cursor = self.fs.find({"filename": f})
+
+        for requested_file in cursor:
+            exception_file = next(cursor, None)
+
+            if exception_file:
+                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+            return requested_file.length
+
+    def file_extract(self, tar_object, path):
+        """
+        extract a tar file
+        :param tar_object: object of type tar
+        :param path: can be a str or a str list, or a tar object where to extract the tar_object
+        :return: None
+        """
+        f = path if isinstance(path, str) else "/".join(path)
+
+        for member in tar_object.getmembers():
+            if member.isfile():
+                stream = tar_object.extractfile(member)
+            else:
+                stream = BytesIO()
+
+            metadata = {
+                "type": "file" if member.isfile() else "dir",
+                "permissions": member.mode
+            }
+
+            self.fs.upload_from_stream(
+                f + "/" + member.name,
+                stream,
+                metadata=metadata
+            )
+
+            stream.close()
+
+    def file_open(self, storage, mode):
+        """
+        Open a file
+        :param storage: can be a str or list of str
+        :param mode: file mode
+        :return: file object
+        """
+        try:
+            f = storage if isinstance(storage, str) else "/".join(storage)
+
+            if "b" in mode:
+                return GridByteStream(f, self.fs, mode)
+            else:
+                return GridStringStream(f, self.fs, mode)
+        except errors.NoFile:
+            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+        except IOError:
+            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+    def dir_ls(self, storage):
+        """
+        return folder content
+        :param storage: can be a str or list of str
+        :return: folder content
+        """
+        try:
+            f = storage if isinstance(storage, str) else "/".join(storage)
+
+            files = []
+            dir_cursor = self.fs.find({"filename": f})
+            for requested_dir in dir_cursor:
+                exception_dir = next(dir_cursor, None)
+
+                if exception_dir:
+                    raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+                if requested_dir.metadata["type"] != "dir":
+                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+
+                files_cursor = self.fs.find({"filename": {"$regex": "^{}/([^/])*".format(f)}})
+                for children_file in files_cursor:
+                    files += [children_file.filename.replace(f + '/', '', 1)]
+
+            return files
+        except IOError:
+            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+    def file_delete(self, storage, ignore_non_exist=False):
+        """
+        Delete storage content recursively
+        :param storage: can be a str or list of str
+        :param ignore_non_exist: not raise exception if storage does not exist
+        :return: None
+        """
+        try:
+            f = storage if isinstance(storage, str) else "/".join(storage)
+
+            file_cursor = self.fs.find({"filename": f})
+            found = False
+            for requested_file in file_cursor:
+                found = True
+                exception_file = next(file_cursor, None)
+
+                if exception_file:
+                    raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+                if requested_file.metadata["type"] == "dir":
+                    dir_cursor = self.fs.find({"filename": {"$regex": "^{}".format(f)}})
+
+                    for tmp in dir_cursor:
+                        self.fs.delete(tmp._id)
+                else:
+                    self.fs.delete(requested_file._id)
+            if not found and not ignore_non_exist:
+                raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)    
+        except IOError as e:
+            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
diff --git a/osm_common/tests/test_fsmongo.py b/osm_common/tests/test_fsmongo.py
new file mode 100644 (file)
index 0000000..401d2d0
--- /dev/null
@@ -0,0 +1,475 @@
+# Copyright 2019 Canonical
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: eduardo.sousa@canonical.com
+##
+
+import logging
+import pytest
+import tempfile
+
+from pymongo import MongoClient
+from gridfs import GridFSBucket
+
+from osm_common.fsbase import FsException
+from osm_common.fsmongo import FsMongo
+
+__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
+
+
+def valid_path():
+    return tempfile.gettempdir() + '/'
+
+
+def invalid_path():
+    return '/#tweeter/'
+
+
+@pytest.fixture(scope="function", params=[True, False])
+def fs_mongo(request, monkeypatch):
+    def mock_mongoclient_constructor(a, b, c):
+        pass
+
+    def mock_mongoclient_getitem(a, b):
+        pass
+
+    def mock_gridfs_constructor(a, b):
+        pass
+
+    monkeypatch.setattr(MongoClient, '__init__', mock_mongoclient_constructor)
+    monkeypatch.setattr(MongoClient, '__getitem__', mock_mongoclient_getitem)
+    monkeypatch.setattr(GridFSBucket, '__init__', mock_gridfs_constructor)
+    fs = FsMongo(lock=request.param)
+    fs.fs_connect({
+        'path': valid_path(),
+        'host': 'mongo',
+        'port': 27017,
+        'collection': 'files'})
+    return fs
+
+
+def generic_fs_exception_message(message):
+    return "storage exception {}".format(message)
+
+
+def fs_connect_exception_message(path):
+    return "storage exception Invalid configuration param at '[storage]': path '{}' does not exist".format(path)
+
+
+def file_open_file_not_found_exception(storage):
+    f = storage if isinstance(storage, str) else '/'.join(storage)
+    return "storage exception File {} does not exist".format(f)
+
+
+def file_open_io_exception(storage):
+    f = storage if isinstance(storage, str) else '/'.join(storage)
+    return "storage exception File {} cannot be opened".format(f)
+
+
+def dir_ls_not_a_directory_exception(storage):
+    f = storage if isinstance(storage, str) else '/'.join(storage)
+    return "storage exception File {} does not exist".format(f)
+
+
+def dir_ls_io_exception(storage):
+    f = storage if isinstance(storage, str) else '/'.join(storage)
+    return "storage exception File {} cannot be opened".format(f)
+
+
+def file_delete_exception_message(storage):
+    return "storage exception File {} does not exist".format(storage)
+
+
+def test_constructor_without_logger():
+    fs = FsMongo()
+    assert fs.logger == logging.getLogger('fs')
+    assert fs.path is None
+    assert fs.client is None
+    assert fs.fs is None
+
+
+def test_constructor_with_logger():
+    logger_name = 'fs_mongo'
+    fs = FsMongo(logger_name=logger_name)
+    assert fs.logger == logging.getLogger(logger_name)
+    assert fs.path is None
+    assert fs.client is None
+    assert fs.fs is None
+
+
+def test_get_params(fs_mongo, monkeypatch):
+    def mock_gridfs_find(self, search_query, **kwargs):
+        return []
+
+    monkeypatch.setattr(GridFSBucket, 'find', mock_gridfs_find)
+    params = fs_mongo.get_params()
+    assert len(params) == 2
+    assert "fs" in params
+    assert "path" in params
+    assert params["fs"] == "mongo"
+    assert params["path"] == valid_path()
+
+
+@pytest.mark.parametrize("config, exp_logger, exp_path", [
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        'fs_mongo', valid_path()
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        'fs_mongo', valid_path()
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path()[:-1],
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        'fs_mongo', valid_path()
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path()[:-1],
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        'fs_mongo', valid_path()
+    ),
+    (
+        {
+            'path': valid_path(),
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        'fs', valid_path()
+    ),
+    (
+        {
+            'path': valid_path(),
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        'fs', valid_path()
+    ),
+    (
+        {
+            'path': valid_path()[:-1],
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        'fs', valid_path()
+    ),
+    (
+        {
+            'path': valid_path()[:-1],
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        'fs', valid_path()
+    )])
+def test_fs_connect_with_valid_config(config, exp_logger, exp_path):
+    fs = FsMongo()
+    fs.fs_connect(config)
+    assert fs.logger == logging.getLogger(exp_logger)
+    assert fs.path == exp_path
+    assert type(fs.client) == MongoClient
+    assert type(fs.fs) == GridFSBucket
+
+
+@pytest.mark.parametrize("config, exp_exception_message", [
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': invalid_path(),
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        fs_connect_exception_message(invalid_path())
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': invalid_path(),
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        fs_connect_exception_message(invalid_path())
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': invalid_path()[:-1],
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        fs_connect_exception_message(invalid_path()[:-1])
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': invalid_path()[:-1],
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        fs_connect_exception_message(invalid_path()[:-1])
+    ),
+    (
+        {
+            'path': invalid_path(),
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        fs_connect_exception_message(invalid_path())
+    ),
+    (
+        {
+            'path': invalid_path(),
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        fs_connect_exception_message(invalid_path())
+    ),
+    (
+        {
+            'path': invalid_path()[:-1],
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        fs_connect_exception_message(invalid_path()[:-1])
+    ),
+    (
+        {
+            'path': invalid_path()[:-1],
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        fs_connect_exception_message(invalid_path()[:-1])
+    ),
+    (
+        {
+            'path': '/',
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        generic_fs_exception_message(
+            "Invalid configuration param at '[storage]': path '/' is not writable"
+        )
+    )])
+def test_fs_connect_with_invalid_path(config, exp_exception_message):
+    fs = FsMongo()
+    with pytest.raises(FsException) as excinfo:
+        fs.fs_connect(config)
+    assert str(excinfo.value) == exp_exception_message
+
+
+@pytest.mark.parametrize("config, exp_exception_message", [
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        "Missing parameter \"path\""
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        "Missing parameter \"path\""
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'collection': 'files'
+        },
+        "Missing parameters: \"uri\" or \"host\" + \"port\""
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'port': 27017,
+            'collection': 'files'
+        },
+        "Missing parameters: \"uri\" or \"host\" + \"port\""
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'host': 'mongo',
+            'collection': 'files'
+        },
+        "Missing parameters: \"uri\" or \"host\" + \"port\""
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'uri': 'mongo:27017'
+        },
+        "Missing parameter \"collection\""
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'host': 'mongo',
+            'port': 27017,
+        },
+        "Missing parameter \"collection\""
+    )])
+def test_fs_connect_with_missing_parameters(config, exp_exception_message):
+    fs = FsMongo()
+    with pytest.raises(FsException) as excinfo:
+        fs.fs_connect(config)
+    assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
+
+
+@pytest.mark.parametrize("config, exp_exception_message", [
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        "MongoClient crashed"
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        "MongoClient crashed"
+    )])
+def test_fs_connect_with_invalid_mongoclient(config, exp_exception_message, monkeypatch):
+    def generate_exception(a, b, c=None):
+        raise Exception(exp_exception_message)
+
+    monkeypatch.setattr(MongoClient, '__init__', generate_exception)
+
+    fs = FsMongo()
+    with pytest.raises(FsException) as excinfo:
+        fs.fs_connect(config)
+    assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
+
+
+@pytest.mark.parametrize("config, exp_exception_message", [
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        "Collection unavailable"
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        "Collection unavailable"
+    )])
+def test_fs_connect_with_invalid_mongo_collection(config, exp_exception_message, monkeypatch):
+    def mock_mongoclient_constructor(a, b, c=None):
+        pass
+
+    def generate_exception(a, b):
+        raise Exception(exp_exception_message)
+
+    monkeypatch.setattr(MongoClient, '__init__', mock_mongoclient_constructor)
+    monkeypatch.setattr(MongoClient, '__getitem__', generate_exception)
+
+    fs = FsMongo()
+    with pytest.raises(FsException) as excinfo:
+        fs.fs_connect(config)
+    assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
+
+
+@pytest.mark.parametrize("config, exp_exception_message", [
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'uri': 'mongo:27017',
+            'collection': 'files'
+        },
+        "GridFsBucket crashed"
+    ),
+    (
+        {
+            'logger_name': 'fs_mongo',
+            'path': valid_path(),
+            'host': 'mongo',
+            'port': 27017,
+            'collection': 'files'
+        },
+        "GridFsBucket crashed"
+    )])
+def test_fs_connect_with_invalid_gridfsbucket(config, exp_exception_message, monkeypatch):
+    def mock_mongoclient_constructor(a, b, c=None):
+        pass
+
+    def mock_mongoclient_getitem(a, b):
+        pass
+
+    def generate_exception(a, b):
+        raise Exception(exp_exception_message)
+
+    monkeypatch.setattr(MongoClient, '__init__', mock_mongoclient_constructor)
+    monkeypatch.setattr(MongoClient, '__getitem__', mock_mongoclient_getitem)
+    monkeypatch.setattr(GridFSBucket, '__init__', generate_exception)
+
+    fs = FsMongo()
+    with pytest.raises(FsException) as excinfo:
+        fs.fs_connect(config)
+    assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
+
+
+def test_fs_disconnect(fs_mongo):
+    fs_mongo.fs_disconnect()