Adding FsMongo
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re

from gridfs import GridFSBucket, errors
from pymongo import MongoClient

from osm_common.fsbase import FsBase, FsException
26
27 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
28
29
class GridByteStream(BytesIO):
    """Binary file-like object backed by a single GridFS document.

    On creation the GridFS entry named ``filename`` (if any) is located. Its
    content is preloaded into the in-memory buffer unless ``mode`` contains
    "w", which truncates like ``open()``. With "r" in ``mode`` the stream
    starts at offset 0 and close() only discards the buffer. With any other
    mode close() stores the buffer back to GridFS as a ``{"type": "file"}``
    document, reusing the old document id when one existed.

    :param filename: GridFS filename ("/"-separated path)
    :param fs: GridFS bucket used for find/download/delete/upload calls
    :param mode: file mode string, e.g. "rb", "wb", "ab", "a+b"
    :raises FsException: if several entries share ``filename`` or the entry
        is not of type "file"
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode

        try:
            self.__initialize__()
        except BaseException:
            # io.IOBase calls close() on unclosed objects when they are garbage
            # collected. Close the bare buffer now so that a failed open never
            # uploads an (empty) document to GridFS later.
            super(GridByteStream, self).close()
            raise

    def __initialize__(self):
        """Find the GridFS entry for self.filename and preload its content."""
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # Filenames must be unique: peek for a second match.
            exception_file = next(cursor, None)

            if exception_file is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] == "file":
                grid_file = requested_file
            else:
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file is None:
            return

        # Remember the id so close() replaces the same document.
        self._id = grid_file._id

        # "w" truncates: start from an empty buffer. Other modes load the
        # stored bytes, which leaves the position at the end (append).
        if "w" not in self.mode:
            self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """Persist the buffer to GridFS (modes without "r") and close the stream.

        Closing an already closed stream is a no-op. Without this guard, a
        second close() would delete the document the first one just uploaded.
        """
        if self.closed:
            return

        try:
            if "r" in self.mode:
                # Read-only: nothing to persist.
                return

            if self._id is not None:
                # Drop the old document; it is re-created below with the same id.
                self.fs.delete(self._id)

            parent_dir_name, has_parent, _ = self.filename.partition("/")
            if has_parent:
                # Query metadata.type so folders with extra metadata
                # (e.g. "permissions") are recognised as well.
                cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
                if next(cursor, None) is None:
                    # No folder entry for the top-level component: drop its last
                    # character (e.g. "abc_/f" is stored as "abc/f").
                    # NOTE(review): presumably meant for a temporary "<id>_" folder
                    # renamed while this stream was open -- confirm with callers.
                    self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

            self.seek(0, 0)
            if self._id is not None:
                self.fs.upload_from_stream_with_id(
                    self._id,
                    self.filename,
                    self,
                    metadata={"type": "file"}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename,
                    self,
                    metadata={"type": "file"}
                )
        finally:
            super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
102
103
class GridStringStream(StringIO):
    """Text file-like object backed by a single GridFS document (UTF-8).

    On creation the GridFS entry named ``filename`` (if any) is located. Its
    content is decoded from UTF-8 into the in-memory buffer unless ``mode``
    contains "w", which truncates like ``open()``. With "r" in ``mode`` the
    stream starts at offset 0 and close() only discards the buffer. With any
    other mode close() encodes the buffer as UTF-8 and stores it back to GridFS
    as a ``{"type": "file"}`` document, reusing the old document id when one
    existed.

    :param filename: GridFS filename ("/"-separated path)
    :param fs: GridFS bucket used for find/download/delete/upload calls
    :param mode: file mode string, e.g. "r", "w", "a", "a+"
    :raises FsException: if several entries share ``filename`` or the entry
        is not of type "file"
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode

        try:
            self.__initialize__()
        except BaseException:
            # io.IOBase calls close() on unclosed objects when they are garbage
            # collected. Close the bare buffer now so that a failed open never
            # uploads an (empty) document to GridFS later.
            super(GridStringStream, self).close()
            raise

    def __initialize__(self):
        """Find the GridFS entry for self.filename and preload its decoded content."""
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # Filenames must be unique: peek for a second match.
            exception_file = next(cursor, None)

            if exception_file is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] == "file":
                grid_file = requested_file
            else:
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file is None:
            return

        # Remember the id so close() replaces the same document.
        self._id = grid_file._id

        # "w" truncates: start from an empty buffer. Other modes load the
        # stored text, which leaves the position at the end (append).
        if "w" not in self.mode:
            raw = BytesIO()
            try:
                self.fs.download_to_stream(self._id, raw)
                self.write(raw.getvalue().decode("utf-8"))
            finally:
                raw.close()

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """Persist the buffer to GridFS (modes without "r") and close the stream.

        Closing an already closed stream is a no-op. Without this guard, a
        second close() would delete the document the first one just uploaded.
        """
        if self.closed:
            return

        try:
            if "r" in self.mode:
                # Read-only: nothing to persist.
                return

            if self._id is not None:
                # Drop the old document; it is re-created below with the same id.
                self.fs.delete(self._id)

            parent_dir_name, has_parent, _ = self.filename.partition("/")
            if has_parent:
                # Query metadata.type so folders with extra metadata
                # (e.g. "permissions") are recognised as well.
                cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
                if next(cursor, None) is None:
                    # No folder entry for the top-level component: drop its last
                    # character (e.g. "abc_/f" is stored as "abc/f").
                    # NOTE(review): presumably meant for a temporary "<id>_" folder
                    # renamed while this stream was open -- confirm with callers.
                    self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

            self.seek(0, 0)
            encoded = BytesIO(self.read().encode("utf-8"))
            try:
                if self._id is not None:
                    self.fs.upload_from_stream_with_id(
                        self._id,
                        self.filename,
                        encoded,
                        metadata={"type": "file"}
                    )
                else:
                    self.fs.upload_from_stream(
                        self.filename,
                        encoded,
                        metadata={"type": "file"}
                    )
            finally:
                encoded.close()
        finally:
            super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
184
185
class FsMongo(FsBase):
    """FsBase implementation that keeps folders and files in MongoDB GridFS.

    Every folder and every file is one GridFS entry whose ``filename`` is the
    "/"-joined storage path and whose ``metadata["type"]`` is "dir" or "file".
    get_params() additionally mirrors the whole store into the local directory
    ``self.path``.
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local mirror directory; fs_connect() makes it end with "/"
        self.client = None  # MongoClient created by fs_connect()
        self.fs = None  # GridFSBucket created by fs_connect()

    @staticmethod
    def _to_filename(storage):
        """Return the GridFS filename for a storage path given as str or list of str."""
        return storage if isinstance(storage, str) else "/".join(storage)

    @staticmethod
    def _tree_regex(path):
        """Return a ``$regex`` pattern matching ``path`` and everything below it."""
        # re.escape keeps characters such as "." or "+" literal, and the (/|$)
        # boundary stops "pkg1" from also matching "pkg10".
        return "^{}(/|$)".format(re.escape(path))

    def _find_unique(self, filename, what="files"):
        """
        Return the only GridFS entry named ``filename``, or None if there is none
        :param filename: exact GridFS filename
        :param what: noun used in the error message
        :return: GridFS entry or None
        :raises FsException: if several entries share that filename
        """
        matches = list(self.fs.find({"filename": filename}).limit(2))
        if len(matches) > 1:
            raise FsException("Multiple {} found".format(what), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
        return matches[0] if matches else None

    def __update_local_fs(self):
        """Download every GridFS folder and file into the local directory ``self.path``.

        A file's stored "permissions" metadata, when present, is applied with os.chmod.
        """
        # Each listing is consumed completely before any work is done. The server
        # then releases the cursor, instead of a no-timeout cursor staying open.
        for directory in list(self.fs.find({"metadata.type": "dir"})):
            os.makedirs(self.path + directory.filename, exist_ok=True)

        for writing_file in list(self.fs.find({"metadata.type": "file"})):
            file_path = self.path + writing_file.filename
            # A file may be stored without a "dir" entry for its folder.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, 'wb+') as file_stream:
                self.fs.download_to_stream(writing_file._id, file_stream)
            if "permissions" in writing_file.metadata:
                os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        """
        Mirror the GridFS content into the local path and describe this storage
        :return: dict with "fs" and the local "path"
        """
        self.__update_local_fs()

        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Configure the local mirror path and connect to GridFS
        :param config: dict with "path" (existing, writable local directory), "collection"
            (name indexed on the MongoClient to get the database holding the GridFS bucket)
            and either "uri" or "host" + "port"; optional "logger_name"
        :return: None or raises FsException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException("Missing parameter \"path\"")
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config:
                    raise FsException("Missing parameter \"collection\"")
                else:
                    raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        """Close the MongoDB client opened by fs_connect(), if any."""
        if self.client is not None:
            self.client.close()
        self.client = None
        self.fs = None

    def mkdir(self, folder):
        """
        Creates a folder or parent object location. Nothing is created when a
        "dir" entry with that name already exists.
        :param folder: folder path (str)
        :return: None or raises an exception
        """
        try:
            # upload_from_stream() generates a new _id on each call, so it does not
            # raise FileExists for a repeated name. Check explicitly, otherwise
            # duplicate entries appear and later lookups fail with "Multiple files found".
            existing = self.fs.find({"filename": folder, "metadata.type": "dir"}).limit(1)
            if next(existing, None) is not None:
                return
            self.fs.upload_from_stream(
                folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        if src == dst:
            # Deleting dst first would otherwise wipe the source tree.
            return
        try:
            for dst_file in list(self.fs.find({"filename": {"$regex": self._tree_regex(dst)}})):
                self.fs.delete(dst_file._id)

            # Collect all entries before renaming so no renamed entry is revisited.
            for src_file in list(self.fs.find({"filename": {"$regex": self._tree_regex(src)}})):
                self.fs.rename(src_file._id, dst + src_file.filename[len(src):])
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        requested_file = self._find_unique(self._to_filename(storage))
        if requested_file is None:
            return False
        return mode is None or requested_file.metadata["type"] == mode

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size (GridFS length), or None if storage does not exist
        """
        requested_file = self._find_unique(self._to_filename(storage))
        return None if requested_file is None else requested_file.length

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list, the folder where to extract the tar_object
        :return: None
        """
        f = self._to_filename(path)

        for member in tar_object.getmembers():
            is_file = member.isfile()
            # NOTE(review): every non-regular member (directories, but also e.g.
            # symlinks) is stored as an empty "dir" entry. Member names are kept
            # verbatim ("..", absolute names), and __update_local_fs later uses them
            # as local paths -- confirm the tar always comes from a trusted source.
            stream = tar_object.extractfile(member) if is_file else BytesIO()

            metadata = {
                "type": "file" if is_file else "dir",
                "permissions": member.mode
            }

            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name,
                    stream,
                    metadata=metadata
                )
            finally:
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; a mode containing "b" gives a binary stream, otherwise text
        :return: file object
        """
        f = self._to_filename(storage)
        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: names of the direct children of the folder (not recursive)
        """
        f = self._to_filename(storage)
        try:
            requested_dir = self._find_unique(f, "directories")
            if requested_dir is not None and requested_dir.metadata["type"] != "dir":
                raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

            # Direct children only: "<f>/<name>" where <name> contains no further "/".
            children = self.fs.find({"filename": {"$regex": "^{}/[^/]+$".format(re.escape(f))}})
            return [child.filename[len(f) + 1:] for child in children]
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = self._to_filename(storage)
        try:
            requested_file = self._find_unique(f)
            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
                return

            if requested_file.metadata["type"] == "dir":
                # The folder entry plus everything below it, but not siblings that
                # only share the prefix (deleting "pkg1" keeps "pkg10").
                for entry in list(self.fs.find({"filename": {"$regex": self._tree_regex(f)}})):
                    self.fs.delete(entry._id)
            else:
                self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
431 except IOError as e:
432 raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)