Add sync function to FSBase, and implementations to fslocal and fsmongo
[osm/common.git] / osm_common / fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re

from gridfs import GridFSBucket, errors
from pymongo import MongoClient

from osm_common.fsbase import FsBase, FsException
26
# Module author, exposed as package metadata.
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
28
29
class GridByteStream(BytesIO):
    """
    In-memory binary buffer backed by one GridFS file.

    On construction the GridFS file called ``filename`` (if there is one) is
    downloaded into the buffer. On ``close()`` a stream opened in a mode
    without "r" is uploaded back to GridFS; a stream whose mode contains "r"
    is simply discarded.

    :param filename: GridFS filename of the file to open
    :param fs: bucket-like object providing find, delete, download_to_stream,
        upload_from_stream and upload_from_stream_with_id
    :param mode: file mode string; only the presence of "r" is inspected here
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None  # GridFS id of the existing file, None for a new file
        self.filename = filename
        self.fs = fs
        self.mode = mode

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of the file, if it exists, into the buffer.

        :return: None
        :raises FsException: if several entries share the filename or the
            entry found is not of type "file"
        """
        cursor = self.fs.find({"filename": self.filename})

        grid_file = next(cursor, None)
        if grid_file is not None:
            # Filenames are not unique in GridFS, so a second hit is an error
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if grid_file.metadata["type"] != "file":
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

        # The position is only rewound for read modes; in any other mode it
        # stays after the downloaded content, so writes are appended to it.
        # NOTE(review): this means "w" does not truncate - confirm intended.
        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Persist the buffer to GridFS (non-read modes) and release it.

        Calling close() more than once is allowed; only the first call has
        any effect. The buffer is released even if the GridFS operation
        fails, and the error is propagated to the caller.

        :return: None
        """
        if self.closed:
            return

        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        try:
            # The previous version is removed first so its id can be reused
            if self._id:
                self.fs.delete(self._id)

            parent_dir_name = self.filename.split("/")[0]
            # NOTE(review): "metadata" is matched as a whole sub-document, so
            # only entries whose metadata is exactly {"type": "dir"} count.
            cursor = self.fs.find({
                "filename": parent_dir_name,
                "metadata": {"type": "dir"}})

            if not next(cursor, None):
                # No such directory entry: the last character of the first
                # path component is dropped (presumably mapping a temporary
                # folder name onto its final name - TODO confirm).
                self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

            self.seek(0, 0)
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id,
                    self.filename,
                    self,
                    metadata={"type": "file"}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename,
                    self,
                    metadata={"type": "file"}
                )
        finally:
            super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
102
103
class GridStringStream(StringIO):
    """
    In-memory text buffer backed by one GridFS file, stored as UTF-8.

    On construction the GridFS file called ``filename`` (if there is one) is
    downloaded, decoded as UTF-8 and written into the buffer. On ``close()``
    a stream opened in a mode without "r" is encoded as UTF-8 and uploaded
    back to GridFS; a stream whose mode contains "r" is simply discarded.

    :param filename: GridFS filename of the file to open
    :param fs: bucket-like object providing find, delete, download_to_stream,
        upload_from_stream and upload_from_stream_with_id
    :param mode: file mode string; only the presence of "r" is inspected here
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None  # GridFS id of the existing file, None for a new file
        self.filename = filename
        self.fs = fs
        self.mode = mode

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of the file, if it exists, into the buffer.

        :return: None
        :raises FsException: if several entries share the filename or the
            entry found is not of type "file"
        """
        cursor = self.fs.find({"filename": self.filename})

        grid_file = next(cursor, None)
        if grid_file is not None:
            # Filenames are not unique in GridFS, so a second hit is an error
            if next(cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if grid_file.metadata["type"] != "file":
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            self._id = grid_file._id
            # GridFS delivers bytes; go through a byte buffer and decode once
            with BytesIO() as stream:
                self.fs.download_to_stream(self._id, stream)
                self.write(stream.getvalue().decode("utf-8"))

        # The position is only rewound for read modes; in any other mode it
        # stays after the downloaded content, so writes are appended to it.
        # NOTE(review): this means "w" does not truncate - confirm intended.
        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Persist the buffer to GridFS (non-read modes) and release it.

        Calling close() more than once is allowed; only the first call has
        any effect. The buffer is released even if the GridFS operation
        fails, and the error is propagated to the caller.

        :return: None
        """
        if self.closed:
            return

        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        try:
            # The previous version is removed first so its id can be reused
            if self._id:
                self.fs.delete(self._id)

            parent_dir_name = self.filename.split("/")[0]
            # NOTE(review): "metadata" is matched as a whole sub-document, so
            # only entries whose metadata is exactly {"type": "dir"} count.
            cursor = self.fs.find({
                "filename": parent_dir_name,
                "metadata": {"type": "dir"}})

            if not next(cursor, None):
                # No such directory entry: the last character of the first
                # path component is dropped (presumably mapping a temporary
                # folder name onto its final name - TODO confirm).
                self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

            # The whole text is uploaded, regardless of the current position
            with BytesIO(self.getvalue().encode("utf-8")) as stream:
                if self._id:
                    self.fs.upload_from_stream_with_id(
                        self._id,
                        self.filename,
                        stream,
                        metadata={"type": "file"}
                    )
                else:
                    self.fs.upload_from_stream(
                        self.filename,
                        stream,
                        metadata={"type": "file"}
                    )
        finally:
            super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
184
185
class FsMongo(FsBase):
    """
    FsBase implementation that keeps files and directories in MongoDB GridFS.

    Every entry is a GridFS file whose ``metadata.type`` is "file" or "dir";
    directories are stored as empty files. ``sync()`` copies the GridFS
    content to the local directory configured as ``path``.
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local directory used by sync(), ends with "/"
        self.client = None  # MongoClient, set by fs_connect()
        self.fs = None  # GridFSBucket, set by fs_connect()

    @staticmethod
    def _subtree_regex(name):
        """Return a regex matching ``name`` itself and everything below ``name/``."""
        # The name is escaped so characters such as "." or "+" are literal
        return "^{}(/|$)".format(re.escape(name))

    def __update_local_fs(self):
        """
        Copy every GridFS directory and file below the local ``self.path``.

        :return: None
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            # Cursors opened with no_cursor_timeout must be closed explicitly
            dir_cursor.close()

        file_cursor = self.fs.find({"metadata.type": "file"}, no_cursor_timeout=True)
        try:
            for writing_file in file_cursor:
                file_path = self.path + writing_file.filename
                # A file may live in a folder that has no "dir" entry of its own
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                with open(file_path, 'wb+') as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])
        finally:
            file_cursor.close()

    def get_params(self):
        """
        :return: dictionary with the storage type and the local path
        """
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Validate the configuration and open the GridFS bucket.

        :param config: dictionary with "path" (existing, writable local
            directory), "collection" (used as the name of the MongoDB
            database holding the bucket) and either "uri" or "host" + "port".
            Optional "logger_name".
        :return: None or raises FsException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException("Missing parameter \"path\"")
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config:
                    raise FsException("Missing parameter \"collection\"")
                else:
                    raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
        except FsException:
            raise
        except Exception as e:  # TODO refine
            # Keep the original error as the cause for easier debugging
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        try:
            # GridFS accepts several files with the same filename, so the
            # upload alone would create a duplicate entry on a second call.
            if next(self.fs.find({"filename": folder}), None) is not None:
                return
            self.fs.upload_from_stream(
                folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            dst_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(dst)}},
                no_cursor_timeout=True)
            try:
                for dst_file in dst_cursor:
                    self.fs.delete(dst_file._id)
            finally:
                dst_cursor.close()

            src_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(src)}},
                no_cursor_timeout=True)
            try:
                for src_file in src_cursor:
                    # Every match starts with src, so only that prefix is swapped
                    self.fs.rename(src_file._id, dst + src_file.filename[len(src):])
            finally:
                src_cursor.close()
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        requested_file = next(cursor, None)
        if requested_file is None:
            return False

        if next(cursor, None) is not None:
            raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        # Without a mode any kind of entry counts as existing
        if not mode:
            return True

        return requested_file.metadata["type"] == mode

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None if the file does not exist
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        requested_file = next(cursor, None)
        if requested_file is None:
            return None

        if next(cursor, None) is not None:
            raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        return requested_file.length

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)

        for member in tar_object.getmembers():
            is_file = member.isfile()
            # Anything that is not a regular file is stored as an empty "dir"
            stream = tar_object.extractfile(member) if is_file else BytesIO()

            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name,
                    stream,
                    metadata={
                        "type": "file" if is_file else "dir",
                        "permissions": member.mode
                    }
                )
            finally:
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            # Binary modes get a bytes buffer, every other mode a text buffer
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: list with the names of the direct children of the folder;
            empty if the folder has no entry
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            dir_cursor = self.fs.find({"filename": f})
            requested_dir = next(dir_cursor, None)
            if requested_dir is None:
                return []

            if next(dir_cursor, None) is not None:
                raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_dir.metadata["type"] != "dir":
                raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

            prefix = f + "/"
            files = []
            seen = set()
            files_cursor = self.fs.find({"filename": {"$regex": "^{}/".format(re.escape(f))}})
            for descendant in files_cursor:
                # Keep only the first component below the folder, so nested
                # entries show up once, under their top-level child name
                child = descendant.filename[len(prefix):].split("/", 1)[0]
                if child and child not in seen:
                    seen.add(child)
                    files.append(child)

            return files
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            file_cursor = self.fs.find({"filename": f})
            requested_file = next(file_cursor, None)

            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
                return

            if next(file_cursor, None) is not None:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] == "dir":
                # Anchored on "/" or end of name so that deleting "abc" does
                # not also remove siblings such as "abcd"
                dir_cursor = self.fs.find({"filename": {"$regex": self._subtree_regex(f)}})

                for tmp in dir_cursor:
                    self.fs.delete(tmp._id)
            else:
                self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def sync(self):
        """
        Sync from FSMongo to local storage
        """
        self.__update_local_fs()