Revert "Feature 10941: User Management Enhancements"
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 import random
22 from unittest import TestCase
23 from unittest.mock import Mock, patch, call
24 from uuid import uuid4
25 from http import HTTPStatus
26 from time import time
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import (
30 ProjectTopicAuth,
31 RoleTopicAuth,
32 UserTopicAuth,
33 CommonVimWimSdn,
34 VcaTopic,
35 )
36 from osm_nbi.engine import EngineException
37 from osm_nbi.authconn import AuthconnNotFoundException
38
39
40 test_pid = str(uuid4())
41 test_name = "test-user"
42
43
44 def norm(str):
45 """Normalize string for checking"""
46 return " ".join(str.strip().split()).lower()
47
48
49 class TestVcaTopic(TestCase):
50 def setUp(self):
51 self.db = Mock(dbbase.DbBase())
52 self.fs = Mock(fsbase.FsBase())
53 self.msg = Mock(msgbase.MsgBase())
54 self.auth = Mock(authconn.Authconn(None, None, None))
55 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
56
57 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
58 def test_format_on_new(self, mock_super_format_on_new):
59 content = {
60 "_id": "id",
61 "secret": "encrypted_secret",
62 "cacert": "encrypted_cacert",
63 }
64 self.db.encrypt.side_effect = ["secret", "cacert"]
65 mock_super_format_on_new.return_value = "1234"
66
67 oid = self.vca_topic.format_on_new(content)
68
69 self.assertEqual(oid, "1234")
70 self.assertEqual(content["secret"], "secret")
71 self.assertEqual(content["cacert"], "cacert")
72 self.db.encrypt.assert_has_calls(
73 [
74 call("encrypted_secret", schema_version="1.11", salt="id"),
75 call("encrypted_cacert", schema_version="1.11", salt="id"),
76 ]
77 )
78 mock_super_format_on_new.assert_called_with(content, None, False)
79
80 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
81 def test_format_on_edit(self, mock_super_format_on_edit):
82 edit_content = {
83 "_id": "id",
84 "secret": "encrypted_secret",
85 "cacert": "encrypted_cacert",
86 }
87 final_content = {
88 "_id": "id",
89 "schema_version": "1.11",
90 }
91 self.db.encrypt.side_effect = ["secret", "cacert"]
92 mock_super_format_on_edit.return_value = "1234"
93
94 oid = self.vca_topic.format_on_edit(final_content, edit_content)
95
96 self.assertEqual(oid, "1234")
97 self.assertEqual(final_content["secret"], "secret")
98 self.assertEqual(final_content["cacert"], "cacert")
99 self.db.encrypt.assert_has_calls(
100 [
101 call("encrypted_secret", schema_version="1.11", salt="id"),
102 call("encrypted_cacert", schema_version="1.11", salt="id"),
103 ]
104 )
105 mock_super_format_on_edit.assert_called()
106
107 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
108 def test_check_conflict_on_del(self, mock_check_conflict_on_del):
109 session = {
110 "project_id": "project-id",
111 "force": False,
112 }
113 _id = "vca-id"
114 db_content = {}
115
116 self.db.get_list.return_value = None
117
118 self.vca_topic.check_conflict_on_del(session, _id, db_content)
119
120 self.db.get_list.assert_called_with(
121 "vim_accounts",
122 {"vca": _id, "_admin.projects_read.cont": "project-id"},
123 )
124 mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
125
126 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
127 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
128 session = {
129 "project_id": "project-id",
130 "force": True,
131 }
132 _id = "vca-id"
133 db_content = {}
134
135 self.vca_topic.check_conflict_on_del(session, _id, db_content)
136
137 self.db.get_list.assert_not_called()
138 mock_check_conflict_on_del.assert_not_called()
139
140 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
141 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
142 session = {
143 "project_id": "project-id",
144 "force": False,
145 }
146 _id = "vca-id"
147 db_content = {}
148
149 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
150
151 with self.assertRaises(EngineException) as context:
152 self.vca_topic.check_conflict_on_del(session, _id, db_content)
153 self.assertEqual(
154 context.exception,
155 EngineException(
156 "There is at least one VIM account using this vca",
157 http_code=HTTPStatus.CONFLICT,
158 ),
159 )
160
161 self.db.get_list.assert_called_with(
162 "vim_accounts",
163 {"vca": _id, "_admin.projects_read.cont": "project-id"},
164 )
165 mock_check_conflict_on_del.assert_not_called()
166
167
168 class Test_ProjectTopicAuth(TestCase):
169 @classmethod
170 def setUpClass(cls):
171 cls.test_name = "test-project-topic"
172
173 def setUp(self):
174 self.db = Mock(dbbase.DbBase())
175 self.fs = Mock(fsbase.FsBase())
176 self.msg = Mock(msgbase.MsgBase())
177 self.auth = Mock(authconn.Authconn(None, None, None))
178 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
179 self.fake_session = {
180 "username": self.test_name,
181 "project_id": (test_pid,),
182 "method": None,
183 "admin": True,
184 "force": False,
185 "public": False,
186 "allow_show_user_project_role": True,
187 }
188 self.topic.check_quota = Mock(return_value=None) # skip quota
189
190 def test_new_project(self):
191 with self.subTest(i=1):
192 rollback = []
193 pid1 = str(uuid4())
194 self.auth.get_project_list.return_value = []
195 self.auth.create_project.return_value = pid1
196 pid2, oid = self.topic.new(
197 rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
198 )
199 self.assertEqual(len(rollback), 1, "Wrong rollback length")
200 self.assertEqual(pid2, pid1, "Wrong project identifier")
201 content = self.auth.create_project.call_args[0][0]
202 self.assertEqual(content["name"], self.test_name, "Wrong project name")
203 self.assertEqual(content["quotas"], {}, "Wrong quotas")
204 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
205 self.assertEqual(
206 content["_admin"]["modified"],
207 content["_admin"]["created"],
208 "Wrong modification time",
209 )
210 with self.subTest(i=2):
211 rollback = []
212 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
213 self.topic.new(
214 rollback,
215 self.fake_session,
216 {"name": "other-project-name", "quotas": {"baditems": 10}},
217 )
218 self.assertEqual(len(rollback), 0, "Wrong rollback length")
219 self.assertEqual(
220 e.exception.http_code,
221 HTTPStatus.UNPROCESSABLE_ENTITY,
222 "Wrong HTTP status code",
223 )
224 self.assertIn(
225 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
226 "baditems"
227 ),
228 norm(str(e.exception)),
229 "Wrong exception text",
230 )
231
232 def test_edit_project(self):
233 now = time()
234 pid = str(uuid4())
235 proj = {
236 "_id": pid,
237 "name": self.test_name,
238 "_admin": {"created": now, "modified": now},
239 }
240 with self.subTest(i=1):
241 self.auth.get_project_list.side_effect = [[proj], []]
242 new_name = "new-project-name"
243 quotas = {
244 "vnfds": random.SystemRandom().randint(0, 100),
245 "nsds": random.SystemRandom().randint(0, 100),
246 }
247 self.topic.edit(
248 self.fake_session, pid, {"name": new_name, "quotas": quotas}
249 )
250 _id, content = self.auth.update_project.call_args[0]
251 self.assertEqual(_id, pid, "Wrong project identifier")
252 self.assertEqual(content["_id"], pid, "Wrong project identifier")
253 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
254 self.assertGreater(
255 content["_admin"]["modified"], now, "Wrong modification time"
256 )
257 self.assertEqual(content["name"], new_name, "Wrong project name")
258 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
259 with self.subTest(i=2):
260 new_name = "other-project-name"
261 quotas = {"baditems": random.SystemRandom().randint(0, 100)}
262 self.auth.get_project_list.side_effect = [[proj], []]
263 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
264 self.topic.edit(
265 self.fake_session, pid, {"name": new_name, "quotas": quotas}
266 )
267 self.assertEqual(
268 e.exception.http_code,
269 HTTPStatus.UNPROCESSABLE_ENTITY,
270 "Wrong HTTP status code",
271 )
272 self.assertIn(
273 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
274 "baditems"
275 ),
276 norm(str(e.exception)),
277 "Wrong exception text",
278 )
279
280 def test_conflict_on_new(self):
281 with self.subTest(i=1):
282 rollback = []
283 pid = str(uuid4())
284 with self.assertRaises(
285 EngineException, msg="Accepted uuid as project name"
286 ) as e:
287 self.topic.new(rollback, self.fake_session, {"name": pid})
288 self.assertEqual(len(rollback), 0, "Wrong rollback length")
289 self.assertEqual(
290 e.exception.http_code,
291 HTTPStatus.UNPROCESSABLE_ENTITY,
292 "Wrong HTTP status code",
293 )
294 self.assertIn(
295 "project name '{}' cannot have an uuid format".format(pid),
296 norm(str(e.exception)),
297 "Wrong exception text",
298 )
299 with self.subTest(i=2):
300 rollback = []
301 self.auth.get_project_list.return_value = [
302 {"_id": test_pid, "name": self.test_name}
303 ]
304 with self.assertRaises(
305 EngineException, msg="Accepted existing project name"
306 ) as e:
307 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
308 self.assertEqual(len(rollback), 0, "Wrong rollback length")
309 self.assertEqual(
310 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
311 )
312 self.assertIn(
313 "project '{}' exists".format(self.test_name),
314 norm(str(e.exception)),
315 "Wrong exception text",
316 )
317
318 def test_conflict_on_edit(self):
319 with self.subTest(i=1):
320 self.auth.get_project_list.return_value = [
321 {"_id": test_pid, "name": self.test_name}
322 ]
323 new_name = str(uuid4())
324 with self.assertRaises(
325 EngineException, msg="Accepted uuid as project name"
326 ) as e:
327 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
328 self.assertEqual(
329 e.exception.http_code,
330 HTTPStatus.UNPROCESSABLE_ENTITY,
331 "Wrong HTTP status code",
332 )
333 self.assertIn(
334 "project name '{}' cannot have an uuid format".format(new_name),
335 norm(str(e.exception)),
336 "Wrong exception text",
337 )
338 with self.subTest(i=2):
339 pid = str(uuid4())
340 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
341 with self.assertRaises(
342 EngineException, msg="Accepted renaming of project 'admin'"
343 ) as e:
344 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
345 self.assertEqual(
346 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
347 )
348 self.assertIn(
349 "you cannot rename project 'admin'",
350 norm(str(e.exception)),
351 "Wrong exception text",
352 )
353 with self.subTest(i=3):
354 new_name = "new-project-name"
355 self.auth.get_project_list.side_effect = [
356 [{"_id": test_pid, "name": self.test_name}],
357 [{"_id": str(uuid4()), "name": new_name}],
358 ]
359 with self.assertRaises(
360 EngineException, msg="Accepted existing project name"
361 ) as e:
362 self.topic.edit(self.fake_session, pid, {"name": new_name})
363 self.assertEqual(
364 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
365 )
366 self.assertIn(
367 "project '{}' is already used".format(new_name),
368 norm(str(e.exception)),
369 "Wrong exception text",
370 )
371
372 def test_delete_project(self):
373 with self.subTest(i=1):
374 pid = str(uuid4())
375 self.auth.get_project.return_value = {
376 "_id": pid,
377 "name": "other-project-name",
378 }
379 self.auth.delete_project.return_value = {"deleted": 1}
380 self.auth.get_user_list.return_value = []
381 self.db.get_list.return_value = []
382 rc = self.topic.delete(self.fake_session, pid)
383 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
384 self.assertEqual(
385 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
386 )
387 self.assertEqual(
388 self.auth.delete_project.call_args[0][0],
389 pid,
390 "Wrong project identifier",
391 )
392
393 def test_conflict_on_del(self):
394 with self.subTest(i=1):
395 self.auth.get_project.return_value = {
396 "_id": test_pid,
397 "name": self.test_name,
398 }
399 with self.assertRaises(
400 EngineException, msg="Accepted deletion of own project"
401 ) as e:
402 self.topic.delete(self.fake_session, self.test_name)
403 self.assertEqual(
404 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
405 )
406 self.assertIn(
407 "you cannot delete your own project",
408 norm(str(e.exception)),
409 "Wrong exception text",
410 )
411 with self.subTest(i=2):
412 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
413 with self.assertRaises(
414 EngineException, msg="Accepted deletion of project 'admin'"
415 ) as e:
416 self.topic.delete(self.fake_session, "admin")
417 self.assertEqual(
418 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
419 )
420 self.assertIn(
421 "you cannot delete project 'admin'",
422 norm(str(e.exception)),
423 "Wrong exception text",
424 )
425 with self.subTest(i=3):
426 pid = str(uuid4())
427 name = "other-project-name"
428 self.auth.get_project.return_value = {"_id": pid, "name": name}
429 self.auth.get_user_list.return_value = [
430 {
431 "_id": str(uuid4()),
432 "username": self.test_name,
433 "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
434 }
435 ]
436 with self.assertRaises(
437 EngineException, msg="Accepted deletion of used project"
438 ) as e:
439 self.topic.delete(self.fake_session, pid)
440 self.assertEqual(
441 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
442 )
443 self.assertIn(
444 "project '{}' ({}) is being used by user '{}'".format(
445 name, pid, self.test_name
446 ),
447 norm(str(e.exception)),
448 "Wrong exception text",
449 )
450 with self.subTest(i=4):
451 self.auth.get_user_list.return_value = []
452 self.db.get_list.return_value = [
453 {
454 "_id": str(uuid4()),
455 "id": self.test_name,
456 "_admin": {"projects_read": [pid], "projects_write": []},
457 }
458 ]
459 with self.assertRaises(
460 EngineException, msg="Accepted deletion of used project"
461 ) as e:
462 self.topic.delete(self.fake_session, pid)
463 self.assertEqual(
464 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
465 )
466 self.assertIn(
467 "project '{}' ({}) is being used by {} '{}'".format(
468 name, pid, "vnf descriptor", self.test_name
469 ),
470 norm(str(e.exception)),
471 "Wrong exception text",
472 )
473
474
475 class Test_RoleTopicAuth(TestCase):
476 @classmethod
477 def setUpClass(cls):
478 cls.test_name = "test-role-topic"
479 cls.test_operations = ["tokens:get"]
480
481 def setUp(self):
482 self.db = Mock(dbbase.DbBase())
483 self.fs = Mock(fsbase.FsBase())
484 self.msg = Mock(msgbase.MsgBase())
485 self.auth = Mock(authconn.Authconn(None, None, None))
486 self.auth.role_permissions = self.test_operations
487 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
488 self.fake_session = {
489 "username": test_name,
490 "project_id": (test_pid,),
491 "method": None,
492 "admin": True,
493 "force": False,
494 "public": False,
495 "allow_show_user_project_role": True,
496 }
497 self.topic.check_quota = Mock(return_value=None) # skip quota
498
499 def test_new_role(self):
500 with self.subTest(i=1):
501 rollback = []
502 rid1 = str(uuid4())
503 perms_in = {"tokens": True}
504 perms_out = {"default": False, "admin": False, "tokens": True}
505 self.auth.get_role_list.return_value = []
506 self.auth.create_role.return_value = rid1
507 rid2, oid = self.topic.new(
508 rollback,
509 self.fake_session,
510 {"name": self.test_name, "permissions": perms_in},
511 )
512 self.assertEqual(len(rollback), 1, "Wrong rollback length")
513 self.assertEqual(rid2, rid1, "Wrong project identifier")
514 content = self.auth.create_role.call_args[0][0]
515 self.assertEqual(content["name"], self.test_name, "Wrong role name")
516 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
517 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
518 self.assertEqual(
519 content["_admin"]["modified"],
520 content["_admin"]["created"],
521 "Wrong modification time",
522 )
523 with self.subTest(i=2):
524 rollback = []
525 with self.assertRaises(
526 EngineException, msg="Accepted wrong permissions"
527 ) as e:
528 self.topic.new(
529 rollback,
530 self.fake_session,
531 {"name": "other-role-name", "permissions": {"projects": True}},
532 )
533 self.assertEqual(len(rollback), 0, "Wrong rollback length")
534 self.assertEqual(
535 e.exception.http_code,
536 HTTPStatus.UNPROCESSABLE_ENTITY,
537 "Wrong HTTP status code",
538 )
539 self.assertIn(
540 "invalid permission '{}'".format("projects"),
541 norm(str(e.exception)),
542 "Wrong exception text",
543 )
544
545 def test_edit_role(self):
546 now = time()
547 rid = str(uuid4())
548 role = {
549 "_id": rid,
550 "name": self.test_name,
551 "permissions": {"tokens": True},
552 "_admin": {"created": now, "modified": now},
553 }
554 with self.subTest(i=1):
555 self.auth.get_role_list.side_effect = [[role], []]
556 self.auth.get_role.return_value = role
557 new_name = "new-role-name"
558 perms_in = {"tokens": False, "tokens:get": True}
559 perms_out = {
560 "default": False,
561 "admin": False,
562 "tokens": False,
563 "tokens:get": True,
564 }
565 self.topic.edit(
566 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
567 )
568 content = self.auth.update_role.call_args[0][0]
569 self.assertEqual(content["_id"], rid, "Wrong role identifier")
570 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
571 self.assertGreater(
572 content["_admin"]["modified"], now, "Wrong modification time"
573 )
574 self.assertEqual(content["name"], new_name, "Wrong role name")
575 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
576 with self.subTest(i=2):
577 new_name = "other-role-name"
578 perms_in = {"tokens": False, "tokens:post": True}
579 self.auth.get_role_list.side_effect = [[role], []]
580 with self.assertRaises(
581 EngineException, msg="Accepted wrong permissions"
582 ) as e:
583 self.topic.edit(
584 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
585 )
586 self.assertEqual(
587 e.exception.http_code,
588 HTTPStatus.UNPROCESSABLE_ENTITY,
589 "Wrong HTTP status code",
590 )
591 self.assertIn(
592 "invalid permission '{}'".format("tokens:post"),
593 norm(str(e.exception)),
594 "Wrong exception text",
595 )
596
597 def test_delete_role(self):
598 with self.subTest(i=1):
599 rid = str(uuid4())
600 role = {"_id": rid, "name": "other-role-name"}
601 self.auth.get_role_list.return_value = [role]
602 self.auth.get_role.return_value = role
603 self.auth.delete_role.return_value = {"deleted": 1}
604 self.auth.get_user_list.return_value = []
605 rc = self.topic.delete(self.fake_session, rid)
606 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
607 self.assertEqual(
608 self.auth.get_role_list.call_args[0][0]["_id"],
609 rid,
610 "Wrong role identifier",
611 )
612 self.assertEqual(
613 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
614 )
615 self.assertEqual(
616 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
617 )
618
619 def test_conflict_on_new(self):
620 with self.subTest(i=1):
621 rollback = []
622 rid = str(uuid4())
623 with self.assertRaises(
624 EngineException, msg="Accepted uuid as role name"
625 ) as e:
626 self.topic.new(rollback, self.fake_session, {"name": rid})
627 self.assertEqual(len(rollback), 0, "Wrong rollback length")
628 self.assertEqual(
629 e.exception.http_code,
630 HTTPStatus.UNPROCESSABLE_ENTITY,
631 "Wrong HTTP status code",
632 )
633 self.assertIn(
634 "role name '{}' cannot have an uuid format".format(rid),
635 norm(str(e.exception)),
636 "Wrong exception text",
637 )
638 with self.subTest(i=2):
639 rollback = []
640 self.auth.get_role_list.return_value = [
641 {"_id": str(uuid4()), "name": self.test_name}
642 ]
643 with self.assertRaises(
644 EngineException, msg="Accepted existing role name"
645 ) as e:
646 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
647 self.assertEqual(len(rollback), 0, "Wrong rollback length")
648 self.assertEqual(
649 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
650 )
651 self.assertIn(
652 "role name '{}' exists".format(self.test_name),
653 norm(str(e.exception)),
654 "Wrong exception text",
655 )
656
657 def test_conflict_on_edit(self):
658 rid = str(uuid4())
659 with self.subTest(i=1):
660 self.auth.get_role_list.return_value = [
661 {"_id": rid, "name": self.test_name, "permissions": {}}
662 ]
663 new_name = str(uuid4())
664 with self.assertRaises(
665 EngineException, msg="Accepted uuid as role name"
666 ) as e:
667 self.topic.edit(self.fake_session, rid, {"name": new_name})
668 self.assertEqual(
669 e.exception.http_code,
670 HTTPStatus.UNPROCESSABLE_ENTITY,
671 "Wrong HTTP status code",
672 )
673 self.assertIn(
674 "role name '{}' cannot have an uuid format".format(new_name),
675 norm(str(e.exception)),
676 "Wrong exception text",
677 )
678 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
679 with self.subTest(i=i):
680 rid = str(uuid4())
681 self.auth.get_role.return_value = {
682 "_id": rid,
683 "name": role_name,
684 "permissions": {},
685 }
686 with self.assertRaises(
687 EngineException,
688 msg="Accepted renaming of role '{}'".format(role_name),
689 ) as e:
690 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
691 self.assertEqual(
692 e.exception.http_code,
693 HTTPStatus.FORBIDDEN,
694 "Wrong HTTP status code",
695 )
696 self.assertIn(
697 "you cannot rename role '{}'".format(role_name),
698 norm(str(e.exception)),
699 "Wrong exception text",
700 )
701 with self.subTest(i=i + 1):
702 new_name = "new-role-name"
703 self.auth.get_role_list.side_effect = [
704 [{"_id": rid, "name": self.test_name, "permissions": {}}],
705 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
706 ]
707 self.auth.get_role.return_value = {
708 "_id": rid,
709 "name": self.test_name,
710 "permissions": {},
711 }
712 with self.assertRaises(
713 EngineException, msg="Accepted existing role name"
714 ) as e:
715 self.topic.edit(self.fake_session, rid, {"name": new_name})
716 self.assertEqual(
717 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
718 )
719 self.assertIn(
720 "role name '{}' exists".format(new_name),
721 norm(str(e.exception)),
722 "Wrong exception text",
723 )
724
725 def test_conflict_on_del(self):
726 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
727 with self.subTest(i=i):
728 rid = str(uuid4())
729 role = {"_id": rid, "name": role_name}
730 self.auth.get_role_list.return_value = [role]
731 self.auth.get_role.return_value = role
732 with self.assertRaises(
733 EngineException,
734 msg="Accepted deletion of role '{}'".format(role_name),
735 ) as e:
736 self.topic.delete(self.fake_session, rid)
737 self.assertEqual(
738 e.exception.http_code,
739 HTTPStatus.FORBIDDEN,
740 "Wrong HTTP status code",
741 )
742 self.assertIn(
743 "you cannot delete role '{}'".format(role_name),
744 norm(str(e.exception)),
745 "Wrong exception text",
746 )
747 with self.subTest(i=i + 1):
748 rid = str(uuid4())
749 name = "other-role-name"
750 role = {"_id": rid, "name": name}
751 self.auth.get_role_list.return_value = [role]
752 self.auth.get_role.return_value = role
753 self.auth.get_user_list.return_value = [
754 {
755 "_id": str(uuid4()),
756 "username": self.test_name,
757 "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
758 }
759 ]
760 with self.assertRaises(
761 EngineException, msg="Accepted deletion of used role"
762 ) as e:
763 self.topic.delete(self.fake_session, rid)
764 self.assertEqual(
765 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
766 )
767 self.assertIn(
768 "role '{}' ({}) is being used by user '{}'".format(
769 name, rid, self.test_name
770 ),
771 norm(str(e.exception)),
772 "Wrong exception text",
773 )
774
775
776 class Test_UserTopicAuth(TestCase):
777 @classmethod
778 def setUpClass(cls):
779 cls.test_name = "test-user-topic"
780
781 def setUp(self):
782 self.db = Mock(dbbase.DbBase())
783 self.fs = Mock(fsbase.FsBase())
784 self.msg = Mock(msgbase.MsgBase())
785 self.auth = Mock(authconn.Authconn(None, None, None))
786 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
787 self.fake_session = {
788 "username": test_name,
789 "project_id": (test_pid,),
790 "method": None,
791 "admin": True,
792 "force": False,
793 "public": False,
794 "allow_show_user_project_role": True,
795 }
796 self.topic.check_quota = Mock(return_value=None) # skip quota
797
798 def test_new_user(self):
799 uid1 = str(uuid4())
800 pid = str(uuid4())
801 self.auth.get_user_list.return_value = []
802 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
803 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
804 with self.subTest(i=1):
805 rollback = []
806 rid = str(uuid4())
807 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
808 prms_in = [{"project": "some_project", "role": "some_role"}]
809 prms_out = [{"project": pid, "role": rid}]
810 uid2, oid = self.topic.new(
811 rollback,
812 self.fake_session,
813 {
814 "username": self.test_name,
815 "password": self.test_name,
816 "project_role_mappings": prms_in,
817 },
818 )
819 self.assertEqual(len(rollback), 1, "Wrong rollback length")
820 self.assertEqual(uid2, uid1, "Wrong project identifier")
821 content = self.auth.create_user.call_args[0][0]
822 self.assertEqual(content["username"], self.test_name, "Wrong project name")
823 self.assertEqual(content["password"], self.test_name, "Wrong password")
824 self.assertEqual(
825 content["project_role_mappings"],
826 prms_out,
827 "Wrong project-role mappings",
828 )
829 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
830 self.assertEqual(
831 content["_admin"]["modified"],
832 content["_admin"]["created"],
833 "Wrong modification time",
834 )
835 with self.subTest(i=2):
836 rollback = []
837 def_rid = str(uuid4())
838 def_role = {"_id": def_rid, "name": "project_admin"}
839 self.auth.get_role.return_value = def_role
840 self.auth.get_role_list.return_value = [def_role]
841 prms_out = [{"project": pid, "role": def_rid}]
842 uid2, oid = self.topic.new(
843 rollback,
844 self.fake_session,
845 {
846 "username": self.test_name,
847 "password": self.test_name,
848 "projects": ["some_project"],
849 },
850 )
851 self.assertEqual(len(rollback), 1, "Wrong rollback length")
852 self.assertEqual(uid2, uid1, "Wrong project identifier")
853 content = self.auth.create_user.call_args[0][0]
854 self.assertEqual(content["username"], self.test_name, "Wrong project name")
855 self.assertEqual(content["password"], self.test_name, "Wrong password")
856 self.assertEqual(
857 content["project_role_mappings"],
858 prms_out,
859 "Wrong project-role mappings",
860 )
861 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
862 self.assertEqual(
863 content["_admin"]["modified"],
864 content["_admin"]["created"],
865 "Wrong modification time",
866 )
867 with self.subTest(i=3):
868 rollback = []
869 with self.assertRaises(
870 EngineException, msg="Accepted wrong project-role mappings"
871 ) as e:
872 self.topic.new(
873 rollback,
874 self.fake_session,
875 {
876 "username": "other-project-name",
877 "password": "other-password",
878 "project_role_mappings": [{}],
879 },
880 )
881 self.assertEqual(len(rollback), 0, "Wrong rollback length")
882 self.assertEqual(
883 e.exception.http_code,
884 HTTPStatus.UNPROCESSABLE_ENTITY,
885 "Wrong HTTP status code",
886 )
887 self.assertIn(
888 "format error at '{}' '{}'".format(
889 "project_role_mappings:{}", "'{}' is a required property"
890 ).format(0, "project"),
891 norm(str(e.exception)),
892 "Wrong exception text",
893 )
894 with self.subTest(i=4):
895 rollback = []
896 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
897 self.topic.new(
898 rollback,
899 self.fake_session,
900 {
901 "username": "other-project-name",
902 "password": "other-password",
903 "projects": [],
904 },
905 )
906 self.assertEqual(len(rollback), 0, "Wrong rollback length")
907 self.assertEqual(
908 e.exception.http_code,
909 HTTPStatus.UNPROCESSABLE_ENTITY,
910 "Wrong HTTP status code",
911 )
912 self.assertIn(
913 "format error at '{}' '{}'".format(
914 "projects", "{} is too short"
915 ).format([]),
916 norm(str(e.exception)),
917 "Wrong exception text",
918 )
919
920 def test_edit_user(self):
921 now = time()
922 uid = str(uuid4())
923 pid1 = str(uuid4())
924 rid1 = str(uuid4())
925 prms = [
926 {
927 "project": pid1,
928 "project_name": "project-1",
929 "role": rid1,
930 "role_name": "role-1",
931 }
932 ]
933 user = {
934 "_id": uid,
935 "username": self.test_name,
936 "project_role_mappings": prms,
937 "_admin": {"created": now, "modified": now},
938 }
939 with self.subTest(i=1):
940 self.auth.get_user_list.side_effect = [[user], []]
941 self.auth.get_user.return_value = user
942 pid2 = str(uuid4())
943 rid2 = str(uuid4())
944 self.auth.get_project.side_effect = [
945 {"_id": pid2, "name": "project-2"},
946 {"_id": pid1, "name": "project-1"},
947 ]
948 self.auth.get_role.side_effect = [
949 {"_id": rid2, "name": "role-2"},
950 {"_id": rid1, "name": "role-1"},
951 ]
952 new_name = "new-user-name"
953 new_pasw = "new-password"
954 add_prms = [{"project": pid2, "role": rid2}]
955 rem_prms = [{"project": pid1, "role": rid1}]
956 self.topic.edit(
957 self.fake_session,
958 uid,
959 {
960 "username": new_name,
961 "password": new_pasw,
962 "add_project_role_mappings": add_prms,
963 "remove_project_role_mappings": rem_prms,
964 },
965 )
966 content = self.auth.update_user.call_args[0][0]
967 self.assertEqual(content["_id"], uid, "Wrong user identifier")
968 self.assertEqual(content["username"], new_name, "Wrong user name")
969 self.assertEqual(content["password"], new_pasw, "Wrong user password")
970 self.assertEqual(
971 content["add_project_role_mappings"],
972 add_prms,
973 "Wrong project-role mappings to add",
974 )
975 self.assertEqual(
976 content["remove_project_role_mappings"],
977 prms,
978 "Wrong project-role mappings to remove",
979 )
980 with self.subTest(i=2):
981 new_name = "other-user-name"
982 new_prms = [{}]
983 self.auth.get_role_list.side_effect = [[user], []]
984 self.auth.get_user_list.side_effect = [[user]]
985 with self.assertRaises(
986 EngineException, msg="Accepted wrong project-role mappings"
987 ) as e:
988 self.topic.edit(
989 self.fake_session,
990 uid,
991 {"username": new_name, "project_role_mappings": new_prms},
992 )
993 self.assertEqual(
994 e.exception.http_code,
995 HTTPStatus.UNPROCESSABLE_ENTITY,
996 "Wrong HTTP status code",
997 )
998 self.assertIn(
999 "format error at '{}' '{}'".format(
1000 "project_role_mappings:{}", "'{}' is a required property"
1001 ).format(0, "project"),
1002 norm(str(e.exception)),
1003 "Wrong exception text",
1004 )
1005 with self.subTest(i=3):
1006 self.auth.get_user_list.side_effect = [[user], []]
1007 self.auth.get_user.return_value = user
1008 old_password = self.test_name
1009 new_pasw = "new-password"
1010 self.topic.edit(
1011 self.fake_session,
1012 uid,
1013 {
1014 "old_password": old_password,
1015 "password": new_pasw,
1016 },
1017 )
1018 content = self.auth.update_user.call_args[0][0]
1019 self.assertEqual(
1020 content["old_password"], old_password, "Wrong old password"
1021 )
1022 self.assertEqual(content["password"], new_pasw, "Wrong user password")
1023
1024 def test_delete_user(self):
1025 with self.subTest(i=1):
1026 uid = str(uuid4())
1027 self.fake_session["username"] = self.test_name
1028 user = user = {
1029 "_id": uid,
1030 "username": "other-user-name",
1031 "project_role_mappings": [],
1032 }
1033 self.auth.get_user.return_value = user
1034 self.auth.delete_user.return_value = {"deleted": 1}
1035 rc = self.topic.delete(self.fake_session, uid)
1036 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
1037 self.assertEqual(
1038 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
1039 )
1040 self.assertEqual(
1041 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
1042 )
1043
1044 def test_conflict_on_new(self):
1045 with self.subTest(i=1):
1046 rollback = []
1047 uid = str(uuid4())
1048 with self.assertRaises(
1049 EngineException, msg="Accepted uuid as username"
1050 ) as e:
1051 self.topic.new(
1052 rollback,
1053 self.fake_session,
1054 {
1055 "username": uid,
1056 "password": self.test_name,
1057 "projects": [test_pid],
1058 },
1059 )
1060 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1061 self.assertEqual(
1062 e.exception.http_code,
1063 HTTPStatus.UNPROCESSABLE_ENTITY,
1064 "Wrong HTTP status code",
1065 )
1066 self.assertIn(
1067 "username '{}' cannot have a uuid format".format(uid),
1068 norm(str(e.exception)),
1069 "Wrong exception text",
1070 )
1071 with self.subTest(i=2):
1072 rollback = []
1073 self.auth.get_user_list.return_value = [
1074 {"_id": str(uuid4()), "username": self.test_name}
1075 ]
1076 with self.assertRaises(
1077 EngineException, msg="Accepted existing username"
1078 ) as e:
1079 self.topic.new(
1080 rollback,
1081 self.fake_session,
1082 {
1083 "username": self.test_name,
1084 "password": self.test_name,
1085 "projects": [test_pid],
1086 },
1087 )
1088 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1089 self.assertEqual(
1090 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1091 )
1092 self.assertIn(
1093 "username '{}' is already used".format(self.test_name),
1094 norm(str(e.exception)),
1095 "Wrong exception text",
1096 )
1097 with self.subTest(i=3):
1098 rollback = []
1099 self.auth.get_user_list.return_value = []
1100 self.auth.get_role_list.side_effect = [[], []]
1101 with self.assertRaises(
1102 AuthconnNotFoundException, msg="Accepted user without default role"
1103 ) as e:
1104 self.topic.new(
1105 rollback,
1106 self.fake_session,
1107 {
1108 "username": self.test_name,
1109 "password": self.test_name,
1110 "projects": [str(uuid4())],
1111 },
1112 )
1113 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1114 self.assertEqual(
1115 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1116 )
1117 self.assertIn(
1118 "can't find default role for user '{}'".format(self.test_name),
1119 norm(str(e.exception)),
1120 "Wrong exception text",
1121 )
1122
1123 def test_conflict_on_edit(self):
1124 uid = str(uuid4())
1125 with self.subTest(i=1):
1126 self.auth.get_user_list.return_value = [
1127 {"_id": uid, "username": self.test_name}
1128 ]
1129 new_name = str(uuid4())
1130 with self.assertRaises(
1131 EngineException, msg="Accepted uuid as username"
1132 ) as e:
1133 self.topic.edit(self.fake_session, uid, {"username": new_name})
1134 self.assertEqual(
1135 e.exception.http_code,
1136 HTTPStatus.UNPROCESSABLE_ENTITY,
1137 "Wrong HTTP status code",
1138 )
1139 self.assertIn(
1140 "username '{}' cannot have an uuid format".format(new_name),
1141 norm(str(e.exception)),
1142 "Wrong exception text",
1143 )
1144 with self.subTest(i=2):
1145 self.auth.get_user_list.return_value = [
1146 {"_id": uid, "username": self.test_name}
1147 ]
1148 self.auth.get_role_list.side_effect = [[], []]
1149 with self.assertRaises(
1150 AuthconnNotFoundException, msg="Accepted user without default role"
1151 ) as e:
1152 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
1153 self.assertEqual(
1154 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1155 )
1156 self.assertIn(
1157 "can't find a default role for user '{}'".format(self.test_name),
1158 norm(str(e.exception)),
1159 "Wrong exception text",
1160 )
1161 with self.subTest(i=3):
1162 admin_uid = str(uuid4())
1163 self.auth.get_user_list.return_value = [
1164 {"_id": admin_uid, "username": "admin"}
1165 ]
1166 with self.assertRaises(
1167 EngineException,
1168 msg="Accepted removing system_admin role from admin user",
1169 ) as e:
1170 self.topic.edit(
1171 self.fake_session,
1172 admin_uid,
1173 {
1174 "remove_project_role_mappings": [
1175 {"project": "admin", "role": "system_admin"}
1176 ]
1177 },
1178 )
1179 self.assertEqual(
1180 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
1181 )
1182 self.assertIn(
1183 "you cannot remove system_admin role from admin user",
1184 norm(str(e.exception)),
1185 "Wrong exception text",
1186 )
1187 with self.subTest(i=4):
1188 new_name = "new-user-name"
1189 self.auth.get_user_list.side_effect = [
1190 [{"_id": uid, "name": self.test_name}],
1191 [{"_id": str(uuid4()), "name": new_name}],
1192 ]
1193 with self.assertRaises(
1194 EngineException, msg="Accepted existing username"
1195 ) as e:
1196 self.topic.edit(self.fake_session, uid, {"username": new_name})
1197 self.assertEqual(
1198 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1199 )
1200 self.assertIn(
1201 "username '{}' is already used".format(new_name),
1202 norm(str(e.exception)),
1203 "Wrong exception text",
1204 )
1205
1206 def test_conflict_on_del(self):
1207 with self.subTest(i=1):
1208 uid = str(uuid4())
1209 self.fake_session["username"] = self.test_name
1210 user = user = {
1211 "_id": uid,
1212 "username": self.test_name,
1213 "project_role_mappings": [],
1214 }
1215 self.auth.get_user.return_value = user
1216 with self.assertRaises(
1217 EngineException, msg="Accepted deletion of own user"
1218 ) as e:
1219 self.topic.delete(self.fake_session, uid)
1220 self.assertEqual(
1221 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1222 )
1223 self.assertIn(
1224 "you cannot delete your own login user",
1225 norm(str(e.exception)),
1226 "Wrong exception text",
1227 )
1228
1229
1230 class Test_CommonVimWimSdn(TestCase):
1231 @classmethod
1232 def setUpClass(cls):
1233 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
1234
1235 def setUp(self):
1236 self.db = Mock(dbbase.DbBase())
1237 self.fs = Mock(fsbase.FsBase())
1238 self.msg = Mock(msgbase.MsgBase())
1239 self.auth = Mock(authconn.Authconn(None, None, None))
1240 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
1241 # Use WIM schemas for testing because they are the simplest
1242 self.topic._send_msg = Mock()
1243 self.topic.topic = "wims"
1244 self.topic.schema_new = validation.wim_account_new_schema
1245 self.topic.schema_edit = validation.wim_account_edit_schema
1246 self.fake_session = {
1247 "username": test_name,
1248 "project_id": (test_pid,),
1249 "method": None,
1250 "admin": True,
1251 "force": False,
1252 "public": False,
1253 "allow_show_user_project_role": True,
1254 }
1255 self.topic.check_quota = Mock(return_value=None) # skip quota
1256
1257 def test_new_cvws(self):
1258 test_url = "http://0.0.0.0:0"
1259 with self.subTest(i=1):
1260 rollback = []
1261 test_type = "fake"
1262 self.db.get_one.return_value = None
1263 self.db.create.side_effect = lambda self, content: content["_id"]
1264 cid, oid = self.topic.new(
1265 rollback,
1266 self.fake_session,
1267 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
1268 )
1269 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1270 args = self.db.create.call_args[0]
1271 content = args[1]
1272 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1273 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1274 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
1275 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
1276 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
1277 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
1278 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
1279 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
1280 self.assertEqual(
1281 content["_admin"]["modified"],
1282 content["_admin"]["created"],
1283 "Wrong modification time",
1284 )
1285 self.assertEqual(
1286 content["_admin"]["operationalState"],
1287 "PROCESSING",
1288 "Wrong operational state",
1289 )
1290 self.assertEqual(
1291 content["_admin"]["projects_read"],
1292 [test_pid],
1293 "Wrong read-only projects",
1294 )
1295 self.assertEqual(
1296 content["_admin"]["projects_write"],
1297 [test_pid],
1298 "Wrong read/write projects",
1299 )
1300 self.assertIsNone(
1301 content["_admin"]["current_operation"], "Wrong current operation"
1302 )
1303 self.assertEqual(
1304 len(content["_admin"]["operations"]), 1, "Wrong number of operations"
1305 )
1306 operation = content["_admin"]["operations"][0]
1307 self.assertEqual(
1308 operation["lcmOperationType"], "create", "Wrong operation type"
1309 )
1310 self.assertEqual(
1311 operation["operationState"], "PROCESSING", "Wrong operation state"
1312 )
1313 self.assertGreater(
1314 operation["startTime"],
1315 content["_admin"]["created"],
1316 "Wrong operation start time",
1317 )
1318 self.assertGreater(
1319 operation["statusEnteredTime"],
1320 content["_admin"]["created"],
1321 "Wrong operation status enter time",
1322 )
1323 self.assertEqual(
1324 operation["detailed-status"], "", "Wrong operation detailed status info"
1325 )
1326 self.assertIsNone(
1327 operation["operationParams"], "Wrong operation parameters"
1328 )
1329 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
1330 # with self.subTest(i=2):
1331 # rollback = []
1332 # test_type = "bad_type"
1333 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
1334 # self.topic.new(rollback, self.fake_session,
1335 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
1336 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
1337 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
1338 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
1339 # norm(str(e.exception)), "Wrong exception text")
1340
1341 def test_conflict_on_new(self):
1342 with self.subTest(i=1):
1343 rollback = []
1344 test_url = "http://0.0.0.0:0"
1345 test_type = "fake"
1346 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
1347 with self.assertRaises(
1348 EngineException, msg="Accepted existing CIM name"
1349 ) as e:
1350 self.topic.new(
1351 rollback,
1352 self.fake_session,
1353 {
1354 "name": self.test_name,
1355 "wim_url": test_url,
1356 "wim_type": test_type,
1357 },
1358 )
1359 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1360 self.assertEqual(
1361 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1362 )
1363 self.assertIn(
1364 "name '{}' already exists for {}".format(
1365 self.test_name, self.topic.topic
1366 ),
1367 norm(str(e.exception)),
1368 "Wrong exception text",
1369 )
1370
1371 def test_edit_cvws(self):
1372 now = time()
1373 cid = str(uuid4())
1374 test_url = "http://0.0.0.0:0"
1375 test_type = "fake"
1376 cvws = {
1377 "_id": cid,
1378 "name": self.test_name,
1379 "wim_url": test_url,
1380 "wim_type": test_type,
1381 "_admin": {
1382 "created": now,
1383 "modified": now,
1384 "operations": [{"lcmOperationType": "create"}],
1385 },
1386 }
1387 with self.subTest(i=1):
1388 new_name = "new-cim-name"
1389 new_url = "https://1.1.1.1:1"
1390 new_type = "onos"
1391 self.db.get_one.side_effect = [cvws, None]
1392 self.db.replace.return_value = {"updated": 1}
1393 # self.db.encrypt.side_effect = [b64str(), b64str()]
1394 self.topic.edit(
1395 self.fake_session,
1396 cid,
1397 {"name": new_name, "wim_url": new_url, "wim_type": new_type},
1398 )
1399 args = self.db.replace.call_args[0]
1400 content = args[2]
1401 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1402 self.assertEqual(args[1], cid, "Wrong CIM identifier")
1403 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1404 self.assertEqual(content["name"], new_name, "Wrong CIM name")
1405 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
1406 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
1407 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
1408 self.assertGreater(
1409 content["_admin"]["modified"],
1410 content["_admin"]["created"],
1411 "Wrong modification time",
1412 )
1413 self.assertEqual(
1414 len(content["_admin"]["operations"]), 2, "Wrong number of operations"
1415 )
1416 operation = content["_admin"]["operations"][1]
1417 self.assertEqual(
1418 operation["lcmOperationType"], "edit", "Wrong operation type"
1419 )
1420 self.assertEqual(
1421 operation["operationState"], "PROCESSING", "Wrong operation state"
1422 )
1423 self.assertGreater(
1424 operation["startTime"],
1425 content["_admin"]["modified"],
1426 "Wrong operation start time",
1427 )
1428 self.assertGreater(
1429 operation["statusEnteredTime"],
1430 content["_admin"]["modified"],
1431 "Wrong operation status enter time",
1432 )
1433 self.assertEqual(
1434 operation["detailed-status"], "", "Wrong operation detailed status info"
1435 )
1436 self.assertIsNone(
1437 operation["operationParams"], "Wrong operation parameters"
1438 )
1439 with self.subTest(i=2):
1440 self.db.get_one.side_effect = [cvws]
1441 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
1442 self.topic.edit(
1443 self.fake_session,
1444 str(uuid4()),
1445 {"name": "new-name", "extra_prop": "anything"},
1446 )
1447 self.assertEqual(
1448 e.exception.http_code,
1449 HTTPStatus.UNPROCESSABLE_ENTITY,
1450 "Wrong HTTP status code",
1451 )
1452 self.assertIn(
1453 "format error '{}'".format(
1454 "additional properties are not allowed ('{}' was unexpected)"
1455 ).format("extra_prop"),
1456 norm(str(e.exception)),
1457 "Wrong exception text",
1458 )
1459
1460 def test_conflict_on_edit(self):
1461 with self.subTest(i=1):
1462 cid = str(uuid4())
1463 new_name = "new-cim-name"
1464 self.db.get_one.side_effect = [
1465 {"_id": cid, "name": self.test_name},
1466 {"_id": str(uuid4()), "name": new_name},
1467 ]
1468 with self.assertRaises(
1469 EngineException, msg="Accepted existing CIM name"
1470 ) as e:
1471 self.topic.edit(self.fake_session, cid, {"name": new_name})
1472 self.assertEqual(
1473 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1474 )
1475 self.assertIn(
1476 "name '{}' already exists for {}".format(new_name, self.topic.topic),
1477 norm(str(e.exception)),
1478 "Wrong exception text",
1479 )
1480
1481 def test_delete_cvws(self):
1482 cid = str(uuid4())
1483 ro_pid = str(uuid4())
1484 rw_pid = str(uuid4())
1485 cvws = {"_id": cid, "name": self.test_name}
1486 self.db.get_list.return_value = []
1487 with self.subTest(i=1):
1488 cvws["_admin"] = {
1489 "projects_read": [test_pid, ro_pid, rw_pid],
1490 "projects_write": [test_pid, rw_pid],
1491 }
1492 self.db.get_one.return_value = cvws
1493 oid = self.topic.delete(self.fake_session, cid)
1494 self.assertIsNone(oid, "Wrong operation identifier")
1495 self.assertEqual(
1496 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1497 )
1498 self.assertEqual(
1499 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1500 )
1501 self.assertEqual(
1502 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1503 )
1504 self.assertEqual(
1505 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1506 )
1507 self.assertEqual(
1508 self.db.set_one.call_args[1]["update_dict"],
1509 None,
1510 "Wrong read-only projects update",
1511 )
1512 self.assertEqual(
1513 self.db.set_one.call_args[1]["pull_list"],
1514 {
1515 "_admin.projects_read": (test_pid,),
1516 "_admin.projects_write": (test_pid,),
1517 },
1518 "Wrong read/write projects update",
1519 )
1520 self.topic._send_msg.assert_not_called()
1521 with self.subTest(i=2):
1522 now = time()
1523 cvws["_admin"] = {
1524 "projects_read": [test_pid],
1525 "projects_write": [test_pid],
1526 "operations": [],
1527 }
1528 self.db.get_one.return_value = cvws
1529 oid = self.topic.delete(self.fake_session, cid)
1530 self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
1531 self.assertEqual(
1532 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1533 )
1534 self.assertEqual(
1535 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1536 )
1537 self.assertEqual(
1538 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1539 )
1540 self.assertEqual(
1541 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier"
1542 )
1543 self.assertEqual(
1544 self.db.set_one.call_args[1]["update_dict"],
1545 {"_admin.to_delete": True},
1546 "Wrong _admin.to_delete update",
1547 )
1548 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
1549 self.assertEqual(
1550 operation["lcmOperationType"], "delete", "Wrong operation type"
1551 )
1552 self.assertEqual(
1553 operation["operationState"], "PROCESSING", "Wrong operation state"
1554 )
1555 self.assertEqual(
1556 operation["detailed-status"], "", "Wrong operation detailed status"
1557 )
1558 self.assertIsNone(
1559 operation["operationParams"], "Wrong operation parameters"
1560 )
1561 self.assertGreater(
1562 operation["startTime"], now, "Wrong operation start time"
1563 )
1564 self.assertGreater(
1565 operation["statusEnteredTime"], now, "Wrong operation status enter time"
1566 )
1567 self.topic._send_msg.assert_called_once_with(
1568 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
1569 )
1570 with self.subTest(i=3):
1571 cvws["_admin"] = {
1572 "projects_read": [],
1573 "projects_write": [],
1574 "operations": [],
1575 }
1576 self.db.get_one.return_value = cvws
1577 self.topic._send_msg.reset_mock()
1578 self.db.get_one.reset_mock()
1579 self.db.del_one.reset_mock()
1580 self.fake_session["force"] = True # to force deletion
1581 self.fake_session["admin"] = True # to force deletion
1582 self.fake_session["project_id"] = [] # to force deletion
1583 oid = self.topic.delete(self.fake_session, cid)
1584 self.assertIsNone(oid, "Wrong operation identifier")
1585 self.assertEqual(
1586 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1587 )
1588 self.assertEqual(
1589 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1590 )
1591 self.assertEqual(
1592 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
1593 )
1594 self.assertEqual(
1595 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1596 )
1597 self.topic._send_msg.assert_called_once_with(
1598 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
1599 )
1600
1601
1602 if __name__ == "__main__":
1603 unittest.main()