Coverage for osm_nbi/tests/test_admin_topics.py: 99%
755 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-12 20:04 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-12 20:04 +0000
1#! /usr/bin/python3
2# -*- coding: utf-8 -*-
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18__date__ = "$2019-10-019"
20import unittest
21import random
22from unittest import TestCase
23from unittest.mock import Mock, patch, call
24from uuid import uuid4
25from http import HTTPStatus
26from time import time
27from osm_common import dbbase, fsbase, msgbase
28from osm_common.dbmemory import DbMemory
29from osm_nbi import authconn, validation
30from osm_nbi.admin_topics import (
31 ProjectTopicAuth,
32 RoleTopicAuth,
33 UserTopicAuth,
34 CommonVimWimSdn,
35 VcaTopic,
36)
37from osm_nbi.engine import EngineException
38from osm_nbi.authconn import AuthconnNotFoundException
39from osm_nbi.authconn_internal import AuthconnInternal
42test_pid = str(uuid4())
43test_name = "test-user"
46def norm(str):
47 """Normalize string for checking"""
48 return " ".join(str.strip().split()).lower()
51class TestVcaTopic(TestCase):
52 def setUp(self):
53 self.db = Mock(dbbase.DbBase())
54 self.fs = Mock(fsbase.FsBase())
55 self.msg = Mock(msgbase.MsgBase())
56 self.auth = Mock(authconn.Authconn(None, None, None))
57 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
59 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
60 def test_format_on_new(self, mock_super_format_on_new):
61 content = {
62 "_id": "id",
63 "secret": "encrypted_secret",
64 "cacert": "encrypted_cacert",
65 }
66 self.db.encrypt.side_effect = ["secret", "cacert"]
67 mock_super_format_on_new.return_value = "1234"
69 oid = self.vca_topic.format_on_new(content)
71 self.assertEqual(oid, "1234")
72 self.assertEqual(content["secret"], "secret")
73 self.assertEqual(content["cacert"], "cacert")
74 self.db.encrypt.assert_has_calls(
75 [
76 call("encrypted_secret", schema_version="1.11", salt="id"),
77 call("encrypted_cacert", schema_version="1.11", salt="id"),
78 ]
79 )
80 mock_super_format_on_new.assert_called_with(content, None, False)
82 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
83 def test_format_on_edit(self, mock_super_format_on_edit):
84 edit_content = {
85 "_id": "id",
86 "secret": "encrypted_secret",
87 "cacert": "encrypted_cacert",
88 }
89 final_content = {
90 "_id": "id",
91 "schema_version": "1.11",
92 }
93 self.db.encrypt.side_effect = ["secret", "cacert"]
94 mock_super_format_on_edit.return_value = "1234"
96 oid = self.vca_topic.format_on_edit(final_content, edit_content)
98 self.assertEqual(oid, "1234")
99 self.assertEqual(final_content["secret"], "secret")
100 self.assertEqual(final_content["cacert"], "cacert")
101 self.db.encrypt.assert_has_calls(
102 [
103 call("encrypted_secret", schema_version="1.11", salt="id"),
104 call("encrypted_cacert", schema_version="1.11", salt="id"),
105 ]
106 )
107 mock_super_format_on_edit.assert_called()
109 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
110 def test_check_conflict_on_del(self, mock_check_conflict_on_del):
111 session = {
112 "project_id": "project-id",
113 "force": False,
114 }
115 _id = "vca-id"
116 db_content = {}
118 self.db.get_list.return_value = None
120 self.vca_topic.check_conflict_on_del(session, _id, db_content)
122 self.db.get_list.assert_called_with(
123 "vim_accounts",
124 {"vca": _id, "_admin.projects_read.cont": "project-id"},
125 )
126 mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
128 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
129 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
130 session = {
131 "project_id": "project-id",
132 "force": True,
133 }
134 _id = "vca-id"
135 db_content = {}
137 self.vca_topic.check_conflict_on_del(session, _id, db_content)
139 self.db.get_list.assert_not_called()
140 mock_check_conflict_on_del.assert_not_called()
142 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
143 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
144 session = {
145 "project_id": "project-id",
146 "force": False,
147 }
148 _id = "vca-id"
149 db_content = {}
151 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
153 with self.assertRaises(EngineException) as context:
154 self.vca_topic.check_conflict_on_del(session, _id, db_content)
155 self.assertEqual(
156 context.exception,
157 EngineException(
158 "There is at least one VIM account using this vca",
159 http_code=HTTPStatus.CONFLICT,
160 ),
161 )
163 self.db.get_list.assert_called_with(
164 "vim_accounts",
165 {"vca": _id, "_admin.projects_read.cont": "project-id"},
166 )
167 mock_check_conflict_on_del.assert_not_called()
170class Test_ProjectTopicAuth(TestCase):
171 @classmethod
172 def setUpClass(cls):
173 cls.test_name = "test-project-topic"
175 def setUp(self):
176 self.db = Mock(dbbase.DbBase())
177 self.fs = Mock(fsbase.FsBase())
178 self.msg = Mock(msgbase.MsgBase())
179 self.auth = Mock(authconn.Authconn(None, None, None))
180 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
181 self.fake_session = {
182 "username": self.test_name,
183 "project_id": (test_pid,),
184 "method": None,
185 "admin": True,
186 "force": False,
187 "public": False,
188 "allow_show_user_project_role": True,
189 }
190 self.topic.check_quota = Mock(return_value=None) # skip quota
192 def test_new_project(self):
193 with self.subTest(i=1):
194 rollback = []
195 pid1 = str(uuid4())
196 self.auth.get_project_list.return_value = []
197 self.auth.create_project.return_value = pid1
198 pid2, oid = self.topic.new(
199 rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
200 )
201 self.assertEqual(len(rollback), 1, "Wrong rollback length")
202 self.assertEqual(pid2, pid1, "Wrong project identifier")
203 content = self.auth.create_project.call_args[0][0]
204 self.assertEqual(content["name"], self.test_name, "Wrong project name")
205 self.assertEqual(content["quotas"], {}, "Wrong quotas")
206 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
207 self.assertEqual(
208 content["_admin"]["modified"],
209 content["_admin"]["created"],
210 "Wrong modification time",
211 )
212 with self.subTest(i=2):
213 rollback = []
214 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
215 self.topic.new(
216 rollback,
217 self.fake_session,
218 {"name": "other-project-name", "quotas": {"baditems": 10}},
219 )
220 self.assertEqual(len(rollback), 0, "Wrong rollback length")
221 self.assertEqual(
222 e.exception.http_code,
223 HTTPStatus.UNPROCESSABLE_ENTITY,
224 "Wrong HTTP status code",
225 )
226 self.assertIn(
227 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
228 "baditems"
229 ),
230 norm(str(e.exception)),
231 "Wrong exception text",
232 )
234 def test_edit_project(self):
235 now = time()
236 pid = str(uuid4())
237 proj = {
238 "_id": pid,
239 "name": self.test_name,
240 "_admin": {"created": now, "modified": now},
241 }
242 with self.subTest(i=1):
243 self.auth.get_project_list.side_effect = [[proj], []]
244 new_name = "new-project-name"
245 quotas = {
246 "vnfds": random.SystemRandom().randint(0, 100),
247 "nsds": random.SystemRandom().randint(0, 100),
248 }
249 self.topic.edit(
250 self.fake_session, pid, {"name": new_name, "quotas": quotas}
251 )
252 _id, content = self.auth.update_project.call_args[0]
253 self.assertEqual(_id, pid, "Wrong project identifier")
254 self.assertEqual(content["_id"], pid, "Wrong project identifier")
255 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
256 self.assertGreater(
257 content["_admin"]["modified"], now, "Wrong modification time"
258 )
259 self.assertEqual(content["name"], new_name, "Wrong project name")
260 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
261 with self.subTest(i=2):
262 new_name = "other-project-name"
263 quotas = {"baditems": random.SystemRandom().randint(0, 100)}
264 self.auth.get_project_list.side_effect = [[proj], []]
265 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
266 self.topic.edit(
267 self.fake_session, pid, {"name": new_name, "quotas": quotas}
268 )
269 self.assertEqual(
270 e.exception.http_code,
271 HTTPStatus.UNPROCESSABLE_ENTITY,
272 "Wrong HTTP status code",
273 )
274 self.assertIn(
275 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
276 "baditems"
277 ),
278 norm(str(e.exception)),
279 "Wrong exception text",
280 )
282 def test_conflict_on_new(self):
283 with self.subTest(i=1):
284 rollback = []
285 pid = str(uuid4())
286 with self.assertRaises(
287 EngineException, msg="Accepted uuid as project name"
288 ) as e:
289 self.topic.new(rollback, self.fake_session, {"name": pid})
290 self.assertEqual(len(rollback), 0, "Wrong rollback length")
291 self.assertEqual(
292 e.exception.http_code,
293 HTTPStatus.UNPROCESSABLE_ENTITY,
294 "Wrong HTTP status code",
295 )
296 self.assertIn(
297 "project name '{}' cannot have an uuid format".format(pid),
298 norm(str(e.exception)),
299 "Wrong exception text",
300 )
301 with self.subTest(i=2):
302 rollback = []
303 self.auth.get_project_list.return_value = [
304 {"_id": test_pid, "name": self.test_name}
305 ]
306 with self.assertRaises(
307 EngineException, msg="Accepted existing project name"
308 ) as e:
309 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
310 self.assertEqual(len(rollback), 0, "Wrong rollback length")
311 self.assertEqual(
312 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
313 )
314 self.assertIn(
315 "project '{}' exists".format(self.test_name),
316 norm(str(e.exception)),
317 "Wrong exception text",
318 )
320 def test_conflict_on_edit(self):
321 with self.subTest(i=1):
322 self.auth.get_project_list.return_value = [
323 {"_id": test_pid, "name": self.test_name}
324 ]
325 new_name = str(uuid4())
326 with self.assertRaises(
327 EngineException, msg="Accepted uuid as project name"
328 ) as e:
329 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
330 self.assertEqual(
331 e.exception.http_code,
332 HTTPStatus.UNPROCESSABLE_ENTITY,
333 "Wrong HTTP status code",
334 )
335 self.assertIn(
336 "project name '{}' cannot have an uuid format".format(new_name),
337 norm(str(e.exception)),
338 "Wrong exception text",
339 )
340 with self.subTest(i=2):
341 pid = str(uuid4())
342 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
343 with self.assertRaises(
344 EngineException, msg="Accepted renaming of project 'admin'"
345 ) as e:
346 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
347 self.assertEqual(
348 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
349 )
350 self.assertIn(
351 "you cannot rename project 'admin'",
352 norm(str(e.exception)),
353 "Wrong exception text",
354 )
355 with self.subTest(i=3):
356 new_name = "new-project-name"
357 self.auth.get_project_list.side_effect = [
358 [{"_id": test_pid, "name": self.test_name}],
359 [{"_id": str(uuid4()), "name": new_name}],
360 ]
361 with self.assertRaises(
362 EngineException, msg="Accepted existing project name"
363 ) as e:
364 self.topic.edit(self.fake_session, pid, {"name": new_name})
365 self.assertEqual(
366 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
367 )
368 self.assertIn(
369 "project '{}' is already used".format(new_name),
370 norm(str(e.exception)),
371 "Wrong exception text",
372 )
374 def test_delete_project(self):
375 with self.subTest(i=1):
376 pid = str(uuid4())
377 self.auth.get_project.return_value = {
378 "_id": pid,
379 "name": "other-project-name",
380 }
381 self.auth.delete_project.return_value = {"deleted": 1}
382 self.auth.get_user_list.return_value = []
383 self.db.get_list.return_value = []
384 rc = self.topic.delete(self.fake_session, pid)
385 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
386 self.assertEqual(
387 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
388 )
389 self.assertEqual(
390 self.auth.delete_project.call_args[0][0],
391 pid,
392 "Wrong project identifier",
393 )
395 def test_conflict_on_del(self):
396 with self.subTest(i=1):
397 self.auth.get_project.return_value = {
398 "_id": test_pid,
399 "name": self.test_name,
400 }
401 with self.assertRaises(
402 EngineException, msg="Accepted deletion of own project"
403 ) as e:
404 self.topic.delete(self.fake_session, self.test_name)
405 self.assertEqual(
406 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
407 )
408 self.assertIn(
409 "you cannot delete your own project",
410 norm(str(e.exception)),
411 "Wrong exception text",
412 )
413 with self.subTest(i=2):
414 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
415 with self.assertRaises(
416 EngineException, msg="Accepted deletion of project 'admin'"
417 ) as e:
418 self.topic.delete(self.fake_session, "admin")
419 self.assertEqual(
420 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
421 )
422 self.assertIn(
423 "you cannot delete project 'admin'",
424 norm(str(e.exception)),
425 "Wrong exception text",
426 )
427 with self.subTest(i=3):
428 pid = str(uuid4())
429 name = "other-project-name"
430 self.auth.get_project.return_value = {"_id": pid, "name": name}
431 self.auth.get_user_list.return_value = [
432 {
433 "_id": str(uuid4()),
434 "username": self.test_name,
435 "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
436 }
437 ]
438 with self.assertRaises(
439 EngineException, msg="Accepted deletion of used project"
440 ) as e:
441 self.topic.delete(self.fake_session, pid)
442 self.assertEqual(
443 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
444 )
445 self.assertIn(
446 "project '{}' ({}) is being used by user '{}'".format(
447 name, pid, self.test_name
448 ),
449 norm(str(e.exception)),
450 "Wrong exception text",
451 )
452 with self.subTest(i=4):
453 self.auth.get_user_list.return_value = []
454 self.db.get_list.return_value = [
455 {
456 "_id": str(uuid4()),
457 "id": self.test_name,
458 "_admin": {"projects_read": [pid], "projects_write": []},
459 }
460 ]
461 with self.assertRaises(
462 EngineException, msg="Accepted deletion of used project"
463 ) as e:
464 self.topic.delete(self.fake_session, pid)
465 self.assertEqual(
466 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
467 )
468 self.assertIn(
469 "project '{}' ({}) is being used by {} '{}'".format(
470 name, pid, "vnf descriptor", self.test_name
471 ),
472 norm(str(e.exception)),
473 "Wrong exception text",
474 )
477class Test_RoleTopicAuth(TestCase):
478 @classmethod
479 def setUpClass(cls):
480 cls.test_name = "test-role-topic"
481 cls.test_operations = ["tokens:get"]
483 def setUp(self):
484 self.db = Mock(dbbase.DbBase())
485 self.fs = Mock(fsbase.FsBase())
486 self.msg = Mock(msgbase.MsgBase())
487 self.auth = Mock(authconn.Authconn(None, None, None))
488 self.auth.role_permissions = self.test_operations
489 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
490 self.fake_session = {
491 "username": test_name,
492 "project_id": (test_pid,),
493 "method": None,
494 "admin": True,
495 "force": False,
496 "public": False,
497 "allow_show_user_project_role": True,
498 }
499 self.topic.check_quota = Mock(return_value=None) # skip quota
501 def test_new_role(self):
502 with self.subTest(i=1):
503 rollback = []
504 rid1 = str(uuid4())
505 perms_in = {"tokens": True}
506 perms_out = {"default": False, "admin": False, "tokens": True}
507 self.auth.get_role_list.return_value = []
508 self.auth.create_role.return_value = rid1
509 rid2, oid = self.topic.new(
510 rollback,
511 self.fake_session,
512 {"name": self.test_name, "permissions": perms_in},
513 )
514 self.assertEqual(len(rollback), 1, "Wrong rollback length")
515 self.assertEqual(rid2, rid1, "Wrong project identifier")
516 content = self.auth.create_role.call_args[0][0]
517 self.assertEqual(content["name"], self.test_name, "Wrong role name")
518 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
519 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
520 self.assertEqual(
521 content["_admin"]["modified"],
522 content["_admin"]["created"],
523 "Wrong modification time",
524 )
525 with self.subTest(i=2):
526 rollback = []
527 with self.assertRaises(
528 EngineException, msg="Accepted wrong permissions"
529 ) as e:
530 self.topic.new(
531 rollback,
532 self.fake_session,
533 {"name": "other-role-name", "permissions": {"projects": True}},
534 )
535 self.assertEqual(len(rollback), 0, "Wrong rollback length")
536 self.assertEqual(
537 e.exception.http_code,
538 HTTPStatus.UNPROCESSABLE_ENTITY,
539 "Wrong HTTP status code",
540 )
541 self.assertIn(
542 "invalid permission '{}'".format("projects"),
543 norm(str(e.exception)),
544 "Wrong exception text",
545 )
547 def test_edit_role(self):
548 now = time()
549 rid = str(uuid4())
550 role = {
551 "_id": rid,
552 "name": self.test_name,
553 "permissions": {"tokens": True},
554 "_admin": {"created": now, "modified": now},
555 }
556 with self.subTest(i=1):
557 self.auth.get_role_list.side_effect = [[role], []]
558 self.auth.get_role.return_value = role
559 new_name = "new-role-name"
560 perms_in = {"tokens": False, "tokens:get": True}
561 perms_out = {
562 "default": False,
563 "admin": False,
564 "tokens": False,
565 "tokens:get": True,
566 }
567 self.topic.edit(
568 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
569 )
570 content = self.auth.update_role.call_args[0][0]
571 self.assertEqual(content["_id"], rid, "Wrong role identifier")
572 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
573 self.assertGreater(
574 content["_admin"]["modified"], now, "Wrong modification time"
575 )
576 self.assertEqual(content["name"], new_name, "Wrong role name")
577 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
578 with self.subTest(i=2):
579 new_name = "other-role-name"
580 perms_in = {"tokens": False, "tokens:post": True}
581 self.auth.get_role_list.side_effect = [[role], []]
582 with self.assertRaises(
583 EngineException, msg="Accepted wrong permissions"
584 ) as e:
585 self.topic.edit(
586 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
587 )
588 self.assertEqual(
589 e.exception.http_code,
590 HTTPStatus.UNPROCESSABLE_ENTITY,
591 "Wrong HTTP status code",
592 )
593 self.assertIn(
594 "invalid permission '{}'".format("tokens:post"),
595 norm(str(e.exception)),
596 "Wrong exception text",
597 )
599 def test_delete_role(self):
600 with self.subTest(i=1):
601 rid = str(uuid4())
602 role = {"_id": rid, "name": "other-role-name"}
603 self.auth.get_role_list.return_value = [role]
604 self.auth.get_role.return_value = role
605 self.auth.delete_role.return_value = {"deleted": 1}
606 self.auth.get_user_list.return_value = []
607 rc = self.topic.delete(self.fake_session, rid)
608 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
609 self.assertEqual(
610 self.auth.get_role_list.call_args[0][0]["_id"],
611 rid,
612 "Wrong role identifier",
613 )
614 self.assertEqual(
615 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
616 )
617 self.assertEqual(
618 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
619 )
621 def test_conflict_on_new(self):
622 with self.subTest(i=1):
623 rollback = []
624 rid = str(uuid4())
625 with self.assertRaises(
626 EngineException, msg="Accepted uuid as role name"
627 ) as e:
628 self.topic.new(rollback, self.fake_session, {"name": rid})
629 self.assertEqual(len(rollback), 0, "Wrong rollback length")
630 self.assertEqual(
631 e.exception.http_code,
632 HTTPStatus.UNPROCESSABLE_ENTITY,
633 "Wrong HTTP status code",
634 )
635 self.assertIn(
636 "role name '{}' cannot have an uuid format".format(rid),
637 norm(str(e.exception)),
638 "Wrong exception text",
639 )
640 with self.subTest(i=2):
641 rollback = []
642 self.auth.get_role_list.return_value = [
643 {"_id": str(uuid4()), "name": self.test_name}
644 ]
645 with self.assertRaises(
646 EngineException, msg="Accepted existing role name"
647 ) as e:
648 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
649 self.assertEqual(len(rollback), 0, "Wrong rollback length")
650 self.assertEqual(
651 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
652 )
653 self.assertIn(
654 "role name '{}' exists".format(self.test_name),
655 norm(str(e.exception)),
656 "Wrong exception text",
657 )
659 def test_conflict_on_edit(self):
660 rid = str(uuid4())
661 with self.subTest(i=1):
662 self.auth.get_role_list.return_value = [
663 {"_id": rid, "name": self.test_name, "permissions": {}}
664 ]
665 new_name = str(uuid4())
666 with self.assertRaises(
667 EngineException, msg="Accepted uuid as role name"
668 ) as e:
669 self.topic.edit(self.fake_session, rid, {"name": new_name})
670 self.assertEqual(
671 e.exception.http_code,
672 HTTPStatus.UNPROCESSABLE_ENTITY,
673 "Wrong HTTP status code",
674 )
675 self.assertIn(
676 "role name '{}' cannot have an uuid format".format(new_name),
677 norm(str(e.exception)),
678 "Wrong exception text",
679 )
680 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
681 with self.subTest(i=i):
682 rid = str(uuid4())
683 self.auth.get_role.return_value = {
684 "_id": rid,
685 "name": role_name,
686 "permissions": {},
687 }
688 with self.assertRaises(
689 EngineException,
690 msg="Accepted renaming of role '{}'".format(role_name),
691 ) as e:
692 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
693 self.assertEqual(
694 e.exception.http_code,
695 HTTPStatus.FORBIDDEN,
696 "Wrong HTTP status code",
697 )
698 self.assertIn(
699 "you cannot rename role '{}'".format(role_name),
700 norm(str(e.exception)),
701 "Wrong exception text",
702 )
703 with self.subTest(i=i + 1):
704 new_name = "new-role-name"
705 self.auth.get_role_list.side_effect = [
706 [{"_id": rid, "name": self.test_name, "permissions": {}}],
707 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
708 ]
709 self.auth.get_role.return_value = {
710 "_id": rid,
711 "name": self.test_name,
712 "permissions": {},
713 }
714 with self.assertRaises(
715 EngineException, msg="Accepted existing role name"
716 ) as e:
717 self.topic.edit(self.fake_session, rid, {"name": new_name})
718 self.assertEqual(
719 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
720 )
721 self.assertIn(
722 "role name '{}' exists".format(new_name),
723 norm(str(e.exception)),
724 "Wrong exception text",
725 )
727 def test_conflict_on_del(self):
728 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
729 with self.subTest(i=i):
730 rid = str(uuid4())
731 role = {"_id": rid, "name": role_name}
732 self.auth.get_role_list.return_value = [role]
733 self.auth.get_role.return_value = role
734 with self.assertRaises(
735 EngineException,
736 msg="Accepted deletion of role '{}'".format(role_name),
737 ) as e:
738 self.topic.delete(self.fake_session, rid)
739 self.assertEqual(
740 e.exception.http_code,
741 HTTPStatus.FORBIDDEN,
742 "Wrong HTTP status code",
743 )
744 self.assertIn(
745 "you cannot delete role '{}'".format(role_name),
746 norm(str(e.exception)),
747 "Wrong exception text",
748 )
749 with self.subTest(i=i + 1):
750 rid = str(uuid4())
751 name = "other-role-name"
752 role = {"_id": rid, "name": name}
753 self.auth.get_role_list.return_value = [role]
754 self.auth.get_role.return_value = role
755 self.auth.get_user_list.return_value = [
756 {
757 "_id": str(uuid4()),
758 "username": self.test_name,
759 "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
760 }
761 ]
762 with self.assertRaises(
763 EngineException, msg="Accepted deletion of used role"
764 ) as e:
765 self.topic.delete(self.fake_session, rid)
766 self.assertEqual(
767 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
768 )
769 self.assertIn(
770 "role '{}' ({}) is being used by user '{}'".format(
771 name, rid, self.test_name
772 ),
773 norm(str(e.exception)),
774 "Wrong exception text",
775 )
778class Test_UserTopicAuth(TestCase):
779 @classmethod
780 def setUpClass(cls):
781 cls.test_name = "test-user-topic"
782 cls.password = "Test@123"
784 def setUp(self):
785 # self.db = Mock(dbbase.DbBase())
786 self.db = DbMemory()
787 self.fs = Mock(fsbase.FsBase())
788 self.msg = Mock(msgbase.MsgBase())
789 self.auth = Mock(authconn.Authconn(None, None, None))
790 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
791 self.fake_session = {
792 "username": test_name,
793 "project_id": (test_pid,),
794 "method": None,
795 "admin": True,
796 "force": False,
797 "public": False,
798 "allow_show_user_project_role": True,
799 }
800 self.topic.check_quota = Mock(return_value=None) # skip quota
802 def test_new_user(self):
803 uid1 = str(uuid4())
804 pid = str(uuid4())
805 self.auth.get_user_list.return_value = []
806 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
807 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
808 with self.subTest(i=1):
809 rollback = []
810 rid = str(uuid4())
811 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
812 prms_in = [{"project": "some_project", "role": "some_role"}]
813 prms_out = [{"project": pid, "role": rid}]
814 uid2, oid = self.topic.new(
815 rollback,
816 self.fake_session,
817 {
818 "username": self.test_name,
819 "password": self.password,
820 "project_role_mappings": prms_in,
821 },
822 )
823 self.assertEqual(len(rollback), 1, "Wrong rollback length")
824 self.assertEqual(uid2, uid1, "Wrong project identifier")
825 content = self.auth.create_user.call_args[0][0]
826 self.assertEqual(content["username"], self.test_name, "Wrong project name")
827 self.assertEqual(content["password"], self.password, "Wrong password")
828 self.assertEqual(
829 content["project_role_mappings"],
830 prms_out,
831 "Wrong project-role mappings",
832 )
833 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
834 self.assertEqual(
835 content["_admin"]["modified"],
836 content["_admin"]["created"],
837 "Wrong modification time",
838 )
839 with self.subTest(i=2):
840 rollback = []
841 def_rid = str(uuid4())
842 def_role = {"_id": def_rid, "name": "project_admin"}
843 self.auth.get_role.return_value = def_role
844 self.auth.get_role_list.return_value = [def_role]
845 prms_out = [{"project": pid, "role": def_rid}]
846 uid2, oid = self.topic.new(
847 rollback,
848 self.fake_session,
849 {
850 "username": self.test_name,
851 "password": self.password,
852 "projects": ["some_project"],
853 },
854 )
855 self.assertEqual(len(rollback), 1, "Wrong rollback length")
856 self.assertEqual(uid2, uid1, "Wrong project identifier")
857 content = self.auth.create_user.call_args[0][0]
858 self.assertEqual(content["username"], self.test_name, "Wrong project name")
859 self.assertEqual(content["password"], self.password, "Wrong password")
860 self.assertEqual(
861 content["project_role_mappings"],
862 prms_out,
863 "Wrong project-role mappings",
864 )
865 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
866 self.assertEqual(
867 content["_admin"]["modified"],
868 content["_admin"]["created"],
869 "Wrong modification time",
870 )
871 with self.subTest(i=3):
872 rollback = []
873 with self.assertRaises(
874 EngineException, msg="Accepted wrong project-role mappings"
875 ) as e:
876 self.topic.new(
877 rollback,
878 self.fake_session,
879 {
880 "username": "other-project-name",
881 "password": "Other@pwd1",
882 "project_role_mappings": [{}],
883 },
884 )
885 self.assertEqual(len(rollback), 0, "Wrong rollback length")
886 self.assertEqual(
887 e.exception.http_code,
888 HTTPStatus.UNPROCESSABLE_ENTITY,
889 "Wrong HTTP status code",
890 )
891 self.assertIn(
892 "format error at '{}' '{}'".format(
893 "project_role_mappings:{}", "'{}' is a required property"
894 ).format(0, "project"),
895 norm(str(e.exception)),
896 "Wrong exception text",
897 )
898 with self.subTest(i=4):
899 rollback = []
900 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
901 self.topic.new(
902 rollback,
903 self.fake_session,
904 {
905 "username": "other-project-name",
906 "password": "Other@pwd1",
907 "projects": [],
908 },
909 )
910 self.assertEqual(len(rollback), 0, "Wrong rollback length")
911 self.assertEqual(
912 e.exception.http_code,
913 HTTPStatus.UNPROCESSABLE_ENTITY,
914 "Wrong HTTP status code",
915 )
916 self.assertIn(
917 "format error at '{}' '{}'".format(
918 "projects", "{} should be non-empty"
919 ).format([]),
920 norm(str(e.exception)),
921 "Wrong exception text",
922 )
924 def test_edit_user(self):
925 now = time()
926 uid = str(uuid4())
927 pid1 = str(uuid4())
928 rid1 = str(uuid4())
929 test_project = {
930 "_id": test_pid,
931 "name": "test",
932 "_admin": {"created": now, "modified": now},
933 }
934 self.db.create("projects", test_project)
935 self.fake_session["user_id"] = uid
936 self.fake_session["admin_show"] = True
937 prms = [
938 {
939 "project": pid1,
940 "project_name": "project-1",
941 "role": rid1,
942 "role_name": "role-1",
943 }
944 ]
945 user = {
946 "_id": uid,
947 "username": self.test_name,
948 "project_role_mappings": prms,
949 "_admin": {"created": now, "modified": now},
950 }
951 with self.subTest(i=1):
952 self.auth.get_user_list.side_effect = [[user], []]
953 self.auth.get_user.return_value = user
954 pid2 = str(uuid4())
955 rid2 = str(uuid4())
956 self.auth.get_project.side_effect = [
957 {"_id": pid2, "name": "project-2"},
958 {"_id": pid1, "name": "project-1"},
959 ]
960 self.auth.get_role.side_effect = [
961 {"_id": rid2, "name": "role-2"},
962 {"_id": rid1, "name": "role-1"},
963 ]
965 role = {
966 "_id": rid1,
967 "name": "role-1",
968 "permissions": {"default": False, "admin": False, "roles": True},
969 }
970 self.db.create("users", user)
971 self.db.create("roles", role)
972 new_name = "new-user-name"
973 new_pasw = "New@pwd1"
974 add_prms = [{"project": pid2, "role": rid2}]
975 rem_prms = [{"project": pid1, "role": rid1}]
976 self.topic.edit(
977 self.fake_session,
978 uid,
979 {
980 "username": new_name,
981 "password": new_pasw,
982 "add_project_role_mappings": add_prms,
983 "remove_project_role_mappings": rem_prms,
984 },
985 )
986 content = self.auth.update_user.call_args[0][0]
987 self.assertEqual(content["_id"], uid, "Wrong user identifier")
988 self.assertEqual(content["username"], new_name, "Wrong user name")
989 self.assertEqual(content["password"], new_pasw, "Wrong user password")
990 self.assertEqual(
991 content["add_project_role_mappings"],
992 add_prms,
993 "Wrong project-role mappings to add",
994 )
995 self.assertEqual(
996 content["remove_project_role_mappings"],
997 prms,
998 "Wrong project-role mappings to remove",
999 )
1000 with self.subTest(i=2):
1001 new_name = "other-user-name"
1002 new_prms = [{}]
1003 self.auth.get_role_list.side_effect = [[user], []]
1004 self.auth.get_user_list.side_effect = [[user]]
1005 with self.assertRaises(
1006 EngineException, msg="Accepted wrong project-role mappings"
1007 ) as e:
1008 self.topic.edit(
1009 self.fake_session,
1010 uid,
1011 {"username": new_name, "project_role_mappings": new_prms},
1012 )
1013 self.assertEqual(
1014 e.exception.http_code,
1015 HTTPStatus.UNPROCESSABLE_ENTITY,
1016 "Wrong HTTP status code",
1017 )
1018 self.assertIn(
1019 "format error at '{}' '{}'".format(
1020 "project_role_mappings:{}", "'{}' is a required property"
1021 ).format(0, "project"),
1022 norm(str(e.exception)),
1023 "Wrong exception text",
1024 )
1025 with self.subTest(i=3):
1026 self.auth.get_user_list.side_effect = [[user], []]
1027 self.auth.get_user.return_value = user
1028 old_password = self.password
1029 new_pasw = "New@pwd1"
1030 self.topic.edit(
1031 self.fake_session,
1032 uid,
1033 {
1034 "old_password": old_password,
1035 "password": new_pasw,
1036 },
1037 )
1038 content = self.auth.update_user.call_args[0][0]
1039 self.assertEqual(
1040 content["old_password"], old_password, "Wrong old password"
1041 )
1042 self.assertEqual(content["password"], new_pasw, "Wrong user password")
1044 def test_delete_user(self):
1045 with self.subTest(i=1):
1046 uid = str(uuid4())
1047 other_uid = str(uuid4())
1048 self.fake_session["user_id"] = other_uid
1049 user = user = {
1050 "_id": uid,
1051 "username": "other-user-name",
1052 "project_role_mappings": [],
1053 }
1054 self.auth.get_user.return_value = user
1055 self.auth.delete_user.return_value = {"deleted": 1}
1056 rc = self.topic.delete(self.fake_session, uid)
1057 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
1058 self.assertEqual(
1059 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
1060 )
1061 self.assertEqual(
1062 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
1063 )
1065 def test_conflict_on_new(self):
1066 with self.subTest(i=1):
1067 rollback = []
1068 uid = str(uuid4())
1069 with self.assertRaises(
1070 EngineException, msg="Accepted uuid as username"
1071 ) as e:
1072 self.topic.new(
1073 rollback,
1074 self.fake_session,
1075 {
1076 "username": uid,
1077 "password": self.password,
1078 "projects": [test_pid],
1079 },
1080 )
1081 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1082 self.assertEqual(
1083 e.exception.http_code,
1084 HTTPStatus.UNPROCESSABLE_ENTITY,
1085 "Wrong HTTP status code",
1086 )
1087 self.assertIn(
1088 "username '{}' cannot have a uuid format".format(uid),
1089 norm(str(e.exception)),
1090 "Wrong exception text",
1091 )
1092 with self.subTest(i=2):
1093 rollback = []
1094 self.auth.get_user_list.return_value = [
1095 {"_id": str(uuid4()), "username": self.test_name}
1096 ]
1097 with self.assertRaises(
1098 EngineException, msg="Accepted existing username"
1099 ) as e:
1100 self.topic.new(
1101 rollback,
1102 self.fake_session,
1103 {
1104 "username": self.test_name,
1105 "password": self.password,
1106 "projects": [test_pid],
1107 },
1108 )
1109 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1110 self.assertEqual(
1111 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1112 )
1113 self.assertIn(
1114 "username '{}' is already used".format(self.test_name),
1115 norm(str(e.exception)),
1116 "Wrong exception text",
1117 )
1118 with self.subTest(i=3):
1119 rollback = []
1120 self.auth.get_user_list.return_value = []
1121 self.auth.get_role_list.side_effect = [[], []]
1122 with self.assertRaises(
1123 AuthconnNotFoundException, msg="Accepted user without default role"
1124 ) as e:
1125 self.topic.new(
1126 rollback,
1127 self.fake_session,
1128 {
1129 "username": self.test_name,
1130 "password": self.password,
1131 "projects": [str(uuid4())],
1132 },
1133 )
1134 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1135 self.assertEqual(
1136 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1137 )
1138 self.assertIn(
1139 "can't find default role for user '{}'".format(self.test_name),
1140 norm(str(e.exception)),
1141 "Wrong exception text",
1142 )
1144 def test_conflict_on_edit(self):
1145 uid = str(uuid4())
1146 with self.subTest(i=1):
1147 self.auth.get_user_list.return_value = [
1148 {"_id": uid, "username": self.test_name}
1149 ]
1150 new_name = str(uuid4())
1151 with self.assertRaises(
1152 EngineException, msg="Accepted uuid as username"
1153 ) as e:
1154 self.topic.edit(self.fake_session, uid, {"username": new_name})
1155 self.assertEqual(
1156 e.exception.http_code,
1157 HTTPStatus.UNPROCESSABLE_ENTITY,
1158 "Wrong HTTP status code",
1159 )
1160 self.assertIn(
1161 "username '{}' cannot have an uuid format".format(new_name),
1162 norm(str(e.exception)),
1163 "Wrong exception text",
1164 )
1165 with self.subTest(i=2):
1166 self.auth.get_user_list.return_value = [
1167 {"_id": uid, "username": self.test_name}
1168 ]
1169 self.auth.get_role_list.side_effect = [[], []]
1170 with self.assertRaises(
1171 AuthconnNotFoundException, msg="Accepted user without default role"
1172 ) as e:
1173 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
1174 self.assertEqual(
1175 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1176 )
1177 self.assertIn(
1178 "can't find a default role for user '{}'".format(self.test_name),
1179 norm(str(e.exception)),
1180 "Wrong exception text",
1181 )
1182 with self.subTest(i=3):
1183 admin_uid = str(uuid4())
1184 self.auth.get_user_list.return_value = [
1185 {"_id": admin_uid, "username": "admin"}
1186 ]
1187 with self.assertRaises(
1188 EngineException,
1189 msg="Accepted removing system_admin role from admin user",
1190 ) as e:
1191 self.topic.edit(
1192 self.fake_session,
1193 admin_uid,
1194 {
1195 "remove_project_role_mappings": [
1196 {"project": "admin", "role": "system_admin"}
1197 ]
1198 },
1199 )
1200 self.assertEqual(
1201 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
1202 )
1203 self.assertIn(
1204 "you cannot remove system_admin role from admin user",
1205 norm(str(e.exception)),
1206 "Wrong exception text",
1207 )
1208 with self.subTest(i=4):
1209 new_name = "new-user-name"
1210 self.auth.get_user_list.side_effect = [
1211 [{"_id": uid, "name": self.test_name}],
1212 [{"_id": str(uuid4()), "name": new_name}],
1213 ]
1214 with self.assertRaises(
1215 EngineException, msg="Accepted existing username"
1216 ) as e:
1217 self.topic.edit(self.fake_session, uid, {"username": new_name})
1218 self.assertEqual(
1219 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1220 )
1221 self.assertIn(
1222 "username '{}' is already used".format(new_name),
1223 norm(str(e.exception)),
1224 "Wrong exception text",
1225 )
1227 def test_conflict_on_del(self):
1228 with self.subTest(i=1):
1229 uid = str(uuid4())
1230 self.fake_session["user_id"] = uid
1231 user = user = {
1232 "_id": uid,
1233 "username": self.test_name,
1234 "project_role_mappings": [],
1235 }
1236 self.auth.get_user.return_value = user
1237 with self.assertRaises(
1238 EngineException, msg="Accepted deletion of own user"
1239 ) as e:
1240 self.topic.delete(self.fake_session, uid)
1241 self.assertEqual(
1242 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1243 )
1244 self.assertIn(
1245 "you cannot delete your own login user",
1246 norm(str(e.exception)),
1247 "Wrong exception text",
1248 )
1250 def test_user_management(self):
1251 self.config = {
1252 "user_management": True,
1253 "pwd_expire_days": 30,
1254 "max_pwd_attempt": 5,
1255 "account_expire_days": 90,
1256 "version": "dev",
1257 "deviceVendor": "test",
1258 "deviceProduct": "test",
1259 }
1260 self.permissions = {"admin": True, "default": True}
1261 now = time()
1262 rid = str(uuid4())
1263 role = {
1264 "_id": rid,
1265 "name": self.test_name,
1266 "permissions": self.permissions,
1267 "_admin": {"created": now, "modified": now},
1268 }
1269 self.db.create("roles", role)
1270 admin_user = {
1271 "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1272 "username": "admin",
1273 "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb",
1274 "_admin": {
1275 "created": 1663058370.7721832,
1276 "modified": 1663681183.5651639,
1277 "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b",
1278 "last_token_time": 1666876472.2962265,
1279 "user_status": "always-active",
1280 "retry_count": 0,
1281 },
1282 "project_role_mappings": [
1283 {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid}
1284 ],
1285 }
1286 self.db.create("users", admin_user)
1287 with self.subTest(i=1):
1288 self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
1289 user_info = {"username": "user_mgmt_true", "password": "Test@123"}
1290 self.user_create.create_user(user_info)
1291 user = self.db.get_one("users", {"username": user_info["username"]})
1292 self.assertEqual(user["username"], user_info["username"], "Wrong user name")
1293 self.assertEqual(
1294 user["_admin"]["user_status"], "active", "User status is unknown"
1295 )
1296 self.assertIn("password_expire_time", user["_admin"], "Key is not there")
1297 self.assertIn("account_expire_time", user["_admin"], "Key is not there")
1298 with self.subTest(i=2):
1299 self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
1300 locked_user = {
1301 "username": "user_lock",
1302 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
1303 "_admin": {
1304 "created": 1667207552.2191198,
1305 "modified": 1667207552.2191815,
1306 "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
1307 "user_status": "locked",
1308 "password_expire_time": 1667207552.2191815,
1309 "account_expire_time": now + 60,
1310 "retry_count": 5,
1311 "last_token_time": 1667207552.2191815,
1312 },
1313 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
1314 }
1315 self.db.create("users", locked_user)
1316 user_info = {
1317 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
1318 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1319 "unlock": True,
1320 }
1321 self.assertEqual(
1322 locked_user["_admin"]["user_status"], "locked", "User status is unknown"
1323 )
1324 self.user_update.update_user(user_info)
1325 user = self.db.get_one("users", {"username": locked_user["username"]})
1326 self.assertEqual(
1327 user["username"], locked_user["username"], "Wrong user name"
1328 )
1329 self.assertEqual(
1330 user["_admin"]["user_status"], "active", "User status is unknown"
1331 )
1332 self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown")
1333 with self.subTest(i=3):
1334 self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
1335 expired_user = {
1336 "username": "user_expire",
1337 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
1338 "_admin": {
1339 "created": 1665602087.601298,
1340 "modified": 1665636442.1245084,
1341 "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
1342 "user_status": "expired",
1343 "password_expire_time": 1668248628.2191815,
1344 "account_expire_time": 1666952628.2191815,
1345 "retry_count": 0,
1346 "last_token_time": 1666779828.2171815,
1347 },
1348 "_id": "3266430f-8222-407f-b08f-3a242504ab94",
1349 }
1350 self.db.create("users", expired_user)
1351 user_info = {
1352 "_id": "3266430f-8222-407f-b08f-3a242504ab94",
1353 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1354 "renew": True,
1355 }
1356 self.assertEqual(
1357 expired_user["_admin"]["user_status"],
1358 "expired",
1359 "User status is unknown",
1360 )
1361 self.user_update.update_user(user_info)
1362 user = self.db.get_one("users", {"username": expired_user["username"]})
1363 self.assertEqual(
1364 user["username"], expired_user["username"], "Wrong user name"
1365 )
1366 self.assertEqual(
1367 user["_admin"]["user_status"], "active", "User status is unknown"
1368 )
1369 self.assertGreater(
1370 user["_admin"]["account_expire_time"],
1371 expired_user["_admin"]["account_expire_time"],
1372 "User expire time is not get extended",
1373 )
1374 with self.subTest(i=4):
1375 self.config.update({"user_management": False})
1376 self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
1377 user_info = {"username": "user_mgmt_false", "password": "Test@123"}
1378 self.user_create.create_user(user_info)
1379 user = self.db.get_one("users", {"username": user_info["username"]})
1380 self.assertEqual(user["username"], user_info["username"], "Wrong user name")
1381 self.assertEqual(
1382 user["_admin"]["user_status"], "active", "User status is unknown"
1383 )
1384 self.assertNotIn("password_expire_time", user["_admin"], "Key is not there")
1385 self.assertNotIn("account_expire_time", user["_admin"], "Key is not there")
1388class Test_CommonVimWimSdn(TestCase):
1389 @classmethod
1390 def setUpClass(cls):
1391 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
1393 def setUp(self):
1394 self.db = Mock(dbbase.DbBase())
1395 self.fs = Mock(fsbase.FsBase())
1396 self.msg = Mock(msgbase.MsgBase())
1397 self.auth = Mock(authconn.Authconn(None, None, None))
1398 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
1399 # Use WIM schemas for testing because they are the simplest
1400 self.topic._send_msg = Mock()
1401 self.topic.topic = "wims"
1402 self.topic.schema_new = validation.wim_account_new_schema
1403 self.topic.schema_edit = validation.wim_account_edit_schema
1404 self.fake_session = {
1405 "username": test_name,
1406 "project_id": (test_pid,),
1407 "method": None,
1408 "admin": True,
1409 "force": False,
1410 "public": False,
1411 "allow_show_user_project_role": True,
1412 }
1413 self.topic.check_quota = Mock(return_value=None) # skip quota
1415 def test_new_cvws(self):
1416 test_url = "http://0.0.0.0:0"
1417 with self.subTest(i=1):
1418 rollback = []
1419 test_type = "fake"
1420 self.db.get_one.return_value = None
1421 self.db.create.side_effect = lambda self, content: content["_id"]
1422 cid, oid = self.topic.new(
1423 rollback,
1424 self.fake_session,
1425 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
1426 )
1427 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1428 args = self.db.create.call_args[0]
1429 content = args[1]
1430 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1431 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1432 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
1433 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
1434 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
1435 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
1436 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
1437 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
1438 self.assertEqual(
1439 content["_admin"]["modified"],
1440 content["_admin"]["created"],
1441 "Wrong modification time",
1442 )
1443 self.assertEqual(
1444 content["_admin"]["operationalState"],
1445 "PROCESSING",
1446 "Wrong operational state",
1447 )
1448 self.assertEqual(
1449 content["_admin"]["projects_read"],
1450 [test_pid],
1451 "Wrong read-only projects",
1452 )
1453 self.assertEqual(
1454 content["_admin"]["projects_write"],
1455 [test_pid],
1456 "Wrong read/write projects",
1457 )
1458 self.assertIsNone(
1459 content["_admin"]["current_operation"], "Wrong current operation"
1460 )
1461 self.assertEqual(
1462 len(content["_admin"]["operations"]), 1, "Wrong number of operations"
1463 )
1464 operation = content["_admin"]["operations"][0]
1465 self.assertEqual(
1466 operation["lcmOperationType"], "create", "Wrong operation type"
1467 )
1468 self.assertEqual(
1469 operation["operationState"], "PROCESSING", "Wrong operation state"
1470 )
1471 self.assertGreater(
1472 operation["startTime"],
1473 content["_admin"]["created"],
1474 "Wrong operation start time",
1475 )
1476 self.assertGreater(
1477 operation["statusEnteredTime"],
1478 content["_admin"]["created"],
1479 "Wrong operation status enter time",
1480 )
1481 self.assertEqual(
1482 operation["detailed-status"], "", "Wrong operation detailed status info"
1483 )
1484 self.assertIsNone(
1485 operation["operationParams"], "Wrong operation parameters"
1486 )
1487 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
1488 # with self.subTest(i=2):
1489 # rollback = []
1490 # test_type = "bad_type"
1491 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
1492 # self.topic.new(rollback, self.fake_session,
1493 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
1494 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
1495 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
1496 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
1497 # norm(str(e.exception)), "Wrong exception text")
1499 def test_conflict_on_new(self):
1500 with self.subTest(i=1):
1501 rollback = []
1502 test_url = "http://0.0.0.0:0"
1503 test_type = "fake"
1504 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
1505 with self.assertRaises(
1506 EngineException, msg="Accepted existing CIM name"
1507 ) as e:
1508 self.topic.new(
1509 rollback,
1510 self.fake_session,
1511 {
1512 "name": self.test_name,
1513 "wim_url": test_url,
1514 "wim_type": test_type,
1515 },
1516 )
1517 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1518 self.assertEqual(
1519 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1520 )
1521 self.assertIn(
1522 "name '{}' already exists for {}".format(
1523 self.test_name, self.topic.topic
1524 ),
1525 norm(str(e.exception)),
1526 "Wrong exception text",
1527 )
1529 def test_edit_cvws(self):
1530 now = time()
1531 cid = str(uuid4())
1532 test_url = "http://0.0.0.0:0"
1533 test_type = "fake"
1534 cvws = {
1535 "_id": cid,
1536 "name": self.test_name,
1537 "wim_url": test_url,
1538 "wim_type": test_type,
1539 "_admin": {
1540 "created": now,
1541 "modified": now,
1542 "operations": [{"lcmOperationType": "create"}],
1543 },
1544 }
1545 with self.subTest(i=1):
1546 new_name = "new-cim-name"
1547 new_url = "https://1.1.1.1:1"
1548 new_type = "onos"
1549 self.db.get_one.side_effect = [cvws, None]
1550 self.db.replace.return_value = {"updated": 1}
1551 # self.db.encrypt.side_effect = [b64str(), b64str()]
1552 self.topic.edit(
1553 self.fake_session,
1554 cid,
1555 {"name": new_name, "wim_url": new_url, "wim_type": new_type},
1556 )
1557 args = self.db.replace.call_args[0]
1558 content = args[2]
1559 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1560 self.assertEqual(args[1], cid, "Wrong CIM identifier")
1561 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1562 self.assertEqual(content["name"], new_name, "Wrong CIM name")
1563 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
1564 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
1565 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
1566 self.assertGreater(
1567 content["_admin"]["modified"],
1568 content["_admin"]["created"],
1569 "Wrong modification time",
1570 )
1571 self.assertEqual(
1572 len(content["_admin"]["operations"]), 2, "Wrong number of operations"
1573 )
1574 operation = content["_admin"]["operations"][1]
1575 self.assertEqual(
1576 operation["lcmOperationType"], "edit", "Wrong operation type"
1577 )
1578 self.assertEqual(
1579 operation["operationState"], "PROCESSING", "Wrong operation state"
1580 )
1581 self.assertGreater(
1582 operation["startTime"],
1583 content["_admin"]["modified"],
1584 "Wrong operation start time",
1585 )
1586 self.assertGreater(
1587 operation["statusEnteredTime"],
1588 content["_admin"]["modified"],
1589 "Wrong operation status enter time",
1590 )
1591 self.assertEqual(
1592 operation["detailed-status"], "", "Wrong operation detailed status info"
1593 )
1594 self.assertIsNone(
1595 operation["operationParams"], "Wrong operation parameters"
1596 )
1597 with self.subTest(i=2):
1598 self.db.get_one.side_effect = [cvws]
1599 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
1600 self.topic.edit(
1601 self.fake_session,
1602 str(uuid4()),
1603 {"name": "new-name", "extra_prop": "anything"},
1604 )
1605 self.assertEqual(
1606 e.exception.http_code,
1607 HTTPStatus.UNPROCESSABLE_ENTITY,
1608 "Wrong HTTP status code",
1609 )
1610 self.assertIn(
1611 "format error '{}'".format(
1612 "additional properties are not allowed ('{}' was unexpected)"
1613 ).format("extra_prop"),
1614 norm(str(e.exception)),
1615 "Wrong exception text",
1616 )
1618 def test_conflict_on_edit(self):
1619 with self.subTest(i=1):
1620 cid = str(uuid4())
1621 new_name = "new-cim-name"
1622 self.db.get_one.side_effect = [
1623 {"_id": cid, "name": self.test_name},
1624 {"_id": str(uuid4()), "name": new_name},
1625 ]
1626 with self.assertRaises(
1627 EngineException, msg="Accepted existing CIM name"
1628 ) as e:
1629 self.topic.edit(self.fake_session, cid, {"name": new_name})
1630 self.assertEqual(
1631 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1632 )
1633 self.assertIn(
1634 "name '{}' already exists for {}".format(new_name, self.topic.topic),
1635 norm(str(e.exception)),
1636 "Wrong exception text",
1637 )
1639 def test_delete_cvws(self):
1640 cid = str(uuid4())
1641 ro_pid = str(uuid4())
1642 rw_pid = str(uuid4())
1643 cvws = {"_id": cid, "name": self.test_name}
1644 self.db.get_list.return_value = []
1645 with self.subTest(i=1):
1646 cvws["_admin"] = {
1647 "projects_read": [test_pid, ro_pid, rw_pid],
1648 "projects_write": [test_pid, rw_pid],
1649 }
1650 self.db.get_one.return_value = cvws
1651 oid = self.topic.delete(self.fake_session, cid)
1652 self.assertIsNone(oid, "Wrong operation identifier")
1653 self.assertEqual(
1654 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1655 )
1656 self.assertEqual(
1657 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1658 )
1659 self.assertEqual(
1660 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1661 )
1662 self.assertEqual(
1663 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1664 )
1665 self.assertEqual(
1666 self.db.set_one.call_args[1]["update_dict"],
1667 None,
1668 "Wrong read-only projects update",
1669 )
1670 self.assertEqual(
1671 self.db.set_one.call_args[1]["pull_list"],
1672 {
1673 "_admin.projects_read": (test_pid,),
1674 "_admin.projects_write": (test_pid,),
1675 },
1676 "Wrong read/write projects update",
1677 )
1678 self.topic._send_msg.assert_not_called()
1679 with self.subTest(i=2):
1680 now = time()
1681 cvws["_admin"] = {
1682 "projects_read": [test_pid],
1683 "projects_write": [test_pid],
1684 "operations": [],
1685 }
1686 self.db.get_one.return_value = cvws
1687 oid = self.topic.delete(self.fake_session, cid)
1688 self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
1689 self.assertEqual(
1690 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1691 )
1692 self.assertEqual(
1693 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1694 )
1695 self.assertEqual(
1696 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1697 )
1698 self.assertEqual(
1699 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier"
1700 )
1701 self.assertEqual(
1702 self.db.set_one.call_args[1]["update_dict"],
1703 {"_admin.to_delete": True},
1704 "Wrong _admin.to_delete update",
1705 )
1706 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
1707 self.assertEqual(
1708 operation["lcmOperationType"], "delete", "Wrong operation type"
1709 )
1710 self.assertEqual(
1711 operation["operationState"], "PROCESSING", "Wrong operation state"
1712 )
1713 self.assertEqual(
1714 operation["detailed-status"], "", "Wrong operation detailed status"
1715 )
1716 self.assertIsNone(
1717 operation["operationParams"], "Wrong operation parameters"
1718 )
1719 self.assertGreater(
1720 operation["startTime"], now, "Wrong operation start time"
1721 )
1722 self.assertGreater(
1723 operation["statusEnteredTime"], now, "Wrong operation status enter time"
1724 )
1725 self.topic._send_msg.assert_called_once_with(
1726 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
1727 )
1728 with self.subTest(i=3):
1729 cvws["_admin"] = {
1730 "projects_read": [],
1731 "projects_write": [],
1732 "operations": [],
1733 }
1734 self.db.get_one.return_value = cvws
1735 self.topic._send_msg.reset_mock()
1736 self.db.get_one.reset_mock()
1737 self.db.del_one.reset_mock()
1738 self.fake_session["force"] = True # to force deletion
1739 self.fake_session["admin"] = True # to force deletion
1740 self.fake_session["project_id"] = [] # to force deletion
1741 oid = self.topic.delete(self.fake_session, cid)
1742 self.assertIsNone(oid, "Wrong operation identifier")
1743 self.assertEqual(
1744 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1745 )
1746 self.assertEqual(
1747 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1748 )
1749 self.assertEqual(
1750 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
1751 )
1752 self.assertEqual(
1753 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1754 )
1755 self.topic._send_msg.assert_called_once_with(
1756 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
1757 )
1760if __name__ == "__main__":
1761 unittest.main()