import logging
from http import HTTPStatus
import os
-import stat
from osm_common.fsbase import FsBase, FsException
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
for writing_file in file_cursor:
file_path = self.path + writing_file.filename
- file_stream = open(file_path, 'wb+')
- self.fs.download_to_stream(writing_file._id, file_stream)
- file_stream.close()
- if "permissions" in writing_file.metadata:
- if writing_file.metadata["type"] == "sym":
- os.chmod(
- file_path,
- writing_file.metadata["permissions"] | stat.S_IFLNK
- )
- else:
+
+ if writing_file.metadata["type"] == "sym":
+ with BytesIO() as b:
+ self.fs.download_to_stream(writing_file._id, b)
+ b.seek(0)
+ link = b.read().decode("utf-8")
+ os.symlink(link, file_path)
+ else:
+ with open(file_path, 'wb+') as file_stream:
+ self.fs.download_to_stream(writing_file._id, file_stream)
+ if "permissions" in writing_file.metadata:
os.chmod(file_path, writing_file.metadata["permissions"])
def get_params(self):
for member in tar_object.getmembers():
if member.isfile():
stream = tar_object.extractfile(member)
+ elif member.issym():
+ stream = BytesIO(member.linkname.encode("utf-8"))
else:
stream = BytesIO()
import logging
import pytest
import tempfile
+import tarfile
+import os
+import subprocess
from pymongo import MongoClient
from gridfs import GridFSBucket
+from io import BytesIO
+
from osm_common.fsbase import FsException
from osm_common.fsmongo import FsMongo
+from pathlib import Path
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
def test_fs_disconnect(fs_mongo):
fs_mongo.fs_disconnect()
+
+
+# Example.tar.gz
+# example_tar/
+# ├── directory
+# │ └── file
+# └── symlinks
+# ├── directory_link -> ../directory/
+# └── file_link -> ../directory/file
+class FakeCursor:
+ def __init__(self, id, filename, metadata):
+ self._id = id
+ self.filename = filename
+ self.metadata = metadata
+
+
+class FakeFS:
+ directory_metadata = {'type': 'dir', 'permissions': 509}
+ file_metadata = {'type': 'file', 'permissions': 436}
+ symlink_metadata = {'type': 'sym', 'permissions': 511}
+
+ tar_info = {
+ 1: {
+ "cursor": FakeCursor(1, 'example_tar', directory_metadata),
+ "metadata": directory_metadata,
+ "stream_content": b'',
+ "stream_content_bad": b"Something",
+ "path": './tmp/example_tar',
+ },
+ 2: {
+ "cursor": FakeCursor(2, 'example_tar/directory', directory_metadata),
+ "metadata": directory_metadata,
+ "stream_content": b'',
+ "stream_content_bad": b"Something",
+ "path": './tmp/example_tar/directory',
+ },
+ 3: {
+ "cursor": FakeCursor(3, 'example_tar/symlinks', directory_metadata),
+ "metadata": directory_metadata,
+ "stream_content": b'',
+ "stream_content_bad": b"Something",
+ "path": './tmp/example_tar/symlinks',
+ },
+ 4: {
+ "cursor": FakeCursor(4, 'example_tar/directory/file', file_metadata),
+ "metadata": file_metadata,
+ "stream_content": b"Example test",
+ "stream_content_bad": b"Example test2",
+ "path": './tmp/example_tar/directory/file',
+ },
+ 5: {
+ "cursor": FakeCursor(5, 'example_tar/symlinks/file_link', symlink_metadata),
+ "metadata": symlink_metadata,
+ "stream_content": b"../directory/file",
+ "stream_content_bad": b"",
+ "path": './tmp/example_tar/symlinks/file_link',
+ },
+ 6: {
+ "cursor": FakeCursor(6, 'example_tar/symlinks/directory_link', symlink_metadata),
+ "metadata": symlink_metadata,
+ "stream_content": b"../directory/",
+ "stream_content_bad": b"",
+ "path": './tmp/example_tar/symlinks/directory_link',
+ }
+ }
+
+ def upload_from_stream(self, f, stream, metadata=None):
+ found = False
+ for i, v in self.tar_info.items():
+ if f == v["path"]:
+ assert metadata["type"] == v["metadata"]["type"]
+ assert stream.read() == BytesIO(v["stream_content"]).read()
+ stream.seek(0)
+ assert stream.read() != BytesIO(v["stream_content_bad"]).read()
+ found = True
+ continue
+ assert found
+
+ def find(self, type, no_cursor_timeout=True):
+ list = []
+ for i, v in self.tar_info.items():
+ if type["metadata.type"] == "dir":
+ if v["metadata"] == self.directory_metadata:
+ list.append(v["cursor"])
+ else:
+ if v["metadata"] != self.directory_metadata:
+ list.append(v["cursor"])
+ return list
+
+ def download_to_stream(self, id, file_stream):
+ file_stream.write(BytesIO(self.tar_info[id]["stream_content"]).read())
+
+
+def test_file_extract():
+ tar_path = "tmp/Example.tar.gz"
+ folder_path = "tmp/example_tar"
+
+ # Generate package
+ subprocess.call(["rm", "-rf", "./tmp"])
+ subprocess.call(["mkdir", "-p", "{}/directory".format(folder_path)])
+ subprocess.call(["mkdir", "-p", "{}/symlinks".format(folder_path)])
+ p = Path("{}/directory/file".format(folder_path))
+ p.write_text("Example test")
+ os.symlink("../directory/file", "{}/symlinks/file_link".format(folder_path))
+ os.symlink("../directory/", "{}/symlinks/directory_link".format(folder_path))
+ if os.path.exists(tar_path):
+ os.remove(tar_path)
+ subprocess.call(["tar", "-czvf", tar_path, folder_path])
+
+ try:
+ tar = tarfile.open(tar_path, "r")
+ fs = FsMongo()
+ fs.fs = FakeFS()
+ fs.file_extract(tar_object=tar, path=".")
+ finally:
+ os.remove(tar_path)
+ subprocess.call(["rm", "-rf", "./tmp"])
+
+
+def test_upload_local_fs():
+ path = "./tmp/"
+
+ subprocess.call(["rm", "-rf", path])
+ try:
+ fs = FsMongo()
+ fs.path = path
+ fs.fs = FakeFS()
+ fs.sync()
+ assert os.path.isdir("{}example_tar".format(path))
+ assert os.path.isdir("{}example_tar/directory".format(path))
+ assert os.path.isdir("{}example_tar/symlinks".format(path))
+ assert os.path.isfile("{}example_tar/directory/file".format(path))
+ assert os.path.islink("{}example_tar/symlinks/file_link".format(path))
+ assert os.path.islink("{}example_tar/symlinks/directory_link".format(path))
+ finally:
+ subprocess.call(["rm", "-rf", path])