1 |
|
# Copyright 2019 Canonical |
2 |
|
# |
3 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
4 |
|
# not use this file except in compliance with the License. You may obtain |
5 |
|
# a copy of the License at |
6 |
|
# |
7 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
# |
9 |
|
# Unless required by applicable law or agreed to in writing, software |
10 |
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
11 |
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
12 |
|
# License for the specific language governing permissions and limitations |
13 |
|
# under the License. |
14 |
|
# |
15 |
|
# For those usages not covered by the Apache License, Version 2.0 please |
16 |
|
# contact: eduardo.sousa@canonical.com |
17 |
|
## |
18 |
1 |
import datetime |
19 |
1 |
import errno |
20 |
1 |
from http import HTTPStatus |
21 |
1 |
from io import BytesIO, StringIO |
22 |
1 |
import logging |
23 |
1 |
import os |
24 |
1 |
import tarfile |
25 |
1 |
import zipfile |
26 |
|
|
27 |
1 |
from gridfs import errors, GridFSBucket |
28 |
1 |
from osm_common.fsbase import FsBase, FsException |
29 |
1 |
from pymongo import MongoClient |
30 |
|
|
31 |
|
|
32 |
1 |
__author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>" |
33 |
|
|
34 |
|
|
35 |
1 |
class GridByteStream(BytesIO):
    """In-memory binary stream backed by a single GridFS file.

    On construction the GridFS entry matching *filename* (if any) is
    downloaded into this buffer.  For write modes, :meth:`close` deletes
    the previous GridFS revision and uploads the buffer contents back;
    for read modes, close() just releases the buffer.
    """

    def __init__(self, filename, fs, mode):
        # filename: GridFS filename (slash-separated path string)
        # fs: GridFSBucket used for all storage operations
        # mode: open-mode string; any mode containing "r" is read-only
        BytesIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Download the GridFS entry for self.filename into this buffer.

        :raises FsException: if more than one entry shares the filename,
            or the entry's metadata type is neither "file" nor "sym".
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # Pull one extra item from the cursor: a second match means
            # the filename is duplicated in GridFS, which is an error.
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            if requested_file.metadata["type"] in ("file", "sym"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "Type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            # Remember the GridFS _id so close() can replace this revision.
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, self)

        if "r" in self.mode:
            # Readers expect the stream positioned at the start.
            self.seek(0, 0)

    def close(self):
        """Persist the buffer to GridFS (write modes only), then close.

        Write path: delete the old revision (if one was downloaded),
        then upload the buffer under the same _id, or as a new file.
        """
        if "r" in self.mode:
            # Read-only: nothing to persist.
            super(GridByteStream, self).close()
            return

        if self._id:
            # Remove the previous revision before re-uploading.
            self.fs.delete(self._id)

        # Look for the first path segment as an existing "dir" entry.
        cursor = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            # NOTE(review): when the first segment is not a known dir, the
            # last character of that segment is dropped before uploading —
            # presumably to map a trailing-character naming variant of the
            # parent directory; confirm the intent against callers.
            parent_dir_name = self.filename.split("/")[0]
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        if self._id:
            # Re-upload under the original _id so references stay valid.
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, self, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, self, metadata={"type": self.file_type}
            )
        super(GridByteStream, self).close()

    def __enter__(self):
        # Context-manager support: "with fs.file_open(...) as f:"
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always flush/close on context exit, even on exceptions.
        self.close()
110 |
|
|
111 |
|
|
112 |
1 |
class GridStringStream(StringIO):
    """In-memory text stream backed by a single GridFS file.

    Text twin of GridByteStream: GridFS stores bytes, so content is
    decoded from utf-8 on download and encoded back through a temporary
    BytesIO on :meth:`close` for write modes.
    """

    def __init__(self, filename, fs, mode):
        # filename: GridFS filename (slash-separated path string)
        # fs: GridFSBucket used for all storage operations
        # mode: open-mode string; any mode containing "r" is read-only
        StringIO.__init__(self)
        self._id = None
        self.filename = filename
        self.fs = fs
        self.mode = mode
        self.file_type = "file"  # Set "file" as default file_type

        self.__initialize__()

    def __initialize__(self):
        """Download and utf-8-decode the GridFS entry for self.filename.

        :raises FsException: if more than one entry shares the filename,
            or the entry's metadata type is neither "file" nor "dir".
        """
        grid_file = None

        cursor = self.fs.find({"filename": self.filename})

        for requested_file in cursor:
            # Pull one extra item from the cursor: a second match means
            # the filename is duplicated in GridFS, which is an error.
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            # NOTE(review): accepts "dir" here where GridByteStream accepts
            # "sym" — confirm whether the asymmetry is intentional.
            if requested_file.metadata["type"] in ("file", "dir"):
                grid_file = requested_file
                self.file_type = requested_file.metadata["type"]
            else:
                raise FsException(
                    "File type isn't file", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

        if grid_file:
            # GridFS yields bytes; stage them in a BytesIO and decode.
            stream = BytesIO()
            self._id = grid_file._id
            self.fs.download_to_stream(self._id, stream)
            stream.seek(0)
            self.write(stream.read().decode("utf-8"))
            stream.close()

        if "r" in self.mode:
            # Readers expect the stream positioned at the start.
            self.seek(0, 0)

    def close(self):
        """Persist the text to GridFS (write modes only), then close.

        Write path: delete the old revision (if one was downloaded),
        utf-8-encode the buffer, then upload under the same _id or as
        a new file.
        """
        if "r" in self.mode:
            # Read-only: nothing to persist.
            super(GridStringStream, self).close()
            return

        if self._id:
            # Remove the previous revision before re-uploading.
            self.fs.delete(self._id)

        # Look for the first path segment as an existing "dir" entry.
        cursor = self.fs.find(
            {"filename": self.filename.split("/")[0], "metadata": {"type": "dir"}}
        )

        parent_dir = next(cursor, None)

        if not parent_dir:
            # NOTE(review): same last-character rename fallback as in
            # GridByteStream.close(); confirm the intent against callers.
            parent_dir_name = self.filename.split("/")[0]
            self.filename = self.filename.replace(
                parent_dir_name, parent_dir_name[:-1], 1
            )

        self.seek(0, 0)
        # GridFS uploads bytes: encode the text through a temporary buffer.
        stream = BytesIO()
        stream.write(self.read().encode("utf-8"))
        stream.seek(0, 0)
        if self._id:
            # Re-upload under the original _id so references stay valid.
            self.fs.upload_from_stream_with_id(
                self._id, self.filename, stream, metadata={"type": self.file_type}
            )
        else:
            self.fs.upload_from_stream(
                self.filename, stream, metadata={"type": self.file_type}
            )
        stream.close()
        super(GridStringStream, self).close()

    def __enter__(self):
        # Context-manager support: "with fs.file_open(...) as f:"
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always flush/close on context exit, even on exceptions.
        self.close()
195 |
|
|
196 |
|
|
197 |
1 |
class FsMongo(FsBase):
    """Filesystem driver storing content in MongoDB GridFS.

    Mirrors a slash-separated virtual filesystem into a GridFS bucket
    (entries carry metadata.type of "file", "dir" or "sym") and can sync
    it bidirectionally with a local directory rooted at self.path.
    """

    def __init__(self, logger_name="fs", lock=False):
        # Connection state is filled in by fs_connect().
        super().__init__(logger_name, lock)
        self.path = None    # local root directory (set from config["path"], "/"-terminated)
        self.client = None  # MongoClient, set by fs_connect()
        self.fs = None      # GridFSBucket, set by fs_connect()

    def __update_local_fs(self, from_path=None):
        """Materialize GridFS content under self.path on the local disk.

        Creates every "dir" entry as a directory, every "sym" entry as a
        symlink (replacing any existing file) and downloads every "file"
        entry, restoring stored permissions when present.

        :param from_path: if given, only entries whose filename starts
            with this relative path are processed.
        """
        dir_cursor = self.fs.find({"metadata.type": "dir"}, no_cursor_timeout=True)

        valid_paths = []

        for directory in dir_cursor:
            if from_path and not directory.filename.startswith(from_path):
                continue
            self.logger.debug("Making dir {}".format(self.path + directory.filename))
            os.makedirs(self.path + directory.filename, exist_ok=True)
            valid_paths.append(self.path + directory.filename)

        file_cursor = self.fs.find(
            {"metadata.type": {"$in": ["file", "sym"]}}, no_cursor_timeout=True
        )

        for writing_file in file_cursor:
            if from_path and not writing_file.filename.startswith(from_path):
                continue
            file_path = self.path + writing_file.filename

            if writing_file.metadata["type"] == "sym":
                # Symlink entries store the link target as utf-8 content.
                with BytesIO() as b:
                    self.fs.download_to_stream(writing_file._id, b)
                    b.seek(0)
                    link = b.read().decode("utf-8")

                try:
                    # os.symlink fails if the path exists, so remove first.
                    self.logger.debug("Sync removing {}".format(file_path))
                    os.remove(file_path)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        # This is probably permission denied or worse
                        raise
                os.symlink(link, file_path)
            else:
                # Ensure the parent directory exists when it was not part
                # of the "dir" entries created above.
                folder = os.path.dirname(file_path)
                if folder not in valid_paths:
                    self.logger.debug("Sync local directory {}".format(file_path))
                    os.makedirs(folder, exist_ok=True)
                with open(file_path, "wb+") as file_stream:
                    self.logger.debug("Sync download {}".format(file_path))
                    self.fs.download_to_stream(writing_file._id, file_stream)
                if "permissions" in writing_file.metadata:
                    os.chmod(file_path, writing_file.metadata["permissions"])

    def get_params(self):
        """Return the driver identification and its local root path."""
        return {"fs": "mongo", "path": self.path}

    def fs_connect(self, config):
        """Validate config and connect to MongoDB GridFS.

        :param config: dict with mandatory "path" (existing, writable
            local dir), "uri" (MongoDB URI) and "collection" (database
            name); optional "logger_name".
        :raises FsException: on missing/invalid parameters or any
            connection error.
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            if "path" in config:
                self.path = config["path"]
            else:
                raise FsException('Missing parameter "path"')
            if not self.path.endswith("/"):
                # Normalize so later "self.path + filename" concatenation works.
                self.path += "/"
            if not os.path.exists(self.path):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' does not exist".format(
                        config["path"]
                    )
                )
            elif not os.access(self.path, os.W_OK):
                raise FsException(
                    "Invalid configuration param at '[storage]': path '{}' is not writable".format(
                        config["path"]
                    )
                )
            if all(key in config.keys() for key in ["uri", "collection"]):
                self.client = MongoClient(config["uri"])
                self.fs = GridFSBucket(self.client[config["collection"]])
            else:
                if "collection" not in config.keys():
                    raise FsException('Missing parameter "collection"')
                else:
                    raise FsException('Missing parameters: "uri"')
        except FsException:
            raise
        except Exception as e:  # TODO refine
            raise FsException(str(e))

    def fs_disconnect(self):
        # No explicit teardown: MongoClient cleanup is left to GC.
        pass  # TODO

    def mkdir(self, folder):
        """
        Creates a folder or parent object location
        :param folder:
        :return: None or raises an exception
        """
        folder = folder.rstrip("/")
        try:
            # A directory is an empty GridFS entry tagged type="dir".
            self.fs.upload_from_stream(folder, BytesIO(), metadata={"type": "dir"})
        except errors.FileExists:  # make it idempotent
            pass
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def dir_rename(self, src, dst):
        """
        Rename one directory name. If dst exist, it replaces (deletes) existing directory
        :param src: source directory
        :param dst: destination directory
        :return: None or raises and exception
        """
        dst = dst.rstrip("/")
        src = src.rstrip("/")

        try:
            # NOTE(review): src/dst are interpolated into $regex without
            # escaping — paths containing regex metacharacters would match
            # incorrectly; confirm callers only pass plain names.
            dst_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(dst)}}, no_cursor_timeout=True
            )

            for dst_file in dst_cursor:
                self.fs.delete(dst_file._id)

            src_cursor = self.fs.find(
                {"filename": {"$regex": "^{}(/|$)".format(src)}}, no_cursor_timeout=True
            )

            for src_file in src_cursor:
                self.fs.rename(src_file._id, src_file.filename.replace(src, dst, 1))
        except Exception as e:
            raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    def file_exists(self, storage, mode=None):
        """
        Indicates if "storage" file exist
        :param storage: can be a str or a str list
        :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
        :return: True, False
        """
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            # A second match means the filename is duplicated: error.
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            self.logger.debug("Entry {} metadata {}".format(f, requested_file.metadata))

            # if no special mode is required just check it does exists
            if not mode:
                return True

            if requested_file.metadata["type"] == mode:
                return True

            # A symlink satisfies a request for a regular file.
            if requested_file.metadata["type"] == "sym" and mode == "file":
                return True

        return False

    def file_size(self, storage):
        """
        return file size
        :param storage: can be a str or a str list
        :return: file size
        """
        # NOTE(review): implicitly returns None when no entry matches.
        f = storage if isinstance(storage, str) else "/".join(storage)
        f = f.rstrip("/")

        cursor = self.fs.find({"filename": f})

        for requested_file in cursor:
            # A second match means the filename is duplicated: error.
            exception_file = next(cursor, None)

            if exception_file:
                raise FsException(
                    "Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
                )

            return requested_file.length

    def file_extract(self, compressed_object, path):
        """
        extract a tar file
        :param compressed_object: object of type tar or zip
        :param path: can be a str or a str list, or a tar object where to extract the tar_object
        :return: None
        """
        f = path if isinstance(path, str) else "/".join(path)
        f = f.rstrip("/")

        if type(compressed_object) is tarfile.TarFile:
            for member in compressed_object.getmembers():
                # Pick the content stream by member kind; symlinks store
                # their target path as the entry's content.
                if member.isfile():
                    stream = compressed_object.extractfile(member)
                elif member.issym():
                    stream = BytesIO(member.linkname.encode("utf-8"))
                else:
                    stream = BytesIO()

                if member.isfile():
                    file_type = "file"
                elif member.issym():
                    file_type = "sym"
                else:
                    file_type = "dir"

                # Tar preserves permissions; store them for later restore.
                metadata = {"type": file_type, "permissions": member.mode}
                member.name = member.name.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.name))
                self.fs.upload_from_stream(
                    f + "/" + member.name, stream, metadata=metadata
                )

                stream.close()
        elif type(compressed_object) is zipfile.ZipFile:
            for member in compressed_object.infolist():
                if member.is_dir():
                    stream = BytesIO()
                else:
                    # NOTE(review): ZipFile.read() returns bytes, not a
                    # stream; upload_from_stream accepts it, and bytes
                    # need no close() (hence the is_dir() guard below).
                    stream = compressed_object.read(member)

                if member.is_dir():
                    file_type = "dir"
                else:
                    file_type = "file"

                # Zip entries carry no permissions here.
                metadata = {"type": file_type}
                member.filename = member.filename.rstrip("/")

                self.logger.debug("Uploading {}/{}".format(f, member.filename))
                self.fs.upload_from_stream(
                    f + "/" + member.filename, stream, metadata=metadata
                )

                if member.is_dir():
                    stream.close()

    def file_open(self, storage, mode):
        """
        Open a file
        :param storage: can be a str or list of str
        :param mode: file mode
        :return: file object
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            # Binary modes get a bytes-backed stream, text modes a str one.
            if "b" in mode:
                return GridByteStream(f, self.fs, mode)
            else:
                return GridStringStream(f, self.fs, mode)
        except errors.NoFile:
            raise FsException(
                "File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND
            )
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def dir_ls(self, storage):
        """
        return folder content
        :param storage: can be a str or list of str
        :return: folder content
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            files = []
            dir_cursor = self.fs.find({"filename": f})
            for requested_dir in dir_cursor:
                # A second match means the name is duplicated: error.
                exception_dir = next(dir_cursor, None)

                if exception_dir:
                    raise FsException(
                        "Multiple directories found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_dir.metadata["type"] != "dir":
                    raise FsException(
                        "File {} does not exist".format(f),
                        http_code=HTTPStatus.NOT_FOUND,
                    )

                # NOTE(review): dead branch — f was already rstrip("/")-ed.
                if f.endswith("/"):
                    f = f[:-1]

                # Direct children only: one path segment below f.
                files_cursor = self.fs.find(
                    {"filename": {"$regex": "^{}/([^/])*".format(f)}}
                )
                for children_file in files_cursor:
                    files += [children_file.filename.replace(f + "/", "", 1)]

            return files
        except IOError:
            raise FsException(
                "File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST
            )

    def file_delete(self, storage, ignore_non_exist=False):
        """
        Delete storage content recursively
        :param storage: can be a str or list of str
        :param ignore_non_exist: not raise exception if storage does not exist
        :return: None
        """
        try:
            f = storage if isinstance(storage, str) else "/".join(storage)
            f = f.rstrip("/")

            file_cursor = self.fs.find({"filename": f})
            found = False
            for requested_file in file_cursor:
                found = True
                # A second match means the name is duplicated: error.
                exception_file = next(file_cursor, None)

                if exception_file:
                    self.logger.error(
                        "Cannot delete duplicate file: {} and {}".format(
                            requested_file.filename, exception_file.filename
                        )
                    )
                    raise FsException(
                        "Multiple files found",
                        http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )

                if requested_file.metadata["type"] == "dir":
                    # Recursive delete: everything under "f/".
                    dir_cursor = self.fs.find(
                        {"filename": {"$regex": "^{}/".format(f)}}
                    )

                    for tmp in dir_cursor:
                        self.logger.debug("Deleting {}".format(tmp.filename))
                        self.fs.delete(tmp._id)

                self.logger.debug("Deleting {}".format(requested_file.filename))
                self.fs.delete(requested_file._id)
            if not found and not ignore_non_exist:
                raise FsException(
                    "File {} does not exist".format(storage),
                    http_code=HTTPStatus.NOT_FOUND,
                )
        except IOError as e:
            raise FsException(
                "File {} cannot be deleted: {}".format(f, e),
                http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            )

    def sync(self, from_path=None):
        """
        Sync from FSMongo to local storage
        :param from_path: if supplied, only copy content from this path, not all
        :return: None
        """
        if from_path:
            if os.path.isabs(from_path):
                # GridFS filenames are relative to self.path.
                from_path = os.path.relpath(from_path, self.path)
        self.__update_local_fs(from_path=from_path)

    def _update_mongo_fs(self, from_path):
        """Upload the local tree under from_path into GridFS.

        Walks self.path + from_path, uploads new/modified entries
        (comparing local mtime against GridFS uploadDate), replaces
        stale revisions and finally deletes remote entries that no
        longer exist locally.

        :param from_path: directory relative to self.path to push.
        """
        os_path = self.path + from_path

        # Obtain list of files and dirs in filesystem
        members = []
        for root, dirs, files in os.walk(os_path):
            for folder in dirs:
                member = {"filename": os.path.join(root, folder), "type": "dir"}
                if os.path.islink(member["filename"]):
                    member["type"] = "sym"
                members.append(member)
            for file in files:
                filename = os.path.join(root, file)
                if os.path.islink(filename):
                    file_type = "sym"
                else:
                    file_type = "file"
                member = {"filename": os.path.join(root, file), "type": file_type}
                members.append(member)

        # Obtain files in mongo dict
        remote_files = self._get_mongo_files(from_path)

        # Upload members if they do not exists or have been modified
        # We will do this for performance (avoid updating unmodified files) and to avoid
        # updating a file with an older one in case there are two sources for synchronization
        # in high availability scenarios
        for member in members:
            # obtain permission
            mask = int(oct(os.stat(member["filename"]).st_mode)[-3:], 8)

            # convert to relative path
            rel_filename = os.path.relpath(member["filename"], self.path)
            # get timestamp in UTC because mongo stores upload date in UTC:
            # https://www.mongodb.com/docs/v4.0/tutorial/model-time-data/#overview
            last_modified_date = datetime.datetime.utcfromtimestamp(
                os.path.getmtime(member["filename"])
            )

            remote_file = remote_files.get(rel_filename)
            upload_date = (
                remote_file[0].uploadDate if remote_file else datetime.datetime.min
            )
            # remove processed files from dict
            remote_files.pop(rel_filename, None)

            if last_modified_date >= upload_date:
                stream = None
                fh = None
                try:
                    file_type = member["type"]
                    if file_type == "dir":
                        stream = BytesIO()
                    elif file_type == "sym":
                        # Store the symlink target as the entry content.
                        stream = BytesIO(
                            os.readlink(member["filename"]).encode("utf-8")
                        )
                    else:
                        fh = open(member["filename"], "rb")
                        stream = BytesIO(fh.read())

                    metadata = {"type": file_type, "permissions": mask}

                    self.logger.debug("Sync upload {}".format(rel_filename))
                    self.fs.upload_from_stream(rel_filename, stream, metadata=metadata)

                    # delete old files
                    if remote_file:
                        for file in remote_file:
                            self.logger.debug("Sync deleting {}".format(file.filename))
                            self.fs.delete(file._id)
                finally:
                    if fh:
                        fh.close()
                    if stream:
                        stream.close()

        # delete files that are not any more in local fs
        for remote_file in remote_files.values():
            for file in remote_file:
                self.fs.delete(file._id)

    def _get_mongo_files(self, from_path=None):
        """Map GridFS filename -> list of revisions, newest first.

        :param from_path: if given, only filenames starting with this
            relative path are included.
        :return: dict of filename to list of GridFS file documents,
            sorted by descending uploadDate (newest at index 0).
        """
        file_dict = {}
        file_cursor = self.fs.find(no_cursor_timeout=True, sort=[("uploadDate", -1)])
        for file in file_cursor:
            if from_path and not file.filename.startswith(from_path):
                continue
            if file.filename in file_dict:
                file_dict[file.filename].append(file)
            else:
                file_dict[file.filename] = [file]
        return file_dict

    def reverse_sync(self, from_path: str):
        """
        Sync from local storage to FSMongo
        :param from_path: base directory to upload content to mongo fs
        :return: None
        """
        if os.path.isabs(from_path):
            # GridFS filenames are relative to self.path.
            from_path = os.path.relpath(from_path, self.path)
        self._update_mongo_fs(from_path=from_path)