blob: 3c68a5f123463f21a97c858f524d784cbfc0a0b0 [file] [log] [blame]
Eduardo Sousa0593aba2019-06-04 12:55:43 +01001# Copyright 2019 Canonical
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14#
15# For those usages not covered by the Apache License, Version 2.0 please
16# contact: eduardo.sousa@canonical.com
17##
18
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re
import stat

from gridfs import GridFSBucket, errors
from pymongo import MongoClient

from osm_common.fsbase import FsBase, FsException
27
# Author credit for this module.
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
29
30
class GridByteStream(BytesIO):
    """
    In-memory binary stream bound to one GridFS entry.

    Construction downloads the entry called ``filename`` (when there is one) into the
    buffer. Closing a stream whose mode does not contain "r" uploads the buffer back
    through ``fs``, replacing the previously stored entry.

    Note that an existing entry is always loaded first and, for non-"r" modes, the
    position is left at the end of that content, so writes are appended to it.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: name of the GridFS entry to bind to
        :param fs: bucket-like object providing find, delete, download_to_stream,
            upload_from_stream and upload_from_stream_with_id
        :param mode: open mode; any mode containing "r" makes close() skip the upload
        """
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of the entry, if it exists, into the buffer.

        :return: None
        :raises FsException: if several entries share the name or the entry is neither
            a "file" nor a "sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # Pulling a second element tells us whether the name is ambiguous
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

            if "r" in self.mode:
                # Readers start at the beginning of the downloaded content
                self.seek(0, 0)

    def close(self):
        """
        Close the stream, uploading its content first unless the mode contains "r".

        Calling close() on an already closed stream is a no-op, so the stored entry is
        never deleted and re-uploaded twice. The in-memory buffer is released even when
        the upload fails; the error is still propagated to the caller.

        :return: None
        """
        if self.closed:
            return

        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        try:
            if self._id:
                # The old revision is removed so its id can be reused for the new upload
                self.fs.delete(self._id)

            parent_dir_name = self.filename.split("/")[0]
            cursor = self.fs.find({
                "filename": parent_dir_name,
                "metadata": {"type": "dir"}})

            if not next(cursor, None):
                # No directory entry under that name: store the file under the name
                # without its last character instead.
                # NOTE(review): presumably the parent was a temporary folder with a
                # one-character suffix that has since been renamed - confirm with callers.
                self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

            self.seek(0, 0)
            metadata = {"type": self.file_type}
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id,
                    self.filename,
                    self,
                    metadata=metadata
                )
            else:
                self.fs.upload_from_stream(
                    self.filename,
                    self,
                    metadata=metadata
                )
        finally:
            super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
105
106
class GridStringStream(StringIO):
    """
    In-memory text stream bound to one GridFS entry.

    Construction downloads the entry called ``filename`` (when there is one), decodes it
    as UTF-8 and loads it into the buffer. Closing a stream whose mode does not contain
    "r" encodes the buffer as UTF-8 and uploads it back through ``fs``, replacing the
    previously stored entry.

    Note that an existing entry is always loaded first and, for non-"r" modes, the
    position is left at the end of that content, so writes are appended to it.
    """

    def __init__(self, filename, fs, mode):
        """
        :param filename: name of the GridFS entry to bind to
        :param fs: bucket-like object providing find, delete, download_to_stream,
            upload_from_stream and upload_from_stream_with_id
        :param mode: open mode; any mode containing "r" makes close() skip the upload
        """
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of the entry, if it exists, into the buffer.

        :return: None
        :raises FsException: if several entries share the name or the entry has an
            unsupported type
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # Pulling a second element tells us whether the name is ambiguous
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # "sym" is accepted so text mode agrees with GridByteStream.
            # NOTE(review): "dir" was accepted historically and is kept for backward
            # compatibility, although the error message suggests it was not intended.
            if requested_file.metadata["type"] in ("file", "sym", "dir"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            self._id = grid_file._id
            # GridFS hands back bytes, so go through a temporary binary buffer
            with BytesIO() as stream:
                self.fs.download_to_stream(self._id, stream)
                self.write(stream.getvalue().decode("utf-8"))

            if "r" in self.mode:
                # Readers start at the beginning of the downloaded content
                self.seek(0, 0)

    def close(self):
        """
        Close the stream, uploading its content first unless the mode contains "r".

        Calling close() on an already closed stream is a no-op, so the stored entry is
        never deleted and re-uploaded twice. Both the text buffer and the temporary
        binary buffer are released even when the upload fails; the error is still
        propagated to the caller.

        :return: None
        """
        if self.closed:
            return

        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        try:
            if self._id:
                # The old revision is removed so its id can be reused for the new upload
                self.fs.delete(self._id)

            parent_dir_name = self.filename.split("/")[0]
            cursor = self.fs.find({
                "filename": parent_dir_name,
                "metadata": {"type": "dir"}})

            if not next(cursor, None):
                # No directory entry under that name: store the file under the name
                # without its last character instead.
                # NOTE(review): presumably the parent was a temporary folder with a
                # one-character suffix that has since been renamed - confirm with callers.
                self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

            metadata = {"type": self.file_type}
            with BytesIO(self.getvalue().encode("utf-8")) as stream:
                if self._id:
                    self.fs.upload_from_stream_with_id(
                        self._id,
                        self.filename,
                        stream,
                        metadata=metadata
                    )
                else:
                    self.fs.upload_from_stream(
                        self.filename,
                        stream,
                        metadata=metadata
                    )
        finally:
            super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
189
190
class FsMongo(FsBase):
    """
    FsBase implementation that keeps its content in a MongoDB GridFS bucket.

    Every GridFS entry carries a ``metadata["type"]`` of "file", "sym" or "dir"; entries
    extracted from a tar also carry ``metadata["permissions"]``. sync() copies the bucket
    content to the local directory configured as "path".
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None
        self.client = None
        self.fs = None

    @staticmethod
    def _tree_regex(path):
        """Return a regex matching ``path`` itself and everything stored below it."""
        # The path is escaped so characters such as "." are matched literally
        return "^{}(/|$)".format(re.escape(path))

    def __update_local_fs(self):
        """
        Mirror every directory and file of the bucket under self.path.

        :return: None
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            # Cursors opened with no_cursor_timeout must be closed explicitly
            dir_cursor.close()

        file_cursor = self.fs.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True)
        try:
            for writing_file in file_cursor:
                file_path = self.path + writing_file.filename
                # The parent may have no "dir" entry of its own in the bucket
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                with open(file_path, 'wb+') as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)

                if "permissions" in writing_file.metadata:
                    permissions = writing_file.metadata["permissions"]
                    if writing_file.metadata["type"] == "sym":
                        # NOTE(review): "sym" entries are written as regular files here;
                        # no link target is stored in the metadata this class writes.
                        permissions |= stat.S_IFLNK
                    os.chmod(file_path, permissions)
        finally:
            file_cursor.close()

    def get_params(self):
        """
        :return: dictionary describing this storage backend
        """
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Validate the local path and open the GridFS bucket.

        :param config: dictionary with "path" (existing, writable local directory),
            "collection" (used to index the MongoClient) and either "uri" or
            "host" + "port"; "logger_name" is optional
        :return: None
        :raises FsException: on missing or invalid parameters, or on any connection error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException("Missing parameter \"path\"")
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config for key in ("uri", "collection")):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config for key in ("host", "port", "collection")):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config:
                    raise FsException("Missing parameter \"collection\"")
                else:
                    raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        try:
            # upload_from_stream always creates a new entry, so an existing directory
            # has to be detected beforehand to keep this call idempotent
            existing = self.fs.find({"filename": folder, "metadata.type": "dir"})
            if next(existing, None) is not None:
                return

            self.fs.upload_from_stream(
                folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            dst_cursor = self.fs.find(
                {"filename": {"$regex": self._tree_regex(dst)}},
                no_cursor_timeout=True)
            try:
                for dst_file in dst_cursor:
                    self.fs.delete(dst_file._id)
            finally:
                dst_cursor.close()

            src_cursor = self.fs.find(
                {"filename": {"$regex": self._tree_regex(src)}},
                no_cursor_timeout=True)
            try:
                for src_file in src_cursor:
                    # Every match starts with src, so only that leading part is replaced
                    self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
            finally:
                src_cursor.close()
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        :raises FsException: if several entries share the same name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # Without a requested mode, the mere existence of the entry is enough
            if mode is None:
                return True

            file_type = requested_file.metadata["type"]
            if file_type == mode:
                return True

            # Symbolic links count as files
            if file_type == "sym" and mode == "file":
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None when no entry has that name
        :raises FsException: if several entries share the same name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            return requested_file.length

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)

        for member in tar_object.getmembers():
            if member.isfile():
                stream = tar_object.extractfile(member)
                file_type = "file"
            else:
                # Non regular members are stored as empty entries; anything that is
                # not a symbolic link is recorded as a directory
                stream = BytesIO()
                file_type = "sym" if member.issym() else "dir"

            metadata = {
                "type": file_type,
                "permissions": member.mode
            }

            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name,
                    stream,
                    metadata=metadata
                )
            finally:
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                exception_dir = next(dir_cursor, None)

                if exception_dir:
                    raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_dir.metadata["type"] != "dir":
                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

                # NOTE(review): the pattern is not anchored at its end, so nested
                # entries ("sub/file") are listed too - confirm whether callers rely on it.
                files_cursor = self.fs.find({"filename": {"$regex": "^{}/([^/])*".format(re.escape(f))}})
                for children_file in files_cursor:
                    files.append(children_file.filename.replace(f + '/', '', 1))

            return files
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                exception_file = next(file_cursor, None)

                if exception_file:
                    raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_file.metadata["type"] == "dir":
                    # Only the directory and its descendants: a bare prefix match would
                    # also remove siblings such as "<f>2" or "<f>_"
                    dir_cursor = self.fs.find({"filename": {"$regex": self._tree_regex(f)}})

                    for tmp in dir_cursor:
                        self.fs.delete(tmp._id)
                else:
                    self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def sync(self):
        """
        Sync from FSMongo to local storage
        """
        self.__update_local_fs()