Bug 1830 fixed: maps completed operations to original operation types
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 import random
22 from unittest import TestCase
23 from unittest.mock import Mock, patch, call
24 from uuid import uuid4
25 from http import HTTPStatus
26 from time import time
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_common.dbmemory import DbMemory
29 from osm_nbi import authconn, validation
30 from osm_nbi.admin_topics import (
31 ProjectTopicAuth,
32 RoleTopicAuth,
33 UserTopicAuth,
34 CommonVimWimSdn,
35 VcaTopic,
36 )
37 from osm_nbi.engine import EngineException
38 from osm_nbi.authconn import AuthconnNotFoundException
39 from osm_nbi.authconn_internal import AuthconnInternal
40
41
42 test_pid = str(uuid4())
43 test_name = "test-user"
44
45
46 def norm(str):
47 """Normalize string for checking"""
48 return " ".join(str.strip().split()).lower()
49
50
51 class TestVcaTopic(TestCase):
52 def setUp(self):
53 self.db = Mock(dbbase.DbBase())
54 self.fs = Mock(fsbase.FsBase())
55 self.msg = Mock(msgbase.MsgBase())
56 self.auth = Mock(authconn.Authconn(None, None, None))
57 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
58
59 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
60 def test_format_on_new(self, mock_super_format_on_new):
61 content = {
62 "_id": "id",
63 "secret": "encrypted_secret",
64 "cacert": "encrypted_cacert",
65 }
66 self.db.encrypt.side_effect = ["secret", "cacert"]
67 mock_super_format_on_new.return_value = "1234"
68
69 oid = self.vca_topic.format_on_new(content)
70
71 self.assertEqual(oid, "1234")
72 self.assertEqual(content["secret"], "secret")
73 self.assertEqual(content["cacert"], "cacert")
74 self.db.encrypt.assert_has_calls(
75 [
76 call("encrypted_secret", schema_version="1.11", salt="id"),
77 call("encrypted_cacert", schema_version="1.11", salt="id"),
78 ]
79 )
80 mock_super_format_on_new.assert_called_with(content, None, False)
81
82 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
83 def test_format_on_edit(self, mock_super_format_on_edit):
84 edit_content = {
85 "_id": "id",
86 "secret": "encrypted_secret",
87 "cacert": "encrypted_cacert",
88 }
89 final_content = {
90 "_id": "id",
91 "schema_version": "1.11",
92 }
93 self.db.encrypt.side_effect = ["secret", "cacert"]
94 mock_super_format_on_edit.return_value = "1234"
95
96 oid = self.vca_topic.format_on_edit(final_content, edit_content)
97
98 self.assertEqual(oid, "1234")
99 self.assertEqual(final_content["secret"], "secret")
100 self.assertEqual(final_content["cacert"], "cacert")
101 self.db.encrypt.assert_has_calls(
102 [
103 call("encrypted_secret", schema_version="1.11", salt="id"),
104 call("encrypted_cacert", schema_version="1.11", salt="id"),
105 ]
106 )
107 mock_super_format_on_edit.assert_called()
108
109 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
110 def test_check_conflict_on_del(self, mock_check_conflict_on_del):
111 session = {
112 "project_id": "project-id",
113 "force": False,
114 }
115 _id = "vca-id"
116 db_content = {}
117
118 self.db.get_list.return_value = None
119
120 self.vca_topic.check_conflict_on_del(session, _id, db_content)
121
122 self.db.get_list.assert_called_with(
123 "vim_accounts",
124 {"vca": _id, "_admin.projects_read.cont": "project-id"},
125 )
126 mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
127
128 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
129 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
130 session = {
131 "project_id": "project-id",
132 "force": True,
133 }
134 _id = "vca-id"
135 db_content = {}
136
137 self.vca_topic.check_conflict_on_del(session, _id, db_content)
138
139 self.db.get_list.assert_not_called()
140 mock_check_conflict_on_del.assert_not_called()
141
142 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
143 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
144 session = {
145 "project_id": "project-id",
146 "force": False,
147 }
148 _id = "vca-id"
149 db_content = {}
150
151 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
152
153 with self.assertRaises(EngineException) as context:
154 self.vca_topic.check_conflict_on_del(session, _id, db_content)
155 self.assertEqual(
156 context.exception,
157 EngineException(
158 "There is at least one VIM account using this vca",
159 http_code=HTTPStatus.CONFLICT,
160 ),
161 )
162
163 self.db.get_list.assert_called_with(
164 "vim_accounts",
165 {"vca": _id, "_admin.projects_read.cont": "project-id"},
166 )
167 mock_check_conflict_on_del.assert_not_called()
168
169
170 class Test_ProjectTopicAuth(TestCase):
171 @classmethod
172 def setUpClass(cls):
173 cls.test_name = "test-project-topic"
174
175 def setUp(self):
176 self.db = Mock(dbbase.DbBase())
177 self.fs = Mock(fsbase.FsBase())
178 self.msg = Mock(msgbase.MsgBase())
179 self.auth = Mock(authconn.Authconn(None, None, None))
180 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
181 self.fake_session = {
182 "username": self.test_name,
183 "project_id": (test_pid,),
184 "method": None,
185 "admin": True,
186 "force": False,
187 "public": False,
188 "allow_show_user_project_role": True,
189 }
190 self.topic.check_quota = Mock(return_value=None) # skip quota
191
192 def test_new_project(self):
193 with self.subTest(i=1):
194 rollback = []
195 pid1 = str(uuid4())
196 self.auth.get_project_list.return_value = []
197 self.auth.create_project.return_value = pid1
198 pid2, oid = self.topic.new(
199 rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
200 )
201 self.assertEqual(len(rollback), 1, "Wrong rollback length")
202 self.assertEqual(pid2, pid1, "Wrong project identifier")
203 content = self.auth.create_project.call_args[0][0]
204 self.assertEqual(content["name"], self.test_name, "Wrong project name")
205 self.assertEqual(content["quotas"], {}, "Wrong quotas")
206 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
207 self.assertEqual(
208 content["_admin"]["modified"],
209 content["_admin"]["created"],
210 "Wrong modification time",
211 )
212 with self.subTest(i=2):
213 rollback = []
214 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
215 self.topic.new(
216 rollback,
217 self.fake_session,
218 {"name": "other-project-name", "quotas": {"baditems": 10}},
219 )
220 self.assertEqual(len(rollback), 0, "Wrong rollback length")
221 self.assertEqual(
222 e.exception.http_code,
223 HTTPStatus.UNPROCESSABLE_ENTITY,
224 "Wrong HTTP status code",
225 )
226 self.assertIn(
227 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
228 "baditems"
229 ),
230 norm(str(e.exception)),
231 "Wrong exception text",
232 )
233
234 def test_edit_project(self):
235 now = time()
236 pid = str(uuid4())
237 proj = {
238 "_id": pid,
239 "name": self.test_name,
240 "_admin": {"created": now, "modified": now},
241 }
242 with self.subTest(i=1):
243 self.auth.get_project_list.side_effect = [[proj], []]
244 new_name = "new-project-name"
245 quotas = {
246 "vnfds": random.SystemRandom().randint(0, 100),
247 "nsds": random.SystemRandom().randint(0, 100),
248 }
249 self.topic.edit(
250 self.fake_session, pid, {"name": new_name, "quotas": quotas}
251 )
252 _id, content = self.auth.update_project.call_args[0]
253 self.assertEqual(_id, pid, "Wrong project identifier")
254 self.assertEqual(content["_id"], pid, "Wrong project identifier")
255 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
256 self.assertGreater(
257 content["_admin"]["modified"], now, "Wrong modification time"
258 )
259 self.assertEqual(content["name"], new_name, "Wrong project name")
260 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
261 with self.subTest(i=2):
262 new_name = "other-project-name"
263 quotas = {"baditems": random.SystemRandom().randint(0, 100)}
264 self.auth.get_project_list.side_effect = [[proj], []]
265 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
266 self.topic.edit(
267 self.fake_session, pid, {"name": new_name, "quotas": quotas}
268 )
269 self.assertEqual(
270 e.exception.http_code,
271 HTTPStatus.UNPROCESSABLE_ENTITY,
272 "Wrong HTTP status code",
273 )
274 self.assertIn(
275 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
276 "baditems"
277 ),
278 norm(str(e.exception)),
279 "Wrong exception text",
280 )
281
282 def test_conflict_on_new(self):
283 with self.subTest(i=1):
284 rollback = []
285 pid = str(uuid4())
286 with self.assertRaises(
287 EngineException, msg="Accepted uuid as project name"
288 ) as e:
289 self.topic.new(rollback, self.fake_session, {"name": pid})
290 self.assertEqual(len(rollback), 0, "Wrong rollback length")
291 self.assertEqual(
292 e.exception.http_code,
293 HTTPStatus.UNPROCESSABLE_ENTITY,
294 "Wrong HTTP status code",
295 )
296 self.assertIn(
297 "project name '{}' cannot have an uuid format".format(pid),
298 norm(str(e.exception)),
299 "Wrong exception text",
300 )
301 with self.subTest(i=2):
302 rollback = []
303 self.auth.get_project_list.return_value = [
304 {"_id": test_pid, "name": self.test_name}
305 ]
306 with self.assertRaises(
307 EngineException, msg="Accepted existing project name"
308 ) as e:
309 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
310 self.assertEqual(len(rollback), 0, "Wrong rollback length")
311 self.assertEqual(
312 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
313 )
314 self.assertIn(
315 "project '{}' exists".format(self.test_name),
316 norm(str(e.exception)),
317 "Wrong exception text",
318 )
319
320 def test_conflict_on_edit(self):
321 with self.subTest(i=1):
322 self.auth.get_project_list.return_value = [
323 {"_id": test_pid, "name": self.test_name}
324 ]
325 new_name = str(uuid4())
326 with self.assertRaises(
327 EngineException, msg="Accepted uuid as project name"
328 ) as e:
329 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
330 self.assertEqual(
331 e.exception.http_code,
332 HTTPStatus.UNPROCESSABLE_ENTITY,
333 "Wrong HTTP status code",
334 )
335 self.assertIn(
336 "project name '{}' cannot have an uuid format".format(new_name),
337 norm(str(e.exception)),
338 "Wrong exception text",
339 )
340 with self.subTest(i=2):
341 pid = str(uuid4())
342 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
343 with self.assertRaises(
344 EngineException, msg="Accepted renaming of project 'admin'"
345 ) as e:
346 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
347 self.assertEqual(
348 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
349 )
350 self.assertIn(
351 "you cannot rename project 'admin'",
352 norm(str(e.exception)),
353 "Wrong exception text",
354 )
355 with self.subTest(i=3):
356 new_name = "new-project-name"
357 self.auth.get_project_list.side_effect = [
358 [{"_id": test_pid, "name": self.test_name}],
359 [{"_id": str(uuid4()), "name": new_name}],
360 ]
361 with self.assertRaises(
362 EngineException, msg="Accepted existing project name"
363 ) as e:
364 self.topic.edit(self.fake_session, pid, {"name": new_name})
365 self.assertEqual(
366 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
367 )
368 self.assertIn(
369 "project '{}' is already used".format(new_name),
370 norm(str(e.exception)),
371 "Wrong exception text",
372 )
373
374 def test_delete_project(self):
375 with self.subTest(i=1):
376 pid = str(uuid4())
377 self.auth.get_project.return_value = {
378 "_id": pid,
379 "name": "other-project-name",
380 }
381 self.auth.delete_project.return_value = {"deleted": 1}
382 self.auth.get_user_list.return_value = []
383 self.db.get_list.return_value = []
384 rc = self.topic.delete(self.fake_session, pid)
385 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
386 self.assertEqual(
387 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
388 )
389 self.assertEqual(
390 self.auth.delete_project.call_args[0][0],
391 pid,
392 "Wrong project identifier",
393 )
394
395 def test_conflict_on_del(self):
396 with self.subTest(i=1):
397 self.auth.get_project.return_value = {
398 "_id": test_pid,
399 "name": self.test_name,
400 }
401 with self.assertRaises(
402 EngineException, msg="Accepted deletion of own project"
403 ) as e:
404 self.topic.delete(self.fake_session, self.test_name)
405 self.assertEqual(
406 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
407 )
408 self.assertIn(
409 "you cannot delete your own project",
410 norm(str(e.exception)),
411 "Wrong exception text",
412 )
413 with self.subTest(i=2):
414 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
415 with self.assertRaises(
416 EngineException, msg="Accepted deletion of project 'admin'"
417 ) as e:
418 self.topic.delete(self.fake_session, "admin")
419 self.assertEqual(
420 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
421 )
422 self.assertIn(
423 "you cannot delete project 'admin'",
424 norm(str(e.exception)),
425 "Wrong exception text",
426 )
427 with self.subTest(i=3):
428 pid = str(uuid4())
429 name = "other-project-name"
430 self.auth.get_project.return_value = {"_id": pid, "name": name}
431 self.auth.get_user_list.return_value = [
432 {
433 "_id": str(uuid4()),
434 "username": self.test_name,
435 "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
436 }
437 ]
438 with self.assertRaises(
439 EngineException, msg="Accepted deletion of used project"
440 ) as e:
441 self.topic.delete(self.fake_session, pid)
442 self.assertEqual(
443 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
444 )
445 self.assertIn(
446 "project '{}' ({}) is being used by user '{}'".format(
447 name, pid, self.test_name
448 ),
449 norm(str(e.exception)),
450 "Wrong exception text",
451 )
452 with self.subTest(i=4):
453 self.auth.get_user_list.return_value = []
454 self.db.get_list.return_value = [
455 {
456 "_id": str(uuid4()),
457 "id": self.test_name,
458 "_admin": {"projects_read": [pid], "projects_write": []},
459 }
460 ]
461 with self.assertRaises(
462 EngineException, msg="Accepted deletion of used project"
463 ) as e:
464 self.topic.delete(self.fake_session, pid)
465 self.assertEqual(
466 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
467 )
468 self.assertIn(
469 "project '{}' ({}) is being used by {} '{}'".format(
470 name, pid, "vnf descriptor", self.test_name
471 ),
472 norm(str(e.exception)),
473 "Wrong exception text",
474 )
475
476
477 class Test_RoleTopicAuth(TestCase):
478 @classmethod
479 def setUpClass(cls):
480 cls.test_name = "test-role-topic"
481 cls.test_operations = ["tokens:get"]
482
483 def setUp(self):
484 self.db = Mock(dbbase.DbBase())
485 self.fs = Mock(fsbase.FsBase())
486 self.msg = Mock(msgbase.MsgBase())
487 self.auth = Mock(authconn.Authconn(None, None, None))
488 self.auth.role_permissions = self.test_operations
489 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
490 self.fake_session = {
491 "username": test_name,
492 "project_id": (test_pid,),
493 "method": None,
494 "admin": True,
495 "force": False,
496 "public": False,
497 "allow_show_user_project_role": True,
498 }
499 self.topic.check_quota = Mock(return_value=None) # skip quota
500
501 def test_new_role(self):
502 with self.subTest(i=1):
503 rollback = []
504 rid1 = str(uuid4())
505 perms_in = {"tokens": True}
506 perms_out = {"default": False, "admin": False, "tokens": True}
507 self.auth.get_role_list.return_value = []
508 self.auth.create_role.return_value = rid1
509 rid2, oid = self.topic.new(
510 rollback,
511 self.fake_session,
512 {"name": self.test_name, "permissions": perms_in},
513 )
514 self.assertEqual(len(rollback), 1, "Wrong rollback length")
515 self.assertEqual(rid2, rid1, "Wrong project identifier")
516 content = self.auth.create_role.call_args[0][0]
517 self.assertEqual(content["name"], self.test_name, "Wrong role name")
518 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
519 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
520 self.assertEqual(
521 content["_admin"]["modified"],
522 content["_admin"]["created"],
523 "Wrong modification time",
524 )
525 with self.subTest(i=2):
526 rollback = []
527 with self.assertRaises(
528 EngineException, msg="Accepted wrong permissions"
529 ) as e:
530 self.topic.new(
531 rollback,
532 self.fake_session,
533 {"name": "other-role-name", "permissions": {"projects": True}},
534 )
535 self.assertEqual(len(rollback), 0, "Wrong rollback length")
536 self.assertEqual(
537 e.exception.http_code,
538 HTTPStatus.UNPROCESSABLE_ENTITY,
539 "Wrong HTTP status code",
540 )
541 self.assertIn(
542 "invalid permission '{}'".format("projects"),
543 norm(str(e.exception)),
544 "Wrong exception text",
545 )
546
547 def test_edit_role(self):
548 now = time()
549 rid = str(uuid4())
550 role = {
551 "_id": rid,
552 "name": self.test_name,
553 "permissions": {"tokens": True},
554 "_admin": {"created": now, "modified": now},
555 }
556 with self.subTest(i=1):
557 self.auth.get_role_list.side_effect = [[role], []]
558 self.auth.get_role.return_value = role
559 new_name = "new-role-name"
560 perms_in = {"tokens": False, "tokens:get": True}
561 perms_out = {
562 "default": False,
563 "admin": False,
564 "tokens": False,
565 "tokens:get": True,
566 }
567 self.topic.edit(
568 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
569 )
570 content = self.auth.update_role.call_args[0][0]
571 self.assertEqual(content["_id"], rid, "Wrong role identifier")
572 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
573 self.assertGreater(
574 content["_admin"]["modified"], now, "Wrong modification time"
575 )
576 self.assertEqual(content["name"], new_name, "Wrong role name")
577 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
578 with self.subTest(i=2):
579 new_name = "other-role-name"
580 perms_in = {"tokens": False, "tokens:post": True}
581 self.auth.get_role_list.side_effect = [[role], []]
582 with self.assertRaises(
583 EngineException, msg="Accepted wrong permissions"
584 ) as e:
585 self.topic.edit(
586 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
587 )
588 self.assertEqual(
589 e.exception.http_code,
590 HTTPStatus.UNPROCESSABLE_ENTITY,
591 "Wrong HTTP status code",
592 )
593 self.assertIn(
594 "invalid permission '{}'".format("tokens:post"),
595 norm(str(e.exception)),
596 "Wrong exception text",
597 )
598
599 def test_delete_role(self):
600 with self.subTest(i=1):
601 rid = str(uuid4())
602 role = {"_id": rid, "name": "other-role-name"}
603 self.auth.get_role_list.return_value = [role]
604 self.auth.get_role.return_value = role
605 self.auth.delete_role.return_value = {"deleted": 1}
606 self.auth.get_user_list.return_value = []
607 rc = self.topic.delete(self.fake_session, rid)
608 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
609 self.assertEqual(
610 self.auth.get_role_list.call_args[0][0]["_id"],
611 rid,
612 "Wrong role identifier",
613 )
614 self.assertEqual(
615 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
616 )
617 self.assertEqual(
618 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
619 )
620
621 def test_conflict_on_new(self):
622 with self.subTest(i=1):
623 rollback = []
624 rid = str(uuid4())
625 with self.assertRaises(
626 EngineException, msg="Accepted uuid as role name"
627 ) as e:
628 self.topic.new(rollback, self.fake_session, {"name": rid})
629 self.assertEqual(len(rollback), 0, "Wrong rollback length")
630 self.assertEqual(
631 e.exception.http_code,
632 HTTPStatus.UNPROCESSABLE_ENTITY,
633 "Wrong HTTP status code",
634 )
635 self.assertIn(
636 "role name '{}' cannot have an uuid format".format(rid),
637 norm(str(e.exception)),
638 "Wrong exception text",
639 )
640 with self.subTest(i=2):
641 rollback = []
642 self.auth.get_role_list.return_value = [
643 {"_id": str(uuid4()), "name": self.test_name}
644 ]
645 with self.assertRaises(
646 EngineException, msg="Accepted existing role name"
647 ) as e:
648 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
649 self.assertEqual(len(rollback), 0, "Wrong rollback length")
650 self.assertEqual(
651 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
652 )
653 self.assertIn(
654 "role name '{}' exists".format(self.test_name),
655 norm(str(e.exception)),
656 "Wrong exception text",
657 )
658
659 def test_conflict_on_edit(self):
660 rid = str(uuid4())
661 with self.subTest(i=1):
662 self.auth.get_role_list.return_value = [
663 {"_id": rid, "name": self.test_name, "permissions": {}}
664 ]
665 new_name = str(uuid4())
666 with self.assertRaises(
667 EngineException, msg="Accepted uuid as role name"
668 ) as e:
669 self.topic.edit(self.fake_session, rid, {"name": new_name})
670 self.assertEqual(
671 e.exception.http_code,
672 HTTPStatus.UNPROCESSABLE_ENTITY,
673 "Wrong HTTP status code",
674 )
675 self.assertIn(
676 "role name '{}' cannot have an uuid format".format(new_name),
677 norm(str(e.exception)),
678 "Wrong exception text",
679 )
680 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
681 with self.subTest(i=i):
682 rid = str(uuid4())
683 self.auth.get_role.return_value = {
684 "_id": rid,
685 "name": role_name,
686 "permissions": {},
687 }
688 with self.assertRaises(
689 EngineException,
690 msg="Accepted renaming of role '{}'".format(role_name),
691 ) as e:
692 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
693 self.assertEqual(
694 e.exception.http_code,
695 HTTPStatus.FORBIDDEN,
696 "Wrong HTTP status code",
697 )
698 self.assertIn(
699 "you cannot rename role '{}'".format(role_name),
700 norm(str(e.exception)),
701 "Wrong exception text",
702 )
703 with self.subTest(i=i + 1):
704 new_name = "new-role-name"
705 self.auth.get_role_list.side_effect = [
706 [{"_id": rid, "name": self.test_name, "permissions": {}}],
707 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
708 ]
709 self.auth.get_role.return_value = {
710 "_id": rid,
711 "name": self.test_name,
712 "permissions": {},
713 }
714 with self.assertRaises(
715 EngineException, msg="Accepted existing role name"
716 ) as e:
717 self.topic.edit(self.fake_session, rid, {"name": new_name})
718 self.assertEqual(
719 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
720 )
721 self.assertIn(
722 "role name '{}' exists".format(new_name),
723 norm(str(e.exception)),
724 "Wrong exception text",
725 )
726
727 def test_conflict_on_del(self):
728 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
729 with self.subTest(i=i):
730 rid = str(uuid4())
731 role = {"_id": rid, "name": role_name}
732 self.auth.get_role_list.return_value = [role]
733 self.auth.get_role.return_value = role
734 with self.assertRaises(
735 EngineException,
736 msg="Accepted deletion of role '{}'".format(role_name),
737 ) as e:
738 self.topic.delete(self.fake_session, rid)
739 self.assertEqual(
740 e.exception.http_code,
741 HTTPStatus.FORBIDDEN,
742 "Wrong HTTP status code",
743 )
744 self.assertIn(
745 "you cannot delete role '{}'".format(role_name),
746 norm(str(e.exception)),
747 "Wrong exception text",
748 )
749 with self.subTest(i=i + 1):
750 rid = str(uuid4())
751 name = "other-role-name"
752 role = {"_id": rid, "name": name}
753 self.auth.get_role_list.return_value = [role]
754 self.auth.get_role.return_value = role
755 self.auth.get_user_list.return_value = [
756 {
757 "_id": str(uuid4()),
758 "username": self.test_name,
759 "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
760 }
761 ]
762 with self.assertRaises(
763 EngineException, msg="Accepted deletion of used role"
764 ) as e:
765 self.topic.delete(self.fake_session, rid)
766 self.assertEqual(
767 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
768 )
769 self.assertIn(
770 "role '{}' ({}) is being used by user '{}'".format(
771 name, rid, self.test_name
772 ),
773 norm(str(e.exception)),
774 "Wrong exception text",
775 )
776
777
778 class Test_UserTopicAuth(TestCase):
779 @classmethod
780 def setUpClass(cls):
781 cls.test_name = "test-user-topic"
782 cls.password = "Test@123"
783
784 def setUp(self):
785 # self.db = Mock(dbbase.DbBase())
786 self.db = DbMemory()
787 self.fs = Mock(fsbase.FsBase())
788 self.msg = Mock(msgbase.MsgBase())
789 self.auth = Mock(authconn.Authconn(None, None, None))
790 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
791 self.fake_session = {
792 "username": test_name,
793 "project_id": (test_pid,),
794 "method": None,
795 "admin": True,
796 "force": False,
797 "public": False,
798 "allow_show_user_project_role": True,
799 }
800 self.topic.check_quota = Mock(return_value=None) # skip quota
801
802 def test_new_user(self):
803 uid1 = str(uuid4())
804 pid = str(uuid4())
805 self.auth.get_user_list.return_value = []
806 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
807 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
808 with self.subTest(i=1):
809 rollback = []
810 rid = str(uuid4())
811 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
812 prms_in = [{"project": "some_project", "role": "some_role"}]
813 prms_out = [{"project": pid, "role": rid}]
814 uid2, oid = self.topic.new(
815 rollback,
816 self.fake_session,
817 {
818 "username": self.test_name,
819 "password": self.password,
820 "project_role_mappings": prms_in,
821 },
822 )
823 self.assertEqual(len(rollback), 1, "Wrong rollback length")
824 self.assertEqual(uid2, uid1, "Wrong project identifier")
825 content = self.auth.create_user.call_args[0][0]
826 self.assertEqual(content["username"], self.test_name, "Wrong project name")
827 self.assertEqual(content["password"], self.password, "Wrong password")
828 self.assertEqual(
829 content["project_role_mappings"],
830 prms_out,
831 "Wrong project-role mappings",
832 )
833 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
834 self.assertEqual(
835 content["_admin"]["modified"],
836 content["_admin"]["created"],
837 "Wrong modification time",
838 )
839 with self.subTest(i=2):
840 rollback = []
841 def_rid = str(uuid4())
842 def_role = {"_id": def_rid, "name": "project_admin"}
843 self.auth.get_role.return_value = def_role
844 self.auth.get_role_list.return_value = [def_role]
845 prms_out = [{"project": pid, "role": def_rid}]
846 uid2, oid = self.topic.new(
847 rollback,
848 self.fake_session,
849 {
850 "username": self.test_name,
851 "password": self.password,
852 "projects": ["some_project"],
853 },
854 )
855 self.assertEqual(len(rollback), 1, "Wrong rollback length")
856 self.assertEqual(uid2, uid1, "Wrong project identifier")
857 content = self.auth.create_user.call_args[0][0]
858 self.assertEqual(content["username"], self.test_name, "Wrong project name")
859 self.assertEqual(content["password"], self.password, "Wrong password")
860 self.assertEqual(
861 content["project_role_mappings"],
862 prms_out,
863 "Wrong project-role mappings",
864 )
865 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
866 self.assertEqual(
867 content["_admin"]["modified"],
868 content["_admin"]["created"],
869 "Wrong modification time",
870 )
871 with self.subTest(i=3):
872 rollback = []
873 with self.assertRaises(
874 EngineException, msg="Accepted wrong project-role mappings"
875 ) as e:
876 self.topic.new(
877 rollback,
878 self.fake_session,
879 {
880 "username": "other-project-name",
881 "password": "Other@pwd1",
882 "project_role_mappings": [{}],
883 },
884 )
885 self.assertEqual(len(rollback), 0, "Wrong rollback length")
886 self.assertEqual(
887 e.exception.http_code,
888 HTTPStatus.UNPROCESSABLE_ENTITY,
889 "Wrong HTTP status code",
890 )
891 self.assertIn(
892 "format error at '{}' '{}'".format(
893 "project_role_mappings:{}", "'{}' is a required property"
894 ).format(0, "project"),
895 norm(str(e.exception)),
896 "Wrong exception text",
897 )
898 with self.subTest(i=4):
899 rollback = []
900 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
901 self.topic.new(
902 rollback,
903 self.fake_session,
904 {
905 "username": "other-project-name",
906 "password": "Other@pwd1",
907 "projects": [],
908 },
909 )
910 self.assertEqual(len(rollback), 0, "Wrong rollback length")
911 self.assertEqual(
912 e.exception.http_code,
913 HTTPStatus.UNPROCESSABLE_ENTITY,
914 "Wrong HTTP status code",
915 )
916 self.assertIn(
917 "format error at '{}' '{}'".format(
918 "projects", "{} is too short"
919 ).format([]),
920 norm(str(e.exception)),
921 "Wrong exception text",
922 )
923
924 def test_edit_user(self):
925 now = time()
926 uid = str(uuid4())
927 pid1 = str(uuid4())
928 rid1 = str(uuid4())
929 prms = [
930 {
931 "project": pid1,
932 "project_name": "project-1",
933 "role": rid1,
934 "role_name": "role-1",
935 }
936 ]
937 user = {
938 "_id": uid,
939 "username": self.test_name,
940 "project_role_mappings": prms,
941 "_admin": {"created": now, "modified": now},
942 }
943 with self.subTest(i=1):
944 self.auth.get_user_list.side_effect = [[user], []]
945 self.auth.get_user.return_value = user
946 pid2 = str(uuid4())
947 rid2 = str(uuid4())
948 self.auth.get_project.side_effect = [
949 {"_id": pid2, "name": "project-2"},
950 {"_id": pid1, "name": "project-1"},
951 ]
952 self.auth.get_role.side_effect = [
953 {"_id": rid2, "name": "role-2"},
954 {"_id": rid1, "name": "role-1"},
955 ]
956 new_name = "new-user-name"
957 new_pasw = "New@pwd1"
958 add_prms = [{"project": pid2, "role": rid2}]
959 rem_prms = [{"project": pid1, "role": rid1}]
960 self.topic.edit(
961 self.fake_session,
962 uid,
963 {
964 "username": new_name,
965 "password": new_pasw,
966 "add_project_role_mappings": add_prms,
967 "remove_project_role_mappings": rem_prms,
968 },
969 )
970 content = self.auth.update_user.call_args[0][0]
971 self.assertEqual(content["_id"], uid, "Wrong user identifier")
972 self.assertEqual(content["username"], new_name, "Wrong user name")
973 self.assertEqual(content["password"], new_pasw, "Wrong user password")
974 self.assertEqual(
975 content["add_project_role_mappings"],
976 add_prms,
977 "Wrong project-role mappings to add",
978 )
979 self.assertEqual(
980 content["remove_project_role_mappings"],
981 prms,
982 "Wrong project-role mappings to remove",
983 )
984 with self.subTest(i=2):
985 new_name = "other-user-name"
986 new_prms = [{}]
987 self.auth.get_role_list.side_effect = [[user], []]
988 self.auth.get_user_list.side_effect = [[user]]
989 with self.assertRaises(
990 EngineException, msg="Accepted wrong project-role mappings"
991 ) as e:
992 self.topic.edit(
993 self.fake_session,
994 uid,
995 {"username": new_name, "project_role_mappings": new_prms},
996 )
997 self.assertEqual(
998 e.exception.http_code,
999 HTTPStatus.UNPROCESSABLE_ENTITY,
1000 "Wrong HTTP status code",
1001 )
1002 self.assertIn(
1003 "format error at '{}' '{}'".format(
1004 "project_role_mappings:{}", "'{}' is a required property"
1005 ).format(0, "project"),
1006 norm(str(e.exception)),
1007 "Wrong exception text",
1008 )
1009 with self.subTest(i=3):
1010 self.auth.get_user_list.side_effect = [[user], []]
1011 self.auth.get_user.return_value = user
1012 old_password = self.password
1013 new_pasw = "New@pwd1"
1014 self.topic.edit(
1015 self.fake_session,
1016 uid,
1017 {
1018 "old_password": old_password,
1019 "password": new_pasw,
1020 },
1021 )
1022 content = self.auth.update_user.call_args[0][0]
1023 self.assertEqual(
1024 content["old_password"], old_password, "Wrong old password"
1025 )
1026 self.assertEqual(content["password"], new_pasw, "Wrong user password")
1027
1028 def test_delete_user(self):
1029 with self.subTest(i=1):
1030 uid = str(uuid4())
1031 self.fake_session["username"] = self.test_name
1032 user = user = {
1033 "_id": uid,
1034 "username": "other-user-name",
1035 "project_role_mappings": [],
1036 }
1037 self.auth.get_user.return_value = user
1038 self.auth.delete_user.return_value = {"deleted": 1}
1039 rc = self.topic.delete(self.fake_session, uid)
1040 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
1041 self.assertEqual(
1042 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
1043 )
1044 self.assertEqual(
1045 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
1046 )
1047
1048 def test_conflict_on_new(self):
1049 with self.subTest(i=1):
1050 rollback = []
1051 uid = str(uuid4())
1052 with self.assertRaises(
1053 EngineException, msg="Accepted uuid as username"
1054 ) as e:
1055 self.topic.new(
1056 rollback,
1057 self.fake_session,
1058 {
1059 "username": uid,
1060 "password": self.password,
1061 "projects": [test_pid],
1062 },
1063 )
1064 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1065 self.assertEqual(
1066 e.exception.http_code,
1067 HTTPStatus.UNPROCESSABLE_ENTITY,
1068 "Wrong HTTP status code",
1069 )
1070 self.assertIn(
1071 "username '{}' cannot have a uuid format".format(uid),
1072 norm(str(e.exception)),
1073 "Wrong exception text",
1074 )
1075 with self.subTest(i=2):
1076 rollback = []
1077 self.auth.get_user_list.return_value = [
1078 {"_id": str(uuid4()), "username": self.test_name}
1079 ]
1080 with self.assertRaises(
1081 EngineException, msg="Accepted existing username"
1082 ) as e:
1083 self.topic.new(
1084 rollback,
1085 self.fake_session,
1086 {
1087 "username": self.test_name,
1088 "password": self.password,
1089 "projects": [test_pid],
1090 },
1091 )
1092 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1093 self.assertEqual(
1094 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1095 )
1096 self.assertIn(
1097 "username '{}' is already used".format(self.test_name),
1098 norm(str(e.exception)),
1099 "Wrong exception text",
1100 )
1101 with self.subTest(i=3):
1102 rollback = []
1103 self.auth.get_user_list.return_value = []
1104 self.auth.get_role_list.side_effect = [[], []]
1105 with self.assertRaises(
1106 AuthconnNotFoundException, msg="Accepted user without default role"
1107 ) as e:
1108 self.topic.new(
1109 rollback,
1110 self.fake_session,
1111 {
1112 "username": self.test_name,
1113 "password": self.password,
1114 "projects": [str(uuid4())],
1115 },
1116 )
1117 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1118 self.assertEqual(
1119 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1120 )
1121 self.assertIn(
1122 "can't find default role for user '{}'".format(self.test_name),
1123 norm(str(e.exception)),
1124 "Wrong exception text",
1125 )
1126
1127 def test_conflict_on_edit(self):
1128 uid = str(uuid4())
1129 with self.subTest(i=1):
1130 self.auth.get_user_list.return_value = [
1131 {"_id": uid, "username": self.test_name}
1132 ]
1133 new_name = str(uuid4())
1134 with self.assertRaises(
1135 EngineException, msg="Accepted uuid as username"
1136 ) as e:
1137 self.topic.edit(self.fake_session, uid, {"username": new_name})
1138 self.assertEqual(
1139 e.exception.http_code,
1140 HTTPStatus.UNPROCESSABLE_ENTITY,
1141 "Wrong HTTP status code",
1142 )
1143 self.assertIn(
1144 "username '{}' cannot have an uuid format".format(new_name),
1145 norm(str(e.exception)),
1146 "Wrong exception text",
1147 )
1148 with self.subTest(i=2):
1149 self.auth.get_user_list.return_value = [
1150 {"_id": uid, "username": self.test_name}
1151 ]
1152 self.auth.get_role_list.side_effect = [[], []]
1153 with self.assertRaises(
1154 AuthconnNotFoundException, msg="Accepted user without default role"
1155 ) as e:
1156 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
1157 self.assertEqual(
1158 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1159 )
1160 self.assertIn(
1161 "can't find a default role for user '{}'".format(self.test_name),
1162 norm(str(e.exception)),
1163 "Wrong exception text",
1164 )
1165 with self.subTest(i=3):
1166 admin_uid = str(uuid4())
1167 self.auth.get_user_list.return_value = [
1168 {"_id": admin_uid, "username": "admin"}
1169 ]
1170 with self.assertRaises(
1171 EngineException,
1172 msg="Accepted removing system_admin role from admin user",
1173 ) as e:
1174 self.topic.edit(
1175 self.fake_session,
1176 admin_uid,
1177 {
1178 "remove_project_role_mappings": [
1179 {"project": "admin", "role": "system_admin"}
1180 ]
1181 },
1182 )
1183 self.assertEqual(
1184 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
1185 )
1186 self.assertIn(
1187 "you cannot remove system_admin role from admin user",
1188 norm(str(e.exception)),
1189 "Wrong exception text",
1190 )
1191 with self.subTest(i=4):
1192 new_name = "new-user-name"
1193 self.auth.get_user_list.side_effect = [
1194 [{"_id": uid, "name": self.test_name}],
1195 [{"_id": str(uuid4()), "name": new_name}],
1196 ]
1197 with self.assertRaises(
1198 EngineException, msg="Accepted existing username"
1199 ) as e:
1200 self.topic.edit(self.fake_session, uid, {"username": new_name})
1201 self.assertEqual(
1202 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1203 )
1204 self.assertIn(
1205 "username '{}' is already used".format(new_name),
1206 norm(str(e.exception)),
1207 "Wrong exception text",
1208 )
1209
1210 def test_conflict_on_del(self):
1211 with self.subTest(i=1):
1212 uid = str(uuid4())
1213 self.fake_session["username"] = self.test_name
1214 user = user = {
1215 "_id": uid,
1216 "username": self.test_name,
1217 "project_role_mappings": [],
1218 }
1219 self.auth.get_user.return_value = user
1220 with self.assertRaises(
1221 EngineException, msg="Accepted deletion of own user"
1222 ) as e:
1223 self.topic.delete(self.fake_session, uid)
1224 self.assertEqual(
1225 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1226 )
1227 self.assertIn(
1228 "you cannot delete your own login user",
1229 norm(str(e.exception)),
1230 "Wrong exception text",
1231 )
1232
1233 def test_user_management(self):
1234 self.config = {
1235 "user_management": True,
1236 "pwd_expire_days": 30,
1237 "max_pwd_attempt": 5,
1238 "account_expire_days": 90,
1239 "version": "dev",
1240 "deviceVendor": "test",
1241 "deviceProduct": "test",
1242 }
1243 self.permissions = {"admin": True, "default": True}
1244 now = time()
1245 rid = str(uuid4())
1246 role = {
1247 "_id": rid,
1248 "name": self.test_name,
1249 "permissions": self.permissions,
1250 "_admin": {"created": now, "modified": now},
1251 }
1252 self.db.create("roles", role)
1253 admin_user = {
1254 "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1255 "username": "admin",
1256 "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb",
1257 "_admin": {
1258 "created": 1663058370.7721832,
1259 "modified": 1663681183.5651639,
1260 "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b",
1261 "last_token_time": 1666876472.2962265,
1262 "user_status": "always-active",
1263 "retry_count": 0,
1264 },
1265 "project_role_mappings": [
1266 {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid}
1267 ],
1268 }
1269 self.db.create("users", admin_user)
1270 with self.subTest(i=1):
1271 self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
1272 user_info = {"username": "user_mgmt_true", "password": "Test@123"}
1273 self.user_create.create_user(user_info)
1274 user = self.db.get_one("users", {"username": user_info["username"]})
1275 self.assertEqual(user["username"], user_info["username"], "Wrong user name")
1276 self.assertEqual(
1277 user["_admin"]["user_status"], "active", "User status is unknown"
1278 )
1279 self.assertIn("password_expire_time", user["_admin"], "Key is not there")
1280 self.assertIn("account_expire_time", user["_admin"], "Key is not there")
1281 with self.subTest(i=2):
1282 self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
1283 locked_user = {
1284 "username": "user_lock",
1285 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
1286 "_admin": {
1287 "created": 1667207552.2191198,
1288 "modified": 1667207552.2191815,
1289 "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
1290 "user_status": "locked",
1291 "password_expire_time": 1667207552.2191815,
1292 "account_expire_time": 1674983552.2191815,
1293 "retry_count": 5,
1294 "last_token_time": 1667207552.2191815,
1295 },
1296 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
1297 }
1298 self.db.create("users", locked_user)
1299 user_info = {
1300 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
1301 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1302 "unlock": True,
1303 }
1304 self.assertEqual(
1305 locked_user["_admin"]["user_status"], "locked", "User status is unknown"
1306 )
1307 self.user_update.update_user(user_info)
1308 user = self.db.get_one("users", {"username": locked_user["username"]})
1309 self.assertEqual(
1310 user["username"], locked_user["username"], "Wrong user name"
1311 )
1312 self.assertEqual(
1313 user["_admin"]["user_status"], "active", "User status is unknown"
1314 )
1315 self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown")
1316 with self.subTest(i=3):
1317 self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
1318 expired_user = {
1319 "username": "user_expire",
1320 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
1321 "_admin": {
1322 "created": 1665602087.601298,
1323 "modified": 1665636442.1245084,
1324 "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
1325 "user_status": "expired",
1326 "password_expire_time": 1668248628.2191815,
1327 "account_expire_time": 1666952628.2191815,
1328 "retry_count": 0,
1329 "last_token_time": 1666779828.2171815,
1330 },
1331 "_id": "3266430f-8222-407f-b08f-3a242504ab94",
1332 }
1333 self.db.create("users", expired_user)
1334 user_info = {
1335 "_id": "3266430f-8222-407f-b08f-3a242504ab94",
1336 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
1337 "renew": True,
1338 }
1339 self.assertEqual(
1340 expired_user["_admin"]["user_status"],
1341 "expired",
1342 "User status is unknown",
1343 )
1344 self.user_update.update_user(user_info)
1345 user = self.db.get_one("users", {"username": expired_user["username"]})
1346 self.assertEqual(
1347 user["username"], expired_user["username"], "Wrong user name"
1348 )
1349 self.assertEqual(
1350 user["_admin"]["user_status"], "active", "User status is unknown"
1351 )
1352 self.assertGreater(
1353 user["_admin"]["account_expire_time"],
1354 expired_user["_admin"]["account_expire_time"],
1355 "User expire time is not get extended",
1356 )
1357 with self.subTest(i=4):
1358 self.config.update({"user_management": False})
1359 self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
1360 user_info = {"username": "user_mgmt_false", "password": "Test@123"}
1361 self.user_create.create_user(user_info)
1362 user = self.db.get_one("users", {"username": user_info["username"]})
1363 self.assertEqual(user["username"], user_info["username"], "Wrong user name")
1364 self.assertEqual(
1365 user["_admin"]["user_status"], "active", "User status is unknown"
1366 )
1367 self.assertNotIn("password_expire_time", user["_admin"], "Key is not there")
1368 self.assertNotIn("account_expire_time", user["_admin"], "Key is not there")
1369
1370
1371 class Test_CommonVimWimSdn(TestCase):
1372 @classmethod
1373 def setUpClass(cls):
1374 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
1375
1376 def setUp(self):
1377 self.db = Mock(dbbase.DbBase())
1378 self.fs = Mock(fsbase.FsBase())
1379 self.msg = Mock(msgbase.MsgBase())
1380 self.auth = Mock(authconn.Authconn(None, None, None))
1381 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
1382 # Use WIM schemas for testing because they are the simplest
1383 self.topic._send_msg = Mock()
1384 self.topic.topic = "wims"
1385 self.topic.schema_new = validation.wim_account_new_schema
1386 self.topic.schema_edit = validation.wim_account_edit_schema
1387 self.fake_session = {
1388 "username": test_name,
1389 "project_id": (test_pid,),
1390 "method": None,
1391 "admin": True,
1392 "force": False,
1393 "public": False,
1394 "allow_show_user_project_role": True,
1395 }
1396 self.topic.check_quota = Mock(return_value=None) # skip quota
1397
1398 def test_new_cvws(self):
1399 test_url = "http://0.0.0.0:0"
1400 with self.subTest(i=1):
1401 rollback = []
1402 test_type = "fake"
1403 self.db.get_one.return_value = None
1404 self.db.create.side_effect = lambda self, content: content["_id"]
1405 cid, oid = self.topic.new(
1406 rollback,
1407 self.fake_session,
1408 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
1409 )
1410 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1411 args = self.db.create.call_args[0]
1412 content = args[1]
1413 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1414 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1415 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
1416 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
1417 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
1418 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
1419 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
1420 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
1421 self.assertEqual(
1422 content["_admin"]["modified"],
1423 content["_admin"]["created"],
1424 "Wrong modification time",
1425 )
1426 self.assertEqual(
1427 content["_admin"]["operationalState"],
1428 "PROCESSING",
1429 "Wrong operational state",
1430 )
1431 self.assertEqual(
1432 content["_admin"]["projects_read"],
1433 [test_pid],
1434 "Wrong read-only projects",
1435 )
1436 self.assertEqual(
1437 content["_admin"]["projects_write"],
1438 [test_pid],
1439 "Wrong read/write projects",
1440 )
1441 self.assertIsNone(
1442 content["_admin"]["current_operation"], "Wrong current operation"
1443 )
1444 self.assertEqual(
1445 len(content["_admin"]["operations"]), 1, "Wrong number of operations"
1446 )
1447 operation = content["_admin"]["operations"][0]
1448 self.assertEqual(
1449 operation["lcmOperationType"], "create", "Wrong operation type"
1450 )
1451 self.assertEqual(
1452 operation["operationState"], "PROCESSING", "Wrong operation state"
1453 )
1454 self.assertGreater(
1455 operation["startTime"],
1456 content["_admin"]["created"],
1457 "Wrong operation start time",
1458 )
1459 self.assertGreater(
1460 operation["statusEnteredTime"],
1461 content["_admin"]["created"],
1462 "Wrong operation status enter time",
1463 )
1464 self.assertEqual(
1465 operation["detailed-status"], "", "Wrong operation detailed status info"
1466 )
1467 self.assertIsNone(
1468 operation["operationParams"], "Wrong operation parameters"
1469 )
1470 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
1471 # with self.subTest(i=2):
1472 # rollback = []
1473 # test_type = "bad_type"
1474 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
1475 # self.topic.new(rollback, self.fake_session,
1476 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
1477 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
1478 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
1479 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
1480 # norm(str(e.exception)), "Wrong exception text")
1481
1482 def test_conflict_on_new(self):
1483 with self.subTest(i=1):
1484 rollback = []
1485 test_url = "http://0.0.0.0:0"
1486 test_type = "fake"
1487 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
1488 with self.assertRaises(
1489 EngineException, msg="Accepted existing CIM name"
1490 ) as e:
1491 self.topic.new(
1492 rollback,
1493 self.fake_session,
1494 {
1495 "name": self.test_name,
1496 "wim_url": test_url,
1497 "wim_type": test_type,
1498 },
1499 )
1500 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1501 self.assertEqual(
1502 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1503 )
1504 self.assertIn(
1505 "name '{}' already exists for {}".format(
1506 self.test_name, self.topic.topic
1507 ),
1508 norm(str(e.exception)),
1509 "Wrong exception text",
1510 )
1511
1512 def test_edit_cvws(self):
1513 now = time()
1514 cid = str(uuid4())
1515 test_url = "http://0.0.0.0:0"
1516 test_type = "fake"
1517 cvws = {
1518 "_id": cid,
1519 "name": self.test_name,
1520 "wim_url": test_url,
1521 "wim_type": test_type,
1522 "_admin": {
1523 "created": now,
1524 "modified": now,
1525 "operations": [{"lcmOperationType": "create"}],
1526 },
1527 }
1528 with self.subTest(i=1):
1529 new_name = "new-cim-name"
1530 new_url = "https://1.1.1.1:1"
1531 new_type = "onos"
1532 self.db.get_one.side_effect = [cvws, None]
1533 self.db.replace.return_value = {"updated": 1}
1534 # self.db.encrypt.side_effect = [b64str(), b64str()]
1535 self.topic.edit(
1536 self.fake_session,
1537 cid,
1538 {"name": new_name, "wim_url": new_url, "wim_type": new_type},
1539 )
1540 args = self.db.replace.call_args[0]
1541 content = args[2]
1542 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1543 self.assertEqual(args[1], cid, "Wrong CIM identifier")
1544 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1545 self.assertEqual(content["name"], new_name, "Wrong CIM name")
1546 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
1547 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
1548 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
1549 self.assertGreater(
1550 content["_admin"]["modified"],
1551 content["_admin"]["created"],
1552 "Wrong modification time",
1553 )
1554 self.assertEqual(
1555 len(content["_admin"]["operations"]), 2, "Wrong number of operations"
1556 )
1557 operation = content["_admin"]["operations"][1]
1558 self.assertEqual(
1559 operation["lcmOperationType"], "edit", "Wrong operation type"
1560 )
1561 self.assertEqual(
1562 operation["operationState"], "PROCESSING", "Wrong operation state"
1563 )
1564 self.assertGreater(
1565 operation["startTime"],
1566 content["_admin"]["modified"],
1567 "Wrong operation start time",
1568 )
1569 self.assertGreater(
1570 operation["statusEnteredTime"],
1571 content["_admin"]["modified"],
1572 "Wrong operation status enter time",
1573 )
1574 self.assertEqual(
1575 operation["detailed-status"], "", "Wrong operation detailed status info"
1576 )
1577 self.assertIsNone(
1578 operation["operationParams"], "Wrong operation parameters"
1579 )
1580 with self.subTest(i=2):
1581 self.db.get_one.side_effect = [cvws]
1582 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
1583 self.topic.edit(
1584 self.fake_session,
1585 str(uuid4()),
1586 {"name": "new-name", "extra_prop": "anything"},
1587 )
1588 self.assertEqual(
1589 e.exception.http_code,
1590 HTTPStatus.UNPROCESSABLE_ENTITY,
1591 "Wrong HTTP status code",
1592 )
1593 self.assertIn(
1594 "format error '{}'".format(
1595 "additional properties are not allowed ('{}' was unexpected)"
1596 ).format("extra_prop"),
1597 norm(str(e.exception)),
1598 "Wrong exception text",
1599 )
1600
1601 def test_conflict_on_edit(self):
1602 with self.subTest(i=1):
1603 cid = str(uuid4())
1604 new_name = "new-cim-name"
1605 self.db.get_one.side_effect = [
1606 {"_id": cid, "name": self.test_name},
1607 {"_id": str(uuid4()), "name": new_name},
1608 ]
1609 with self.assertRaises(
1610 EngineException, msg="Accepted existing CIM name"
1611 ) as e:
1612 self.topic.edit(self.fake_session, cid, {"name": new_name})
1613 self.assertEqual(
1614 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1615 )
1616 self.assertIn(
1617 "name '{}' already exists for {}".format(new_name, self.topic.topic),
1618 norm(str(e.exception)),
1619 "Wrong exception text",
1620 )
1621
1622 def test_delete_cvws(self):
1623 cid = str(uuid4())
1624 ro_pid = str(uuid4())
1625 rw_pid = str(uuid4())
1626 cvws = {"_id": cid, "name": self.test_name}
1627 self.db.get_list.return_value = []
1628 with self.subTest(i=1):
1629 cvws["_admin"] = {
1630 "projects_read": [test_pid, ro_pid, rw_pid],
1631 "projects_write": [test_pid, rw_pid],
1632 }
1633 self.db.get_one.return_value = cvws
1634 oid = self.topic.delete(self.fake_session, cid)
1635 self.assertIsNone(oid, "Wrong operation identifier")
1636 self.assertEqual(
1637 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1638 )
1639 self.assertEqual(
1640 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1641 )
1642 self.assertEqual(
1643 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1644 )
1645 self.assertEqual(
1646 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1647 )
1648 self.assertEqual(
1649 self.db.set_one.call_args[1]["update_dict"],
1650 None,
1651 "Wrong read-only projects update",
1652 )
1653 self.assertEqual(
1654 self.db.set_one.call_args[1]["pull_list"],
1655 {
1656 "_admin.projects_read": (test_pid,),
1657 "_admin.projects_write": (test_pid,),
1658 },
1659 "Wrong read/write projects update",
1660 )
1661 self.topic._send_msg.assert_not_called()
1662 with self.subTest(i=2):
1663 now = time()
1664 cvws["_admin"] = {
1665 "projects_read": [test_pid],
1666 "projects_write": [test_pid],
1667 "operations": [],
1668 }
1669 self.db.get_one.return_value = cvws
1670 oid = self.topic.delete(self.fake_session, cid)
1671 self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
1672 self.assertEqual(
1673 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1674 )
1675 self.assertEqual(
1676 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1677 )
1678 self.assertEqual(
1679 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1680 )
1681 self.assertEqual(
1682 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier"
1683 )
1684 self.assertEqual(
1685 self.db.set_one.call_args[1]["update_dict"],
1686 {"_admin.to_delete": True},
1687 "Wrong _admin.to_delete update",
1688 )
1689 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
1690 self.assertEqual(
1691 operation["lcmOperationType"], "delete", "Wrong operation type"
1692 )
1693 self.assertEqual(
1694 operation["operationState"], "PROCESSING", "Wrong operation state"
1695 )
1696 self.assertEqual(
1697 operation["detailed-status"], "", "Wrong operation detailed status"
1698 )
1699 self.assertIsNone(
1700 operation["operationParams"], "Wrong operation parameters"
1701 )
1702 self.assertGreater(
1703 operation["startTime"], now, "Wrong operation start time"
1704 )
1705 self.assertGreater(
1706 operation["statusEnteredTime"], now, "Wrong operation status enter time"
1707 )
1708 self.topic._send_msg.assert_called_once_with(
1709 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
1710 )
1711 with self.subTest(i=3):
1712 cvws["_admin"] = {
1713 "projects_read": [],
1714 "projects_write": [],
1715 "operations": [],
1716 }
1717 self.db.get_one.return_value = cvws
1718 self.topic._send_msg.reset_mock()
1719 self.db.get_one.reset_mock()
1720 self.db.del_one.reset_mock()
1721 self.fake_session["force"] = True # to force deletion
1722 self.fake_session["admin"] = True # to force deletion
1723 self.fake_session["project_id"] = [] # to force deletion
1724 oid = self.topic.delete(self.fake_session, cid)
1725 self.assertIsNone(oid, "Wrong operation identifier")
1726 self.assertEqual(
1727 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1728 )
1729 self.assertEqual(
1730 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1731 )
1732 self.assertEqual(
1733 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
1734 )
1735 self.assertEqual(
1736 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1737 )
1738 self.topic._send_msg.assert_called_once_with(
1739 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
1740 )
1741
1742
1743 if __name__ == "__main__":
1744 unittest.main()