# 7fb071ac71790fb50c9edf35347cfeec49206bfc
# Copyright 2019 Canonical
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
##
import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re
import tarfile
import zipfile

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
33 __author__
= "Eduardo Sousa <eduardo.sousa@canonical.com>"
class GridByteStream(BytesIO):
    """
    In-memory binary buffer tied to one named entry of a GridFS-style bucket.

    On construction the entry called ``filename`` is looked up through ``fs`` and,
    if found, its content is downloaded into the buffer. Closing a stream whose
    mode does not contain "r" writes the buffer back through ``fs``.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: name of the entry inside the bucket
        :param fs: bucket object; find(), download_to_stream(), delete(),
            upload_from_stream() and upload_from_stream_with_id() are called on it
        :param mode: open mode; only the presence of "r" is inspected
        """
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        try:
            self.__initialize__()
        except Exception:
            # Release the buffer right away: IOBase's finalizer calls close() on
            # objects that are still open, which would otherwise run the
            # write-back logic for a stream that never finished opening.
            BytesIO.close(self)
            raise

    def __initialize__(self):
        """
        Load the current content of ``self.filename``, if it exists, into the buffer
        :return: None
        :raises FsException: several entries share the name, or the entry type is
            neither "file" nor "sym"
        """
        cursor = self.fs.find({"filename": self.filename})

        requested_file = next(cursor, None)
        if requested_file is None:
            return  # nothing stored yet: start from an empty buffer

        if next(cursor, None) is not None:
            raise FsException(
                "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        if requested_file.metadata["type"] not in ("file", "sym"):
            raise FsException(
                "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        self.file_type = requested_file.metadata["type"]
        self._id = requested_file._id
        self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the buffer; for non-read modes, store its content in the bucket first
        :return: None
        """
        if self.closed:
            # A second close() must be a no-op, otherwise it would delete the
            # entry that the first close() has just written.
            return

        if "r" in self.mode:
            super().close()
            return

        try:
            if self._id:
                self.fs.delete(self._id)

            cursor = self.fs.find(
                {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
            )

            parent_dir = next(cursor, None)

            if not parent_dir:
                # No "dir" entry for the first path component: store the file
                # under that component minus its last character.
                # NOTE(review): presumably maps a temporary "<name>_" folder back
                # to "<name>" - confirm against the callers.
                parent_dir_name = self.filename.split("/")[0]
                self.filename = self.filename.replace(
                    parent_dir_name, parent_dir_name[:-1], 1
                )

            self.seek(0, 0)
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id, self.filename, self, metadata={"type": self.file_type}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename, self, metadata={"type": self.file_type}
                )
        finally:
            super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
class GridStringStream(StringIO):
    """
    In-memory text buffer tied to one named entry of a GridFS-style bucket.

    Content is stored in the bucket as UTF-8 bytes and exposed here as str. On
    construction the entry called ``filename`` is looked up through ``fs`` and,
    if found, decoded into the buffer. Closing a stream whose mode does not
    contain "r" encodes the buffer and writes it back through ``fs``.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: name of the entry inside the bucket
        :param fs: bucket object; find(), download_to_stream(), delete(),
            upload_from_stream() and upload_from_stream_with_id() are called on it
        :param mode: open mode; only the presence of "r" is inspected
        """
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        try:
            self.__initialize__()
        except Exception:
            # Release the buffer right away: IOBase's finalizer calls close() on
            # objects that are still open, which would otherwise run the
            # write-back logic for a stream that never finished opening.
            StringIO.close(self)
            raise

    def __initialize__(self):
        """
        Load the current content of ``self.filename``, if it exists, into the buffer
        :return: None
        :raises FsException: several entries share the name, or the entry type is
            not accepted
        """
        cursor = self.fs.find({"filename": self.filename})

        requested_file = next(cursor, None)
        if requested_file is None:
            return  # nothing stored yet: start from an empty buffer

        if next(cursor, None) is not None:
            raise FsException(
                "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        # NOTE(review): "dir" is accepted here while the byte stream accepts
        # "sym"; kept as found - confirm whether "sym" was intended.
        if requested_file.metadata["type"] not in ("file", "dir"):
            raise FsException(
                "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        self.file_type = requested_file.metadata["type"]
        self._id = requested_file._id

        # The bucket delivers bytes: download into a scratch buffer, then decode
        with BytesIO() as stream:
            self.fs.download_to_stream(self._id, stream)
            self.write(stream.getvalue().decode("utf-8"))

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the buffer; for non-read modes, store its content in the bucket first
        :return: None
        """
        if self.closed:
            # A second close() must be a no-op, otherwise it would delete the
            # entry that the first close() has just written.
            return

        if "r" in self.mode:
            super().close()
            return

        try:
            if self._id:
                self.fs.delete(self._id)

            cursor = self.fs.find(
                {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
            )

            parent_dir = next(cursor, None)

            if not parent_dir:
                # No "dir" entry for the first path component: store the file
                # under that component minus its last character.
                # NOTE(review): presumably maps a temporary "<name>_" folder back
                # to "<name>" - confirm against the callers.
                parent_dir_name = self.filename.split("/")[0]
                self.filename = self.filename.replace(
                    parent_dir_name, parent_dir_name[:-1], 1
                )

            with BytesIO(self.getvalue().encode("utf-8")) as stream:
                if self._id:
                    self.fs.upload_from_stream_with_id(
                        self._id,
                        self.filename,
                        stream,
                        metadata={"type": self.file_type},
                    )
                else:
                    self.fs.upload_from_stream(
                        self.filename, stream, metadata={"type": self.file_type}
                    )
        finally:
            super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
class FsMongo(FsBase):
    """
    FsBase backend that keeps files in a GridFS bucket (pymongo / gridfs) and can
    mirror the bucket to, or from, the local directory ``self.path``.

    Every bucket entry carries ``metadata["type"]``: "dir", "file" or "sym".
    """

    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None
        self.client = None
        self.fs = None

    def _find_unique(self, filename, error_message="Multiple files found"):
        """
        Look up the single bucket entry called ``filename``
        :param filename: exact entry name
        :param error_message: text of the exception raised on duplicates
        :return: the entry, or None when no entry has that name
        :raises FsException: (INTERNAL_SERVER_ERROR) more than one entry matches
        """
        cursor = self.fs.find({"filename": filename})
        entry = next(cursor, None)
        if entry is not None and next(cursor, None) is not None:
            raise FsException(
                error_message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )
        return entry

    def __update_local_fs(self, from_path=None):
        """
        Copy bucket content below ``self.path``: directories first, then files/links
        :param from_path: if supplied, only entries whose name starts with it
        :return: None
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)

        for directory in dir_cursor:
            if from_path and not directory.filename.startswith(from_path):
                continue
            os.makedirs(self.path + directory.filename, exist_ok=True)

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )

        for writing_file in file_cursor:
            if from_path and not writing_file.filename.startswith(from_path):
                continue
            file_path = self.path + writing_file.filename

            # A file may be stored without a "dir" entry for its parent
            os.makedirs(os.path.dirname(file_path), exist_ok=True)

            if writing_file.metadata["type"] == "sym":
                # The content of a "sym" entry is the link target, UTF-8 encoded
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    link = b.getvalue().decode("utf-8")

                try:
                    os.remove(file_path)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        # This is probably permission denied or worse
                        raise
                os.symlink(link, file_path)
            else:
                with open(file_path, "wb+") as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Validate the local path and open the GridFS bucket
        :param config: dict with "path" and "collection", plus either "uri" or
            "host" + "port"; optional "logger_name"
        :return: None
        :raises FsException: missing/invalid parameter or any connection error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if all(key in config for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config:
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri" or "host" + "port"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder: name of the "dir" entry to create
        :return: None or raises an exception
        """
        try:
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(
                str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            ) from e

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            # Names are escaped so that characters such as "." or "+" are matched
            # literally instead of being interpreted as regex syntax.
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(dst))}},
                no_cursor_timeout=True,
            )

            for dst_file in dst_cursor:
                self.fs.delete(dst_file._id)

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(src))}},
                no_cursor_timeout=True,
            )

            for src_file in src_cursor:
                self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
        except Exception as e:
            raise FsException(
                str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            ) from e

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        requested_file = self._find_unique(f)
        if requested_file is None:
            return False

        # if no special mode is required just check it does exists
        if not mode:
            return True

        file_type = requested_file.metadata["type"]
        # a symbolic link also counts as a "file"
        return file_type == mode or (file_type == "sym" and mode == "file")

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None when no entry has that name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        requested_file = self._find_unique(f)
        if requested_file is None:
            return None
        return requested_file.length

    def file_extract(self, compressed_object, path):
        """
        extract a tar or zip archive into the bucket
        :param compressed_object: object of type tar or zip
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)

        if isinstance(compressed_object, tarfile.TarFile):
            for member in compressed_object.getmembers():
                if member.isfile():
                    file_type = "file"
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    # a link is stored as an entry whose content is its target
                    file_type = "sym"
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    file_type = "dir"
                    stream = BytesIO()

                metadata = {"type": file_type, "permissions": member.mode}

                with stream:
                    self.fs.upload_from_stream(
                        f + "/" + member.name, stream, metadata=metadata
                    )
        elif isinstance(compressed_object, zipfile.ZipFile):
            for member in compressed_object.infolist():
                if member.is_dir():
                    file_type = "dir"
                    stream = BytesIO()
                else:
                    file_type = "file"
                    stream = compressed_object.read(member)

                metadata = {"type": file_type}

                self.fs.upload_from_stream(
                    f + "/" + member.filename, stream, metadata=metadata
                )

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object (GridByteStream if "b" is in mode, else GridStringStream)
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            requested_dir = self._find_unique(f, "Multiple directories found")
            if requested_dir is None:
                return []

            if requested_dir.metadata["type"] != "dir":
                raise FsException(
                    "File {} does not exist".format(f),
                    http_code=HTTPStatus.NOT_FOUND,
                )

            if f.endswith("/"):
                f = f[:-1]

            # NOTE(review): the pattern has no end anchor, so entries nested
            # deeper than one level are listed too - confirm that is intended.
            files_cursor = self.fs.find(
                {"filename": {"$regex": "^{}/".format(re.escape(f))}}
            )
            return [
                children_file.filename.replace(f + "/", "", 1)
                for children_file in files_cursor
            ]
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            requested_file = self._find_unique(f)

            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException(
                        "File {} does not exist".format(storage),
                        http_code=HTTPStatus.NOT_FOUND,
                    )
                return

            if requested_file.metadata["type"] == "dir":
                # Match the directory entry and what is below it, but not a
                # sibling that merely shares the prefix ("pkg" vs "pkg2").
                pattern = "^{}(/|$)".format(re.escape(f.rstrip("/")))
                for tmp in self.fs.find({"filename": {"$regex": pattern}}):
                    self.fs.delete(tmp._id)
            else:
                self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path and os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """
        Upload the local tree ``self.path + from_path`` and drop stale bucket entries
        :param from_path: path relative to ``self.path``
        :return: None
        """
        os_path = self.path + from_path

        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                members.append({"filename": os.path.join(root, folder), "type": "dir"})
            for file in files:
                filename = os.path.join(root, file)
                file_type = "sym" if os.path.islink(filename) else "file"
                members.append({"filename": filename, "type": file_type})

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            local_name = member["filename"]

            # obtain permission (the three low octal digits of the mode)
            mask = os.stat(local_name).st_mode & 0o777

            # convert to relative path
            rel_filename = os.path.relpath(local_name, self.path)

            # Expressed as naive UTC so it is comparable with uploadDate.
            # NOTE(review): assumes the client is not tz_aware, i.e. uploadDate
            # comes back as a naive UTC datetime - confirm.
            last_modified_date = datetime.datetime.fromtimestamp(
                os.path.getmtime(local_name), tz=datetime.timezone.utc
            ).replace(tzinfo=None)

            # remove processed files from dict
            remote_file = remote_files.pop(rel_filename, None)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )

            if last_modified_date < upload_date:
                continue

            file_type = member["type"]
            if file_type == "dir":
                stream = BytesIO()
            elif file_type == "sym":
                stream = BytesIO(os.readlink(local_name).encode("utf-8"))
            else:
                stream = open(local_name, "rb")

            metadata = {"type": file_type, "permissions": mask}

            with stream:
                self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

            # delete old files, only once the new copy is stored
            for old_file in remote_file or ():
                self.fs.delete(old_file._id)

        # delete files that are not any more in local fs
        for remote_file in remote_files.values():
            for file in remote_file:
                self.fs.delete(file._id)

    def _get_mongo_files(self, from_path=None):
        """
        Group bucket entries by name
        :param from_path: if supplied, only entries whose name starts with it
        :return: dict name -> list of entries, most recently uploaded first
        """
        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        for file in file_cursor:
            if from_path and not file.filename.startswith(from_path):
                continue
            file_dict.setdefault(file.filename, []).append(file)
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)