Feature 10941: User Management Enhancements
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
# Module metadata.
__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
# NOTE(review): this value looks malformed (leading "$", three-digit day) -
# confirm the intended date before relying on it.
__date__ = "$2019-10-019"
19
20 import unittest
21 import random
22 from unittest import TestCase
23 from unittest.mock import Mock, patch, call
24 from uuid import uuid4
25 from http import HTTPStatus
26 from time import time
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_common.dbmemory import DbMemory
29 from osm_nbi import authconn, validation
30 from osm_nbi.admin_topics import (
31 ProjectTopicAuth,
32 RoleTopicAuth,
33 UserTopicAuth,
34 CommonVimWimSdn,
35 VcaTopic,
36 )
37 from osm_nbi.engine import EngineException
38 from osm_nbi.authconn import AuthconnNotFoundException
39 from osm_nbi.authconn_internal import AuthconnInternal
40
41
# Shared fixtures: ``test_pid`` is the project id placed in the fake session of
# every test class in this module; ``test_name`` is the username used by the
# role and user test sessions.
test_pid = str(uuid4())
test_name = "test-user"
44
45
def norm(str):
    """Return the text lowercased, trimmed, with whitespace runs collapsed to one space."""
    # split() with no separator already discards leading/trailing whitespace.
    words = str.split()
    return " ".join(words).lower()
49
50
class TestVcaTopic(TestCase):
    """Unit tests for VcaTopic, run against mocked db/fs/msg/auth collaborators."""

    def setUp(self):
        """Build a VcaTopic wired to Mock stand-ins for every backend."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
    def test_format_on_new(self, mock_super_format_on_new):
        """format_on_new stores the db.encrypt results and delegates to the parent."""
        content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        # One return value per expected db.encrypt call, in call order.
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_new.return_value = "1234"

        oid = self.vca_topic.format_on_new(content)

        self.assertEqual(oid, "1234")
        self.assertEqual(content["secret"], "secret")
        self.assertEqual(content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_new.assert_called_with(content, None, False)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
    def test_format_on_edit(self, mock_super_format_on_edit):
        """format_on_edit writes the db.encrypt results into the final content."""
        edit_content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        final_content = {
            "_id": "id",
            "schema_version": "1.11",
        }
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_edit.return_value = "1234"

        oid = self.vca_topic.format_on_edit(final_content, edit_content)

        self.assertEqual(oid, "1234")
        self.assertEqual(final_content["secret"], "secret")
        self.assertEqual(final_content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_edit.assert_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del(self, mock_check_conflict_on_del):
        """With no VIM account found, the parent conflict check is invoked."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}

        self.db.get_list.return_value = None

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_called_with(session, _id, db_content)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
        """With force set, neither the db lookup nor the parent check is called."""
        session = {
            "project_id": "project-id",
            "force": True,
        }
        _id = "vca-id"
        db_content = {}

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_not_called()
        mock_check_conflict_on_del.assert_not_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
        """A VIM account referencing the VCA makes the deletion fail with CONFLICT."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}

        self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}

        with self.assertRaises(EngineException) as context:
            self.vca_topic.check_conflict_on_del(session, _id, db_content)
        # Inspect the captured exception only after the context manager exits:
        # statements placed after the raising call inside the block never run,
        # and two distinct exception instances do not compare equal anyway, so
        # the status code and message are checked individually.
        self.assertEqual(context.exception.http_code, HTTPStatus.CONFLICT)
        self.assertIn(
            "There is at least one VIM account using this vca",
            str(context.exception),
        )

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_not_called()
168
169
class Test_ProjectTopicAuth(TestCase):
    """Tests for ProjectTopicAuth using Mock backends and an admin fake session."""

    @classmethod
    def setUpClass(cls):
        """Set the name used both as session username and as project name."""
        cls.test_name = "test-project-topic"

    def setUp(self):
        """Create a fresh topic with mocked collaborators and quota checks stubbed."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = dict(
            username=self.test_name,
            project_id=(test_pid,),
            method=None,
            admin=True,
            force=False,
            public=False,
            allow_show_user_project_role=True,
        )
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def _assert_engine_error(self, ctx, http_code, text):
        """Check status code and normalized message of a captured EngineException."""
        self.assertEqual(ctx.exception.http_code, http_code, "Wrong HTTP status code")
        self.assertIn(text, norm(str(ctx.exception)), "Wrong exception text")

    def test_new_project(self):
        """Valid input creates a project; an unknown quota key is rejected."""
        with self.subTest(i=1):
            undo = []
            expected_pid = str(uuid4())
            self.auth.get_project_list.return_value = []
            self.auth.create_project.return_value = expected_pid
            returned_pid, _ = self.topic.new(
                undo, self.fake_session, {"name": self.test_name, "quotas": {}}
            )
            self.assertEqual(len(undo), 1, "Wrong rollback length")
            self.assertEqual(returned_pid, expected_pid, "Wrong project identifier")
            created = self.auth.create_project.call_args[0][0]
            self.assertEqual(created["name"], self.test_name, "Wrong project name")
            self.assertEqual(created["quotas"], {}, "Wrong quotas")
            self.assertIsNotNone(created["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                created["_admin"]["modified"],
                created["_admin"]["created"],
                "Wrong modification time",
            )
        with self.subTest(i=2):
            undo = []
            bad_input = {"name": "other-project-name", "quotas": {"baditems": 10}}
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.new(undo, self.fake_session, bad_input)
            self.assertEqual(len(undo), 0, "Wrong rollback length")
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
                    "baditems"
                ),
            )

    def test_edit_project(self):
        """Editing updates name, quotas and modification time; bad quotas fail."""
        stamp = time()
        pid = str(uuid4())
        stored = {
            "_id": pid,
            "name": self.test_name,
            "_admin": {"created": stamp, "modified": stamp},
        }
        rng = random.SystemRandom()
        with self.subTest(i=1):
            # First lookup returns the stored project, the second one nothing.
            self.auth.get_project_list.side_effect = [[stored], []]
            renamed = "new-project-name"
            quotas = {"vnfds": rng.randint(0, 100), "nsds": rng.randint(0, 100)}
            self.topic.edit(
                self.fake_session, pid, {"name": renamed, "quotas": quotas}
            )
            updated_id, updated = self.auth.update_project.call_args[0]
            self.assertEqual(updated_id, pid, "Wrong project identifier")
            self.assertEqual(updated["_id"], pid, "Wrong project identifier")
            self.assertEqual(
                updated["_admin"]["created"], stamp, "Wrong creation time"
            )
            self.assertGreater(
                updated["_admin"]["modified"], stamp, "Wrong modification time"
            )
            self.assertEqual(updated["name"], renamed, "Wrong project name")
            self.assertEqual(updated["quotas"], quotas, "Wrong quotas")
        with self.subTest(i=2):
            renamed = "other-project-name"
            quotas = {"baditems": rng.randint(0, 100)}
            self.auth.get_project_list.side_effect = [[stored], []]
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.edit(
                    self.fake_session, pid, {"name": renamed, "quotas": quotas}
                )
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
                    "baditems"
                ),
            )

    def test_conflict_on_new(self):
        """Creation fails for a uuid-shaped name and for an existing name."""
        with self.subTest(i=1):
            undo = []
            uuid_name = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as project name"
            ) as e:
                self.topic.new(undo, self.fake_session, {"name": uuid_name})
            self.assertEqual(len(undo), 0, "Wrong rollback length")
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "project name '{}' cannot have an uuid format".format(uuid_name),
            )
        with self.subTest(i=2):
            undo = []
            self.auth.get_project_list.return_value = [
                {"_id": test_pid, "name": self.test_name}
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing project name"
            ) as e:
                self.topic.new(undo, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(undo), 0, "Wrong rollback length")
            self._assert_engine_error(
                e,
                HTTPStatus.CONFLICT,
                "project '{}' exists".format(self.test_name),
            )

    def test_conflict_on_edit(self):
        """Renaming fails for uuid names, for project 'admin' and for taken names."""
        with self.subTest(i=1):
            self.auth.get_project_list.return_value = [
                {"_id": test_pid, "name": self.test_name}
            ]
            renamed = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as project name"
            ) as e:
                self.topic.edit(self.fake_session, test_pid, {"name": renamed})
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "project name '{}' cannot have an uuid format".format(renamed),
            )
        with self.subTest(i=2):
            pid = str(uuid4())
            self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
            with self.assertRaises(
                EngineException, msg="Accepted renaming of project 'admin'"
            ) as e:
                self.topic.edit(self.fake_session, pid, {"name": "new-name"})
            self._assert_engine_error(
                e, HTTPStatus.CONFLICT, "you cannot rename project 'admin'"
            )
        with self.subTest(i=3):
            renamed = "new-project-name"
            self.auth.get_project_list.side_effect = [
                [{"_id": test_pid, "name": self.test_name}],
                [{"_id": str(uuid4()), "name": renamed}],
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing project name"
            ) as e:
                # ``pid`` is the identifier generated in the previous subtest.
                self.topic.edit(self.fake_session, pid, {"name": renamed})
            self._assert_engine_error(
                e,
                HTTPStatus.CONFLICT,
                "project '{}' is already used".format(renamed),
            )

    def test_delete_project(self):
        """Deleting an unused project returns the auth backend's result."""
        with self.subTest(i=1):
            pid = str(uuid4())
            self.auth.get_project.return_value = {
                "_id": pid,
                "name": "other-project-name",
            }
            self.auth.delete_project.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = []
            outcome = self.topic.delete(self.fake_session, pid)
            self.assertEqual(
                outcome, {"deleted": 1}, "Wrong project deletion return info"
            )
            looked_up = self.auth.get_project.call_args[0][0]
            self.assertEqual(looked_up, pid, "Wrong project identifier")
            removed = self.auth.delete_project.call_args[0][0]
            self.assertEqual(removed, pid, "Wrong project identifier")

    def test_conflict_on_del(self):
        """Deletion fails for own project, 'admin', and projects still in use."""
        with self.subTest(i=1):
            self.auth.get_project.return_value = {
                "_id": test_pid,
                "name": self.test_name,
            }
            with self.assertRaises(
                EngineException, msg="Accepted deletion of own project"
            ) as e:
                self.topic.delete(self.fake_session, self.test_name)
            self._assert_engine_error(
                e, HTTPStatus.CONFLICT, "you cannot delete your own project"
            )
        with self.subTest(i=2):
            self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
            with self.assertRaises(
                EngineException, msg="Accepted deletion of project 'admin'"
            ) as e:
                self.topic.delete(self.fake_session, "admin")
            self._assert_engine_error(
                e, HTTPStatus.CONFLICT, "you cannot delete project 'admin'"
            )
        with self.subTest(i=3):
            pid = str(uuid4())
            name = "other-project-name"
            self.auth.get_project.return_value = {"_id": pid, "name": name}
            self.auth.get_user_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "username": self.test_name,
                    "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used project"
            ) as e:
                self.topic.delete(self.fake_session, pid)
            self._assert_engine_error(
                e,
                HTTPStatus.CONFLICT,
                "project '{}' ({}) is being used by user '{}'".format(
                    name, pid, self.test_name
                ),
            )
        with self.subTest(i=4):
            # Reuses ``pid`` and ``name`` from the previous subtest.
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "id": self.test_name,
                    "_admin": {"projects_read": [pid], "projects_write": []},
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used project"
            ) as e:
                self.topic.delete(self.fake_session, pid)
            self._assert_engine_error(
                e,
                HTTPStatus.CONFLICT,
                "project '{}' ({}) is being used by {} '{}'".format(
                    name, pid, "vnf descriptor", self.test_name
                ),
            )
475
476
class Test_RoleTopicAuth(TestCase):
    """Tests for RoleTopicAuth using Mock backends and an admin fake session."""

    @classmethod
    def setUpClass(cls):
        """Set the role name and the operation list exposed as role permissions."""
        cls.test_name = "test-role-topic"
        cls.test_operations = ["tokens:get"]

    def setUp(self):
        """Create a fresh topic with mocked collaborators and quota checks stubbed."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.auth.role_permissions = self.test_operations
        self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def _assert_engine_error(self, ctx, http_code, text):
        """Check status code and normalized message of a captured EngineException."""
        self.assertEqual(ctx.exception.http_code, http_code, "Wrong HTTP status code")
        self.assertIn(text, norm(str(ctx.exception)), "Wrong exception text")

    def test_new_role(self):
        """Valid permissions create a role; an unknown permission is rejected."""
        with self.subTest(i=1):
            rollback = []
            rid1 = str(uuid4())
            perms_in = {"tokens": True}
            perms_out = {"default": False, "admin": False, "tokens": True}
            self.auth.get_role_list.return_value = []
            self.auth.create_role.return_value = rid1
            rid2, oid = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "permissions": perms_in},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            # The message previously said "project", which mislabelled a role-id failure.
            self.assertEqual(rid2, rid1, "Wrong role identifier")
            content = self.auth.create_role.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(
                EngineException, msg="Accepted wrong permissions"
            ) as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {"name": "other-role-name", "permissions": {"projects": True}},
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "invalid permission '{}'".format("projects"),
            )

    def test_edit_role(self):
        """Editing updates name, permissions and modification time; bad permissions fail."""
        now = time()
        rid = str(uuid4())
        role = {
            "_id": rid,
            "name": self.test_name,
            "permissions": {"tokens": True},
            "_admin": {"created": now, "modified": now},
        }
        with self.subTest(i=1):
            # First lookup returns the stored role, the second one nothing.
            self.auth.get_role_list.side_effect = [[role], []]
            self.auth.get_role.return_value = role
            new_name = "new-role-name"
            perms_in = {"tokens": False, "tokens:get": True}
            perms_out = {
                "default": False,
                "admin": False,
                "tokens": False,
                "tokens:get": True,
            }
            self.topic.edit(
                self.fake_session, rid, {"name": new_name, "permissions": perms_in}
            )
            content = self.auth.update_role.call_args[0][0]
            self.assertEqual(content["_id"], rid, "Wrong role identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"], now, "Wrong modification time"
            )
            self.assertEqual(content["name"], new_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
        with self.subTest(i=2):
            new_name = "other-role-name"
            perms_in = {"tokens": False, "tokens:post": True}
            self.auth.get_role_list.side_effect = [[role], []]
            with self.assertRaises(
                EngineException, msg="Accepted wrong permissions"
            ) as e:
                self.topic.edit(
                    self.fake_session, rid, {"name": new_name, "permissions": perms_in}
                )
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "invalid permission '{}'".format("tokens:post"),
            )

    def test_delete_role(self):
        """Deleting an unused role returns the auth backend's result."""
        with self.subTest(i=1):
            rid = str(uuid4())
            role = {"_id": rid, "name": "other-role-name"}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.delete_role.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            rc = self.topic.delete(self.fake_session, rid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
            self.assertEqual(
                self.auth.get_role_list.call_args[0][0]["_id"],
                rid,
                "Wrong role identifier",
            )
            self.assertEqual(
                self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
            )
            self.assertEqual(
                self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
            )

    def test_conflict_on_new(self):
        """Creation fails for a uuid-shaped name and for an existing name."""
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as role name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": rid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "role name '{}' cannot have an uuid format".format(rid),
            )
        with self.subTest(i=2):
            rollback = []
            self.auth.get_role_list.return_value = [
                {"_id": str(uuid4()), "name": self.test_name}
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing role name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_engine_error(
                e,
                HTTPStatus.CONFLICT,
                "role name '{}' exists".format(self.test_name),
            )

    def test_conflict_on_edit(self):
        """Renaming fails for uuid names, for the built-in roles and for taken names."""
        rid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_role_list.return_value = [
                {"_id": rid, "name": self.test_name, "permissions": {}}
            ]
            new_name = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as role name"
            ) as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self._assert_engine_error(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "role name '{}' cannot have an uuid format".format(new_name),
            )
        for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
            with self.subTest(i=i):
                rid = str(uuid4())
                self.auth.get_role.return_value = {
                    "_id": rid,
                    "name": role_name,
                    "permissions": {},
                }
                with self.assertRaises(
                    EngineException,
                    msg="Accepted renaming of role '{}'".format(role_name),
                ) as e:
                    self.topic.edit(self.fake_session, rid, {"name": "new-name"})
                self._assert_engine_error(
                    e,
                    HTTPStatus.FORBIDDEN,
                    "you cannot rename role '{}'".format(role_name),
                )
        # ``i`` and ``rid`` keep the values from the last loop iteration.
        with self.subTest(i=i + 1):
            new_name = "new-role-name"
            self.auth.get_role_list.side_effect = [
                [{"_id": rid, "name": self.test_name, "permissions": {}}],
                [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
            ]
            self.auth.get_role.return_value = {
                "_id": rid,
                "name": self.test_name,
                "permissions": {},
            }
            with self.assertRaises(
                EngineException, msg="Accepted existing role name"
            ) as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self._assert_engine_error(
                e,
                HTTPStatus.CONFLICT,
                "role name '{}' exists".format(new_name),
            )

    def test_conflict_on_del(self):
        """Deletion fails for the built-in roles and for roles assigned to a user."""
        for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
            with self.subTest(i=i):
                rid = str(uuid4())
                role = {"_id": rid, "name": role_name}
                self.auth.get_role_list.return_value = [role]
                self.auth.get_role.return_value = role
                with self.assertRaises(
                    EngineException,
                    msg="Accepted deletion of role '{}'".format(role_name),
                ) as e:
                    self.topic.delete(self.fake_session, rid)
                self._assert_engine_error(
                    e,
                    HTTPStatus.FORBIDDEN,
                    "you cannot delete role '{}'".format(role_name),
                )
        # ``i`` keeps the value from the last loop iteration.
        with self.subTest(i=i + 1):
            rid = str(uuid4())
            name = "other-role-name"
            role = {"_id": rid, "name": name}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.get_user_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "username": self.test_name,
                    "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used role"
            ) as e:
                self.topic.delete(self.fake_session, rid)
            self._assert_engine_error(
                e,
                HTTPStatus.CONFLICT,
                "role '{}' ({}) is being used by user '{}'".format(
                    name, rid, self.test_name
                ),
            )
776
777
778 class Test_UserTopicAuth(TestCase):
779 @classmethod
780 def setUpClass(cls):
781 cls.test_name = "test-user-topic"
782 cls.password = "Test@123"
783
784 def setUp(self):
785 # self.db = Mock(dbbase.DbBase())
786 self.db = DbMemory()
787 self.fs = Mock(fsbase.FsBase())
788 self.msg = Mock(msgbase.MsgBase())
789 self.auth = Mock(authconn.Authconn(None, None, None))
790 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
791 self.fake_session = {
792 "username": test_name,
793 "project_id": (test_pid,),
794 "method": None,
795 "admin": True,
796 "force": False,
797 "public": False,
798 "allow_show_user_project_role": True,
799 }
800 self.topic.check_quota = Mock(return_value=None) # skip quota
801
802 def test_new_user(self):
803 uid1 = str(uuid4())
804 pid = str(uuid4())
805 self.auth.get_user_list.return_value = []
806 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
807 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
808 with self.subTest(i=1):
809 rollback = []
810 rid = str(uuid4())
811 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
812 prms_in = [{"project": "some_project", "role": "some_role"}]
813 prms_out = [{"project": pid, "role": rid}]
814 uid2, oid = self.topic.new(
815 rollback,
816 self.fake_session,
817 {
818 "username": self.test_name,
819 "password": self.password,
820 "project_role_mappings": prms_in,
821 },
822 )
823 self.assertEqual(len(rollback), 1, "Wrong rollback length")
824 self.assertEqual(uid2, uid1, "Wrong project identifier")
825 content = self.auth.create_user.call_args[0][0]
826 self.assertEqual(content["username"], self.test_name, "Wrong project name")
827 self.assertEqual(content["password"], self.password, "Wrong password")
828 self.assertEqual(
829 content["project_role_mappings"],
830 prms_out,
831 "Wrong project-role mappings",
832 )
833 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
834 self.assertEqual(
835 content["_admin"]["modified"],
836 content["_admin"]["created"],
837 "Wrong modification time",
838 )
839 with self.subTest(i=2):
840 rollback = []
841 def_rid = str(uuid4())
842 def_role = {"_id": def_rid, "name": "project_admin"}
843 self.auth.get_role.return_value = def_role
844 self.auth.get_role_list.return_value = [def_role]
845 prms_out = [{"project": pid, "role": def_rid}]
846 uid2, oid = self.topic.new(
847 rollback,
848 self.fake_session,
849 {
850 "username": self.test_name,
851 "password": self.password,
852 "projects": ["some_project"],
853 },
854 )
855 self.assertEqual(len(rollback), 1, "Wrong rollback length")
856 self.assertEqual(uid2, uid1, "Wrong project identifier")
857 content = self.auth.create_user.call_args[0][0]
858 self.assertEqual(content["username"], self.test_name, "Wrong project name")
859 self.assertEqual(content["password"], self.password, "Wrong password")
860 self.assertEqual(
861 content["project_role_mappings"],
862 prms_out,
863 "Wrong project-role mappings",
864 )
865 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
866 self.assertEqual(
867 content["_admin"]["modified"],
868 content["_admin"]["created"],
869 "Wrong modification time",
870 )
871 with self.subTest(i=3):
872 rollback = []
873 with self.assertRaises(
874 EngineException, msg="Accepted wrong project-role mappings"
875 ) as e:
876 self.topic.new(
877 rollback,
878 self.fake_session,
879 {
880 "username": "other-project-name",
881 "password": "Other@pwd1",
882 "project_role_mappings": [{}],
883 },
884 )
885 self.assertEqual(len(rollback), 0, "Wrong rollback length")
886 self.assertEqual(
887 e.exception.http_code,
888 HTTPStatus.UNPROCESSABLE_ENTITY,
889 "Wrong HTTP status code",
890 )
891 self.assertIn(
892 "format error at '{}' '{}'".format(
893 "project_role_mappings:{}", "'{}' is a required property"
894 ).format(0, "project"),
895 norm(str(e.exception)),
896 "Wrong exception text",
897 )
898 with self.subTest(i=4):
899 rollback = []
900 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
901 self.topic.new(
902 rollback,
903 self.fake_session,
904 {
905 "username": "other-project-name",
906 "password": "Other@pwd1",
907 "projects": [],
908 },
909 )
910 self.assertEqual(len(rollback), 0, "Wrong rollback length")
911 self.assertEqual(
912 e.exception.http_code,
913 HTTPStatus.UNPROCESSABLE_ENTITY,
914 "Wrong HTTP status code",
915 )
916 self.assertIn(
917 "format error at '{}' '{}'".format(
918 "projects", "{} is too short"
919 ).format([]),
920 norm(str(e.exception)),
921 "Wrong exception text",
922 )
923
924 def test_edit_user(self):
925 now = time()
926 uid = str(uuid4())
927 pid1 = str(uuid4())
928 rid1 = str(uuid4())
929 prms = [
930 {
931 "project": pid1,
932 "project_name": "project-1",
933 "role": rid1,
934 "role_name": "role-1",
935 }
936 ]
937 user = {
938 "_id": uid,
939 "username": self.test_name,
940 "project_role_mappings": prms,
941 "_admin": {"created": now, "modified": now},
942 }
943 with self.subTest(i=1):
944 self.auth.get_user_list.side_effect = [[user], []]
945 self.auth.get_user.return_value = user
946 pid2 = str(uuid4())
947 rid2 = str(uuid4())
948 self.auth.get_project.side_effect = [
949 {"_id": pid2, "name": "project-2"},
950 {"_id": pid1, "name": "project-1"},
951 ]
952 self.auth.get_role.side_effect = [
953 {"_id": rid2, "name": "role-2"},
954 {"_id": rid1, "name": "role-1"},
955 ]
956 new_name = "new-user-name"
957 new_pasw = "New@pwd1"
958 add_prms = [{"project": pid2, "role": rid2}]
959 rem_prms = [{"project": pid1, "role": rid1}]
960 self.topic.edit(
961 self.fake_session,
962 uid,
963 {
964 "username": new_name,
965 "password": new_pasw,
966 "add_project_role_mappings": add_prms,
967 "remove_project_role_mappings": rem_prms,
968 },
969 )
970 content = self.auth.update_user.call_args[0][0]
971 self.assertEqual(content["_id"], uid, "Wrong user identifier")
972 self.assertEqual(content["username"], new_name, "Wrong user name")
973 self.assertEqual(content["password"], new_pasw, "Wrong user password")
974 self.assertEqual(
975 content["add_project_role_mappings"],
976 add_prms,
977 "Wrong project-role mappings to add",
978 )
979 self.assertEqual(
980 content["remove_project_role_mappings"],
981 prms,
982 "Wrong project-role mappings to remove",
983 )
984 with self.subTest(i=2):
985 new_name = "other-user-name"
986 new_prms = [{}]
987 self.auth.get_role_list.side_effect = [[user], []]
988 self.auth.get_user_list.side_effect = [[user]]
989 with self.assertRaises(
990 EngineException, msg="Accepted wrong project-role mappings"
991 ) as e:
992 self.topic.edit(
993 self.fake_session,
994 uid,
995 {"username": new_name, "project_role_mappings": new_prms},
996 )
997 self.assertEqual(
998 e.exception.http_code,
999 HTTPStatus.UNPROCESSABLE_ENTITY,
1000 "Wrong HTTP status code",
1001 )
1002 self.assertIn(
1003 "format error at '{}' '{}'".format(
1004 "project_role_mappings:{}", "'{}' is a required property"
1005 ).format(0, "project"),
1006 norm(str(e.exception)),
1007 "Wrong exception text",
1008 )
1009 with self.subTest(i=3):
1010 self.auth.get_user_list.side_effect = [[user], []]
1011 self.auth.get_user.return_value = user
1012 old_password = self.password
1013 new_pasw = "New@pwd1"
1014 self.topic.edit(
1015 self.fake_session,
1016 uid,
1017 {
1018 "old_password": old_password,
1019 "password": new_pasw,
1020 },
1021 )
1022 content = self.auth.update_user.call_args[0][0]
1023 self.assertEqual(
1024 content["old_password"], old_password, "Wrong old password"
1025 )
1026 self.assertEqual(content["password"], new_pasw, "Wrong user password")
1027
def test_delete_user(self):
    """Delete a user other than the session user and check the auth calls.

    The auth mock returns a user named "other-user-name" while the session
    user is ``self.test_name``. The test verifies the value returned by
    ``topic.delete`` and that both ``get_user`` and ``delete_user`` were
    called with the identifier of the user being deleted.
    """
    with self.subTest(i=1):
        target_id = str(uuid4())
        self.fake_session["username"] = self.test_name
        self.auth.get_user.return_value = {
            "_id": target_id,
            "username": "other-user-name",
            "project_role_mappings": [],
        }
        self.auth.delete_user.return_value = {"deleted": 1}

        result = self.topic.delete(self.fake_session, target_id)

        self.assertEqual(result, {"deleted": 1}, "Wrong user deletion return info")
        looked_up_id = self.auth.get_user.call_args[0][0]
        self.assertEqual(looked_up_id, target_id, "Wrong user identifier")
        deleted_id = self.auth.delete_user.call_args[0][0]
        self.assertEqual(deleted_id, target_id, "Wrong user identifier")
1047
def test_conflict_on_new(self):
    """Check that invalid user-creation requests are rejected.

    Sub-tests:
      1. username in UUID format -> EngineException, UNPROCESSABLE_ENTITY
      2. auth mock reports a user with the same name -> EngineException,
         CONFLICT
      3. auth mock returns no users and no roles ->
         AuthconnNotFoundException, NOT_FOUND

    In every sub-test the rollback list must remain empty.
    """
    with self.subTest(i=1):
        rollback = []
        uuid_name = str(uuid4())
        request = {
            "username": uuid_name,
            "password": self.password,
            "projects": [test_pid],
        }
        with self.assertRaises(
            EngineException, msg="Accepted uuid as username"
        ) as ctx:
            self.topic.new(rollback, self.fake_session, request)
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        self.assertEqual(
            ctx.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "username '{}' cannot have a uuid format".format(uuid_name)
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )

    with self.subTest(i=2):
        rollback = []
        # The auth backend already knows a user with the requested name.
        self.auth.get_user_list.return_value = [
            {"_id": str(uuid4()), "username": self.test_name}
        ]
        request = {
            "username": self.test_name,
            "password": self.password,
            "projects": [test_pid],
        }
        with self.assertRaises(
            EngineException, msg="Accepted existing username"
        ) as ctx:
            self.topic.new(rollback, self.fake_session, request)
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        self.assertEqual(
            ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        expected_text = "username '{}' is already used".format(self.test_name)
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )

    with self.subTest(i=3):
        rollback = []
        # No existing users, and two consecutive empty role lookups.
        self.auth.get_user_list.return_value = []
        self.auth.get_role_list.side_effect = [[], []]
        request = {
            "username": self.test_name,
            "password": self.password,
            "projects": [str(uuid4())],
        }
        with self.assertRaises(
            AuthconnNotFoundException, msg="Accepted user without default role"
        ) as ctx:
            self.topic.new(rollback, self.fake_session, request)
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        self.assertEqual(
            ctx.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
        )
        expected_text = "can't find default role for user '{}'".format(
            self.test_name
        )
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )
1126
def test_conflict_on_edit(self):
    """Check that invalid user-edit requests are rejected.

    Sub-tests:
      1. new username in UUID format -> EngineException,
         UNPROCESSABLE_ENTITY
      2. "projects" given while the auth mock returns no roles ->
         AuthconnNotFoundException, NOT_FOUND
      3. removing the admin/system_admin mapping from the "admin" user ->
         EngineException, FORBIDDEN
      4. new username already reported by the auth mock ->
         EngineException, CONFLICT
    """
    user_id = str(uuid4())

    with self.subTest(i=1):
        self.auth.get_user_list.return_value = [
            {"_id": user_id, "username": self.test_name}
        ]
        uuid_name = str(uuid4())
        with self.assertRaises(
            EngineException, msg="Accepted uuid as username"
        ) as ctx:
            self.topic.edit(self.fake_session, user_id, {"username": uuid_name})
        self.assertEqual(
            ctx.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "username '{}' cannot have an uuid format".format(uuid_name)
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )

    with self.subTest(i=2):
        self.auth.get_user_list.return_value = [
            {"_id": user_id, "username": self.test_name}
        ]
        self.auth.get_role_list.side_effect = [[], []]
        changes = {"projects": [str(uuid4())]}
        with self.assertRaises(
            AuthconnNotFoundException, msg="Accepted user without default role"
        ) as ctx:
            self.topic.edit(self.fake_session, user_id, changes)
        self.assertEqual(
            ctx.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
        )
        expected_text = "can't find a default role for user '{}'".format(
            self.test_name
        )
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )

    with self.subTest(i=3):
        admin_id = str(uuid4())
        self.auth.get_user_list.return_value = [
            {"_id": admin_id, "username": "admin"}
        ]
        changes = {
            "remove_project_role_mappings": [
                {"project": "admin", "role": "system_admin"}
            ]
        }
        with self.assertRaises(
            EngineException,
            msg="Accepted removing system_admin role from admin user",
        ) as ctx:
            self.topic.edit(self.fake_session, admin_id, changes)
        self.assertEqual(
            ctx.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot remove system_admin role from admin user",
            norm(str(ctx.exception)),
            "Wrong exception text",
        )

    with self.subTest(i=4):
        taken_name = "new-user-name"
        # First lookup returns the edited user, second one a different user
        # that already carries the requested name.
        self.auth.get_user_list.side_effect = [
            [{"_id": user_id, "name": self.test_name}],
            [{"_id": str(uuid4()), "name": taken_name}],
        ]
        with self.assertRaises(
            EngineException, msg="Accepted existing username"
        ) as ctx:
            self.topic.edit(self.fake_session, user_id, {"username": taken_name})
        self.assertEqual(
            ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        expected_text = "username '{}' is already used".format(taken_name)
        self.assertIn(
            expected_text, norm(str(ctx.exception)), "Wrong exception text"
        )
1209
def test_conflict_on_del(self):
    """Check that deleting the session's own user is rejected.

    The auth mock returns a user whose name equals the session username;
    ``topic.delete`` must raise EngineException with HTTP status CONFLICT.
    """
    with self.subTest(i=1):
        own_id = str(uuid4())
        self.fake_session["username"] = self.test_name
        self.auth.get_user.return_value = {
            "_id": own_id,
            "username": self.test_name,
            "project_role_mappings": [],
        }
        with self.assertRaises(
            EngineException, msg="Accepted deletion of own user"
        ) as ctx:
            self.topic.delete(self.fake_session, own_id)
        self.assertEqual(
            ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot delete your own login user",
            norm(str(ctx.exception)),
            "Wrong exception text",
        )
1232
def test_user_management(self):
    """Exercise AuthconnInternal user creation/update with user management.

    A role and an "admin" user are stored through ``self.db`` first, then:
      1. with ``user_management`` enabled, a created user is "active" and
         has both ``password_expire_time`` and ``account_expire_time``;
      2. updating a "locked" user with ``unlock`` makes it "active" and
         resets ``retry_count`` to 0;
      3. updating an "expired" user with ``renew`` makes it "active" and
         increases ``account_expire_time``;
      4. with ``user_management`` disabled, a created user is "active" and
         has neither expiry key.
    """
    self.config = {
        "user_management": True,
        "pwd_expire_days": 30,
        "max_pwd_attempt": 5,
        "account_expire_days": 90,
    }
    self.permissions = {"admin": True, "default": True}
    now = time()
    rid = str(uuid4())
    role = {
        "_id": rid,
        "name": self.test_name,
        "permissions": self.permissions,
        "_admin": {"created": now, "modified": now},
    }
    self.db.create("roles", role)
    # Identifiers are named once so that the update requests below refer to
    # exactly the records stored here.
    admin_id = "72cd0cd6-e8e2-482c-9bc2-15b413bb8500"
    locked_id = "73bbbb71-ed38-4b79-9f58-ece19e7e32d6"
    expired_id = "3266430f-8222-407f-b08f-3a242504ab94"
    admin_user = {
        "_id": admin_id,
        "username": "admin",
        "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb",
        "_admin": {
            "created": 1663058370.7721832,
            "modified": 1663681183.5651639,
            "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b",
            "last_token_time": 1666876472.2962265,
            "user_status": "always-active",
            "retry_count": 0,
        },
        "project_role_mappings": [
            {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid}
        ],
    }
    self.db.create("users", admin_user)
    with self.subTest(i=1):
        self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
        user_info = {"username": "user_mgmt_true", "password": "Test@123"}
        self.user_create.create_user(user_info)
        user = self.db.get_one("users", {"username": user_info["username"]})
        self.assertEqual(user["username"], user_info["username"], "Wrong user name")
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        self.assertIn("password_expire_time", user["_admin"], "Key is not there")
        self.assertIn("account_expire_time", user["_admin"], "Key is not there")
    with self.subTest(i=2):
        self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
        locked_user = {
            "username": "user_lock",
            "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
            "_admin": {
                "created": 1667207552.2191198,
                "modified": 1667207552.2191815,
                "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
                "user_status": "locked",
                "password_expire_time": 1667207552.2191815,
                "account_expire_time": 1674983552.2191815,
                "retry_count": 5,
                "last_token_time": 1667207552.2191815,
            },
            "_id": locked_id,
        }
        self.db.create("users", locked_user)
        user_info = {
            "_id": locked_id,
            "system_admin_id": admin_id,
            "unlock": True,
        }
        self.assertEqual(
            locked_user["_admin"]["user_status"], "locked", "User status is unknown"
        )
        self.user_update.update_user(user_info)
        user = self.db.get_one("users", {"username": locked_user["username"]})
        self.assertEqual(
            user["username"], locked_user["username"], "Wrong user name"
        )
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown")
    with self.subTest(i=3):
        self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
        expired_user = {
            "username": "user_expire",
            "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
            "_admin": {
                "created": 1665602087.601298,
                "modified": 1665636442.1245084,
                "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
                "user_status": "expired",
                "password_expire_time": 1668248628.2191815,
                "account_expire_time": 1666952628.2191815,
                "retry_count": 0,
                "last_token_time": 1666779828.2171815,
            },
            "_id": expired_id,
        }
        self.db.create("users", expired_user)
        user_info = {
            "_id": expired_id,
            "system_admin_id": admin_id,
            "renew": True,
        }
        self.assertEqual(
            expired_user["_admin"]["user_status"],
            "expired",
            "User status is unknown",
        )
        self.user_update.update_user(user_info)
        user = self.db.get_one("users", {"username": expired_user["username"]})
        self.assertEqual(
            user["username"], expired_user["username"], "Wrong user name"
        )
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        self.assertGreater(
            user["_admin"]["account_expire_time"],
            expired_user["_admin"]["account_expire_time"],
            "User expire time is not get extended",
        )
    with self.subTest(i=4):
        # Same scenario as sub-test 1, but with user management switched off.
        self.config.update({"user_management": False})
        self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
        user_info = {"username": "user_mgmt_false", "password": "Test@123"}
        self.user_create.create_user(user_info)
        user = self.db.get_one("users", {"username": user_info["username"]})
        self.assertEqual(user["username"], user_info["username"], "Wrong user name")
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        # These assertions fail when the key IS present, so the message must
        # say so (the previous text claimed the opposite).
        self.assertNotIn(
            "password_expire_time", user["_admin"], "Key should not be present"
        )
        self.assertNotIn(
            "account_expire_time", user["_admin"], "Key should not be present"
        )
1366
1367
class Test_CommonVimWimSdn(TestCase):
    """Unit tests for CommonVimWimSdn run against mocked db/fs/msg/auth.

    The topic under test is configured as "wims" with the WIM account
    schemas, and its ``_send_msg`` and ``check_quota`` methods are mocked.
    """

    @classmethod
    def setUpClass(cls):
        """Define the name used for the test CIM records."""
        cls.test_name = "test-cim-topic"  # CIM = Common Infrastructure Manager

    def setUp(self):
        """Build a fresh topic with mocked collaborators and a fake session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
        # Use WIM schemas for testing because they are the simplest
        self.topic._send_msg = Mock()
        self.topic.topic = "wims"
        self.topic.schema_new = validation.wim_account_new_schema
        self.topic.schema_edit = validation.wim_account_edit_schema
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_cvws(self):
        """Create a CIM and verify the record passed to ``db.create``."""
        test_url = "http://0.0.0.0:0"
        with self.subTest(i=1):
            rollback = []
            test_type = "fake"
            self.db.get_one.return_value = None
            # db.create is called with (topic, content); echo back the id of
            # the stored content.
            self.db.create.side_effect = lambda topic, content: content["_id"]
            cid, oid = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            args = self.db.create.call_args[0]
            content = args[1]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
            self.assertEqual(content["wim_url"], test_url, "Wrong URL")
            self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
            self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
            self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                content["_admin"]["operationalState"],
                "PROCESSING",
                "Wrong operational state",
            )
            self.assertEqual(
                content["_admin"]["projects_read"],
                [test_pid],
                "Wrong read-only projects",
            )
            self.assertEqual(
                content["_admin"]["projects_write"],
                [test_pid],
                "Wrong read/write projects",
            )
            self.assertIsNone(
                content["_admin"]["current_operation"], "Wrong current operation"
            )
            self.assertEqual(
                len(content["_admin"]["operations"]), 1, "Wrong number of operations"
            )
            operation = content["_admin"]["operations"][0]
            self.assertEqual(
                operation["lcmOperationType"], "create", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertGreater(
                operation["startTime"],
                content["_admin"]["created"],
                "Wrong operation start time",
            )
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["created"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status info"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        # This test is disabled. From Feature 8030 we admit all WIM/SDN types
        # with self.subTest(i=2):
        #     rollback = []
        #     test_type = "bad_type"
        #     with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
        #         self.topic.new(rollback, self.fake_session,
        #                        {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
        #     self.assertEqual(len(rollback), 0, "Wrong rollback length")
        #     self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        #     self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
        #                   norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_new(self):
        """Creating a CIM whose name already exists must raise CONFLICT."""
        with self.subTest(i=1):
            rollback = []
            test_url = "http://0.0.0.0:0"
            test_type = "fake"
            self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {
                        "name": self.test_name,
                        "wim_url": test_url,
                        "wim_type": test_type,
                    },
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(
                    self.test_name, self.topic.topic
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_edit_cvws(self):
        """Edit a CIM: check the replaced record and reject unknown properties."""
        now = time()
        cid = str(uuid4())
        test_url = "http://0.0.0.0:0"
        test_type = "fake"
        cvws = {
            "_id": cid,
            "name": self.test_name,
            "wim_url": test_url,
            "wim_type": test_type,
            "_admin": {
                "created": now,
                "modified": now,
                "operations": [{"lcmOperationType": "create"}],
            },
        }
        with self.subTest(i=1):
            new_name = "new-cim-name"
            new_url = "https://1.1.1.1:1"
            new_type = "onos"
            # First lookup returns the record being edited, the second one
            # (None) reports that no other record matched.
            self.db.get_one.side_effect = [cvws, None]
            self.db.replace.return_value = {"updated": 1}
            # self.db.encrypt.side_effect = [b64str(), b64str()]
            self.topic.edit(
                self.fake_session,
                cid,
                {"name": new_name, "wim_url": new_url, "wim_type": new_type},
            )
            args = self.db.replace.call_args[0]
            content = args[2]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(args[1], cid, "Wrong CIM identifier")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], new_name, "Wrong CIM name")
            self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
            self.assertEqual(content["wim_url"], new_url, "Wrong URL")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                len(content["_admin"]["operations"]), 2, "Wrong number of operations"
            )
            operation = content["_admin"]["operations"][1]
            self.assertEqual(
                operation["lcmOperationType"], "edit", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertGreater(
                operation["startTime"],
                content["_admin"]["modified"],
                "Wrong operation start time",
            )
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["modified"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status info"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        with self.subTest(i=2):
            self.db.get_one.side_effect = [cvws]
            with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
                self.topic.edit(
                    self.fake_session,
                    str(uuid4()),
                    {"name": "new-name", "extra_prop": "anything"},
                )
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "format error '{}'".format(
                    "additional properties are not allowed ('{}' was unexpected)"
                ).format("extra_prop"),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_edit(self):
        """Renaming a CIM to a name used by another record must raise CONFLICT."""
        with self.subTest(i=1):
            cid = str(uuid4())
            new_name = "new-cim-name"
            self.db.get_one.side_effect = [
                {"_id": cid, "name": self.test_name},
                {"_id": str(uuid4()), "name": new_name},
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.edit(self.fake_session, cid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(new_name, self.topic.topic),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_delete_cvws(self):
        """Delete a CIM in three situations and check the db/msg interactions.

        Sub-tests:
          1. record also belongs to other projects: only the session project
             is pulled from the project lists, no message is sent;
          2. record belongs only to the session project: it is flagged
             ``_admin.to_delete``, a "delete" operation is pushed and a
             "delete" message is sent;
          3. forced deletion with an empty session project list: the record
             is removed with ``db.del_one`` and a "deleted" message is sent.
        """
        cid = str(uuid4())
        ro_pid = str(uuid4())
        rw_pid = str(uuid4())
        cvws = {"_id": cid, "name": self.test_name}
        self.db.get_list.return_value = []
        with self.subTest(i=1):
            cvws["_admin"] = {
                "projects_read": [test_pid, ro_pid, rw_pid],
                "projects_write": [test_pid, rw_pid],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertIsNone(
                self.db.set_one.call_args[1]["update_dict"],
                "Wrong read-only projects update",
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["pull_list"],
                {
                    "_admin.projects_read": (test_pid,),
                    "_admin.projects_write": (test_pid,),
                },
                "Wrong read/write projects update",
            )
            self.topic._send_msg.assert_not_called()
        with self.subTest(i=2):
            now = time()
            cvws["_admin"] = {
                "projects_read": [test_pid],
                "projects_write": [test_pid],
                "operations": [],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["update_dict"],
                {"_admin.to_delete": True},
                "Wrong _admin.to_delete update",
            )
            operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
            self.assertEqual(
                operation["lcmOperationType"], "delete", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
            self.assertGreater(
                operation["startTime"], now, "Wrong operation start time"
            )
            self.assertGreater(
                operation["statusEnteredTime"], now, "Wrong operation status enter time"
            )
            self.topic._send_msg.assert_called_once_with(
                "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
            )
        with self.subTest(i=3):
            cvws["_admin"] = {
                "projects_read": [],
                "projects_write": [],
                "operations": [],
            }
            self.db.get_one.return_value = cvws
            # Forget the calls recorded by the previous sub-tests so that the
            # "called once" check below only sees this deletion.
            self.topic._send_msg.reset_mock()
            self.db.get_one.reset_mock()
            self.db.del_one.reset_mock()
            self.fake_session["force"] = True  # to force deletion
            self.fake_session["admin"] = True  # to force deletion
            self.fake_session["project_id"] = []  # to force deletion
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.topic._send_msg.assert_called_once_with(
                "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
            )
1739
# Run the test cases of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()