blob: 740d54032c9237986a989c7753820088b36db75b [file] [log] [blame]
Eduardo Sousa0593aba2019-06-04 12:55:43 +01001# Copyright 2019 Canonical
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14#
15# For those usages not covered by the Apache License, Version 2.0 please
16# contact: eduardo.sousa@canonical.com
17##
18
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
28
Eduardo Sousa0593aba2019-06-04 12:55:43 +010029
# Module author and contact address (informational metadata only).
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
31
32
class GridByteStream(BytesIO):
    """Binary, in-memory file object backed by a single GridFS entry.

    On creation the entry named ``filename`` (if any) is looked up and, unless the mode
    truncates ("w"), its content is loaded into the buffer. Streams opened in a writable
    mode are uploaded back to GridFS by ``close()``; read-only streams only release the
    buffer.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename ("/"-separated path)
        :param fs: GridFS bucket used for find/download/upload/delete
        :param mode: file mode string such as "rb", "wb", "ab" or "r+b"
        :raises FsException: if several entries share ``filename`` or the entry is neither
            a "file" nor a "sym"
        """
        BytesIO.__init__(self)
        self._id = None  # GridFS _id of the pre-existing entry; None for a new file
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Find the GridFS entry for ``self.filename`` and preload its content."""
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})
        for requested_file in cursor:
            # A second result means the filename is ambiguous
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
            if requested_file.metadata["type"] not in ("file", "sym"):
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
            grid_file = requested_file
            self.file_type = requested_file.metadata["type"]

        if grid_file is not None:
            self._id = grid_file._id
            # "w" truncates: remember the _id (close() replaces that entry) but drop the old bytes
            if "w" not in self.mode:
                self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            self.seek(0, 0)

    def _is_writable(self):
        """Return True unless the stream was opened read-only ("r" without "+")."""
        return "r" not in self.mode or "+" in self.mode

    def _target_filename(self):
        """Return the filename under which the buffer will be uploaded.

        When the first path component is not an existing "dir" entry, its last character is
        dropped (e.g. "pkg_/a" -> "pkg/a").
        NOTE(review): presumably this serves writes into a temporary "<name>_" folder that was
        already renamed to "<name>" -- confirm with callers.
        Top-level names (no "/") have no parent folder and are kept unchanged.
        """
        parent_dir_name, separator, _ = self.filename.partition("/")
        if not separator:
            return self.filename
        # Dotted key: matches dir entries whose metadata has extra fields (e.g. "permissions")
        cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
        if next(cursor, None) is not None:
            return self.filename
        return self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

    def close(self):
        """Upload the buffer to GridFS (writable modes only) and close the stream.

        Closing an already closed stream does nothing.
        """
        if self.closed:
            return
        if not self._is_writable():
            super().close()
            return
        try:
            # NOTE: delete-then-upload is not atomic; if the upload fails the old entry is gone
            if self._id is not None:
                self.fs.delete(self._id)
            self.filename = self._target_filename()
            self.seek(0, 0)
            metadata = {"type": self.file_type}
            if self._id is not None:
                # Re-use the previous _id for the replacement entry
                self.fs.upload_from_stream_with_id(self._id, self.filename, self, metadata=metadata)
            else:
                self.fs.upload_from_stream(self.filename, self, metadata=metadata)
        finally:
            super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
107
108
class GridStringStream(StringIO):
    """Text, in-memory file object backed by a single GridFS entry.

    The entry's bytes are decoded as UTF-8 into the buffer on creation (unless the mode
    truncates with "w"); writable streams re-encode the whole buffer as UTF-8 and upload it
    to GridFS on ``close()``.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename ("/"-separated path)
        :param fs: GridFS bucket used for find/download/upload/delete
        :param mode: file mode string such as "r", "w", "a" or "r+"
        :raises FsException: if several entries share ``filename`` or the entry is neither
            a "file" nor a "sym"
        """
        StringIO.__init__(self)
        self._id = None  # GridFS _id of the pre-existing entry; None for a new file
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Find the GridFS entry for ``self.filename`` and preload its decoded content."""
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})
        for requested_file in cursor:
            # A second result means the filename is ambiguous
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
            # Same accepted types as the binary stream: regular files and symlinks
            if requested_file.metadata["type"] not in ("file", "sym"):
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
            grid_file = requested_file
            self.file_type = requested_file.metadata["type"]

        if grid_file is not None:
            self._id = grid_file._id
            # "w" truncates: remember the _id (close() replaces that entry) but drop the old text
            if "w" not in self.mode:
                with BytesIO() as raw:
                    self.fs.download_to_stream(self._id, raw)
                    self.write(raw.getvalue().decode("utf-8"))

        if "r" in self.mode:
            self.seek(0, 0)

    def _is_writable(self):
        """Return True unless the stream was opened read-only ("r" without "+")."""
        return "r" not in self.mode or "+" in self.mode

    def _target_filename(self):
        """Return the filename under which the buffer will be uploaded.

        When the first path component is not an existing "dir" entry, its last character is
        dropped (e.g. "pkg_/a" -> "pkg/a").
        NOTE(review): presumably this serves writes into a temporary "<name>_" folder that was
        already renamed to "<name>" -- confirm with callers.
        Top-level names (no "/") have no parent folder and are kept unchanged.
        """
        parent_dir_name, separator, _ = self.filename.partition("/")
        if not separator:
            return self.filename
        # Dotted key: matches dir entries whose metadata has extra fields (e.g. "permissions")
        cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
        if next(cursor, None) is not None:
            return self.filename
        return self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

    def close(self):
        """Upload the UTF-8 encoded buffer to GridFS (writable modes only) and close.

        Closing an already closed stream does nothing.
        """
        if self.closed:
            return
        if not self._is_writable():
            super().close()
            return
        try:
            # NOTE: delete-then-upload is not atomic; if the upload fails the old entry is gone
            if self._id is not None:
                self.fs.delete(self._id)
            self.filename = self._target_filename()
            metadata = {"type": self.file_type}
            with BytesIO(self.getvalue().encode("utf-8")) as stream:
                if self._id is not None:
                    # Re-use the previous _id for the replacement entry
                    self.fs.upload_from_stream_with_id(self._id, self.filename, stream, metadata=metadata)
                else:
                    self.fs.upload_from_stream(self.filename, stream, metadata=metadata)
        finally:
            super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
191
192
class FsMongo(FsBase):
    """``FsBase`` implementation that keeps its content in MongoDB GridFS.

    Every path is one GridFS entry whose ``metadata["type"]`` is "file", "sym" (content is
    the link target) or "dir" (empty content). Entries created by ``file_extract`` also carry
    ``metadata["permissions"]``. ``sync`` mirrors the entries into the local directory given
    as "path" in ``fs_connect``.
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local mirror directory; ends with "/" after fs_connect
        self.client = None  # MongoClient, set by fs_connect
        self.fs = None  # GridFSBucket, set by fs_connect

    @staticmethod
    def _is_under(filename, prefix):
        """Tell whether ``filename`` is ``prefix`` itself or lies below it.

        An empty/None prefix matches everything. Matching is per path component, so "pkg1"
        matches "pkg1" and "pkg1/x" but not "pkg10/x"; a prefix ending in "/" is matched as a
        plain string prefix.
        """
        if not prefix:
            return True
        if prefix.endswith("/"):
            return filename.startswith(prefix)
        return filename == prefix or filename.startswith(prefix + "/")

    def _find_tree(self, path):
        """Return ``(_id, filename)`` pairs for ``path`` and every entry below it.

        The result is fully read and the cursor closed before returning, so callers can
        delete or rename the listed entries without iterating a live cursor.
        """
        pattern = "^{}(/|$)".format(re.escape(path))
        cursor = self.fs.find({"filename": {"$regex": pattern}}, no_cursor_timeout=True)
        try:
            return [(entry._id, entry.filename) for entry in cursor]
        finally:
            cursor.close()

    def __update_local_fs(self, from_path=None):
        """Copy GridFS content into the local directory ``self.path``.

        Directories are created first, then regular files and symlinks are written.

        :param from_path: if given, only entries equal to or below this path are copied
        :return: None
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                if self._is_under(directory.filename, from_path):
                    os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            # no_cursor_timeout cursors must be closed explicitly, also on errors
            dir_cursor.close()

        file_cursor = self.fs.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True)
        try:
            for writing_file in file_cursor:
                if not self._is_under(writing_file.filename, from_path):
                    continue
                file_path = self.path + writing_file.filename
                # A file's folder may have no "dir" entry (e.g. content written through file_open)
                os.makedirs(os.path.dirname(file_path), exist_ok=True)

                if writing_file.metadata["type"] == "sym":
                    with BytesIO() as b:
                        self.fs.download_to_stream(writing_file._id, b)
                        link = b.getvalue().decode("utf-8")

                    try:
                        os.remove(file_path)
                    except OSError as e:
                        if e.errno != errno.ENOENT:
                            # This is probably permission denied or worse
                            raise
                    # NOTE(review): the stored target is used verbatim and may point outside self.path
                    os.symlink(link, file_path)
                else:
                    if os.path.islink(file_path):
                        # Never write through a symlink left by a previous sync
                        os.remove(file_path)
                    with open(file_path, 'wb+') as file_stream:
                        self.fs.download_to_stream(writing_file._id, file_stream)
                    if "permissions" in writing_file.metadata:
                        os.chmod(file_path, writing_file.metadata["permissions"])
        finally:
            file_cursor.close()

    def get_params(self):
        """Return the backend identification and local mirror path."""
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """Validate ``config`` and open the MongoDB / GridFS connection.

        :param config: dict with "path" (existing, writable local directory), "collection"
            (name used as ``client[collection]``) and either "uri" or "host" + "port";
            optionally "logger_name"
        :return: None
        :raises FsException: on missing/invalid parameters or connection errors
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException("Missing parameter \"path\"")
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config for key in ("uri", "collection")):
                self.client = MongoClient(config["uri"])
            elif all(key in config for key in ("host", "port", "collection")):
                self.client = MongoClient(config["host"], config["port"])
            elif "collection" not in config:
                raise FsException("Missing parameter \"collection\"")
            else:
                raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
            self.fs = GridFSBucket(self.client[config["collection"]])
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        """Close the MongoDB client, if connected, and drop the GridFS handle."""
        if self.client is not None:
            self.client.close()
        self.client = None
        self.fs = None

    def mkdir(self, folder):
        """
        Creates a folder or parent object location. Calling it for an existing folder is a no-op.
        :param folder: folder path
        :return: None or raises an exception
        """
        try:
            # GridFS happily stores duplicate filenames, so check first to stay idempotent
            existing = self.fs.find({"filename": folder, "metadata.type": "dir"})
            if next(existing, None) is not None:
                return
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        if src == dst:
            # Nothing to move; deleting dst first would wipe the source tree
            return
        try:
            for entry_id, _ in self._find_tree(dst):
                self.fs.delete(entry_id)
            for entry_id, filename in self._find_tree(src):
                # filename starts with src (anchored regex), so swap that prefix for dst
                self.fs.rename(entry_id, dst + filename[len(src):])
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: 'file' exists as a regular file or symlink; 'dir' exists as a directory;
            None just exists with any type
        :return: True, False
        :raises FsException: if several entries share the same name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})
        for requested_file in cursor:
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            file_type = requested_file.metadata["type"]
            if mode is None or file_type == mode:
                return True
            return file_type == "sym" and mode == "file"

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size in bytes, or None if the entry does not exist
        :raises FsException: if several entries share the same name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})
        for requested_file in cursor:
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
            return requested_file.length

        return None

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list, destination folder inside GridFS
        :return: None
        :raises FsException: if a member name contains a ".." component
        """
        f = path if isinstance(path, str) else "/".join(path)
        members = tar_object.getmembers()

        # Reject path traversal up front so nothing is uploaded from a bad archive;
        # sync() writes entries to self.path + filename
        for member in members:
            if ".." in member.name.split("/"):
                raise FsException("Invalid path '{}' in tar file".format(member.name),
                                  http_code=HTTPStatus.BAD_REQUEST)

        for member in members:
            if member.isfile():
                file_type, stream = "file", tar_object.extractfile(member)
            elif member.issym():
                # Symlinks are stored with the link target as content
                file_type, stream = "sym", BytesIO(member.linkname.encode("utf-8"))
            else:
                file_type, stream = "dir", BytesIO()

            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name,
                    stream,
                    metadata={"type": file_type, "permissions": member.mode}
                )
            finally:
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; a "b" in it selects a binary stream, otherwise text
        :return: file object
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: names of the direct children of the folder (empty list if the folder is unknown)
        :raises FsException: if the path is not a directory or is ambiguous
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                if next(dir_cursor, None) is not None:
                    raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_dir.metadata["type"] != "dir":
                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

                # Direct children only: "<f>/<name>" with no further "/"
                pattern = "^{}/[^/]+$".format(re.escape(f))
                files_cursor = self.fs.find({"filename": {"$regex": pattern}})
                files = [child.filename[len(f) + 1:] for child in files_cursor]

            return files
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                if next(file_cursor, None) is not None:
                    raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_file.metadata["type"] == "dir":
                    # Only the folder itself and entries below it, not siblings sharing a prefix
                    for entry_id, _ in self._find_tree(f):
                        self.fs.delete(entry_id)
                else:
                    self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        self.__update_local_fs(from_path=from_path)