--- /dev/null
+# Copyright 2019 Canonical
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: eduardo.sousa@canonical.com
+##
+
+from io import BytesIO, StringIO
+from pymongo import MongoClient
+from gridfs import GridFSBucket, errors
+import logging
+from http import HTTPStatus
+import os
+from osm_common.fsbase import FsBase, FsException
+
+__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
+
+
+class GridByteStream(BytesIO):
+ def __init__(self, filename, fs, mode):
+ BytesIO.__init__(self)
+ self._id = None
+ self.filename = filename
+ self.fs = fs
+ self.mode = mode
+
+ self.__initialize__()
+
+ def __initialize__(self):
+ grid_file = None
+
+ cursor = self.fs.find({"filename": self.filename})
+
+ for requested_file in cursor:
+ exception_file = next(cursor, None)
+
+ if exception_file:
+ raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ if requested_file.metadata["type"] == "file":
+ grid_file = requested_file
+ else:
+ raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ if grid_file:
+ self._id = grid_file._id
+ self.fs.download_to_stream(self._id, self)
+
+ if "r" in self.mode:
+ self.seek(0, 0)
+
+ def close(self):
+ if "r" in self.mode:
+ super(GridByteStream, self).close()
+ return
+
+ if self._id:
+ self.fs.delete(self._id)
+
+ cursor = self.fs.find({
+ "filename": self.filename.split("/")[0],
+ "metadata": {"type": "dir"}})
+
+ parent_dir = next(cursor, None)
+
+ if not parent_dir:
+ parent_dir_name = self.filename.split("/")[0]
+ self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)
+
+ self.seek(0, 0)
+ if self._id:
+ self.fs.upload_from_stream_with_id(
+ self._id,
+ self.filename,
+ self,
+ metadata={"type": "file"}
+ )
+ else:
+ self.fs.upload_from_stream(
+ self.filename,
+ self,
+ metadata={"type": "file"}
+ )
+ super(GridByteStream, self).close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+
+class GridStringStream(StringIO):
+ def __init__(self, filename, fs, mode):
+ StringIO.__init__(self)
+ self._id = None
+ self.filename = filename
+ self.fs = fs
+ self.mode = mode
+
+ self.__initialize__()
+
+ def __initialize__(self):
+ grid_file = None
+
+ cursor = self.fs.find({"filename": self.filename})
+
+ for requested_file in cursor:
+ exception_file = next(cursor, None)
+
+ if exception_file:
+ raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ if requested_file.metadata["type"] == "file":
+ grid_file = requested_file
+ else:
+ raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ if grid_file:
+ stream = BytesIO()
+ self._id = grid_file._id
+ self.fs.download_to_stream(self._id, stream)
+ stream.seek(0)
+ self.write(stream.read().decode("utf-8"))
+ stream.close()
+
+ if "r" in self.mode:
+ self.seek(0, 0)
+
+ def close(self):
+ if "r" in self.mode:
+ super(GridStringStream, self).close()
+ return
+
+ if self._id:
+ self.fs.delete(self._id)
+
+ cursor = self.fs.find({
+ "filename": self.filename.split("/")[0],
+ "metadata": {"type": "dir"}})
+
+ parent_dir = next(cursor, None)
+
+ if not parent_dir:
+ parent_dir_name = self.filename.split("/")[0]
+ self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)
+
+ self.seek(0, 0)
+ stream = BytesIO()
+ stream.write(self.read().encode("utf-8"))
+ stream.seek(0, 0)
+ if self._id:
+ self.fs.upload_from_stream_with_id(
+ self._id,
+ self.filename,
+ stream,
+ metadata={"type": "file"}
+ )
+ else:
+ self.fs.upload_from_stream(
+ self.filename,
+ stream,
+ metadata={"type": "file"}
+ )
+ stream.close()
+ super(GridStringStream, self).close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+
+class FsMongo(FsBase):
+
+ def __init__(self, logger_name='fs', lock=False):
+ super().__init__(logger_name, lock)
+ self.path = None
+ self.client = None
+ self.fs = None
+
+ def __update_local_fs(self):
+ dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
+
+ for directory in dir_cursor:
+ os.makedirs(self.path + directory.filename, exist_ok=True)
+
+ file_cursor = self.fs.find({"metadata.type": "file"}, no_cursor_timeout=True)
+
+ for writing_file in file_cursor:
+ file_path = self.path + writing_file.filename
+ file_stream = open(file_path, 'wb+')
+ self.fs.download_to_stream(writing_file._id, file_stream)
+ file_stream.close()
+ if "permissions" in writing_file.metadata:
+ os.chmod(file_path, writing_file.metadata["permissions"])
+
+ def get_params(self):
+ self.__update_local_fs()
+
+ return {"fs": "mongo", "path": self.path}
+
+ def fs_connect(self, config):
+ try:
+ if "logger_name" in config:
+ self.logger = logging.getLogger(config["logger_name"])
+ if "path" in config:
+ self.path = config["path"]
+ else:
+ raise FsException("Missing parameter \"path\"")
+ if not self.path.endswith("/"):
+ self.path += "/"
+ if not os.path.exists(self.path):
+ raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
+ config["path"]))
+ elif not os.access(self.path, os.W_OK):
+ raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
+ config["path"]))
+ if all(key in config.keys() for key in ["uri", "collection"]):
+ self.client = MongoClient(config["uri"])
+ self.fs = GridFSBucket(self.client[config["collection"]])
+ elif all(key in config.keys() for key in ["host", "port", "collection"]):
+ self.client = MongoClient(config["host"], config["port"])
+ self.fs = GridFSBucket(self.client[config["collection"]])
+ else:
+ if "collection" not in config.keys():
+ raise FsException("Missing parameter \"collection\"")
+ else:
+ raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
+ except FsException:
+ raise
+ except Exception as e: # TODO refine
+ raise FsException(str(e))
+
+ def fs_disconnect(self):
+ pass # TODO
+
+ def mkdir(self, folder):
+ """
+ Creates a folder or parent object location
+ :param folder:
+ :return: None or raises an exception
+ """
+ try:
+ self.fs.upload_from_stream(
+ folder, BytesIO(), metadata={"type": "dir"})
+ except errors.FileExists: # make it idempotent
+ pass
+ except Exception as e:
+ raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ def dir_rename(self, src, dst):
+ """
+ Rename one directory name. If dst exist, it replaces (deletes) existing directory
+ :param src: source directory
+ :param dst: destination directory
+ :return: None or raises and exception
+ """
+ try:
+ dst_cursor = self.fs.find(
+ {"filename": {"$regex": "^{}(/|$)".format(dst)}},
+ no_cursor_timeout=True)
+
+ for dst_file in dst_cursor:
+ self.fs.delete(dst_file._id)
+
+ src_cursor = self.fs.find(
+ {"filename": {"$regex": "^{}(/|$)".format(src)}},
+ no_cursor_timeout=True)
+
+ for src_file in src_cursor:
+ self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
+ except Exception as e:
+ raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ def file_exists(self, storage, mode=None):
+ """
+ Indicates if "storage" file exist
+ :param storage: can be a str or a str list
+ :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
+ :return: True, False
+ """
+ f = storage if isinstance(storage, str) else "/".join(storage)
+
+ cursor = self.fs.find({"filename": f})
+
+ for requested_file in cursor:
+ exception_file = next(cursor, None)
+
+ if exception_file:
+ raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ if requested_file.metadata["type"] == mode:
+ return True
+
+ return False
+
+ def file_size(self, storage):
+ """
+ return file size
+ :param storage: can be a str or a str list
+ :return: file size
+ """
+ f = storage if isinstance(storage, str) else "/".join(storage)
+
+ cursor = self.fs.find({"filename": f})
+
+ for requested_file in cursor:
+ exception_file = next(cursor, None)
+
+ if exception_file:
+ raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ return requested_file.length
+
+ def file_extract(self, tar_object, path):
+ """
+ extract a tar file
+ :param tar_object: object of type tar
+ :param path: can be a str or a str list, or a tar object where to extract the tar_object
+ :return: None
+ """
+ f = path if isinstance(path, str) else "/".join(path)
+
+ for member in tar_object.getmembers():
+ if member.isfile():
+ stream = tar_object.extractfile(member)
+ else:
+ stream = BytesIO()
+
+ metadata = {
+ "type": "file" if member.isfile() else "dir",
+ "permissions": member.mode
+ }
+
+ self.fs.upload_from_stream(
+ f + "/" + member.name,
+ stream,
+ metadata=metadata
+ )
+
+ stream.close()
+
+ def file_open(self, storage, mode):
+ """
+ Open a file
+ :param storage: can be a str or list of str
+ :param mode: file mode
+ :return: file object
+ """
+ try:
+ f = storage if isinstance(storage, str) else "/".join(storage)
+
+ if "b" in mode:
+ return GridByteStream(f, self.fs, mode)
+ else:
+ return GridStringStream(f, self.fs, mode)
+ except errors.NoFile:
+ raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+ except IOError:
+ raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+ def dir_ls(self, storage):
+ """
+ return folder content
+ :param storage: can be a str or list of str
+ :return: folder content
+ """
+ try:
+ f = storage if isinstance(storage, str) else "/".join(storage)
+
+ files = []
+ dir_cursor = self.fs.find({"filename": f})
+ for requested_dir in dir_cursor:
+ exception_dir = next(dir_cursor, None)
+
+ if exception_dir:
+ raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ if requested_dir.metadata["type"] != "dir":
+ raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+
+ files_cursor = self.fs.find({"filename": {"$regex": "^{}/([^/])*".format(f)}})
+ for children_file in files_cursor:
+ files += [children_file.filename.replace(f + '/', '', 1)]
+
+ return files
+ except IOError:
+ raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+ def file_delete(self, storage, ignore_non_exist=False):
+ """
+ Delete storage content recursively
+ :param storage: can be a str or list of str
+ :param ignore_non_exist: not raise exception if storage does not exist
+ :return: None
+ """
+ try:
+ f = storage if isinstance(storage, str) else "/".join(storage)
+
+ file_cursor = self.fs.find({"filename": f})
+ found = False
+ for requested_file in file_cursor:
+ found = True
+ exception_file = next(file_cursor, None)
+
+ if exception_file:
+ raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ if requested_file.metadata["type"] == "dir":
+ dir_cursor = self.fs.find({"filename": {"$regex": "^{}".format(f)}})
+
+ for tmp in dir_cursor:
+ self.fs.delete(tmp._id)
+ else:
+ self.fs.delete(requested_file._id)
+ if not found and not ignore_non_exist:
+ raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
+ except IOError as e:
+ raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
--- /dev/null
+# Copyright 2019 Canonical
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: eduardo.sousa@canonical.com
+##
+
+import logging
+import pytest
+import tempfile
+
+from pymongo import MongoClient
+from gridfs import GridFSBucket
+
+from osm_common.fsbase import FsException
+from osm_common.fsmongo import FsMongo
+
+__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
+
+
+def valid_path():
+ return tempfile.gettempdir() + '/'
+
+
+def invalid_path():
+ return '/#tweeter/'
+
+
+@pytest.fixture(scope="function", params=[True, False])
+def fs_mongo(request, monkeypatch):
+ def mock_mongoclient_constructor(a, b, c):
+ pass
+
+ def mock_mongoclient_getitem(a, b):
+ pass
+
+ def mock_gridfs_constructor(a, b):
+ pass
+
+ monkeypatch.setattr(MongoClient, '__init__', mock_mongoclient_constructor)
+ monkeypatch.setattr(MongoClient, '__getitem__', mock_mongoclient_getitem)
+ monkeypatch.setattr(GridFSBucket, '__init__', mock_gridfs_constructor)
+ fs = FsMongo(lock=request.param)
+ fs.fs_connect({
+ 'path': valid_path(),
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'})
+ return fs
+
+
+def generic_fs_exception_message(message):
+ return "storage exception {}".format(message)
+
+
+def fs_connect_exception_message(path):
+ return "storage exception Invalid configuration param at '[storage]': path '{}' does not exist".format(path)
+
+
+def file_open_file_not_found_exception(storage):
+ f = storage if isinstance(storage, str) else '/'.join(storage)
+ return "storage exception File {} does not exist".format(f)
+
+
+def file_open_io_exception(storage):
+ f = storage if isinstance(storage, str) else '/'.join(storage)
+ return "storage exception File {} cannot be opened".format(f)
+
+
+def dir_ls_not_a_directory_exception(storage):
+ f = storage if isinstance(storage, str) else '/'.join(storage)
+ return "storage exception File {} does not exist".format(f)
+
+
+def dir_ls_io_exception(storage):
+ f = storage if isinstance(storage, str) else '/'.join(storage)
+ return "storage exception File {} cannot be opened".format(f)
+
+
+def file_delete_exception_message(storage):
+ return "storage exception File {} does not exist".format(storage)
+
+
+def test_constructor_without_logger():
+ fs = FsMongo()
+ assert fs.logger == logging.getLogger('fs')
+ assert fs.path is None
+ assert fs.client is None
+ assert fs.fs is None
+
+
+def test_constructor_with_logger():
+ logger_name = 'fs_mongo'
+ fs = FsMongo(logger_name=logger_name)
+ assert fs.logger == logging.getLogger(logger_name)
+ assert fs.path is None
+ assert fs.client is None
+ assert fs.fs is None
+
+
+def test_get_params(fs_mongo, monkeypatch):
+ def mock_gridfs_find(self, search_query, **kwargs):
+ return []
+
+ monkeypatch.setattr(GridFSBucket, 'find', mock_gridfs_find)
+ params = fs_mongo.get_params()
+ assert len(params) == 2
+ assert "fs" in params
+ assert "path" in params
+ assert params["fs"] == "mongo"
+ assert params["path"] == valid_path()
+
+
+@pytest.mark.parametrize("config, exp_logger, exp_path", [
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ 'fs_mongo', valid_path()
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ 'fs_mongo', valid_path()
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path()[:-1],
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ 'fs_mongo', valid_path()
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path()[:-1],
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ 'fs_mongo', valid_path()
+ ),
+ (
+ {
+ 'path': valid_path(),
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ 'fs', valid_path()
+ ),
+ (
+ {
+ 'path': valid_path(),
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ 'fs', valid_path()
+ ),
+ (
+ {
+ 'path': valid_path()[:-1],
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ 'fs', valid_path()
+ ),
+ (
+ {
+ 'path': valid_path()[:-1],
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ 'fs', valid_path()
+ )])
+def test_fs_connect_with_valid_config(config, exp_logger, exp_path):
+ fs = FsMongo()
+ fs.fs_connect(config)
+ assert fs.logger == logging.getLogger(exp_logger)
+ assert fs.path == exp_path
+ assert type(fs.client) == MongoClient
+ assert type(fs.fs) == GridFSBucket
+
+
+@pytest.mark.parametrize("config, exp_exception_message", [
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': invalid_path(),
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ fs_connect_exception_message(invalid_path())
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': invalid_path(),
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ fs_connect_exception_message(invalid_path())
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': invalid_path()[:-1],
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ fs_connect_exception_message(invalid_path()[:-1])
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': invalid_path()[:-1],
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ fs_connect_exception_message(invalid_path()[:-1])
+ ),
+ (
+ {
+ 'path': invalid_path(),
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ fs_connect_exception_message(invalid_path())
+ ),
+ (
+ {
+ 'path': invalid_path(),
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ fs_connect_exception_message(invalid_path())
+ ),
+ (
+ {
+ 'path': invalid_path()[:-1],
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ fs_connect_exception_message(invalid_path()[:-1])
+ ),
+ (
+ {
+ 'path': invalid_path()[:-1],
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ fs_connect_exception_message(invalid_path()[:-1])
+ ),
+ (
+ {
+ 'path': '/',
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ generic_fs_exception_message(
+ "Invalid configuration param at '[storage]': path '/' is not writable"
+ )
+ )])
+def test_fs_connect_with_invalid_path(config, exp_exception_message):
+ fs = FsMongo()
+ with pytest.raises(FsException) as excinfo:
+ fs.fs_connect(config)
+ assert str(excinfo.value) == exp_exception_message
+
+
+@pytest.mark.parametrize("config, exp_exception_message", [
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ "Missing parameter \"path\""
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ "Missing parameter \"path\""
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'collection': 'files'
+ },
+ "Missing parameters: \"uri\" or \"host\" + \"port\""
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ "Missing parameters: \"uri\" or \"host\" + \"port\""
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'host': 'mongo',
+ 'collection': 'files'
+ },
+ "Missing parameters: \"uri\" or \"host\" + \"port\""
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'uri': 'mongo:27017'
+ },
+ "Missing parameter \"collection\""
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'host': 'mongo',
+ 'port': 27017,
+ },
+ "Missing parameter \"collection\""
+ )])
+def test_fs_connect_with_missing_parameters(config, exp_exception_message):
+ fs = FsMongo()
+ with pytest.raises(FsException) as excinfo:
+ fs.fs_connect(config)
+ assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
+
+
+@pytest.mark.parametrize("config, exp_exception_message", [
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ "MongoClient crashed"
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ "MongoClient crashed"
+ )])
+def test_fs_connect_with_invalid_mongoclient(config, exp_exception_message, monkeypatch):
+ def generate_exception(a, b, c=None):
+ raise Exception(exp_exception_message)
+
+ monkeypatch.setattr(MongoClient, '__init__', generate_exception)
+
+ fs = FsMongo()
+ with pytest.raises(FsException) as excinfo:
+ fs.fs_connect(config)
+ assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
+
+
+@pytest.mark.parametrize("config, exp_exception_message", [
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ "Collection unavailable"
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ "Collection unavailable"
+ )])
+def test_fs_connect_with_invalid_mongo_collection(config, exp_exception_message, monkeypatch):
+ def mock_mongoclient_constructor(a, b, c=None):
+ pass
+
+ def generate_exception(a, b):
+ raise Exception(exp_exception_message)
+
+ monkeypatch.setattr(MongoClient, '__init__', mock_mongoclient_constructor)
+ monkeypatch.setattr(MongoClient, '__getitem__', generate_exception)
+
+ fs = FsMongo()
+ with pytest.raises(FsException) as excinfo:
+ fs.fs_connect(config)
+ assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
+
+
+@pytest.mark.parametrize("config, exp_exception_message", [
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'uri': 'mongo:27017',
+ 'collection': 'files'
+ },
+ "GridFsBucket crashed"
+ ),
+ (
+ {
+ 'logger_name': 'fs_mongo',
+ 'path': valid_path(),
+ 'host': 'mongo',
+ 'port': 27017,
+ 'collection': 'files'
+ },
+ "GridFsBucket crashed"
+ )])
+def test_fs_connect_with_invalid_gridfsbucket(config, exp_exception_message, monkeypatch):
+ def mock_mongoclient_constructor(a, b, c=None):
+ pass
+
+ def mock_mongoclient_getitem(a, b):
+ pass
+
+ def generate_exception(a, b):
+ raise Exception(exp_exception_message)
+
+ monkeypatch.setattr(MongoClient, '__init__', mock_mongoclient_constructor)
+ monkeypatch.setattr(MongoClient, '__getitem__', mock_mongoclient_getitem)
+ monkeypatch.setattr(GridFSBucket, '__init__', generate_exception)
+
+ fs = FsMongo()
+ with pytest.raises(FsException) as excinfo:
+ fs.fs_connect(config)
+ assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
+
+
+def test_fs_disconnect(fs_mongo):
+ fs_mongo.fs_disconnect()