# Copyright 2019 Canonical
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
##
18
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient
28
Eduardo Sousa0593aba2019-06-04 12:55:43 +010029
30__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
31
32
class GridByteStream(BytesIO):
    """
    In-memory binary buffer bound to one file of a GridFS bucket.

    On construction the content of the bucket file called ``filename`` (when
    one exists) is downloaded into the buffer. Closing a stream whose mode
    does not contain "r" uploads the buffer back to the bucket, replacing the
    previously stored file.

    :param filename: name of the file inside the bucket
    :param fs: bucket object; it is used through find, delete, download_to_stream,
        upload_from_stream and upload_from_stream_with_id
    :param mode: file mode string; only the presence of "r" is inspected here
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of the bucket file, if any, into the buffer
        :return: None or raises FsException when several files share the name or the stored
            type is neither "file" nor "sym"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # Any second hit for the same name is treated as an inconsistency
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            self._id = grid_file._id
            # NOTE(review): existing content is loaded for every mode and only "r" modes
            # rewind, so writes in a "w" mode land after the old content - confirm intended.
            self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the stream; for modes without "r" the buffer is first stored in the bucket
        :return: None
        """
        # Closing twice must be a no-op: without this guard a second close() on a writable
        # stream deleted the file uploaded by the first close() and then failed on seek().
        if self.closed:
            return

        if "r" in self.mode:
            super(GridByteStream, self).close()
            return

        # The old revision is removed first so that its id can be reused for the upload
        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]

        # NOTE(review): this compares the whole metadata sub-document, so directory entries
        # that carry extra metadata keys presumably do not match - confirm.
        cursor = self.fs.find({
            "filename": parent_dir_name,
            "metadata": {"type": "dir"}})

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No directory entry for the first path component: store the file with the last
            # character of that component dropped.
            # NOTE(review): presumably maps a temporary folder name to its final name - confirm.
            self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

        self.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id,
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename,
                self,
                metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
107
108
class GridStringStream(StringIO):
    """
    In-memory text buffer bound to one file of a GridFS bucket.

    On construction the content of the bucket file called ``filename`` (when
    one exists) is downloaded, decoded as UTF-8 and written into the buffer.
    Closing a stream whose mode does not contain "r" encodes the buffer as
    UTF-8 and uploads it to the bucket, replacing the previously stored file.

    :param filename: name of the file inside the bucket
    :param fs: bucket object; it is used through find, delete, download_to_stream,
        upload_from_stream and upload_from_stream_with_id
    :param mode: file mode string; only the presence of "r" is inspected here
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of the bucket file, if any, into the buffer
        :return: None or raises FsException when several files share the name or the stored
            type is neither "file" nor "dir"
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # Any second hit for the same name is treated as an inconsistency
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # NOTE(review): GridByteStream accepts ("file", "sym") here; accepting "dir" and
            # rejecting "sym" for text streams looks unintended - confirm before changing.
            if requested_file.metadata["type"] in ("file", "dir"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException("File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

        if grid_file:
            self._id = grid_file._id
            # The bucket delivers bytes, so go through a temporary binary buffer
            with BytesIO() as stream:
                self.fs.download_to_stream(self._id, stream)
                # NOTE(review): existing content is loaded for every mode and only "r" modes
                # rewind, so writes in a "w" mode land after the old content - confirm intended.
                self.write(stream.getvalue().decode("utf-8"))

        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the stream; for modes without "r" the buffer is first stored in the bucket
        :return: None
        """
        # Closing twice must be a no-op: without this guard a second close() on a writable
        # stream deleted the file uploaded by the first close() and then failed on seek().
        if self.closed:
            return

        if "r" in self.mode:
            super(GridStringStream, self).close()
            return

        # The old revision is removed first so that its id can be reused for the upload
        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]

        # NOTE(review): this compares the whole metadata sub-document, so directory entries
        # that carry extra metadata keys presumably do not match - confirm.
        cursor = self.fs.find({
            "filename": parent_dir_name,
            "metadata": {"type": "dir"}})

        parent_dir = next(cursor, None)

        if not parent_dir:
            # No directory entry for the first path component: store the file with the last
            # character of that component dropped.
            # NOTE(review): presumably maps a temporary folder name to its final name - confirm.
            self.filename = self.filename.replace(parent_dir_name, parent_dir_name[:-1], 1)

        self.seek(0, 0)
        stream = BytesIO()
        stream.write(self.read().encode("utf-8"))
        stream.seek(0, 0)
        if self._id:
            self.fs.upload_from_stream_with_id(
                self._id,
                self.filename,
                stream,
                metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename,
                stream,
                metadata={"type": self.file_type}
            )
        stream.close()
        super(GridStringStream, self).close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
191
192
class FsMongo(FsBase):
    """
    Storage backend that keeps files, symbolic links and directories in a GridFS bucket.

    Every bucket entry carries a metadata "type" of "file", "sym" or "dir"; the entry
    filename is the full "/"-separated path. sync() copies the bucket content below the
    local folder configured as "path".
    """

    def __init__(self, logger_name='fs', lock=False):
        super().__init__(logger_name, lock)
        self.path = None
        self.client = None
        self.fs = None

    @staticmethod
    def _subtree_regex(path):
        """
        Build the regular expression that selects "path" itself and every entry below it
        :param path: bucket path, taken literally (regex metacharacters are escaped)
        :return: regular expression string
        """
        return "^{}(/|$)".format(re.escape(path))

    def __update_local_fs(self):
        """
        Recreate under self.path every directory, file and symbolic link stored in the bucket
        :return: None
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                os.makedirs(self.path + directory.filename, exist_ok=True)
        finally:
            # Cursors opened with no_cursor_timeout must be closed explicitly
            dir_cursor.close()

        file_cursor = self.fs.find({"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True)
        try:
            for writing_file in file_cursor:
                # NOTE(review): stored names are joined to self.path without validation, so a
                # name containing ".." would presumably be written outside self.path - confirm.
                file_path = self.path + writing_file.filename

                if writing_file.metadata["type"] == "sym":
                    # The stored content of a "sym" entry is the link target
                    with BytesIO() as b:
                        self.fs.download_to_stream(writing_file._id, b)
                        b.seek(0)
                        link = b.read().decode("utf-8")

                    # Remove whatever is there so that os.symlink does not fail on an existing path
                    try:
                        os.remove(file_path)
                    except OSError as e:
                        if e.errno != errno.ENOENT:
                            # This is probably permission denied or worse
                            raise
                    os.symlink(link, file_path)
                else:
                    with open(file_path, 'wb+') as file_stream:
                        self.fs.download_to_stream(writing_file._id, file_stream)
                    if "permissions" in writing_file.metadata:
                        os.chmod(file_path, writing_file.metadata["permissions"])
        finally:
            file_cursor.close()

    def get_params(self):
        """
        :return: dictionary with the storage kind ("mongo") and the local path
        """
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Validate the local path and create the MongoDB client and the GridFS bucket
        :param config: dictionary with "path" (existing, writable folder), "collection" and either
            "uri" or "host" + "port"; optionally "logger_name"
        :return: None or raises FsException
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException("Missing parameter \"path\"")
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format(
                    config["path"]))
            elif not os.access(self.path, os.W_OK):
                raise FsException("Invalid configuration param at '[storage]': path '{}' is not writable".format(
                    config["path"]))
            if all(key in config for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config for key in ["host", "port", "collection"]):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config:
                    raise FsException("Missing parameter \"collection\"")
                else:
                    raise FsException("Missing parameters: \"uri\" or \"host\" + \"port\"")
        except FsException:
            raise
        except Exception as e:  # TODO refine
            # Keep the original exception as the cause so the traceback is not lost
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        try:
            # A directory is an empty bucket entry whose metadata type is "dir"
            self.fs.upload_from_stream(
                folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        try:
            # Paths are escaped so that characters such as "." only match themselves
            dst_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(dst)}},
                no_cursor_timeout=True)
            try:
                for dst_file in dst_cursor:
                    self.fs.delete(dst_file._id)
            finally:
                dst_cursor.close()

            src_cursor = self.fs.find(
                {"filename": {"$regex": self._subtree_regex(src)}},
                no_cursor_timeout=True)
            try:
                for src_file in src_cursor:
                    self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
            finally:
                src_cursor.close()
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR) from e

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # No particular kind requested: the entry existing is enough
            if mode is None:
                return True

            if requested_file.metadata["type"] == mode:
                return True

            # A symbolic link also counts as a file
            if requested_file.metadata["type"] == "sym" and mode == "file":
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None when no entry has that name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            return requested_file.length

        return None

    def file_extract(self, tar_object, path):
        """
        extract a tar file
        :param tar_object: object of type tar
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)

        for member in tar_object.getmembers():
            if member.isfile():
                file_type = "file"
                stream = tar_object.extractfile(member)
            elif member.issym():
                # A symbolic link is stored as an entry whose content is the link target
                file_type = "sym"
                stream = BytesIO(member.linkname.encode("utf-8"))
            else:
                # Everything else is stored as an empty "dir" entry
                file_type = "dir"
                stream = BytesIO()

            metadata = {
                "type": file_type,
                "permissions": member.mode
            }

            # NOTE(review): member names are used as they come from the archive - confirm that
            # archives from untrusted sources are validated before reaching this point.
            try:
                self.fs.upload_from_stream(
                    f + "/" + member.name,
                    stream,
                    metadata=metadata
                )
            finally:
                # Release the member stream even when the upload fails
                stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            # Binary modes get a bytes buffer, every other mode a text buffer
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                exception_dir = next(dir_cursor, None)

                if exception_dir:
                    raise FsException("Multiple directories found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_dir.metadata["type"] != "dir":
                    raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)

                # NOTE(review): the pattern has no end anchor, so entries of nested folders are
                # returned as well (as "sub/name") - confirm whether only direct children are wanted.
                files_cursor = self.fs.find({"filename": {"$regex": "^{}/([^/])*".format(re.escape(f))}})
                for children_file in files_cursor:
                    files += [children_file.filename.replace(f + '/', '', 1)]

            return files
        except IOError:
            raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)

        try:
            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                exception_file = next(file_cursor, None)

                if exception_file:
                    raise FsException("Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

                if requested_file.metadata["type"] == "dir":
                    # Only the folder itself and what is below it; a bare prefix match would
                    # also remove siblings whose name merely starts with the folder name
                    dir_cursor = self.fs.find({"filename": {"$regex": self._subtree_regex(f)}})

                    for tmp in dir_cursor:
                        self.fs.delete(tmp._id)
                else:
                    self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
        except IOError as e:
            raise FsException("File {} cannot be deleted: {}".format(f, e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def sync(self):
        """
        Sync from FSMongo to local storage
        """
        self.__update_local_fs()
469 self.__update_local_fs()