+
+
+# Example.tar.gz
+# example_tar/
+# ├── directory
+# │ └── file
+# └── symlinks
+# ├── directory_link -> ../directory/
+# └── file_link -> ../directory/file
+class FakeCursor:
+ def __init__(self, id, filename, metadata):
+ self._id = id
+ self.filename = filename
+ self.metadata = metadata
+
+
+class FakeFS:
+ directory_metadata = {"type": "dir", "permissions": 509}
+ file_metadata = {"type": "file", "permissions": 436}
+ symlink_metadata = {"type": "sym", "permissions": 511}
+
+ tar_info = {
+ 1: {
+ "cursor": FakeCursor(1, "example_tar", directory_metadata),
+ "metadata": directory_metadata,
+ "stream_content": b"",
+ "stream_content_bad": b"Something",
+ "path": "./tmp/example_tar",
+ },
+ 2: {
+ "cursor": FakeCursor(2, "example_tar/directory", directory_metadata),
+ "metadata": directory_metadata,
+ "stream_content": b"",
+ "stream_content_bad": b"Something",
+ "path": "./tmp/example_tar/directory",
+ },
+ 3: {
+ "cursor": FakeCursor(3, "example_tar/symlinks", directory_metadata),
+ "metadata": directory_metadata,
+ "stream_content": b"",
+ "stream_content_bad": b"Something",
+ "path": "./tmp/example_tar/symlinks",
+ },
+ 4: {
+ "cursor": FakeCursor(4, "example_tar/directory/file", file_metadata),
+ "metadata": file_metadata,
+ "stream_content": b"Example test",
+ "stream_content_bad": b"Example test2",
+ "path": "./tmp/example_tar/directory/file",
+ },
+ 5: {
+ "cursor": FakeCursor(5, "example_tar/symlinks/file_link", symlink_metadata),
+ "metadata": symlink_metadata,
+ "stream_content": b"../directory/file",
+ "stream_content_bad": b"",
+ "path": "./tmp/example_tar/symlinks/file_link",
+ },
+ 6: {
+ "cursor": FakeCursor(
+ 6, "example_tar/symlinks/directory_link", symlink_metadata
+ ),
+ "metadata": symlink_metadata,
+ "stream_content": b"../directory/",
+ "stream_content_bad": b"",
+ "path": "./tmp/example_tar/symlinks/directory_link",
+ },
+ }
+
+ def upload_from_stream(self, f, stream, metadata=None):
+ found = False
+ for i, v in self.tar_info.items():
+ if f == v["path"]:
+ assert metadata["type"] == v["metadata"]["type"]
+ assert stream.read() == BytesIO(v["stream_content"]).read()
+ stream.seek(0)
+ assert stream.read() != BytesIO(v["stream_content_bad"]).read()
+ found = True
+ continue
+ assert found
+
+ def find(self, type, no_cursor_timeout=True, sort=None):
+ list = []
+ for i, v in self.tar_info.items():
+ if type["metadata.type"] == "dir":
+ if v["metadata"] == self.directory_metadata:
+ list.append(v["cursor"])
+ else:
+ if v["metadata"] != self.directory_metadata:
+ list.append(v["cursor"])
+ return list
+
+ def download_to_stream(self, id, file_stream):
+ file_stream.write(BytesIO(self.tar_info[id]["stream_content"]).read())
+
+
+def test_file_extract():
+ tar_path = "tmp/Example.tar.gz"
+ folder_path = "tmp/example_tar"
+
+ # Generate package
+ subprocess.call(["rm", "-rf", "./tmp"])
+ subprocess.call(["mkdir", "-p", "{}/directory".format(folder_path)])
+ subprocess.call(["mkdir", "-p", "{}/symlinks".format(folder_path)])
+ p = Path("{}/directory/file".format(folder_path))
+ p.write_text("Example test")
+ os.symlink("../directory/file", "{}/symlinks/file_link".format(folder_path))
+ os.symlink("../directory/", "{}/symlinks/directory_link".format(folder_path))
+ if os.path.exists(tar_path):
+ os.remove(tar_path)
+ subprocess.call(["tar", "-czvf", tar_path, folder_path])
+
+ try:
+ tar = tarfile.open(tar_path, "r")
+ fs = FsMongo()
+ fs.fs = FakeFS()
+ fs.file_extract(compressed_object=tar, path=".")
+ finally:
+ os.remove(tar_path)
+ subprocess.call(["rm", "-rf", "./tmp"])
+
+
+def test_upload_local_fs():
+ path = "./tmp/"
+
+ subprocess.call(["rm", "-rf", path])
+ try:
+ fs = FsMongo()
+ fs.path = path
+ fs.fs = FakeFS()
+ fs.sync()
+ assert os.path.isdir("{}example_tar".format(path))
+ assert os.path.isdir("{}example_tar/directory".format(path))
+ assert os.path.isdir("{}example_tar/symlinks".format(path))
+ assert os.path.isfile("{}example_tar/directory/file".format(path))
+ assert os.path.islink("{}example_tar/symlinks/file_link".format(path))
+ assert os.path.islink("{}example_tar/symlinks/directory_link".format(path))
+ finally:
+ subprocess.call(["rm", "-rf", path])
+
+
+def test_upload_mongo_fs():
+ path = "./tmp/"
+
+ subprocess.call(["rm", "-rf", path])
+ try:
+ fs = FsMongo()
+ fs.path = path
+ fs.fs = Mock()
+ fs.fs.find.return_value = {}
+
+ file_content = "Test file content"
+
+ # Create local dir and upload content to fakefs
+ os.mkdir(path)
+ os.mkdir("{}example_local".format(path))
+ os.mkdir("{}example_local/directory".format(path))
+ with open(
+ "{}example_local/directory/test_file".format(path), "w+"
+ ) as test_file:
+ test_file.write(file_content)
+ fs.reverse_sync("example_local")
+
+ assert fs.fs.upload_from_stream.call_count == 2
+
+ # first call to upload_from_stream, dir_name
+ dir_name = "example_local/directory"
+ call_args_0 = fs.fs.upload_from_stream.call_args_list[0]
+ assert call_args_0[0][0] == dir_name
+ assert call_args_0[1].get("metadata").get("type") == "dir"
+
+ # second call to upload_from_stream, dir_name
+ file_name = "example_local/directory/test_file"
+ call_args_1 = fs.fs.upload_from_stream.call_args_list[1]
+ assert call_args_1[0][0] == file_name
+ assert call_args_1[1].get("metadata").get("type") == "file"
+
+ finally:
+ subprocess.call(["rm", "-rf", path])
+ pass