Code Coverage

Cobertura Coverage Report > osm_nbi.tests >

test_admin_topics.py

Trend

File Coverage summary

NameClassesLinesConditionals
test_admin_topics.py
100%
1/1
99%
745/747
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
test_admin_topics.py
99%
745/747
N/A

Source

osm_nbi/tests/test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 1 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 1 __date__ = "$2019-10-019"
19
20 1 import unittest
21 1 import random
22 1 from unittest import TestCase
23 1 from unittest.mock import Mock, patch, call
24 1 from uuid import uuid4
25 1 from http import HTTPStatus
26 1 from time import time
27 1 from osm_common import dbbase, fsbase, msgbase
28 1 from osm_common.dbmemory import DbMemory
29 1 from osm_nbi import authconn, validation
30 1 from osm_nbi.admin_topics import (
31     ProjectTopicAuth,
32     RoleTopicAuth,
33     UserTopicAuth,
34     CommonVimWimSdn,
35     VcaTopic,
36 )
37 1 from osm_nbi.engine import EngineException
38 1 from osm_nbi.authconn import AuthconnNotFoundException
39 1 from osm_nbi.authconn_internal import AuthconnInternal
40
41
42 1 test_pid = str(uuid4())
43 1 test_name = "test-user"
44
45
46 1 def norm(str):
47     """Normalize string for checking"""
48 1     return " ".join(str.strip().split()).lower()
49
50
51 1 class TestVcaTopic(TestCase):
52 1     def setUp(self):
53 1         self.db = Mock(dbbase.DbBase())
54 1         self.fs = Mock(fsbase.FsBase())
55 1         self.msg = Mock(msgbase.MsgBase())
56 1         self.auth = Mock(authconn.Authconn(None, None, None))
57 1         self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
58
59 1     @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
60 1     def test_format_on_new(self, mock_super_format_on_new):
61 1         content = {
62             "_id": "id",
63             "secret": "encrypted_secret",
64             "cacert": "encrypted_cacert",
65         }
66 1         self.db.encrypt.side_effect = ["secret", "cacert"]
67 1         mock_super_format_on_new.return_value = "1234"
68
69 1         oid = self.vca_topic.format_on_new(content)
70
71 1         self.assertEqual(oid, "1234")
72 1         self.assertEqual(content["secret"], "secret")
73 1         self.assertEqual(content["cacert"], "cacert")
74 1         self.db.encrypt.assert_has_calls(
75             [
76                 call("encrypted_secret", schema_version="1.11", salt="id"),
77                 call("encrypted_cacert", schema_version="1.11", salt="id"),
78             ]
79         )
80 1         mock_super_format_on_new.assert_called_with(content, None, False)
81
82 1     @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
83 1     def test_format_on_edit(self, mock_super_format_on_edit):
84 1         edit_content = {
85             "_id": "id",
86             "secret": "encrypted_secret",
87             "cacert": "encrypted_cacert",
88         }
89 1         final_content = {
90             "_id": "id",
91             "schema_version": "1.11",
92         }
93 1         self.db.encrypt.side_effect = ["secret", "cacert"]
94 1         mock_super_format_on_edit.return_value = "1234"
95
96 1         oid = self.vca_topic.format_on_edit(final_content, edit_content)
97
98 1         self.assertEqual(oid, "1234")
99 1         self.assertEqual(final_content["secret"], "secret")
100 1         self.assertEqual(final_content["cacert"], "cacert")
101 1         self.db.encrypt.assert_has_calls(
102             [
103                 call("encrypted_secret", schema_version="1.11", salt="id"),
104                 call("encrypted_cacert", schema_version="1.11", salt="id"),
105             ]
106         )
107 1         mock_super_format_on_edit.assert_called()
108
109 1     @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
110 1     def test_check_conflict_on_del(self, mock_check_conflict_on_del):
111 1         session = {
112             "project_id": "project-id",
113             "force": False,
114         }
115 1         _id = "vca-id"
116 1         db_content = {}
117
118 1         self.db.get_list.return_value = None
119
120 1         self.vca_topic.check_conflict_on_del(session, _id, db_content)
121
122 1         self.db.get_list.assert_called_with(
123             "vim_accounts",
124             {"vca": _id, "_admin.projects_read.cont": "project-id"},
125         )
126 1         mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
127
128 1     @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
129 1     def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
130 1         session = {
131             "project_id": "project-id",
132             "force": True,
133         }
134 1         _id = "vca-id"
135 1         db_content = {}
136
137 1         self.vca_topic.check_conflict_on_del(session, _id, db_content)
138
139 1         self.db.get_list.assert_not_called()
140 1         mock_check_conflict_on_del.assert_not_called()
141
142 1     @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
143 1     def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
144 1         session = {
145             "project_id": "project-id",
146             "force": False,
147         }
148 1         _id = "vca-id"
149 1         db_content = {}
150
151 1         self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
152
153 1         with self.assertRaises(EngineException) as context:
154 1             self.vca_topic.check_conflict_on_del(session, _id, db_content)
155 0             self.assertEqual(
156                 context.exception,
157                 EngineException(
158                     "There is at least one VIM account using this vca",
159                     http_code=HTTPStatus.CONFLICT,
160                 ),
161             )
162
163 1         self.db.get_list.assert_called_with(
164             "vim_accounts",
165             {"vca": _id, "_admin.projects_read.cont": "project-id"},
166         )
167 1         mock_check_conflict_on_del.assert_not_called()
168
169
170 1 class Test_ProjectTopicAuth(TestCase):
171 1     @classmethod
172 1     def setUpClass(cls):
173 1         cls.test_name = "test-project-topic"
174
175 1     def setUp(self):
176 1         self.db = Mock(dbbase.DbBase())
177 1         self.fs = Mock(fsbase.FsBase())
178 1         self.msg = Mock(msgbase.MsgBase())
179 1         self.auth = Mock(authconn.Authconn(None, None, None))
180 1         self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
181 1         self.fake_session = {
182             "username": self.test_name,
183             "project_id": (test_pid,),
184             "method": None,
185             "admin": True,
186             "force": False,
187             "public": False,
188             "allow_show_user_project_role": True,
189         }
190 1         self.topic.check_quota = Mock(return_value=None)  # skip quota
191
192 1     def test_new_project(self):
193 1         with self.subTest(i=1):
194 1             rollback = []
195 1             pid1 = str(uuid4())
196 1             self.auth.get_project_list.return_value = []
197 1             self.auth.create_project.return_value = pid1
198 1             pid2, oid = self.topic.new(
199                 rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
200             )
201 1             self.assertEqual(len(rollback), 1, "Wrong rollback length")
202 1             self.assertEqual(pid2, pid1, "Wrong project identifier")
203 1             content = self.auth.create_project.call_args[0][0]
204 1             self.assertEqual(content["name"], self.test_name, "Wrong project name")
205 1             self.assertEqual(content["quotas"], {}, "Wrong quotas")
206 1             self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
207 1             self.assertEqual(
208                 content["_admin"]["modified"],
209                 content["_admin"]["created"],
210                 "Wrong modification time",
211             )
212 1         with self.subTest(i=2):
213 1             rollback = []
214 1             with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
215 1                 self.topic.new(
216                     rollback,
217                     self.fake_session,
218                     {"name": "other-project-name", "quotas": {"baditems": 10}},
219                 )
220 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
221 1             self.assertEqual(
222                 e.exception.http_code,
223                 HTTPStatus.UNPROCESSABLE_ENTITY,
224                 "Wrong HTTP status code",
225             )
226 1             self.assertIn(
227                 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
228                     "baditems"
229                 ),
230                 norm(str(e.exception)),
231                 "Wrong exception text",
232             )
233
234 1     def test_edit_project(self):
235 1         now = time()
236 1         pid = str(uuid4())
237 1         proj = {
238             "_id": pid,
239             "name": self.test_name,
240             "_admin": {"created": now, "modified": now},
241         }
242 1         with self.subTest(i=1):
243 1             self.auth.get_project_list.side_effect = [[proj], []]
244 1             new_name = "new-project-name"
245 1             quotas = {
246                 "vnfds": random.SystemRandom().randint(0, 100),
247                 "nsds": random.SystemRandom().randint(0, 100),
248             }
249 1             self.topic.edit(
250                 self.fake_session, pid, {"name": new_name, "quotas": quotas}
251             )
252 1             _id, content = self.auth.update_project.call_args[0]
253 1             self.assertEqual(_id, pid, "Wrong project identifier")
254 1             self.assertEqual(content["_id"], pid, "Wrong project identifier")
255 1             self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
256 1             self.assertGreater(
257                 content["_admin"]["modified"], now, "Wrong modification time"
258             )
259 1             self.assertEqual(content["name"], new_name, "Wrong project name")
260 1             self.assertEqual(content["quotas"], quotas, "Wrong quotas")
261 1         with self.subTest(i=2):
262 1             new_name = "other-project-name"
263 1             quotas = {"baditems": random.SystemRandom().randint(0, 100)}
264 1             self.auth.get_project_list.side_effect = [[proj], []]
265 1             with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
266 1                 self.topic.edit(
267                     self.fake_session, pid, {"name": new_name, "quotas": quotas}
268                 )
269 1             self.assertEqual(
270                 e.exception.http_code,
271                 HTTPStatus.UNPROCESSABLE_ENTITY,
272                 "Wrong HTTP status code",
273             )
274 1             self.assertIn(
275                 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
276                     "baditems"
277                 ),
278                 norm(str(e.exception)),
279                 "Wrong exception text",
280             )
281
282 1     def test_conflict_on_new(self):
283 1         with self.subTest(i=1):
284 1             rollback = []
285 1             pid = str(uuid4())
286 1             with self.assertRaises(
287                 EngineException, msg="Accepted uuid as project name"
288             ) as e:
289 1                 self.topic.new(rollback, self.fake_session, {"name": pid})
290 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
291 1             self.assertEqual(
292                 e.exception.http_code,
293                 HTTPStatus.UNPROCESSABLE_ENTITY,
294                 "Wrong HTTP status code",
295             )
296 1             self.assertIn(
297                 "project name '{}' cannot have an uuid format".format(pid),
298                 norm(str(e.exception)),
299                 "Wrong exception text",
300             )
301 1         with self.subTest(i=2):
302 1             rollback = []
303 1             self.auth.get_project_list.return_value = [
304                 {"_id": test_pid, "name": self.test_name}
305             ]
306 1             with self.assertRaises(
307                 EngineException, msg="Accepted existing project name"
308             ) as e:
309 1                 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
310 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
311 1             self.assertEqual(
312                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
313             )
314 1             self.assertIn(
315                 "project '{}' exists".format(self.test_name),
316                 norm(str(e.exception)),
317                 "Wrong exception text",
318             )
319
320 1     def test_conflict_on_edit(self):
321 1         with self.subTest(i=1):
322 1             self.auth.get_project_list.return_value = [
323                 {"_id": test_pid, "name": self.test_name}
324             ]
325 1             new_name = str(uuid4())
326 1             with self.assertRaises(
327                 EngineException, msg="Accepted uuid as project name"
328             ) as e:
329 1                 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
330 1             self.assertEqual(
331                 e.exception.http_code,
332                 HTTPStatus.UNPROCESSABLE_ENTITY,
333                 "Wrong HTTP status code",
334             )
335 1             self.assertIn(
336                 "project name '{}' cannot have an uuid format".format(new_name),
337                 norm(str(e.exception)),
338                 "Wrong exception text",
339             )
340 1         with self.subTest(i=2):
341 1             pid = str(uuid4())
342 1             self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
343 1             with self.assertRaises(
344                 EngineException, msg="Accepted renaming of project 'admin'"
345             ) as e:
346 1                 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
347 1             self.assertEqual(
348                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
349             )
350 1             self.assertIn(
351                 "you cannot rename project 'admin'",
352                 norm(str(e.exception)),
353                 "Wrong exception text",
354             )
355 1         with self.subTest(i=3):
356 1             new_name = "new-project-name"
357 1             self.auth.get_project_list.side_effect = [
358                 [{"_id": test_pid, "name": self.test_name}],
359                 [{"_id": str(uuid4()), "name": new_name}],
360             ]
361 1             with self.assertRaises(
362                 EngineException, msg="Accepted existing project name"
363             ) as e:
364 1                 self.topic.edit(self.fake_session, pid, {"name": new_name})
365 1             self.assertEqual(
366                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
367             )
368 1             self.assertIn(
369                 "project '{}' is already used".format(new_name),
370                 norm(str(e.exception)),
371                 "Wrong exception text",
372             )
373
374 1     def test_delete_project(self):
375 1         with self.subTest(i=1):
376 1             pid = str(uuid4())
377 1             self.auth.get_project.return_value = {
378                 "_id": pid,
379                 "name": "other-project-name",
380             }
381 1             self.auth.delete_project.return_value = {"deleted": 1}
382 1             self.auth.get_user_list.return_value = []
383 1             self.db.get_list.return_value = []
384 1             rc = self.topic.delete(self.fake_session, pid)
385 1             self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
386 1             self.assertEqual(
387                 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
388             )
389 1             self.assertEqual(
390                 self.auth.delete_project.call_args[0][0],
391                 pid,
392                 "Wrong project identifier",
393             )
394
395 1     def test_conflict_on_del(self):
396 1         with self.subTest(i=1):
397 1             self.auth.get_project.return_value = {
398                 "_id": test_pid,
399                 "name": self.test_name,
400             }
401 1             with self.assertRaises(
402                 EngineException, msg="Accepted deletion of own project"
403             ) as e:
404 1                 self.topic.delete(self.fake_session, self.test_name)
405 1             self.assertEqual(
406                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
407             )
408 1             self.assertIn(
409                 "you cannot delete your own project",
410                 norm(str(e.exception)),
411                 "Wrong exception text",
412             )
413 1         with self.subTest(i=2):
414 1             self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
415 1             with self.assertRaises(
416                 EngineException, msg="Accepted deletion of project 'admin'"
417             ) as e:
418 1                 self.topic.delete(self.fake_session, "admin")
419 1             self.assertEqual(
420                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
421             )
422 1             self.assertIn(
423                 "you cannot delete project 'admin'",
424                 norm(str(e.exception)),
425                 "Wrong exception text",
426             )
427 1         with self.subTest(i=3):
428 1             pid = str(uuid4())
429 1             name = "other-project-name"
430 1             self.auth.get_project.return_value = {"_id": pid, "name": name}
431 1             self.auth.get_user_list.return_value = [
432                 {
433                     "_id": str(uuid4()),
434                     "username": self.test_name,
435                     "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
436                 }
437             ]
438 1             with self.assertRaises(
439                 EngineException, msg="Accepted deletion of used project"
440             ) as e:
441 1                 self.topic.delete(self.fake_session, pid)
442 1             self.assertEqual(
443                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
444             )
445 1             self.assertIn(
446                 "project '{}' ({}) is being used by user '{}'".format(
447                     name, pid, self.test_name
448                 ),
449                 norm(str(e.exception)),
450                 "Wrong exception text",
451             )
452 1         with self.subTest(i=4):
453 1             self.auth.get_user_list.return_value = []
454 1             self.db.get_list.return_value = [
455                 {
456                     "_id": str(uuid4()),
457                     "id": self.test_name,
458                     "_admin": {"projects_read": [pid], "projects_write": []},
459                 }
460             ]
461 1             with self.assertRaises(
462                 EngineException, msg="Accepted deletion of used project"
463             ) as e:
464 1                 self.topic.delete(self.fake_session, pid)
465 1             self.assertEqual(
466                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
467             )
468 1             self.assertIn(
469                 "project '{}' ({}) is being used by {} '{}'".format(
470                     name, pid, "vnf descriptor", self.test_name
471                 ),
472                 norm(str(e.exception)),
473                 "Wrong exception text",
474             )
475
476
477 1 class Test_RoleTopicAuth(TestCase):
478 1     @classmethod
479 1     def setUpClass(cls):
480 1         cls.test_name = "test-role-topic"
481 1         cls.test_operations = ["tokens:get"]
482
483 1     def setUp(self):
484 1         self.db = Mock(dbbase.DbBase())
485 1         self.fs = Mock(fsbase.FsBase())
486 1         self.msg = Mock(msgbase.MsgBase())
487 1         self.auth = Mock(authconn.Authconn(None, None, None))
488 1         self.auth.role_permissions = self.test_operations
489 1         self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
490 1         self.fake_session = {
491             "username": test_name,
492             "project_id": (test_pid,),
493             "method": None,
494             "admin": True,
495             "force": False,
496             "public": False,
497             "allow_show_user_project_role": True,
498         }
499 1         self.topic.check_quota = Mock(return_value=None)  # skip quota
500
501 1     def test_new_role(self):
502 1         with self.subTest(i=1):
503 1             rollback = []
504 1             rid1 = str(uuid4())
505 1             perms_in = {"tokens": True}
506 1             perms_out = {"default": False, "admin": False, "tokens": True}
507 1             self.auth.get_role_list.return_value = []
508 1             self.auth.create_role.return_value = rid1
509 1             rid2, oid = self.topic.new(
510                 rollback,
511                 self.fake_session,
512                 {"name": self.test_name, "permissions": perms_in},
513             )
514 1             self.assertEqual(len(rollback), 1, "Wrong rollback length")
515 1             self.assertEqual(rid2, rid1, "Wrong project identifier")
516 1             content = self.auth.create_role.call_args[0][0]
517 1             self.assertEqual(content["name"], self.test_name, "Wrong role name")
518 1             self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
519 1             self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
520 1             self.assertEqual(
521                 content["_admin"]["modified"],
522                 content["_admin"]["created"],
523                 "Wrong modification time",
524             )
525 1         with self.subTest(i=2):
526 1             rollback = []
527 1             with self.assertRaises(
528                 EngineException, msg="Accepted wrong permissions"
529             ) as e:
530 1                 self.topic.new(
531                     rollback,
532                     self.fake_session,
533                     {"name": "other-role-name", "permissions": {"projects": True}},
534                 )
535 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
536 1             self.assertEqual(
537                 e.exception.http_code,
538                 HTTPStatus.UNPROCESSABLE_ENTITY,
539                 "Wrong HTTP status code",
540             )
541 1             self.assertIn(
542                 "invalid permission '{}'".format("projects"),
543                 norm(str(e.exception)),
544                 "Wrong exception text",
545             )
546
547 1     def test_edit_role(self):
548 1         now = time()
549 1         rid = str(uuid4())
550 1         role = {
551             "_id": rid,
552             "name": self.test_name,
553             "permissions": {"tokens": True},
554             "_admin": {"created": now, "modified": now},
555         }
556 1         with self.subTest(i=1):
557 1             self.auth.get_role_list.side_effect = [[role], []]
558 1             self.auth.get_role.return_value = role
559 1             new_name = "new-role-name"
560 1             perms_in = {"tokens": False, "tokens:get": True}
561 1             perms_out = {
562                 "default": False,
563                 "admin": False,
564                 "tokens": False,
565                 "tokens:get": True,
566             }
567 1             self.topic.edit(
568                 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
569             )
570 1             content = self.auth.update_role.call_args[0][0]
571 1             self.assertEqual(content["_id"], rid, "Wrong role identifier")
572 1             self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
573 1             self.assertGreater(
574                 content["_admin"]["modified"], now, "Wrong modification time"
575             )
576 1             self.assertEqual(content["name"], new_name, "Wrong role name")
577 1             self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
578 1         with self.subTest(i=2):
579 1             new_name = "other-role-name"
580 1             perms_in = {"tokens": False, "tokens:post": True}
581 1             self.auth.get_role_list.side_effect = [[role], []]
582 1             with self.assertRaises(
583                 EngineException, msg="Accepted wrong permissions"
584             ) as e:
585 1                 self.topic.edit(
586                     self.fake_session, rid, {"name": new_name, "permissions": perms_in}
587                 )
588 1             self.assertEqual(
589                 e.exception.http_code,
590                 HTTPStatus.UNPROCESSABLE_ENTITY,
591                 "Wrong HTTP status code",
592             )
593 1             self.assertIn(
594                 "invalid permission '{}'".format("tokens:post"),
595                 norm(str(e.exception)),
596                 "Wrong exception text",
597             )
598
599 1     def test_delete_role(self):
600 1         with self.subTest(i=1):
601 1             rid = str(uuid4())
602 1             role = {"_id": rid, "name": "other-role-name"}
603 1             self.auth.get_role_list.return_value = [role]
604 1             self.auth.get_role.return_value = role
605 1             self.auth.delete_role.return_value = {"deleted": 1}
606 1             self.auth.get_user_list.return_value = []
607 1             rc = self.topic.delete(self.fake_session, rid)
608 1             self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
609 1             self.assertEqual(
610                 self.auth.get_role_list.call_args[0][0]["_id"],
611                 rid,
612                 "Wrong role identifier",
613             )
614 1             self.assertEqual(
615                 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
616             )
617 1             self.assertEqual(
618                 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
619             )
620
621 1     def test_conflict_on_new(self):
622 1         with self.subTest(i=1):
623 1             rollback = []
624 1             rid = str(uuid4())
625 1             with self.assertRaises(
626                 EngineException, msg="Accepted uuid as role name"
627             ) as e:
628 1                 self.topic.new(rollback, self.fake_session, {"name": rid})
629 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
630 1             self.assertEqual(
631                 e.exception.http_code,
632                 HTTPStatus.UNPROCESSABLE_ENTITY,
633                 "Wrong HTTP status code",
634             )
635 1             self.assertIn(
636                 "role name '{}' cannot have an uuid format".format(rid),
637                 norm(str(e.exception)),
638                 "Wrong exception text",
639             )
640 1         with self.subTest(i=2):
641 1             rollback = []
642 1             self.auth.get_role_list.return_value = [
643                 {"_id": str(uuid4()), "name": self.test_name}
644             ]
645 1             with self.assertRaises(
646                 EngineException, msg="Accepted existing role name"
647             ) as e:
648 1                 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
649 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
650 1             self.assertEqual(
651                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
652             )
653 1             self.assertIn(
654                 "role name '{}' exists".format(self.test_name),
655                 norm(str(e.exception)),
656                 "Wrong exception text",
657             )
658
659 1     def test_conflict_on_edit(self):
660 1         rid = str(uuid4())
661 1         with self.subTest(i=1):
662 1             self.auth.get_role_list.return_value = [
663                 {"_id": rid, "name": self.test_name, "permissions": {}}
664             ]
665 1             new_name = str(uuid4())
666 1             with self.assertRaises(
667                 EngineException, msg="Accepted uuid as role name"
668             ) as e:
669 1                 self.topic.edit(self.fake_session, rid, {"name": new_name})
670 1             self.assertEqual(
671                 e.exception.http_code,
672                 HTTPStatus.UNPROCESSABLE_ENTITY,
673                 "Wrong HTTP status code",
674             )
675 1             self.assertIn(
676                 "role name '{}' cannot have an uuid format".format(new_name),
677                 norm(str(e.exception)),
678                 "Wrong exception text",
679             )
680 1         for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
681 1             with self.subTest(i=i):
682 1                 rid = str(uuid4())
683 1                 self.auth.get_role.return_value = {
684                     "_id": rid,
685                     "name": role_name,
686                     "permissions": {},
687                 }
688 1                 with self.assertRaises(
689                     EngineException,
690                     msg="Accepted renaming of role '{}'".format(role_name),
691                 ) as e:
692 1                     self.topic.edit(self.fake_session, rid, {"name": "new-name"})
693 1                 self.assertEqual(
694                     e.exception.http_code,
695                     HTTPStatus.FORBIDDEN,
696                     "Wrong HTTP status code",
697                 )
698 1                 self.assertIn(
699                     "you cannot rename role '{}'".format(role_name),
700                     norm(str(e.exception)),
701                     "Wrong exception text",
702                 )
703 1         with self.subTest(i=i + 1):
704 1             new_name = "new-role-name"
705 1             self.auth.get_role_list.side_effect = [
706                 [{"_id": rid, "name": self.test_name, "permissions": {}}],
707                 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
708             ]
709 1             self.auth.get_role.return_value = {
710                 "_id": rid,
711                 "name": self.test_name,
712                 "permissions": {},
713             }
714 1             with self.assertRaises(
715                 EngineException, msg="Accepted existing role name"
716             ) as e:
717 1                 self.topic.edit(self.fake_session, rid, {"name": new_name})
718 1             self.assertEqual(
719                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
720             )
721 1             self.assertIn(
722                 "role name '{}' exists".format(new_name),
723                 norm(str(e.exception)),
724                 "Wrong exception text",
725             )
726
727 1     def test_conflict_on_del(self):
728 1         for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
729 1             with self.subTest(i=i):
730 1                 rid = str(uuid4())
731 1                 role = {"_id": rid, "name": role_name}
732 1                 self.auth.get_role_list.return_value = [role]
733 1                 self.auth.get_role.return_value = role
734 1                 with self.assertRaises(
735                     EngineException,
736                     msg="Accepted deletion of role '{}'".format(role_name),
737                 ) as e:
738 1                     self.topic.delete(self.fake_session, rid)
739 1                 self.assertEqual(
740                     e.exception.http_code,
741                     HTTPStatus.FORBIDDEN,
742                     "Wrong HTTP status code",
743                 )
744 1                 self.assertIn(
745                     "you cannot delete role '{}'".format(role_name),
746                     norm(str(e.exception)),
747                     "Wrong exception text",
748                 )
749 1         with self.subTest(i=i + 1):
750 1             rid = str(uuid4())
751 1             name = "other-role-name"
752 1             role = {"_id": rid, "name": name}
753 1             self.auth.get_role_list.return_value = [role]
754 1             self.auth.get_role.return_value = role
755 1             self.auth.get_user_list.return_value = [
756                 {
757                     "_id": str(uuid4()),
758                     "username": self.test_name,
759                     "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
760                 }
761             ]
762 1             with self.assertRaises(
763                 EngineException, msg="Accepted deletion of used role"
764             ) as e:
765 1                 self.topic.delete(self.fake_session, rid)
766 1             self.assertEqual(
767                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
768             )
769 1             self.assertIn(
770                 "role '{}' ({}) is being used by user '{}'".format(
771                     name, rid, self.test_name
772                 ),
773                 norm(str(e.exception)),
774                 "Wrong exception text",
775             )
776
777
778 1 class Test_UserTopicAuth(TestCase):
779 1     @classmethod
780 1     def setUpClass(cls):
781 1         cls.test_name = "test-user-topic"
782 1         cls.password = "Test@123"
783
784 1     def setUp(self):
785         # self.db = Mock(dbbase.DbBase())
786 1         self.db = DbMemory()
787 1         self.fs = Mock(fsbase.FsBase())
788 1         self.msg = Mock(msgbase.MsgBase())
789 1         self.auth = Mock(authconn.Authconn(None, None, None))
790 1         self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
791 1         self.fake_session = {
792             "username": test_name,
793             "project_id": (test_pid,),
794             "method": None,
795             "admin": True,
796             "force": False,
797             "public": False,
798             "allow_show_user_project_role": True,
799         }
800 1         self.topic.check_quota = Mock(return_value=None)  # skip quota
801
802 1     def test_new_user(self):
803 1         uid1 = str(uuid4())
804 1         pid = str(uuid4())
805 1         self.auth.get_user_list.return_value = []
806 1         self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
807 1         self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
808 1         with self.subTest(i=1):
809 1             rollback = []
810 1             rid = str(uuid4())
811 1             self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
812 1             prms_in = [{"project": "some_project", "role": "some_role"}]
813 1             prms_out = [{"project": pid, "role": rid}]
814 1             uid2, oid = self.topic.new(
815                 rollback,
816                 self.fake_session,
817                 {
818                     "username": self.test_name,
819                     "password": self.password,
820                     "project_role_mappings": prms_in,
821                 },
822             )
823 1             self.assertEqual(len(rollback), 1, "Wrong rollback length")
824 1             self.assertEqual(uid2, uid1, "Wrong project identifier")
825 1             content = self.auth.create_user.call_args[0][0]
826 1             self.assertEqual(content["username"], self.test_name, "Wrong project name")
827 1             self.assertEqual(content["password"], self.password, "Wrong password")
828 1             self.assertEqual(
829                 content["project_role_mappings"],
830                 prms_out,
831                 "Wrong project-role mappings",
832             )
833 1             self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
834 1             self.assertEqual(
835                 content["_admin"]["modified"],
836                 content["_admin"]["created"],
837                 "Wrong modification time",
838             )
839 1         with self.subTest(i=2):
840 1             rollback = []
841 1             def_rid = str(uuid4())
842 1             def_role = {"_id": def_rid, "name": "project_admin"}
843 1             self.auth.get_role.return_value = def_role
844 1             self.auth.get_role_list.return_value = [def_role]
845 1             prms_out = [{"project": pid, "role": def_rid}]
846 1             uid2, oid = self.topic.new(
847                 rollback,
848                 self.fake_session,
849                 {
850                     "username": self.test_name,
851                     "password": self.password,
852                     "projects": ["some_project"],
853                 },
854             )
855 1             self.assertEqual(len(rollback), 1, "Wrong rollback length")
856 1             self.assertEqual(uid2, uid1, "Wrong project identifier")
857 1             content = self.auth.create_user.call_args[0][0]
858 1             self.assertEqual(content["username"], self.test_name, "Wrong project name")
859 1             self.assertEqual(content["password"], self.password, "Wrong password")
860 1             self.assertEqual(
861                 content["project_role_mappings"],
862                 prms_out,
863                 "Wrong project-role mappings",
864             )
865 1             self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
866 1             self.assertEqual(
867                 content["_admin"]["modified"],
868                 content["_admin"]["created"],
869                 "Wrong modification time",
870             )
871 1         with self.subTest(i=3):
872 1             rollback = []
873 1             with self.assertRaises(
874                 EngineException, msg="Accepted wrong project-role mappings"
875             ) as e:
876 1                 self.topic.new(
877                     rollback,
878                     self.fake_session,
879                     {
880                         "username": "other-project-name",
881                         "password": "Other@pwd1",
882                         "project_role_mappings": [{}],
883                     },
884                 )
885 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
886 1             self.assertEqual(
887                 e.exception.http_code,
888                 HTTPStatus.UNPROCESSABLE_ENTITY,
889                 "Wrong HTTP status code",
890             )
891 1             self.assertIn(
892                 "format error at '{}' '{}'".format(
893                     "project_role_mappings:{}", "'{}' is a required property"
894                 ).format(0, "project"),
895                 norm(str(e.exception)),
896                 "Wrong exception text",
897             )
898 1         with self.subTest(i=4):
899 1             rollback = []
900 1             with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
901 1                 self.topic.new(
902                     rollback,
903                     self.fake_session,
904                     {
905                         "username": "other-project-name",
906                         "password": "Other@pwd1",
907                         "projects": [],
908                     },
909                 )
910 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
911 1             self.assertEqual(
912                 e.exception.http_code,
913                 HTTPStatus.UNPROCESSABLE_ENTITY,
914                 "Wrong HTTP status code",
915             )
916 1             self.assertIn(
917                 "format error at '{}' '{}'".format(
918                     "projects", "{} is too short"
919                 ).format([]),
920                 norm(str(e.exception)),
921                 "Wrong exception text",
922             )
923
924 1     def test_edit_user(self):
925 1         now = time()
926 1         uid = str(uuid4())
927 1         pid1 = str(uuid4())
928 1         rid1 = str(uuid4())
929 1         prms = [
930             {
931                 "project": pid1,
932                 "project_name": "project-1",
933                 "role": rid1,
934                 "role_name": "role-1",
935             }
936         ]
937 1         user = {
938             "_id": uid,
939             "username": self.test_name,
940             "project_role_mappings": prms,
941             "_admin": {"created": now, "modified": now},
942         }
943 1         with self.subTest(i=1):
944 1             self.auth.get_user_list.side_effect = [[user], []]
945 1             self.auth.get_user.return_value = user
946 1             pid2 = str(uuid4())
947 1             rid2 = str(uuid4())
948 1             self.auth.get_project.side_effect = [
949                 {"_id": pid2, "name": "project-2"},
950                 {"_id": pid1, "name": "project-1"},
951             ]
952 1             self.auth.get_role.side_effect = [
953                 {"_id": rid2, "name": "role-2"},
954                 {"_id": rid1, "name": "role-1"},
955             ]
956 1             new_name = "new-user-name"
957 1             new_pasw = "New@pwd1"
958 1             add_prms = [{"project": pid2, "role": rid2}]
959 1             rem_prms = [{"project": pid1, "role": rid1}]
960 1             self.topic.edit(
961                 self.fake_session,
962                 uid,
963                 {
964                     "username": new_name,
965                     "password": new_pasw,
966                     "add_project_role_mappings": add_prms,
967                     "remove_project_role_mappings": rem_prms,
968                 },
969             )
970 1             content = self.auth.update_user.call_args[0][0]
971 1             self.assertEqual(content["_id"], uid, "Wrong user identifier")
972 1             self.assertEqual(content["username"], new_name, "Wrong user name")
973 1             self.assertEqual(content["password"], new_pasw, "Wrong user password")
974 1             self.assertEqual(
975                 content["add_project_role_mappings"],
976                 add_prms,
977                 "Wrong project-role mappings to add",
978             )
979 1             self.assertEqual(
980                 content["remove_project_role_mappings"],
981                 prms,
982                 "Wrong project-role mappings to remove",
983             )
984 1         with self.subTest(i=2):
985 1             new_name = "other-user-name"
986 1             new_prms = [{}]
987 1             self.auth.get_role_list.side_effect = [[user], []]
988 1             self.auth.get_user_list.side_effect = [[user]]
989 1             with self.assertRaises(
990                 EngineException, msg="Accepted wrong project-role mappings"
991             ) as e:
992 1                 self.topic.edit(
993                     self.fake_session,
994                     uid,
995                     {"username": new_name, "project_role_mappings": new_prms},
996                 )
997 1             self.assertEqual(
998                 e.exception.http_code,
999                 HTTPStatus.UNPROCESSABLE_ENTITY,
1000                 "Wrong HTTP status code",
1001             )
1002 1             self.assertIn(
1003                 "format error at '{}' '{}'".format(
1004                     "project_role_mappings:{}", "'{}' is a required property"
1005                 ).format(0, "project"),
1006                 norm(str(e.exception)),
1007                 "Wrong exception text",
1008             )
1009 1         with self.subTest(i=3):
1010 1             self.auth.get_user_list.side_effect = [[user], []]
1011 1             self.auth.get_user.return_value = user
1012 1             old_password = self.password
1013 1             new_pasw = "New@pwd1"
1014 1             self.topic.edit(
1015                 self.fake_session,
1016                 uid,
1017                 {
1018                     "old_password": old_password,
1019                     "password": new_pasw,
1020                 },
1021             )
1022 1             content = self.auth.update_user.call_args[0][0]
1023 1             self.assertEqual(
1024                 content["old_password"], old_password, "Wrong old password"
1025             )
1026 1             self.assertEqual(content["password"], new_pasw, "Wrong user password")
1027
1028 1     def test_delete_user(self):
1029 1         with self.subTest(i=1):
1030 1             uid = str(uuid4())
1031 1             self.fake_session["username"] = self.test_name
1032 1             user = user = {
1033                 "_id": uid,
1034                 "username": "other-user-name",
1035                 "project_role_mappings": [],
1036             }
1037 1             self.auth.get_user.return_value = user
1038 1             self.auth.delete_user.return_value = {"deleted": 1}
1039 1             rc = self.topic.delete(self.fake_session, uid)
1040 1             self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
1041 1             self.assertEqual(
1042                 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
1043             )
1044 1             self.assertEqual(
1045                 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
1046             )
1047
1048 1     def test_conflict_on_new(self):
1049 1         with self.subTest(i=1):
1050 1             rollback = []
1051 1             uid = str(uuid4())
1052 1             with self.assertRaises(
1053                 EngineException, msg="Accepted uuid as username"
1054             ) as e:
1055 1                 self.topic.new(
1056                     rollback,
1057                     self.fake_session,
1058                     {
1059                         "username": uid,
1060                         "password": self.password,
1061                         "projects": [test_pid],
1062                     },
1063                 )
1064 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
1065 1             self.assertEqual(
1066                 e.exception.http_code,
1067                 HTTPStatus.UNPROCESSABLE_ENTITY,
1068                 "Wrong HTTP status code",
1069             )
1070 1             self.assertIn(
1071                 "username '{}' cannot have a uuid format".format(uid),
1072                 norm(str(e.exception)),
1073                 "Wrong exception text",
1074             )
1075 1         with self.subTest(i=2):
1076 1             rollback = []
1077 1             self.auth.get_user_list.return_value = [
1078                 {"_id": str(uuid4()), "username": self.test_name}
1079             ]
1080 1             with self.assertRaises(
1081                 EngineException, msg="Accepted existing username"
1082             ) as e:
1083 1                 self.topic.new(
1084                     rollback,
1085                     self.fake_session,
1086                     {
1087                         "username": self.test_name,
1088                         "password": self.password,
1089                         "projects": [test_pid],
1090                     },
1091                 )
1092 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
1093 1             self.assertEqual(
1094                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1095             )
1096 1             self.assertIn(
1097                 "username '{}' is already used".format(self.test_name),
1098                 norm(str(e.exception)),
1099                 "Wrong exception text",
1100             )
1101 1         with self.subTest(i=3):
1102 1             rollback = []
1103 1             self.auth.get_user_list.return_value = []
1104 1             self.auth.get_role_list.side_effect = [[], []]
1105 1             with self.assertRaises(
1106                 AuthconnNotFoundException, msg="Accepted user without default role"
1107             ) as e:
1108 1                 self.topic.new(
1109                     rollback,
1110                     self.fake_session,
1111                     {
1112                         "username": self.test_name,
1113                         "password": self.password,
1114                         "projects": [str(uuid4())],
1115                     },
1116                 )
1117 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
1118 1             self.assertEqual(
1119                 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1120             )
1121 1             self.assertIn(
1122                 "can't find default role for user '{}'".format(self.test_name),
1123                 norm(str(e.exception)),
1124                 "Wrong exception text",
1125             )
1126
1127 1     def test_conflict_on_edit(self):
1128 1         uid = str(uuid4())
1129 1         with self.subTest(i=1):
1130 1             self.auth.get_user_list.return_value = [
1131                 {"_id": uid, "username": self.test_name}
1132             ]
1133 1             new_name = str(uuid4())
1134 1             with self.assertRaises(
1135                 EngineException, msg="Accepted uuid as username"
1136             ) as e:
1137 1                 self.topic.edit(self.fake_session, uid, {"username": new_name})
1138 1             self.assertEqual(
1139                 e.exception.http_code,
1140                 HTTPStatus.UNPROCESSABLE_ENTITY,
1141                 "Wrong HTTP status code",
1142             )
1143 1             self.assertIn(
1144                 "username '{}' cannot have an uuid format".format(new_name),
1145                 norm(str(e.exception)),
1146                 "Wrong exception text",
1147             )
1148 1         with self.subTest(i=2):
1149 1             self.auth.get_user_list.return_value = [
1150                 {"_id": uid, "username": self.test_name}
1151             ]
1152 1             self.auth.get_role_list.side_effect = [[], []]
1153 1             with self.assertRaises(
1154                 AuthconnNotFoundException, msg="Accepted user without default role"
1155             ) as e:
1156 1                 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
1157 1             self.assertEqual(
1158                 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1159             )
1160 1             self.assertIn(
1161                 "can't find a default role for user '{}'".format(self.test_name),
1162                 norm(str(e.exception)),
1163                 "Wrong exception text",
1164             )
1165 1         with self.subTest(i=3):
1166 1             admin_uid = str(uuid4())
1167 1             self.auth.get_user_list.return_value = [
1168                 {"_id": admin_uid, "username": "admin"}
1169             ]
1170 1             with self.assertRaises(
1171                 EngineException,
1172                 msg="Accepted removing system_admin role from admin user",
1173             ) as e:
1174 1                 self.topic.edit(
1175                     self.fake_session,
1176                     admin_uid,
1177                     {
1178                         "remove_project_role_mappings": [
1179                             {"project": "admin", "role": "system_admin"}
1180                         ]
1181                     },
1182                 )
1183 1             self.assertEqual(
1184                 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
1185             )
1186 1             self.assertIn(
1187                 "you cannot remove system_admin role from admin user",
1188                 norm(str(e.exception)),
1189                 "Wrong exception text",
1190             )
1191 1         with self.subTest(i=4):
1192 1             new_name = "new-user-name"
1193 1             self.auth.get_user_list.side_effect = [
1194                 [{"_id": uid, "name": self.test_name}],
1195                 [{"_id": str(uuid4()), "name": new_name}],
1196             ]
1197 1             with self.assertRaises(
1198                 EngineException, msg="Accepted existing username"
1199             ) as e:
1200 1                 self.topic.edit(self.fake_session, uid, {"username": new_name})
1201 1             self.assertEqual(
1202                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1203             )
1204 1             self.assertIn(
1205                 "username '{}' is already used".format(new_name),
1206                 norm(str(e.exception)),
1207                 "Wrong exception text",
1208             )
1209
1210 1     def test_conflict_on_del(self):
1211 1         with self.subTest(i=1):
1212 1             uid = str(uuid4())
1213 1             self.fake_session["username"] = self.test_name
1214 1             user = user = {
1215                 "_id": uid,
1216                 "username": self.test_name,
1217                 "project_role_mappings": [],
1218             }
1219 1             self.auth.get_user.return_value = user
1220 1             with self.assertRaises(
1221                 EngineException, msg="Accepted deletion of own user"
1222             ) as e:
1223 1                 self.topic.delete(self.fake_session, uid)
1224 1             self.assertEqual(
1225                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1226             )
1227 1             self.assertIn(
1228                 "you cannot delete your own login user",
1229                 norm(str(e.exception)),
1230                 "Wrong exception text",
1231             )
1232
1233 1     def test_user_management(self):
1234 1         self.config = {
1235             "user_management": True,
1236             "pwd_expire_days": 30,
1237             "max_pwd_attempt": 5,
1238             "account_expire_days": 90,
1239             "version": "dev",
1240             "deviceVendor": "test",
1241             "deviceProduct": "test",
1242         }
1243 1         self.permissions = {"admin": True, "default": True}
1244 1         now = time()
1245 1         rid = str(uuid4())
1246 1         role = {
1247             "_id": rid,
1248             "name": self.test_name,
1249             "permissions": self.permissions,
1250             "_admin": {"created": now, "modified": now},
1251         }
1252 1         self.db.create("roles", role)
1253 1         admin_user = {
1254             "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1255             "username": "admin",
1256             "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb",
1257             "_admin": {
1258                 "created": 1663058370.7721832,
1259                 "modified": 1663681183.5651639,
1260                 "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b",
1261                 "last_token_time": 1666876472.2962265,
1262                 "user_status": "always-active",
1263                 "retry_count": 0,
1264             },
1265             "project_role_mappings": [
1266                 {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid}
1267             ],
1268         }
1269 1         self.db.create("users", admin_user)
1270 1         with self.subTest(i=1):
1271 1             self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
1272 1             user_info = {"username": "user_mgmt_true", "password": "Test@123"}
1273 1             self.user_create.create_user(user_info)
1274 1             user = self.db.get_one("users", {"username": user_info["username"]})
1275 1             self.assertEqual(user["username"], user_info["username"], "Wrong user name")
1276 1             self.assertEqual(
1277                 user["_admin"]["user_status"], "active", "User status is unknown"
1278             )
1279 1             self.assertIn("password_expire_time", user["_admin"], "Key is not there")
1280 1             self.assertIn("account_expire_time", user["_admin"], "Key is not there")
1281 1         with self.subTest(i=2):
1282 1             self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
1283 1             locked_user = {
1284                 "username": "user_lock",
1285                 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
1286                 "_admin": {
1287                     "created": 1667207552.2191198,
1288                     "modified": 1667207552.2191815,
1289                     "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
1290                     "user_status": "locked",
1291                     "password_expire_time": 1667207552.2191815,
1292                     "account_expire_time": 1674983552.2191815,
1293                     "retry_count": 5,
1294                     "last_token_time": 1667207552.2191815,
1295                 },
1296                 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
1297             }
1298 1             self.db.create("users", locked_user)
1299 1             user_info = {
1300                 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
1301                 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1302                 "unlock": True,
1303             }
1304 1             self.assertEqual(
1305                 locked_user["_admin"]["user_status"], "locked", "User status is unknown"
1306             )
1307 1             self.user_update.update_user(user_info)
1308 1             user = self.db.get_one("users", {"username": locked_user["username"]})
1309 1             self.assertEqual(
1310                 user["username"], locked_user["username"], "Wrong user name"
1311             )
1312 1             self.assertEqual(
1313                 user["_admin"]["user_status"], "active", "User status is unknown"
1314             )
1315 1             self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown")
1316 1         with self.subTest(i=3):
1317 1             self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
1318 1             expired_user = {
1319                 "username": "user_expire",
1320                 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
1321                 "_admin": {
1322                     "created": 1665602087.601298,
1323                     "modified": 1665636442.1245084,
1324                     "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
1325                     "user_status": "expired",
1326                     "password_expire_time": 1668248628.2191815,
1327                     "account_expire_time": 1666952628.2191815,
1328                     "retry_count": 0,
1329                     "last_token_time": 1666779828.2171815,
1330                 },
1331                 "_id": "3266430f-8222-407f-b08f-3a242504ab94",
1332             }
1333 1             self.db.create("users", expired_user)
1334 1             user_info = {
1335                 "_id": "3266430f-8222-407f-b08f-3a242504ab94",
1336                 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1337                 "renew": True,
1338             }
1339 1             self.assertEqual(
1340                 expired_user["_admin"]["user_status"],
1341                 "expired",
1342                 "User status is unknown",
1343             )
1344 1             self.user_update.update_user(user_info)
1345 1             user = self.db.get_one("users", {"username": expired_user["username"]})
1346 1             self.assertEqual(
1347                 user["username"], expired_user["username"], "Wrong user name"
1348             )
1349 1             self.assertEqual(
1350                 user["_admin"]["user_status"], "active", "User status is unknown"
1351             )
1352 1             self.assertGreater(
1353                 user["_admin"]["account_expire_time"],
1354                 expired_user["_admin"]["account_expire_time"],
1355                 "User expire time is not get extended",
1356             )
1357 1         with self.subTest(i=4):
1358 1             self.config.update({"user_management": False})
1359 1             self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
1360 1             user_info = {"username": "user_mgmt_false", "password": "Test@123"}
1361 1             self.user_create.create_user(user_info)
1362 1             user = self.db.get_one("users", {"username": user_info["username"]})
1363 1             self.assertEqual(user["username"], user_info["username"], "Wrong user name")
1364 1             self.assertEqual(
1365                 user["_admin"]["user_status"], "active", "User status is unknown"
1366             )
1367 1             self.assertNotIn("password_expire_time", user["_admin"], "Key is not there")
1368 1             self.assertNotIn("account_expire_time", user["_admin"], "Key is not there")
1369
1370
1371 1 class Test_CommonVimWimSdn(TestCase):
1372 1     @classmethod
1373 1     def setUpClass(cls):
1374 1         cls.test_name = "test-cim-topic"  # CIM = Common Infrastructure Manager
1375
1376 1     def setUp(self):
1377 1         self.db = Mock(dbbase.DbBase())
1378 1         self.fs = Mock(fsbase.FsBase())
1379 1         self.msg = Mock(msgbase.MsgBase())
1380 1         self.auth = Mock(authconn.Authconn(None, None, None))
1381 1         self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
1382         # Use WIM schemas for testing because they are the simplest
1383 1         self.topic._send_msg = Mock()
1384 1         self.topic.topic = "wims"
1385 1         self.topic.schema_new = validation.wim_account_new_schema
1386 1         self.topic.schema_edit = validation.wim_account_edit_schema
1387 1         self.fake_session = {
1388             "username": test_name,
1389             "project_id": (test_pid,),
1390             "method": None,
1391             "admin": True,
1392             "force": False,
1393             "public": False,
1394             "allow_show_user_project_role": True,
1395         }
1396 1         self.topic.check_quota = Mock(return_value=None)  # skip quota
1397
1398 1     def test_new_cvws(self):
1399 1         test_url = "http://0.0.0.0:0"
1400 1         with self.subTest(i=1):
1401 1             rollback = []
1402 1             test_type = "fake"
1403 1             self.db.get_one.return_value = None
1404 1             self.db.create.side_effect = lambda self, content: content["_id"]
1405 1             cid, oid = self.topic.new(
1406                 rollback,
1407                 self.fake_session,
1408                 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
1409             )
1410 1             self.assertEqual(len(rollback), 1, "Wrong rollback length")
1411 1             args = self.db.create.call_args[0]
1412 1             content = args[1]
1413 1             self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1414 1             self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1415 1             self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
1416 1             self.assertEqual(content["wim_url"], test_url, "Wrong URL")
1417 1             self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
1418 1             self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
1419 1             self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
1420 1             self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
1421 1             self.assertEqual(
1422                 content["_admin"]["modified"],
1423                 content["_admin"]["created"],
1424                 "Wrong modification time",
1425             )
1426 1             self.assertEqual(
1427                 content["_admin"]["operationalState"],
1428                 "PROCESSING",
1429                 "Wrong operational state",
1430             )
1431 1             self.assertEqual(
1432                 content["_admin"]["projects_read"],
1433                 [test_pid],
1434                 "Wrong read-only projects",
1435             )
1436 1             self.assertEqual(
1437                 content["_admin"]["projects_write"],
1438                 [test_pid],
1439                 "Wrong read/write projects",
1440             )
1441 1             self.assertIsNone(
1442                 content["_admin"]["current_operation"], "Wrong current operation"
1443             )
1444 1             self.assertEqual(
1445                 len(content["_admin"]["operations"]), 1, "Wrong number of operations"
1446             )
1447 1             operation = content["_admin"]["operations"][0]
1448 1             self.assertEqual(
1449                 operation["lcmOperationType"], "create", "Wrong operation type"
1450             )
1451 1             self.assertEqual(
1452                 operation["operationState"], "PROCESSING", "Wrong operation state"
1453             )
1454 1             self.assertGreater(
1455                 operation["startTime"],
1456                 content["_admin"]["created"],
1457                 "Wrong operation start time",
1458             )
1459 1             self.assertGreater(
1460                 operation["statusEnteredTime"],
1461                 content["_admin"]["created"],
1462                 "Wrong operation status enter time",
1463             )
1464 1             self.assertEqual(
1465                 operation["detailed-status"], "", "Wrong operation detailed status info"
1466             )
1467 1             self.assertIsNone(
1468                 operation["operationParams"], "Wrong operation parameters"
1469             )
1470         # This test is disabled. From Feature 8030 we admit all WIM/SDN types
1471         # with self.subTest(i=2):
1472         #     rollback = []
1473         #     test_type = "bad_type"
1474         #     with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
1475         #         self.topic.new(rollback, self.fake_session,
1476         #                        {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
1477         #     self.assertEqual(len(rollback), 0, "Wrong rollback length")
1478         #     self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
1479         #     self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
1480         #                   norm(str(e.exception)), "Wrong exception text")
1481
1482 1     def test_conflict_on_new(self):
1483 1         with self.subTest(i=1):
1484 1             rollback = []
1485 1             test_url = "http://0.0.0.0:0"
1486 1             test_type = "fake"
1487 1             self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
1488 1             with self.assertRaises(
1489                 EngineException, msg="Accepted existing CIM name"
1490             ) as e:
1491 1                 self.topic.new(
1492                     rollback,
1493                     self.fake_session,
1494                     {
1495                         "name": self.test_name,
1496                         "wim_url": test_url,
1497                         "wim_type": test_type,
1498                     },
1499                 )
1500 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
1501 1             self.assertEqual(
1502                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1503             )
1504 1             self.assertIn(
1505                 "name '{}' already exists for {}".format(
1506                     self.test_name, self.topic.topic
1507                 ),
1508                 norm(str(e.exception)),
1509                 "Wrong exception text",
1510             )
1511
1512 1     def test_edit_cvws(self):
1513 1         now = time()
1514 1         cid = str(uuid4())
1515 1         test_url = "http://0.0.0.0:0"
1516 1         test_type = "fake"
1517 1         cvws = {
1518             "_id": cid,
1519             "name": self.test_name,
1520             "wim_url": test_url,
1521             "wim_type": test_type,
1522             "_admin": {
1523                 "created": now,
1524                 "modified": now,
1525                 "operations": [{"lcmOperationType": "create"}],
1526             },
1527         }
1528 1         with self.subTest(i=1):
1529 1             new_name = "new-cim-name"
1530 1             new_url = "https://1.1.1.1:1"
1531 1             new_type = "onos"
1532 1             self.db.get_one.side_effect = [cvws, None]
1533 1             self.db.replace.return_value = {"updated": 1}
1534             # self.db.encrypt.side_effect = [b64str(), b64str()]
1535 1             self.topic.edit(
1536                 self.fake_session,
1537                 cid,
1538                 {"name": new_name, "wim_url": new_url, "wim_type": new_type},
1539             )
1540 1             args = self.db.replace.call_args[0]
1541 1             content = args[2]
1542 1             self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1543 1             self.assertEqual(args[1], cid, "Wrong CIM identifier")
1544 1             self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1545 1             self.assertEqual(content["name"], new_name, "Wrong CIM name")
1546 1             self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
1547 1             self.assertEqual(content["wim_url"], new_url, "Wrong URL")
1548 1             self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
1549 1             self.assertGreater(
1550                 content["_admin"]["modified"],
1551                 content["_admin"]["created"],
1552                 "Wrong modification time",
1553             )
1554 1             self.assertEqual(
1555                 len(content["_admin"]["operations"]), 2, "Wrong number of operations"
1556             )
1557 1             operation = content["_admin"]["operations"][1]
1558 1             self.assertEqual(
1559                 operation["lcmOperationType"], "edit", "Wrong operation type"
1560             )
1561 1             self.assertEqual(
1562                 operation["operationState"], "PROCESSING", "Wrong operation state"
1563             )
1564 1             self.assertGreater(
1565                 operation["startTime"],
1566                 content["_admin"]["modified"],
1567                 "Wrong operation start time",
1568             )
1569 1             self.assertGreater(
1570                 operation["statusEnteredTime"],
1571                 content["_admin"]["modified"],
1572                 "Wrong operation status enter time",
1573             )
1574 1             self.assertEqual(
1575                 operation["detailed-status"], "", "Wrong operation detailed status info"
1576             )
1577 1             self.assertIsNone(
1578                 operation["operationParams"], "Wrong operation parameters"
1579             )
1580 1         with self.subTest(i=2):
1581 1             self.db.get_one.side_effect = [cvws]
1582 1             with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
1583 1                 self.topic.edit(
1584                     self.fake_session,
1585                     str(uuid4()),
1586                     {"name": "new-name", "extra_prop": "anything"},
1587                 )
1588 1             self.assertEqual(
1589                 e.exception.http_code,
1590                 HTTPStatus.UNPROCESSABLE_ENTITY,
1591                 "Wrong HTTP status code",
1592             )
1593 1             self.assertIn(
1594                 "format error '{}'".format(
1595                     "additional properties are not allowed ('{}' was unexpected)"
1596                 ).format("extra_prop"),
1597                 norm(str(e.exception)),
1598                 "Wrong exception text",
1599             )
1600
1601 1     def test_conflict_on_edit(self):
1602 1         with self.subTest(i=1):
1603 1             cid = str(uuid4())
1604 1             new_name = "new-cim-name"
1605 1             self.db.get_one.side_effect = [
1606                 {"_id": cid, "name": self.test_name},
1607                 {"_id": str(uuid4()), "name": new_name},
1608             ]
1609 1             with self.assertRaises(
1610                 EngineException, msg="Accepted existing CIM name"
1611             ) as e:
1612 1                 self.topic.edit(self.fake_session, cid, {"name": new_name})
1613 1             self.assertEqual(
1614                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1615             )
1616 1             self.assertIn(
1617                 "name '{}' already exists for {}".format(new_name, self.topic.topic),
1618                 norm(str(e.exception)),
1619                 "Wrong exception text",
1620             )
1621
1622 1     def test_delete_cvws(self):
1623 1         cid = str(uuid4())
1624 1         ro_pid = str(uuid4())
1625 1         rw_pid = str(uuid4())
1626 1         cvws = {"_id": cid, "name": self.test_name}
1627 1         self.db.get_list.return_value = []
1628 1         with self.subTest(i=1):
1629 1             cvws["_admin"] = {
1630                 "projects_read": [test_pid, ro_pid, rw_pid],
1631                 "projects_write": [test_pid, rw_pid],
1632             }
1633 1             self.db.get_one.return_value = cvws
1634 1             oid = self.topic.delete(self.fake_session, cid)
1635 1             self.assertIsNone(oid, "Wrong operation identifier")
1636 1             self.assertEqual(
1637                 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1638             )
1639 1             self.assertEqual(
1640                 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1641             )
1642 1             self.assertEqual(
1643                 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1644             )
1645 1             self.assertEqual(
1646                 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1647             )
1648 1             self.assertEqual(
1649                 self.db.set_one.call_args[1]["update_dict"],
1650                 None,
1651                 "Wrong read-only projects update",
1652             )
1653 1             self.assertEqual(
1654                 self.db.set_one.call_args[1]["pull_list"],
1655                 {
1656                     "_admin.projects_read": (test_pid,),
1657                     "_admin.projects_write": (test_pid,),
1658                 },
1659                 "Wrong read/write projects update",
1660             )
1661 1             self.topic._send_msg.assert_not_called()
1662 1         with self.subTest(i=2):
1663 1             now = time()
1664 1             cvws["_admin"] = {
1665                 "projects_read": [test_pid],
1666                 "projects_write": [test_pid],
1667                 "operations": [],
1668             }
1669 1             self.db.get_one.return_value = cvws
1670 1             oid = self.topic.delete(self.fake_session, cid)
1671 1             self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
1672 1             self.assertEqual(
1673                 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1674             )
1675 1             self.assertEqual(
1676                 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1677             )
1678 1             self.assertEqual(
1679                 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1680             )
1681 1             self.assertEqual(
1682                 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier"
1683             )
1684 1             self.assertEqual(
1685                 self.db.set_one.call_args[1]["update_dict"],
1686                 {"_admin.to_delete": True},
1687                 "Wrong _admin.to_delete update",
1688             )
1689 1             operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
1690 1             self.assertEqual(
1691                 operation["lcmOperationType"], "delete", "Wrong operation type"
1692             )
1693 1             self.assertEqual(
1694                 operation["operationState"], "PROCESSING", "Wrong operation state"
1695             )
1696 1             self.assertEqual(
1697                 operation["detailed-status"], "", "Wrong operation detailed status"
1698             )
1699 1             self.assertIsNone(
1700                 operation["operationParams"], "Wrong operation parameters"
1701             )
1702 1             self.assertGreater(
1703                 operation["startTime"], now, "Wrong operation start time"
1704             )
1705 1             self.assertGreater(
1706                 operation["statusEnteredTime"], now, "Wrong operation status enter time"
1707             )
1708 1             self.topic._send_msg.assert_called_once_with(
1709                 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
1710             )
1711 1         with self.subTest(i=3):
1712 1             cvws["_admin"] = {
1713                 "projects_read": [],
1714                 "projects_write": [],
1715                 "operations": [],
1716             }
1717 1             self.db.get_one.return_value = cvws
1718 1             self.topic._send_msg.reset_mock()
1719 1             self.db.get_one.reset_mock()
1720 1             self.db.del_one.reset_mock()
1721 1             self.fake_session["force"] = True  # to force deletion
1722 1             self.fake_session["admin"] = True  # to force deletion
1723 1             self.fake_session["project_id"] = []  # to force deletion
1724 1             oid = self.topic.delete(self.fake_session, cid)
1725 1             self.assertIsNone(oid, "Wrong operation identifier")
1726 1             self.assertEqual(
1727                 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1728             )
1729 1             self.assertEqual(
1730                 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1731             )
1732 1             self.assertEqual(
1733                 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
1734             )
1735 1             self.assertEqual(
1736                 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1737             )
1738 1             self.topic._send_msg.assert_called_once_with(
1739                 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
1740             )
1741
1742
1743 1 if __name__ == "__main__":
1744 0     unittest.main()