Enable pylint, black and flake8 in tox.ini
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 from unittest import TestCase
22 from unittest.mock import Mock, patch, call
23 from uuid import uuid4
24 from http import HTTPStatus
25 from time import time
26 from random import randint
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import (
30 ProjectTopicAuth,
31 RoleTopicAuth,
32 UserTopicAuth,
33 CommonVimWimSdn,
34 VcaTopic,
35 )
36 from osm_nbi.engine import EngineException
37 from osm_nbi.authconn import AuthconnNotFoundException
38
39
40 test_pid = str(uuid4())
41 test_name = "test-user"
42
43
44 def norm(str):
45 """Normalize string for checking"""
46 return " ".join(str.strip().split()).lower()
47
48
49 class TestVcaTopic(TestCase):
50 def setUp(self):
51 self.db = Mock(dbbase.DbBase())
52 self.fs = Mock(fsbase.FsBase())
53 self.msg = Mock(msgbase.MsgBase())
54 self.auth = Mock(authconn.Authconn(None, None, None))
55 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
56
57 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
58 def test_format_on_new(self, mock_super_format_on_new):
59 content = {
60 "_id": "id",
61 "secret": "encrypted_secret",
62 "cacert": "encrypted_cacert",
63 }
64 self.db.encrypt.side_effect = ["secret", "cacert"]
65 mock_super_format_on_new.return_value = "1234"
66
67 oid = self.vca_topic.format_on_new(content)
68
69 self.assertEqual(oid, "1234")
70 self.assertEqual(content["secret"], "secret")
71 self.assertEqual(content["cacert"], "cacert")
72 self.db.encrypt.assert_has_calls(
73 [
74 call("encrypted_secret", schema_version="1.11", salt="id"),
75 call("encrypted_cacert", schema_version="1.11", salt="id"),
76 ]
77 )
78 mock_super_format_on_new.assert_called_with(content, None, False)
79
80 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
81 def test_format_on_edit(self, mock_super_format_on_edit):
82 edit_content = {
83 "_id": "id",
84 "secret": "encrypted_secret",
85 "cacert": "encrypted_cacert",
86 }
87 final_content = {
88 "_id": "id",
89 "schema_version": "1.11",
90 }
91 self.db.encrypt.side_effect = ["secret", "cacert"]
92 mock_super_format_on_edit.return_value = "1234"
93
94 oid = self.vca_topic.format_on_edit(final_content, edit_content)
95
96 self.assertEqual(oid, "1234")
97 self.assertEqual(final_content["secret"], "secret")
98 self.assertEqual(final_content["cacert"], "cacert")
99 self.db.encrypt.assert_has_calls(
100 [
101 call("encrypted_secret", schema_version="1.11", salt="id"),
102 call("encrypted_cacert", schema_version="1.11", salt="id"),
103 ]
104 )
105 mock_super_format_on_edit.assert_called()
106
107 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
108 def test_check_conflict_on_del(self, mock_check_conflict_on_del):
109 session = {
110 "project_id": "project-id",
111 "force": False,
112 }
113 _id = "vca-id"
114 db_content = {}
115
116 self.db.get_list.return_value = None
117
118 self.vca_topic.check_conflict_on_del(session, _id, db_content)
119
120 self.db.get_list.assert_called_with(
121 "vim_accounts",
122 {"vca": _id, "_admin.projects_read.cont": "project-id"},
123 )
124 mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
125
126 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
127 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
128 session = {
129 "project_id": "project-id",
130 "force": True,
131 }
132 _id = "vca-id"
133 db_content = {}
134
135 self.vca_topic.check_conflict_on_del(session, _id, db_content)
136
137 self.db.get_list.assert_not_called()
138 mock_check_conflict_on_del.assert_not_called()
139
140 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
141 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
142 session = {
143 "project_id": "project-id",
144 "force": False,
145 }
146 _id = "vca-id"
147 db_content = {}
148
149 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
150
151 with self.assertRaises(EngineException) as context:
152 self.vca_topic.check_conflict_on_del(session, _id, db_content)
153 self.assertEqual(
154 context.exception,
155 EngineException(
156 "There is at least one VIM account using this vca",
157 http_code=HTTPStatus.CONFLICT,
158 ),
159 )
160
161 self.db.get_list.assert_called_with(
162 "vim_accounts",
163 {"vca": _id, "_admin.projects_read.cont": "project-id"},
164 )
165 mock_check_conflict_on_del.assert_not_called()
166
167
168 class Test_ProjectTopicAuth(TestCase):
169 @classmethod
170 def setUpClass(cls):
171 cls.test_name = "test-project-topic"
172
173 def setUp(self):
174 self.db = Mock(dbbase.DbBase())
175 self.fs = Mock(fsbase.FsBase())
176 self.msg = Mock(msgbase.MsgBase())
177 self.auth = Mock(authconn.Authconn(None, None, None))
178 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
179 self.fake_session = {
180 "username": self.test_name,
181 "project_id": (test_pid,),
182 "method": None,
183 "admin": True,
184 "force": False,
185 "public": False,
186 "allow_show_user_project_role": True,
187 }
188 self.topic.check_quota = Mock(return_value=None) # skip quota
189
190 def test_new_project(self):
191 with self.subTest(i=1):
192 rollback = []
193 pid1 = str(uuid4())
194 self.auth.get_project_list.return_value = []
195 self.auth.create_project.return_value = pid1
196 pid2, oid = self.topic.new(
197 rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
198 )
199 self.assertEqual(len(rollback), 1, "Wrong rollback length")
200 self.assertEqual(pid2, pid1, "Wrong project identifier")
201 content = self.auth.create_project.call_args[0][0]
202 self.assertEqual(content["name"], self.test_name, "Wrong project name")
203 self.assertEqual(content["quotas"], {}, "Wrong quotas")
204 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
205 self.assertEqual(
206 content["_admin"]["modified"],
207 content["_admin"]["created"],
208 "Wrong modification time",
209 )
210 with self.subTest(i=2):
211 rollback = []
212 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
213 self.topic.new(
214 rollback,
215 self.fake_session,
216 {"name": "other-project-name", "quotas": {"baditems": 10}},
217 )
218 self.assertEqual(len(rollback), 0, "Wrong rollback length")
219 self.assertEqual(
220 e.exception.http_code,
221 HTTPStatus.UNPROCESSABLE_ENTITY,
222 "Wrong HTTP status code",
223 )
224 self.assertIn(
225 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
226 "baditems"
227 ),
228 norm(str(e.exception)),
229 "Wrong exception text",
230 )
231
232 def test_edit_project(self):
233 now = time()
234 pid = str(uuid4())
235 proj = {
236 "_id": pid,
237 "name": self.test_name,
238 "_admin": {"created": now, "modified": now},
239 }
240 with self.subTest(i=1):
241 self.auth.get_project_list.side_effect = [[proj], []]
242 new_name = "new-project-name"
243 quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
244 self.topic.edit(
245 self.fake_session, pid, {"name": new_name, "quotas": quotas}
246 )
247 _id, content = self.auth.update_project.call_args[0]
248 self.assertEqual(_id, pid, "Wrong project identifier")
249 self.assertEqual(content["_id"], pid, "Wrong project identifier")
250 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
251 self.assertGreater(
252 content["_admin"]["modified"], now, "Wrong modification time"
253 )
254 self.assertEqual(content["name"], new_name, "Wrong project name")
255 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
256 with self.subTest(i=2):
257 new_name = "other-project-name"
258 quotas = {"baditems": randint(0, 100)}
259 self.auth.get_project_list.side_effect = [[proj], []]
260 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
261 self.topic.edit(
262 self.fake_session, pid, {"name": new_name, "quotas": quotas}
263 )
264 self.assertEqual(
265 e.exception.http_code,
266 HTTPStatus.UNPROCESSABLE_ENTITY,
267 "Wrong HTTP status code",
268 )
269 self.assertIn(
270 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
271 "baditems"
272 ),
273 norm(str(e.exception)),
274 "Wrong exception text",
275 )
276
277 def test_conflict_on_new(self):
278 with self.subTest(i=1):
279 rollback = []
280 pid = str(uuid4())
281 with self.assertRaises(
282 EngineException, msg="Accepted uuid as project name"
283 ) as e:
284 self.topic.new(rollback, self.fake_session, {"name": pid})
285 self.assertEqual(len(rollback), 0, "Wrong rollback length")
286 self.assertEqual(
287 e.exception.http_code,
288 HTTPStatus.UNPROCESSABLE_ENTITY,
289 "Wrong HTTP status code",
290 )
291 self.assertIn(
292 "project name '{}' cannot have an uuid format".format(pid),
293 norm(str(e.exception)),
294 "Wrong exception text",
295 )
296 with self.subTest(i=2):
297 rollback = []
298 self.auth.get_project_list.return_value = [
299 {"_id": test_pid, "name": self.test_name}
300 ]
301 with self.assertRaises(
302 EngineException, msg="Accepted existing project name"
303 ) as e:
304 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
305 self.assertEqual(len(rollback), 0, "Wrong rollback length")
306 self.assertEqual(
307 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
308 )
309 self.assertIn(
310 "project '{}' exists".format(self.test_name),
311 norm(str(e.exception)),
312 "Wrong exception text",
313 )
314
315 def test_conflict_on_edit(self):
316 with self.subTest(i=1):
317 self.auth.get_project_list.return_value = [
318 {"_id": test_pid, "name": self.test_name}
319 ]
320 new_name = str(uuid4())
321 with self.assertRaises(
322 EngineException, msg="Accepted uuid as project name"
323 ) as e:
324 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
325 self.assertEqual(
326 e.exception.http_code,
327 HTTPStatus.UNPROCESSABLE_ENTITY,
328 "Wrong HTTP status code",
329 )
330 self.assertIn(
331 "project name '{}' cannot have an uuid format".format(new_name),
332 norm(str(e.exception)),
333 "Wrong exception text",
334 )
335 with self.subTest(i=2):
336 pid = str(uuid4())
337 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
338 with self.assertRaises(
339 EngineException, msg="Accepted renaming of project 'admin'"
340 ) as e:
341 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
342 self.assertEqual(
343 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
344 )
345 self.assertIn(
346 "you cannot rename project 'admin'",
347 norm(str(e.exception)),
348 "Wrong exception text",
349 )
350 with self.subTest(i=3):
351 new_name = "new-project-name"
352 self.auth.get_project_list.side_effect = [
353 [{"_id": test_pid, "name": self.test_name}],
354 [{"_id": str(uuid4()), "name": new_name}],
355 ]
356 with self.assertRaises(
357 EngineException, msg="Accepted existing project name"
358 ) as e:
359 self.topic.edit(self.fake_session, pid, {"name": new_name})
360 self.assertEqual(
361 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
362 )
363 self.assertIn(
364 "project '{}' is already used".format(new_name),
365 norm(str(e.exception)),
366 "Wrong exception text",
367 )
368
369 def test_delete_project(self):
370 with self.subTest(i=1):
371 pid = str(uuid4())
372 self.auth.get_project.return_value = {
373 "_id": pid,
374 "name": "other-project-name",
375 }
376 self.auth.delete_project.return_value = {"deleted": 1}
377 self.auth.get_user_list.return_value = []
378 self.db.get_list.return_value = []
379 rc = self.topic.delete(self.fake_session, pid)
380 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
381 self.assertEqual(
382 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
383 )
384 self.assertEqual(
385 self.auth.delete_project.call_args[0][0],
386 pid,
387 "Wrong project identifier",
388 )
389
390 def test_conflict_on_del(self):
391 with self.subTest(i=1):
392 self.auth.get_project.return_value = {
393 "_id": test_pid,
394 "name": self.test_name,
395 }
396 with self.assertRaises(
397 EngineException, msg="Accepted deletion of own project"
398 ) as e:
399 self.topic.delete(self.fake_session, self.test_name)
400 self.assertEqual(
401 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
402 )
403 self.assertIn(
404 "you cannot delete your own project",
405 norm(str(e.exception)),
406 "Wrong exception text",
407 )
408 with self.subTest(i=2):
409 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
410 with self.assertRaises(
411 EngineException, msg="Accepted deletion of project 'admin'"
412 ) as e:
413 self.topic.delete(self.fake_session, "admin")
414 self.assertEqual(
415 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
416 )
417 self.assertIn(
418 "you cannot delete project 'admin'",
419 norm(str(e.exception)),
420 "Wrong exception text",
421 )
422 with self.subTest(i=3):
423 pid = str(uuid4())
424 name = "other-project-name"
425 self.auth.get_project.return_value = {"_id": pid, "name": name}
426 self.auth.get_user_list.return_value = [
427 {
428 "_id": str(uuid4()),
429 "username": self.test_name,
430 "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
431 }
432 ]
433 with self.assertRaises(
434 EngineException, msg="Accepted deletion of used project"
435 ) as e:
436 self.topic.delete(self.fake_session, pid)
437 self.assertEqual(
438 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
439 )
440 self.assertIn(
441 "project '{}' ({}) is being used by user '{}'".format(
442 name, pid, self.test_name
443 ),
444 norm(str(e.exception)),
445 "Wrong exception text",
446 )
447 with self.subTest(i=4):
448 self.auth.get_user_list.return_value = []
449 self.db.get_list.return_value = [
450 {
451 "_id": str(uuid4()),
452 "id": self.test_name,
453 "_admin": {"projects_read": [pid], "projects_write": []},
454 }
455 ]
456 with self.assertRaises(
457 EngineException, msg="Accepted deletion of used project"
458 ) as e:
459 self.topic.delete(self.fake_session, pid)
460 self.assertEqual(
461 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
462 )
463 self.assertIn(
464 "project '{}' ({}) is being used by {} '{}'".format(
465 name, pid, "vnf descriptor", self.test_name
466 ),
467 norm(str(e.exception)),
468 "Wrong exception text",
469 )
470
471
472 class Test_RoleTopicAuth(TestCase):
473 @classmethod
474 def setUpClass(cls):
475 cls.test_name = "test-role-topic"
476 cls.test_operations = ["tokens:get"]
477
478 def setUp(self):
479 self.db = Mock(dbbase.DbBase())
480 self.fs = Mock(fsbase.FsBase())
481 self.msg = Mock(msgbase.MsgBase())
482 self.auth = Mock(authconn.Authconn(None, None, None))
483 self.auth.role_permissions = self.test_operations
484 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
485 self.fake_session = {
486 "username": test_name,
487 "project_id": (test_pid,),
488 "method": None,
489 "admin": True,
490 "force": False,
491 "public": False,
492 "allow_show_user_project_role": True,
493 }
494 self.topic.check_quota = Mock(return_value=None) # skip quota
495
496 def test_new_role(self):
497 with self.subTest(i=1):
498 rollback = []
499 rid1 = str(uuid4())
500 perms_in = {"tokens": True}
501 perms_out = {"default": False, "admin": False, "tokens": True}
502 self.auth.get_role_list.return_value = []
503 self.auth.create_role.return_value = rid1
504 rid2, oid = self.topic.new(
505 rollback,
506 self.fake_session,
507 {"name": self.test_name, "permissions": perms_in},
508 )
509 self.assertEqual(len(rollback), 1, "Wrong rollback length")
510 self.assertEqual(rid2, rid1, "Wrong project identifier")
511 content = self.auth.create_role.call_args[0][0]
512 self.assertEqual(content["name"], self.test_name, "Wrong role name")
513 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
514 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
515 self.assertEqual(
516 content["_admin"]["modified"],
517 content["_admin"]["created"],
518 "Wrong modification time",
519 )
520 with self.subTest(i=2):
521 rollback = []
522 with self.assertRaises(
523 EngineException, msg="Accepted wrong permissions"
524 ) as e:
525 self.topic.new(
526 rollback,
527 self.fake_session,
528 {"name": "other-role-name", "permissions": {"projects": True}},
529 )
530 self.assertEqual(len(rollback), 0, "Wrong rollback length")
531 self.assertEqual(
532 e.exception.http_code,
533 HTTPStatus.UNPROCESSABLE_ENTITY,
534 "Wrong HTTP status code",
535 )
536 self.assertIn(
537 "invalid permission '{}'".format("projects"),
538 norm(str(e.exception)),
539 "Wrong exception text",
540 )
541
542 def test_edit_role(self):
543 now = time()
544 rid = str(uuid4())
545 role = {
546 "_id": rid,
547 "name": self.test_name,
548 "permissions": {"tokens": True},
549 "_admin": {"created": now, "modified": now},
550 }
551 with self.subTest(i=1):
552 self.auth.get_role_list.side_effect = [[role], []]
553 self.auth.get_role.return_value = role
554 new_name = "new-role-name"
555 perms_in = {"tokens": False, "tokens:get": True}
556 perms_out = {
557 "default": False,
558 "admin": False,
559 "tokens": False,
560 "tokens:get": True,
561 }
562 self.topic.edit(
563 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
564 )
565 content = self.auth.update_role.call_args[0][0]
566 self.assertEqual(content["_id"], rid, "Wrong role identifier")
567 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
568 self.assertGreater(
569 content["_admin"]["modified"], now, "Wrong modification time"
570 )
571 self.assertEqual(content["name"], new_name, "Wrong role name")
572 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
573 with self.subTest(i=2):
574 new_name = "other-role-name"
575 perms_in = {"tokens": False, "tokens:post": True}
576 self.auth.get_role_list.side_effect = [[role], []]
577 with self.assertRaises(
578 EngineException, msg="Accepted wrong permissions"
579 ) as e:
580 self.topic.edit(
581 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
582 )
583 self.assertEqual(
584 e.exception.http_code,
585 HTTPStatus.UNPROCESSABLE_ENTITY,
586 "Wrong HTTP status code",
587 )
588 self.assertIn(
589 "invalid permission '{}'".format("tokens:post"),
590 norm(str(e.exception)),
591 "Wrong exception text",
592 )
593
594 def test_delete_role(self):
595 with self.subTest(i=1):
596 rid = str(uuid4())
597 role = {"_id": rid, "name": "other-role-name"}
598 self.auth.get_role_list.return_value = [role]
599 self.auth.get_role.return_value = role
600 self.auth.delete_role.return_value = {"deleted": 1}
601 self.auth.get_user_list.return_value = []
602 rc = self.topic.delete(self.fake_session, rid)
603 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
604 self.assertEqual(
605 self.auth.get_role_list.call_args[0][0]["_id"],
606 rid,
607 "Wrong role identifier",
608 )
609 self.assertEqual(
610 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
611 )
612 self.assertEqual(
613 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
614 )
615
616 def test_conflict_on_new(self):
617 with self.subTest(i=1):
618 rollback = []
619 rid = str(uuid4())
620 with self.assertRaises(
621 EngineException, msg="Accepted uuid as role name"
622 ) as e:
623 self.topic.new(rollback, self.fake_session, {"name": rid})
624 self.assertEqual(len(rollback), 0, "Wrong rollback length")
625 self.assertEqual(
626 e.exception.http_code,
627 HTTPStatus.UNPROCESSABLE_ENTITY,
628 "Wrong HTTP status code",
629 )
630 self.assertIn(
631 "role name '{}' cannot have an uuid format".format(rid),
632 norm(str(e.exception)),
633 "Wrong exception text",
634 )
635 with self.subTest(i=2):
636 rollback = []
637 self.auth.get_role_list.return_value = [
638 {"_id": str(uuid4()), "name": self.test_name}
639 ]
640 with self.assertRaises(
641 EngineException, msg="Accepted existing role name"
642 ) as e:
643 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
644 self.assertEqual(len(rollback), 0, "Wrong rollback length")
645 self.assertEqual(
646 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
647 )
648 self.assertIn(
649 "role name '{}' exists".format(self.test_name),
650 norm(str(e.exception)),
651 "Wrong exception text",
652 )
653
654 def test_conflict_on_edit(self):
655 rid = str(uuid4())
656 with self.subTest(i=1):
657 self.auth.get_role_list.return_value = [
658 {"_id": rid, "name": self.test_name, "permissions": {}}
659 ]
660 new_name = str(uuid4())
661 with self.assertRaises(
662 EngineException, msg="Accepted uuid as role name"
663 ) as e:
664 self.topic.edit(self.fake_session, rid, {"name": new_name})
665 self.assertEqual(
666 e.exception.http_code,
667 HTTPStatus.UNPROCESSABLE_ENTITY,
668 "Wrong HTTP status code",
669 )
670 self.assertIn(
671 "role name '{}' cannot have an uuid format".format(new_name),
672 norm(str(e.exception)),
673 "Wrong exception text",
674 )
675 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
676 with self.subTest(i=i):
677 rid = str(uuid4())
678 self.auth.get_role.return_value = {
679 "_id": rid,
680 "name": role_name,
681 "permissions": {},
682 }
683 with self.assertRaises(
684 EngineException,
685 msg="Accepted renaming of role '{}'".format(role_name),
686 ) as e:
687 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
688 self.assertEqual(
689 e.exception.http_code,
690 HTTPStatus.FORBIDDEN,
691 "Wrong HTTP status code",
692 )
693 self.assertIn(
694 "you cannot rename role '{}'".format(role_name),
695 norm(str(e.exception)),
696 "Wrong exception text",
697 )
698 with self.subTest(i=i + 1):
699 new_name = "new-role-name"
700 self.auth.get_role_list.side_effect = [
701 [{"_id": rid, "name": self.test_name, "permissions": {}}],
702 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
703 ]
704 self.auth.get_role.return_value = {
705 "_id": rid,
706 "name": self.test_name,
707 "permissions": {},
708 }
709 with self.assertRaises(
710 EngineException, msg="Accepted existing role name"
711 ) as e:
712 self.topic.edit(self.fake_session, rid, {"name": new_name})
713 self.assertEqual(
714 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
715 )
716 self.assertIn(
717 "role name '{}' exists".format(new_name),
718 norm(str(e.exception)),
719 "Wrong exception text",
720 )
721
722 def test_conflict_on_del(self):
723 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
724 with self.subTest(i=i):
725 rid = str(uuid4())
726 role = {"_id": rid, "name": role_name}
727 self.auth.get_role_list.return_value = [role]
728 self.auth.get_role.return_value = role
729 with self.assertRaises(
730 EngineException,
731 msg="Accepted deletion of role '{}'".format(role_name),
732 ) as e:
733 self.topic.delete(self.fake_session, rid)
734 self.assertEqual(
735 e.exception.http_code,
736 HTTPStatus.FORBIDDEN,
737 "Wrong HTTP status code",
738 )
739 self.assertIn(
740 "you cannot delete role '{}'".format(role_name),
741 norm(str(e.exception)),
742 "Wrong exception text",
743 )
744 with self.subTest(i=i + 1):
745 rid = str(uuid4())
746 name = "other-role-name"
747 role = {"_id": rid, "name": name}
748 self.auth.get_role_list.return_value = [role]
749 self.auth.get_role.return_value = role
750 self.auth.get_user_list.return_value = [
751 {
752 "_id": str(uuid4()),
753 "username": self.test_name,
754 "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
755 }
756 ]
757 with self.assertRaises(
758 EngineException, msg="Accepted deletion of used role"
759 ) as e:
760 self.topic.delete(self.fake_session, rid)
761 self.assertEqual(
762 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
763 )
764 self.assertIn(
765 "role '{}' ({}) is being used by user '{}'".format(
766 name, rid, self.test_name
767 ),
768 norm(str(e.exception)),
769 "Wrong exception text",
770 )
771
772
773 class Test_UserTopicAuth(TestCase):
774 @classmethod
775 def setUpClass(cls):
776 cls.test_name = "test-user-topic"
777
778 def setUp(self):
779 self.db = Mock(dbbase.DbBase())
780 self.fs = Mock(fsbase.FsBase())
781 self.msg = Mock(msgbase.MsgBase())
782 self.auth = Mock(authconn.Authconn(None, None, None))
783 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
784 self.fake_session = {
785 "username": test_name,
786 "project_id": (test_pid,),
787 "method": None,
788 "admin": True,
789 "force": False,
790 "public": False,
791 "allow_show_user_project_role": True,
792 }
793 self.topic.check_quota = Mock(return_value=None) # skip quota
794
795 def test_new_user(self):
796 uid1 = str(uuid4())
797 pid = str(uuid4())
798 self.auth.get_user_list.return_value = []
799 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
800 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
801 with self.subTest(i=1):
802 rollback = []
803 rid = str(uuid4())
804 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
805 prms_in = [{"project": "some_project", "role": "some_role"}]
806 prms_out = [{"project": pid, "role": rid}]
807 uid2, oid = self.topic.new(
808 rollback,
809 self.fake_session,
810 {
811 "username": self.test_name,
812 "password": self.test_name,
813 "project_role_mappings": prms_in,
814 },
815 )
816 self.assertEqual(len(rollback), 1, "Wrong rollback length")
817 self.assertEqual(uid2, uid1, "Wrong project identifier")
818 content = self.auth.create_user.call_args[0][0]
819 self.assertEqual(content["username"], self.test_name, "Wrong project name")
820 self.assertEqual(content["password"], self.test_name, "Wrong password")
821 self.assertEqual(
822 content["project_role_mappings"],
823 prms_out,
824 "Wrong project-role mappings",
825 )
826 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
827 self.assertEqual(
828 content["_admin"]["modified"],
829 content["_admin"]["created"],
830 "Wrong modification time",
831 )
832 with self.subTest(i=2):
833 rollback = []
834 def_rid = str(uuid4())
835 def_role = {"_id": def_rid, "name": "project_admin"}
836 self.auth.get_role.return_value = def_role
837 self.auth.get_role_list.return_value = [def_role]
838 prms_out = [{"project": pid, "role": def_rid}]
839 uid2, oid = self.topic.new(
840 rollback,
841 self.fake_session,
842 {
843 "username": self.test_name,
844 "password": self.test_name,
845 "projects": ["some_project"],
846 },
847 )
848 self.assertEqual(len(rollback), 1, "Wrong rollback length")
849 self.assertEqual(uid2, uid1, "Wrong project identifier")
850 content = self.auth.create_user.call_args[0][0]
851 self.assertEqual(content["username"], self.test_name, "Wrong project name")
852 self.assertEqual(content["password"], self.test_name, "Wrong password")
853 self.assertEqual(
854 content["project_role_mappings"],
855 prms_out,
856 "Wrong project-role mappings",
857 )
858 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
859 self.assertEqual(
860 content["_admin"]["modified"],
861 content["_admin"]["created"],
862 "Wrong modification time",
863 )
864 with self.subTest(i=3):
865 rollback = []
866 with self.assertRaises(
867 EngineException, msg="Accepted wrong project-role mappings"
868 ) as e:
869 self.topic.new(
870 rollback,
871 self.fake_session,
872 {
873 "username": "other-project-name",
874 "password": "other-password",
875 "project_role_mappings": [{}],
876 },
877 )
878 self.assertEqual(len(rollback), 0, "Wrong rollback length")
879 self.assertEqual(
880 e.exception.http_code,
881 HTTPStatus.UNPROCESSABLE_ENTITY,
882 "Wrong HTTP status code",
883 )
884 self.assertIn(
885 "format error at '{}' '{}'".format(
886 "project_role_mappings:{}", "'{}' is a required property"
887 ).format(0, "project"),
888 norm(str(e.exception)),
889 "Wrong exception text",
890 )
891 with self.subTest(i=4):
892 rollback = []
893 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
894 self.topic.new(
895 rollback,
896 self.fake_session,
897 {
898 "username": "other-project-name",
899 "password": "other-password",
900 "projects": [],
901 },
902 )
903 self.assertEqual(len(rollback), 0, "Wrong rollback length")
904 self.assertEqual(
905 e.exception.http_code,
906 HTTPStatus.UNPROCESSABLE_ENTITY,
907 "Wrong HTTP status code",
908 )
909 self.assertIn(
910 "format error at '{}' '{}'".format(
911 "projects", "{} is too short"
912 ).format([]),
913 norm(str(e.exception)),
914 "Wrong exception text",
915 )
916
917 def test_edit_user(self):
918 now = time()
919 uid = str(uuid4())
920 pid1 = str(uuid4())
921 rid1 = str(uuid4())
922 prms = [
923 {
924 "project": pid1,
925 "project_name": "project-1",
926 "role": rid1,
927 "role_name": "role-1",
928 }
929 ]
930 user = {
931 "_id": uid,
932 "username": self.test_name,
933 "project_role_mappings": prms,
934 "_admin": {"created": now, "modified": now},
935 }
936 with self.subTest(i=1):
937 self.auth.get_user_list.side_effect = [[user], []]
938 self.auth.get_user.return_value = user
939 pid2 = str(uuid4())
940 rid2 = str(uuid4())
941 self.auth.get_project.side_effect = [
942 {"_id": pid2, "name": "project-2"},
943 {"_id": pid1, "name": "project-1"},
944 ]
945 self.auth.get_role.side_effect = [
946 {"_id": rid2, "name": "role-2"},
947 {"_id": rid1, "name": "role-1"},
948 ]
949 new_name = "new-user-name"
950 new_pasw = "new-password"
951 add_prms = [{"project": pid2, "role": rid2}]
952 rem_prms = [{"project": pid1, "role": rid1}]
953 self.topic.edit(
954 self.fake_session,
955 uid,
956 {
957 "username": new_name,
958 "password": new_pasw,
959 "add_project_role_mappings": add_prms,
960 "remove_project_role_mappings": rem_prms,
961 },
962 )
963 content = self.auth.update_user.call_args[0][0]
964 self.assertEqual(content["_id"], uid, "Wrong user identifier")
965 self.assertEqual(content["username"], new_name, "Wrong user name")
966 self.assertEqual(content["password"], new_pasw, "Wrong user password")
967 self.assertEqual(
968 content["add_project_role_mappings"],
969 add_prms,
970 "Wrong project-role mappings to add",
971 )
972 self.assertEqual(
973 content["remove_project_role_mappings"],
974 prms,
975 "Wrong project-role mappings to remove",
976 )
977 with self.subTest(i=2):
978 new_name = "other-user-name"
979 new_prms = [{}]
980 self.auth.get_role_list.side_effect = [[user], []]
981 self.auth.get_user_list.side_effect = [[user]]
982 with self.assertRaises(
983 EngineException, msg="Accepted wrong project-role mappings"
984 ) as e:
985 self.topic.edit(
986 self.fake_session,
987 uid,
988 {"username": new_name, "project_role_mappings": new_prms},
989 )
990 self.assertEqual(
991 e.exception.http_code,
992 HTTPStatus.UNPROCESSABLE_ENTITY,
993 "Wrong HTTP status code",
994 )
995 self.assertIn(
996 "format error at '{}' '{}'".format(
997 "project_role_mappings:{}", "'{}' is a required property"
998 ).format(0, "project"),
999 norm(str(e.exception)),
1000 "Wrong exception text",
1001 )
1002 with self.subTest(i=3):
1003 self.auth.get_user_list.side_effect = [[user], []]
1004 self.auth.get_user.return_value = user
1005 old_password = self.test_name
1006 new_pasw = "new-password"
1007 self.topic.edit(
1008 self.fake_session,
1009 uid,
1010 {
1011 "old_password": old_password,
1012 "password": new_pasw,
1013 },
1014 )
1015 content = self.auth.update_user.call_args[0][0]
1016 self.assertEqual(
1017 content["old_password"], old_password, "Wrong old password"
1018 )
1019 self.assertEqual(content["password"], new_pasw, "Wrong user password")
1020
1021 def test_delete_user(self):
1022 with self.subTest(i=1):
1023 uid = str(uuid4())
1024 self.fake_session["username"] = self.test_name
1025 user = user = {
1026 "_id": uid,
1027 "username": "other-user-name",
1028 "project_role_mappings": [],
1029 }
1030 self.auth.get_user.return_value = user
1031 self.auth.delete_user.return_value = {"deleted": 1}
1032 rc = self.topic.delete(self.fake_session, uid)
1033 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
1034 self.assertEqual(
1035 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
1036 )
1037 self.assertEqual(
1038 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
1039 )
1040
1041 def test_conflict_on_new(self):
1042 with self.subTest(i=1):
1043 rollback = []
1044 uid = str(uuid4())
1045 with self.assertRaises(
1046 EngineException, msg="Accepted uuid as username"
1047 ) as e:
1048 self.topic.new(
1049 rollback,
1050 self.fake_session,
1051 {
1052 "username": uid,
1053 "password": self.test_name,
1054 "projects": [test_pid],
1055 },
1056 )
1057 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1058 self.assertEqual(
1059 e.exception.http_code,
1060 HTTPStatus.UNPROCESSABLE_ENTITY,
1061 "Wrong HTTP status code",
1062 )
1063 self.assertIn(
1064 "username '{}' cannot have a uuid format".format(uid),
1065 norm(str(e.exception)),
1066 "Wrong exception text",
1067 )
1068 with self.subTest(i=2):
1069 rollback = []
1070 self.auth.get_user_list.return_value = [
1071 {"_id": str(uuid4()), "username": self.test_name}
1072 ]
1073 with self.assertRaises(
1074 EngineException, msg="Accepted existing username"
1075 ) as e:
1076 self.topic.new(
1077 rollback,
1078 self.fake_session,
1079 {
1080 "username": self.test_name,
1081 "password": self.test_name,
1082 "projects": [test_pid],
1083 },
1084 )
1085 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1086 self.assertEqual(
1087 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1088 )
1089 self.assertIn(
1090 "username '{}' is already used".format(self.test_name),
1091 norm(str(e.exception)),
1092 "Wrong exception text",
1093 )
1094 with self.subTest(i=3):
1095 rollback = []
1096 self.auth.get_user_list.return_value = []
1097 self.auth.get_role_list.side_effect = [[], []]
1098 with self.assertRaises(
1099 AuthconnNotFoundException, msg="Accepted user without default role"
1100 ) as e:
1101 self.topic.new(
1102 rollback,
1103 self.fake_session,
1104 {
1105 "username": self.test_name,
1106 "password": self.test_name,
1107 "projects": [str(uuid4())],
1108 },
1109 )
1110 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1111 self.assertEqual(
1112 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1113 )
1114 self.assertIn(
1115 "can't find default role for user '{}'".format(self.test_name),
1116 norm(str(e.exception)),
1117 "Wrong exception text",
1118 )
1119
1120 def test_conflict_on_edit(self):
1121 uid = str(uuid4())
1122 with self.subTest(i=1):
1123 self.auth.get_user_list.return_value = [
1124 {"_id": uid, "username": self.test_name}
1125 ]
1126 new_name = str(uuid4())
1127 with self.assertRaises(
1128 EngineException, msg="Accepted uuid as username"
1129 ) as e:
1130 self.topic.edit(self.fake_session, uid, {"username": new_name})
1131 self.assertEqual(
1132 e.exception.http_code,
1133 HTTPStatus.UNPROCESSABLE_ENTITY,
1134 "Wrong HTTP status code",
1135 )
1136 self.assertIn(
1137 "username '{}' cannot have an uuid format".format(new_name),
1138 norm(str(e.exception)),
1139 "Wrong exception text",
1140 )
1141 with self.subTest(i=2):
1142 self.auth.get_user_list.return_value = [
1143 {"_id": uid, "username": self.test_name}
1144 ]
1145 self.auth.get_role_list.side_effect = [[], []]
1146 with self.assertRaises(
1147 AuthconnNotFoundException, msg="Accepted user without default role"
1148 ) as e:
1149 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
1150 self.assertEqual(
1151 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1152 )
1153 self.assertIn(
1154 "can't find a default role for user '{}'".format(self.test_name),
1155 norm(str(e.exception)),
1156 "Wrong exception text",
1157 )
1158 with self.subTest(i=3):
1159 admin_uid = str(uuid4())
1160 self.auth.get_user_list.return_value = [
1161 {"_id": admin_uid, "username": "admin"}
1162 ]
1163 with self.assertRaises(
1164 EngineException,
1165 msg="Accepted removing system_admin role from admin user",
1166 ) as e:
1167 self.topic.edit(
1168 self.fake_session,
1169 admin_uid,
1170 {
1171 "remove_project_role_mappings": [
1172 {"project": "admin", "role": "system_admin"}
1173 ]
1174 },
1175 )
1176 self.assertEqual(
1177 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
1178 )
1179 self.assertIn(
1180 "you cannot remove system_admin role from admin user",
1181 norm(str(e.exception)),
1182 "Wrong exception text",
1183 )
1184 with self.subTest(i=4):
1185 new_name = "new-user-name"
1186 self.auth.get_user_list.side_effect = [
1187 [{"_id": uid, "name": self.test_name}],
1188 [{"_id": str(uuid4()), "name": new_name}],
1189 ]
1190 with self.assertRaises(
1191 EngineException, msg="Accepted existing username"
1192 ) as e:
1193 self.topic.edit(self.fake_session, uid, {"username": new_name})
1194 self.assertEqual(
1195 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1196 )
1197 self.assertIn(
1198 "username '{}' is already used".format(new_name),
1199 norm(str(e.exception)),
1200 "Wrong exception text",
1201 )
1202
1203 def test_conflict_on_del(self):
1204 with self.subTest(i=1):
1205 uid = str(uuid4())
1206 self.fake_session["username"] = self.test_name
1207 user = user = {
1208 "_id": uid,
1209 "username": self.test_name,
1210 "project_role_mappings": [],
1211 }
1212 self.auth.get_user.return_value = user
1213 with self.assertRaises(
1214 EngineException, msg="Accepted deletion of own user"
1215 ) as e:
1216 self.topic.delete(self.fake_session, uid)
1217 self.assertEqual(
1218 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1219 )
1220 self.assertIn(
1221 "you cannot delete your own login user",
1222 norm(str(e.exception)),
1223 "Wrong exception text",
1224 )
1225
1226
1227 class Test_CommonVimWimSdn(TestCase):
1228 @classmethod
1229 def setUpClass(cls):
1230 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
1231
1232 def setUp(self):
1233 self.db = Mock(dbbase.DbBase())
1234 self.fs = Mock(fsbase.FsBase())
1235 self.msg = Mock(msgbase.MsgBase())
1236 self.auth = Mock(authconn.Authconn(None, None, None))
1237 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
1238 # Use WIM schemas for testing because they are the simplest
1239 self.topic._send_msg = Mock()
1240 self.topic.topic = "wims"
1241 self.topic.schema_new = validation.wim_account_new_schema
1242 self.topic.schema_edit = validation.wim_account_edit_schema
1243 self.fake_session = {
1244 "username": test_name,
1245 "project_id": (test_pid,),
1246 "method": None,
1247 "admin": True,
1248 "force": False,
1249 "public": False,
1250 "allow_show_user_project_role": True,
1251 }
1252 self.topic.check_quota = Mock(return_value=None) # skip quota
1253
1254 def test_new_cvws(self):
1255 test_url = "http://0.0.0.0:0"
1256 with self.subTest(i=1):
1257 rollback = []
1258 test_type = "fake"
1259 self.db.get_one.return_value = None
1260 self.db.create.side_effect = lambda self, content: content["_id"]
1261 cid, oid = self.topic.new(
1262 rollback,
1263 self.fake_session,
1264 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
1265 )
1266 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1267 args = self.db.create.call_args[0]
1268 content = args[1]
1269 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1270 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1271 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
1272 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
1273 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
1274 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
1275 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
1276 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
1277 self.assertEqual(
1278 content["_admin"]["modified"],
1279 content["_admin"]["created"],
1280 "Wrong modification time",
1281 )
1282 self.assertEqual(
1283 content["_admin"]["operationalState"],
1284 "PROCESSING",
1285 "Wrong operational state",
1286 )
1287 self.assertEqual(
1288 content["_admin"]["projects_read"],
1289 [test_pid],
1290 "Wrong read-only projects",
1291 )
1292 self.assertEqual(
1293 content["_admin"]["projects_write"],
1294 [test_pid],
1295 "Wrong read/write projects",
1296 )
1297 self.assertIsNone(
1298 content["_admin"]["current_operation"], "Wrong current operation"
1299 )
1300 self.assertEqual(
1301 len(content["_admin"]["operations"]), 1, "Wrong number of operations"
1302 )
1303 operation = content["_admin"]["operations"][0]
1304 self.assertEqual(
1305 operation["lcmOperationType"], "create", "Wrong operation type"
1306 )
1307 self.assertEqual(
1308 operation["operationState"], "PROCESSING", "Wrong operation state"
1309 )
1310 self.assertGreater(
1311 operation["startTime"],
1312 content["_admin"]["created"],
1313 "Wrong operation start time",
1314 )
1315 self.assertGreater(
1316 operation["statusEnteredTime"],
1317 content["_admin"]["created"],
1318 "Wrong operation status enter time",
1319 )
1320 self.assertEqual(
1321 operation["detailed-status"], "", "Wrong operation detailed status info"
1322 )
1323 self.assertIsNone(
1324 operation["operationParams"], "Wrong operation parameters"
1325 )
1326 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
1327 # with self.subTest(i=2):
1328 # rollback = []
1329 # test_type = "bad_type"
1330 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
1331 # self.topic.new(rollback, self.fake_session,
1332 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
1333 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
1334 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
1335 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
1336 # norm(str(e.exception)), "Wrong exception text")
1337
1338 def test_conflict_on_new(self):
1339 with self.subTest(i=1):
1340 rollback = []
1341 test_url = "http://0.0.0.0:0"
1342 test_type = "fake"
1343 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
1344 with self.assertRaises(
1345 EngineException, msg="Accepted existing CIM name"
1346 ) as e:
1347 self.topic.new(
1348 rollback,
1349 self.fake_session,
1350 {
1351 "name": self.test_name,
1352 "wim_url": test_url,
1353 "wim_type": test_type,
1354 },
1355 )
1356 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1357 self.assertEqual(
1358 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1359 )
1360 self.assertIn(
1361 "name '{}' already exists for {}".format(
1362 self.test_name, self.topic.topic
1363 ),
1364 norm(str(e.exception)),
1365 "Wrong exception text",
1366 )
1367
1368 def test_edit_cvws(self):
1369 now = time()
1370 cid = str(uuid4())
1371 test_url = "http://0.0.0.0:0"
1372 test_type = "fake"
1373 cvws = {
1374 "_id": cid,
1375 "name": self.test_name,
1376 "wim_url": test_url,
1377 "wim_type": test_type,
1378 "_admin": {
1379 "created": now,
1380 "modified": now,
1381 "operations": [{"lcmOperationType": "create"}],
1382 },
1383 }
1384 with self.subTest(i=1):
1385 new_name = "new-cim-name"
1386 new_url = "https://1.1.1.1:1"
1387 new_type = "onos"
1388 self.db.get_one.side_effect = [cvws, None]
1389 self.db.replace.return_value = {"updated": 1}
1390 # self.db.encrypt.side_effect = [b64str(), b64str()]
1391 self.topic.edit(
1392 self.fake_session,
1393 cid,
1394 {"name": new_name, "wim_url": new_url, "wim_type": new_type},
1395 )
1396 args = self.db.replace.call_args[0]
1397 content = args[2]
1398 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1399 self.assertEqual(args[1], cid, "Wrong CIM identifier")
1400 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1401 self.assertEqual(content["name"], new_name, "Wrong CIM name")
1402 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
1403 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
1404 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
1405 self.assertGreater(
1406 content["_admin"]["modified"],
1407 content["_admin"]["created"],
1408 "Wrong modification time",
1409 )
1410 self.assertEqual(
1411 len(content["_admin"]["operations"]), 2, "Wrong number of operations"
1412 )
1413 operation = content["_admin"]["operations"][1]
1414 self.assertEqual(
1415 operation["lcmOperationType"], "edit", "Wrong operation type"
1416 )
1417 self.assertEqual(
1418 operation["operationState"], "PROCESSING", "Wrong operation state"
1419 )
1420 self.assertGreater(
1421 operation["startTime"],
1422 content["_admin"]["modified"],
1423 "Wrong operation start time",
1424 )
1425 self.assertGreater(
1426 operation["statusEnteredTime"],
1427 content["_admin"]["modified"],
1428 "Wrong operation status enter time",
1429 )
1430 self.assertEqual(
1431 operation["detailed-status"], "", "Wrong operation detailed status info"
1432 )
1433 self.assertIsNone(
1434 operation["operationParams"], "Wrong operation parameters"
1435 )
1436 with self.subTest(i=2):
1437 self.db.get_one.side_effect = [cvws]
1438 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
1439 self.topic.edit(
1440 self.fake_session,
1441 str(uuid4()),
1442 {"name": "new-name", "extra_prop": "anything"},
1443 )
1444 self.assertEqual(
1445 e.exception.http_code,
1446 HTTPStatus.UNPROCESSABLE_ENTITY,
1447 "Wrong HTTP status code",
1448 )
1449 self.assertIn(
1450 "format error '{}'".format(
1451 "additional properties are not allowed ('{}' was unexpected)"
1452 ).format("extra_prop"),
1453 norm(str(e.exception)),
1454 "Wrong exception text",
1455 )
1456
1457 def test_conflict_on_edit(self):
1458 with self.subTest(i=1):
1459 cid = str(uuid4())
1460 new_name = "new-cim-name"
1461 self.db.get_one.side_effect = [
1462 {"_id": cid, "name": self.test_name},
1463 {"_id": str(uuid4()), "name": new_name},
1464 ]
1465 with self.assertRaises(
1466 EngineException, msg="Accepted existing CIM name"
1467 ) as e:
1468 self.topic.edit(self.fake_session, cid, {"name": new_name})
1469 self.assertEqual(
1470 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1471 )
1472 self.assertIn(
1473 "name '{}' already exists for {}".format(new_name, self.topic.topic),
1474 norm(str(e.exception)),
1475 "Wrong exception text",
1476 )
1477
1478 def test_delete_cvws(self):
1479 cid = str(uuid4())
1480 ro_pid = str(uuid4())
1481 rw_pid = str(uuid4())
1482 cvws = {"_id": cid, "name": self.test_name}
1483 self.db.get_list.return_value = []
1484 with self.subTest(i=1):
1485 cvws["_admin"] = {
1486 "projects_read": [test_pid, ro_pid, rw_pid],
1487 "projects_write": [test_pid, rw_pid],
1488 }
1489 self.db.get_one.return_value = cvws
1490 oid = self.topic.delete(self.fake_session, cid)
1491 self.assertIsNone(oid, "Wrong operation identifier")
1492 self.assertEqual(
1493 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1494 )
1495 self.assertEqual(
1496 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1497 )
1498 self.assertEqual(
1499 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1500 )
1501 self.assertEqual(
1502 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1503 )
1504 self.assertEqual(
1505 self.db.set_one.call_args[1]["update_dict"],
1506 None,
1507 "Wrong read-only projects update",
1508 )
1509 self.assertEqual(
1510 self.db.set_one.call_args[1]["pull_list"],
1511 {
1512 "_admin.projects_read": (test_pid,),
1513 "_admin.projects_write": (test_pid,),
1514 },
1515 "Wrong read/write projects update",
1516 )
1517 self.topic._send_msg.assert_not_called()
1518 with self.subTest(i=2):
1519 now = time()
1520 cvws["_admin"] = {
1521 "projects_read": [test_pid],
1522 "projects_write": [test_pid],
1523 "operations": [],
1524 }
1525 self.db.get_one.return_value = cvws
1526 oid = self.topic.delete(self.fake_session, cid)
1527 self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
1528 self.assertEqual(
1529 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1530 )
1531 self.assertEqual(
1532 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1533 )
1534 self.assertEqual(
1535 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1536 )
1537 self.assertEqual(
1538 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier"
1539 )
1540 self.assertEqual(
1541 self.db.set_one.call_args[1]["update_dict"],
1542 {"_admin.to_delete": True},
1543 "Wrong _admin.to_delete update",
1544 )
1545 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
1546 self.assertEqual(
1547 operation["lcmOperationType"], "delete", "Wrong operation type"
1548 )
1549 self.assertEqual(
1550 operation["operationState"], "PROCESSING", "Wrong operation state"
1551 )
1552 self.assertEqual(
1553 operation["detailed-status"], "", "Wrong operation detailed status"
1554 )
1555 self.assertIsNone(
1556 operation["operationParams"], "Wrong operation parameters"
1557 )
1558 self.assertGreater(
1559 operation["startTime"], now, "Wrong operation start time"
1560 )
1561 self.assertGreater(
1562 operation["statusEnteredTime"], now, "Wrong operation status enter time"
1563 )
1564 self.topic._send_msg.assert_called_once_with(
1565 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
1566 )
1567 with self.subTest(i=3):
1568 cvws["_admin"] = {
1569 "projects_read": [],
1570 "projects_write": [],
1571 "operations": [],
1572 }
1573 self.db.get_one.return_value = cvws
1574 self.topic._send_msg.reset_mock()
1575 self.db.get_one.reset_mock()
1576 self.db.del_one.reset_mock()
1577 self.fake_session["force"] = True # to force deletion
1578 self.fake_session["admin"] = True # to force deletion
1579 self.fake_session["project_id"] = [] # to force deletion
1580 oid = self.topic.delete(self.fake_session, cid)
1581 self.assertIsNone(oid, "Wrong operation identifier")
1582 self.assertEqual(
1583 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1584 )
1585 self.assertEqual(
1586 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1587 )
1588 self.assertEqual(
1589 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
1590 )
1591 self.assertEqual(
1592 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1593 )
1594 self.topic._send_msg.assert_called_once_with(
1595 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
1596 )
1597
1598
1599 if __name__ == "__main__":
1600 unittest.main()