740d54032c9237986a989c7753820088b36db75b
1 # Copyright 2019 Canonical
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
20 from http
import HTTPStatus
21 from io
import BytesIO
, StringIO
25 from gridfs
import GridFSBucket
, errors
26 from osm_common
.fsbase
import FsBase
, FsException
27 from pymongo
import MongoClient
30 __author__
= "Eduardo Sousa <eduardo.sousa@canonical.com>"
33 class GridByteStream(BytesIO
):
34 def __init__(self
, filename
, fs
, mode
):
35 BytesIO
.__init
__(self
)
37 self
.filename
= filename
40 self
.file_type
= "file" # Set "file" as default file_type
44 def __initialize__(self
):
47 cursor
= self
.fs
.find({"filename": self
.filename
})
49 for requested_file
in cursor
:
50 exception_file
= next(cursor
, None)
53 raise FsException("Multiple files found", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
55 if requested_file
.metadata
["type"] in ("file", "sym"):
56 grid_file
= requested_file
57 self
.file_type
= requested_file
.metadata
["type"]
59 raise FsException("Type isn't file", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
62 self
._id
= grid_file
._id
63 self
.fs
.download_to_stream(self
._id
, self
)
70 super(GridByteStream
, self
).close()
74 self
.fs
.delete(self
._id
)
76 cursor
= self
.fs
.find({
77 "filename": self
.filename
.split("/")[0],
78 "metadata": {"type": "dir"}})
80 parent_dir
= next(cursor
, None)
83 parent_dir_name
= self
.filename
.split("/")[0]
84 self
.filename
= self
.filename
.replace(parent_dir_name
, parent_dir_name
[:-1], 1)
88 self
.fs
.upload_from_stream_with_id(
92 metadata
={"type": self
.file_type
}
95 self
.fs
.upload_from_stream(
98 metadata
={"type": self
.file_type
}
100 super(GridByteStream
, self
).close()
105 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
109 class GridStringStream(StringIO
):
110 def __init__(self
, filename
, fs
, mode
):
111 StringIO
.__init
__(self
)
113 self
.filename
= filename
116 self
.file_type
= "file" # Set "file" as default file_type
118 self
.__initialize
__()
120 def __initialize__(self
):
123 cursor
= self
.fs
.find({"filename": self
.filename
})
125 for requested_file
in cursor
:
126 exception_file
= next(cursor
, None)
129 raise FsException("Multiple files found", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
131 if requested_file
.metadata
["type"] in ("file", "dir"):
132 grid_file
= requested_file
133 self
.file_type
= requested_file
.metadata
["type"]
135 raise FsException("File type isn't file", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
139 self
._id
= grid_file
._id
140 self
.fs
.download_to_stream(self
._id
, stream
)
142 self
.write(stream
.read().decode("utf-8"))
150 super(GridStringStream
, self
).close()
154 self
.fs
.delete(self
._id
)
156 cursor
= self
.fs
.find({
157 "filename": self
.filename
.split("/")[0],
158 "metadata": {"type": "dir"}})
160 parent_dir
= next(cursor
, None)
163 parent_dir_name
= self
.filename
.split("/")[0]
164 self
.filename
= self
.filename
.replace(parent_dir_name
, parent_dir_name
[:-1], 1)
168 stream
.write(self
.read().encode("utf-8"))
171 self
.fs
.upload_from_stream_with_id(
175 metadata
={"type": self
.file_type
}
178 self
.fs
.upload_from_stream(
181 metadata
={"type": self
.file_type
}
184 super(GridStringStream
, self
).close()
189 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
193 class FsMongo(FsBase
):
195 def __init__(self
, logger_name
='fs', lock
=False):
196 super().__init
__(logger_name
, lock
)
201 def __update_local_fs(self
, from_path
=None):
202 dir_cursor
= self
.fs
.find({"metadata.type": "dir"}, no_cursor_timeout
=True)
204 for directory
in dir_cursor
:
205 if from_path
and not directory
.filename
.startswith(from_path
):
207 os
.makedirs(self
.path
+ directory
.filename
, exist_ok
=True)
209 file_cursor
= self
.fs
.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout
=True)
211 for writing_file
in file_cursor
:
212 if from_path
and not writing_file
.filename
.startswith(from_path
):
214 file_path
= self
.path
+ writing_file
.filename
216 if writing_file
.metadata
["type"] == "sym":
218 self
.fs
.download_to_stream(writing_file
._id
, b
)
220 link
= b
.read().decode("utf-8")
225 if e
.errno
!= errno
.ENOENT
:
226 # This is probably permission denied or worse
228 os
.symlink(link
, file_path
)
230 with
open(file_path
, 'wb+') as file_stream
:
231 self
.fs
.download_to_stream(writing_file
._id
, file_stream
)
232 if "permissions" in writing_file
.metadata
:
233 os
.chmod(file_path
, writing_file
.metadata
["permissions"])
def get_params(self):
    """
    Describe this storage backend
    :return: dict with the backend kind under "fs" (always "mongo") and the value of self.path under "path"
    """
    return {"fs": "mongo", "path": self.path}
238 def fs_connect(self
, config
):
240 if "logger_name" in config
:
241 self
.logger
= logging
.getLogger(config
["logger_name"])
243 self
.path
= config
["path"]
245 raise FsException("Missing parameter \"path\"")
246 if not self
.path
.endswith("/"):
248 if not os
.path
.exists(self
.path
):
249 raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
251 elif not os
.access(self
.path
, os
.W_OK
):
252 raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
254 if all(key
in config
.keys() for key
in ["uri", "collection"]):
255 self
.client
= MongoClient(config
["uri"])
256 self
.fs
= GridFSBucket(self
.client
[config
["collection"]])
257 elif all(key
in config
.keys() for key
in ["host", "port", "collection"]):
258 self
.client
= MongoClient(config
["host"], config
["port"])
259 self
.fs
= GridFSBucket(self
.client
[config
["collection"]])
261 if "collection" not in config
.keys():
262 raise FsException("Missing parameter \"collection\"")
264 raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
267 except Exception as e
: # TODO refine
268 raise FsException(str(e
))
270 def fs_disconnect(self
):
def mkdir(self, folder):
    """
    Creates a folder or parent object location
    :param folder: name under which the directory entry is stored
    :return: None or raises an exception
    """
    try:
        # A directory is stored as an empty stream whose metadata type is "dir"
        self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
    except errors.FileExists:  # make it idempotent
        return None
    except Exception as e:
        # Any other failure is reported to the caller as an FsException, keeping the original cause
        raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e
def dir_rename(self, src, dst):
    """
    Rename one directory. If dst exists, it is replaced (its existing entries are deleted)
    :param src: source directory
    :param dst: destination directory
    :return: None or raises an exception
    """
    import re  # local import: only needed to quote the names placed inside the $regex filters

    try:
        # Each filter matches the directory entry itself ("name") and everything below it ("name/...").
        # The names are escaped so that characters such as "." or "+" are matched literally;
        # otherwise "a.b" would also select "aXb" and unrelated entries would be deleted or renamed.
        dst_cursor = self.fs.find(
            {"filename": {"$regex": "^{}(/|$)".format(re.escape(dst))}},
            no_cursor_timeout=True)

        for dst_file in dst_cursor:
            self.fs.delete(dst_file._id)

        src_cursor = self.fs.find(
            {"filename": {"$regex": "^{}(/|$)".format(re.escape(src))}},
            no_cursor_timeout=True)

        for src_file in src_cursor:
            # Only the leading occurrence is swapped, so nested names repeating src are kept
            self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
    except Exception as e:
        raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e
311 def file_exists(self
, storage
, mode
=None):
313 Indicates if "storage" file exist
314 :param storage: can be a str or a str list
315 :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
318 f
= storage
if isinstance(storage
, str) else "/".join(storage
)
320 cursor
= self
.fs
.find({"filename": f
})
322 for requested_file
in cursor
:
323 exception_file
= next(cursor
, None)
326 raise FsException("Multiple files found", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
328 if requested_file
.metadata
["type"] == mode
:
331 if requested_file
.metadata
["type"] == "sym" and mode
== "file":
def file_size(self, storage):
    """
    Return the size of a stored file
    :param storage: can be a str or a str list
    :return: the "length" attribute of the entry whose filename matches, or None when there is no such entry
    """
    # A list of path components is joined into a single "/"-separated name
    f = storage if isinstance(storage, str) else "/".join(storage)

    cursor = self.fs.find({"filename": f})

    for requested_file in cursor:
        # A second result for the same filename means the name is ambiguous
        exception_file = next(cursor, None)

        if exception_file is not None:
            raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        return requested_file.length

    return None
354 def file_extract(self
, tar_object
, path
):
357 :param tar_object: object of type tar
358 :param path: can be a str or a str list, or a tar object where to extract the tar_object
361 f
= path
if isinstance(path
, str) else "/".join(path
)
363 for member
in tar_object
.getmembers():
365 stream
= tar_object
.extractfile(member
)
367 stream
= BytesIO(member
.linkname
.encode("utf-8"))
380 "permissions": member
.mode
383 self
.fs
.upload_from_stream(
384 f
+ "/" + member
.name
,
391 def file_open(self
, storage
, mode
):
394 :param storage: can be a str or list of str
395 :param mode: file mode
399 f
= storage
if isinstance(storage
, str) else "/".join(storage
)
402 return GridByteStream(f
, self
.fs
, mode
)
404 return GridStringStream(f
, self
.fs
, mode
)
405 except errors
.NoFile
:
406 raise FsException("File {} does not exist".format(f
), http_code
=HTTPStatus
.NOT_FOUND
)
408 raise FsException("File {} cannot be opened".format(f
), http_code
=HTTPStatus
.BAD_REQUEST
)
410 def dir_ls(self
, storage
):
412 return folder content
413 :param storage: can be a str or list of str
414 :return: folder content
417 f
= storage
if isinstance(storage
, str) else "/".join(storage
)
420 dir_cursor
= self
.fs
.find({"filename": f
})
421 for requested_dir
in dir_cursor
:
422 exception_dir
= next(dir_cursor
, None)
425 raise FsException("Multiple directories found", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
427 if requested_dir
.metadata
["type"] != "dir":
428 raise FsException("File {} does not exist".format(f
), http_code
=HTTPStatus
.NOT_FOUND
)
430 files_cursor
= self
.fs
.find({"filename": {"$regex": "^{}/([^/])*".format(f
)}})
431 for children_file
in files_cursor
:
432 files
+= [children_file
.filename
.replace(f
+ '/', '', 1)]
436 raise FsException("File {} cannot be opened".format(f
), http_code
=HTTPStatus
.BAD_REQUEST
)
438 def file_delete(self
, storage
, ignore_non_exist
=False):
440 Delete storage content recursively
441 :param storage: can be a str or list of str
442 :param ignore_non_exist: not raise exception if storage does not exist
446 f
= storage
if isinstance(storage
, str) else "/".join(storage
)
448 file_cursor
= self
.fs
.find({"filename": f
})
450 for requested_file
in file_cursor
:
452 exception_file
= next(file_cursor
, None)
455 raise FsException("Multiple files found", http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
457 if requested_file
.metadata
["type"] == "dir":
458 dir_cursor
= self
.fs
.find({"filename": {"$regex": "^{}".format(f
)}})
460 for tmp
in dir_cursor
:
461 self
.fs
.delete(tmp
._id
)
463 self
.fs
.delete(requested_file
._id
)
464 if not found
and not ignore_non_exist
:
465 raise FsException("File {} does not exist".format(storage
), http_code
=HTTPStatus
.NOT_FOUND
)
467 raise FsException("File {} cannot be deleted: {}".format(f
, e
), http_code
=HTTPStatus
.INTERNAL_SERVER_ERROR
)
def sync(self, from_path=None):
    """
    Sync from FSMongo to local storage
    :param from_path: if supplied, only copy content from this path, not all
    :return: None
    """
    # All the work is delegated to the private helper; from_path is forwarded unchanged
    self.__update_local_fs(from_path=from_path)