X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2Fcommon.git;a=blobdiff_plain;f=osm_common%2Ftests%2Ftest_fsmongo.py;h=3b6256920ee2c0944b797369d035d66f55fd7b00;hp=401d2d0652e3a5e74841df6275d1d17860214e3b;hb=f296d2a468e38fef4145f526b5bc0726502d77cf;hpb=0593abaff9f986845d9a89d5354784dc3900e7b5 diff --git a/osm_common/tests/test_fsmongo.py b/osm_common/tests/test_fsmongo.py index 401d2d0..3b62569 100644 --- a/osm_common/tests/test_fsmongo.py +++ b/osm_common/tests/test_fsmongo.py @@ -19,12 +19,20 @@ import logging import pytest import tempfile +import tarfile +import os +import subprocess from pymongo import MongoClient from gridfs import GridFSBucket +from io import BytesIO + +from unittest.mock import Mock + from osm_common.fsbase import FsException from osm_common.fsmongo import FsMongo +from pathlib import Path __author__ = "Eduardo Sousa " @@ -473,3 +481,178 @@ def test_fs_connect_with_invalid_gridfsbucket(config, exp_exception_message, mon def test_fs_disconnect(fs_mongo): fs_mongo.fs_disconnect() + + +# Example.tar.gz +# example_tar/ +# ├── directory +# │ └── file +# └── symlinks +# ├── directory_link -> ../directory/ +# └── file_link -> ../directory/file +class FakeCursor: + def __init__(self, id, filename, metadata): + self._id = id + self.filename = filename + self.metadata = metadata + + +class FakeFS: + directory_metadata = {'type': 'dir', 'permissions': 509} + file_metadata = {'type': 'file', 'permissions': 436} + symlink_metadata = {'type': 'sym', 'permissions': 511} + + tar_info = { + 1: { + "cursor": FakeCursor(1, 'example_tar', directory_metadata), + "metadata": directory_metadata, + "stream_content": b'', + "stream_content_bad": b"Something", + "path": './tmp/example_tar', + }, + 2: { + "cursor": FakeCursor(2, 'example_tar/directory', directory_metadata), + "metadata": directory_metadata, + "stream_content": b'', + "stream_content_bad": b"Something", + "path": './tmp/example_tar/directory', + }, + 3: { + "cursor": FakeCursor(3, 'example_tar/symlinks', directory_metadata), + "metadata": directory_metadata, + "stream_content": b'', + "stream_content_bad": b"Something", + "path": './tmp/example_tar/symlinks', + }, + 4: { + "cursor": FakeCursor(4, 'example_tar/directory/file', file_metadata), + "metadata": file_metadata, + "stream_content": b"Example test", + "stream_content_bad": b"Example test2", + "path": './tmp/example_tar/directory/file', + }, + 5: { + "cursor": FakeCursor(5, 'example_tar/symlinks/file_link', symlink_metadata), + "metadata": symlink_metadata, + "stream_content": b"../directory/file", + "stream_content_bad": b"", + "path": './tmp/example_tar/symlinks/file_link', + }, + 6: { + "cursor": FakeCursor(6, 'example_tar/symlinks/directory_link', symlink_metadata), + "metadata": symlink_metadata, + "stream_content": b"../directory/", + "stream_content_bad": b"", + "path": './tmp/example_tar/symlinks/directory_link', + } + } + + def upload_from_stream(self, f, stream, metadata=None): + found = False + for i, v in self.tar_info.items(): + if f == v["path"]: + assert metadata["type"] == v["metadata"]["type"] + assert stream.read() == BytesIO(v["stream_content"]).read() + stream.seek(0) + assert stream.read() != BytesIO(v["stream_content_bad"]).read() + found = True + continue + assert found + + def find(self, type, no_cursor_timeout=True, sort=None): + list = [] + for i, v in self.tar_info.items(): + if type["metadata.type"] == "dir": + if v["metadata"] == self.directory_metadata: + list.append(v["cursor"]) + else: + if v["metadata"] != self.directory_metadata: + list.append(v["cursor"]) + return list + + def download_to_stream(self, id, file_stream): + file_stream.write(BytesIO(self.tar_info[id]["stream_content"]).read()) + + +def test_file_extract(): + tar_path = "tmp/Example.tar.gz" + folder_path = "tmp/example_tar" + + # Generate package + subprocess.call(["rm", "-rf", "./tmp"]) + subprocess.call(["mkdir", "-p", "{}/directory".format(folder_path)]) + subprocess.call(["mkdir", "-p", "{}/symlinks".format(folder_path)]) + p = Path("{}/directory/file".format(folder_path)) + p.write_text("Example test") + os.symlink("../directory/file", "{}/symlinks/file_link".format(folder_path)) + os.symlink("../directory/", "{}/symlinks/directory_link".format(folder_path)) + if os.path.exists(tar_path): + os.remove(tar_path) + subprocess.call(["tar", "-czvf", tar_path, folder_path]) + + try: + tar = tarfile.open(tar_path, "r") + fs = FsMongo() + fs.fs = FakeFS() + fs.file_extract(tar_object=tar, path=".") + finally: + os.remove(tar_path) + subprocess.call(["rm", "-rf", "./tmp"]) + + +def test_upload_local_fs(): + path = "./tmp/" + + subprocess.call(["rm", "-rf", path]) + try: + fs = FsMongo() + fs.path = path + fs.fs = FakeFS() + fs.sync() + assert os.path.isdir("{}example_tar".format(path)) + assert os.path.isdir("{}example_tar/directory".format(path)) + assert os.path.isdir("{}example_tar/symlinks".format(path)) + assert os.path.isfile("{}example_tar/directory/file".format(path)) + assert os.path.islink("{}example_tar/symlinks/file_link".format(path)) + assert os.path.islink("{}example_tar/symlinks/directory_link".format(path)) + finally: + subprocess.call(["rm", "-rf", path]) + + +def test_upload_mongo_fs(): + path = "./tmp/" + + subprocess.call(["rm", "-rf", path]) + try: + fs = FsMongo() + fs.path = path + fs.fs = Mock() + fs.fs.find.return_value = {} + + file_content = "Test file content" + + # Create local dir and upload content to fakefs + os.mkdir(path) + os.mkdir("{}example_local".format(path)) + os.mkdir("{}example_local/directory".format(path)) + with open("{}example_local/directory/test_file".format(path), "w+") as test_file: + test_file.write(file_content) + fs.reverse_sync("example_local") + + assert fs.fs.upload_from_stream.call_count == 2 + + # first call to upload_from_stream, dir_name + dir_name = "example_local/directory" + call_args_0 = fs.fs.upload_from_stream.call_args_list[0] + assert call_args_0[0][0] == dir_name + assert call_args_0[1].get("metadata").get("type") == "dir" + + # second call to upload_from_stream, dir_name + file_name = "example_local/directory/test_file" + call_args_1 = fs.fs.upload_from_stream.call_args_list[1] + assert call_args_1[0][0] == file_name + assert call_args_1[1].get("metadata").get("type") == "file" + + finally: + subprocess.call(["rm", "-rf", path]) + pass