cd7f7f0e81f2f2196259997d8c638c61f576ff4c
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
import logging
import os
import re
import stat
from http import HTTPStatus
from io import BytesIO, StringIO

from gridfs import GridFSBucket, errors
from pymongo import MongoClient

from osm_common.fsbase import FsBase, FsException
27
28 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
29
30
class GridByteStream(BytesIO):
    """In-memory binary stream backed by an entry of a GridFS bucket.

    On construction the entry named ``filename`` (if any) is downloaded into
    the buffer. On ``close()`` the buffer is uploaded back to the bucket unless
    the mode contains ``"r"``, in which case nothing is written.

    :param filename: name of the entry inside the bucket
    :param fs: bucket object providing ``find``, ``delete``,
        ``download_to_stream``, ``upload_from_stream`` and
        ``upload_from_stream_with_id``
    :param mode: file mode string; only the presence of ``"r"`` is inspected
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        # Type used when the entry does not exist yet. Without this default,
        # closing a newly created file failed with AttributeError because
        # file_type was only assigned when an existing entry was found.
        self.file_type = "file"

        self.__initialize__()

    def __initialize__(self):
        """Load the existing entry, if any, into the buffer.

        :raises FsException: if several entries share the filename or the
            entry is neither a regular file nor a symlink
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second hit for the same filename means the store is inconsistent.
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

            # In read mode rewind so the caller reads from the start. In any
            # other mode the position stays at the end of the downloaded data,
            # so subsequent writes are appended to the existing content.
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """Flush the buffer to the bucket (non-read modes) and close the stream.

        Calling ``close()`` more than once is harmless.
        """
        if self.closed:
            # Already flushed; seeking a closed buffer would raise ValueError.
            return

        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        # The previous revision is removed first and re-created below with the same id.
        if self._id:
            self.fs.delete(self._id)

        top_level_name = self.filename.split("/")[0]
        cursor = self.fs.find({
            "filename": top_level_name,
            "metadata": {"type": "dir"}})

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No directory entry with that exact name: retry with the last
            # character of the first path component dropped.
            self.filename = self.filename.replace(top_level_name, top_level_name[:-1], 1)

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id,
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
104
105
class GridStringStream(StringIO):
    """In-memory text stream backed by an entry of a GridFS bucket.

    The stored bytes are decoded as UTF-8 into the buffer on construction and
    encoded back as UTF-8 when the stream is closed in a non-read mode.

    :param filename: name of the entry inside the bucket
    :param fs: bucket object providing ``find``, ``delete``,
        ``download_to_stream``, ``upload_from_stream`` and
        ``upload_from_stream_with_id``
    :param mode: file mode string; only the presence of ``"r"`` is inspected
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        # Type used when the entry does not exist yet. Without this default,
        # closing a newly created file failed with AttributeError because
        # file_type was only assigned when an existing entry was found.
        self.file_type = "file"

        self.__initialize__()

    def __initialize__(self):
        """Load and decode the existing entry, if any, into the buffer.

        :raises FsException: if several entries share the filename or the
            entry is neither a regular file nor a symlink
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second hit for the same filename means the store is inconsistent.
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # Same accepted types as GridByteStream: the previous check let
            # directory entries through and rejected symlinks, contradicting
            # the error message below.
            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            self._id = grid_file._id
            # The bucket hands out bytes, so go through a temporary byte buffer.
            with BytesIO() as stream:
                self.fs.download_to_stream(self._id, stream)
                stream.seek(0)
                self.write(stream.read().decode("utf-8"))

            # In read mode rewind so the caller reads from the start. In any
            # other mode the position stays at the end of the loaded text.
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """Flush the buffer to the bucket (non-read modes) and close the stream.

        Calling ``close()`` more than once is harmless.
        """
        if self.closed:
            # Already flushed; seeking a closed buffer would raise ValueError.
            return

        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        # The previous revision is removed first and re-created below with the same id.
        if self._id:
            self.fs.delete(self._id)

        top_level_name = self.filename.split("/")[0]
        cursor = self.fs.find({
            "filename": top_level_name,
            "metadata": {"type": "dir"}})

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No directory entry with that exact name: retry with the last
            # character of the first path component dropped.
            self.filename = self.filename.replace(top_level_name, top_level_name[:-1], 1)

        self.seek(0, 0)
        with BytesIO() as stream:
            stream.write(self.read().encode("utf-8"))
            stream.seek(0, 0)
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id,
                    self.filename,
                    stream,
                    metadata={"type": self.file_type}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename,
                    stream,
                    metadata={"type": self.file_type}
                )
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
187
188
class FsMongo(FsBase):
    """Storage backend that keeps files in a MongoDB GridFS bucket.

    Every entry carries a ``metadata["type"]`` of ``"file"``, ``"sym"`` or
    ``"dir"``. ``sync()`` copies the bucket content to the local folder
    configured as ``path``.
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None
        self.client = None
        self.fs = None

    def __update_local_fs(self):
        """Recreate folders and files of the bucket under ``self.path``."""
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            # Cursors opened with no_cursor_timeout must be closed explicitly.
            dir_cursor.close()

        # "$in" selects entries whose type is one of the listed values; the
        # former "$elemMatch" applies to array fields and takes a document,
        # not a list.
        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}},
            no_cursor_timeout=True)
        try:
            for writing_file in file_cursor:
                file_path = self.path + writing_file.filename

                # A file may be stored without a directory entry for its parent.
                parent_folder = os.path.dirname(file_path)
                if parent_folder:
                    os.makedirs(parent_folder, exist_ok=True)

                with open(file_path, 'wb+') as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)

                if "permissions" in writing_file.metadata:
                    if writing_file.metadata["type"] == "sym":
                        os.chmod(
                            file_path,
                            writing_file.metadata["permissions"] | stat.S_IFLNK
                        )
                    else:
                        os.chmod(file_path, writing_file.metadata["permissions"])
        finally:
            file_cursor.close()

    def get_params(self):
        """Return the parameters identifying this storage backend."""
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Configure the local path and connect to the GridFS bucket
        :param config: dictionary with "path", "collection" and either "uri" or "host" + "port";
            optionally "logger_name"
        :return: None or raises an exception
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException("Missing parameter \"path\"")
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config for key in ("uri", "collection")):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config for key in ("host", "port", "collection")):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config:
                    raise FsException("Missing parameter \"collection\"")
                else:
                    raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        try:
            # Skip the upload when the folder entry is already there, so that
            # repeated calls do not pile up entries with the same filename.
            existing = self.fs.find({"filename": folder, "metadata.type": "dir"})
            if next(existing, None) is not None:
                return

            self.fs.upload_from_stream(
                folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            # Names are escaped so regex metacharacters in them match literally.
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(dst))}},
                no_cursor_timeout=True)
            try:
                for dst_file in dst_cursor:
                    self.fs.delete(dst_file._id)
            finally:
                dst_cursor.close()

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(src))}},
                no_cursor_timeout=True)
            try:
                for src_file in src_cursor:
                    self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
            finally:
                src_cursor.close()
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # No particular type requested: any existing entry qualifies.
            if mode is None:
                return True

            if requested_file.metadata["type"] == mode:
                return True

            # A symlink counts as a regular file.
            if requested_file.metadata["type"] == "sym" and mode == "file":
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None when no entry with that name exists
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            return requested_file.length

        return None

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)

        for member in tar_object.getmembers():
            if member.isfile():
                file_type = "file"
                stream = tar_object.extractfile(member)
            else:
                # Symlinks and every other member kind are stored without content.
                file_type = "sym" if member.issym() else "dir"
                stream = BytesIO()

            metadata = {
                "type": file_type,
                "permissions": member.mode
            }

            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name,
                    stream,
                    metadata=metadata
                )
            finally:
                # Release the member stream even if the upload fails.
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        """
        # Computed outside the try block so the error messages below can always use it.
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content, as names relative to the folder
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                exception_dir = next(dir_cursor, None)

                if exception_dir:
                    raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_dir.metadata["type"] != "dir":
                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

                # The folder name is escaped so regex metacharacters match literally.
                # NOTE(review): the pattern has no end anchor, so nested entries are
                # listed as well - confirm whether only direct children are wanted.
                files_cursor = self.fs.find(
                    {"filename": {"$regex": "^{}/([^/])*".format(re.escape(f))}})
                for children_file in files_cursor:
                    files += [children_file.filename.replace(f + '/', '', 1)]

            return files
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                exception_file = next(file_cursor, None)

                if exception_file:
                    raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_file.metadata["type"] == "dir":
                    # Match the folder itself and what lies below it only; a bare
                    # prefix match would also delete siblings such as "<f>2".
                    dir_cursor = self.fs.find(
                        {"filename": {"$regex": "^{}(/|$)".format(re.escape(f))}})

                    for tmp in dir_cursor:
                        self.fs.delete(tmp._id)
                else:
                    self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def sync(self):
        """
        Sync from FSMongo to local storage
        """
        self.__update_local_fs()