Update from master
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 import random
22 from unittest import TestCase
23 from unittest.mock import Mock, patch, call, ANY
24 from uuid import uuid4
25 from http import HTTPStatus
26 from time import time
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import (
30 ProjectTopicAuth,
31 RoleTopicAuth,
32 UserTopicAuth,
33 CommonVimWimSdn,
34 VcaTopic,
35 VimAccountTopic,
36 )
37 from osm_nbi.engine import EngineException
38 from osm_nbi.authconn import AuthconnNotFoundException
39
40
41 test_pid = str(uuid4())
42 test_name = "test-user"
43
44
45 def norm(str):
46 """Normalize string for checking"""
47 return " ".join(str.strip().split()).lower()
48
49
50 class TestVcaTopic(TestCase):
51 def setUp(self):
52 self.db = Mock(dbbase.DbBase())
53 self.fs = Mock(fsbase.FsBase())
54 self.msg = Mock(msgbase.MsgBase())
55 self.auth = Mock(authconn.Authconn(None, None, None))
56 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
57
58 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
59 def test_format_on_new(self, mock_super_format_on_new):
60 content = {
61 "_id": "id",
62 "secret": "encrypted_secret",
63 "cacert": "encrypted_cacert",
64 }
65 self.db.encrypt.side_effect = ["secret", "cacert"]
66 mock_super_format_on_new.return_value = "1234"
67
68 oid = self.vca_topic.format_on_new(content)
69
70 self.assertEqual(oid, "1234")
71 self.assertEqual(content["secret"], "secret")
72 self.assertEqual(content["cacert"], "cacert")
73 self.db.encrypt.assert_has_calls(
74 [
75 call("encrypted_secret", schema_version="1.11", salt="id"),
76 call("encrypted_cacert", schema_version="1.11", salt="id"),
77 ]
78 )
79 mock_super_format_on_new.assert_called_with(content, None, False)
80
81 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
82 def test_format_on_edit(self, mock_super_format_on_edit):
83 edit_content = {
84 "_id": "id",
85 "secret": "encrypted_secret",
86 "cacert": "encrypted_cacert",
87 }
88 final_content = {
89 "_id": "id",
90 "schema_version": "1.11",
91 }
92 self.db.encrypt.side_effect = ["secret", "cacert"]
93 mock_super_format_on_edit.return_value = "1234"
94
95 oid = self.vca_topic.format_on_edit(final_content, edit_content)
96
97 self.assertEqual(oid, "1234")
98 self.assertEqual(final_content["secret"], "secret")
99 self.assertEqual(final_content["cacert"], "cacert")
100 self.db.encrypt.assert_has_calls(
101 [
102 call("encrypted_secret", schema_version="1.11", salt="id"),
103 call("encrypted_cacert", schema_version="1.11", salt="id"),
104 ]
105 )
106 mock_super_format_on_edit.assert_called()
107
108 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
109 def test_check_conflict_on_del(self, mock_check_conflict_on_del):
110 session = {
111 "project_id": "project-id",
112 "force": False,
113 }
114 _id = "vca-id"
115 db_content = {}
116
117 self.db.get_list.return_value = None
118
119 self.vca_topic.check_conflict_on_del(session, _id, db_content)
120
121 self.db.get_list.assert_called_with(
122 "vim_accounts",
123 {"vca": _id, "_admin.projects_read.cont": "project-id"},
124 )
125 mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
126
127 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
128 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
129 session = {
130 "project_id": "project-id",
131 "force": True,
132 }
133 _id = "vca-id"
134 db_content = {}
135
136 self.vca_topic.check_conflict_on_del(session, _id, db_content)
137
138 self.db.get_list.assert_not_called()
139 mock_check_conflict_on_del.assert_not_called()
140
141 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
142 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
143 session = {
144 "project_id": "project-id",
145 "force": False,
146 }
147 _id = "vca-id"
148 db_content = {}
149
150 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
151
152 with self.assertRaises(EngineException) as context:
153 self.vca_topic.check_conflict_on_del(session, _id, db_content)
154 self.assertEqual(
155 context.exception,
156 EngineException(
157 "There is at least one VIM account using this vca",
158 http_code=HTTPStatus.CONFLICT,
159 ),
160 )
161
162 self.db.get_list.assert_called_with(
163 "vim_accounts",
164 {"vca": _id, "_admin.projects_read.cont": "project-id"},
165 )
166 mock_check_conflict_on_del.assert_not_called()
167
168
169 class Test_ProjectTopicAuth(TestCase):
170 @classmethod
171 def setUpClass(cls):
172 cls.test_name = "test-project-topic"
173
174 def setUp(self):
175 self.db = Mock(dbbase.DbBase())
176 self.fs = Mock(fsbase.FsBase())
177 self.msg = Mock(msgbase.MsgBase())
178 self.auth = Mock(authconn.Authconn(None, None, None))
179 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
180 self.fake_session = {
181 "username": self.test_name,
182 "project_id": (test_pid,),
183 "method": None,
184 "admin": True,
185 "force": False,
186 "public": False,
187 "allow_show_user_project_role": True,
188 }
189 self.topic.check_quota = Mock(return_value=None) # skip quota
190
191 def test_new_project(self):
192 with self.subTest(i=1):
193 rollback = []
194 pid1 = str(uuid4())
195 self.auth.get_project_list.return_value = []
196 self.auth.create_project.return_value = pid1
197 pid2, oid = self.topic.new(
198 rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
199 )
200 self.assertEqual(len(rollback), 1, "Wrong rollback length")
201 self.assertEqual(pid2, pid1, "Wrong project identifier")
202 content = self.auth.create_project.call_args[0][0]
203 self.assertEqual(content["name"], self.test_name, "Wrong project name")
204 self.assertEqual(content["quotas"], {}, "Wrong quotas")
205 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
206 self.assertEqual(
207 content["_admin"]["modified"],
208 content["_admin"]["created"],
209 "Wrong modification time",
210 )
211 with self.subTest(i=2):
212 rollback = []
213 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
214 self.topic.new(
215 rollback,
216 self.fake_session,
217 {"name": "other-project-name", "quotas": {"baditems": 10}},
218 )
219 self.assertEqual(len(rollback), 0, "Wrong rollback length")
220 self.assertEqual(
221 e.exception.http_code,
222 HTTPStatus.UNPROCESSABLE_ENTITY,
223 "Wrong HTTP status code",
224 )
225 self.assertIn(
226 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
227 "baditems"
228 ),
229 norm(str(e.exception)),
230 "Wrong exception text",
231 )
232
233 def test_edit_project(self):
234 now = time()
235 pid = str(uuid4())
236 proj = {
237 "_id": pid,
238 "name": self.test_name,
239 "_admin": {"created": now, "modified": now},
240 }
241 with self.subTest(i=1):
242 self.auth.get_project_list.side_effect = [[proj], []]
243 new_name = "new-project-name"
244 quotas = {
245 "vnfds": random.SystemRandom().randint(0, 100),
246 "nsds": random.SystemRandom().randint(0, 100),
247 }
248 self.topic.edit(
249 self.fake_session, pid, {"name": new_name, "quotas": quotas}
250 )
251 _id, content = self.auth.update_project.call_args[0]
252 self.assertEqual(_id, pid, "Wrong project identifier")
253 self.assertEqual(content["_id"], pid, "Wrong project identifier")
254 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
255 self.assertGreater(
256 content["_admin"]["modified"], now, "Wrong modification time"
257 )
258 self.assertEqual(content["name"], new_name, "Wrong project name")
259 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
260 with self.subTest(i=2):
261 new_name = "other-project-name"
262 quotas = {"baditems": random.SystemRandom().randint(0, 100)}
263 self.auth.get_project_list.side_effect = [[proj], []]
264 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
265 self.topic.edit(
266 self.fake_session, pid, {"name": new_name, "quotas": quotas}
267 )
268 self.assertEqual(
269 e.exception.http_code,
270 HTTPStatus.UNPROCESSABLE_ENTITY,
271 "Wrong HTTP status code",
272 )
273 self.assertIn(
274 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
275 "baditems"
276 ),
277 norm(str(e.exception)),
278 "Wrong exception text",
279 )
280
281 def test_conflict_on_new(self):
282 with self.subTest(i=1):
283 rollback = []
284 pid = str(uuid4())
285 with self.assertRaises(
286 EngineException, msg="Accepted uuid as project name"
287 ) as e:
288 self.topic.new(rollback, self.fake_session, {"name": pid})
289 self.assertEqual(len(rollback), 0, "Wrong rollback length")
290 self.assertEqual(
291 e.exception.http_code,
292 HTTPStatus.UNPROCESSABLE_ENTITY,
293 "Wrong HTTP status code",
294 )
295 self.assertIn(
296 "project name '{}' cannot have an uuid format".format(pid),
297 norm(str(e.exception)),
298 "Wrong exception text",
299 )
300 with self.subTest(i=2):
301 rollback = []
302 self.auth.get_project_list.return_value = [
303 {"_id": test_pid, "name": self.test_name}
304 ]
305 with self.assertRaises(
306 EngineException, msg="Accepted existing project name"
307 ) as e:
308 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
309 self.assertEqual(len(rollback), 0, "Wrong rollback length")
310 self.assertEqual(
311 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
312 )
313 self.assertIn(
314 "project '{}' exists".format(self.test_name),
315 norm(str(e.exception)),
316 "Wrong exception text",
317 )
318
319 def test_conflict_on_edit(self):
320 with self.subTest(i=1):
321 self.auth.get_project_list.return_value = [
322 {"_id": test_pid, "name": self.test_name}
323 ]
324 new_name = str(uuid4())
325 with self.assertRaises(
326 EngineException, msg="Accepted uuid as project name"
327 ) as e:
328 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
329 self.assertEqual(
330 e.exception.http_code,
331 HTTPStatus.UNPROCESSABLE_ENTITY,
332 "Wrong HTTP status code",
333 )
334 self.assertIn(
335 "project name '{}' cannot have an uuid format".format(new_name),
336 norm(str(e.exception)),
337 "Wrong exception text",
338 )
339 with self.subTest(i=2):
340 pid = str(uuid4())
341 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
342 with self.assertRaises(
343 EngineException, msg="Accepted renaming of project 'admin'"
344 ) as e:
345 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
346 self.assertEqual(
347 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
348 )
349 self.assertIn(
350 "you cannot rename project 'admin'",
351 norm(str(e.exception)),
352 "Wrong exception text",
353 )
354 with self.subTest(i=3):
355 new_name = "new-project-name"
356 self.auth.get_project_list.side_effect = [
357 [{"_id": test_pid, "name": self.test_name}],
358 [{"_id": str(uuid4()), "name": new_name}],
359 ]
360 with self.assertRaises(
361 EngineException, msg="Accepted existing project name"
362 ) as e:
363 self.topic.edit(self.fake_session, pid, {"name": new_name})
364 self.assertEqual(
365 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
366 )
367 self.assertIn(
368 "project '{}' is already used".format(new_name),
369 norm(str(e.exception)),
370 "Wrong exception text",
371 )
372
373 def test_delete_project(self):
374 with self.subTest(i=1):
375 pid = str(uuid4())
376 self.auth.get_project.return_value = {
377 "_id": pid,
378 "name": "other-project-name",
379 }
380 self.auth.delete_project.return_value = {"deleted": 1}
381 self.auth.get_user_list.return_value = []
382 self.db.get_list.return_value = []
383 rc = self.topic.delete(self.fake_session, pid)
384 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
385 self.assertEqual(
386 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
387 )
388 self.assertEqual(
389 self.auth.delete_project.call_args[0][0],
390 pid,
391 "Wrong project identifier",
392 )
393
394 def test_conflict_on_del(self):
395 with self.subTest(i=1):
396 self.auth.get_project.return_value = {
397 "_id": test_pid,
398 "name": self.test_name,
399 }
400 with self.assertRaises(
401 EngineException, msg="Accepted deletion of own project"
402 ) as e:
403 self.topic.delete(self.fake_session, self.test_name)
404 self.assertEqual(
405 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
406 )
407 self.assertIn(
408 "you cannot delete your own project",
409 norm(str(e.exception)),
410 "Wrong exception text",
411 )
412 with self.subTest(i=2):
413 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
414 with self.assertRaises(
415 EngineException, msg="Accepted deletion of project 'admin'"
416 ) as e:
417 self.topic.delete(self.fake_session, "admin")
418 self.assertEqual(
419 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
420 )
421 self.assertIn(
422 "you cannot delete project 'admin'",
423 norm(str(e.exception)),
424 "Wrong exception text",
425 )
426 with self.subTest(i=3):
427 pid = str(uuid4())
428 name = "other-project-name"
429 self.auth.get_project.return_value = {"_id": pid, "name": name}
430 self.auth.get_user_list.return_value = [
431 {
432 "_id": str(uuid4()),
433 "username": self.test_name,
434 "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
435 }
436 ]
437 with self.assertRaises(
438 EngineException, msg="Accepted deletion of used project"
439 ) as e:
440 self.topic.delete(self.fake_session, pid)
441 self.assertEqual(
442 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
443 )
444 self.assertIn(
445 "project '{}' ({}) is being used by user '{}'".format(
446 name, pid, self.test_name
447 ),
448 norm(str(e.exception)),
449 "Wrong exception text",
450 )
451 with self.subTest(i=4):
452 self.auth.get_user_list.return_value = []
453 self.db.get_list.return_value = [
454 {
455 "_id": str(uuid4()),
456 "id": self.test_name,
457 "_admin": {"projects_read": [pid], "projects_write": []},
458 }
459 ]
460 with self.assertRaises(
461 EngineException, msg="Accepted deletion of used project"
462 ) as e:
463 self.topic.delete(self.fake_session, pid)
464 self.assertEqual(
465 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
466 )
467 self.assertIn(
468 "project '{}' ({}) is being used by {} '{}'".format(
469 name, pid, "vnf descriptor", self.test_name
470 ),
471 norm(str(e.exception)),
472 "Wrong exception text",
473 )
474
475
476 class Test_RoleTopicAuth(TestCase):
477 @classmethod
478 def setUpClass(cls):
479 cls.test_name = "test-role-topic"
480 cls.test_operations = ["tokens:get"]
481
482 def setUp(self):
483 self.db = Mock(dbbase.DbBase())
484 self.fs = Mock(fsbase.FsBase())
485 self.msg = Mock(msgbase.MsgBase())
486 self.auth = Mock(authconn.Authconn(None, None, None))
487 self.auth.role_permissions = self.test_operations
488 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
489 self.fake_session = {
490 "username": test_name,
491 "project_id": (test_pid,),
492 "method": None,
493 "admin": True,
494 "force": False,
495 "public": False,
496 "allow_show_user_project_role": True,
497 }
498 self.topic.check_quota = Mock(return_value=None) # skip quota
499
500 def test_new_role(self):
501 with self.subTest(i=1):
502 rollback = []
503 rid1 = str(uuid4())
504 perms_in = {"tokens": True}
505 perms_out = {"default": False, "admin": False, "tokens": True}
506 self.auth.get_role_list.return_value = []
507 self.auth.create_role.return_value = rid1
508 rid2, oid = self.topic.new(
509 rollback,
510 self.fake_session,
511 {"name": self.test_name, "permissions": perms_in},
512 )
513 self.assertEqual(len(rollback), 1, "Wrong rollback length")
514 self.assertEqual(rid2, rid1, "Wrong project identifier")
515 content = self.auth.create_role.call_args[0][0]
516 self.assertEqual(content["name"], self.test_name, "Wrong role name")
517 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
518 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
519 self.assertEqual(
520 content["_admin"]["modified"],
521 content["_admin"]["created"],
522 "Wrong modification time",
523 )
524 with self.subTest(i=2):
525 rollback = []
526 with self.assertRaises(
527 EngineException, msg="Accepted wrong permissions"
528 ) as e:
529 self.topic.new(
530 rollback,
531 self.fake_session,
532 {"name": "other-role-name", "permissions": {"projects": True}},
533 )
534 self.assertEqual(len(rollback), 0, "Wrong rollback length")
535 self.assertEqual(
536 e.exception.http_code,
537 HTTPStatus.UNPROCESSABLE_ENTITY,
538 "Wrong HTTP status code",
539 )
540 self.assertIn(
541 "invalid permission '{}'".format("projects"),
542 norm(str(e.exception)),
543 "Wrong exception text",
544 )
545
546 def test_edit_role(self):
547 now = time()
548 rid = str(uuid4())
549 role = {
550 "_id": rid,
551 "name": self.test_name,
552 "permissions": {"tokens": True},
553 "_admin": {"created": now, "modified": now},
554 }
555 with self.subTest(i=1):
556 self.auth.get_role_list.side_effect = [[role], []]
557 self.auth.get_role.return_value = role
558 new_name = "new-role-name"
559 perms_in = {"tokens": False, "tokens:get": True}
560 perms_out = {
561 "default": False,
562 "admin": False,
563 "tokens": False,
564 "tokens:get": True,
565 }
566 self.topic.edit(
567 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
568 )
569 content = self.auth.update_role.call_args[0][0]
570 self.assertEqual(content["_id"], rid, "Wrong role identifier")
571 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
572 self.assertGreater(
573 content["_admin"]["modified"], now, "Wrong modification time"
574 )
575 self.assertEqual(content["name"], new_name, "Wrong role name")
576 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
577 with self.subTest(i=2):
578 new_name = "other-role-name"
579 perms_in = {"tokens": False, "tokens:post": True}
580 self.auth.get_role_list.side_effect = [[role], []]
581 with self.assertRaises(
582 EngineException, msg="Accepted wrong permissions"
583 ) as e:
584 self.topic.edit(
585 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
586 )
587 self.assertEqual(
588 e.exception.http_code,
589 HTTPStatus.UNPROCESSABLE_ENTITY,
590 "Wrong HTTP status code",
591 )
592 self.assertIn(
593 "invalid permission '{}'".format("tokens:post"),
594 norm(str(e.exception)),
595 "Wrong exception text",
596 )
597
598 def test_delete_role(self):
599 with self.subTest(i=1):
600 rid = str(uuid4())
601 role = {"_id": rid, "name": "other-role-name"}
602 self.auth.get_role_list.return_value = [role]
603 self.auth.get_role.return_value = role
604 self.auth.delete_role.return_value = {"deleted": 1}
605 self.auth.get_user_list.return_value = []
606 rc = self.topic.delete(self.fake_session, rid)
607 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
608 self.assertEqual(
609 self.auth.get_role_list.call_args[0][0]["_id"],
610 rid,
611 "Wrong role identifier",
612 )
613 self.assertEqual(
614 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
615 )
616 self.assertEqual(
617 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
618 )
619
620 def test_conflict_on_new(self):
621 with self.subTest(i=1):
622 rollback = []
623 rid = str(uuid4())
624 with self.assertRaises(
625 EngineException, msg="Accepted uuid as role name"
626 ) as e:
627 self.topic.new(rollback, self.fake_session, {"name": rid})
628 self.assertEqual(len(rollback), 0, "Wrong rollback length")
629 self.assertEqual(
630 e.exception.http_code,
631 HTTPStatus.UNPROCESSABLE_ENTITY,
632 "Wrong HTTP status code",
633 )
634 self.assertIn(
635 "role name '{}' cannot have an uuid format".format(rid),
636 norm(str(e.exception)),
637 "Wrong exception text",
638 )
639 with self.subTest(i=2):
640 rollback = []
641 self.auth.get_role_list.return_value = [
642 {"_id": str(uuid4()), "name": self.test_name}
643 ]
644 with self.assertRaises(
645 EngineException, msg="Accepted existing role name"
646 ) as e:
647 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
648 self.assertEqual(len(rollback), 0, "Wrong rollback length")
649 self.assertEqual(
650 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
651 )
652 self.assertIn(
653 "role name '{}' exists".format(self.test_name),
654 norm(str(e.exception)),
655 "Wrong exception text",
656 )
657
658 def test_conflict_on_edit(self):
659 rid = str(uuid4())
660 with self.subTest(i=1):
661 self.auth.get_role_list.return_value = [
662 {"_id": rid, "name": self.test_name, "permissions": {}}
663 ]
664 new_name = str(uuid4())
665 with self.assertRaises(
666 EngineException, msg="Accepted uuid as role name"
667 ) as e:
668 self.topic.edit(self.fake_session, rid, {"name": new_name})
669 self.assertEqual(
670 e.exception.http_code,
671 HTTPStatus.UNPROCESSABLE_ENTITY,
672 "Wrong HTTP status code",
673 )
674 self.assertIn(
675 "role name '{}' cannot have an uuid format".format(new_name),
676 norm(str(e.exception)),
677 "Wrong exception text",
678 )
679 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
680 with self.subTest(i=i):
681 rid = str(uuid4())
682 self.auth.get_role.return_value = {
683 "_id": rid,
684 "name": role_name,
685 "permissions": {},
686 }
687 with self.assertRaises(
688 EngineException,
689 msg="Accepted renaming of role '{}'".format(role_name),
690 ) as e:
691 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
692 self.assertEqual(
693 e.exception.http_code,
694 HTTPStatus.FORBIDDEN,
695 "Wrong HTTP status code",
696 )
697 self.assertIn(
698 "you cannot rename role '{}'".format(role_name),
699 norm(str(e.exception)),
700 "Wrong exception text",
701 )
702 with self.subTest(i=i + 1):
703 new_name = "new-role-name"
704 self.auth.get_role_list.side_effect = [
705 [{"_id": rid, "name": self.test_name, "permissions": {}}],
706 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
707 ]
708 self.auth.get_role.return_value = {
709 "_id": rid,
710 "name": self.test_name,
711 "permissions": {},
712 }
713 with self.assertRaises(
714 EngineException, msg="Accepted existing role name"
715 ) as e:
716 self.topic.edit(self.fake_session, rid, {"name": new_name})
717 self.assertEqual(
718 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
719 )
720 self.assertIn(
721 "role name '{}' exists".format(new_name),
722 norm(str(e.exception)),
723 "Wrong exception text",
724 )
725
726 def test_conflict_on_del(self):
727 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
728 with self.subTest(i=i):
729 rid = str(uuid4())
730 role = {"_id": rid, "name": role_name}
731 self.auth.get_role_list.return_value = [role]
732 self.auth.get_role.return_value = role
733 with self.assertRaises(
734 EngineException,
735 msg="Accepted deletion of role '{}'".format(role_name),
736 ) as e:
737 self.topic.delete(self.fake_session, rid)
738 self.assertEqual(
739 e.exception.http_code,
740 HTTPStatus.FORBIDDEN,
741 "Wrong HTTP status code",
742 )
743 self.assertIn(
744 "you cannot delete role '{}'".format(role_name),
745 norm(str(e.exception)),
746 "Wrong exception text",
747 )
748 with self.subTest(i=i + 1):
749 rid = str(uuid4())
750 name = "other-role-name"
751 role = {"_id": rid, "name": name}
752 self.auth.get_role_list.return_value = [role]
753 self.auth.get_role.return_value = role
754 self.auth.get_user_list.return_value = [
755 {
756 "_id": str(uuid4()),
757 "username": self.test_name,
758 "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
759 }
760 ]
761 with self.assertRaises(
762 EngineException, msg="Accepted deletion of used role"
763 ) as e:
764 self.topic.delete(self.fake_session, rid)
765 self.assertEqual(
766 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
767 )
768 self.assertIn(
769 "role '{}' ({}) is being used by user '{}'".format(
770 name, rid, self.test_name
771 ),
772 norm(str(e.exception)),
773 "Wrong exception text",
774 )
775
776
777 class Test_UserTopicAuth(TestCase):
778 @classmethod
779 def setUpClass(cls):
780 cls.test_name = "test-user-topic"
781
782 def setUp(self):
783 self.db = Mock(dbbase.DbBase())
784 self.fs = Mock(fsbase.FsBase())
785 self.msg = Mock(msgbase.MsgBase())
786 self.auth = Mock(authconn.Authconn(None, None, None))
787 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
788 self.fake_session = {
789 "username": test_name,
790 "project_id": (test_pid,),
791 "method": None,
792 "admin": True,
793 "force": False,
794 "public": False,
795 "allow_show_user_project_role": True,
796 }
797 self.topic.check_quota = Mock(return_value=None) # skip quota
798
799 def test_new_user(self):
800 uid1 = str(uuid4())
801 pid = str(uuid4())
802 self.auth.get_user_list.return_value = []
803 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
804 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
805 with self.subTest(i=1):
806 rollback = []
807 rid = str(uuid4())
808 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
809 prms_in = [{"project": "some_project", "role": "some_role"}]
810 prms_out = [{"project": pid, "role": rid}]
811 uid2, oid = self.topic.new(
812 rollback,
813 self.fake_session,
814 {
815 "username": self.test_name,
816 "password": self.test_name,
817 "project_role_mappings": prms_in,
818 },
819 )
820 self.assertEqual(len(rollback), 1, "Wrong rollback length")
821 self.assertEqual(uid2, uid1, "Wrong project identifier")
822 content = self.auth.create_user.call_args[0][0]
823 self.assertEqual(content["username"], self.test_name, "Wrong project name")
824 self.assertEqual(content["password"], self.test_name, "Wrong password")
825 self.assertEqual(
826 content["project_role_mappings"],
827 prms_out,
828 "Wrong project-role mappings",
829 )
830 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
831 self.assertEqual(
832 content["_admin"]["modified"],
833 content["_admin"]["created"],
834 "Wrong modification time",
835 )
836 with self.subTest(i=2):
837 rollback = []
838 def_rid = str(uuid4())
839 def_role = {"_id": def_rid, "name": "project_admin"}
840 self.auth.get_role.return_value = def_role
841 self.auth.get_role_list.return_value = [def_role]
842 prms_out = [{"project": pid, "role": def_rid}]
843 uid2, oid = self.topic.new(
844 rollback,
845 self.fake_session,
846 {
847 "username": self.test_name,
848 "password": self.test_name,
849 "projects": ["some_project"],
850 },
851 )
852 self.assertEqual(len(rollback), 1, "Wrong rollback length")
853 self.assertEqual(uid2, uid1, "Wrong project identifier")
854 content = self.auth.create_user.call_args[0][0]
855 self.assertEqual(content["username"], self.test_name, "Wrong project name")
856 self.assertEqual(content["password"], self.test_name, "Wrong password")
857 self.assertEqual(
858 content["project_role_mappings"],
859 prms_out,
860 "Wrong project-role mappings",
861 )
862 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
863 self.assertEqual(
864 content["_admin"]["modified"],
865 content["_admin"]["created"],
866 "Wrong modification time",
867 )
868 with self.subTest(i=3):
869 rollback = []
870 with self.assertRaises(
871 EngineException, msg="Accepted wrong project-role mappings"
872 ) as e:
873 self.topic.new(
874 rollback,
875 self.fake_session,
876 {
877 "username": "other-project-name",
878 "password": "other-password",
879 "project_role_mappings": [{}],
880 },
881 )
882 self.assertEqual(len(rollback), 0, "Wrong rollback length")
883 self.assertEqual(
884 e.exception.http_code,
885 HTTPStatus.UNPROCESSABLE_ENTITY,
886 "Wrong HTTP status code",
887 )
888 self.assertIn(
889 "format error at '{}' '{}'".format(
890 "project_role_mappings:{}", "'{}' is a required property"
891 ).format(0, "project"),
892 norm(str(e.exception)),
893 "Wrong exception text",
894 )
895 with self.subTest(i=4):
896 rollback = []
897 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
898 self.topic.new(
899 rollback,
900 self.fake_session,
901 {
902 "username": "other-project-name",
903 "password": "other-password",
904 "projects": [],
905 },
906 )
907 self.assertEqual(len(rollback), 0, "Wrong rollback length")
908 self.assertEqual(
909 e.exception.http_code,
910 HTTPStatus.UNPROCESSABLE_ENTITY,
911 "Wrong HTTP status code",
912 )
913 self.assertIn(
914 "format error at '{}' '{}'".format(
915 "projects", "{} is too short"
916 ).format([]),
917 norm(str(e.exception)),
918 "Wrong exception text",
919 )
920
921 def test_edit_user(self):
922 now = time()
923 uid = str(uuid4())
924 pid1 = str(uuid4())
925 rid1 = str(uuid4())
926 prms = [
927 {
928 "project": pid1,
929 "project_name": "project-1",
930 "role": rid1,
931 "role_name": "role-1",
932 }
933 ]
934 user = {
935 "_id": uid,
936 "username": self.test_name,
937 "project_role_mappings": prms,
938 "_admin": {"created": now, "modified": now},
939 }
940 with self.subTest(i=1):
941 self.auth.get_user_list.side_effect = [[user], []]
942 self.auth.get_user.return_value = user
943 pid2 = str(uuid4())
944 rid2 = str(uuid4())
945 self.auth.get_project.side_effect = [
946 {"_id": pid2, "name": "project-2"},
947 {"_id": pid1, "name": "project-1"},
948 ]
949 self.auth.get_role.side_effect = [
950 {"_id": rid2, "name": "role-2"},
951 {"_id": rid1, "name": "role-1"},
952 ]
953 new_name = "new-user-name"
954 new_pasw = "new-password"
955 add_prms = [{"project": pid2, "role": rid2}]
956 rem_prms = [{"project": pid1, "role": rid1}]
957 self.topic.edit(
958 self.fake_session,
959 uid,
960 {
961 "username": new_name,
962 "password": new_pasw,
963 "add_project_role_mappings": add_prms,
964 "remove_project_role_mappings": rem_prms,
965 },
966 )
967 content = self.auth.update_user.call_args[0][0]
968 self.assertEqual(content["_id"], uid, "Wrong user identifier")
969 self.assertEqual(content["username"], new_name, "Wrong user name")
970 self.assertEqual(content["password"], new_pasw, "Wrong user password")
971 self.assertEqual(
972 content["add_project_role_mappings"],
973 add_prms,
974 "Wrong project-role mappings to add",
975 )
976 self.assertEqual(
977 content["remove_project_role_mappings"],
978 prms,
979 "Wrong project-role mappings to remove",
980 )
981 with self.subTest(i=2):
982 new_name = "other-user-name"
983 new_prms = [{}]
984 self.auth.get_role_list.side_effect = [[user], []]
985 self.auth.get_user_list.side_effect = [[user]]
986 with self.assertRaises(
987 EngineException, msg="Accepted wrong project-role mappings"
988 ) as e:
989 self.topic.edit(
990 self.fake_session,
991 uid,
992 {"username": new_name, "project_role_mappings": new_prms},
993 )
994 self.assertEqual(
995 e.exception.http_code,
996 HTTPStatus.UNPROCESSABLE_ENTITY,
997 "Wrong HTTP status code",
998 )
999 self.assertIn(
1000 "format error at '{}' '{}'".format(
1001 "project_role_mappings:{}", "'{}' is a required property"
1002 ).format(0, "project"),
1003 norm(str(e.exception)),
1004 "Wrong exception text",
1005 )
1006 with self.subTest(i=3):
1007 self.auth.get_user_list.side_effect = [[user], []]
1008 self.auth.get_user.return_value = user
1009 old_password = self.test_name
1010 new_pasw = "new-password"
1011 self.topic.edit(
1012 self.fake_session,
1013 uid,
1014 {
1015 "old_password": old_password,
1016 "password": new_pasw,
1017 },
1018 )
1019 content = self.auth.update_user.call_args[0][0]
1020 self.assertEqual(
1021 content["old_password"], old_password, "Wrong old password"
1022 )
1023 self.assertEqual(content["password"], new_pasw, "Wrong user password")
1024
1025 def test_delete_user(self):
1026 with self.subTest(i=1):
1027 uid = str(uuid4())
1028 self.fake_session["username"] = self.test_name
1029 user = user = {
1030 "_id": uid,
1031 "username": "other-user-name",
1032 "project_role_mappings": [],
1033 }
1034 self.auth.get_user.return_value = user
1035 self.auth.delete_user.return_value = {"deleted": 1}
1036 rc = self.topic.delete(self.fake_session, uid)
1037 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
1038 self.assertEqual(
1039 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
1040 )
1041 self.assertEqual(
1042 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
1043 )
1044
1045 def test_conflict_on_new(self):
1046 with self.subTest(i=1):
1047 rollback = []
1048 uid = str(uuid4())
1049 with self.assertRaises(
1050 EngineException, msg="Accepted uuid as username"
1051 ) as e:
1052 self.topic.new(
1053 rollback,
1054 self.fake_session,
1055 {
1056 "username": uid,
1057 "password": self.test_name,
1058 "projects": [test_pid],
1059 },
1060 )
1061 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1062 self.assertEqual(
1063 e.exception.http_code,
1064 HTTPStatus.UNPROCESSABLE_ENTITY,
1065 "Wrong HTTP status code",
1066 )
1067 self.assertIn(
1068 "username '{}' cannot have a uuid format".format(uid),
1069 norm(str(e.exception)),
1070 "Wrong exception text",
1071 )
1072 with self.subTest(i=2):
1073 rollback = []
1074 self.auth.get_user_list.return_value = [
1075 {"_id": str(uuid4()), "username": self.test_name}
1076 ]
1077 with self.assertRaises(
1078 EngineException, msg="Accepted existing username"
1079 ) as e:
1080 self.topic.new(
1081 rollback,
1082 self.fake_session,
1083 {
1084 "username": self.test_name,
1085 "password": self.test_name,
1086 "projects": [test_pid],
1087 },
1088 )
1089 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1090 self.assertEqual(
1091 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1092 )
1093 self.assertIn(
1094 "username '{}' is already used".format(self.test_name),
1095 norm(str(e.exception)),
1096 "Wrong exception text",
1097 )
1098 with self.subTest(i=3):
1099 rollback = []
1100 self.auth.get_user_list.return_value = []
1101 self.auth.get_role_list.side_effect = [[], []]
1102 with self.assertRaises(
1103 AuthconnNotFoundException, msg="Accepted user without default role"
1104 ) as e:
1105 self.topic.new(
1106 rollback,
1107 self.fake_session,
1108 {
1109 "username": self.test_name,
1110 "password": self.test_name,
1111 "projects": [str(uuid4())],
1112 },
1113 )
1114 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1115 self.assertEqual(
1116 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1117 )
1118 self.assertIn(
1119 "can't find default role for user '{}'".format(self.test_name),
1120 norm(str(e.exception)),
1121 "Wrong exception text",
1122 )
1123
1124 def test_conflict_on_edit(self):
1125 uid = str(uuid4())
1126 with self.subTest(i=1):
1127 self.auth.get_user_list.return_value = [
1128 {"_id": uid, "username": self.test_name}
1129 ]
1130 new_name = str(uuid4())
1131 with self.assertRaises(
1132 EngineException, msg="Accepted uuid as username"
1133 ) as e:
1134 self.topic.edit(self.fake_session, uid, {"username": new_name})
1135 self.assertEqual(
1136 e.exception.http_code,
1137 HTTPStatus.UNPROCESSABLE_ENTITY,
1138 "Wrong HTTP status code",
1139 )
1140 self.assertIn(
1141 "username '{}' cannot have an uuid format".format(new_name),
1142 norm(str(e.exception)),
1143 "Wrong exception text",
1144 )
1145 with self.subTest(i=2):
1146 self.auth.get_user_list.return_value = [
1147 {"_id": uid, "username": self.test_name}
1148 ]
1149 self.auth.get_role_list.side_effect = [[], []]
1150 with self.assertRaises(
1151 AuthconnNotFoundException, msg="Accepted user without default role"
1152 ) as e:
1153 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
1154 self.assertEqual(
1155 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1156 )
1157 self.assertIn(
1158 "can't find a default role for user '{}'".format(self.test_name),
1159 norm(str(e.exception)),
1160 "Wrong exception text",
1161 )
1162 with self.subTest(i=3):
1163 admin_uid = str(uuid4())
1164 self.auth.get_user_list.return_value = [
1165 {"_id": admin_uid, "username": "admin"}
1166 ]
1167 with self.assertRaises(
1168 EngineException,
1169 msg="Accepted removing system_admin role from admin user",
1170 ) as e:
1171 self.topic.edit(
1172 self.fake_session,
1173 admin_uid,
1174 {
1175 "remove_project_role_mappings": [
1176 {"project": "admin", "role": "system_admin"}
1177 ]
1178 },
1179 )
1180 self.assertEqual(
1181 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
1182 )
1183 self.assertIn(
1184 "you cannot remove system_admin role from admin user",
1185 norm(str(e.exception)),
1186 "Wrong exception text",
1187 )
1188 with self.subTest(i=4):
1189 new_name = "new-user-name"
1190 self.auth.get_user_list.side_effect = [
1191 [{"_id": uid, "name": self.test_name}],
1192 [{"_id": str(uuid4()), "name": new_name}],
1193 ]
1194 with self.assertRaises(
1195 EngineException, msg="Accepted existing username"
1196 ) as e:
1197 self.topic.edit(self.fake_session, uid, {"username": new_name})
1198 self.assertEqual(
1199 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1200 )
1201 self.assertIn(
1202 "username '{}' is already used".format(new_name),
1203 norm(str(e.exception)),
1204 "Wrong exception text",
1205 )
1206
1207 def test_conflict_on_del(self):
1208 with self.subTest(i=1):
1209 uid = str(uuid4())
1210 self.fake_session["username"] = self.test_name
1211 user = user = {
1212 "_id": uid,
1213 "username": self.test_name,
1214 "project_role_mappings": [],
1215 }
1216 self.auth.get_user.return_value = user
1217 with self.assertRaises(
1218 EngineException, msg="Accepted deletion of own user"
1219 ) as e:
1220 self.topic.delete(self.fake_session, uid)
1221 self.assertEqual(
1222 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1223 )
1224 self.assertIn(
1225 "you cannot delete your own login user",
1226 norm(str(e.exception)),
1227 "Wrong exception text",
1228 )
1229
1230
1231 class Test_CommonVimWimSdn(TestCase):
1232 @classmethod
1233 def setUpClass(cls):
1234 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
1235
1236 def setUp(self):
1237 self.db = Mock(dbbase.DbBase())
1238 self.fs = Mock(fsbase.FsBase())
1239 self.msg = Mock(msgbase.MsgBase())
1240 self.auth = Mock(authconn.Authconn(None, None, None))
1241 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
1242 # Use WIM schemas for testing because they are the simplest
1243 self.topic._send_msg = Mock()
1244 self.topic.topic = "wims"
1245 self.topic.schema_new = validation.wim_account_new_schema
1246 self.topic.schema_edit = validation.wim_account_edit_schema
1247 self.fake_session = {
1248 "username": test_name,
1249 "project_id": (test_pid,),
1250 "method": None,
1251 "admin": True,
1252 "force": False,
1253 "public": False,
1254 "allow_show_user_project_role": True,
1255 }
1256 self.topic.check_quota = Mock(return_value=None) # skip quota
1257
1258 def test_new_cvws(self):
1259 test_url = "http://0.0.0.0:0"
1260 with self.subTest(i=1):
1261 rollback = []
1262 test_type = "fake"
1263 self.db.get_one.return_value = None
1264 self.db.create.side_effect = lambda self, content: content["_id"]
1265 cid, oid = self.topic.new(
1266 rollback,
1267 self.fake_session,
1268 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
1269 )
1270 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1271 args = self.db.create.call_args[0]
1272 content = args[1]
1273 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1274 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1275 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
1276 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
1277 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
1278 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
1279 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
1280 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
1281 self.assertEqual(
1282 content["_admin"]["modified"],
1283 content["_admin"]["created"],
1284 "Wrong modification time",
1285 )
1286 self.assertEqual(
1287 content["_admin"]["operationalState"],
1288 "PROCESSING",
1289 "Wrong operational state",
1290 )
1291 self.assertEqual(
1292 content["_admin"]["projects_read"],
1293 [test_pid],
1294 "Wrong read-only projects",
1295 )
1296 self.assertEqual(
1297 content["_admin"]["projects_write"],
1298 [test_pid],
1299 "Wrong read/write projects",
1300 )
1301 self.assertIsNone(
1302 content["_admin"]["current_operation"], "Wrong current operation"
1303 )
1304 self.assertEqual(
1305 len(content["_admin"]["operations"]), 1, "Wrong number of operations"
1306 )
1307 operation = content["_admin"]["operations"][0]
1308 self.assertEqual(
1309 operation["lcmOperationType"], "create", "Wrong operation type"
1310 )
1311 self.assertEqual(
1312 operation["operationState"], "PROCESSING", "Wrong operation state"
1313 )
1314 self.assertGreater(
1315 operation["startTime"],
1316 content["_admin"]["created"],
1317 "Wrong operation start time",
1318 )
1319 self.assertGreater(
1320 operation["statusEnteredTime"],
1321 content["_admin"]["created"],
1322 "Wrong operation status enter time",
1323 )
1324 self.assertEqual(
1325 operation["detailed-status"], "", "Wrong operation detailed status info"
1326 )
1327 self.assertIsNone(
1328 operation["operationParams"], "Wrong operation parameters"
1329 )
1330 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
1331 # with self.subTest(i=2):
1332 # rollback = []
1333 # test_type = "bad_type"
1334 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
1335 # self.topic.new(rollback, self.fake_session,
1336 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
1337 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
1338 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
1339 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
1340 # norm(str(e.exception)), "Wrong exception text")
1341
1342 def test_conflict_on_new(self):
1343 with self.subTest(i=1):
1344 rollback = []
1345 test_url = "http://0.0.0.0:0"
1346 test_type = "fake"
1347 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
1348 with self.assertRaises(
1349 EngineException, msg="Accepted existing CIM name"
1350 ) as e:
1351 self.topic.new(
1352 rollback,
1353 self.fake_session,
1354 {
1355 "name": self.test_name,
1356 "wim_url": test_url,
1357 "wim_type": test_type,
1358 },
1359 )
1360 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1361 self.assertEqual(
1362 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1363 )
1364 self.assertIn(
1365 "name '{}' already exists for {}".format(
1366 self.test_name, self.topic.topic
1367 ),
1368 norm(str(e.exception)),
1369 "Wrong exception text",
1370 )
1371
1372 def test_edit_cvws(self):
1373 now = time()
1374 cid = str(uuid4())
1375 test_url = "http://0.0.0.0:0"
1376 test_type = "fake"
1377 cvws = {
1378 "_id": cid,
1379 "name": self.test_name,
1380 "wim_url": test_url,
1381 "wim_type": test_type,
1382 "_admin": {
1383 "created": now,
1384 "modified": now,
1385 "operations": [{"lcmOperationType": "create"}],
1386 },
1387 }
1388 with self.subTest(i=1):
1389 new_name = "new-cim-name"
1390 new_url = "https://1.1.1.1:1"
1391 new_type = "onos"
1392 self.db.get_one.side_effect = [cvws, None]
1393 self.db.replace.return_value = {"updated": 1}
1394 # self.db.encrypt.side_effect = [b64str(), b64str()]
1395 self.topic.edit(
1396 self.fake_session,
1397 cid,
1398 {"name": new_name, "wim_url": new_url, "wim_type": new_type},
1399 )
1400 args = self.db.replace.call_args[0]
1401 content = args[2]
1402 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1403 self.assertEqual(args[1], cid, "Wrong CIM identifier")
1404 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1405 self.assertEqual(content["name"], new_name, "Wrong CIM name")
1406 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
1407 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
1408 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
1409 self.assertGreater(
1410 content["_admin"]["modified"],
1411 content["_admin"]["created"],
1412 "Wrong modification time",
1413 )
1414 self.assertEqual(
1415 len(content["_admin"]["operations"]), 2, "Wrong number of operations"
1416 )
1417 operation = content["_admin"]["operations"][1]
1418 self.assertEqual(
1419 operation["lcmOperationType"], "edit", "Wrong operation type"
1420 )
1421 self.assertEqual(
1422 operation["operationState"], "PROCESSING", "Wrong operation state"
1423 )
1424 self.assertGreater(
1425 operation["startTime"],
1426 content["_admin"]["modified"],
1427 "Wrong operation start time",
1428 )
1429 self.assertGreater(
1430 operation["statusEnteredTime"],
1431 content["_admin"]["modified"],
1432 "Wrong operation status enter time",
1433 )
1434 self.assertEqual(
1435 operation["detailed-status"], "", "Wrong operation detailed status info"
1436 )
1437 self.assertIsNone(
1438 operation["operationParams"], "Wrong operation parameters"
1439 )
1440 with self.subTest(i=2):
1441 self.db.get_one.side_effect = [cvws]
1442 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
1443 self.topic.edit(
1444 self.fake_session,
1445 str(uuid4()),
1446 {"name": "new-name", "extra_prop": "anything"},
1447 )
1448 self.assertEqual(
1449 e.exception.http_code,
1450 HTTPStatus.UNPROCESSABLE_ENTITY,
1451 "Wrong HTTP status code",
1452 )
1453 self.assertIn(
1454 "format error '{}'".format(
1455 "additional properties are not allowed ('{}' was unexpected)"
1456 ).format("extra_prop"),
1457 norm(str(e.exception)),
1458 "Wrong exception text",
1459 )
1460
1461 def test_conflict_on_edit(self):
1462 with self.subTest(i=1):
1463 cid = str(uuid4())
1464 new_name = "new-cim-name"
1465 self.db.get_one.side_effect = [
1466 {"_id": cid, "name": self.test_name},
1467 {"_id": str(uuid4()), "name": new_name},
1468 ]
1469 with self.assertRaises(
1470 EngineException, msg="Accepted existing CIM name"
1471 ) as e:
1472 self.topic.edit(self.fake_session, cid, {"name": new_name})
1473 self.assertEqual(
1474 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1475 )
1476 self.assertIn(
1477 "name '{}' already exists for {}".format(new_name, self.topic.topic),
1478 norm(str(e.exception)),
1479 "Wrong exception text",
1480 )
1481
1482 def test_delete_cvws(self):
1483 cid = str(uuid4())
1484 ro_pid = str(uuid4())
1485 rw_pid = str(uuid4())
1486 cvws = {"_id": cid, "name": self.test_name}
1487 self.db.get_list.return_value = []
1488 with self.subTest(i=1):
1489 cvws["_admin"] = {
1490 "projects_read": [test_pid, ro_pid, rw_pid],
1491 "projects_write": [test_pid, rw_pid],
1492 }
1493 self.db.get_one.return_value = cvws
1494 oid = self.topic.delete(self.fake_session, cid)
1495 self.assertIsNone(oid, "Wrong operation identifier")
1496 self.assertEqual(
1497 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1498 )
1499 self.assertEqual(
1500 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1501 )
1502 self.assertEqual(
1503 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1504 )
1505 self.assertEqual(
1506 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1507 )
1508 self.assertEqual(
1509 self.db.set_one.call_args[1]["update_dict"],
1510 None,
1511 "Wrong read-only projects update",
1512 )
1513 self.assertEqual(
1514 self.db.set_one.call_args[1]["pull_list"],
1515 {
1516 "_admin.projects_read": (test_pid,),
1517 "_admin.projects_write": (test_pid,),
1518 },
1519 "Wrong read/write projects update",
1520 )
1521 self.topic._send_msg.assert_not_called()
1522 with self.subTest(i=2):
1523 now = time()
1524 cvws["_admin"] = {
1525 "projects_read": [test_pid],
1526 "projects_write": [test_pid],
1527 "operations": [],
1528 }
1529 self.db.get_one.return_value = cvws
1530 oid = self.topic.delete(self.fake_session, cid)
1531 self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
1532 self.assertEqual(
1533 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1534 )
1535 self.assertEqual(
1536 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1537 )
1538 self.assertEqual(
1539 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1540 )
1541 self.assertEqual(
1542 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier"
1543 )
1544 self.assertEqual(
1545 self.db.set_one.call_args[1]["update_dict"],
1546 {"_admin.to_delete": True},
1547 "Wrong _admin.to_delete update",
1548 )
1549 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
1550 self.assertEqual(
1551 operation["lcmOperationType"], "delete", "Wrong operation type"
1552 )
1553 self.assertEqual(
1554 operation["operationState"], "PROCESSING", "Wrong operation state"
1555 )
1556 self.assertEqual(
1557 operation["detailed-status"], "", "Wrong operation detailed status"
1558 )
1559 self.assertIsNone(
1560 operation["operationParams"], "Wrong operation parameters"
1561 )
1562 self.assertGreater(
1563 operation["startTime"], now, "Wrong operation start time"
1564 )
1565 self.assertGreater(
1566 operation["statusEnteredTime"], now, "Wrong operation status enter time"
1567 )
1568 self.topic._send_msg.assert_called_once_with(
1569 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
1570 )
1571 with self.subTest(i=3):
1572 cvws["_admin"] = {
1573 "projects_read": [],
1574 "projects_write": [],
1575 "operations": [],
1576 }
1577 self.db.get_one.return_value = cvws
1578 self.topic._send_msg.reset_mock()
1579 self.db.get_one.reset_mock()
1580 self.db.del_one.reset_mock()
1581 self.fake_session["force"] = True # to force deletion
1582 self.fake_session["admin"] = True # to force deletion
1583 self.fake_session["project_id"] = [] # to force deletion
1584 oid = self.topic.delete(self.fake_session, cid)
1585 self.assertIsNone(oid, "Wrong operation identifier")
1586 self.assertEqual(
1587 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1588 )
1589 self.assertEqual(
1590 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1591 )
1592 self.assertEqual(
1593 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
1594 )
1595 self.assertEqual(
1596 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1597 )
1598 self.topic._send_msg.assert_called_once_with(
1599 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
1600 )
1601
1602
1603 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_new")
1604 class TestVimAccountTopic(TestCase):
1605 def setUp(self):
1606 self.db = Mock(dbbase.DbBase())
1607 self.fs = Mock(fsbase.FsBase())
1608 self.msg = Mock(msgbase.MsgBase())
1609 self.auth = Mock(authconn.Authconn(None, None, None))
1610 self.topic = VimAccountTopic(self.db, self.fs, self.msg, self.auth)
1611 self.topic.check_quota = Mock(return_value=None) # skip quota
1612
1613 self.fake_session = {
1614 "username": test_name,
1615 "project_id": (test_pid,),
1616 "method": None,
1617 "admin": True,
1618 "force": False,
1619 "public": False,
1620 "allow_show_user_project_role": True,
1621 }
1622
1623 def check_invalid_indata_raises_exception(self, indata, mock_common_vim_wim_sdn):
1624 with self.assertRaises(EngineException) as error:
1625 self.topic.check_conflict_on_new(self.fake_session, indata)
1626 mock_common_vim_wim_sdn.assert_called_with(self.fake_session, indata)
1627 error_msg = "Invalid config for VIM account '{}'.".format(indata["name"])
1628 self.assertEqual(str(error.exception), error_msg)
1629
1630 def test_check_conflict_on_new_vim_type_paas(self, mock_common_vim_wim_sdn):
1631 valid_juju_paas_config = {
1632 "paas_provider": "juju",
1633 "ca_cert_content": "file_content",
1634 "cloud": "microk8s",
1635 "cloud_credentials": "cloud_credential_name",
1636 "authorized_keys": "keys",
1637 }
1638 indata = {
1639 "name": "juju_paas",
1640 "vim_type": "paas",
1641 "description": None,
1642 "vim_url": "http://0.0.0.0:0",
1643 "vim_user": "some_user",
1644 "vim_password": "some_password",
1645 "vim_tenant_name": "null",
1646 "config": valid_juju_paas_config,
1647 }
1648 self.topic.check_conflict_on_new(self.fake_session, indata)
1649 mock_common_vim_wim_sdn.assert_called_once_with(self.fake_session, indata)
1650
1651 def test_check_conflict_on_new_vim_type_paas_config_missing(
1652 self, mock_common_vim_wim_sdn
1653 ):
1654 indata = {
1655 "name": "juju_paas",
1656 "vim_type": "paas",
1657 "description": None,
1658 "vim_url": "http://0.0.0.0:0",
1659 "vim_user": "some_user",
1660 "vim_password": "some_password",
1661 "vim_tenant_name": "null",
1662 }
1663 self.check_invalid_indata_raises_exception(indata, mock_common_vim_wim_sdn)
1664
1665 def test_check_conflict_on_new_vim_type_paas_invalid_config(
1666 self, mock_common_vim_wim_sdn
1667 ):
1668 invalid_configs = [
1669 {
1670 "paas_provider": "some_paas_provider",
1671 "ca_cert_content": "file_content",
1672 "cloud": "microk8s",
1673 "cloud_credentials": "cloud_credential_name",
1674 },
1675 {
1676 "ca_cert_content": "file_content",
1677 "cloud": "microk8s",
1678 "cloud_credentials": "cloud_credential_name",
1679 },
1680 {
1681 "paas_provider": "juju",
1682 "cloud": "microk8s",
1683 "cloud_credentials": "cloud_credential_name",
1684 },
1685 {
1686 "paas_provider": "juju",
1687 "ca_cert_content": "file_content",
1688 "cloud_credentials": "cloud_credential_name",
1689 },
1690 {
1691 "paas_provider": "juju",
1692 "ca_cert_content": "file_content",
1693 "cloud": "microk8s",
1694 },
1695 {
1696 "some_param": None,
1697 },
1698 {},
1699 ]
1700 for config in invalid_configs:
1701 with self.subTest():
1702 indata = {
1703 "name": "juju_paas",
1704 "vim_type": "paas",
1705 "description": None,
1706 "vim_url": "http://0.0.0.0:0",
1707 "vim_user": "some_user",
1708 "vim_password": "some_password",
1709 "vim_tenant_name": "null",
1710 "config": config,
1711 }
1712 self.check_invalid_indata_raises_exception(
1713 indata, mock_common_vim_wim_sdn
1714 )
1715
1716 def test_kafka_message_is_not_sent_if_paas_vim(self, mock_common_vim_wim_sdn):
1717 valid_juju_paas_config = {
1718 "paas_provider": "juju",
1719 "ca_cert_content": "file_content",
1720 "cloud": "microk8s",
1721 "cloud_credentials": "cloud_credential_name",
1722 "authorized_keys": "keys",
1723 }
1724 indata = {
1725 "name": "juju_paas",
1726 "vim_type": "paas",
1727 "description": None,
1728 "vim_url": "http://0.0.0.0:0",
1729 "vim_user": "some_user",
1730 "vim_password": "some_password",
1731 "vim_tenant_name": "null",
1732 "config": valid_juju_paas_config,
1733 }
1734 rollback = []
1735 self.topic.temporal = Mock()
1736
1737 self.topic.new(rollback, self.fake_session, indata)
1738 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1739 self.msg.write.assert_not_called()
1740 self.topic.temporal.start_vim_workflow.assert_called_once()
1741
1742 def test_kafka_message_is_sent_if_not_paas_vim(self, mock_common_vim_wim_sdn):
1743 indata = {
1744 "name": "juju_paas",
1745 "vim_type": "openstack",
1746 "description": None,
1747 "vim_url": "http://0.0.0.0:0",
1748 "vim_user": "some_user",
1749 "vim_password": "some_password",
1750 "vim_tenant_name": "null",
1751 }
1752 rollback = []
1753
1754 self.topic.new(rollback, self.fake_session, indata)
1755 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1756 mock_common_vim_wim_sdn.assert_called_once_with(self.fake_session, indata)
1757 self.msg.write.assert_called_once_with("vim_account", "created", ANY)
1758
1759
1760 if __name__ == "__main__":
1761 unittest.main()