Fix bug 1069: Add default value for fsmongo.GridByteStream.file_type
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re
import stat

from gridfs import GridFSBucket, errors
from pymongo import MongoClient

from osm_common.fsbase import FsBase, FsException
27
28 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
29
30
class GridByteStream(BytesIO):
    """
    In-memory binary stream backed by one GridFS file.

    On construction the current content of "filename" (when such a file exists) is
    downloaded into the buffer. On close(), unless the mode contains "r", the whole
    buffer is uploaded back through "fs", replacing the previously stored file.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: name of the stored file backing this stream
        :param fs: bucket object used to find/download/upload/delete (FsMongo passes its GridFSBucket)
        :param mode: open mode string; any mode containing "r" is handled as read-only
        """
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the stored content of "filename", if any, into the buffer.

        :return: None
        :raises FsException: if several stored files share the name, or the entry is
            neither a "file" nor a "sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second match for the same name means the storage is inconsistent
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

            # Readers start at the beginning; for other modes the position stays at the
            # end of the downloaded content, so subsequent writes are appended.
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Close the stream; for non-read modes, first store the buffer content.

        Calling close() on an already closed stream does nothing. Without this guard a
        second call would delete the file that has just been uploaded and then fail
        while seeking on the closed buffer.

        :return: None
        """
        if self.closed:
            return

        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        # The previous revision is removed so that the same _id can be uploaded again
        if self._id:
            self.fs.delete(self._id)

        top_level_name = self.filename.split("/")[0]
        cursor = self.fs.find({
            "filename": top_level_name,
            "metadata": {"type": "dir"}})

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No directory entry carries the top-level name: store the file under that
            # name minus its last character.
            # NOTE(review): presumably the caller's folder was renamed by dropping a
            # one-character suffix while this stream was open - confirm against callers.
            self.filename = self.filename.replace(top_level_name, top_level_name[:-1], 1)

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id,
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
105
106
class GridStringStream(StringIO):
    """
    In-memory text stream backed by one GridFS file, stored as UTF-8.

    On construction the current content of "filename" (when such a file exists) is
    downloaded and decoded into the buffer. On close(), unless the mode contains "r",
    the whole buffer is encoded and uploaded back through "fs", replacing the
    previously stored file.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: name of the stored file backing this stream
        :param fs: bucket object used to find/download/upload/delete (FsMongo passes its GridFSBucket)
        :param mode: open mode string; any mode containing "r" is handled as read-only
        """
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the stored content of "filename", if any, into the buffer.

        :return: None
        :raises FsException: if several stored files share the name, or the entry type
            is not accepted
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second match for the same name means the storage is inconsistent
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # "sym" is accepted for consistency with GridByteStream, which opens both
            # "file" and "sym" entries.
            # NOTE(review): "dir" is kept only because earlier versions accepted it;
            # it looks like a typo for "sym" - confirm before removing.
            if requested_file.metadata["type"] in ("file", "sym", "dir"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            self._id = grid_file._id
            # GridFS delivers bytes: download into a scratch buffer, then decode
            with BytesIO() as raw:
                self.fs.download_to_stream(self._id, raw)
                self.write(raw.getvalue().decode("utf-8"))

            # Readers start at the beginning; for other modes the position stays at the
            # end of the downloaded content, so subsequent writes are appended.
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Close the stream; for non-read modes, first store the buffer content.

        Calling close() on an already closed stream does nothing. Without this guard a
        second call would delete the file that has just been uploaded and then fail
        while seeking on the closed buffer.

        :return: None
        """
        if self.closed:
            return

        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        # The previous revision is removed so that the same _id can be uploaded again
        if self._id:
            self.fs.delete(self._id)

        top_level_name = self.filename.split("/")[0]
        cursor = self.fs.find({
            "filename": top_level_name,
            "metadata": {"type": "dir"}})

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No directory entry carries the top-level name: store the file under that
            # name minus its last character.
            # NOTE(review): presumably the caller's folder was renamed by dropping a
            # one-character suffix while this stream was open - confirm against callers.
            self.filename = self.filename.replace(top_level_name, top_level_name[:-1], 1)

        self.seek(0, 0)
        stream = BytesIO(self.read().encode("utf-8"))
        try:
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id,
                    self.filename,
                    stream,
                    metadata={"type": self.file_type}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename,
                    stream,
                    metadata={"type": self.file_type}
                )
        finally:
            stream.close()
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
189
190
class FsMongo(FsBase):
    """
    File storage backend that keeps its content in a MongoDB GridFS bucket.

    Every stored entry carries a metadata "type" of "file", "sym" or "dir"; directory
    entries are empty files. sync() copies the bucket content below the local "path".
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None
        self.client = None
        self.fs = None

    @staticmethod
    def _as_path(storage):
        """Return "storage" as one path string; a list of str is joined with "/"."""
        return storage if isinstance(storage, str) else "/".join(storage)

    @staticmethod
    def _tree_regex(name):
        """Return a regex matching "name" itself and every entry below "name/"."""
        # The name is escaped so that characters such as "." are matched literally
        return "^{}(/|$)".format(re.escape(name))

    def _find_unique(self, filename, multiple_msg="Multiple files found"):
        """
        Return the single stored entry called "filename", or None if there is none.

        :param filename: exact stored name to look for
        :param multiple_msg: exception text used when more than one entry matches
        :raises FsException: if more than one entry has that name
        """
        cursor = self.fs.find({"filename": filename})
        found = next(cursor, None)
        if found is not None and next(cursor, None) is not None:
            raise FsException(multiple_msg, http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
        return found

    def __update_local_fs(self):
        """
        Copy every stored directory and file below the local "self.path".

        :return: None
        """
        # Cursors opened with no_cursor_timeout must be closed explicitly, otherwise
        # they may stay open on the server when iteration is interrupted.
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            dir_cursor.close()

        file_cursor = self.fs.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True)
        try:
            for writing_file in file_cursor:
                file_path = self.path + writing_file.filename

                # A file may be stored without a "dir" entry for its folder
                parent_folder = os.path.dirname(file_path)
                if parent_folder:
                    os.makedirs(parent_folder, exist_ok=True)

                with open(file_path, 'wb+') as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)

                if "permissions" in writing_file.metadata:
                    if writing_file.metadata["type"] == "sym":
                        # NOTE(review): this only changes the mode of the regular file
                        # written above; no symbolic link is created, and file_extract
                        # stores no link target for "sym" entries - confirm intent.
                        os.chmod(
                            file_path,
                            writing_file.metadata["permissions"] | stat.S_IFLNK
                        )
                    else:
                        os.chmod(file_path, writing_file.metadata["permissions"])
        finally:
            file_cursor.close()

    def get_params(self):
        """Return the parameters describing this storage: its kind and local path."""
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Validate the local path and open the GridFS bucket.

        :param config: dict with "path" and "collection", plus either "uri" or
            "host" and "port"; optionally "logger_name"
        :return: None
        :raises FsException: on a missing parameter, a path that does not exist or is
            not writable, or any other error raised while connecting
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException("Missing parameter \"path\"")
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config for key in ("uri", "collection")):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config for key in ("host", "port", "collection")):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config:
                    raise FsException("Missing parameter \"collection\"")
                else:
                    raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder: name of the directory entry to create
        :return: None or raises an exception
        """
        try:
            # A directory is stored as an empty file whose metadata type is "dir"
            self.fs.upload_from_stream(
                folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            dst_cursor = self.fs.find(
                {"filename": {"$regex": self._tree_regex(dst)}},
                no_cursor_timeout=True)
            try:
                for dst_file in dst_cursor:
                    self.fs.delete(dst_file._id)
            finally:
                dst_cursor.close()

            src_cursor = self.fs.find(
                {"filename": {"$regex": self._tree_regex(src)}},
                no_cursor_timeout=True)
            try:
                for src_file in src_cursor:
                    # The regex anchors src at the start, so only that prefix is replaced
                    self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
            finally:
                src_cursor.close()
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        :raises FsException: if several stored entries share the name
        """
        requested_file = self._find_unique(self._as_path(storage))

        if requested_file is None:
            return False

        # No specific type requested: existence alone is enough
        if mode is None:
            return True

        stored_type = requested_file.metadata["type"]
        # A symbolic link also counts as a file
        return stored_type == mode or (stored_type == "sym" and mode == "file")

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None when no entry has that name
        :raises FsException: if several stored entries share the name
        """
        requested_file = self._find_unique(self._as_path(storage))

        if requested_file is None:
            return None
        return requested_file.length

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = self._as_path(path)

        for member in tar_object.getmembers():
            if member.isfile():
                file_type = "file"
                stream = tar_object.extractfile(member)
            else:
                # Symbolic links and every other member kind are stored as empty entries;
                # anything that is neither a file nor a symbolic link is typed "dir".
                file_type = "sym" if member.issym() else "dir"
                stream = BytesIO()

            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name,
                    stream,
                    metadata={
                        "type": file_type,
                        "permissions": member.mode
                    }
                )
            finally:
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        """
        f = self._as_path(storage)

        try:
            # A "b" in the mode selects the binary stream, otherwise the text stream
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content, as names relative to the folder; an empty list when no
            entry has that name
        :raises FsException: if several entries share the name or the entry is not a directory
        """
        f = self._as_path(storage)

        try:
            files = []
            requested_dir = self._find_unique(f, "Multiple directories found")

            if requested_dir is not None:
                if requested_dir.metadata["type"] != "dir":
                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

                # NOTE(review): the pattern has no end anchor, so entries of nested
                # folders are returned too (as "sub/name") - confirm this is intended.
                files_cursor = self.fs.find({"filename": {"$regex": "^{}/([^/])*".format(re.escape(f))}})
                for children_file in files_cursor:
                    files.append(children_file.filename.replace(f + '/', '', 1))

            return files
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        :raises FsException: if several entries share the name, or the entry does not
            exist and ignore_non_exist is False
        """
        f = self._as_path(storage)

        try:
            requested_file = self._find_unique(f)

            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
            elif requested_file.metadata["type"] == "dir":
                # Match the directory entry and what is below "f/" only, so that a
                # sibling whose name merely starts with "f" is left untouched.
                dir_cursor = self.fs.find({"filename": {"$regex": self._tree_regex(f)}})

                for tmp in dir_cursor:
                    self.fs.delete(tmp._id)
            else:
                self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def sync(self):
        """
        Sync from FSMongo to local storage
        """
        self.__update_local_fs()