1 # Copyright 2019 Canonical
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
20 from http
import HTTPStatus
21 from io
import BytesIO
, StringIO
26 from gridfs
import GridFSBucket
, errors
27 from osm_common
.fsbase
import FsBase
, FsException
28 from pymongo
import MongoClient
31 __author__
= "Eduardo Sousa <eduardo.sousa@canonical.com>"
34 class GridByteStream(BytesIO
):
35 def __init__(self
, filename
, fs
, mode
):
36 BytesIO
.__init
__(self
)
38 self
.filename
= filename
41 self
.file_type
= "file" # Set "file" as default file_type
45 def __initialize__(self
):
48 cursor
= self
.fs
.find({"filename": self
.filename
})
50 for requested_file
in cursor
:
51 exception_file
= next(cursor
, None)
54 raise FsException("Multiple files found", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
56 if requested_file
.metadata
["type"] in ("file", "sym"):
57 grid_file
= requested_file
58 self
.file_type
= requested_file
.metadata
["type"]
60 raise FsException("Type isn't file", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
63 self
._id
= grid_file
._id
64 self
.fs
.download_to_stream(self
._id
, self
)
71 super(GridByteStream
, self
).close()
75 self
.fs
.delete(self
._id
)
77 cursor
= self
.fs
.find({
78 "filename": self
.filename
.split("/")[0],
79 "metadata": {"type": "dir"}})
81 parent_dir
= next(cursor
, None)
84 parent_dir_name
= self
.filename
.split("/")[0]
85 self
.filename
= self
.filename
.replace(parent_dir_name
, parent_dir_name
[:-1], 1)
89 self
.fs
.upload_from_stream_with_id(
93 metadata
={"type": self
.file_type
}
96 self
.fs
.upload_from_stream(
99 metadata
={"type": self
.file_type
}
101 super(GridByteStream
, self
).close()
106 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
110 class GridStringStream(StringIO
):
111 def __init__(self
, filename
, fs
, mode
):
112 StringIO
.__init
__(self
)
114 self
.filename
= filename
117 self
.file_type
= "file" # Set "file" as default file_type
119 self
.__initialize
__()
121 def __initialize__(self
):
124 cursor
= self
.fs
.find({"filename": self
.filename
})
126 for requested_file
in cursor
:
127 exception_file
= next(cursor
, None)
130 raise FsException("Multiple files found", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
132 if requested_file
.metadata
["type"] in ("file", "dir"):
133 grid_file
= requested_file
134 self
.file_type
= requested_file
.metadata
["type"]
136 raise FsException("File type isn't file", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
140 self
._id
= grid_file
._id
141 self
.fs
.download_to_stream(self
._id
, stream
)
143 self
.write(stream
.read().decode("utf-8"))
151 super(GridStringStream
, self
).close()
155 self
.fs
.delete(self
._id
)
157 cursor
= self
.fs
.find({
158 "filename": self
.filename
.split("/")[0],
159 "metadata": {"type": "dir"}})
161 parent_dir
= next(cursor
, None)
164 parent_dir_name
= self
.filename
.split("/")[0]
165 self
.filename
= self
.filename
.replace(parent_dir_name
, parent_dir_name
[:-1], 1)
169 stream
.write(self
.read().encode("utf-8"))
172 self
.fs
.upload_from_stream_with_id(
176 metadata
={"type": self
.file_type
}
179 self
.fs
.upload_from_stream(
182 metadata
={"type": self
.file_type
}
185 super(GridStringStream
, self
).close()
190 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
194 class FsMongo(FsBase
):
196 def __init__(self
, logger_name
='fs', lock
=False):
197 super().__init
__(logger_name
, lock
)
202 def __update_local_fs(self
, from_path
=None):
203 dir_cursor
= self
.fs
.find({"metadata.type": "dir"}, no_cursor_timeout
=True)
205 for directory
in dir_cursor
:
206 if from_path
and not directory
.filename
.startswith(from_path
):
208 os
.makedirs(self
.path
+ directory
.filename
, exist_ok
=True)
210 file_cursor
= self
.fs
.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout
=True)
212 for writing_file
in file_cursor
:
213 if from_path
and not writing_file
.filename
.startswith(from_path
):
215 file_path
= self
.path
+ writing_file
.filename
217 if writing_file
.metadata
["type"] == "sym":
219 self
.fs
.download_to_stream(writing_file
._id
, b
)
221 link
= b
.read().decode("utf-8")
226 if e
.errno
!= errno
.ENOENT
:
227 # This is probably permission denied or worse
229 os
.symlink(link
, file_path
)
231 with
open(file_path
, 'wb+') as file_stream
:
232 self
.fs
.download_to_stream(writing_file
._id
, file_stream
)
233 if "permissions" in writing_file
.metadata
:
234 os
.chmod(file_path
, writing_file
.metadata
["permissions"])
236 def get_params(self
):
237 return {"fs": "mongo", "path": self
.path
}
239 def fs_connect(self
, config
):
241 if "logger_name" in config
:
242 self
.logger
= logging
.getLogger(config
["logger_name"])
244 self
.path
= config
["path"]
246 raise FsException("Missing parameter \"path\"")
247 if not self
.path
.endswith("/"):
249 if not os
.path
.exists(self
.path
):
250 raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
252 elif not os
.access(self
.path
, os
.W_OK
):
253 raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
255 if all(key
in config
.keys() for key
in ["uri", "collection"]):
256 self
.client
= MongoClient(config
["uri"])
257 self
.fs
= GridFSBucket(self
.client
[config
["collection"]])
258 elif all(key
in config
.keys() for key
in ["host", "port", "collection"]):
259 self
.client
= MongoClient(config
["host"], config
["port"])
260 self
.fs
= GridFSBucket(self
.client
[config
["collection"]])
262 if "collection" not in config
.keys():
263 raise FsException("Missing parameter \"collection\"")
265 raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
268 except Exception as e
: # TODO refine
269 raise FsException(str(e
))
271 def fs_disconnect(self
):
274 def mkdir(self
, folder
):
276 Creates a folder or parent object location
278 :return: None or raises an exception
281 self
.fs
.upload_from_stream(
282 folder
, BytesIO(), metadata
={"type": "dir"})
283 except errors
.FileExists
: # make it idempotent
285 except Exception as e
:
286 raise FsException(str(e
), http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
288 def dir_rename(self
, src
, dst
):
290 Rename one directory name. If dst exist, it replaces (deletes) existing directory
291 :param src: source directory
292 :param dst: destination directory
293 :return: None or raises and exception
296 dst_cursor
= self
.fs
.find(
297 {"filename": {"$regex": "^{}(/|$)".format(dst
)}},
298 no_cursor_timeout
=True)
300 for dst_file
in dst_cursor
:
301 self
.fs
.delete(dst_file
._id
)
303 src_cursor
= self
.fs
.find(
304 {"filename": {"$regex": "^{}(/|$)".format(src
)}},
305 no_cursor_timeout
=True)
307 for src_file
in src_cursor
:
308 self
.fs
.rename(src_file
._id
, src_file
.filename
.replace(src
, dst
, 1))
309 except Exception as e
:
310 raise FsException(str(e
), http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
312 def file_exists(self
, storage
, mode
=None):
314 Indicates if "storage" file exist
315 :param storage: can be a str or a str list
316 :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
319 f
= storage
if isinstance(storage
, str) else "/".join(storage
)
321 cursor
= self
.fs
.find({"filename": f
})
323 for requested_file
in cursor
:
324 exception_file
= next(cursor
, None)
327 raise FsException("Multiple files found", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
329 # if no special mode is required just check it does exists
333 if requested_file
.metadata
["type"] == mode
:
336 if requested_file
.metadata
["type"] == "sym" and mode
== "file":
341 def file_size(self
, storage
):
344 :param storage: can be a str or a str list
347 f
= storage
if isinstance(storage
, str) else "/".join(storage
)
349 cursor
= self
.fs
.find({"filename": f
})
351 for requested_file
in cursor
:
352 exception_file
= next(cursor
, None)
355 raise FsException("Multiple files found", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
357 return requested_file
.length
359 def file_extract(self
, tar_object
, path
):
362 :param tar_object: object of type tar
363 :param path: can be a str or a str list, or a tar object where to extract the tar_object
366 f
= path
if isinstance(path
, str) else "/".join(path
)
368 for member
in tar_object
.getmembers():
370 stream
= tar_object
.extractfile(member
)
372 stream
= BytesIO(member
.linkname
.encode("utf-8"))
385 "permissions": member
.mode
388 self
.fs
.upload_from_stream(
389 f
+ "/" + member
.name
,
396 def file_open(self
, storage
, mode
):
399 :param storage: can be a str or list of str
400 :param mode: file mode
404 f
= storage
if isinstance(storage
, str) else "/".join(storage
)
407 return GridByteStream(f
, self
.fs
, mode
)
409 return GridStringStream(f
, self
.fs
, mode
)
410 except errors
.NoFile
:
411 raise FsException("File {} does not exist".format(f
), http_code
=HTTPStatus
.NOT_FOUND
)
413 raise FsException("File {} cannot be opened".format(f
), http_code
=HTTPStatus
.BAD_REQUEST
)
415 def dir_ls(self
, storage
):
417 return folder content
418 :param storage: can be a str or list of str
419 :return: folder content
422 f
= storage
if isinstance(storage
, str) else "/".join(storage
)
425 dir_cursor
= self
.fs
.find({"filename": f
})
426 for requested_dir
in dir_cursor
:
427 exception_dir
= next(dir_cursor
, None)
430 raise FsException("Multiple directories found", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
432 if requested_dir
.metadata
["type"] != "dir":
433 raise FsException("File {} does not exist".format(f
), http_code
=HTTPStatus
.NOT_FOUND
)
435 files_cursor
= self
.fs
.find({"filename": {"$regex": "^{}/([^/])*".format(f
)}})
436 for children_file
in files_cursor
:
437 files
+= [children_file
.filename
.replace(f
+ '/', '', 1)]
441 raise FsException("File {} cannot be opened".format(f
), http_code
=HTTPStatus
.BAD_REQUEST
)
443 def file_delete(self
, storage
, ignore_non_exist
=False):
445 Delete storage content recursively
446 :param storage: can be a str or list of str
447 :param ignore_non_exist: not raise exception if storage does not exist
451 f
= storage
if isinstance(storage
, str) else "/".join(storage
)
453 file_cursor
= self
.fs
.find({"filename": f
})
455 for requested_file
in file_cursor
:
457 exception_file
= next(file_cursor
, None)
460 raise FsException("Multiple files found", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
462 if requested_file
.metadata
["type"] == "dir":
463 dir_cursor
= self
.fs
.find({"filename": {"$regex": "^{}".format(f
)}})
465 for tmp
in dir_cursor
:
466 self
.fs
.delete(tmp
._id
)
468 self
.fs
.delete(requested_file
._id
)
469 if not found
and not ignore_non_exist
:
470 raise FsException("File {} does not exist".format(storage
), http_code
=HTTPStatus
.NOT_FOUND
)
472 raise FsException("File {} cannot be deleted: {}".format(f
, e
), http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
474 def sync(self
, from_path
=None):
476 Sync from FSMongo to local storage
477 :param from_path: if supplied, only copy content from this path, not all
481 if os
.path
.isabs(from_path
):
482 from_path
= os
.path
.relpath(from_path
, self
.path
)
483 self
.__update
_local
_fs
(from_path
=from_path
)
485 def _update_mongo_fs(self
, from_path
):
487 os_path
= self
.path
+ from_path
489 # Obtain list of files and dirs in filesystem
491 for root
, dirs
, files
in os
.walk(os_path
):
494 "filename": os
.path
.join(root
, folder
),
497 members
.append(member
)
499 filename
= os
.path
.join(root
, file)
500 if os
.path
.islink(filename
):
505 "filename": os
.path
.join(root
, file),
508 members
.append(member
)
510 # Obtain files in mongo dict
511 remote_files
= self
._get
_mongo
_files
(from_path
)
513 # Upload members if they do not exists or have been modified
514 # We will do this for performance (avoid updating unmodified files) and to avoid
515 # updating a file with an older one in case there are two sources for synchronization
516 # in high availability scenarios
517 for member
in members
:
519 mask
= int(oct(os
.stat(member
["filename"]).st_mode
)[-3:], 8)
521 # convert to relative path
522 rel_filename
= os
.path
.relpath(member
["filename"], self
.path
)
523 last_modified_date
= datetime
.datetime
.fromtimestamp(os
.path
.getmtime(member
["filename"]))
525 remote_file
= remote_files
.get(rel_filename
)
526 upload_date
= remote_file
[0].uploadDate
if remote_file
else datetime
.datetime
.min
527 # remove processed files from dict
528 remote_files
.pop(rel_filename
, None)
530 if last_modified_date
>= upload_date
:
535 file_type
= member
["type"]
536 if file_type
== "dir":
538 elif file_type
== "sym":
539 stream
= BytesIO(os
.readlink(member
["filename"]).encode("utf-8"))
541 fh
= open(member
["filename"], "rb")
542 stream
= BytesIO(fh
.read())
549 self
.fs
.upload_from_stream(
557 for file in remote_file
:
558 self
.fs
.delete(file._id
)
565 # delete files that are not any more in local fs
566 for remote_file
in remote_files
.values():
567 for file in remote_file
:
568 self
.fs
.delete(file._id
)
570 def _get_mongo_files(self
, from_path
=None):
573 file_cursor
= self
.fs
.find(no_cursor_timeout
=True, sort
=[('uploadDate', -1)])
574 for file in file_cursor
:
575 if from_path
and not file.filename
.startswith(from_path
):
577 if file.filename
in file_dict
:
578 file_dict
[file.filename
].append(file)
580 file_dict
[file.filename
] = [file]
583 def reverse_sync(self
, from_path
: str):
585 Sync from local storage to FSMongo
586 :param from_path: base directory to upload content to mongo fs
589 if os
.path
.isabs(from_path
):
590 from_path
= os
.path
.relpath(from_path
, self
.path
)
591 self
._update
_mongo
_fs
(from_path
=from_path
)