Coverage for osm_nbi/tests/test_admin_topics.py: 99%
754 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-10 20:04 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-10 20:04 +0000
1#! /usr/bin/python3
2# -*- coding: utf-8 -*-
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18__date__ = "$2019-10-019"
20import unittest
21import random
22from unittest import TestCase
23from unittest.mock import Mock, patch, call
24from uuid import uuid4
25from http import HTTPStatus
26from time import time
27from osm_common import dbbase, fsbase, msgbase
28from osm_common.dbmemory import DbMemory
29from osm_nbi import authconn, validation
30from osm_nbi.admin_topics import (
31 ProjectTopicAuth,
32 RoleTopicAuth,
33 UserTopicAuth,
34 CommonVimWimSdn,
35 VcaTopic,
36)
37from osm_nbi.engine import EngineException
38from osm_nbi.authconn import AuthconnNotFoundException
39from osm_nbi.authconn_internal import AuthconnInternal
# Project identifier placed in the "project_id" tuple of every fake session below.
test_pid = str(uuid4())
# User name used as "username" in the fake sessions of the role and user tests.
test_name = "test-user"
def norm(str):
    """Return the text lower-cased with every whitespace run collapsed to one space.

    Leading and trailing whitespace is dropped. The tests use this to compare
    exception messages without depending on spacing or letter case.
    """
    # split() without arguments already discards surrounding whitespace.
    words = str.split()
    collapsed = " ".join(words)
    return collapsed.lower()
class TestVcaTopic(TestCase):
    """Unit tests for ``VcaTopic`` run against mocked db, fs, msg and auth objects."""

    def setUp(self):
        """Create fresh mocks and the ``VcaTopic`` instance under test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
    def test_format_on_new(self, mock_super_format_on_new):
        """Check that ``format_on_new`` passes secret and cacert through ``db.encrypt``."""
        content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        # One return value per expected encrypt() call, in call order.
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_new.return_value = "1234"

        oid = self.vca_topic.format_on_new(content)

        self.assertEqual(oid, "1234")
        self.assertEqual(content["secret"], "secret")
        self.assertEqual(content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_new.assert_called_with(content, None, False)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
    def test_format_on_edit(self, mock_super_format_on_edit):
        """Check that ``format_on_edit`` stores the encrypted fields in the final content."""
        edit_content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        final_content = {
            "_id": "id",
            "schema_version": "1.11",
        }
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_edit.return_value = "1234"

        oid = self.vca_topic.format_on_edit(final_content, edit_content)

        self.assertEqual(oid, "1234")
        self.assertEqual(final_content["secret"], "secret")
        self.assertEqual(final_content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_edit.assert_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del(self, mock_check_conflict_on_del):
        """With no VIM account found, the parent conflict check must be invoked."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}

        self.db.get_list.return_value = None

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_called_with(session, _id, db_content)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
        """With ``force`` set, neither the db lookup nor the parent check may run."""
        session = {
            "project_id": "project-id",
            "force": True,
        }
        _id = "vca-id"
        db_content = {}

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_not_called()
        mock_check_conflict_on_del.assert_not_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
        """A VIM account that references the VCA must raise a CONFLICT error."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}

        self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}

        with self.assertRaises(EngineException) as context:
            self.vca_topic.check_conflict_on_del(session, _id, db_content)
        # Inspect the captured exception once the block has exited. Comparing it
        # with a newly built EngineException would rely on value equality, which
        # exception objects do not provide by default.
        self.assertEqual(
            context.exception.http_code,
            HTTPStatus.CONFLICT,
            "Wrong HTTP status code",
        )
        self.assertIn(
            "there is at least one vim account using this vca",
            norm(str(context.exception)),
            "Wrong exception text",
        )

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_not_called()
class Test_ProjectTopicAuth(TestCase):
    """Tests of ``ProjectTopicAuth`` new/edit/delete handling with mocked backends."""

    @classmethod
    def setUpClass(cls):
        """Define the project name shared by the tests of this class."""
        cls.test_name = "test-project-topic"

    def setUp(self):
        """Create the mocks, the topic under test and an admin fake session."""
        self.db, self.fs, self.msg, self.auth = (
            Mock(dbbase.DbBase()),
            Mock(fsbase.FsBase()),
            Mock(msgbase.MsgBase()),
            Mock(authconn.Authconn(None, None, None)),
        )
        self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = dict(
            username=self.test_name,
            project_id=(test_pid,),
            method=None,
            admin=True,
            force=False,
            public=False,
            allow_show_user_project_role=True,
        )
        # Quota checking is replaced by a mock so it never interferes.
        self.topic.check_quota = Mock(return_value=None)

    def _assert_engine_error(self, ctx, status, fragment):
        """Verify the HTTP code and the normalized text of a captured exception."""
        self.assertEqual(ctx.exception.http_code, status, "Wrong HTTP status code")
        self.assertIn(fragment, norm(str(ctx.exception)), "Wrong exception text")

    def test_new_project(self):
        """Create a project with valid quotas, then reject an unknown quota item."""
        with self.subTest(i=1):
            undo_log = []
            expected_id = str(uuid4())
            self.auth.get_project_list.return_value = []
            self.auth.create_project.return_value = expected_id
            returned_id, _ = self.topic.new(
                undo_log, self.fake_session, {"name": self.test_name, "quotas": {}}
            )
            self.assertEqual(len(undo_log), 1, "Wrong rollback length")
            self.assertEqual(returned_id, expected_id, "Wrong project identifier")
            # First positional argument handed to the mocked create_project().
            created = self.auth.create_project.call_args[0][0]
            self.assertEqual(created["name"], self.test_name, "Wrong project name")
            self.assertEqual(created["quotas"], {}, "Wrong quotas")
            admin_info = created["_admin"]
            self.assertIsNotNone(admin_info["created"], "Wrong creation time")
            self.assertEqual(
                admin_info["modified"], admin_info["created"], "Wrong modification time"
            )
        with self.subTest(i=2):
            undo_log = []
            bad_request = {"name": "other-project-name", "quotas": {"baditems": 10}}
            with self.assertRaises(
                EngineException, msg="Accepted wrong quotas"
            ) as ctx:
                self.topic.new(undo_log, self.fake_session, bad_request)
            self.assertEqual(len(undo_log), 0, "Wrong rollback length")
            self._assert_engine_error(
                ctx,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at 'quotas' 'additional properties are not allowed"
                " ('baditems' was unexpected)'",
            )

    def test_edit_project(self):
        """Rename a project and set quotas, then reject an unknown quota item."""
        now = time()
        pid = str(uuid4())
        stored = {
            "_id": pid,
            "name": self.test_name,
            "_admin": {"created": now, "modified": now},
        }
        rng = random.SystemRandom()
        with self.subTest(i=1):
            # First lookup returns the project, second one reports no name clash.
            self.auth.get_project_list.side_effect = [[stored], []]
            renamed = "new-project-name"
            quotas = {"vnfds": rng.randint(0, 100), "nsds": rng.randint(0, 100)}
            self.topic.edit(self.fake_session, pid, {"name": renamed, "quotas": quotas})
            updated_id, updated = self.auth.update_project.call_args[0]
            self.assertEqual(updated_id, pid, "Wrong project identifier")
            self.assertEqual(updated["_id"], pid, "Wrong project identifier")
            self.assertEqual(updated["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                updated["_admin"]["modified"], now, "Wrong modification time"
            )
            self.assertEqual(updated["name"], renamed, "Wrong project name")
            self.assertEqual(updated["quotas"], quotas, "Wrong quotas")
        with self.subTest(i=2):
            renamed = "other-project-name"
            quotas = {"baditems": rng.randint(0, 100)}
            self.auth.get_project_list.side_effect = [[stored], []]
            with self.assertRaises(
                EngineException, msg="Accepted wrong quotas"
            ) as ctx:
                self.topic.edit(
                    self.fake_session, pid, {"name": renamed, "quotas": quotas}
                )
            self._assert_engine_error(
                ctx,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at 'quotas' 'additional properties are not allowed"
                " ('baditems' was unexpected)'",
            )

    def test_conflict_on_new(self):
        """Reject a uuid-shaped project name and an already existing name."""
        with self.subTest(i=1):
            undo_log = []
            uuid_name = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as project name"
            ) as ctx:
                self.topic.new(undo_log, self.fake_session, {"name": uuid_name})
            self.assertEqual(len(undo_log), 0, "Wrong rollback length")
            self._assert_engine_error(
                ctx,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                f"project name '{uuid_name}' cannot have an uuid format",
            )
        with self.subTest(i=2):
            undo_log = []
            self.auth.get_project_list.return_value = [
                {"_id": test_pid, "name": self.test_name}
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing project name"
            ) as ctx:
                self.topic.new(undo_log, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(undo_log), 0, "Wrong rollback length")
            self._assert_engine_error(
                ctx, HTTPStatus.CONFLICT, f"project '{self.test_name}' exists"
            )

    def test_conflict_on_edit(self):
        """Reject renaming to a uuid, renaming 'admin' and renaming to a used name."""
        with self.subTest(i=1):
            self.auth.get_project_list.return_value = [
                {"_id": test_pid, "name": self.test_name}
            ]
            uuid_name = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as project name"
            ) as ctx:
                self.topic.edit(self.fake_session, test_pid, {"name": uuid_name})
            self._assert_engine_error(
                ctx,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                f"project name '{uuid_name}' cannot have an uuid format",
            )
        with self.subTest(i=2):
            admin_pid = str(uuid4())
            self.auth.get_project_list.return_value = [
                {"_id": admin_pid, "name": "admin"}
            ]
            with self.assertRaises(
                EngineException, msg="Accepted renaming of project 'admin'"
            ) as ctx:
                self.topic.edit(self.fake_session, admin_pid, {"name": "new-name"})
            self._assert_engine_error(
                ctx, HTTPStatus.CONFLICT, "you cannot rename project 'admin'"
            )
        with self.subTest(i=3):
            taken_name = "new-project-name"
            self.auth.get_project_list.side_effect = [
                [{"_id": test_pid, "name": self.test_name}],
                [{"_id": str(uuid4()), "name": taken_name}],
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing project name"
            ) as ctx:
                # The identifier created in the previous subtest is reused here.
                self.topic.edit(self.fake_session, admin_pid, {"name": taken_name})
            self._assert_engine_error(
                ctx, HTTPStatus.CONFLICT, f"project '{taken_name}' is already used"
            )

    def test_delete_project(self):
        """Delete an unused project and check the identifiers given to the auth mock."""
        with self.subTest(i=1):
            pid = str(uuid4())
            self.auth.get_project.return_value = {
                "_id": pid,
                "name": "other-project-name",
            }
            self.auth.delete_project.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = []
            outcome = self.topic.delete(self.fake_session, pid)
            self.assertEqual(
                outcome, {"deleted": 1}, "Wrong project deletion return info"
            )
            looked_up = self.auth.get_project.call_args[0][0]
            self.assertEqual(looked_up, pid, "Wrong project identifier")
            removed = self.auth.delete_project.call_args[0][0]
            self.assertEqual(removed, pid, "Wrong project identifier")

    def test_conflict_on_del(self):
        """Reject deleting the own project, 'admin', and projects still in use."""
        with self.subTest(i=1):
            self.auth.get_project.return_value = {
                "_id": test_pid,
                "name": self.test_name,
            }
            with self.assertRaises(
                EngineException, msg="Accepted deletion of own project"
            ) as ctx:
                self.topic.delete(self.fake_session, self.test_name)
            self._assert_engine_error(
                ctx, HTTPStatus.CONFLICT, "you cannot delete your own project"
            )
        with self.subTest(i=2):
            self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
            with self.assertRaises(
                EngineException, msg="Accepted deletion of project 'admin'"
            ) as ctx:
                self.topic.delete(self.fake_session, "admin")
            self._assert_engine_error(
                ctx, HTTPStatus.CONFLICT, "you cannot delete project 'admin'"
            )
        with self.subTest(i=3):
            used_pid = str(uuid4())
            used_name = "other-project-name"
            self.auth.get_project.return_value = {"_id": used_pid, "name": used_name}
            self.auth.get_user_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "username": self.test_name,
                    "project_role_mappings": [
                        {"project": used_pid, "role": str(uuid4())}
                    ],
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used project"
            ) as ctx:
                self.topic.delete(self.fake_session, used_pid)
            self._assert_engine_error(
                ctx,
                HTTPStatus.CONFLICT,
                f"project '{used_name}' ({used_pid}) is being used by user"
                f" '{self.test_name}'",
            )
        with self.subTest(i=4):
            # Project id, name and get_project mock carry over from subtest 3.
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "id": self.test_name,
                    "_admin": {"projects_read": [used_pid], "projects_write": []},
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used project"
            ) as ctx:
                self.topic.delete(self.fake_session, used_pid)
            self._assert_engine_error(
                ctx,
                HTTPStatus.CONFLICT,
                f"project '{used_name}' ({used_pid}) is being used by"
                f" vnf descriptor '{self.test_name}'",
            )
class Test_RoleTopicAuth(TestCase):
    """Tests of ``RoleTopicAuth`` new/edit/delete handling with mocked backends."""

    @classmethod
    def setUpClass(cls):
        """Define the role name and the operation list shared by all tests."""
        cls.test_name = "test-role-topic"
        cls.test_operations = ["tokens:get"]

    def setUp(self):
        """Create the mocks, the topic under test and an admin fake session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.auth.role_permissions = self.test_operations
        self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def _assert_engine_error(self, ctx, status, fragment):
        """Verify the HTTP code and the normalized text of a captured exception."""
        self.assertEqual(ctx.exception.http_code, status, "Wrong HTTP status code")
        self.assertIn(fragment, norm(str(ctx.exception)), "Wrong exception text")

    def test_new_role(self):
        """Create a role with valid permissions, then reject an invalid permission."""
        with self.subTest(i=1):
            rollback = []
            rid1 = str(uuid4())
            perms_in = {"tokens": True}
            perms_out = {"default": False, "admin": False, "tokens": True}
            self.auth.get_role_list.return_value = []
            self.auth.create_role.return_value = rid1
            rid2, _ = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "permissions": perms_in},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            # The message names the entity actually compared (a role id).
            self.assertEqual(rid2, rid1, "Wrong role identifier")
            content = self.auth.create_role.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(
                EngineException, msg="Accepted wrong permissions"
            ) as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {"name": "other-role-name", "permissions": {"projects": True}},
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "invalid permission '{}'".format("projects"),
            )

    def test_edit_role(self):
        """Rename a role and change permissions, then reject an invalid permission."""
        now = time()
        rid = str(uuid4())
        role = {
            "_id": rid,
            "name": self.test_name,
            "permissions": {"tokens": True},
            "_admin": {"created": now, "modified": now},
        }
        with self.subTest(i=1):
            self.auth.get_role_list.side_effect = [[role], []]
            self.auth.get_role.return_value = role
            new_name = "new-role-name"
            perms_in = {"tokens": False, "tokens:get": True}
            perms_out = {
                "default": False,
                "admin": False,
                "tokens": False,
                "tokens:get": True,
            }
            self.topic.edit(
                self.fake_session, rid, {"name": new_name, "permissions": perms_in}
            )
            content = self.auth.update_role.call_args[0][0]
            self.assertEqual(content["_id"], rid, "Wrong role identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"], now, "Wrong modification time"
            )
            self.assertEqual(content["name"], new_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
        with self.subTest(i=2):
            new_name = "other-role-name"
            perms_in = {"tokens": False, "tokens:post": True}
            self.auth.get_role_list.side_effect = [[role], []]
            with self.assertRaises(
                EngineException, msg="Accepted wrong permissions"
            ) as e:
                self.topic.edit(
                    self.fake_session, rid, {"name": new_name, "permissions": perms_in}
                )
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "invalid permission '{}'".format("tokens:post"),
            )

    def test_delete_role(self):
        """Delete an unused role and check the identifiers given to the auth mock."""
        with self.subTest(i=1):
            rid = str(uuid4())
            role = {"_id": rid, "name": "other-role-name"}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.delete_role.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            rc = self.topic.delete(self.fake_session, rid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
            self.assertEqual(
                self.auth.get_role_list.call_args[0][0]["_id"],
                rid,
                "Wrong role identifier",
            )
            self.assertEqual(
                self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
            )
            self.assertEqual(
                self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
            )

    def test_conflict_on_new(self):
        """Reject a uuid-shaped role name and an already existing role name."""
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as role name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": rid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "role name '{}' cannot have an uuid format".format(rid),
            )
        with self.subTest(i=2):
            rollback = []
            self.auth.get_role_list.return_value = [
                {"_id": str(uuid4()), "name": self.test_name}
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing role name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_engine_error(
                e,
                HTTPStatus.CONFLICT,
                "role name '{}' exists".format(self.test_name),
            )

    def test_conflict_on_edit(self):
        """Reject renaming to a uuid, renaming built-in roles and reusing a name."""
        rid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_role_list.return_value = [
                {"_id": rid, "name": self.test_name, "permissions": {}}
            ]
            new_name = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as role name"
            ) as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "role name '{}' cannot have an uuid format".format(new_name),
            )
        for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
            with self.subTest(i=i):
                rid = str(uuid4())
                self.auth.get_role.return_value = {
                    "_id": rid,
                    "name": role_name,
                    "permissions": {},
                }
                with self.assertRaises(
                    EngineException,
                    msg="Accepted renaming of role '{}'".format(role_name),
                ) as e:
                    self.topic.edit(self.fake_session, rid, {"name": "new-name"})
                self._assert_engine_error(
                    e,
                    HTTPStatus.FORBIDDEN,
                    "you cannot rename role '{}'".format(role_name),
                )
        # ``i`` and ``rid`` keep the values from the last loop iteration.
        with self.subTest(i=i + 1):
            new_name = "new-role-name"
            self.auth.get_role_list.side_effect = [
                [{"_id": rid, "name": self.test_name, "permissions": {}}],
                [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
            ]
            self.auth.get_role.return_value = {
                "_id": rid,
                "name": self.test_name,
                "permissions": {},
            }
            with self.assertRaises(
                EngineException, msg="Accepted existing role name"
            ) as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self._assert_engine_error(
                e,
                HTTPStatus.CONFLICT,
                "role name '{}' exists".format(new_name),
            )

    def test_conflict_on_del(self):
        """Reject deleting built-in roles and a role still mapped to a user."""
        for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
            with self.subTest(i=i):
                rid = str(uuid4())
                role = {"_id": rid, "name": role_name}
                self.auth.get_role_list.return_value = [role]
                self.auth.get_role.return_value = role
                with self.assertRaises(
                    EngineException,
                    msg="Accepted deletion of role '{}'".format(role_name),
                ) as e:
                    self.topic.delete(self.fake_session, rid)
                self._assert_engine_error(
                    e,
                    HTTPStatus.FORBIDDEN,
                    "you cannot delete role '{}'".format(role_name),
                )
        # ``i`` keeps the value from the last loop iteration.
        with self.subTest(i=i + 1):
            rid = str(uuid4())
            name = "other-role-name"
            role = {"_id": rid, "name": name}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.get_user_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "username": self.test_name,
                    "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used role"
            ) as e:
                self.topic.delete(self.fake_session, rid)
            self._assert_engine_error(
                e,
                HTTPStatus.CONFLICT,
                "role '{}' ({}) is being used by user '{}'".format(
                    name, rid, self.test_name
                ),
            )
778class Test_UserTopicAuth(TestCase):
779 @classmethod
780 def setUpClass(cls):
781 cls.test_name = "test-user-topic"
782 cls.password = "Test@123"
784 def setUp(self):
785 # self.db = Mock(dbbase.DbBase())
786 self.db = DbMemory()
787 self.fs = Mock(fsbase.FsBase())
788 self.msg = Mock(msgbase.MsgBase())
789 self.auth = Mock(authconn.Authconn(None, None, None))
790 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
791 self.fake_session = {
792 "username": test_name,
793 "project_id": (test_pid,),
794 "method": None,
795 "admin": True,
796 "force": False,
797 "public": False,
798 "allow_show_user_project_role": True,
799 }
800 self.topic.check_quota = Mock(return_value=None) # skip quota
802 def test_new_user(self):
803 uid1 = str(uuid4())
804 pid = str(uuid4())
805 self.auth.get_user_list.return_value = []
806 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
807 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
808 with self.subTest(i=1):
809 rollback = []
810 rid = str(uuid4())
811 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
812 prms_in = [{"project": "some_project", "role": "some_role"}]
813 prms_out = [{"project": pid, "role": rid}]
814 uid2, oid = self.topic.new(
815 rollback,
816 self.fake_session,
817 {
818 "username": self.test_name,
819 "password": self.password,
820 "project_role_mappings": prms_in,
821 },
822 )
823 self.assertEqual(len(rollback), 1, "Wrong rollback length")
824 self.assertEqual(uid2, uid1, "Wrong project identifier")
825 content = self.auth.create_user.call_args[0][0]
826 self.assertEqual(content["username"], self.test_name, "Wrong project name")
827 self.assertEqual(content["password"], self.password, "Wrong password")
828 self.assertEqual(
829 content["project_role_mappings"],
830 prms_out,
831 "Wrong project-role mappings",
832 )
833 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
834 self.assertEqual(
835 content["_admin"]["modified"],
836 content["_admin"]["created"],
837 "Wrong modification time",
838 )
839 with self.subTest(i=2):
840 rollback = []
841 def_rid = str(uuid4())
842 def_role = {"_id": def_rid, "name": "project_admin"}
843 self.auth.get_role.return_value = def_role
844 self.auth.get_role_list.return_value = [def_role]
845 prms_out = [{"project": pid, "role": def_rid}]
846 uid2, oid = self.topic.new(
847 rollback,
848 self.fake_session,
849 {
850 "username": self.test_name,
851 "password": self.password,
852 "projects": ["some_project"],
853 },
854 )
855 self.assertEqual(len(rollback), 1, "Wrong rollback length")
856 self.assertEqual(uid2, uid1, "Wrong project identifier")
857 content = self.auth.create_user.call_args[0][0]
858 self.assertEqual(content["username"], self.test_name, "Wrong project name")
859 self.assertEqual(content["password"], self.password, "Wrong password")
860 self.assertEqual(
861 content["project_role_mappings"],
862 prms_out,
863 "Wrong project-role mappings",
864 )
865 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
866 self.assertEqual(
867 content["_admin"]["modified"],
868 content["_admin"]["created"],
869 "Wrong modification time",
870 )
871 with self.subTest(i=3):
872 rollback = []
873 with self.assertRaises(
874 EngineException, msg="Accepted wrong project-role mappings"
875 ) as e:
876 self.topic.new(
877 rollback,
878 self.fake_session,
879 {
880 "username": "other-project-name",
881 "password": "Other@pwd1",
882 "project_role_mappings": [{}],
883 },
884 )
885 self.assertEqual(len(rollback), 0, "Wrong rollback length")
886 self.assertEqual(
887 e.exception.http_code,
888 HTTPStatus.UNPROCESSABLE_ENTITY,
889 "Wrong HTTP status code",
890 )
891 self.assertIn(
892 "format error at '{}' '{}'".format(
893 "project_role_mappings:{}", "'{}' is a required property"
894 ).format(0, "project"),
895 norm(str(e.exception)),
896 "Wrong exception text",
897 )
898 with self.subTest(i=4):
899 rollback = []
900 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
901 self.topic.new(
902 rollback,
903 self.fake_session,
904 {
905 "username": "other-project-name",
906 "password": "Other@pwd1",
907 "projects": [],
908 },
909 )
910 self.assertEqual(len(rollback), 0, "Wrong rollback length")
911 self.assertEqual(
912 e.exception.http_code,
913 HTTPStatus.UNPROCESSABLE_ENTITY,
914 "Wrong HTTP status code",
915 )
916 self.assertIn(
917 "format error at '{}' '{}'".format(
918 "projects", "{} should be non-empty"
919 ).format([]),
920 norm(str(e.exception)),
921 "Wrong exception text",
922 )
924 def test_edit_user(self):
925 now = time()
926 uid = str(uuid4())
927 pid1 = str(uuid4())
928 rid1 = str(uuid4())
929 test_project = {
930 "_id": test_pid,
931 "name": "test",
932 "_admin": {"created": now, "modified": now},
933 }
934 self.db.create("projects", test_project)
935 self.fake_session["user_id"] = uid
936 self.fake_session["admin_show"] = True
937 prms = [
938 {
939 "project": pid1,
940 "project_name": "project-1",
941 "role": rid1,
942 "role_name": "role-1",
943 }
944 ]
945 user = {
946 "_id": uid,
947 "username": self.test_name,
948 "project_role_mappings": prms,
949 "_admin": {"created": now, "modified": now},
950 }
951 with self.subTest(i=1):
952 self.auth.get_user_list.side_effect = [[user], []]
953 self.auth.get_user.return_value = user
954 pid2 = str(uuid4())
955 rid2 = str(uuid4())
956 self.auth.get_project.side_effect = [
957 {"_id": pid2, "name": "project-2"},
958 {"_id": pid1, "name": "project-1"},
959 ]
960 self.auth.get_role.side_effect = [
961 {"_id": rid2, "name": "role-2"},
962 {"_id": rid1, "name": "role-1"},
963 ]
965 role = {
966 "_id": rid1,
967 "name": "role-1",
968 "permissions": {"default": False, "admin": False, "roles": True},
969 }
970 self.db.create("users", user)
971 self.db.create("roles", role)
972 new_name = "new-user-name"
973 new_pasw = "New@pwd1"
974 add_prms = [{"project": pid2, "role": rid2}]
975 rem_prms = [{"project": pid1, "role": rid1}]
976 self.topic.edit(
977 self.fake_session,
978 uid,
979 {
980 "username": new_name,
981 "password": new_pasw,
982 "add_project_role_mappings": add_prms,
983 "remove_project_role_mappings": rem_prms,
984 },
985 )
986 content = self.auth.update_user.call_args[0][0]
987 self.assertEqual(content["_id"], uid, "Wrong user identifier")
988 self.assertEqual(content["username"], new_name, "Wrong user name")
989 self.assertEqual(content["password"], new_pasw, "Wrong user password")
990 self.assertEqual(
991 content["add_project_role_mappings"],
992 add_prms,
993 "Wrong project-role mappings to add",
994 )
995 self.assertEqual(
996 content["remove_project_role_mappings"],
997 prms,
998 "Wrong project-role mappings to remove",
999 )
1000 with self.subTest(i=2):
1001 new_name = "other-user-name"
1002 new_prms = [{}]
1003 self.auth.get_role_list.side_effect = [[user], []]
1004 self.auth.get_user_list.side_effect = [[user]]
1005 with self.assertRaises(
1006 EngineException, msg="Accepted wrong project-role mappings"
1007 ) as e:
1008 self.topic.edit(
1009 self.fake_session,
1010 uid,
1011 {"username": new_name, "project_role_mappings": new_prms},
1012 )
1013 self.assertEqual(
1014 e.exception.http_code,
1015 HTTPStatus.UNPROCESSABLE_ENTITY,
1016 "Wrong HTTP status code",
1017 )
1018 self.assertIn(
1019 "format error at '{}' '{}'".format(
1020 "project_role_mappings:{}", "'{}' is a required property"
1021 ).format(0, "project"),
1022 norm(str(e.exception)),
1023 "Wrong exception text",
1024 )
1025 with self.subTest(i=3):
1026 self.auth.get_user_list.side_effect = [[user], []]
1027 self.auth.get_user.return_value = user
1028 old_password = self.password
1029 new_pasw = "New@pwd1"
1030 self.topic.edit(
1031 self.fake_session,
1032 uid,
1033 {
1034 "old_password": old_password,
1035 "password": new_pasw,
1036 },
1037 )
1038 content = self.auth.update_user.call_args[0][0]
1039 self.assertEqual(
1040 content["old_password"], old_password, "Wrong old password"
1041 )
1042 self.assertEqual(content["password"], new_pasw, "Wrong user password")
def test_delete_user(self):
    """Deleting a user other than the session user is forwarded to the auth connector.

    The session username is ``self.test_name`` while the stored user is
    "other-user-name", so the deletion is expected to go through and to
    return whatever ``auth.delete_user`` reports.
    """
    with self.subTest(i=1):
        uid = str(uuid4())
        self.fake_session["username"] = self.test_name
        # The target account does not belong to the session user.
        user = {
            "_id": uid,
            "username": "other-user-name",
            "project_role_mappings": [],
        }
        self.auth.get_user.return_value = user
        self.auth.delete_user.return_value = {"deleted": 1}
        rc = self.topic.delete(self.fake_session, uid)
        self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
        # Both the lookup and the deletion must target the requested id.
        self.assertEqual(
            self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
        )
        self.assertEqual(
            self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
        )
def test_conflict_on_new(self):
    """Verify that ``new`` refuses user data that conflicts with existing state.

    Each sub-test expects an exception and an untouched rollback list:
      1. username in uuid format -> EngineException, UNPROCESSABLE_ENTITY
      2. username already returned by ``auth.get_user_list`` -> EngineException,
         CONFLICT
      3. ``auth.get_role_list`` returns nothing -> AuthconnNotFoundException,
         NOT_FOUND
    """
    with self.subTest(i=1):
        undo_log = []
        uuid_name = str(uuid4())
        with self.assertRaises(
            EngineException, msg="Accepted uuid as username"
        ) as ctx:
            indata = {
                "username": uuid_name,
                "password": self.password,
                "projects": [test_pid],
            }
            self.topic.new(undo_log, self.fake_session, indata)
        error = ctx.exception
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(
            error.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "username '{}' cannot have a uuid format".format(uuid_name)
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")

    with self.subTest(i=2):
        undo_log = []
        # The auth connector reports a user that already owns the name.
        existing_user = {"_id": str(uuid4()), "username": self.test_name}
        self.auth.get_user_list.return_value = [existing_user]
        with self.assertRaises(
            EngineException, msg="Accepted existing username"
        ) as ctx:
            indata = {
                "username": self.test_name,
                "password": self.password,
                "projects": [test_pid],
            }
            self.topic.new(undo_log, self.fake_session, indata)
        error = ctx.exception
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(
            error.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        expected_text = "username '{}' is already used".format(self.test_name)
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")

    with self.subTest(i=3):
        undo_log = []
        # No user clashes this time, but no role can be found either.
        self.auth.get_user_list.return_value = []
        self.auth.get_role_list.side_effect = [[], []]
        with self.assertRaises(
            AuthconnNotFoundException, msg="Accepted user without default role"
        ) as ctx:
            indata = {
                "username": self.test_name,
                "password": self.password,
                "projects": [str(uuid4())],
            }
            self.topic.new(undo_log, self.fake_session, indata)
        error = ctx.exception
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(
            error.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
        )
        expected_text = "can't find default role for user '{}'".format(
            self.test_name
        )
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")
def test_conflict_on_edit(self):
    """Verify that ``edit`` refuses user changes that conflict with existing state.

    Sub-tests:
      1. new username in uuid format -> EngineException, UNPROCESSABLE_ENTITY
      2. ``auth.get_role_list`` returns nothing -> AuthconnNotFoundException,
         NOT_FOUND
      3. removing system_admin from the admin user -> EngineException, FORBIDDEN
      4. new username already taken by another user -> EngineException, CONFLICT
    """
    uid = str(uuid4())

    def stored_user():
        # A fresh record per sub-test, so no state is shared between them.
        return {"_id": uid, "username": self.test_name}

    with self.subTest(i=1):
        self.auth.get_user_list.return_value = [stored_user()]
        uuid_name = str(uuid4())
        with self.assertRaises(
            EngineException, msg="Accepted uuid as username"
        ) as ctx:
            self.topic.edit(self.fake_session, uid, {"username": uuid_name})
        error = ctx.exception
        self.assertEqual(
            error.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "username '{}' cannot have an uuid format".format(uuid_name)
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")

    with self.subTest(i=2):
        self.auth.get_user_list.return_value = [stored_user()]
        self.auth.get_role_list.side_effect = [[], []]
        with self.assertRaises(
            AuthconnNotFoundException, msg="Accepted user without default role"
        ) as ctx:
            self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
        error = ctx.exception
        self.assertEqual(
            error.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
        )
        expected_text = "can't find a default role for user '{}'".format(
            self.test_name
        )
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")

    with self.subTest(i=3):
        admin_uid = str(uuid4())
        self.auth.get_user_list.return_value = [
            {"_id": admin_uid, "username": "admin"}
        ]
        with self.assertRaises(
            EngineException,
            msg="Accepted removing system_admin role from admin user",
        ) as ctx:
            removal = {
                "remove_project_role_mappings": [
                    {"project": "admin", "role": "system_admin"}
                ]
            }
            self.topic.edit(self.fake_session, admin_uid, removal)
        error = ctx.exception
        self.assertEqual(
            error.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot remove system_admin role from admin user",
            norm(str(error)),
            "Wrong exception text",
        )

    with self.subTest(i=4):
        taken_name = "new-user-name"
        # First lookup returns the edited user, the second one the name holder.
        self.auth.get_user_list.side_effect = [
            [{"_id": uid, "name": self.test_name}],
            [{"_id": str(uuid4()), "name": taken_name}],
        ]
        with self.assertRaises(
            EngineException, msg="Accepted existing username"
        ) as ctx:
            self.topic.edit(self.fake_session, uid, {"username": taken_name})
        error = ctx.exception
        self.assertEqual(
            error.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        expected_text = "username '{}' is already used".format(taken_name)
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")
def test_conflict_on_del(self):
    """Deleting the session's own user must be rejected with a CONFLICT error.

    The stored user's username equals the session username
    (``self.test_name``), which is the condition under test.
    """
    with self.subTest(i=1):
        uid = str(uuid4())
        self.fake_session["username"] = self.test_name
        # The target account is the very user that is logged in.
        user = {
            "_id": uid,
            "username": self.test_name,
            "project_role_mappings": [],
        }
        self.auth.get_user.return_value = user
        with self.assertRaises(
            EngineException, msg="Accepted deletion of own user"
        ) as e:
            self.topic.delete(self.fake_session, uid)
        self.assertEqual(
            e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot delete your own login user",
            norm(str(e.exception)),
            "Wrong exception text",
        )
def test_user_management(self):
    """Exercise ``AuthconnInternal`` user creation and update with user management.

    A role and an "admin" user are stored first; the admin id is then passed
    as ``system_admin_id`` in the update requests.

    Sub-tests:
      1. ``user_management`` enabled: a created user is "active" and carries
         ``password_expire_time`` and ``account_expire_time`` in ``_admin``.
      2. Updating a "locked" user with ``unlock`` makes it "active" and
         resets ``retry_count`` to 0.
      3. Updating an "expired" user with ``renew`` makes it "active" and
         moves ``account_expire_time`` forward.
      4. ``user_management`` disabled: a created user is "active" and has
         neither expiry key in ``_admin``.
    """
    self.config = {
        "user_management": True,
        "pwd_expire_days": 30,
        "max_pwd_attempt": 5,
        "account_expire_days": 90,
        "version": "dev",
        "deviceVendor": "test",
        "deviceProduct": "test",
    }
    self.permissions = {"admin": True, "default": True}
    now = time()
    rid = str(uuid4())
    # Fixed identifiers, named once instead of repeating the literals below.
    admin_id = "72cd0cd6-e8e2-482c-9bc2-15b413bb8500"
    locked_id = "73bbbb71-ed38-4b79-9f58-ece19e7e32d6"
    expired_id = "3266430f-8222-407f-b08f-3a242504ab94"
    role = {
        "_id": rid,
        "name": self.test_name,
        "permissions": self.permissions,
        "_admin": {"created": now, "modified": now},
    }
    self.db.create("roles", role)
    admin_user = {
        "_id": admin_id,
        "username": "admin",
        "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb",
        "_admin": {
            "created": 1663058370.7721832,
            "modified": 1663681183.5651639,
            "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b",
            "last_token_time": 1666876472.2962265,
            "user_status": "always-active",
            "retry_count": 0,
        },
        "project_role_mappings": [
            {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid}
        ],
    }
    self.db.create("users", admin_user)
    with self.subTest(i=1):
        self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
        user_info = {"username": "user_mgmt_true", "password": "Test@123"}
        self.user_create.create_user(user_info)
        user = self.db.get_one("users", {"username": user_info["username"]})
        self.assertEqual(user["username"], user_info["username"], "Wrong user name")
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        self.assertIn("password_expire_time", user["_admin"], "Key is not there")
        self.assertIn("account_expire_time", user["_admin"], "Key is not there")
    with self.subTest(i=2):
        self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
        locked_user = {
            "username": "user_lock",
            "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
            "_admin": {
                "created": 1667207552.2191198,
                "modified": 1667207552.2191815,
                "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
                "user_status": "locked",
                "password_expire_time": 1667207552.2191815,
                # Account itself is still valid; only the lock is under test.
                "account_expire_time": now + 60,
                "retry_count": 5,
                "last_token_time": 1667207552.2191815,
            },
            "_id": locked_id,
        }
        self.db.create("users", locked_user)
        user_info = {
            "_id": locked_id,
            "system_admin_id": admin_id,
            "unlock": True,
        }
        self.assertEqual(
            locked_user["_admin"]["user_status"], "locked", "User status is unknown"
        )
        self.user_update.update_user(user_info)
        user = self.db.get_one("users", {"username": locked_user["username"]})
        self.assertEqual(
            user["username"], locked_user["username"], "Wrong user name"
        )
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown")
    with self.subTest(i=3):
        self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
        expired_user = {
            "username": "user_expire",
            "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
            "_admin": {
                "created": 1665602087.601298,
                "modified": 1665636442.1245084,
                "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
                "user_status": "expired",
                "password_expire_time": 1668248628.2191815,
                "account_expire_time": 1666952628.2191815,
                "retry_count": 0,
                "last_token_time": 1666779828.2171815,
            },
            "_id": expired_id,
        }
        self.db.create("users", expired_user)
        user_info = {
            "_id": expired_id,
            "system_admin_id": admin_id,
            "renew": True,
        }
        self.assertEqual(
            expired_user["_admin"]["user_status"],
            "expired",
            "User status is unknown",
        )
        self.user_update.update_user(user_info)
        user = self.db.get_one("users", {"username": expired_user["username"]})
        self.assertEqual(
            user["username"], expired_user["username"], "Wrong user name"
        )
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        self.assertGreater(
            user["_admin"]["account_expire_time"],
            expired_user["_admin"]["account_expire_time"],
            "User expire time is not get extended",
        )
    with self.subTest(i=4):
        self.config.update({"user_management": False})
        self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
        user_info = {"username": "user_mgmt_false", "password": "Test@123"}
        self.user_create.create_user(user_info)
        user = self.db.get_one("users", {"username": user_info["username"]})
        self.assertEqual(user["username"], user_info["username"], "Wrong user name")
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        # These checks fail when the key IS present, so the message says so.
        self.assertNotIn(
            "password_expire_time", user["_admin"], "Key should not be there"
        )
        self.assertNotIn(
            "account_expire_time", user["_admin"], "Key should not be there"
        )
class Test_CommonVimWimSdn(TestCase):
    """Unit tests for ``CommonVimWimSdn`` driven through mocked backends.

    The db, fs, msg and auth collaborators are all ``Mock`` objects, so every
    test inspects the arguments the topic passes to them instead of stored
    state. The topic is configured as a "wims" topic with the WIM account
    schemas.
    """

    @classmethod
    def setUpClass(cls):
        """Define the account name shared by all tests."""
        cls.test_name = "test-cim-topic"  # CIM = Common Infrastructure Manager

    def setUp(self):
        """Build a fresh topic with mocked collaborators and an admin session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
        # Use WIM schemas for testing because they are the simplest
        self.topic._send_msg = Mock()
        self.topic.topic = "wims"
        self.topic.schema_new = validation.wim_account_new_schema
        self.topic.schema_edit = validation.wim_account_edit_schema
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_cvws(self):
        """Creating an account stores the expected content and a "create" operation."""
        test_url = "http://0.0.0.0:0"
        with self.subTest(i=1):
            rollback = []
            test_type = "fake"
            self.db.get_one.return_value = None
            # db.create receives (table, content) positionally; hand back the
            # content's own _id as the created identifier.
            self.db.create.side_effect = lambda _table, content: content["_id"]
            cid, oid = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            args = self.db.create.call_args[0]
            content = args[1]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
            self.assertEqual(content["wim_url"], test_url, "Wrong URL")
            self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
            self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
            self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                content["_admin"]["operationalState"],
                "PROCESSING",
                "Wrong operational state",
            )
            self.assertEqual(
                content["_admin"]["projects_read"],
                [test_pid],
                "Wrong read-only projects",
            )
            self.assertEqual(
                content["_admin"]["projects_write"],
                [test_pid],
                "Wrong read/write projects",
            )
            self.assertIsNone(
                content["_admin"]["current_operation"], "Wrong current operation"
            )
            self.assertEqual(
                len(content["_admin"]["operations"]), 1, "Wrong number of operations"
            )
            operation = content["_admin"]["operations"][0]
            self.assertEqual(
                operation["lcmOperationType"], "create", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertGreater(
                operation["startTime"],
                content["_admin"]["created"],
                "Wrong operation start time",
            )
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["created"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status info"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        # NOTE: a second sub-test used to check that an unknown "wim_type" was
        # rejected with UNPROCESSABLE_ENTITY. It is disabled because, from
        # Feature 8030, all WIM/SDN types are admitted.

    def test_conflict_on_new(self):
        """Creating an account whose name already exists raises a CONFLICT error."""
        with self.subTest(i=1):
            rollback = []
            test_url = "http://0.0.0.0:0"
            test_type = "fake"
            # The db lookup finds another account with the same name.
            self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {
                        "name": self.test_name,
                        "wim_url": test_url,
                        "wim_type": test_type,
                    },
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(
                    self.test_name, self.topic.topic
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_edit_cvws(self):
        """Editing replaces the stored content and appends an "edit" operation.

        Sub-test 2 checks that a property unknown to the edit schema is
        rejected with UNPROCESSABLE_ENTITY.
        """
        now = time()
        cid = str(uuid4())
        test_url = "http://0.0.0.0:0"
        test_type = "fake"
        cvws = {
            "_id": cid,
            "name": self.test_name,
            "wim_url": test_url,
            "wim_type": test_type,
            "_admin": {
                "created": now,
                "modified": now,
                "operations": [{"lcmOperationType": "create"}],
            },
        }
        with self.subTest(i=1):
            new_name = "new-cim-name"
            new_url = "https://1.1.1.1:1"
            new_type = "onos"
            # First lookup returns the edited account, the second one (None)
            # means no other account holds the new name.
            self.db.get_one.side_effect = [cvws, None]
            self.db.replace.return_value = {"updated": 1}
            self.topic.edit(
                self.fake_session,
                cid,
                {"name": new_name, "wim_url": new_url, "wim_type": new_type},
            )
            args = self.db.replace.call_args[0]
            content = args[2]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(args[1], cid, "Wrong CIM identifier")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], new_name, "Wrong CIM name")
            self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
            self.assertEqual(content["wim_url"], new_url, "Wrong URL")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                len(content["_admin"]["operations"]), 2, "Wrong number of operations"
            )
            operation = content["_admin"]["operations"][1]
            self.assertEqual(
                operation["lcmOperationType"], "edit", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertGreater(
                operation["startTime"],
                content["_admin"]["modified"],
                "Wrong operation start time",
            )
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["modified"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status info"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        with self.subTest(i=2):
            self.db.get_one.side_effect = [cvws]
            with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
                self.topic.edit(
                    self.fake_session,
                    str(uuid4()),
                    {"name": "new-name", "extra_prop": "anything"},
                )
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "format error '{}'".format(
                    "additional properties are not allowed ('{}' was unexpected)"
                ).format("extra_prop"),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_edit(self):
        """Renaming an account to a name held by another one raises CONFLICT."""
        with self.subTest(i=1):
            cid = str(uuid4())
            new_name = "new-cim-name"
            # First lookup: the edited account; second: the holder of new_name.
            self.db.get_one.side_effect = [
                {"_id": cid, "name": self.test_name},
                {"_id": str(uuid4()), "name": new_name},
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.edit(self.fake_session, cid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(new_name, self.topic.topic),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_delete_cvws(self):
        """Check the three deletion outcomes.

        Sub-tests:
          1. Other projects still reference the account: the session project
             is pulled from the project lists, no operation id is returned
             and no message is sent.
          2. Only the session project references it: the account is flagged
             ``_admin.to_delete``, a "delete" operation is pushed and a
             "delete" message is sent.
          3. Forced deletion with no projects: the record is removed with
             ``db.del_one`` and a "deleted" message is sent.
        """
        cid = str(uuid4())
        ro_pid = str(uuid4())
        rw_pid = str(uuid4())
        cvws = {"_id": cid, "name": self.test_name}
        self.db.get_list.return_value = []
        with self.subTest(i=1):
            cvws["_admin"] = {
                "projects_read": [test_pid, ro_pid, rw_pid],
                "projects_write": [test_pid, rw_pid],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertIsNone(
                self.db.set_one.call_args[1]["update_dict"],
                "Wrong read-only projects update",
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["pull_list"],
                {
                    "_admin.projects_read": (test_pid,),
                    "_admin.projects_write": (test_pid,),
                },
                "Wrong read/write projects update",
            )
            self.topic._send_msg.assert_not_called()
        with self.subTest(i=2):
            now = time()
            cvws["_admin"] = {
                "projects_read": [test_pid],
                "projects_write": [test_pid],
                "operations": [],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["update_dict"],
                {"_admin.to_delete": True},
                "Wrong _admin.to_delete update",
            )
            operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
            self.assertEqual(
                operation["lcmOperationType"], "delete", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
            self.assertGreater(
                operation["startTime"], now, "Wrong operation start time"
            )
            self.assertGreater(
                operation["statusEnteredTime"], now, "Wrong operation status enter time"
            )
            self.topic._send_msg.assert_called_once_with(
                "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
            )
        with self.subTest(i=3):
            cvws["_admin"] = {
                "projects_read": [],
                "projects_write": [],
                "operations": [],
            }
            self.db.get_one.return_value = cvws
            # Forget the calls recorded by the previous sub-tests.
            self.topic._send_msg.reset_mock()
            self.db.get_one.reset_mock()
            self.db.del_one.reset_mock()
            self.fake_session["force"] = True  # to force deletion
            self.fake_session["admin"] = True  # to force deletion
            self.fake_session["project_id"] = []  # to force deletion
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.topic._send_msg.assert_called_once_with(
                "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
            )
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()