Fix bug 1112: Save content of symlinks with FSMongo
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re

from gridfs import GridFSBucket, errors
from pymongo import MongoClient

from osm_common.fsbase import FsBase, FsException
26
27 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
28
29
class GridByteStream(BytesIO):
    """Binary file-like view of a single GridFS entry.

    The entry content is loaded into memory when the stream is created and,
    unless the stream is read-only, written back to GridFS by close().
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename of the entry
        :param fs: GridFS bucket that holds the entry
        :param mode: file mode. "r" without "+" is read-only; "w" discards the stored
            content (like built-in open()); other modes keep it and persist writes on close()
        :raises FsException: see __initialize__
        """
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        try:
            self.__initialize__()
        except Exception:
            # IOBase.__del__ calls close(); close the buffer now so a failed open
            # never uploads an empty entry when the object is collected
            BytesIO.close(self)
            raise

    def __initialize__(self):
        """Look up the GridFS entry for self.filename and load its content if needed.

        :raises FsException: if several entries share the filename, or the entry is
            neither a regular file nor a symlink
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # any second hit means the filename is ambiguous
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            # remember the id so close() replaces the entry in place
            self._id = grid_file._id
            # "w" modes truncate: the stored content is not loaded
            if "w" not in self.mode:
                self.fs.download_to_stream(self._id, self)

            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """Write the buffer back to GridFS (unless read-only) and close the stream.

        Calling close() on an already closed stream does nothing.
        """
        if self.closed:
            return

        if "r" in self.mode and "+" not in self.mode:
            super(GridByteStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        if "/" in self.filename:
            parent_dir_name = self.filename.split("/")[0]
            cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
            if next(cursor, None) is None:
                # The top-level folder has no "dir" entry: store the file under that
                # folder name minus its last character.
                # NOTE(review): the reason for this renaming is not visible here
                # (presumably a temporary folder with a one-character suffix) - confirm.
                self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id,
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
104
105
class GridStringStream(StringIO):
    """Text (UTF-8) file-like view of a single GridFS entry.

    The entry is decoded into memory when the stream is created and, unless the
    stream is read-only, re-encoded and written back to GridFS by close().
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: GridFS filename of the entry
        :param fs: GridFS bucket that holds the entry
        :param mode: file mode. "r" without "+" is read-only; "w" discards the stored
            content (like built-in open()); other modes keep it and persist writes on close()
        :raises FsException: see __initialize__
        """
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        try:
            self.__initialize__()
        except Exception:
            # IOBase.__del__ calls close(); close the buffer now so a failed open
            # never uploads an empty entry when the object is collected
            StringIO.close(self)
            raise

    def __initialize__(self):
        """Look up the GridFS entry for self.filename and decode its content if needed.

        :raises FsException: if several entries share the filename, or the entry is
            neither a regular file nor a symlink
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # any second hit means the filename is ambiguous
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # symlinks ("sym", content = link target) are readable like regular files;
            # directories are not files
            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            # remember the id so close() replaces the entry in place
            self._id = grid_file._id
            # "w" modes truncate: the stored content is not loaded
            if "w" not in self.mode:
                with BytesIO() as stream:
                    self.fs.download_to_stream(self._id, stream)
                    self.write(stream.getvalue().decode("utf-8"))

            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """Encode the buffer back into GridFS (unless read-only) and close the stream.

        Calling close() on an already closed stream does nothing.
        """
        if self.closed:
            return

        if "r" in self.mode and "+" not in self.mode:
            super(GridStringStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        if "/" in self.filename:
            parent_dir_name = self.filename.split("/")[0]
            cursor = self.fs.find({"filename": parent_dir_name, "metadata.type": "dir"})
            if next(cursor, None) is None:
                # The top-level folder has no "dir" entry: store the file under that
                # folder name minus its last character.
                # NOTE(review): the reason for this renaming is not visible here
                # (presumably a temporary folder with a one-character suffix) - confirm.
                self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

        with BytesIO(self.getvalue().encode("utf-8")) as stream:
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id,
                    self.filename,
                    stream,
                    metadata={"type": self.file_type}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename,
                    stream,
                    metadata={"type": self.file_type}
                )
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
188
189
class FsMongo(FsBase):
    """FsBase implementation backed by a MongoDB GridFS bucket.

    Every stored path is one GridFS entry whose ``metadata["type"]`` is "dir",
    "file" or "sym" (a "sym" entry holds its link target as content).
    ``sync()`` recreates the bucket content below the local folder ``self.path``.
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local folder used by sync(); ends with "/" after fs_connect()
        self.client = None  # MongoClient created by fs_connect()
        self.fs = None  # GridFSBucket created by fs_connect()

    @staticmethod
    def _subtree_regex(path):
        """Return a MongoDB regex matching ``path`` itself and every entry below it.

        The path is escaped so characters such as "." or "+" match literally.
        """
        return "^{}(/|$)".format(re.escape(path))

    def _find_single(self, filename, multiple_error="Multiple files found"):
        """Return the only GridFS entry named ``filename``, or None if there is none.

        :raises FsException: INTERNAL_SERVER_ERROR with ``multiple_error`` when the name is ambiguous
        """
        cursor = self.fs.find({"filename": filename})
        entry = next(cursor, None)
        if entry is not None and next(cursor, None) is not None:
            raise FsException(multiple_error, http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
        return entry

    def __update_local_fs(self):
        """Recreate every GridFS entry below ``self.path``: folders, regular files and symlinks."""
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            # cursors opened with no_cursor_timeout are not timed out by the server; close explicitly
            dir_cursor.close()

        file_cursor = self.fs.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True)
        try:
            for writing_file in file_cursor:
                file_path = self.path + writing_file.filename
                # the parent folder may have no "dir" entry of its own
                os.makedirs(os.path.dirname(file_path), exist_ok=True)

                if writing_file.metadata["type"] == "sym":
                    with BytesIO() as b:
                        self.fs.download_to_stream(writing_file._id, b)
                        link = b.getvalue().decode("utf-8")
                    # os.symlink() refuses existing paths, e.g. the link left by a previous sync
                    if os.path.lexists(file_path):
                        os.remove(file_path)
                    os.symlink(link, file_path)
                else:
                    # opening a stale symlink for writing would modify its target instead
                    if os.path.islink(file_path):
                        os.remove(file_path)
                    with open(file_path, 'wb+') as file_stream:
                        self.fs.download_to_stream(writing_file._id, file_stream)
                    if "permissions" in writing_file.metadata:
                        os.chmod(file_path, writing_file.metadata["permissions"])
        finally:
            file_cursor.close()

    def get_params(self):
        """Return the storage parameters: backend name and local path."""
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Configure the local path and connect to the GridFS bucket
        :param config: dict with "path", "collection" and either "uri" or "host" + "port";
            optional "logger_name"
        :return: None or raises FsException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException("Missing parameter \"path\"")
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config.keys() for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException("Missing parameter \"collection\"")
                else:
                    raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder: GridFS name of the folder
        :return: None or raises an exception
        """
        try:
            # upload_from_stream() always generates a new _id, so FileExists cannot
            # signal an existing folder; look it up to keep mkdir idempotent
            if next(self.fs.find({"filename": folder, "metadata.type": "dir"}), None) is not None:
                return
            self.fs.upload_from_stream(
                folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            dst_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(dst)}},
                no_cursor_timeout=True)
            try:
                for dst_file in dst_cursor:
                    self.fs.delete(dst_file._id)
            finally:
                dst_cursor.close()

            src_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(src)}},
                no_cursor_timeout=True)
            try:
                for src_file in src_cursor:
                    # the regex anchors src at the start, so swap exactly that prefix
                    self.fs.rename(src_file._id, dst + src_file.filename[len(src):])
            finally:
                src_cursor.close()
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file (symlinks included); 'dir' exists as a
            directory or; 'None' just exists
        :return: True, False
        :raises FsException: if several entries share the name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        requested_file = self._find_single(f)
        if requested_file is None:
            return False
        if not mode:
            return True

        file_type = requested_file.metadata["type"]
        return file_type == mode or (file_type == "sym" and mode == "file")

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size in bytes, or None if the entry does not exist
        :raises FsException: if several entries share the name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        requested_file = self._find_single(f)
        return requested_file.length if requested_file is not None else None

    def file_extract(self, tar_object, path):
        """
        extract a tar file into GridFS, one entry per tar member
        :param tar_object: object of type tar
        :param path: can be a str or a str list, the folder where to extract the tar_object
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)

        for member in tar_object.getmembers():
            if member.isfile():
                file_type = "file"
                stream = tar_object.extractfile(member)
            elif member.issym():
                file_type = "sym"
                # a symlink is stored with its target path as content
                stream = BytesIO(member.linkname.encode("utf-8"))
            else:
                # every other member type is recorded as an empty "dir" entry
                file_type = "dir"
                stream = BytesIO()

            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name,
                    stream,
                    metadata={"type": file_type, "permissions": member.mode}
                )
            finally:
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; "b" selects a binary stream, otherwise a UTF-8 text stream
        :return: file object
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content (direct children only)
        :param storage: can be a str or list of str
        :return: list of child names relative to the folder; empty if the folder has no entry
        :raises FsException: NOT_FOUND if storage is not a folder; INTERNAL_SERVER_ERROR if ambiguous
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            requested_dir = self._find_single(f, "Multiple directories found")
            if requested_dir is None:
                return []
            if requested_dir.metadata["type"] != "dir":
                raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

            prefix = f + "/"
            # "[^/]+$" keeps only direct children; an unanchored pattern matches nested entries too
            children_cursor = self.fs.find({"filename": {"$regex": "^{}[^/]+$".format(re.escape(prefix))}})
            return [child.filename[len(prefix):] for child in children_cursor]
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        try:
            requested_file = self._find_single(f)
            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
                return

            if requested_file.metadata["type"] == "dir":
                # only entries below the folder: a bare "^f" prefix would also hit siblings such as "f_"
                dir_cursor = self.fs.find({"filename": {"$regex": "^{}/".format(re.escape(f))}})
                for tmp in dir_cursor:
                    self.fs.delete(tmp._id)
            self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def sync(self):
        """
        Sync from FSMongo to local storage
        """
        self.__update_local_fs()