Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
# Module metadata: original author and creation-date tag.
__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
# NOTE(review): the day field reads "019" - presumably 2019-10-19; confirm before relying on it.
__date__ = "$2019-10-019"
19
20 import unittest
21 from unittest import TestCase
22 from unittest.mock import Mock, patch, call
23 from uuid import uuid4
24 from http import HTTPStatus
25 from time import time
26 from random import randint
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import (
30 ProjectTopicAuth,
31 RoleTopicAuth,
32 UserTopicAuth,
33 CommonVimWimSdn,
34 VcaTopic,
35 )
36 from osm_nbi.engine import EngineException
37 from osm_nbi.authconn import AuthconnNotFoundException
38
39
# Shared fixtures: a random project id and a user name that the fake sessions
# built in the setUp() methods below refer to.
test_pid = str(uuid4())
test_name = "test-user"
42
43
def norm(str):
    """Return *str* lower-cased with each whitespace run collapsed to one space.

    Leading and trailing whitespace is dropped, so messages can be compared
    regardless of their original spacing or letter case.
    """
    # split() without a separator already ignores surrounding whitespace.
    words = str.split()
    collapsed = " ".join(words)
    return collapsed.lower()
47
48
class TestVcaTopic(TestCase):
    """Unit tests for ``VcaTopic`` using mocked db, fs, msg and auth collaborators."""

    def setUp(self):
        """Build a ``VcaTopic`` whose four collaborators are all mocks."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
    def test_format_on_new(self, mock_super_format_on_new):
        """``secret`` and ``cacert`` end up holding what ``db.encrypt`` returned."""
        content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        # One return value per expected encrypt() call, in call order.
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_new.return_value = "1234"

        oid = self.vca_topic.format_on_new(content)

        self.assertEqual(oid, "1234")
        self.assertEqual(content["secret"], "secret")
        self.assertEqual(content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_new.assert_called_with(content, None, False)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
    def test_format_on_edit(self, mock_super_format_on_edit):
        """Edited ``secret`` and ``cacert`` are stored in the final content via ``db.encrypt``."""
        edit_content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        final_content = {
            "_id": "id",
            "schema_version": "1.11",
        }
        # One return value per expected encrypt() call, in call order.
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_edit.return_value = "1234"

        oid = self.vca_topic.format_on_edit(final_content, edit_content)

        self.assertEqual(oid, "1234")
        self.assertEqual(final_content["secret"], "secret")
        self.assertEqual(final_content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_edit.assert_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del(self, mock_check_conflict_on_del):
        """With no VIM account found, the check is delegated to the parent class."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}

        self.db.get_list.return_value = None

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_called_with(session, _id, db_content)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
        """A forced deletion neither queries the db nor calls the parent check."""
        session = {
            "project_id": "project-id",
            "force": True,
        }
        _id = "vca-id"
        db_content = {}

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_not_called()
        mock_check_conflict_on_del.assert_not_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
        """A VIM account that uses the VCA makes the deletion fail with CONFLICT."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}

        # get_list() is mocked with a list everywhere else in this module.
        self.db.get_list.return_value = [{"_id": "vim", "vca": "vca-id"}]

        with self.assertRaises(EngineException) as context:
            self.vca_topic.check_conflict_on_del(session, _id, db_content)
        # These checks must sit after the ``with`` block: anything placed after
        # the raising call inside it is never executed. Status and text are
        # compared separately because two exception instances are not
        # guaranteed to compare equal.
        self.assertEqual(
            context.exception.http_code,
            HTTPStatus.CONFLICT,
            "Wrong HTTP status code",
        )
        self.assertIn(
            "there is at least one vim account using this vca",
            norm(str(context.exception)),
            "Wrong exception text",
        )

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_not_called()
166
167
class Test_ProjectTopicAuth(TestCase):
    """Unit tests for ``ProjectTopicAuth`` with db, fs, msg and auth mocked out."""

    @classmethod
    def setUpClass(cls):
        """Set the name used for both the session user and the project under test."""
        cls.test_name = "test-project-topic"

    def setUp(self):
        """Create the topic on fresh mocks plus an admin session bound to ``test_pid``."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": self.test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_project(self):
        """A valid project is created with timestamps; unknown quota keys are rejected."""
        with self.subTest(i=1):
            rollback = []
            pid1 = str(uuid4())
            self.auth.get_project_list.return_value = []
            self.auth.create_project.return_value = pid1
            pid2, _oid = self.topic.new(
                rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(pid2, pid1, "Wrong project identifier")
            content = self.auth.create_project.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong project name")
            self.assertEqual(content["quotas"], {}, "Wrong quotas")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {"name": "other-project-name", "quotas": {"baditems": 10}},
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
                    "baditems"
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_edit_project(self):
        """Name and quotas can be edited; unknown quota keys are rejected."""
        now = time()
        pid = str(uuid4())
        proj = {
            "_id": pid,
            "name": self.test_name,
            "_admin": {"created": now, "modified": now},
        }
        with self.subTest(i=1):
            # First lookup returns the stored project, the second an empty list.
            self.auth.get_project_list.side_effect = [[proj], []]
            new_name = "new-project-name"
            quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
            self.topic.edit(
                self.fake_session, pid, {"name": new_name, "quotas": quotas}
            )
            _id, content = self.auth.update_project.call_args[0]
            self.assertEqual(_id, pid, "Wrong project identifier")
            self.assertEqual(content["_id"], pid, "Wrong project identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"], now, "Wrong modification time"
            )
            self.assertEqual(content["name"], new_name, "Wrong project name")
            self.assertEqual(content["quotas"], quotas, "Wrong quotas")
        with self.subTest(i=2):
            new_name = "other-project-name"
            quotas = {"baditems": randint(0, 100)}
            self.auth.get_project_list.side_effect = [[proj], []]
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.edit(
                    self.fake_session, pid, {"name": new_name, "quotas": quotas}
                )
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
                    "baditems"
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_new(self):
        """Creation is refused for a uuid-shaped name and for a name already in use."""
        with self.subTest(i=1):
            rollback = []
            pid = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as project name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": pid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "project name '{}' cannot have an uuid format".format(pid),
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=2):
            rollback = []
            self.auth.get_project_list.return_value = [
                {"_id": test_pid, "name": self.test_name}
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing project name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "project '{}' exists".format(self.test_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_edit(self):
        """Renaming is refused for a uuid-shaped name, for 'admin' and for a used name."""
        with self.subTest(i=1):
            self.auth.get_project_list.return_value = [
                {"_id": test_pid, "name": self.test_name}
            ]
            new_name = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as project name"
            ) as e:
                self.topic.edit(self.fake_session, test_pid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "project name '{}' cannot have an uuid format".format(new_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=2):
            pid = str(uuid4())
            self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
            with self.assertRaises(
                EngineException, msg="Accepted renaming of project 'admin'"
            ) as e:
                self.topic.edit(self.fake_session, pid, {"name": "new-name"})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "you cannot rename project 'admin'",
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=3):
            new_name = "new-project-name"
            # First lookup returns the project being edited, the second one a
            # different project that already owns the requested name.
            self.auth.get_project_list.side_effect = [
                [{"_id": test_pid, "name": self.test_name}],
                [{"_id": str(uuid4()), "name": new_name}],
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing project name"
            ) as e:
                # Edit the project the mock actually returns, rather than the
                # identifier left over from the previous sub-test.
                self.topic.edit(self.fake_session, test_pid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "project '{}' is already used".format(new_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_delete_project(self):
        """An unused project is deleted and the auth back-end result is returned."""
        with self.subTest(i=1):
            pid = str(uuid4())
            self.auth.get_project.return_value = {
                "_id": pid,
                "name": "other-project-name",
            }
            self.auth.delete_project.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = []
            rc = self.topic.delete(self.fake_session, pid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
            self.assertEqual(
                self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
            )
            self.assertEqual(
                self.auth.delete_project.call_args[0][0],
                pid,
                "Wrong project identifier",
            )

    def test_conflict_on_del(self):
        """Deletion is refused for the own project, 'admin', and projects still in use."""
        with self.subTest(i=1):
            self.auth.get_project.return_value = {
                "_id": test_pid,
                "name": self.test_name,
            }
            with self.assertRaises(
                EngineException, msg="Accepted deletion of own project"
            ) as e:
                self.topic.delete(self.fake_session, self.test_name)
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "you cannot delete your own project",
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=2):
            self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
            with self.assertRaises(
                EngineException, msg="Accepted deletion of project 'admin'"
            ) as e:
                self.topic.delete(self.fake_session, "admin")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "you cannot delete project 'admin'",
                norm(str(e.exception)),
                "Wrong exception text",
            )
        # Shared by sub-tests 3 and 4; defined outside them so that neither
        # depends on the other having run first.
        pid = str(uuid4())
        name = "other-project-name"
        with self.subTest(i=3):
            self.auth.get_project.return_value = {"_id": pid, "name": name}
            self.auth.get_user_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "username": self.test_name,
                    "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used project"
            ) as e:
                self.topic.delete(self.fake_session, pid)
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "project '{}' ({}) is being used by user '{}'".format(
                    name, pid, self.test_name
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=4):
            self.auth.get_project.return_value = {"_id": pid, "name": name}
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "id": self.test_name,
                    "_admin": {"projects_read": [pid], "projects_write": []},
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used project"
            ) as e:
                self.topic.delete(self.fake_session, pid)
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "project '{}' ({}) is being used by {} '{}'".format(
                    name, pid, "vnf descriptor", self.test_name
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )
470
471
class Test_RoleTopicAuth(TestCase):
    """Unit tests for ``RoleTopicAuth`` with db, fs, msg and auth mocked out."""

    @classmethod
    def setUpClass(cls):
        """Set the role name under test and the operations the mocked auth exposes."""
        cls.test_name = "test-role-topic"
        cls.test_operations = ["tokens:get"]

    def setUp(self):
        """Create the topic on fresh mocks plus an admin session bound to ``test_pid``."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.auth.role_permissions = self.test_operations
        self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_role(self):
        """A valid role gets default/admin flags added; unknown permissions are rejected."""
        with self.subTest(i=1):
            rollback = []
            rid1 = str(uuid4())
            perms_in = {"tokens": True}
            perms_out = {"default": False, "admin": False, "tokens": True}
            self.auth.get_role_list.return_value = []
            self.auth.create_role.return_value = rid1
            rid2, _oid = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "permissions": perms_in},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(rid2, rid1, "Wrong role identifier")
            content = self.auth.create_role.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(
                EngineException, msg="Accepted wrong permissions"
            ) as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {"name": "other-role-name", "permissions": {"projects": True}},
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "invalid permission '{}'".format("projects"),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_edit_role(self):
        """Name and permissions can be edited; unknown permissions are rejected."""
        now = time()
        rid = str(uuid4())
        role = {
            "_id": rid,
            "name": self.test_name,
            "permissions": {"tokens": True},
            "_admin": {"created": now, "modified": now},
        }
        with self.subTest(i=1):
            # First lookup returns the stored role, the second an empty list.
            self.auth.get_role_list.side_effect = [[role], []]
            self.auth.get_role.return_value = role
            new_name = "new-role-name"
            perms_in = {"tokens": False, "tokens:get": True}
            perms_out = {
                "default": False,
                "admin": False,
                "tokens": False,
                "tokens:get": True,
            }
            self.topic.edit(
                self.fake_session, rid, {"name": new_name, "permissions": perms_in}
            )
            content = self.auth.update_role.call_args[0][0]
            self.assertEqual(content["_id"], rid, "Wrong role identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"], now, "Wrong modification time"
            )
            self.assertEqual(content["name"], new_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
        with self.subTest(i=2):
            new_name = "other-role-name"
            perms_in = {"tokens": False, "tokens:post": True}
            self.auth.get_role_list.side_effect = [[role], []]
            with self.assertRaises(
                EngineException, msg="Accepted wrong permissions"
            ) as e:
                self.topic.edit(
                    self.fake_session, rid, {"name": new_name, "permissions": perms_in}
                )
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "invalid permission '{}'".format("tokens:post"),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_delete_role(self):
        """An unused role is deleted and the auth back-end result is returned."""
        with self.subTest(i=1):
            rid = str(uuid4())
            role = {"_id": rid, "name": "other-role-name"}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.delete_role.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            rc = self.topic.delete(self.fake_session, rid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
            self.assertEqual(
                self.auth.get_role_list.call_args[0][0]["_id"],
                rid,
                "Wrong role identifier",
            )
            self.assertEqual(
                self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
            )
            self.assertEqual(
                self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
            )

    def test_conflict_on_new(self):
        """Creation is refused for a uuid-shaped name and for a name already in use."""
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as role name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": rid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "role name '{}' cannot have an uuid format".format(rid),
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=2):
            rollback = []
            self.auth.get_role_list.return_value = [
                {"_id": str(uuid4()), "name": self.test_name}
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing role name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "role name '{}' exists".format(self.test_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_edit(self):
        """Renaming is refused for uuid-shaped names, built-in roles and used names."""
        rid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_role_list.return_value = [
                {"_id": rid, "name": self.test_name, "permissions": {}}
            ]
            new_name = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as role name"
            ) as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "role name '{}' cannot have an uuid format".format(new_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )
        protected_roles = ["system_admin", "project_admin"]
        for i, role_name in enumerate(protected_roles, start=2):
            with self.subTest(i=i):
                protected_rid = str(uuid4())
                self.auth.get_role.return_value = {
                    "_id": protected_rid,
                    "name": role_name,
                    "permissions": {},
                }
                with self.assertRaises(
                    EngineException,
                    msg="Accepted renaming of role '{}'".format(role_name),
                ) as e:
                    self.topic.edit(
                        self.fake_session, protected_rid, {"name": "new-name"}
                    )
                self.assertEqual(
                    e.exception.http_code,
                    HTTPStatus.FORBIDDEN,
                    "Wrong HTTP status code",
                )
                self.assertIn(
                    "you cannot rename role '{}'".format(role_name),
                    norm(str(e.exception)),
                    "Wrong exception text",
                )
        # Numbered explicitly instead of from the loop variable left over above.
        with self.subTest(i=2 + len(protected_roles)):
            new_name = "new-role-name"
            # First lookup returns the role being edited, the second one a
            # different role that already owns the requested name.
            self.auth.get_role_list.side_effect = [
                [{"_id": rid, "name": self.test_name, "permissions": {}}],
                [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
            ]
            self.auth.get_role.return_value = {
                "_id": rid,
                "name": self.test_name,
                "permissions": {},
            }
            with self.assertRaises(
                EngineException, msg="Accepted existing role name"
            ) as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "role name '{}' exists".format(new_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_del(self):
        """Deletion is refused for built-in roles and for a role mapped to a user."""
        protected_roles = ["system_admin", "project_admin"]
        for i, role_name in enumerate(protected_roles, start=1):
            with self.subTest(i=i):
                rid = str(uuid4())
                role = {"_id": rid, "name": role_name}
                self.auth.get_role_list.return_value = [role]
                self.auth.get_role.return_value = role
                with self.assertRaises(
                    EngineException,
                    msg="Accepted deletion of role '{}'".format(role_name),
                ) as e:
                    self.topic.delete(self.fake_session, rid)
                self.assertEqual(
                    e.exception.http_code,
                    HTTPStatus.FORBIDDEN,
                    "Wrong HTTP status code",
                )
                self.assertIn(
                    "you cannot delete role '{}'".format(role_name),
                    norm(str(e.exception)),
                    "Wrong exception text",
                )
        # Numbered explicitly instead of from the loop variable left over above.
        with self.subTest(i=1 + len(protected_roles)):
            rid = str(uuid4())
            name = "other-role-name"
            role = {"_id": rid, "name": name}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.get_user_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "username": self.test_name,
                    "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used role"
            ) as e:
                self.topic.delete(self.fake_session, rid)
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "role '{}' ({}) is being used by user '{}'".format(
                    name, rid, self.test_name
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )
771
772
773 class Test_UserTopicAuth(TestCase):
774 @classmethod
775 def setUpClass(cls):
776 cls.test_name = "test-user-topic"
777
778 def setUp(self):
779 self.db = Mock(dbbase.DbBase())
780 self.fs = Mock(fsbase.FsBase())
781 self.msg = Mock(msgbase.MsgBase())
782 self.auth = Mock(authconn.Authconn(None, None, None))
783 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
784 self.fake_session = {
785 "username": test_name,
786 "project_id": (test_pid,),
787 "method": None,
788 "admin": True,
789 "force": False,
790 "public": False,
791 "allow_show_user_project_role": True,
792 }
793 self.topic.check_quota = Mock(return_value=None) # skip quota
794
795 def test_new_user(self):
796 uid1 = str(uuid4())
797 pid = str(uuid4())
798 self.auth.get_user_list.return_value = []
799 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
800 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
801 with self.subTest(i=1):
802 rollback = []
803 rid = str(uuid4())
804 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
805 prms_in = [{"project": "some_project", "role": "some_role"}]
806 prms_out = [{"project": pid, "role": rid}]
807 uid2, oid = self.topic.new(
808 rollback,
809 self.fake_session,
810 {
811 "username": self.test_name,
812 "password": self.test_name,
813 "project_role_mappings": prms_in,
814 },
815 )
816 self.assertEqual(len(rollback), 1, "Wrong rollback length")
817 self.assertEqual(uid2, uid1, "Wrong project identifier")
818 content = self.auth.create_user.call_args[0][0]
819 self.assertEqual(content["username"], self.test_name, "Wrong project name")
820 self.assertEqual(content["password"], self.test_name, "Wrong password")
821 self.assertEqual(
822 content["project_role_mappings"],
823 prms_out,
824 "Wrong project-role mappings",
825 )
826 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
827 self.assertEqual(
828 content["_admin"]["modified"],
829 content["_admin"]["created"],
830 "Wrong modification time",
831 )
832 with self.subTest(i=2):
833 rollback = []
834 def_rid = str(uuid4())
835 def_role = {"_id": def_rid, "name": "project_admin"}
836 self.auth.get_role.return_value = def_role
837 self.auth.get_role_list.return_value = [def_role]
838 prms_out = [{"project": pid, "role": def_rid}]
839 uid2, oid = self.topic.new(
840 rollback,
841 self.fake_session,
842 {
843 "username": self.test_name,
844 "password": self.test_name,
845 "projects": ["some_project"],
846 },
847 )
848 self.assertEqual(len(rollback), 1, "Wrong rollback length")
849 self.assertEqual(uid2, uid1, "Wrong project identifier")
850 content = self.auth.create_user.call_args[0][0]
851 self.assertEqual(content["username"], self.test_name, "Wrong project name")
852 self.assertEqual(content["password"], self.test_name, "Wrong password")
853 self.assertEqual(
854 content["project_role_mappings"],
855 prms_out,
856 "Wrong project-role mappings",
857 )
858 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
859 self.assertEqual(
860 content["_admin"]["modified"],
861 content["_admin"]["created"],
862 "Wrong modification time",
863 )
864 with self.subTest(i=3):
865 rollback = []
866 with self.assertRaises(
867 EngineException, msg="Accepted wrong project-role mappings"
868 ) as e:
869 self.topic.new(
870 rollback,
871 self.fake_session,
872 {
873 "username": "other-project-name",
874 "password": "other-password",
875 "project_role_mappings": [{}],
876 },
877 )
878 self.assertEqual(len(rollback), 0, "Wrong rollback length")
879 self.assertEqual(
880 e.exception.http_code,
881 HTTPStatus.UNPROCESSABLE_ENTITY,
882 "Wrong HTTP status code",
883 )
884 self.assertIn(
885 "format error at '{}' '{}'".format(
886 "project_role_mappings:{}", "'{}' is a required property"
887 ).format(0, "project"),
888 norm(str(e.exception)),
889 "Wrong exception text",
890 )
891 with self.subTest(i=4):
892 rollback = []
893 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
894 self.topic.new(
895 rollback,
896 self.fake_session,
897 {
898 "username": "other-project-name",
899 "password": "other-password",
900 "projects": [],
901 },
902 )
903 self.assertEqual(len(rollback), 0, "Wrong rollback length")
904 self.assertEqual(
905 e.exception.http_code,
906 HTTPStatus.UNPROCESSABLE_ENTITY,
907 "Wrong HTTP status code",
908 )
909 self.assertIn(
910 "format error at '{}' '{}'".format(
911 "projects", "{} is too short"
912 ).format([]),
913 norm(str(e.exception)),
914 "Wrong exception text",
915 )
916
917 def test_edit_user(self):
918 now = time()
919 uid = str(uuid4())
920 pid1 = str(uuid4())
921 rid1 = str(uuid4())
922 prms = [
923 {
924 "project": pid1,
925 "project_name": "project-1",
926 "role": rid1,
927 "role_name": "role-1",
928 }
929 ]
930 user = {
931 "_id": uid,
932 "username": self.test_name,
933 "project_role_mappings": prms,
934 "_admin": {"created": now, "modified": now},
935 }
936 with self.subTest(i=1):
937 self.auth.get_user_list.side_effect = [[user], []]
938 self.auth.get_user.return_value = user
939 pid2 = str(uuid4())
940 rid2 = str(uuid4())
941 self.auth.get_project.side_effect = [
942 {"_id": pid2, "name": "project-2"},
943 {"_id": pid1, "name": "project-1"},
944 ]
945 self.auth.get_role.side_effect = [
946 {"_id": rid2, "name": "role-2"},
947 {"_id": rid1, "name": "role-1"},
948 ]
949 new_name = "new-user-name"
950 new_pasw = "new-password"
951 add_prms = [{"project": pid2, "role": rid2}]
952 rem_prms = [{"project": pid1, "role": rid1}]
953 self.topic.edit(
954 self.fake_session,
955 uid,
956 {
957 "username": new_name,
958 "password": new_pasw,
959 "add_project_role_mappings": add_prms,
960 "remove_project_role_mappings": rem_prms,
961 },
962 )
963 content = self.auth.update_user.call_args[0][0]
964 self.assertEqual(content["_id"], uid, "Wrong user identifier")
965 self.assertEqual(content["username"], new_name, "Wrong user name")
966 self.assertEqual(content["password"], new_pasw, "Wrong user password")
967 self.assertEqual(
968 content["add_project_role_mappings"],
969 add_prms,
970 "Wrong project-role mappings to add",
971 )
972 self.assertEqual(
973 content["remove_project_role_mappings"],
974 prms,
975 "Wrong project-role mappings to remove",
976 )
977 with self.subTest(i=2):
978 new_name = "other-user-name"
979 new_prms = [{}]
980 self.auth.get_role_list.side_effect = [[user], []]
981 self.auth.get_user_list.side_effect = [[user]]
982 with self.assertRaises(
983 EngineException, msg="Accepted wrong project-role mappings"
984 ) as e:
985 self.topic.edit(
986 self.fake_session,
987 uid,
988 {"username": new_name, "project_role_mappings": new_prms},
989 )
990 self.assertEqual(
991 e.exception.http_code,
992 HTTPStatus.UNPROCESSABLE_ENTITY,
993 "Wrong HTTP status code",
994 )
995 self.assertIn(
996 "format error at '{}' '{}'".format(
997 "project_role_mappings:{}", "'{}' is a required property"
998 ).format(0, "project"),
999 norm(str(e.exception)),
1000 "Wrong exception text",
1001 )
1002
def test_delete_user(self):
    """Check that deleting a user forwards the request to the auth backend.

    The mocked auth backend returns a user record whose username is the
    literal "other-user-name" while the session username is set to
    ``self.test_name``. The test then verifies that ``delete`` returns the
    backend's result unchanged and that both ``get_user`` and
    ``delete_user`` received the user identifier as first positional
    argument.
    """
    with self.subTest(i=1):
        uid = str(uuid4())
        self.fake_session["username"] = self.test_name
        user = {
            "_id": uid,
            "username": "other-user-name",
            "project_role_mappings": [],
        }
        self.auth.get_user.return_value = user
        self.auth.delete_user.return_value = {"deleted": 1}
        rc = self.topic.delete(self.fake_session, uid)
        self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
        self.assertEqual(
            self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
        )
        self.assertEqual(
            self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
        )
1022
def test_conflict_on_new(self):
    """Check that ``new`` refuses conflicting or incomplete user data.

    Sub-tests:
      1. the username is a UUID string: EngineException with
         UNPROCESSABLE_ENTITY is expected;
      2. the auth backend already lists a user with the requested
         username: EngineException with CONFLICT is expected;
      3. the auth backend lists no users and no roles:
         AuthconnNotFoundException with NOT_FOUND is expected.
    In every case the rollback list must remain empty.
    """
    with self.subTest(i=1):
        undo_log = []
        uuid_name = str(uuid4())
        payload = {
            "username": uuid_name,
            "password": self.test_name,
            "projects": [test_pid],
        }
        with self.assertRaises(
            EngineException, msg="Accepted uuid as username"
        ) as ctx:
            self.topic.new(undo_log, self.fake_session, payload)
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(
            ctx.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "username '{}' cannot have a uuid format".format(uuid_name)
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )

    with self.subTest(i=2):
        undo_log = []
        existing_user = {"_id": str(uuid4()), "username": self.test_name}
        self.auth.get_user_list.return_value = [existing_user]
        payload = {
            "username": self.test_name,
            "password": self.test_name,
            "projects": [test_pid],
        }
        with self.assertRaises(
            EngineException, msg="Accepted existing username"
        ) as ctx:
            self.topic.new(undo_log, self.fake_session, payload)
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(
            ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        expected_text = "username '{}' is already used".format(self.test_name)
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )

    with self.subTest(i=3):
        undo_log = []
        self.auth.get_user_list.return_value = []
        # Two consecutive role queries both come back empty.
        self.auth.get_role_list.side_effect = [[], []]
        payload = {
            "username": self.test_name,
            "password": self.test_name,
            "projects": [str(uuid4())],
        }
        with self.assertRaises(
            AuthconnNotFoundException, msg="Accepted user without default role"
        ) as ctx:
            self.topic.new(undo_log, self.fake_session, payload)
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(
            ctx.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
        )
        expected_text = "can't find default role for user '{}'".format(
            self.test_name
        )
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )
1101
def test_conflict_on_edit(self):
    """Check that ``edit`` refuses conflicting user modifications.

    Sub-tests:
      1. renaming to a UUID string: EngineException with
         UNPROCESSABLE_ENTITY is expected;
      2. assigning projects while the auth backend lists no roles:
         AuthconnNotFoundException with NOT_FOUND is expected;
      3. removing the system_admin role on project admin from the
         "admin" user: EngineException with FORBIDDEN is expected;
      4. renaming to a name for which the second user query returns a
         different user: EngineException with CONFLICT is expected.
    """
    target_id = str(uuid4())

    with self.subTest(i=1):
        self.auth.get_user_list.return_value = [
            {"_id": target_id, "username": self.test_name}
        ]
        uuid_name = str(uuid4())
        changes = {"username": uuid_name}
        with self.assertRaises(
            EngineException, msg="Accepted uuid as username"
        ) as ctx:
            self.topic.edit(self.fake_session, target_id, changes)
        self.assertEqual(
            ctx.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "username '{}' cannot have an uuid format".format(
            uuid_name
        )
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )

    with self.subTest(i=2):
        self.auth.get_user_list.return_value = [
            {"_id": target_id, "username": self.test_name}
        ]
        # Two consecutive role queries both come back empty.
        self.auth.get_role_list.side_effect = [[], []]
        changes = {"projects": [str(uuid4())]}
        with self.assertRaises(
            AuthconnNotFoundException, msg="Accepted user without default role"
        ) as ctx:
            self.topic.edit(self.fake_session, target_id, changes)
        self.assertEqual(
            ctx.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
        )
        expected_text = "can't find a default role for user '{}'".format(
            self.test_name
        )
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )

    with self.subTest(i=3):
        admin_id = str(uuid4())
        self.auth.get_user_list.return_value = [
            {"_id": admin_id, "username": "admin"}
        ]
        changes = {
            "remove_project_role_mappings": [
                {"project": "admin", "role": "system_admin"}
            ]
        }
        with self.assertRaises(
            EngineException,
            msg="Accepted removing system_admin role from admin user",
        ) as ctx:
            self.topic.edit(self.fake_session, admin_id, changes)
        self.assertEqual(
            ctx.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot remove system_admin role from admin user",
            norm(str(ctx.exception)),
            "Wrong exception text",
        )

    with self.subTest(i=4):
        taken_name = "new-user-name"
        # First query answers with the edited user, second with another
        # user that already carries the requested name.
        self.auth.get_user_list.side_effect = [
            [{"_id": target_id, "name": self.test_name}],
            [{"_id": str(uuid4()), "name": taken_name}],
        ]
        changes = {"username": taken_name}
        with self.assertRaises(
            EngineException, msg="Accepted existing username"
        ) as ctx:
            self.topic.edit(self.fake_session, target_id, changes)
        self.assertEqual(
            ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        expected_text = "username '{}' is already used".format(taken_name)
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )
1184
def test_conflict_on_del(self):
    """Check that a user cannot delete the account they are logged in with.

    The session username and the username of the record returned by the
    mocked ``get_user`` are both ``self.test_name``; ``delete`` is then
    expected to raise EngineException with HTTP status CONFLICT.
    """
    with self.subTest(i=1):
        uid = str(uuid4())
        self.fake_session["username"] = self.test_name
        user = {
            "_id": uid,
            "username": self.test_name,
            "project_role_mappings": [],
        }
        self.auth.get_user.return_value = user
        with self.assertRaises(
            EngineException, msg="Accepted deletion of own user"
        ) as e:
            self.topic.delete(self.fake_session, uid)
        self.assertEqual(
            e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot delete your own login user",
            norm(str(e.exception)),
            "Wrong exception text",
        )
1207
1208
class Test_CommonVimWimSdn(TestCase):
    """Unit tests for ``CommonVimWimSdn`` with mocked DB, FS, MSG and auth.

    The topic under test is configured as "wims" and uses the WIM account
    schemas; ``_send_msg`` and ``check_quota`` are replaced by mocks.
    """

    @classmethod
    def setUpClass(cls):
        """Define the CIM name shared by all tests of this class."""
        cls.test_name = "test-cim-topic"  # CIM = Common Infrastructure Manager

    def setUp(self):
        """Create a fresh topic with mocked collaborators and a fake session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
        self.topic._send_msg = Mock()
        # Use WIM schemas for testing because they are the simplest
        self.topic.topic = "wims"
        self.topic.schema_new = validation.wim_account_new_schema
        self.topic.schema_edit = validation.wim_account_edit_schema
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_cvws(self):
        """Check the record and the initial operation stored by ``new``."""
        test_url = "http://0.0.0.0:0"
        with self.subTest(i=1):
            rollback = []
            test_type = "fake"
            self.db.get_one.return_value = None
            # db.create is called positionally as (topic, content); echo back
            # the identifier carried by the content.
            self.db.create.side_effect = lambda _topic, content: content["_id"]
            cid, oid = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            args = self.db.create.call_args[0]
            content = args[1]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
            self.assertEqual(content["wim_url"], test_url, "Wrong URL")
            self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
            self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
            self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                content["_admin"]["operationalState"],
                "PROCESSING",
                "Wrong operational state",
            )
            self.assertEqual(
                content["_admin"]["projects_read"],
                [test_pid],
                "Wrong read-only projects",
            )
            self.assertEqual(
                content["_admin"]["projects_write"],
                [test_pid],
                "Wrong read/write projects",
            )
            self.assertIsNone(
                content["_admin"]["current_operation"], "Wrong current operation"
            )
            self.assertEqual(
                len(content["_admin"]["operations"]), 1, "Wrong number of operations"
            )
            operation = content["_admin"]["operations"][0]
            self.assertEqual(
                operation["lcmOperationType"], "create", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertGreater(
                operation["startTime"],
                content["_admin"]["created"],
                "Wrong operation start time",
            )
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["created"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status info"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        # NOTE: the former sub-test that expected an unknown "wim_type" to be
        # rejected is disabled: from Feature 8030 all WIM/SDN types are admitted.

    def test_conflict_on_new(self):
        """Check that ``new`` rejects a name already present in the database."""
        with self.subTest(i=1):
            rollback = []
            test_url = "http://0.0.0.0:0"
            test_type = "fake"
            self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {
                        "name": self.test_name,
                        "wim_url": test_url,
                        "wim_type": test_type,
                    },
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(
                    self.test_name, self.topic.topic
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_edit_cvws(self):
        """Check the record written by ``edit`` and the rejection of unknown keys.

        Sub-tests:
          1. a valid edit replaces the record and appends an "edit" operation;
          2. an edit carrying an unexpected property raises EngineException
             with UNPROCESSABLE_ENTITY.
        """
        now = time()
        cid = str(uuid4())
        test_url = "http://0.0.0.0:0"
        test_type = "fake"
        cvws = {
            "_id": cid,
            "name": self.test_name,
            "wim_url": test_url,
            "wim_type": test_type,
            "_admin": {
                "created": now,
                "modified": now,
                "operations": [{"lcmOperationType": "create"}],
            },
        }
        with self.subTest(i=1):
            new_name = "new-cim-name"
            new_url = "https://1.1.1.1:1"
            new_type = "onos"
            # First lookup returns the stored record, the second returns None.
            self.db.get_one.side_effect = [cvws, None]
            self.db.replace.return_value = {"updated": 1}
            self.topic.edit(
                self.fake_session,
                cid,
                {"name": new_name, "wim_url": new_url, "wim_type": new_type},
            )
            args = self.db.replace.call_args[0]
            content = args[2]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(args[1], cid, "Wrong CIM identifier")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], new_name, "Wrong CIM name")
            self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
            self.assertEqual(content["wim_url"], new_url, "Wrong URL")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                len(content["_admin"]["operations"]), 2, "Wrong number of operations"
            )
            operation = content["_admin"]["operations"][1]
            self.assertEqual(
                operation["lcmOperationType"], "edit", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertGreater(
                operation["startTime"],
                content["_admin"]["modified"],
                "Wrong operation start time",
            )
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["modified"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status info"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        with self.subTest(i=2):
            self.db.get_one.side_effect = [cvws]
            with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
                self.topic.edit(
                    self.fake_session,
                    str(uuid4()),
                    {"name": "new-name", "extra_prop": "anything"},
                )
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "format error '{}'".format(
                    "additional properties are not allowed ('{}' was unexpected)"
                ).format("extra_prop"),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_edit(self):
        """Check that ``edit`` rejects renaming to a name used by another record."""
        with self.subTest(i=1):
            cid = str(uuid4())
            new_name = "new-cim-name"
            # First lookup returns the edited record, the second one a
            # different record that already carries the new name.
            self.db.get_one.side_effect = [
                {"_id": cid, "name": self.test_name},
                {"_id": str(uuid4()), "name": new_name},
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.edit(self.fake_session, cid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(new_name, self.topic.topic),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_delete_cvws(self):
        """Check the three outcomes of ``delete`` exercised by this test.

        Sub-tests:
          1. the record also lists other projects: the session project is
             pulled from the project lists, no operation identifier is
             returned and no message is sent;
          2. the record lists only the session project: the record is
             flagged ``_admin.to_delete``, a "delete" operation is pushed and
             a "delete" message is sent;
          3. forced deletion with no projects in the session or the record:
             the record is removed with ``del_one`` and a "deleted" message
             is sent.
        """
        cid = str(uuid4())
        ro_pid = str(uuid4())
        rw_pid = str(uuid4())
        cvws = {"_id": cid, "name": self.test_name}
        self.db.get_list.return_value = []
        with self.subTest(i=1):
            cvws["_admin"] = {
                "projects_read": [test_pid, ro_pid, rw_pid],
                "projects_write": [test_pid, rw_pid],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertIsNone(
                self.db.set_one.call_args[1]["update_dict"],
                "Wrong read-only projects update",
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["pull_list"],
                {
                    "_admin.projects_read": (test_pid,),
                    "_admin.projects_write": (test_pid,),
                },
                "Wrong read/write projects update",
            )
            self.topic._send_msg.assert_not_called()
        with self.subTest(i=2):
            now = time()
            cvws["_admin"] = {
                "projects_read": [test_pid],
                "projects_write": [test_pid],
                "operations": [],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["update_dict"],
                {"_admin.to_delete": True},
                "Wrong _admin.to_delete update",
            )
            operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
            self.assertEqual(
                operation["lcmOperationType"], "delete", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
            self.assertGreater(
                operation["startTime"], now, "Wrong operation start time"
            )
            self.assertGreater(
                operation["statusEnteredTime"], now, "Wrong operation status enter time"
            )
            self.topic._send_msg.assert_called_once_with(
                "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
            )
        with self.subTest(i=3):
            cvws["_admin"] = {
                "projects_read": [],
                "projects_write": [],
                "operations": [],
            }
            self.db.get_one.return_value = cvws
            # Forget the calls recorded by the previous sub-tests.
            self.topic._send_msg.reset_mock()
            self.db.get_one.reset_mock()
            self.db.del_one.reset_mock()
            self.fake_session["force"] = True  # to force deletion
            self.fake_session["admin"] = True  # to force deletion
            self.fake_session["project_id"] = []  # to force deletion
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.topic._send_msg.assert_called_once_with(
                "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
            )
1579
1580
# Run the test cases of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()