Coverage for osm_nbi/tests/test_admin_topics.py: 99%
747 statements
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-27 02:46 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-27 02:46 +0000
1#! /usr/bin/python3
2# -*- coding: utf-8 -*-
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18__date__ = "$2019-10-019"
20import unittest
21import random
22from unittest import TestCase
23from unittest.mock import Mock, patch, call
24from uuid import uuid4
25from http import HTTPStatus
26from time import time
27from osm_common import dbbase, fsbase, msgbase
28from osm_common.dbmemory import DbMemory
29from osm_nbi import authconn, validation
30from osm_nbi.admin_topics import (
31 ProjectTopicAuth,
32 RoleTopicAuth,
33 UserTopicAuth,
34 CommonVimWimSdn,
35 VcaTopic,
36)
37from osm_nbi.engine import EngineException
38from osm_nbi.authconn import AuthconnNotFoundException
39from osm_nbi.authconn_internal import AuthconnInternal
42test_pid = str(uuid4())
43test_name = "test-user"
46def norm(str):
47 """Normalize string for checking"""
48 return " ".join(str.strip().split()).lower()
51class TestVcaTopic(TestCase):
52 def setUp(self):
53 self.db = Mock(dbbase.DbBase())
54 self.fs = Mock(fsbase.FsBase())
55 self.msg = Mock(msgbase.MsgBase())
56 self.auth = Mock(authconn.Authconn(None, None, None))
57 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
59 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
60 def test_format_on_new(self, mock_super_format_on_new):
61 content = {
62 "_id": "id",
63 "secret": "encrypted_secret",
64 "cacert": "encrypted_cacert",
65 }
66 self.db.encrypt.side_effect = ["secret", "cacert"]
67 mock_super_format_on_new.return_value = "1234"
69 oid = self.vca_topic.format_on_new(content)
71 self.assertEqual(oid, "1234")
72 self.assertEqual(content["secret"], "secret")
73 self.assertEqual(content["cacert"], "cacert")
74 self.db.encrypt.assert_has_calls(
75 [
76 call("encrypted_secret", schema_version="1.11", salt="id"),
77 call("encrypted_cacert", schema_version="1.11", salt="id"),
78 ]
79 )
80 mock_super_format_on_new.assert_called_with(content, None, False)
82 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
83 def test_format_on_edit(self, mock_super_format_on_edit):
84 edit_content = {
85 "_id": "id",
86 "secret": "encrypted_secret",
87 "cacert": "encrypted_cacert",
88 }
89 final_content = {
90 "_id": "id",
91 "schema_version": "1.11",
92 }
93 self.db.encrypt.side_effect = ["secret", "cacert"]
94 mock_super_format_on_edit.return_value = "1234"
96 oid = self.vca_topic.format_on_edit(final_content, edit_content)
98 self.assertEqual(oid, "1234")
99 self.assertEqual(final_content["secret"], "secret")
100 self.assertEqual(final_content["cacert"], "cacert")
101 self.db.encrypt.assert_has_calls(
102 [
103 call("encrypted_secret", schema_version="1.11", salt="id"),
104 call("encrypted_cacert", schema_version="1.11", salt="id"),
105 ]
106 )
107 mock_super_format_on_edit.assert_called()
109 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
110 def test_check_conflict_on_del(self, mock_check_conflict_on_del):
111 session = {
112 "project_id": "project-id",
113 "force": False,
114 }
115 _id = "vca-id"
116 db_content = {}
118 self.db.get_list.return_value = None
120 self.vca_topic.check_conflict_on_del(session, _id, db_content)
122 self.db.get_list.assert_called_with(
123 "vim_accounts",
124 {"vca": _id, "_admin.projects_read.cont": "project-id"},
125 )
126 mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
128 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
129 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
130 session = {
131 "project_id": "project-id",
132 "force": True,
133 }
134 _id = "vca-id"
135 db_content = {}
137 self.vca_topic.check_conflict_on_del(session, _id, db_content)
139 self.db.get_list.assert_not_called()
140 mock_check_conflict_on_del.assert_not_called()
142 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
143 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
144 session = {
145 "project_id": "project-id",
146 "force": False,
147 }
148 _id = "vca-id"
149 db_content = {}
151 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
153 with self.assertRaises(EngineException) as context:
154 self.vca_topic.check_conflict_on_del(session, _id, db_content)
155 self.assertEqual(
156 context.exception,
157 EngineException(
158 "There is at least one VIM account using this vca",
159 http_code=HTTPStatus.CONFLICT,
160 ),
161 )
163 self.db.get_list.assert_called_with(
164 "vim_accounts",
165 {"vca": _id, "_admin.projects_read.cont": "project-id"},
166 )
167 mock_check_conflict_on_del.assert_not_called()
170class Test_ProjectTopicAuth(TestCase):
171 @classmethod
172 def setUpClass(cls):
173 cls.test_name = "test-project-topic"
175 def setUp(self):
176 self.db = Mock(dbbase.DbBase())
177 self.fs = Mock(fsbase.FsBase())
178 self.msg = Mock(msgbase.MsgBase())
179 self.auth = Mock(authconn.Authconn(None, None, None))
180 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
181 self.fake_session = {
182 "username": self.test_name,
183 "project_id": (test_pid,),
184 "method": None,
185 "admin": True,
186 "force": False,
187 "public": False,
188 "allow_show_user_project_role": True,
189 }
190 self.topic.check_quota = Mock(return_value=None) # skip quota
192 def test_new_project(self):
193 with self.subTest(i=1):
194 rollback = []
195 pid1 = str(uuid4())
196 self.auth.get_project_list.return_value = []
197 self.auth.create_project.return_value = pid1
198 pid2, oid = self.topic.new(
199 rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
200 )
201 self.assertEqual(len(rollback), 1, "Wrong rollback length")
202 self.assertEqual(pid2, pid1, "Wrong project identifier")
203 content = self.auth.create_project.call_args[0][0]
204 self.assertEqual(content["name"], self.test_name, "Wrong project name")
205 self.assertEqual(content["quotas"], {}, "Wrong quotas")
206 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
207 self.assertEqual(
208 content["_admin"]["modified"],
209 content["_admin"]["created"],
210 "Wrong modification time",
211 )
212 with self.subTest(i=2):
213 rollback = []
214 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
215 self.topic.new(
216 rollback,
217 self.fake_session,
218 {"name": "other-project-name", "quotas": {"baditems": 10}},
219 )
220 self.assertEqual(len(rollback), 0, "Wrong rollback length")
221 self.assertEqual(
222 e.exception.http_code,
223 HTTPStatus.UNPROCESSABLE_ENTITY,
224 "Wrong HTTP status code",
225 )
226 self.assertIn(
227 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
228 "baditems"
229 ),
230 norm(str(e.exception)),
231 "Wrong exception text",
232 )
234 def test_edit_project(self):
235 now = time()
236 pid = str(uuid4())
237 proj = {
238 "_id": pid,
239 "name": self.test_name,
240 "_admin": {"created": now, "modified": now},
241 }
242 with self.subTest(i=1):
243 self.auth.get_project_list.side_effect = [[proj], []]
244 new_name = "new-project-name"
245 quotas = {
246 "vnfds": random.SystemRandom().randint(0, 100),
247 "nsds": random.SystemRandom().randint(0, 100),
248 }
249 self.topic.edit(
250 self.fake_session, pid, {"name": new_name, "quotas": quotas}
251 )
252 _id, content = self.auth.update_project.call_args[0]
253 self.assertEqual(_id, pid, "Wrong project identifier")
254 self.assertEqual(content["_id"], pid, "Wrong project identifier")
255 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
256 self.assertGreater(
257 content["_admin"]["modified"], now, "Wrong modification time"
258 )
259 self.assertEqual(content["name"], new_name, "Wrong project name")
260 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
261 with self.subTest(i=2):
262 new_name = "other-project-name"
263 quotas = {"baditems": random.SystemRandom().randint(0, 100)}
264 self.auth.get_project_list.side_effect = [[proj], []]
265 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
266 self.topic.edit(
267 self.fake_session, pid, {"name": new_name, "quotas": quotas}
268 )
269 self.assertEqual(
270 e.exception.http_code,
271 HTTPStatus.UNPROCESSABLE_ENTITY,
272 "Wrong HTTP status code",
273 )
274 self.assertIn(
275 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
276 "baditems"
277 ),
278 norm(str(e.exception)),
279 "Wrong exception text",
280 )
282 def test_conflict_on_new(self):
283 with self.subTest(i=1):
284 rollback = []
285 pid = str(uuid4())
286 with self.assertRaises(
287 EngineException, msg="Accepted uuid as project name"
288 ) as e:
289 self.topic.new(rollback, self.fake_session, {"name": pid})
290 self.assertEqual(len(rollback), 0, "Wrong rollback length")
291 self.assertEqual(
292 e.exception.http_code,
293 HTTPStatus.UNPROCESSABLE_ENTITY,
294 "Wrong HTTP status code",
295 )
296 self.assertIn(
297 "project name '{}' cannot have an uuid format".format(pid),
298 norm(str(e.exception)),
299 "Wrong exception text",
300 )
301 with self.subTest(i=2):
302 rollback = []
303 self.auth.get_project_list.return_value = [
304 {"_id": test_pid, "name": self.test_name}
305 ]
306 with self.assertRaises(
307 EngineException, msg="Accepted existing project name"
308 ) as e:
309 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
310 self.assertEqual(len(rollback), 0, "Wrong rollback length")
311 self.assertEqual(
312 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
313 )
314 self.assertIn(
315 "project '{}' exists".format(self.test_name),
316 norm(str(e.exception)),
317 "Wrong exception text",
318 )
320 def test_conflict_on_edit(self):
321 with self.subTest(i=1):
322 self.auth.get_project_list.return_value = [
323 {"_id": test_pid, "name": self.test_name}
324 ]
325 new_name = str(uuid4())
326 with self.assertRaises(
327 EngineException, msg="Accepted uuid as project name"
328 ) as e:
329 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
330 self.assertEqual(
331 e.exception.http_code,
332 HTTPStatus.UNPROCESSABLE_ENTITY,
333 "Wrong HTTP status code",
334 )
335 self.assertIn(
336 "project name '{}' cannot have an uuid format".format(new_name),
337 norm(str(e.exception)),
338 "Wrong exception text",
339 )
340 with self.subTest(i=2):
341 pid = str(uuid4())
342 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
343 with self.assertRaises(
344 EngineException, msg="Accepted renaming of project 'admin'"
345 ) as e:
346 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
347 self.assertEqual(
348 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
349 )
350 self.assertIn(
351 "you cannot rename project 'admin'",
352 norm(str(e.exception)),
353 "Wrong exception text",
354 )
355 with self.subTest(i=3):
356 new_name = "new-project-name"
357 self.auth.get_project_list.side_effect = [
358 [{"_id": test_pid, "name": self.test_name}],
359 [{"_id": str(uuid4()), "name": new_name}],
360 ]
361 with self.assertRaises(
362 EngineException, msg="Accepted existing project name"
363 ) as e:
364 self.topic.edit(self.fake_session, pid, {"name": new_name})
365 self.assertEqual(
366 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
367 )
368 self.assertIn(
369 "project '{}' is already used".format(new_name),
370 norm(str(e.exception)),
371 "Wrong exception text",
372 )
374 def test_delete_project(self):
375 with self.subTest(i=1):
376 pid = str(uuid4())
377 self.auth.get_project.return_value = {
378 "_id": pid,
379 "name": "other-project-name",
380 }
381 self.auth.delete_project.return_value = {"deleted": 1}
382 self.auth.get_user_list.return_value = []
383 self.db.get_list.return_value = []
384 rc = self.topic.delete(self.fake_session, pid)
385 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
386 self.assertEqual(
387 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
388 )
389 self.assertEqual(
390 self.auth.delete_project.call_args[0][0],
391 pid,
392 "Wrong project identifier",
393 )
395 def test_conflict_on_del(self):
396 with self.subTest(i=1):
397 self.auth.get_project.return_value = {
398 "_id": test_pid,
399 "name": self.test_name,
400 }
401 with self.assertRaises(
402 EngineException, msg="Accepted deletion of own project"
403 ) as e:
404 self.topic.delete(self.fake_session, self.test_name)
405 self.assertEqual(
406 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
407 )
408 self.assertIn(
409 "you cannot delete your own project",
410 norm(str(e.exception)),
411 "Wrong exception text",
412 )
413 with self.subTest(i=2):
414 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
415 with self.assertRaises(
416 EngineException, msg="Accepted deletion of project 'admin'"
417 ) as e:
418 self.topic.delete(self.fake_session, "admin")
419 self.assertEqual(
420 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
421 )
422 self.assertIn(
423 "you cannot delete project 'admin'",
424 norm(str(e.exception)),
425 "Wrong exception text",
426 )
427 with self.subTest(i=3):
428 pid = str(uuid4())
429 name = "other-project-name"
430 self.auth.get_project.return_value = {"_id": pid, "name": name}
431 self.auth.get_user_list.return_value = [
432 {
433 "_id": str(uuid4()),
434 "username": self.test_name,
435 "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
436 }
437 ]
438 with self.assertRaises(
439 EngineException, msg="Accepted deletion of used project"
440 ) as e:
441 self.topic.delete(self.fake_session, pid)
442 self.assertEqual(
443 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
444 )
445 self.assertIn(
446 "project '{}' ({}) is being used by user '{}'".format(
447 name, pid, self.test_name
448 ),
449 norm(str(e.exception)),
450 "Wrong exception text",
451 )
452 with self.subTest(i=4):
453 self.auth.get_user_list.return_value = []
454 self.db.get_list.return_value = [
455 {
456 "_id": str(uuid4()),
457 "id": self.test_name,
458 "_admin": {"projects_read": [pid], "projects_write": []},
459 }
460 ]
461 with self.assertRaises(
462 EngineException, msg="Accepted deletion of used project"
463 ) as e:
464 self.topic.delete(self.fake_session, pid)
465 self.assertEqual(
466 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
467 )
468 self.assertIn(
469 "project '{}' ({}) is being used by {} '{}'".format(
470 name, pid, "vnf descriptor", self.test_name
471 ),
472 norm(str(e.exception)),
473 "Wrong exception text",
474 )
477class Test_RoleTopicAuth(TestCase):
478 @classmethod
479 def setUpClass(cls):
480 cls.test_name = "test-role-topic"
481 cls.test_operations = ["tokens:get"]
483 def setUp(self):
484 self.db = Mock(dbbase.DbBase())
485 self.fs = Mock(fsbase.FsBase())
486 self.msg = Mock(msgbase.MsgBase())
487 self.auth = Mock(authconn.Authconn(None, None, None))
488 self.auth.role_permissions = self.test_operations
489 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
490 self.fake_session = {
491 "username": test_name,
492 "project_id": (test_pid,),
493 "method": None,
494 "admin": True,
495 "force": False,
496 "public": False,
497 "allow_show_user_project_role": True,
498 }
499 self.topic.check_quota = Mock(return_value=None) # skip quota
501 def test_new_role(self):
502 with self.subTest(i=1):
503 rollback = []
504 rid1 = str(uuid4())
505 perms_in = {"tokens": True}
506 perms_out = {"default": False, "admin": False, "tokens": True}
507 self.auth.get_role_list.return_value = []
508 self.auth.create_role.return_value = rid1
509 rid2, oid = self.topic.new(
510 rollback,
511 self.fake_session,
512 {"name": self.test_name, "permissions": perms_in},
513 )
514 self.assertEqual(len(rollback), 1, "Wrong rollback length")
515 self.assertEqual(rid2, rid1, "Wrong project identifier")
516 content = self.auth.create_role.call_args[0][0]
517 self.assertEqual(content["name"], self.test_name, "Wrong role name")
518 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
519 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
520 self.assertEqual(
521 content["_admin"]["modified"],
522 content["_admin"]["created"],
523 "Wrong modification time",
524 )
525 with self.subTest(i=2):
526 rollback = []
527 with self.assertRaises(
528 EngineException, msg="Accepted wrong permissions"
529 ) as e:
530 self.topic.new(
531 rollback,
532 self.fake_session,
533 {"name": "other-role-name", "permissions": {"projects": True}},
534 )
535 self.assertEqual(len(rollback), 0, "Wrong rollback length")
536 self.assertEqual(
537 e.exception.http_code,
538 HTTPStatus.UNPROCESSABLE_ENTITY,
539 "Wrong HTTP status code",
540 )
541 self.assertIn(
542 "invalid permission '{}'".format("projects"),
543 norm(str(e.exception)),
544 "Wrong exception text",
545 )
547 def test_edit_role(self):
548 now = time()
549 rid = str(uuid4())
550 role = {
551 "_id": rid,
552 "name": self.test_name,
553 "permissions": {"tokens": True},
554 "_admin": {"created": now, "modified": now},
555 }
556 with self.subTest(i=1):
557 self.auth.get_role_list.side_effect = [[role], []]
558 self.auth.get_role.return_value = role
559 new_name = "new-role-name"
560 perms_in = {"tokens": False, "tokens:get": True}
561 perms_out = {
562 "default": False,
563 "admin": False,
564 "tokens": False,
565 "tokens:get": True,
566 }
567 self.topic.edit(
568 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
569 )
570 content = self.auth.update_role.call_args[0][0]
571 self.assertEqual(content["_id"], rid, "Wrong role identifier")
572 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
573 self.assertGreater(
574 content["_admin"]["modified"], now, "Wrong modification time"
575 )
576 self.assertEqual(content["name"], new_name, "Wrong role name")
577 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
578 with self.subTest(i=2):
579 new_name = "other-role-name"
580 perms_in = {"tokens": False, "tokens:post": True}
581 self.auth.get_role_list.side_effect = [[role], []]
582 with self.assertRaises(
583 EngineException, msg="Accepted wrong permissions"
584 ) as e:
585 self.topic.edit(
586 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
587 )
588 self.assertEqual(
589 e.exception.http_code,
590 HTTPStatus.UNPROCESSABLE_ENTITY,
591 "Wrong HTTP status code",
592 )
593 self.assertIn(
594 "invalid permission '{}'".format("tokens:post"),
595 norm(str(e.exception)),
596 "Wrong exception text",
597 )
599 def test_delete_role(self):
600 with self.subTest(i=1):
601 rid = str(uuid4())
602 role = {"_id": rid, "name": "other-role-name"}
603 self.auth.get_role_list.return_value = [role]
604 self.auth.get_role.return_value = role
605 self.auth.delete_role.return_value = {"deleted": 1}
606 self.auth.get_user_list.return_value = []
607 rc = self.topic.delete(self.fake_session, rid)
608 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
609 self.assertEqual(
610 self.auth.get_role_list.call_args[0][0]["_id"],
611 rid,
612 "Wrong role identifier",
613 )
614 self.assertEqual(
615 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
616 )
617 self.assertEqual(
618 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
619 )
621 def test_conflict_on_new(self):
622 with self.subTest(i=1):
623 rollback = []
624 rid = str(uuid4())
625 with self.assertRaises(
626 EngineException, msg="Accepted uuid as role name"
627 ) as e:
628 self.topic.new(rollback, self.fake_session, {"name": rid})
629 self.assertEqual(len(rollback), 0, "Wrong rollback length")
630 self.assertEqual(
631 e.exception.http_code,
632 HTTPStatus.UNPROCESSABLE_ENTITY,
633 "Wrong HTTP status code",
634 )
635 self.assertIn(
636 "role name '{}' cannot have an uuid format".format(rid),
637 norm(str(e.exception)),
638 "Wrong exception text",
639 )
640 with self.subTest(i=2):
641 rollback = []
642 self.auth.get_role_list.return_value = [
643 {"_id": str(uuid4()), "name": self.test_name}
644 ]
645 with self.assertRaises(
646 EngineException, msg="Accepted existing role name"
647 ) as e:
648 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
649 self.assertEqual(len(rollback), 0, "Wrong rollback length")
650 self.assertEqual(
651 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
652 )
653 self.assertIn(
654 "role name '{}' exists".format(self.test_name),
655 norm(str(e.exception)),
656 "Wrong exception text",
657 )
659 def test_conflict_on_edit(self):
660 rid = str(uuid4())
661 with self.subTest(i=1):
662 self.auth.get_role_list.return_value = [
663 {"_id": rid, "name": self.test_name, "permissions": {}}
664 ]
665 new_name = str(uuid4())
666 with self.assertRaises(
667 EngineException, msg="Accepted uuid as role name"
668 ) as e:
669 self.topic.edit(self.fake_session, rid, {"name": new_name})
670 self.assertEqual(
671 e.exception.http_code,
672 HTTPStatus.UNPROCESSABLE_ENTITY,
673 "Wrong HTTP status code",
674 )
675 self.assertIn(
676 "role name '{}' cannot have an uuid format".format(new_name),
677 norm(str(e.exception)),
678 "Wrong exception text",
679 )
680 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
681 with self.subTest(i=i):
682 rid = str(uuid4())
683 self.auth.get_role.return_value = {
684 "_id": rid,
685 "name": role_name,
686 "permissions": {},
687 }
688 with self.assertRaises(
689 EngineException,
690 msg="Accepted renaming of role '{}'".format(role_name),
691 ) as e:
692 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
693 self.assertEqual(
694 e.exception.http_code,
695 HTTPStatus.FORBIDDEN,
696 "Wrong HTTP status code",
697 )
698 self.assertIn(
699 "you cannot rename role '{}'".format(role_name),
700 norm(str(e.exception)),
701 "Wrong exception text",
702 )
703 with self.subTest(i=i + 1):
704 new_name = "new-role-name"
705 self.auth.get_role_list.side_effect = [
706 [{"_id": rid, "name": self.test_name, "permissions": {}}],
707 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
708 ]
709 self.auth.get_role.return_value = {
710 "_id": rid,
711 "name": self.test_name,
712 "permissions": {},
713 }
714 with self.assertRaises(
715 EngineException, msg="Accepted existing role name"
716 ) as e:
717 self.topic.edit(self.fake_session, rid, {"name": new_name})
718 self.assertEqual(
719 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
720 )
721 self.assertIn(
722 "role name '{}' exists".format(new_name),
723 norm(str(e.exception)),
724 "Wrong exception text",
725 )
727 def test_conflict_on_del(self):
728 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
729 with self.subTest(i=i):
730 rid = str(uuid4())
731 role = {"_id": rid, "name": role_name}
732 self.auth.get_role_list.return_value = [role]
733 self.auth.get_role.return_value = role
734 with self.assertRaises(
735 EngineException,
736 msg="Accepted deletion of role '{}'".format(role_name),
737 ) as e:
738 self.topic.delete(self.fake_session, rid)
739 self.assertEqual(
740 e.exception.http_code,
741 HTTPStatus.FORBIDDEN,
742 "Wrong HTTP status code",
743 )
744 self.assertIn(
745 "you cannot delete role '{}'".format(role_name),
746 norm(str(e.exception)),
747 "Wrong exception text",
748 )
749 with self.subTest(i=i + 1):
750 rid = str(uuid4())
751 name = "other-role-name"
752 role = {"_id": rid, "name": name}
753 self.auth.get_role_list.return_value = [role]
754 self.auth.get_role.return_value = role
755 self.auth.get_user_list.return_value = [
756 {
757 "_id": str(uuid4()),
758 "username": self.test_name,
759 "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
760 }
761 ]
762 with self.assertRaises(
763 EngineException, msg="Accepted deletion of used role"
764 ) as e:
765 self.topic.delete(self.fake_session, rid)
766 self.assertEqual(
767 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
768 )
769 self.assertIn(
770 "role '{}' ({}) is being used by user '{}'".format(
771 name, rid, self.test_name
772 ),
773 norm(str(e.exception)),
774 "Wrong exception text",
775 )
778class Test_UserTopicAuth(TestCase):
779 @classmethod
780 def setUpClass(cls):
781 cls.test_name = "test-user-topic"
782 cls.password = "Test@123"
784 def setUp(self):
785 # self.db = Mock(dbbase.DbBase())
786 self.db = DbMemory()
787 self.fs = Mock(fsbase.FsBase())
788 self.msg = Mock(msgbase.MsgBase())
789 self.auth = Mock(authconn.Authconn(None, None, None))
790 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
791 self.fake_session = {
792 "username": test_name,
793 "project_id": (test_pid,),
794 "method": None,
795 "admin": True,
796 "force": False,
797 "public": False,
798 "allow_show_user_project_role": True,
799 }
800 self.topic.check_quota = Mock(return_value=None) # skip quota
802 def test_new_user(self):
803 uid1 = str(uuid4())
804 pid = str(uuid4())
805 self.auth.get_user_list.return_value = []
806 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
807 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
808 with self.subTest(i=1):
809 rollback = []
810 rid = str(uuid4())
811 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
812 prms_in = [{"project": "some_project", "role": "some_role"}]
813 prms_out = [{"project": pid, "role": rid}]
814 uid2, oid = self.topic.new(
815 rollback,
816 self.fake_session,
817 {
818 "username": self.test_name,
819 "password": self.password,
820 "project_role_mappings": prms_in,
821 },
822 )
823 self.assertEqual(len(rollback), 1, "Wrong rollback length")
824 self.assertEqual(uid2, uid1, "Wrong project identifier")
825 content = self.auth.create_user.call_args[0][0]
826 self.assertEqual(content["username"], self.test_name, "Wrong project name")
827 self.assertEqual(content["password"], self.password, "Wrong password")
828 self.assertEqual(
829 content["project_role_mappings"],
830 prms_out,
831 "Wrong project-role mappings",
832 )
833 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
834 self.assertEqual(
835 content["_admin"]["modified"],
836 content["_admin"]["created"],
837 "Wrong modification time",
838 )
839 with self.subTest(i=2):
840 rollback = []
841 def_rid = str(uuid4())
842 def_role = {"_id": def_rid, "name": "project_admin"}
843 self.auth.get_role.return_value = def_role
844 self.auth.get_role_list.return_value = [def_role]
845 prms_out = [{"project": pid, "role": def_rid}]
846 uid2, oid = self.topic.new(
847 rollback,
848 self.fake_session,
849 {
850 "username": self.test_name,
851 "password": self.password,
852 "projects": ["some_project"],
853 },
854 )
855 self.assertEqual(len(rollback), 1, "Wrong rollback length")
856 self.assertEqual(uid2, uid1, "Wrong project identifier")
857 content = self.auth.create_user.call_args[0][0]
858 self.assertEqual(content["username"], self.test_name, "Wrong project name")
859 self.assertEqual(content["password"], self.password, "Wrong password")
860 self.assertEqual(
861 content["project_role_mappings"],
862 prms_out,
863 "Wrong project-role mappings",
864 )
865 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
866 self.assertEqual(
867 content["_admin"]["modified"],
868 content["_admin"]["created"],
869 "Wrong modification time",
870 )
871 with self.subTest(i=3):
872 rollback = []
873 with self.assertRaises(
874 EngineException, msg="Accepted wrong project-role mappings"
875 ) as e:
876 self.topic.new(
877 rollback,
878 self.fake_session,
879 {
880 "username": "other-project-name",
881 "password": "Other@pwd1",
882 "project_role_mappings": [{}],
883 },
884 )
885 self.assertEqual(len(rollback), 0, "Wrong rollback length")
886 self.assertEqual(
887 e.exception.http_code,
888 HTTPStatus.UNPROCESSABLE_ENTITY,
889 "Wrong HTTP status code",
890 )
891 self.assertIn(
892 "format error at '{}' '{}'".format(
893 "project_role_mappings:{}", "'{}' is a required property"
894 ).format(0, "project"),
895 norm(str(e.exception)),
896 "Wrong exception text",
897 )
898 with self.subTest(i=4):
899 rollback = []
900 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
901 self.topic.new(
902 rollback,
903 self.fake_session,
904 {
905 "username": "other-project-name",
906 "password": "Other@pwd1",
907 "projects": [],
908 },
909 )
910 self.assertEqual(len(rollback), 0, "Wrong rollback length")
911 self.assertEqual(
912 e.exception.http_code,
913 HTTPStatus.UNPROCESSABLE_ENTITY,
914 "Wrong HTTP status code",
915 )
916 self.assertIn(
917 "format error at '{}' '{}'".format(
918 "projects", "{} is too short"
919 ).format([]),
920 norm(str(e.exception)),
921 "Wrong exception text",
922 )
924 def test_edit_user(self):
925 now = time()
926 uid = str(uuid4())
927 pid1 = str(uuid4())
928 rid1 = str(uuid4())
929 prms = [
930 {
931 "project": pid1,
932 "project_name": "project-1",
933 "role": rid1,
934 "role_name": "role-1",
935 }
936 ]
937 user = {
938 "_id": uid,
939 "username": self.test_name,
940 "project_role_mappings": prms,
941 "_admin": {"created": now, "modified": now},
942 }
943 with self.subTest(i=1):
944 self.auth.get_user_list.side_effect = [[user], []]
945 self.auth.get_user.return_value = user
946 pid2 = str(uuid4())
947 rid2 = str(uuid4())
948 self.auth.get_project.side_effect = [
949 {"_id": pid2, "name": "project-2"},
950 {"_id": pid1, "name": "project-1"},
951 ]
952 self.auth.get_role.side_effect = [
953 {"_id": rid2, "name": "role-2"},
954 {"_id": rid1, "name": "role-1"},
955 ]
956 new_name = "new-user-name"
957 new_pasw = "New@pwd1"
958 add_prms = [{"project": pid2, "role": rid2}]
959 rem_prms = [{"project": pid1, "role": rid1}]
960 self.topic.edit(
961 self.fake_session,
962 uid,
963 {
964 "username": new_name,
965 "password": new_pasw,
966 "add_project_role_mappings": add_prms,
967 "remove_project_role_mappings": rem_prms,
968 },
969 )
970 content = self.auth.update_user.call_args[0][0]
971 self.assertEqual(content["_id"], uid, "Wrong user identifier")
972 self.assertEqual(content["username"], new_name, "Wrong user name")
973 self.assertEqual(content["password"], new_pasw, "Wrong user password")
974 self.assertEqual(
975 content["add_project_role_mappings"],
976 add_prms,
977 "Wrong project-role mappings to add",
978 )
979 self.assertEqual(
980 content["remove_project_role_mappings"],
981 prms,
982 "Wrong project-role mappings to remove",
983 )
984 with self.subTest(i=2):
985 new_name = "other-user-name"
986 new_prms = [{}]
987 self.auth.get_role_list.side_effect = [[user], []]
988 self.auth.get_user_list.side_effect = [[user]]
989 with self.assertRaises(
990 EngineException, msg="Accepted wrong project-role mappings"
991 ) as e:
992 self.topic.edit(
993 self.fake_session,
994 uid,
995 {"username": new_name, "project_role_mappings": new_prms},
996 )
997 self.assertEqual(
998 e.exception.http_code,
999 HTTPStatus.UNPROCESSABLE_ENTITY,
1000 "Wrong HTTP status code",
1001 )
1002 self.assertIn(
1003 "format error at '{}' '{}'".format(
1004 "project_role_mappings:{}", "'{}' is a required property"
1005 ).format(0, "project"),
1006 norm(str(e.exception)),
1007 "Wrong exception text",
1008 )
1009 with self.subTest(i=3):
1010 self.auth.get_user_list.side_effect = [[user], []]
1011 self.auth.get_user.return_value = user
1012 old_password = self.password
1013 new_pasw = "New@pwd1"
1014 self.topic.edit(
1015 self.fake_session,
1016 uid,
1017 {
1018 "old_password": old_password,
1019 "password": new_pasw,
1020 },
1021 )
1022 content = self.auth.update_user.call_args[0][0]
1023 self.assertEqual(
1024 content["old_password"], old_password, "Wrong old password"
1025 )
1026 self.assertEqual(content["password"], new_pasw, "Wrong user password")
1028 def test_delete_user(self):
1029 with self.subTest(i=1):
1030 uid = str(uuid4())
1031 self.fake_session["username"] = self.test_name
1032 user = user = {
1033 "_id": uid,
1034 "username": "other-user-name",
1035 "project_role_mappings": [],
1036 }
1037 self.auth.get_user.return_value = user
1038 self.auth.delete_user.return_value = {"deleted": 1}
1039 rc = self.topic.delete(self.fake_session, uid)
1040 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
1041 self.assertEqual(
1042 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
1043 )
1044 self.assertEqual(
1045 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
1046 )
1048 def test_conflict_on_new(self):
1049 with self.subTest(i=1):
1050 rollback = []
1051 uid = str(uuid4())
1052 with self.assertRaises(
1053 EngineException, msg="Accepted uuid as username"
1054 ) as e:
1055 self.topic.new(
1056 rollback,
1057 self.fake_session,
1058 {
1059 "username": uid,
1060 "password": self.password,
1061 "projects": [test_pid],
1062 },
1063 )
1064 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1065 self.assertEqual(
1066 e.exception.http_code,
1067 HTTPStatus.UNPROCESSABLE_ENTITY,
1068 "Wrong HTTP status code",
1069 )
1070 self.assertIn(
1071 "username '{}' cannot have a uuid format".format(uid),
1072 norm(str(e.exception)),
1073 "Wrong exception text",
1074 )
1075 with self.subTest(i=2):
1076 rollback = []
1077 self.auth.get_user_list.return_value = [
1078 {"_id": str(uuid4()), "username": self.test_name}
1079 ]
1080 with self.assertRaises(
1081 EngineException, msg="Accepted existing username"
1082 ) as e:
1083 self.topic.new(
1084 rollback,
1085 self.fake_session,
1086 {
1087 "username": self.test_name,
1088 "password": self.password,
1089 "projects": [test_pid],
1090 },
1091 )
1092 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1093 self.assertEqual(
1094 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1095 )
1096 self.assertIn(
1097 "username '{}' is already used".format(self.test_name),
1098 norm(str(e.exception)),
1099 "Wrong exception text",
1100 )
1101 with self.subTest(i=3):
1102 rollback = []
1103 self.auth.get_user_list.return_value = []
1104 self.auth.get_role_list.side_effect = [[], []]
1105 with self.assertRaises(
1106 AuthconnNotFoundException, msg="Accepted user without default role"
1107 ) as e:
1108 self.topic.new(
1109 rollback,
1110 self.fake_session,
1111 {
1112 "username": self.test_name,
1113 "password": self.password,
1114 "projects": [str(uuid4())],
1115 },
1116 )
1117 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1118 self.assertEqual(
1119 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1120 )
1121 self.assertIn(
1122 "can't find default role for user '{}'".format(self.test_name),
1123 norm(str(e.exception)),
1124 "Wrong exception text",
1125 )
1127 def test_conflict_on_edit(self):
1128 uid = str(uuid4())
1129 with self.subTest(i=1):
1130 self.auth.get_user_list.return_value = [
1131 {"_id": uid, "username": self.test_name}
1132 ]
1133 new_name = str(uuid4())
1134 with self.assertRaises(
1135 EngineException, msg="Accepted uuid as username"
1136 ) as e:
1137 self.topic.edit(self.fake_session, uid, {"username": new_name})
1138 self.assertEqual(
1139 e.exception.http_code,
1140 HTTPStatus.UNPROCESSABLE_ENTITY,
1141 "Wrong HTTP status code",
1142 )
1143 self.assertIn(
1144 "username '{}' cannot have an uuid format".format(new_name),
1145 norm(str(e.exception)),
1146 "Wrong exception text",
1147 )
1148 with self.subTest(i=2):
1149 self.auth.get_user_list.return_value = [
1150 {"_id": uid, "username": self.test_name}
1151 ]
1152 self.auth.get_role_list.side_effect = [[], []]
1153 with self.assertRaises(
1154 AuthconnNotFoundException, msg="Accepted user without default role"
1155 ) as e:
1156 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
1157 self.assertEqual(
1158 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1159 )
1160 self.assertIn(
1161 "can't find a default role for user '{}'".format(self.test_name),
1162 norm(str(e.exception)),
1163 "Wrong exception text",
1164 )
1165 with self.subTest(i=3):
1166 admin_uid = str(uuid4())
1167 self.auth.get_user_list.return_value = [
1168 {"_id": admin_uid, "username": "admin"}
1169 ]
1170 with self.assertRaises(
1171 EngineException,
1172 msg="Accepted removing system_admin role from admin user",
1173 ) as e:
1174 self.topic.edit(
1175 self.fake_session,
1176 admin_uid,
1177 {
1178 "remove_project_role_mappings": [
1179 {"project": "admin", "role": "system_admin"}
1180 ]
1181 },
1182 )
1183 self.assertEqual(
1184 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
1185 )
1186 self.assertIn(
1187 "you cannot remove system_admin role from admin user",
1188 norm(str(e.exception)),
1189 "Wrong exception text",
1190 )
1191 with self.subTest(i=4):
1192 new_name = "new-user-name"
1193 self.auth.get_user_list.side_effect = [
1194 [{"_id": uid, "name": self.test_name}],
1195 [{"_id": str(uuid4()), "name": new_name}],
1196 ]
1197 with self.assertRaises(
1198 EngineException, msg="Accepted existing username"
1199 ) as e:
1200 self.topic.edit(self.fake_session, uid, {"username": new_name})
1201 self.assertEqual(
1202 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1203 )
1204 self.assertIn(
1205 "username '{}' is already used".format(new_name),
1206 norm(str(e.exception)),
1207 "Wrong exception text",
1208 )
1210 def test_conflict_on_del(self):
1211 with self.subTest(i=1):
1212 uid = str(uuid4())
1213 self.fake_session["username"] = self.test_name
1214 user = user = {
1215 "_id": uid,
1216 "username": self.test_name,
1217 "project_role_mappings": [],
1218 }
1219 self.auth.get_user.return_value = user
1220 with self.assertRaises(
1221 EngineException, msg="Accepted deletion of own user"
1222 ) as e:
1223 self.topic.delete(self.fake_session, uid)
1224 self.assertEqual(
1225 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1226 )
1227 self.assertIn(
1228 "you cannot delete your own login user",
1229 norm(str(e.exception)),
1230 "Wrong exception text",
1231 )
1233 def test_user_management(self):
1234 self.config = {
1235 "user_management": True,
1236 "pwd_expire_days": 30,
1237 "max_pwd_attempt": 5,
1238 "account_expire_days": 90,
1239 "version": "dev",
1240 "deviceVendor": "test",
1241 "deviceProduct": "test",
1242 }
1243 self.permissions = {"admin": True, "default": True}
1244 now = time()
1245 rid = str(uuid4())
1246 role = {
1247 "_id": rid,
1248 "name": self.test_name,
1249 "permissions": self.permissions,
1250 "_admin": {"created": now, "modified": now},
1251 }
1252 self.db.create("roles", role)
1253 admin_user = {
1254 "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1255 "username": "admin",
1256 "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb",
1257 "_admin": {
1258 "created": 1663058370.7721832,
1259 "modified": 1663681183.5651639,
1260 "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b",
1261 "last_token_time": 1666876472.2962265,
1262 "user_status": "always-active",
1263 "retry_count": 0,
1264 },
1265 "project_role_mappings": [
1266 {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid}
1267 ],
1268 }
1269 self.db.create("users", admin_user)
1270 with self.subTest(i=1):
1271 self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
1272 user_info = {"username": "user_mgmt_true", "password": "Test@123"}
1273 self.user_create.create_user(user_info)
1274 user = self.db.get_one("users", {"username": user_info["username"]})
1275 self.assertEqual(user["username"], user_info["username"], "Wrong user name")
1276 self.assertEqual(
1277 user["_admin"]["user_status"], "active", "User status is unknown"
1278 )
1279 self.assertIn("password_expire_time", user["_admin"], "Key is not there")
1280 self.assertIn("account_expire_time", user["_admin"], "Key is not there")
1281 with self.subTest(i=2):
1282 self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
1283 locked_user = {
1284 "username": "user_lock",
1285 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
1286 "_admin": {
1287 "created": 1667207552.2191198,
1288 "modified": 1667207552.2191815,
1289 "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
1290 "user_status": "locked",
1291 "password_expire_time": 1667207552.2191815,
1292 "account_expire_time": 1674983552.2191815,
1293 "retry_count": 5,
1294 "last_token_time": 1667207552.2191815,
1295 },
1296 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
1297 }
1298 self.db.create("users", locked_user)
1299 user_info = {
1300 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
1301 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1302 "unlock": True,
1303 }
1304 self.assertEqual(
1305 locked_user["_admin"]["user_status"], "locked", "User status is unknown"
1306 )
1307 self.user_update.update_user(user_info)
1308 user = self.db.get_one("users", {"username": locked_user["username"]})
1309 self.assertEqual(
1310 user["username"], locked_user["username"], "Wrong user name"
1311 )
1312 self.assertEqual(
1313 user["_admin"]["user_status"], "active", "User status is unknown"
1314 )
1315 self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown")
1316 with self.subTest(i=3):
1317 self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
1318 expired_user = {
1319 "username": "user_expire",
1320 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
1321 "_admin": {
1322 "created": 1665602087.601298,
1323 "modified": 1665636442.1245084,
1324 "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
1325 "user_status": "expired",
1326 "password_expire_time": 1668248628.2191815,
1327 "account_expire_time": 1666952628.2191815,
1328 "retry_count": 0,
1329 "last_token_time": 1666779828.2171815,
1330 },
1331 "_id": "3266430f-8222-407f-b08f-3a242504ab94",
1332 }
1333 self.db.create("users", expired_user)
1334 user_info = {
1335 "_id": "3266430f-8222-407f-b08f-3a242504ab94",
1336 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1337 "renew": True,
1338 }
1339 self.assertEqual(
1340 expired_user["_admin"]["user_status"],
1341 "expired",
1342 "User status is unknown",
1343 )
1344 self.user_update.update_user(user_info)
1345 user = self.db.get_one("users", {"username": expired_user["username"]})
1346 self.assertEqual(
1347 user["username"], expired_user["username"], "Wrong user name"
1348 )
1349 self.assertEqual(
1350 user["_admin"]["user_status"], "active", "User status is unknown"
1351 )
1352 self.assertGreater(
1353 user["_admin"]["account_expire_time"],
1354 expired_user["_admin"]["account_expire_time"],
1355 "User expire time is not get extended",
1356 )
1357 with self.subTest(i=4):
1358 self.config.update({"user_management": False})
1359 self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
1360 user_info = {"username": "user_mgmt_false", "password": "Test@123"}
1361 self.user_create.create_user(user_info)
1362 user = self.db.get_one("users", {"username": user_info["username"]})
1363 self.assertEqual(user["username"], user_info["username"], "Wrong user name")
1364 self.assertEqual(
1365 user["_admin"]["user_status"], "active", "User status is unknown"
1366 )
1367 self.assertNotIn("password_expire_time", user["_admin"], "Key is not there")
1368 self.assertNotIn("account_expire_time", user["_admin"], "Key is not there")
1371class Test_CommonVimWimSdn(TestCase):
1372 @classmethod
1373 def setUpClass(cls):
1374 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
1376 def setUp(self):
1377 self.db = Mock(dbbase.DbBase())
1378 self.fs = Mock(fsbase.FsBase())
1379 self.msg = Mock(msgbase.MsgBase())
1380 self.auth = Mock(authconn.Authconn(None, None, None))
1381 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
1382 # Use WIM schemas for testing because they are the simplest
1383 self.topic._send_msg = Mock()
1384 self.topic.topic = "wims"
1385 self.topic.schema_new = validation.wim_account_new_schema
1386 self.topic.schema_edit = validation.wim_account_edit_schema
1387 self.fake_session = {
1388 "username": test_name,
1389 "project_id": (test_pid,),
1390 "method": None,
1391 "admin": True,
1392 "force": False,
1393 "public": False,
1394 "allow_show_user_project_role": True,
1395 }
1396 self.topic.check_quota = Mock(return_value=None) # skip quota
1398 def test_new_cvws(self):
1399 test_url = "http://0.0.0.0:0"
1400 with self.subTest(i=1):
1401 rollback = []
1402 test_type = "fake"
1403 self.db.get_one.return_value = None
1404 self.db.create.side_effect = lambda self, content: content["_id"]
1405 cid, oid = self.topic.new(
1406 rollback,
1407 self.fake_session,
1408 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
1409 )
1410 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1411 args = self.db.create.call_args[0]
1412 content = args[1]
1413 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1414 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1415 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
1416 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
1417 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
1418 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
1419 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
1420 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
1421 self.assertEqual(
1422 content["_admin"]["modified"],
1423 content["_admin"]["created"],
1424 "Wrong modification time",
1425 )
1426 self.assertEqual(
1427 content["_admin"]["operationalState"],
1428 "PROCESSING",
1429 "Wrong operational state",
1430 )
1431 self.assertEqual(
1432 content["_admin"]["projects_read"],
1433 [test_pid],
1434 "Wrong read-only projects",
1435 )
1436 self.assertEqual(
1437 content["_admin"]["projects_write"],
1438 [test_pid],
1439 "Wrong read/write projects",
1440 )
1441 self.assertIsNone(
1442 content["_admin"]["current_operation"], "Wrong current operation"
1443 )
1444 self.assertEqual(
1445 len(content["_admin"]["operations"]), 1, "Wrong number of operations"
1446 )
1447 operation = content["_admin"]["operations"][0]
1448 self.assertEqual(
1449 operation["lcmOperationType"], "create", "Wrong operation type"
1450 )
1451 self.assertEqual(
1452 operation["operationState"], "PROCESSING", "Wrong operation state"
1453 )
1454 self.assertGreater(
1455 operation["startTime"],
1456 content["_admin"]["created"],
1457 "Wrong operation start time",
1458 )
1459 self.assertGreater(
1460 operation["statusEnteredTime"],
1461 content["_admin"]["created"],
1462 "Wrong operation status enter time",
1463 )
1464 self.assertEqual(
1465 operation["detailed-status"], "", "Wrong operation detailed status info"
1466 )
1467 self.assertIsNone(
1468 operation["operationParams"], "Wrong operation parameters"
1469 )
1470 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
1471 # with self.subTest(i=2):
1472 # rollback = []
1473 # test_type = "bad_type"
1474 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
1475 # self.topic.new(rollback, self.fake_session,
1476 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
1477 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
1478 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
1479 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
1480 # norm(str(e.exception)), "Wrong exception text")
1482 def test_conflict_on_new(self):
1483 with self.subTest(i=1):
1484 rollback = []
1485 test_url = "http://0.0.0.0:0"
1486 test_type = "fake"
1487 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
1488 with self.assertRaises(
1489 EngineException, msg="Accepted existing CIM name"
1490 ) as e:
1491 self.topic.new(
1492 rollback,
1493 self.fake_session,
1494 {
1495 "name": self.test_name,
1496 "wim_url": test_url,
1497 "wim_type": test_type,
1498 },
1499 )
1500 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1501 self.assertEqual(
1502 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1503 )
1504 self.assertIn(
1505 "name '{}' already exists for {}".format(
1506 self.test_name, self.topic.topic
1507 ),
1508 norm(str(e.exception)),
1509 "Wrong exception text",
1510 )
1512 def test_edit_cvws(self):
1513 now = time()
1514 cid = str(uuid4())
1515 test_url = "http://0.0.0.0:0"
1516 test_type = "fake"
1517 cvws = {
1518 "_id": cid,
1519 "name": self.test_name,
1520 "wim_url": test_url,
1521 "wim_type": test_type,
1522 "_admin": {
1523 "created": now,
1524 "modified": now,
1525 "operations": [{"lcmOperationType": "create"}],
1526 },
1527 }
1528 with self.subTest(i=1):
1529 new_name = "new-cim-name"
1530 new_url = "https://1.1.1.1:1"
1531 new_type = "onos"
1532 self.db.get_one.side_effect = [cvws, None]
1533 self.db.replace.return_value = {"updated": 1}
1534 # self.db.encrypt.side_effect = [b64str(), b64str()]
1535 self.topic.edit(
1536 self.fake_session,
1537 cid,
1538 {"name": new_name, "wim_url": new_url, "wim_type": new_type},
1539 )
1540 args = self.db.replace.call_args[0]
1541 content = args[2]
1542 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1543 self.assertEqual(args[1], cid, "Wrong CIM identifier")
1544 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1545 self.assertEqual(content["name"], new_name, "Wrong CIM name")
1546 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
1547 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
1548 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
1549 self.assertGreater(
1550 content["_admin"]["modified"],
1551 content["_admin"]["created"],
1552 "Wrong modification time",
1553 )
1554 self.assertEqual(
1555 len(content["_admin"]["operations"]), 2, "Wrong number of operations"
1556 )
1557 operation = content["_admin"]["operations"][1]
1558 self.assertEqual(
1559 operation["lcmOperationType"], "edit", "Wrong operation type"
1560 )
1561 self.assertEqual(
1562 operation["operationState"], "PROCESSING", "Wrong operation state"
1563 )
1564 self.assertGreater(
1565 operation["startTime"],
1566 content["_admin"]["modified"],
1567 "Wrong operation start time",
1568 )
1569 self.assertGreater(
1570 operation["statusEnteredTime"],
1571 content["_admin"]["modified"],
1572 "Wrong operation status enter time",
1573 )
1574 self.assertEqual(
1575 operation["detailed-status"], "", "Wrong operation detailed status info"
1576 )
1577 self.assertIsNone(
1578 operation["operationParams"], "Wrong operation parameters"
1579 )
1580 with self.subTest(i=2):
1581 self.db.get_one.side_effect = [cvws]
1582 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
1583 self.topic.edit(
1584 self.fake_session,
1585 str(uuid4()),
1586 {"name": "new-name", "extra_prop": "anything"},
1587 )
1588 self.assertEqual(
1589 e.exception.http_code,
1590 HTTPStatus.UNPROCESSABLE_ENTITY,
1591 "Wrong HTTP status code",
1592 )
1593 self.assertIn(
1594 "format error '{}'".format(
1595 "additional properties are not allowed ('{}' was unexpected)"
1596 ).format("extra_prop"),
1597 norm(str(e.exception)),
1598 "Wrong exception text",
1599 )
1601 def test_conflict_on_edit(self):
1602 with self.subTest(i=1):
1603 cid = str(uuid4())
1604 new_name = "new-cim-name"
1605 self.db.get_one.side_effect = [
1606 {"_id": cid, "name": self.test_name},
1607 {"_id": str(uuid4()), "name": new_name},
1608 ]
1609 with self.assertRaises(
1610 EngineException, msg="Accepted existing CIM name"
1611 ) as e:
1612 self.topic.edit(self.fake_session, cid, {"name": new_name})
1613 self.assertEqual(
1614 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1615 )
1616 self.assertIn(
1617 "name '{}' already exists for {}".format(new_name, self.topic.topic),
1618 norm(str(e.exception)),
1619 "Wrong exception text",
1620 )
1622 def test_delete_cvws(self):
1623 cid = str(uuid4())
1624 ro_pid = str(uuid4())
1625 rw_pid = str(uuid4())
1626 cvws = {"_id": cid, "name": self.test_name}
1627 self.db.get_list.return_value = []
1628 with self.subTest(i=1):
1629 cvws["_admin"] = {
1630 "projects_read": [test_pid, ro_pid, rw_pid],
1631 "projects_write": [test_pid, rw_pid],
1632 }
1633 self.db.get_one.return_value = cvws
1634 oid = self.topic.delete(self.fake_session, cid)
1635 self.assertIsNone(oid, "Wrong operation identifier")
1636 self.assertEqual(
1637 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1638 )
1639 self.assertEqual(
1640 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1641 )
1642 self.assertEqual(
1643 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1644 )
1645 self.assertEqual(
1646 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1647 )
1648 self.assertEqual(
1649 self.db.set_one.call_args[1]["update_dict"],
1650 None,
1651 "Wrong read-only projects update",
1652 )
1653 self.assertEqual(
1654 self.db.set_one.call_args[1]["pull_list"],
1655 {
1656 "_admin.projects_read": (test_pid,),
1657 "_admin.projects_write": (test_pid,),
1658 },
1659 "Wrong read/write projects update",
1660 )
1661 self.topic._send_msg.assert_not_called()
1662 with self.subTest(i=2):
1663 now = time()
1664 cvws["_admin"] = {
1665 "projects_read": [test_pid],
1666 "projects_write": [test_pid],
1667 "operations": [],
1668 }
1669 self.db.get_one.return_value = cvws
1670 oid = self.topic.delete(self.fake_session, cid)
1671 self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
1672 self.assertEqual(
1673 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1674 )
1675 self.assertEqual(
1676 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1677 )
1678 self.assertEqual(
1679 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1680 )
1681 self.assertEqual(
1682 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier"
1683 )
1684 self.assertEqual(
1685 self.db.set_one.call_args[1]["update_dict"],
1686 {"_admin.to_delete": True},
1687 "Wrong _admin.to_delete update",
1688 )
1689 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
1690 self.assertEqual(
1691 operation["lcmOperationType"], "delete", "Wrong operation type"
1692 )
1693 self.assertEqual(
1694 operation["operationState"], "PROCESSING", "Wrong operation state"
1695 )
1696 self.assertEqual(
1697 operation["detailed-status"], "", "Wrong operation detailed status"
1698 )
1699 self.assertIsNone(
1700 operation["operationParams"], "Wrong operation parameters"
1701 )
1702 self.assertGreater(
1703 operation["startTime"], now, "Wrong operation start time"
1704 )
1705 self.assertGreater(
1706 operation["statusEnteredTime"], now, "Wrong operation status enter time"
1707 )
1708 self.topic._send_msg.assert_called_once_with(
1709 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
1710 )
1711 with self.subTest(i=3):
1712 cvws["_admin"] = {
1713 "projects_read": [],
1714 "projects_write": [],
1715 "operations": [],
1716 }
1717 self.db.get_one.return_value = cvws
1718 self.topic._send_msg.reset_mock()
1719 self.db.get_one.reset_mock()
1720 self.db.del_one.reset_mock()
1721 self.fake_session["force"] = True # to force deletion
1722 self.fake_session["admin"] = True # to force deletion
1723 self.fake_session["project_id"] = [] # to force deletion
1724 oid = self.topic.delete(self.fake_session, cid)
1725 self.assertIsNone(oid, "Wrong operation identifier")
1726 self.assertEqual(
1727 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1728 )
1729 self.assertEqual(
1730 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1731 )
1732 self.assertEqual(
1733 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
1734 )
1735 self.assertEqual(
1736 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1737 )
1738 self.topic._send_msg.assert_called_once_with(
1739 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
1740 )
1743if __name__ == "__main__":
1744 unittest.main()