allow partial sync for fsmongo
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
import logging
import os
import re
from http import HTTPStatus
from io import BytesIO, StringIO

from gridfs import GridFSBucket, errors
from pymongo import MongoClient

from osm_common.fsbase import FsBase, FsException
26
# Module author metadata.
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
28
29
class GridByteStream(BytesIO):
    """
    In-memory binary stream backed by a single GridFS entry.

    On creation the entry's current content (if any) is loaded, unless ``mode`` contains
    "w" (truncate, like the builtin ``open``). On close() the buffer is written back to
    GridFS unless ``mode`` contains "r".
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None  # GridFS id of the existing entry; reused when writing back
        self.filename = filename
        self.fs = fs  # GridFS bucket holding the entry
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Look up ``self.filename`` and, unless truncating, load its content into the buffer.
        :raises FsException: if several entries share the name or the entry is not a file/symlink
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # a second result means the name is ambiguous
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file is not None:
            # keep the id so close() overwrites the same GridFS entry
            self._id = grid_file._id
            # "w" modes truncate: preloading would make writes append to the old content
            if "w" not in self.mode:
                self.fs.download_to_stream(self._id, self)

            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Write the buffer back to GridFS (unless opened for reading) and close the stream.
        Closing an already closed stream is a no-op.
        """
        if self.closed:
            return

        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        if self._id is not None:
            self.fs.delete(self._id)

        # only names with a folder component have a parent directory to check
        if "/" in self.filename:
            parent_dir_name = self.filename.split("/")[0]
            cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
            if next(cursor, None) is None:
                # NOTE(review): with no directory entry for the top-level component, the file is
                # written under that component minus its last character -- presumably mapping a
                # temporary "<name>_" folder back to "<name>"; confirm with callers.
                self.filename = parent_dir_name[:-1] + self.filename[len(parent_dir_name):]

        self.seek(0, 0)
        metadata = {"type": self.file_type}
        if self._id is not None:
            self.fs.upload_from_stream_with_id(self._id, self.filename, self, metadata=metadata)
        else:
            self.fs.upload_from_stream(self.filename, self, metadata=metadata)
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
103 self.close()
104
105
class GridStringStream(StringIO):
    """
    In-memory text stream backed by a single GridFS entry whose bytes are UTF-8 encoded.

    On creation the entry's current content (if any) is loaded, unless ``mode`` contains
    "w" (truncate, like the builtin ``open``). On close() the text is encoded and written
    back to GridFS unless ``mode`` contains "r".
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None  # GridFS id of the existing entry; reused when writing back
        self.filename = filename
        self.fs = fs  # GridFS bucket holding the entry
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Look up ``self.filename`` and, unless truncating, load its decoded content.
        :raises FsException: if several entries share the name or the entry is not a file/symlink
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # a second result means the name is ambiguous
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # regular files and symlinks only; a directory entry cannot be opened as a file
            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file is not None:
            # keep the id so close() overwrites the same GridFS entry
            self._id = grid_file._id
            # "w" modes truncate: preloading would make writes append to the old content
            if "w" not in self.mode:
                with BytesIO() as raw:
                    self.fs.download_to_stream(self._id, raw)
                    self.write(raw.getvalue().decode("utf-8"))

            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Encode the text as UTF-8, write it back to GridFS (unless opened for reading) and close.
        Closing an already closed stream is a no-op.
        """
        if self.closed:
            return

        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        if self._id is not None:
            self.fs.delete(self._id)

        # only names with a folder component have a parent directory to check
        if "/" in self.filename:
            parent_dir_name = self.filename.split("/")[0]
            cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
            if next(cursor, None) is None:
                # NOTE(review): with no directory entry for the top-level component, the file is
                # written under that component minus its last character -- presumably mapping a
                # temporary "<name>_" folder back to "<name>"; confirm with callers.
                self.filename = parent_dir_name[:-1] + self.filename[len(parent_dir_name):]

        metadata = {"type": self.file_type}
        with BytesIO(self.getvalue().encode("utf-8")) as raw:
            if self._id is not None:
                self.fs.upload_from_stream_with_id(self._id, self.filename, raw, metadata=metadata)
            else:
                self.fs.upload_from_stream(self.filename, raw, metadata=metadata)
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
188
189
class FsMongo(FsBase):
    """
    File storage backend keeping its content in a MongoDB GridFS bucket.

    Each GridFS entry has ``metadata["type"]`` set to "file", "sym" (content is the link
    target) or "dir" (empty content). ``sync`` copies entries into the local ``path``.
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local folder used by sync(); set by fs_connect()
        self.client = None  # MongoClient; set by fs_connect()
        self.fs = None  # GridFSBucket; set by fs_connect()

    @staticmethod
    def _subtree_regex(path):
        """
        Build a MongoDB $regex matching ``path`` itself and every entry below it
        :param path: entry name; regex metacharacters in it are escaped
        :return: regex string; siblings merely sharing the prefix (e.g. "<path>_") do not match
        """
        return "^{}(/|$)".format(re.escape(path))

    def __update_local_fs(self, from_path=None):
        """
        Copy GridFS entries into the local folder ``self.path``
        :param from_path: if supplied, only entries whose filename starts with it are copied
        :return: None
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                if from_path and not directory.filename.startswith(from_path):
                    continue
                os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            # cursors opened with no_cursor_timeout are never reaped by the server
            dir_cursor.close()

        file_cursor = self.fs.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True)
        try:
            for writing_file in file_cursor:
                if from_path and not writing_file.filename.startswith(from_path):
                    continue
                file_path = self.path + writing_file.filename
                # the parent may lack a "dir" entry (e.g. a tar without directory members)
                os.makedirs(os.path.dirname(file_path), exist_ok=True)

                if writing_file.metadata["type"] == "sym":
                    with BytesIO() as b:
                        self.fs.download_to_stream(writing_file._id, b)
                        link = b.getvalue().decode("utf-8")
                    # os.symlink refuses existing paths: replace what a previous sync left there
                    if os.path.islink(file_path) or os.path.isfile(file_path):
                        os.remove(file_path)
                    os.symlink(link, file_path)
                else:
                    with open(file_path, 'wb+') as file_stream:
                        self.fs.download_to_stream(writing_file._id, file_stream)
                    if "permissions" in writing_file.metadata:
                        os.chmod(file_path, writing_file.metadata["permissions"])
        finally:
            file_cursor.close()

    def get_params(self):
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Validate the local path and connect to MongoDB
        :param config: dict with "path", "collection" and either "uri" or "host" + "port";
            optional "logger_name"
        :return: None or raises FsException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" not in config:
                raise FsException("Missing parameter \"path\"")
            self.path = config["path"]
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif "collection" not in config:
                raise FsException("Missing parameter \"collection\"")
            else:
                raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location. Calling it again for an existing folder
        does nothing.
        :param folder: folder name
        :return: None or raises an exception
        """
        try:
            # upload_from_stream always allocates a fresh id, so it never raises FileExists;
            # check explicitly, otherwise duplicates later trigger "Multiple files found"
            existing = self.fs.find({"filename": folder, "metadata.type": "dir"})
            if next(existing, None) is None:
                self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            if src == dst:
                # deleting dst first would otherwise wipe the source itself
                return

            dst_cursor = self.fs.find({"filename": {"$regex": self._subtree_regex(dst)}}, no_cursor_timeout=True)
            try:
                for dst_file in dst_cursor:
                    self.fs.delete(dst_file._id)
            finally:
                dst_cursor.close()

            src_cursor = self.fs.find({"filename": {"$regex": self._subtree_regex(src)}}, no_cursor_timeout=True)
            try:
                for src_file in src_cursor:
                    # the regex guarantees src is a prefix: swap exactly that prefix
                    self.fs.rename(src_file._id, dst + src_file.filename[len(src):])
            finally:
                src_cursor.close()
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file (symlinks count); 'dir' exists as a
            directory or; 'None' just exists
        :return: True, False
        :raises FsException: if several entries share the name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            file_type = requested_file.metadata["type"]
            if mode is None or file_type == mode:
                return True
            if file_type == "sym" and mode == "file":
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size in bytes, or None if no entry has that name
        :raises FsException: if several entries share the name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
            return requested_file.length
        return None

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: destination folder, a str or a str list
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)

        for member in tar_object.getmembers():
            if member.isfile():
                file_type = "file"
                stream = tar_object.extractfile(member)
            elif member.issym():
                file_type = "sym"
                # a symlink entry stores its target as content
                stream = BytesIO(member.linkname.encode("utf-8"))
            else:
                # NOTE(review): every other member kind (including hard links) is stored as "dir"
                file_type = "dir"
                stream = BytesIO()

            metadata = {
                "type": file_type,
                "permissions": member.mode
            }
            try:
                self.fs.upload_from_stream(f + "/" + member.name, stream, metadata=metadata)
            finally:
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; "b" selects a binary stream, otherwise a text stream
        :return: file object
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: names (relative to the folder) of its immediate children
        :raises FsException: if the name is ambiguous or refers to a non-directory
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                if next(dir_cursor, None) is not None:
                    raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_dir.metadata["type"] != "dir":
                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

            # anchored with "$" so nested descendants ("<f>/a/b") are excluded
            children_cursor = self.fs.find({"filename": {"$regex": "^{}/[^/]+$".format(re.escape(f))}})
            return [child.filename[len(f) + 1:] for child in children_cursor]
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                if next(file_cursor, None) is not None:
                    raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_file.metadata["type"] == "dir":
                    # match the folder and its subtree only, never siblings such as "<f>_"
                    dir_cursor = self.fs.find({"filename": {"$regex": self._subtree_regex(f)}})
                    for tmp in dir_cursor:
                        self.fs.delete(tmp._id)
                else:
                    self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        self.__update_local_fs(from_path=from_path)
464 """
465 self.__update_local_fs(from_path=from_path)