projects
/
osm
/
common.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Code Cleanup and adding unit tests
[osm/common.git]
/
osm_common
/
fsmongo.py
diff --git
a/osm_common/fsmongo.py
b/osm_common/fsmongo.py
index
ff37c42
..
727410e
100644
(file)
--- a/
osm_common/fsmongo.py
+++ b/
osm_common/fsmongo.py
@@
-15,15
+15,16
@@
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
##
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
##
-
+import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
-import datetime
+import tarfile
+import zipfile
-from gridfs import
GridFSBucket, errors
+from gridfs import
errors, GridFSBucket
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
@@
-203,10
+204,14
@@
class FsMongo(FsBase):
def __update_local_fs(self, from_path=None):
dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
def __update_local_fs(self, from_path=None):
dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
+ valid_paths = []
+
for directory in dir_cursor:
if from_path and not directory.filename.startswith(from_path):
continue
for directory in dir_cursor:
if from_path and not directory.filename.startswith(from_path):
continue
+ self.logger.debug("Making dir {}".format(self.path + directory.filename))
os.makedirs(self.path + directory.filename, exist_ok=True)
os.makedirs(self.path + directory.filename, exist_ok=True)
+ valid_paths.append(self.path + directory.filename)
file_cursor = self.fs.find(
{"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
file_cursor = self.fs.find(
{"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
@@
-224,6
+229,7
@@
class FsMongo(FsBase):
link = b.read().decode("utf-8")
try:
link = b.read().decode("utf-8")
try:
+ self.logger.debug("Sync removing {}".format(file_path))
os.remove(file_path)
except OSError as e:
if e.errno != errno.ENOENT:
os.remove(file_path)
except OSError as e:
if e.errno != errno.ENOENT:
@@
-231,7
+237,12
@@
class FsMongo(FsBase):
raise
os.symlink(link, file_path)
else:
raise
os.symlink(link, file_path)
else:
+ folder = os.path.dirname(file_path)
+ if folder not in valid_paths:
+ self.logger.debug("Sync local directory {}".format(file_path))
+ os.makedirs(folder, exist_ok=True)
with open(file_path, "wb+") as file_stream:
with open(file_path, "wb+") as file_stream:
+ self.logger.debug("Sync download {}".format(file_path))
self.fs.download_to_stream(writing_file._id, file_stream)
if "permissions" in writing_file.metadata:
os.chmod(file_path, writing_file.metadata["permissions"])
self.fs.download_to_stream(writing_file._id, file_stream)
if "permissions" in writing_file.metadata:
os.chmod(file_path, writing_file.metadata["permissions"])
@@
-264,14
+275,11
@@
class FsMongo(FsBase):
if all(key in config.keys() for key in ["uri", "collection"]):
self.client = MongoClient(config["uri"])
self.fs = GridFSBucket(self.client[config["collection"]])
if all(key in config.keys() for key in ["uri", "collection"]):
self.client = MongoClient(config["uri"])
self.fs = GridFSBucket(self.client[config["collection"]])
- elif all(key in config.keys() for key in ["host", "port", "collection"]):
- self.client = MongoClient(config["host"], config["port"])
- self.fs = GridFSBucket(self.client[config["collection"]])
else:
if "collection" not in config.keys():
raise FsException('Missing parameter "collection"')
else:
else:
if "collection" not in config.keys():
raise FsException('Missing parameter "collection"')
else:
- raise FsException('Missing parameters: "uri"
or "host" + "port"
')
+ raise FsException('Missing parameters: "uri"')
except FsException:
raise
except Exception as e: # TODO refine
except FsException:
raise
except Exception as e: # TODO refine
@@
-286,6
+294,7
@@
class FsMongo(FsBase):
:param folder:
:return: None or raises an exception
"""
:param folder:
:return: None or raises an exception
"""
+ folder = folder.rstrip("/")
try:
self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
except errors.FileExists: # make it idempotent
try:
self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
except errors.FileExists: # make it idempotent
@@
-300,6
+309,9
@@
class FsMongo(FsBase):
:param dst: destination directory
:return: None or raises and exception
"""
:param dst: destination directory
:return: None or raises and exception
"""
+ dst = dst.rstrip("/")
+ src = src.rstrip("/")
+
try:
dst_cursor = self.fs.find(
{"filename": {"$regex": "^{}(/|$)".format(dst)}}, no_cursor_timeout=True
try:
dst_cursor = self.fs.find(
{"filename": {"$regex": "^{}(/|$)".format(dst)}}, no_cursor_timeout=True
@@
-325,6
+337,7
@@
class FsMongo(FsBase):
:return: True, False
"""
f = storage if isinstance(storage, str) else "/".join(storage)
:return: True, False
"""
f = storage if isinstance(storage, str) else "/".join(storage)
+ f = f.rstrip("/")
cursor = self.fs.find({"filename": f})
cursor = self.fs.find({"filename": f})
@@
-336,6
+349,8
@@
class FsMongo(FsBase):
"Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
)
"Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
)
+ self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))
+
# if no special mode is required just check it does exists
if not mode:
return True
# if no special mode is required just check it does exists
if not mode:
return True
@@
-355,6
+370,7
@@
class FsMongo(FsBase):
:return: file size
"""
f = storage if isinstance(storage, str) else "/".join(storage)
:return: file size
"""
f = storage if isinstance(storage, str) else "/".join(storage)
+ f = f.rstrip("/")
cursor = self.fs.find({"filename": f})
cursor = self.fs.find({"filename": f})
@@
-368,35
+384,63
@@
class FsMongo(FsBase):
return requested_file.length
return requested_file.length
- def file_extract(self,
tar
_object, path):
+ def file_extract(self,
compressed
_object, path):
"""
extract a tar file
"""
extract a tar file
- :param
tar_object: object of type tar
+ :param
compressed_object: object of type tar or zip
:param path: can be a str or a str list, or a tar object where to extract the tar_object
:return: None
"""
f = path if isinstance(path, str) else "/".join(path)
:param path: can be a str or a str list, or a tar object where to extract the tar_object
:return: None
"""
f = path if isinstance(path, str) else "/".join(path)
+ f = f.rstrip("/")
+
+ if type(compressed_object) is tarfile.TarFile:
+ for member in compressed_object.getmembers():
+ if member.isfile():
+ stream = compressed_object.extractfile(member)
+ elif member.issym():
+ stream = BytesIO(member.linkname.encode("utf-8"))
+ else:
+ stream = BytesIO()
- for member in tar_object.getmembers():
- if member.isfile():
- stream = tar_object.extractfile(member)
- elif member.issym():
- stream = BytesIO(member.linkname.encode("utf-8"))
- else:
- stream = BytesIO()
+ if member.isfile():
+ file_type = "file"
+ elif member.issym():
+ file_type = "sym"
+ else:
+ file_type = "dir"
- if member.isfile():
- file_type = "file"
- elif member.issym():
- file_type = "sym"
- else:
- file_type = "dir"
+ metadata = {"type": file_type, "permissions": member.mode}
+ member.name = member.name.rstrip("/")
+
+ self.logger.debug("Uploading {}/{}".format(f, member.name))
+ self.fs.upload_from_stream(
+ f + "/" + member.name, stream, metadata=metadata
+ )
- metadata = {"type": file_type, "permissions": member.mode}
+ stream.close()
+ elif type(compressed_object) is zipfile.ZipFile:
+ for member in compressed_object.infolist():
+ if member.is_dir():
+ stream = BytesIO()
+ else:
+ stream = compressed_object.read(member)
- self.fs.upload_from_stream(f + "/" + member.name, stream, metadata=metadata)
+ if member.is_dir():
+ file_type = "dir"
+ else:
+ file_type = "file"
- stream.close()
+ metadata = {"type": file_type}
+ member.filename = member.filename.rstrip("/")
+
+ self.logger.debug("Uploading {}/{}".format(f, member.filename))
+ self.fs.upload_from_stream(
+ f + "/" + member.filename, stream, metadata=metadata
+ )
+
+ if member.is_dir():
+ stream.close()
def file_open(self, storage, mode):
"""
def file_open(self, storage, mode):
"""
@@
-407,6
+451,7
@@
class FsMongo(FsBase):
"""
try:
f = storage if isinstance(storage, str) else "/".join(storage)
"""
try:
f = storage if isinstance(storage, str) else "/".join(storage)
+ f = f.rstrip("/")
if "b" in mode:
return GridByteStream(f, self.fs, mode)
if "b" in mode:
return GridByteStream(f, self.fs, mode)
@@
-429,6
+474,7
@@
class FsMongo(FsBase):
"""
try:
f = storage if isinstance(storage, str) else "/".join(storage)
"""
try:
f = storage if isinstance(storage, str) else "/".join(storage)
+ f = f.rstrip("/")
files = []
dir_cursor = self.fs.find({"filename": f})
files = []
dir_cursor = self.fs.find({"filename": f})
@@
-447,6
+493,9
@@
class FsMongo(FsBase):
http_code=HTTPStatus.NOT_FOUND,
)
http_code=HTTPStatus.NOT_FOUND,
)
+ if f.endswith("/"):
+ f = f[:-1]
+
files_cursor = self.fs.find(
{"filename": {"$regex": "^{}/([^/])*".format(f)}}
)
files_cursor = self.fs.find(
{"filename": {"$regex": "^{}/([^/])*".format(f)}}
)
@@
-468,6
+517,7
@@
class FsMongo(FsBase):
"""
try:
f = storage if isinstance(storage, str) else "/".join(storage)
"""
try:
f = storage if isinstance(storage, str) else "/".join(storage)
+ f = f.rstrip("/")
file_cursor = self.fs.find({"filename": f})
found = False
file_cursor = self.fs.find({"filename": f})
found = False
@@
-476,18
+526,27
@@
class FsMongo(FsBase):
exception_file = next(file_cursor, None)
if exception_file:
exception_file = next(file_cursor, None)
if exception_file:
+ self.logger.error(
+ "Cannot delete duplicate file: {} and {}".format(
+ requested_file.filename, exception_file.filename
+ )
+ )
raise FsException(
"Multiple files found",
http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
)
if requested_file.metadata["type"] == "dir":
raise FsException(
"Multiple files found",
http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
)
if requested_file.metadata["type"] == "dir":
- dir_cursor = self.fs.find({"filename": {"$regex": "^{}".format(f)}})
+ dir_cursor = self.fs.find(
+ {"filename": {"$regex": "^{}/".format(f)}}
+ )
for tmp in dir_cursor:
for tmp in dir_cursor:
+ self.logger.debug("Deleting {}".format(tmp.filename))
self.fs.delete(tmp._id)
self.fs.delete(tmp._id)
- else:
- self.fs.delete(requested_file._id)
+
+ self.logger.debug("Deleting {}".format(requested_file.filename))
+ self.fs.delete(requested_file._id)
if not found and not ignore_non_exist:
raise FsException(
"File {} does not exist".format(storage),
if not found and not ignore_non_exist:
raise FsException(
"File {} does not exist".format(storage),
@@
-519,6
+578,8
@@
class FsMongo(FsBase):
for root, dirs, files in os.walk(os_path):
for folder in dirs:
member = {"filename": os.path.join(root, folder), "type": "dir"}
for root, dirs, files in os.walk(os_path):
for folder in dirs:
member = {"filename": os.path.join(root, folder), "type": "dir"}
+ if os.path.islink(member["filename"]):
+ member["type"] = "sym"
members.append(member)
for file in files:
filename = os.path.join(root, file)
members.append(member)
for file in files:
filename = os.path.join(root, file)
@@
-571,11
+632,13
@@
class FsMongo(FsBase):
metadata = {"type": file_type, "permissions": mask}
metadata = {"type": file_type, "permissions": mask}
+ self.logger.debug("Sync upload {}".format(rel_filename))
self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)
# delete old files
if remote_file:
for file in remote_file:
self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)
# delete old files
if remote_file:
for file in remote_file:
+ self.logger.debug("Sync deleting {}".format(file.filename))
self.fs.delete(file._id)
finally:
if fh:
self.fs.delete(file._id)
finally:
if fh: