# Copyright 2019 Canonical
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: eduardo.sousa@canonical.com
##

import datetime
import errno
from http import HTTPStatus
from io import BytesIO, StringIO
import logging
import os
import re
import tarfile
import zipfile

from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
from pymongo import MongoClient


# Original author and contact address for this module.
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"


class GridByteStream(BytesIO):
    """
    In-memory binary buffer bound to one GridFS entry.

    On construction the entry called ``filename`` (if any) is downloaded into the
    buffer. When the stream is closed in a mode that does not contain "r", the buffer
    content is uploaded back to GridFS, replacing the previous entry.

    :param filename: name of the GridFS entry
    :param fs: object offering find/download_to_stream/delete/upload_from_stream/
        upload_from_stream_with_id (a GridFSBucket in this module)
    :param mode: file mode string; any mode containing "r" is handled as read-only
    """

    def __init__(self, filename, fs, mode):
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of the entry, if it exists, into the buffer.

        :raises FsException: when several entries share the name, or when the entry
            type is neither "file" nor "sym"
        """
        cursor = self.fs.find({"filename": self.filename})

        requested_file = next(cursor, None)
        if requested_file is not None:
            # A second hit means the name is ambiguous
            if next(cursor, None) is not None:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] not in ("file", "sym"):
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            self.file_type = requested_file.metadata["type"]
            self._id = requested_file._id
            self.fs.download_to_stream(self._id, self)

        # In read mode start at the beginning; in any other mode the position stays
        # after the downloaded content, so new writes are appended to it
        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the stream; in non-read modes first store the buffer in GridFS.

        Calling close() again on an already closed stream does nothing. Without this
        guard a second call would delete the entry that was just uploaded and then
        fail when seeking the closed buffer.
        """
        if self.closed:
            return

        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        cursor = self.fs.find(
            {"filename": parent_dir_name, "metadata": {"type": "dir"}}
        )

        if not next(cursor, None):
            # No directory entry for the first path component: drop the last character
            # of that component before storing.
            # NOTE(review): presumably this maps a temporary "<id>_" folder to "<id>";
            # confirm against the callers
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        if self._id:
            # Keep the identifier of the entry being replaced
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, self, metadata={"type": self.file_type}
            )
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class GridStringStream(StringIO):
    """
    In-memory text buffer bound to one GridFS entry.

    On construction the entry called ``filename`` (if any) is downloaded and decoded as
    UTF-8 into the buffer. When the stream is closed in a mode that does not contain
    "r", the buffer content is encoded as UTF-8 and uploaded back to GridFS, replacing
    the previous entry.

    :param filename: name of the GridFS entry
    :param fs: object offering find/download_to_stream/delete/upload_from_stream/
        upload_from_stream_with_id (a GridFSBucket in this module)
    :param mode: file mode string; any mode containing "r" is handled as read-only
    """

    def __init__(self, filename, fs, mode):
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """
        Load the current content of the entry, if it exists, into the buffer.

        :raises FsException: when several entries share the name, or when the entry
            type is not accepted
        """
        cursor = self.fs.find({"filename": self.filename})

        requested_file = next(cursor, None)
        if requested_file is not None:
            # A second hit means the name is ambiguous
            if next(cursor, None) is not None:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            # NOTE(review): GridByteStream accepts ("file", "sym") here while this
            # class accepts ("file", "dir"); confirm whether "dir" is intended
            if requested_file.metadata["type"] not in ("file", "dir"):
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            self.file_type = requested_file.metadata["type"]
            self._id = requested_file._id
            # GridFS delivers bytes: download to a temporary buffer and decode it
            with BytesIO() as stream:
                self.fs.download_to_stream(self._id, stream)
                self.write(stream.getvalue().decode("utf-8"))

        # In read mode start at the beginning; in any other mode the position stays
        # after the downloaded content, so new writes are appended to it
        if "r" in self.mode:
            self.seek(0, 0)

    def close(self):
        """
        Close the stream; in non-read modes first store the buffer in GridFS.

        Calling close() again on an already closed stream does nothing. Without this
        guard a second call would delete the entry that was just uploaded and then
        fail when seeking the closed buffer.
        """
        if self.closed:
            return

        if "r" in self.mode:
            super().close()
            return

        if self._id:
            self.fs.delete(self._id)

        parent_dir_name = self.filename.split("/")[0]
        cursor = self.fs.find(
            {"filename": parent_dir_name, "metadata": {"type": "dir"}}
        )

        if not next(cursor, None):
            # No directory entry for the first path component: drop the last character
            # of that component before storing.
            # NOTE(review): presumably this maps a temporary "<id>_" folder to "<id>";
            # confirm against the callers
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        with BytesIO(self.read().encode("utf-8")) as stream:
            if self._id:
                # Keep the identifier of the entry being replaced
                self.fs.upload_from_stream_with_id(
                    self._id, self.filename, stream, metadata={"type": self.file_type}
                )
            else:
                self.fs.upload_from_stream(
                    self.filename, stream, metadata={"type": self.file_type}
                )
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class FsMongo(FsBase):
    """
    Storage backend that keeps files, directories and symbolic links as GridFS entries
    and can mirror them to/from a local directory.

    Each entry carries ``metadata["type"]`` ("file", "dir" or "sym") and, when known,
    ``metadata["permissions"]``. A "sym" entry stores the link target as UTF-8 content.
    Entry names are paths relative to ``self.path``, without trailing "/".
    """

    def __init__(self, logger_name="fs", lock=False):
        super().__init__(logger_name, lock)
        self.path = None  # local base directory, ends with "/" after fs_connect
        self.client = None  # MongoClient, created by fs_connect
        self.fs = None  # GridFSBucket, created by fs_connect

    def __update_local_fs(self, from_path=None):
        """
        Copy GridFS content into the local directory self.path

        :param from_path: if supplied, only entries whose name starts with it are copied
        :return: None
        """
        valid_paths = set()

        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)
        try:
            for directory in dir_cursor:
                if from_path and not directory.filename.startswith(from_path):
                    continue
                local_dir = self.path + directory.filename
                self.logger.debug("Making dir {}".format(local_dir))
                os.makedirs(local_dir, exist_ok=True)
                valid_paths.add(local_dir)
        finally:
            # cursors created with no_cursor_timeout must be closed explicitly
            dir_cursor.close()

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )
        try:
            for writing_file in file_cursor:
                if from_path and not writing_file.filename.startswith(from_path):
                    continue
                file_path = self.path + writing_file.filename

                # The parent folder may have no "dir" entry of its own in GridFS
                folder = os.path.dirname(file_path)
                if folder not in valid_paths:
                    self.logger.debug("Sync local directory {}".format(folder))
                    os.makedirs(folder, exist_ok=True)
                    valid_paths.add(folder)

                if writing_file.metadata["type"] == "sym":
                    # The content of a "sym" entry is the link target
                    with BytesIO() as b:
                        self.fs.download_to_stream(writing_file._id, b)
                        link = b.getvalue().decode("utf-8")

                    try:
                        self.logger.debug("Sync removing {}".format(file_path))
                        os.remove(file_path)
                    except OSError as e:
                        if e.errno != errno.ENOENT:
                            # This is probably permission denied or worse
                            raise
                    os.symlink(link, file_path)
                else:
                    with open(file_path, "wb+") as file_stream:
                        self.logger.debug("Sync download {}".format(file_path))
                        self.fs.download_to_stream(writing_file._id, file_stream)
                    if "permissions" in writing_file.metadata:
                        os.chmod(file_path, writing_file.metadata["permissions"])
        finally:
            file_cursor.close()

    def get_params(self):
        """
        :return: dictionary with the storage type ("mongo") and the local base path
        """
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """
        Validate the local path and create the MongoDB client and the GridFS bucket

        :param config: dictionary with "path" (existing, writable local directory),
            "collection" (used as the name of the database holding the bucket) and
            either "uri" or "host" + "port"; optionally "logger_name"
        :return: None
        :raises FsException: on missing/invalid parameters or on any connection error
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" not in config:
                raise FsException('Missing parameter "path"')
            self.path = config["path"]
            if not self.path.endswith("/"):
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if all(key in config for key in ("uri", "collection")):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif all(key in config for key in ("host", "port", "collection")):
                self.client = MongoClient(config["host"], config["port"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            elif "collection" not in config:
                raise FsException('Missing parameter "collection"')
            else:
                raise FsException('Missing parameters: "uri" or "host" + "port"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e)) from e

    def fs_disconnect(self):
        """Currently does nothing; the MongoDB client is left open"""
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        folder = folder.rstrip("/")
        try:
            # upload_from_stream always stores a new entry, so look for an existing
            # directory entry first to make the call idempotent
            existing = self.fs.find({"filename": folder, "metadata.type": "dir"})
            if next(existing, None) is not None:
                return
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(
                str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            ) from e

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        dst = dst.rstrip("/")
        src = src.rstrip("/")

        try:
            # Names are escaped so that regex metacharacters in a path (e.g. ".")
            # only match themselves
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(dst))}},
                no_cursor_timeout=True,
            )
            try:
                for dst_file in dst_cursor:
                    self.fs.delete(dst_file._id)
            finally:
                dst_cursor.close()

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(re.escape(src))}},
                no_cursor_timeout=True,
            )
            try:
                for src_file in src_cursor:
                    # every match starts with src: swap that prefix for dst
                    self.fs.rename(
                        src_file._id, dst + src_file.filename[len(src):]
                    )
            finally:
                src_cursor.close()
        except Exception as e:
            raise FsException(
                str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            ) from e

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        :raises FsException: when several entries share the name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})
        requested_file = next(cursor, None)
        if requested_file is None:
            return False

        if next(cursor, None) is not None:
            raise FsException(
                "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))

        # if no special mode is required just check it does exists
        if not mode:
            return True

        entry_type = requested_file.metadata["type"]
        # a symbolic link also counts as a file
        return entry_type == mode or (entry_type == "sym" and mode == "file")

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size, or None when the entry does not exist
        :raises FsException: when several entries share the name
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})
        requested_file = next(cursor, None)
        if requested_file is None:
            return None

        if next(cursor, None) is not None:
            raise FsException(
                "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
            )

        return requested_file.length

    def file_extract(self, compressed_object, path):
        """
        extract a tar or zip file into GridFS, one entry per archive member
        :param compressed_object: object of type tar or zip
        :param path: can be a str or a str list, where to extract the compressed_object
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)
        f = f.rstrip("/")

        if isinstance(compressed_object, tarfile.TarFile):
            for member in compressed_object.getmembers():
                if member.isfile():
                    file_type = "file"
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    file_type = "sym"
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    # every other member kind is stored as an empty "dir" entry
                    file_type = "dir"
                    stream = BytesIO()

                try:
                    metadata = {"type": file_type, "permissions": member.mode}
                    # local name: the archive member itself is left untouched
                    member_name = member.name.rstrip("/")

                    self.logger.debug("Uploading {}/{}".format(f, member_name))
                    self.fs.upload_from_stream(
                        f + "/" + member_name, stream, metadata=metadata
                    )
                finally:
                    stream.close()
        elif isinstance(compressed_object, zipfile.ZipFile):
            for member in compressed_object.infolist():
                # decided once, before the trailing "/" is stripped from the name
                is_dir = member.is_dir()
                content = b"" if is_dir else compressed_object.read(member)

                metadata = {"type": "dir" if is_dir else "file"}
                member_name = member.filename.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member_name))
                with BytesIO(content) as stream:
                    self.fs.upload_from_stream(
                        f + "/" + member_name, stream, metadata=metadata
                    )

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode; a GridByteStream is returned if it contains "b",
            otherwise a GridStringStream
        :return: file object
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        try:
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            return GridStringStream(f, self.fs, mode)
        except errors.NoFile as e:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            ) from e
        except IOError as e:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            ) from e

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: list with the names, relative to the folder, of every entry stored
            below it; empty list when the folder entry does not exist
        :raises FsException: when several entries share the name or the entry is not
            a directory
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        try:
            dir_cursor = self.fs.find({"filename": f})
            requested_dir = next(dir_cursor, None)
            if requested_dir is None:
                return []

            if next(dir_cursor, None) is not None:
                raise FsException(
                    "Multiple directories found",
                    http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                )

            if requested_dir.metadata["type"] != "dir":
                raise FsException(
                    "File {} does not exist".format(f),
                    http_code=HTTPStatus.NOT_FOUND,
                )

            # The folder name is escaped so that regex metacharacters in it only
            # match themselves
            files_cursor = self.fs.find(
                {"filename": {"$regex": "^{}/([^/])*".format(re.escape(f))}}
            )
            return [
                children_file.filename.replace(f + "/", "", 1)
                for children_file in files_cursor
            ]
        except IOError as e:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            ) from e

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        try:
            file_cursor = self.fs.find({"filename": f})
            requested_file = next(file_cursor, None)

            if requested_file is None:
                if not ignore_non_exist:
                    raise FsException(
                        "File {} does not exist".format(storage),
                        http_code=HTTPStatus.NOT_FOUND,
                    )
                return

            exception_file = next(file_cursor, None)
            if exception_file is not None:
                self.logger.error(
                    "Cannot delete duplicate file: {} and {}".format(
                        requested_file.filename, exception_file.filename
                    )
                )
                raise FsException(
                    "Multiple files found",
                    http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                )

            if requested_file.metadata["type"] == "dir":
                # Remove everything stored below the folder. The name is escaped so
                # that regex metacharacters in it cannot match unrelated entries
                dir_cursor = self.fs.find(
                    {"filename": {"$regex": "^{}/".format(re.escape(f))}}
                )

                for tmp in dir_cursor:
                    self.logger.debug("Deleting {}".format(tmp.filename))
                    self.fs.delete(tmp._id)

            self.logger.debug("Deleting {}".format(requested_file.filename))
            self.fs.delete(requested_file._id)
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            ) from e

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path:
            if os.path.isabs(from_path):
                # entry names are relative to the local base path
                from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """
        Upload the local content found below self.path + from_path and delete the
        entries at or below from_path that are not part of that local content

        :param from_path: path relative to self.path
        :return: None
        """
        os_path = self.path + from_path

        # Obtain list of files and dirs in filesystem
        # NOTE(review): os.walk() never reports os_path itself among "dirs", so an
        # entry named exactly from_path is handled as stale and deleted at the end;
        # confirm this is intended
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                folder_path = os.path.join(root, folder)
                folder_type = "sym" if os.path.islink(folder_path) else "dir"
                members.append({"filename": folder_path, "type": folder_type})
            for file in files:
                filename = os.path.join(root, file)
                file_type = "sym" if os.path.islink(filename) else "file"
                members.append({"filename": filename, "type": file_type})

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            local_path = member["filename"]
            file_type = member["type"]

            try:
                stat_result = os.stat(local_path)
            except FileNotFoundError:
                if file_type != "sym":
                    raise
                # dangling symbolic link: use the data of the link itself
                stat_result = os.lstat(local_path)

            # obtain permission
            mask = stat_result.st_mode & 0o777

            # convert to relative path
            rel_filename = os.path.relpath(local_path, self.path)
            # Naive UTC, to be comparable with uploadDate.
            # Assumes the client returns naive UTC datetimes (no tz_aware option is
            # passed in fs_connect) - TODO confirm
            last_modified_date = datetime.datetime.fromtimestamp(
                stat_result.st_mtime, tz=datetime.timezone.utc
            ).replace(tzinfo=None)

            # take the processed entries out of the dict; newest one comes first
            remote_file = remote_files.pop(rel_filename, None)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )

            if last_modified_date >= upload_date:
                if file_type == "dir":
                    stream = BytesIO()
                elif file_type == "sym":
                    stream = BytesIO(os.readlink(local_path).encode("utf-8"))
                else:
                    # uploaded straight from the open file
                    stream = open(local_path, "rb")

                with stream:
                    metadata = {"type": file_type, "permissions": mask}

                    self.logger.debug("Sync upload {}".format(rel_filename))
                    self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

                # delete old files
                for file in remote_file or ():
                    self.logger.debug("Sync deleting {}".format(file.filename))
                    self.fs.delete(file._id)

        # delete files that are not any more in local fs
        for remote_file in remote_files.values():
            for file in remote_file:
                self.fs.delete(file._id)

    def _get_mongo_files(self, from_path=None):
        """
        Obtain the GridFS entries grouped by name

        :param from_path: if supplied, only the entry named from_path and the entries
            below it ("from_path/...") are returned. Entries that merely share the
            prefix (e.g. "pkg10/x" for "pkg1") are left out, otherwise a reverse sync
            would delete them as stale
        :return: dictionary name -> list of entries, newest upload first
        """
        prefix = from_path.rstrip("/") if from_path else None

        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        try:
            for file in file_cursor:
                name = file.filename
                if from_path and name != prefix and not name.startswith(prefix + "/"):
                    continue
                file_dict.setdefault(name, []).append(file)
        finally:
            # cursors created with no_cursor_timeout must be closed explicitly
            file_cursor.close()
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            # entry names are relative to the local base path
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)