Added reverse sync
[osm/common.git] / osm_common / tests / test_fsmongo.py
1 # Copyright 2019 Canonical
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14 #
15 # For those usages not covered by the Apache License, Version 2.0 please
16 # contact: eduardo.sousa@canonical.com
17 ##
18
19 import logging
20 import pytest
21 import tempfile
22 import tarfile
23 import os
24 import subprocess
25
26 from pymongo import MongoClient
27 from gridfs import GridFSBucket
28
29 from io import BytesIO
30
31 from unittest.mock import Mock
32
33 from osm_common.fsbase import FsException
34 from osm_common.fsmongo import FsMongo
35 from pathlib import Path
36
37 __author__ = "Eduardo Sousa <eduardo.sousa@canonical.com>"
38
39
40 def valid_path():
41 return tempfile.gettempdir() + '/'
42
43
44 def invalid_path():
45 return '/#tweeter/'
46
47
48 @pytest.fixture(scope="function", params=[True, False])
49 def fs_mongo(request, monkeypatch):
50 def mock_mongoclient_constructor(a, b, c):
51 pass
52
53 def mock_mongoclient_getitem(a, b):
54 pass
55
56 def mock_gridfs_constructor(a, b):
57 pass
58
59 monkeypatch.setattr(MongoClient, '__init__', mock_mongoclient_constructor)
60 monkeypatch.setattr(MongoClient, '__getitem__', mock_mongoclient_getitem)
61 monkeypatch.setattr(GridFSBucket, '__init__', mock_gridfs_constructor)
62 fs = FsMongo(lock=request.param)
63 fs.fs_connect({
64 'path': valid_path(),
65 'host': 'mongo',
66 'port': 27017,
67 'collection': 'files'})
68 return fs
69
70
71 def generic_fs_exception_message(message):
72 return "storage exception {}".format(message)
73
74
75 def fs_connect_exception_message(path):
76 return "storage exception Invalid configuration param at '[storage]': path '{}' does not exist".format(path)
77
78
79 def file_open_file_not_found_exception(storage):
80 f = storage if isinstance(storage, str) else '/'.join(storage)
81 return "storage exception File {} does not exist".format(f)
82
83
84 def file_open_io_exception(storage):
85 f = storage if isinstance(storage, str) else '/'.join(storage)
86 return "storage exception File {} cannot be opened".format(f)
87
88
89 def dir_ls_not_a_directory_exception(storage):
90 f = storage if isinstance(storage, str) else '/'.join(storage)
91 return "storage exception File {} does not exist".format(f)
92
93
94 def dir_ls_io_exception(storage):
95 f = storage if isinstance(storage, str) else '/'.join(storage)
96 return "storage exception File {} cannot be opened".format(f)
97
98
99 def file_delete_exception_message(storage):
100 return "storage exception File {} does not exist".format(storage)
101
102
103 def test_constructor_without_logger():
104 fs = FsMongo()
105 assert fs.logger == logging.getLogger('fs')
106 assert fs.path is None
107 assert fs.client is None
108 assert fs.fs is None
109
110
111 def test_constructor_with_logger():
112 logger_name = 'fs_mongo'
113 fs = FsMongo(logger_name=logger_name)
114 assert fs.logger == logging.getLogger(logger_name)
115 assert fs.path is None
116 assert fs.client is None
117 assert fs.fs is None
118
119
120 def test_get_params(fs_mongo, monkeypatch):
121 def mock_gridfs_find(self, search_query, **kwargs):
122 return []
123
124 monkeypatch.setattr(GridFSBucket, 'find', mock_gridfs_find)
125 params = fs_mongo.get_params()
126 assert len(params) == 2
127 assert "fs" in params
128 assert "path" in params
129 assert params["fs"] == "mongo"
130 assert params["path"] == valid_path()
131
132
133 @pytest.mark.parametrize("config, exp_logger, exp_path", [
134 (
135 {
136 'logger_name': 'fs_mongo',
137 'path': valid_path(),
138 'uri': 'mongo:27017',
139 'collection': 'files'
140 },
141 'fs_mongo', valid_path()
142 ),
143 (
144 {
145 'logger_name': 'fs_mongo',
146 'path': valid_path(),
147 'host': 'mongo',
148 'port': 27017,
149 'collection': 'files'
150 },
151 'fs_mongo', valid_path()
152 ),
153 (
154 {
155 'logger_name': 'fs_mongo',
156 'path': valid_path()[:-1],
157 'uri': 'mongo:27017',
158 'collection': 'files'
159 },
160 'fs_mongo', valid_path()
161 ),
162 (
163 {
164 'logger_name': 'fs_mongo',
165 'path': valid_path()[:-1],
166 'host': 'mongo',
167 'port': 27017,
168 'collection': 'files'
169 },
170 'fs_mongo', valid_path()
171 ),
172 (
173 {
174 'path': valid_path(),
175 'uri': 'mongo:27017',
176 'collection': 'files'
177 },
178 'fs', valid_path()
179 ),
180 (
181 {
182 'path': valid_path(),
183 'host': 'mongo',
184 'port': 27017,
185 'collection': 'files'
186 },
187 'fs', valid_path()
188 ),
189 (
190 {
191 'path': valid_path()[:-1],
192 'uri': 'mongo:27017',
193 'collection': 'files'
194 },
195 'fs', valid_path()
196 ),
197 (
198 {
199 'path': valid_path()[:-1],
200 'host': 'mongo',
201 'port': 27017,
202 'collection': 'files'
203 },
204 'fs', valid_path()
205 )])
206 def test_fs_connect_with_valid_config(config, exp_logger, exp_path):
207 fs = FsMongo()
208 fs.fs_connect(config)
209 assert fs.logger == logging.getLogger(exp_logger)
210 assert fs.path == exp_path
211 assert type(fs.client) == MongoClient
212 assert type(fs.fs) == GridFSBucket
213
214
215 @pytest.mark.parametrize("config, exp_exception_message", [
216 (
217 {
218 'logger_name': 'fs_mongo',
219 'path': invalid_path(),
220 'uri': 'mongo:27017',
221 'collection': 'files'
222 },
223 fs_connect_exception_message(invalid_path())
224 ),
225 (
226 {
227 'logger_name': 'fs_mongo',
228 'path': invalid_path(),
229 'host': 'mongo',
230 'port': 27017,
231 'collection': 'files'
232 },
233 fs_connect_exception_message(invalid_path())
234 ),
235 (
236 {
237 'logger_name': 'fs_mongo',
238 'path': invalid_path()[:-1],
239 'uri': 'mongo:27017',
240 'collection': 'files'
241 },
242 fs_connect_exception_message(invalid_path()[:-1])
243 ),
244 (
245 {
246 'logger_name': 'fs_mongo',
247 'path': invalid_path()[:-1],
248 'host': 'mongo',
249 'port': 27017,
250 'collection': 'files'
251 },
252 fs_connect_exception_message(invalid_path()[:-1])
253 ),
254 (
255 {
256 'path': invalid_path(),
257 'uri': 'mongo:27017',
258 'collection': 'files'
259 },
260 fs_connect_exception_message(invalid_path())
261 ),
262 (
263 {
264 'path': invalid_path(),
265 'host': 'mongo',
266 'port': 27017,
267 'collection': 'files'
268 },
269 fs_connect_exception_message(invalid_path())
270 ),
271 (
272 {
273 'path': invalid_path()[:-1],
274 'uri': 'mongo:27017',
275 'collection': 'files'
276 },
277 fs_connect_exception_message(invalid_path()[:-1])
278 ),
279 (
280 {
281 'path': invalid_path()[:-1],
282 'host': 'mongo',
283 'port': 27017,
284 'collection': 'files'
285 },
286 fs_connect_exception_message(invalid_path()[:-1])
287 ),
288 (
289 {
290 'path': '/',
291 'host': 'mongo',
292 'port': 27017,
293 'collection': 'files'
294 },
295 generic_fs_exception_message(
296 "Invalid configuration param at '[storage]': path '/' is not writable"
297 )
298 )])
299 def test_fs_connect_with_invalid_path(config, exp_exception_message):
300 fs = FsMongo()
301 with pytest.raises(FsException) as excinfo:
302 fs.fs_connect(config)
303 assert str(excinfo.value) == exp_exception_message
304
305
306 @pytest.mark.parametrize("config, exp_exception_message", [
307 (
308 {
309 'logger_name': 'fs_mongo',
310 'uri': 'mongo:27017',
311 'collection': 'files'
312 },
313 "Missing parameter \"path\""
314 ),
315 (
316 {
317 'logger_name': 'fs_mongo',
318 'host': 'mongo',
319 'port': 27017,
320 'collection': 'files'
321 },
322 "Missing parameter \"path\""
323 ),
324 (
325 {
326 'logger_name': 'fs_mongo',
327 'path': valid_path(),
328 'collection': 'files'
329 },
330 "Missing parameters: \"uri\" or \"host\" + \"port\""
331 ),
332 (
333 {
334 'logger_name': 'fs_mongo',
335 'path': valid_path(),
336 'port': 27017,
337 'collection': 'files'
338 },
339 "Missing parameters: \"uri\" or \"host\" + \"port\""
340 ),
341 (
342 {
343 'logger_name': 'fs_mongo',
344 'path': valid_path(),
345 'host': 'mongo',
346 'collection': 'files'
347 },
348 "Missing parameters: \"uri\" or \"host\" + \"port\""
349 ),
350 (
351 {
352 'logger_name': 'fs_mongo',
353 'path': valid_path(),
354 'uri': 'mongo:27017'
355 },
356 "Missing parameter \"collection\""
357 ),
358 (
359 {
360 'logger_name': 'fs_mongo',
361 'path': valid_path(),
362 'host': 'mongo',
363 'port': 27017,
364 },
365 "Missing parameter \"collection\""
366 )])
367 def test_fs_connect_with_missing_parameters(config, exp_exception_message):
368 fs = FsMongo()
369 with pytest.raises(FsException) as excinfo:
370 fs.fs_connect(config)
371 assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
372
373
374 @pytest.mark.parametrize("config, exp_exception_message", [
375 (
376 {
377 'logger_name': 'fs_mongo',
378 'path': valid_path(),
379 'uri': 'mongo:27017',
380 'collection': 'files'
381 },
382 "MongoClient crashed"
383 ),
384 (
385 {
386 'logger_name': 'fs_mongo',
387 'path': valid_path(),
388 'host': 'mongo',
389 'port': 27017,
390 'collection': 'files'
391 },
392 "MongoClient crashed"
393 )])
394 def test_fs_connect_with_invalid_mongoclient(config, exp_exception_message, monkeypatch):
395 def generate_exception(a, b, c=None):
396 raise Exception(exp_exception_message)
397
398 monkeypatch.setattr(MongoClient, '__init__', generate_exception)
399
400 fs = FsMongo()
401 with pytest.raises(FsException) as excinfo:
402 fs.fs_connect(config)
403 assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
404
405
406 @pytest.mark.parametrize("config, exp_exception_message", [
407 (
408 {
409 'logger_name': 'fs_mongo',
410 'path': valid_path(),
411 'uri': 'mongo:27017',
412 'collection': 'files'
413 },
414 "Collection unavailable"
415 ),
416 (
417 {
418 'logger_name': 'fs_mongo',
419 'path': valid_path(),
420 'host': 'mongo',
421 'port': 27017,
422 'collection': 'files'
423 },
424 "Collection unavailable"
425 )])
426 def test_fs_connect_with_invalid_mongo_collection(config, exp_exception_message, monkeypatch):
427 def mock_mongoclient_constructor(a, b, c=None):
428 pass
429
430 def generate_exception(a, b):
431 raise Exception(exp_exception_message)
432
433 monkeypatch.setattr(MongoClient, '__init__', mock_mongoclient_constructor)
434 monkeypatch.setattr(MongoClient, '__getitem__', generate_exception)
435
436 fs = FsMongo()
437 with pytest.raises(FsException) as excinfo:
438 fs.fs_connect(config)
439 assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
440
441
442 @pytest.mark.parametrize("config, exp_exception_message", [
443 (
444 {
445 'logger_name': 'fs_mongo',
446 'path': valid_path(),
447 'uri': 'mongo:27017',
448 'collection': 'files'
449 },
450 "GridFsBucket crashed"
451 ),
452 (
453 {
454 'logger_name': 'fs_mongo',
455 'path': valid_path(),
456 'host': 'mongo',
457 'port': 27017,
458 'collection': 'files'
459 },
460 "GridFsBucket crashed"
461 )])
462 def test_fs_connect_with_invalid_gridfsbucket(config, exp_exception_message, monkeypatch):
463 def mock_mongoclient_constructor(a, b, c=None):
464 pass
465
466 def mock_mongoclient_getitem(a, b):
467 pass
468
469 def generate_exception(a, b):
470 raise Exception(exp_exception_message)
471
472 monkeypatch.setattr(MongoClient, '__init__', mock_mongoclient_constructor)
473 monkeypatch.setattr(MongoClient, '__getitem__', mock_mongoclient_getitem)
474 monkeypatch.setattr(GridFSBucket, '__init__', generate_exception)
475
476 fs = FsMongo()
477 with pytest.raises(FsException) as excinfo:
478 fs.fs_connect(config)
479 assert str(excinfo.value) == generic_fs_exception_message(exp_exception_message)
480
481
482 def test_fs_disconnect(fs_mongo):
483 fs_mongo.fs_disconnect()
484
485
486 # Example.tar.gz
487 # example_tar/
488 # ├── directory
489 # │ └── file
490 # └── symlinks
491 # ├── directory_link -> ../directory/
492 # └── file_link -> ../directory/file
493 class FakeCursor:
494 def __init__(self, id, filename, metadata):
495 self._id = id
496 self.filename = filename
497 self.metadata = metadata
498
499
500 class FakeFS:
501 directory_metadata = {'type': 'dir', 'permissions': 509}
502 file_metadata = {'type': 'file', 'permissions': 436}
503 symlink_metadata = {'type': 'sym', 'permissions': 511}
504
505 tar_info = {
506 1: {
507 "cursor": FakeCursor(1, 'example_tar', directory_metadata),
508 "metadata": directory_metadata,
509 "stream_content": b'',
510 "stream_content_bad": b"Something",
511 "path": './tmp/example_tar',
512 },
513 2: {
514 "cursor": FakeCursor(2, 'example_tar/directory', directory_metadata),
515 "metadata": directory_metadata,
516 "stream_content": b'',
517 "stream_content_bad": b"Something",
518 "path": './tmp/example_tar/directory',
519 },
520 3: {
521 "cursor": FakeCursor(3, 'example_tar/symlinks', directory_metadata),
522 "metadata": directory_metadata,
523 "stream_content": b'',
524 "stream_content_bad": b"Something",
525 "path": './tmp/example_tar/symlinks',
526 },
527 4: {
528 "cursor": FakeCursor(4, 'example_tar/directory/file', file_metadata),
529 "metadata": file_metadata,
530 "stream_content": b"Example test",
531 "stream_content_bad": b"Example test2",
532 "path": './tmp/example_tar/directory/file',
533 },
534 5: {
535 "cursor": FakeCursor(5, 'example_tar/symlinks/file_link', symlink_metadata),
536 "metadata": symlink_metadata,
537 "stream_content": b"../directory/file",
538 "stream_content_bad": b"",
539 "path": './tmp/example_tar/symlinks/file_link',
540 },
541 6: {
542 "cursor": FakeCursor(6, 'example_tar/symlinks/directory_link', symlink_metadata),
543 "metadata": symlink_metadata,
544 "stream_content": b"../directory/",
545 "stream_content_bad": b"",
546 "path": './tmp/example_tar/symlinks/directory_link',
547 }
548 }
549
550 def upload_from_stream(self, f, stream, metadata=None):
551 found = False
552 for i, v in self.tar_info.items():
553 if f == v["path"]:
554 assert metadata["type"] == v["metadata"]["type"]
555 assert stream.read() == BytesIO(v["stream_content"]).read()
556 stream.seek(0)
557 assert stream.read() != BytesIO(v["stream_content_bad"]).read()
558 found = True
559 continue
560 assert found
561
562 def find(self, type, no_cursor_timeout=True, sort=None):
563 list = []
564 for i, v in self.tar_info.items():
565 if type["metadata.type"] == "dir":
566 if v["metadata"] == self.directory_metadata:
567 list.append(v["cursor"])
568 else:
569 if v["metadata"] != self.directory_metadata:
570 list.append(v["cursor"])
571 return list
572
573 def download_to_stream(self, id, file_stream):
574 file_stream.write(BytesIO(self.tar_info[id]["stream_content"]).read())
575
576
577 def test_file_extract():
578 tar_path = "tmp/Example.tar.gz"
579 folder_path = "tmp/example_tar"
580
581 # Generate package
582 subprocess.call(["rm", "-rf", "./tmp"])
583 subprocess.call(["mkdir", "-p", "{}/directory".format(folder_path)])
584 subprocess.call(["mkdir", "-p", "{}/symlinks".format(folder_path)])
585 p = Path("{}/directory/file".format(folder_path))
586 p.write_text("Example test")
587 os.symlink("../directory/file", "{}/symlinks/file_link".format(folder_path))
588 os.symlink("../directory/", "{}/symlinks/directory_link".format(folder_path))
589 if os.path.exists(tar_path):
590 os.remove(tar_path)
591 subprocess.call(["tar", "-czvf", tar_path, folder_path])
592
593 try:
594 tar = tarfile.open(tar_path, "r")
595 fs = FsMongo()
596 fs.fs = FakeFS()
597 fs.file_extract(tar_object=tar, path=".")
598 finally:
599 os.remove(tar_path)
600 subprocess.call(["rm", "-rf", "./tmp"])
601
602
603 def test_upload_local_fs():
604 path = "./tmp/"
605
606 subprocess.call(["rm", "-rf", path])
607 try:
608 fs = FsMongo()
609 fs.path = path
610 fs.fs = FakeFS()
611 fs.sync()
612 assert os.path.isdir("{}example_tar".format(path))
613 assert os.path.isdir("{}example_tar/directory".format(path))
614 assert os.path.isdir("{}example_tar/symlinks".format(path))
615 assert os.path.isfile("{}example_tar/directory/file".format(path))
616 assert os.path.islink("{}example_tar/symlinks/file_link".format(path))
617 assert os.path.islink("{}example_tar/symlinks/directory_link".format(path))
618 finally:
619 subprocess.call(["rm", "-rf", path])
620
621
622 def test_upload_mongo_fs():
623 path = "./tmp/"
624
625 subprocess.call(["rm", "-rf", path])
626 try:
627 fs = FsMongo()
628 fs.path = path
629 fs.fs = Mock()
630 fs.fs.find.return_value = {}
631
632 file_content = "Test file content"
633
634 # Create local dir and upload content to fakefs
635 os.mkdir(path)
636 os.mkdir("{}example_local".format(path))
637 os.mkdir("{}example_local/directory".format(path))
638 with open("{}example_local/directory/test_file".format(path), "w+") as test_file:
639 test_file.write(file_content)
640 fs.reverse_sync("example_local")
641
642 assert fs.fs.upload_from_stream.call_count == 2
643
644 # first call to upload_from_stream, dir_name
645 dir_name = "example_local/directory"
646 call_args_0 = fs.fs.upload_from_stream.call_args_list[0]
647 assert call_args_0[0][0] == dir_name
648 assert call_args_0[1].get("metadata").get("type") == "dir"
649
650 # second call to upload_from_stream, dir_name
651 file_name = "example_local/directory/test_file"
652 call_args_1 = fs.fs.upload_from_stream.call_args_list[1]
653 assert call_args_1[0][0] == file_name
654 assert call_args_1[1].get("metadata").get("type") == "file"
655
656 finally:
657 subprocess.call(["rm", "-rf", path])
658 pass