740d54032c9237986a989c7753820088b36db75b
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
28
29
# Module author metadata.
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
31
32
class GridByteStream(BytesIO):
    """
    Binary in-memory file backed by a single GridFS entry.

    The entry named ``filename`` (when it exists) is loaded into the buffer on
    construction. Closing a writable stream deletes the previous entry and uploads
    the buffer again, reusing the previous id when there was one.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename of the entry
        :param fs: GridFS bucket used to find, download, upload and delete entries
        :param mode: file mode string: "r" reads from the start, "w" truncates, "a" appends,
            "+" makes an "r" stream writable
        :raises FsException: if several entries share the filename or the entry is not a file/sym
        """
        super().__init__()
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Look up the GridFS entry and, unless truncating, load its content into the buffer.
        :return: None
        :raises FsException: on duplicated entries or when the entry type is not "file"/"sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second match means the filename is ambiguous
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            # Keep the id so close() replaces the entry in place
            self._id = grid_file._id
            # "w" modes truncate: the old content must not be loaded
            if "w" not in self.mode:
                self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the stream, writing the buffer back to GridFS when the stream is writable.
        Read-only streams ("r" without "+") are just closed. Closing twice is a no-op.
        :return: None
        """
        if self.closed:
            # Re-running the write-back would delete the entry and then fail on the closed buffer
            return

        if "r" in self.mode and "+" not in self.mode:
            super().close()
            return

        try:
            if self._id:
                self.fs.delete(self._id)

            parent_dir_name = self.filename.split("/")[0]
            # Dotted key: directory metadata may hold more fields (e.g. "permissions")
            cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
            parent_dir = next(cursor, None)

            if not parent_dir:
                # NOTE(review): when the top-level folder is not found, its last character is
                # dropped (e.g. a "<name>_" folder maps to "<name>"); kept as-is, confirm with callers
                self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

            self.seek(0, 0)
            metadata = {"type": self.file_type}
            if self._id:
                self.fs.upload_from_stream_with_id(self._id, self.filename, self, metadata=metadata)
            else:
                self.fs.upload_from_stream(self.filename, self, metadata=metadata)
        finally:
            super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
107
108
class GridStringStream(StringIO):
    """
    Text in-memory file backed by a single GridFS entry, UTF-8 encoded.

    The entry named ``filename`` (when it exists) is decoded and loaded into the
    buffer on construction. Closing a writable stream deletes the previous entry
    and uploads the UTF-8 encoded buffer again, reusing the previous id when there was one.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename of the entry
        :param fs: GridFS bucket used to find, download, upload and delete entries
        :param mode: file mode string: "r" reads from the start, "w" truncates, "a" appends,
            "+" makes an "r" stream writable
        :raises FsException: if several entries share the filename or the entry is not a file/sym
        """
        super().__init__()
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Look up the GridFS entry and, unless truncating, load its decoded content into the buffer.
        :return: None
        :raises FsException: on duplicated entries or when the entry type is not "file"/"sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second match means the filename is ambiguous
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # Same accepted types as the binary stream: directories cannot be opened as files
            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            # Keep the id so close() replaces the entry in place
            self._id = grid_file._id
            # "w" modes truncate: the old content must not be loaded
            if "w" not in self.mode:
                with BytesIO() as stream:
                    self.fs.download_to_stream(self._id, stream)
                    self.write(stream.getvalue().decode("utf-8"))

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the stream, writing the buffer back to GridFS when the stream is writable.
        Read-only streams ("r" without "+") are just closed. Closing twice is a no-op.
        :return: None
        """
        if self.closed:
            # Re-running the write-back would delete the entry and then fail on the closed buffer
            return

        if "r" in self.mode and "+" not in self.mode:
            super().close()
            return

        try:
            if self._id:
                self.fs.delete(self._id)

            parent_dir_name = self.filename.split("/")[0]
            # Dotted key: directory metadata may hold more fields (e.g. "permissions")
            cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
            parent_dir = next(cursor, None)

            if not parent_dir:
                # NOTE(review): when the top-level folder is not found, its last character is
                # dropped (e.g. a "<name>_" folder maps to "<name>"); kept as-is, confirm with callers
                self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

            metadata = {"type": self.file_type}
            with BytesIO(self.getvalue().encode("utf-8")) as stream:
                if self._id:
                    self.fs.upload_from_stream_with_id(self._id, self.filename, stream, metadata=metadata)
                else:
                    self.fs.upload_from_stream(self.filename, stream, metadata=metadata)
        finally:
            super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
191
192
class FsMongo(FsBase):
    """
    FsBase implementation that stores content in a MongoDB GridFS bucket.

    Every stored path is a GridFS filename whose ``metadata["type"]`` is "dir", "file"
    or "sym" (symlinks store their target path as content). ``sync()`` mirrors the
    bucket into the local folder ``self.path``.
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local folder mirrored by sync(); set by fs_connect()
        self.client = None  # MongoClient; set by fs_connect()
        self.fs = None  # GridFSBucket; set by fs_connect()

    def __update_local_fs(self, from_path=None):
        """
        Copy GridFS directories, files and symlinks into the local folder self.path
        :param from_path: if supplied, only entries whose filename starts with it are copied
        :return: None
        """
        # no_cursor_timeout cursors are not reaped by the server, so close them
        # explicitly even when copying fails half way
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                if from_path and not directory.filename.startswith(from_path):
                    continue
                os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            dir_cursor.close()

        file_cursor = self.fs.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True)
        try:
            for writing_file in file_cursor:
                if from_path and not writing_file.filename.startswith(from_path):
                    continue
                file_path = self.path + writing_file.filename

                # A file may be stored without a "dir" entry for its parent folder
                parent_folder = os.path.dirname(file_path)
                if parent_folder:
                    os.makedirs(parent_folder, exist_ok=True)

                if writing_file.metadata["type"] == "sym":
                    # Symlink entries hold the link target as UTF-8 content
                    with BytesIO() as b:
                        self.fs.download_to_stream(writing_file._id, b)
                        link = b.getvalue().decode("utf-8")

                    try:
                        os.remove(file_path)
                    except OSError as e:
                        if e.errno != errno.ENOENT:
                            # This is probably permission denied or worse
                            raise
                    os.symlink(link, file_path)
                else:
                    with open(file_path, 'wb+') as file_stream:
                        self.fs.download_to_stream(writing_file._id, file_stream)
                    if "permissions" in writing_file.metadata:
                        os.chmod(file_path, writing_file.metadata["permissions"])
        finally:
            file_cursor.close()

    def get_params(self):
        """
        :return: dict describing this storage backend and its local path
        """
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Validate the local path and connect to the GridFS bucket
        :param config: dict with "path" (existing, writable local folder), "collection" and either
            "uri" or "host" + "port"; optionally "logger_name"
        :return: None or raises FsException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException("Missing parameter \"path\"")
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config.keys() for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException("Missing parameter \"collection\"")
                else:
                    raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location. Nothing is done if an entry with that name exists
        :param folder:
        :return: None or raises an exception
        """
        try:
            # upload_from_stream always creates a new entry (it does not raise FileExists),
            # so check first: duplicated entries break later lookups with "Multiple files found"
            if next(self.fs.find({"filename": folder}, limit=1), None) is not None:
                return
            self.fs.upload_from_stream(
                folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            # Paths are regex-escaped so names with ".", "+", "(" ... match literally
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(dst))}},
                no_cursor_timeout=True)
            try:
                dst_ids = [dst_file._id for dst_file in dst_cursor]
            finally:
                dst_cursor.close()
            for dst_id in dst_ids:
                self.fs.delete(dst_id)

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(src))}},
                no_cursor_timeout=True)
            try:
                # Materialize before renaming, so the cursor is not iterated while its entries change
                src_files = [(src_file._id, src_file.filename) for src_file in src_cursor]
            finally:
                src_cursor.close()
            for src_id, src_name in src_files:
                # src_name starts with src (anchored regex): swap that prefix for dst
                self.fs.rename(src_id, dst + src_name[len(src):])
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file (symlinks count as files); 'dir' exists as a
            directory or; 'None' just exists
        :return: True, False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # No specific mode requested: existence is enough
            if not mode:
                return True

            if requested_file.metadata["type"] == mode:
                return True

            if requested_file.metadata["type"] == "sym" and mode == "file":
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size in bytes, or None if storage does not exist
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            return requested_file.length

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list; folder where the tar_object members are stored
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)

        for member in tar_object.getmembers():
            if member.isfile():
                file_type = "file"
                stream = tar_object.extractfile(member)
            elif member.issym():
                file_type = "sym"
                # Symlinks are stored with their target path as content
                stream = BytesIO(member.linkname.encode("utf-8"))
            else:
                # NOTE(review): every other member kind (not only directories) is stored as "dir"
                file_type = "dir"
                stream = BytesIO()

            metadata = {
                "type": file_type,
                "permissions": member.mode
            }

            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name,
                    stream,
                    metadata=metadata
                )
            finally:
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; a "b" in it returns a binary stream, otherwise a text stream
        :return: file object
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)

            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content: names of the direct children (empty list if storage does not exist)
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)

            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                exception_dir = next(dir_cursor, None)

                if exception_dir:
                    raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_dir.metadata["type"] != "dir":
                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

                # Direct children only: "<f>/<name>" with no further "/" (anchored with "$")
                files_cursor = self.fs.find({"filename": {"$regex": "^{}/[^/]+$".format(re.escape(f))}})
                files = [children_file.filename[len(f) + 1:] for children_file in files_cursor]

            return files
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)

            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                exception_file = next(file_cursor, None)

                if exception_file:
                    raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_file.metadata["type"] == "dir":
                    # Only entries below "<f>/": a bare "^<f>" prefix would also delete
                    # unrelated siblings such as "<f>2" or "<f>_tmp"
                    dir_cursor = self.fs.find({"filename": {"$regex": "^{}/".format(re.escape(f))}})
                    for child_id in [child._id for child in dir_cursor]:
                        self.fs.delete(child_id)

                self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        self.__update_local_fs(from_path=from_path)
474 """
475 self.__update_local_fs(from_path=from_path)