blob: cd7f7f0e81f2f2196259997d8c638c61f576ff4c [file] [log] [blame]
# Copyright 2019 Canonical
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
##
18
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re
import stat

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
27
# Module author and contact address.
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
29
30
class GridByteStream(BytesIO):
    """In-memory binary stream bound to one GridFS file name.

    On construction the content stored under ``filename`` (if any) is
    downloaded into the buffer. When the stream is closed and ``mode`` does not
    contain ``"r"``, the buffer is uploaded back to GridFS under ``filename``,
    replacing the previously stored file.

    :param filename: GridFS file name ("/"-separated path)
    :param fs: GridFS bucket object providing find/delete/download/upload
    :param mode: open mode string; only the presence of "r" is inspected
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        # Default type for files that do not exist yet. Without it close()
        # raised AttributeError when a new file was created in write mode.
        self.file_type = "file"

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of ``self.filename`` into the buffer
        :return: None or raises FsException if several entries share the name or the entry is not a
            "file"/"sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second entry with the same name makes the request ambiguous.
            if next(cursor, None):
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] not in ("file", "sym"):
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            grid_file = requested_file
            self.file_type = requested_file.metadata["type"]

        if grid_file:
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

            # Readers start at the beginning; otherwise the position stays at
            # the end of the downloaded content.
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Close the stream, uploading the buffer to GridFS unless opened for reading
        :return: None
        """
        # Closing twice must be a no-op: a second pass would delete the file
        # that the first pass has just uploaded.
        if self.closed:
            return

        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        cursor = self.fs.find({
            "filename": parent_dir_name,
            "metadata": {"type": "dir"}})

        if not next(cursor, None):
            # No directory entry for the first path component: retry the name
            # with that component's last character removed.
            # NOTE(review): presumably handles a temporary "<name>_" folder
            # that was already renamed - confirm with callers.
            self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id,
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
104
105
class GridStringStream(StringIO):
    """In-memory text stream bound to one GridFS file name.

    On construction the content stored under ``filename`` (if any) is
    downloaded, decoded as UTF-8 and written into the buffer. When the stream
    is closed and ``mode`` does not contain ``"r"``, the buffer is encoded as
    UTF-8 and uploaded back to GridFS, replacing the previously stored file.

    :param filename: GridFS file name ("/"-separated path)
    :param fs: GridFS bucket object providing find/delete/download/upload
    :param mode: open mode string; only the presence of "r" is inspected
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        # Default type for files that do not exist yet. Without it close()
        # raised AttributeError when a new file was created in write mode.
        self.file_type = "file"

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of ``self.filename`` into the buffer
        :return: None or raises FsException if several entries share the name or the entry type is
            rejected
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # A second entry with the same name makes the request ambiguous.
            if next(cursor, None):
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # NOTE(review): the binary stream accepts ("file", "sym") while
            # this one accepts ("file", "dir"); "dir" looks unintended but is
            # kept to preserve behaviour - confirm.
            if requested_file.metadata["type"] not in ("file", "dir"):
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            grid_file = requested_file
            self.file_type = requested_file.metadata["type"]

        if grid_file:
            self._id = grid_file._id
            # GridFS stores bytes: download to a scratch buffer, then decode.
            with BytesIO() as raw:
                self.fs.download_to_stream(self._id, raw)
                self.write(raw.getvalue().decode("utf-8"))

            # Readers start at the beginning; otherwise the position stays at
            # the end of the downloaded content.
            if "r" in self.mode:
                self.seek(0, 0)

    def close(self):
        """
        Close the stream, uploading the buffer to GridFS unless opened for reading
        :return: None
        """
        # Closing twice must be a no-op: a second pass would delete the file
        # that the first pass has just uploaded.
        if self.closed:
            return

        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        cursor = self.fs.find({
            "filename": parent_dir_name,
            "metadata": {"type": "dir"}})

        if not next(cursor, None):
            # No directory entry for the first path component: retry the name
            # with that component's last character removed.
            # NOTE(review): presumably handles a temporary "<name>_" folder
            # that was already renamed - confirm with callers.
            self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

        # The scratch byte buffer is released even when the upload fails.
        with BytesIO(self.getvalue().encode("utf-8")) as payload:
            if self._id:
                self.fs.upload_from_stream_with_id(
                    self._id,
                    self.filename,
                    payload,
                    metadata={"type": self.file_type}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename,
                    payload,
                    metadata={"type": self.file_type}
                )
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
187
188
class FsMongo(FsBase):
    """FsBase implementation that keeps files in a MongoDB GridFS bucket.

    Every GridFS entry carries a ``metadata["type"]`` of "dir", "file" or
    "sym"; ``sync()`` copies the bucket content under the local ``path``.
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None
        self.client = None
        self.fs = None

    @staticmethod
    def _storage_to_name(storage):
        """Return the GridFS file name for a str or a list of path components."""
        return storage if isinstance(storage, str) else "/".join(storage)

    @staticmethod
    def _subtree_regex(name):
        """Return a regex matching exactly ``name`` and everything below ``name/``."""
        # The name is escaped so characters such as "." or "+" match literally.
        return "^{}(/|$)".format(re.escape(name))

    def _find_unique(self, name, duplicate_msg="Multiple files found"):
        """Return the single entry called ``name``, None if absent; raise FsException on duplicates."""
        cursor = self.fs.find({"filename": name})
        entry = next(cursor, None)
        if entry is not None and next(cursor, None) is not None:
            raise FsException(duplicate_msg, http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
        return entry

    def __update_local_fs(self):
        """Copy every directory and file of the bucket under ``self.path``."""
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            # Cursors without timeout must be closed explicitly.
            dir_cursor.close()

        # "$in" matches a scalar field against a list of values; the former
        # "$elemMatch" with a list argument is not a valid query.
        file_cursor = self.fs.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True)
        try:
            for writing_file in file_cursor:
                file_path = self.path + writing_file.filename
                with open(file_path, 'wb+') as file_stream:
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    permissions = writing_file.metadata["permissions"]
                    if writing_file.metadata["type"] == "sym":
                        # NOTE(review): "sym" entries are written as regular
                        # files; the effect of OR-ing S_IFLNK into a chmod
                        # mode is platform dependent - confirm intent.
                        permissions |= stat.S_IFLNK
                    os.chmod(file_path, permissions)
        finally:
            file_cursor.close()

    def get_params(self):
        """
        Return the parameters describing this storage
        :return: dictionary with "fs" and "path"
        """
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Validate the configuration and open the GridFS bucket
        :param config: dictionary with mandatory "path" and "collection", plus either "uri" or
            "host" + "port"; optional "logger_name"
        :return: None or raises FsException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" not in config:
                raise FsException("Missing parameter \"path\"")
            self.path = config["path"]
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config for key in ("uri", "collection")):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config for key in ("host", "port", "collection")):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif "collection" not in config:
                raise FsException("Missing parameter \"collection\"")
            else:
                raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
        except FsException:
            raise
        except Exception as e:  # TODO refine
            # Keep the original exception as the cause for easier debugging.
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        try:
            # GridFS accepts several files with the same name, so the upload
            # alone never reports a duplicate; check first to stay idempotent.
            existing = self.fs.find({"filename": folder, "metadata.type": "dir"})
            if next(existing, None) is not None:
                return
            self.fs.upload_from_stream(
                folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises an exception
        """
        try:
            dst_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(dst)}},
                no_cursor_timeout=True)
            try:
                for dst_file in dst_cursor:
                    self.fs.delete(dst_file._id)
            finally:
                dst_cursor.close()

            src_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(src)}},
                no_cursor_timeout=True)
            try:
                for src_file in src_cursor:
                    # Names start with src, so only the leading prefix changes.
                    self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
            finally:
                src_cursor.close()
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        entry = self._find_unique(self._storage_to_name(storage))
        if entry is None:
            return False

        # No specific mode requested: existence is enough.
        if not mode:
            return True

        entry_type = entry.metadata["type"]
        # A symbolic link also counts as a regular file.
        return entry_type == mode or (entry_type == "sym" and mode == "file")

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None when the file does not exist
        """
        entry = self._find_unique(self._storage_to_name(storage))
        return None if entry is None else entry.length

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = self._storage_to_name(path)

        for member in tar_object.getmembers():
            if member.isfile():
                file_type = "file"
                stream = tar_object.extractfile(member)
            else:
                # Symbolic links and directories are stored with empty content.
                file_type = "sym" if member.issym() else "dir"
                stream = BytesIO()

            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name,
                    stream,
                    metadata={
                        "type": file_type,
                        "permissions": member.mode
                    }
                )
            finally:
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        """
        try:
            f = self._storage_to_name(storage)

            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content
        """
        try:
            f = self._storage_to_name(storage)

            requested_dir = self._find_unique(f, "Multiple directories found")
            if requested_dir is None:
                return []

            if requested_dir.metadata["type"] != "dir":
                raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

            # NOTE(review): the pattern is not anchored at the end, so entries
            # in nested folders are listed too (as relative paths) - confirm
            # whether only direct children are expected.
            files_cursor = self.fs.find({"filename": {"$regex": "^{}/([^/])*".format(re.escape(f))}})
            return [children_file.filename.replace(f + '/', '', 1) for children_file in files_cursor]
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        try:
            f = self._storage_to_name(storage)

            requested_file = self._find_unique(f)
            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
                return

            if requested_file.metadata["type"] == "dir":
                # Match the directory itself and names below "f/" only; a bare
                # prefix match would also delete siblings such as "f2".
                dir_cursor = self.fs.find({"filename": {"$regex": self._subtree_regex(f)}})

                for tmp in dir_cursor:
                    self.fs.delete(tmp._id)
            else:
                self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def sync(self):
        """
        Sync from FSMongo to local storage
        """
        self.__update_local_fs()