Remove NsState from VNF record
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 from unittest import TestCase
22 from unittest.mock import Mock, patch, call, ANY
23 from uuid import uuid4
24 from http import HTTPStatus
25 from time import time
26 from random import randint
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import (
30 ProjectTopicAuth,
31 RoleTopicAuth,
32 UserTopicAuth,
33 CommonVimWimSdn,
34 VcaTopic,
35 VimAccountTopic,
36 )
37 from osm_nbi.engine import EngineException
38 from osm_nbi.authconn import AuthconnNotFoundException
39
40
41 test_pid = str(uuid4())
42 test_name = "test-user"
43
44
45 def norm(str):
46 """Normalize string for checking"""
47 return " ".join(str.strip().split()).lower()
48
49
50 class TestVcaTopic(TestCase):
51 def setUp(self):
52 self.db = Mock(dbbase.DbBase())
53 self.fs = Mock(fsbase.FsBase())
54 self.msg = Mock(msgbase.MsgBase())
55 self.auth = Mock(authconn.Authconn(None, None, None))
56 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
57
58 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
59 def test_format_on_new(self, mock_super_format_on_new):
60 content = {
61 "_id": "id",
62 "secret": "encrypted_secret",
63 "cacert": "encrypted_cacert",
64 }
65 self.db.encrypt.side_effect = ["secret", "cacert"]
66 mock_super_format_on_new.return_value = "1234"
67
68 oid = self.vca_topic.format_on_new(content)
69
70 self.assertEqual(oid, "1234")
71 self.assertEqual(content["secret"], "secret")
72 self.assertEqual(content["cacert"], "cacert")
73 self.db.encrypt.assert_has_calls(
74 [
75 call("encrypted_secret", schema_version="1.11", salt="id"),
76 call("encrypted_cacert", schema_version="1.11", salt="id"),
77 ]
78 )
79 mock_super_format_on_new.assert_called_with(content, None, False)
80
81 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
82 def test_format_on_edit(self, mock_super_format_on_edit):
83 edit_content = {
84 "_id": "id",
85 "secret": "encrypted_secret",
86 "cacert": "encrypted_cacert",
87 }
88 final_content = {
89 "_id": "id",
90 "schema_version": "1.11",
91 }
92 self.db.encrypt.side_effect = ["secret", "cacert"]
93 mock_super_format_on_edit.return_value = "1234"
94
95 oid = self.vca_topic.format_on_edit(final_content, edit_content)
96
97 self.assertEqual(oid, "1234")
98 self.assertEqual(final_content["secret"], "secret")
99 self.assertEqual(final_content["cacert"], "cacert")
100 self.db.encrypt.assert_has_calls(
101 [
102 call("encrypted_secret", schema_version="1.11", salt="id"),
103 call("encrypted_cacert", schema_version="1.11", salt="id"),
104 ]
105 )
106 mock_super_format_on_edit.assert_called()
107
108 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
109 def test_check_conflict_on_del(self, mock_check_conflict_on_del):
110 session = {
111 "project_id": "project-id",
112 "force": False,
113 }
114 _id = "vca-id"
115 db_content = {}
116
117 self.db.get_list.return_value = None
118
119 self.vca_topic.check_conflict_on_del(session, _id, db_content)
120
121 self.db.get_list.assert_called_with(
122 "vim_accounts",
123 {"vca": _id, "_admin.projects_read.cont": "project-id"},
124 )
125 mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
126
127 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
128 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
129 session = {
130 "project_id": "project-id",
131 "force": True,
132 }
133 _id = "vca-id"
134 db_content = {}
135
136 self.vca_topic.check_conflict_on_del(session, _id, db_content)
137
138 self.db.get_list.assert_not_called()
139 mock_check_conflict_on_del.assert_not_called()
140
141 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
142 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
143 session = {
144 "project_id": "project-id",
145 "force": False,
146 }
147 _id = "vca-id"
148 db_content = {}
149
150 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
151
152 with self.assertRaises(EngineException) as context:
153 self.vca_topic.check_conflict_on_del(session, _id, db_content)
154 self.assertEqual(
155 context.exception,
156 EngineException(
157 "There is at least one VIM account using this vca",
158 http_code=HTTPStatus.CONFLICT,
159 ),
160 )
161
162 self.db.get_list.assert_called_with(
163 "vim_accounts",
164 {"vca": _id, "_admin.projects_read.cont": "project-id"},
165 )
166 mock_check_conflict_on_del.assert_not_called()
167
168
169 class Test_ProjectTopicAuth(TestCase):
170 @classmethod
171 def setUpClass(cls):
172 cls.test_name = "test-project-topic"
173
174 def setUp(self):
175 self.db = Mock(dbbase.DbBase())
176 self.fs = Mock(fsbase.FsBase())
177 self.msg = Mock(msgbase.MsgBase())
178 self.auth = Mock(authconn.Authconn(None, None, None))
179 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
180 self.fake_session = {
181 "username": self.test_name,
182 "project_id": (test_pid,),
183 "method": None,
184 "admin": True,
185 "force": False,
186 "public": False,
187 "allow_show_user_project_role": True,
188 }
189 self.topic.check_quota = Mock(return_value=None) # skip quota
190
191 def test_new_project(self):
192 with self.subTest(i=1):
193 rollback = []
194 pid1 = str(uuid4())
195 self.auth.get_project_list.return_value = []
196 self.auth.create_project.return_value = pid1
197 pid2, oid = self.topic.new(
198 rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
199 )
200 self.assertEqual(len(rollback), 1, "Wrong rollback length")
201 self.assertEqual(pid2, pid1, "Wrong project identifier")
202 content = self.auth.create_project.call_args[0][0]
203 self.assertEqual(content["name"], self.test_name, "Wrong project name")
204 self.assertEqual(content["quotas"], {}, "Wrong quotas")
205 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
206 self.assertEqual(
207 content["_admin"]["modified"],
208 content["_admin"]["created"],
209 "Wrong modification time",
210 )
211 with self.subTest(i=2):
212 rollback = []
213 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
214 self.topic.new(
215 rollback,
216 self.fake_session,
217 {"name": "other-project-name", "quotas": {"baditems": 10}},
218 )
219 self.assertEqual(len(rollback), 0, "Wrong rollback length")
220 self.assertEqual(
221 e.exception.http_code,
222 HTTPStatus.UNPROCESSABLE_ENTITY,
223 "Wrong HTTP status code",
224 )
225 self.assertIn(
226 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
227 "baditems"
228 ),
229 norm(str(e.exception)),
230 "Wrong exception text",
231 )
232
233 def test_edit_project(self):
234 now = time()
235 pid = str(uuid4())
236 proj = {
237 "_id": pid,
238 "name": self.test_name,
239 "_admin": {"created": now, "modified": now},
240 }
241 with self.subTest(i=1):
242 self.auth.get_project_list.side_effect = [[proj], []]
243 new_name = "new-project-name"
244 quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
245 self.topic.edit(
246 self.fake_session, pid, {"name": new_name, "quotas": quotas}
247 )
248 _id, content = self.auth.update_project.call_args[0]
249 self.assertEqual(_id, pid, "Wrong project identifier")
250 self.assertEqual(content["_id"], pid, "Wrong project identifier")
251 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
252 self.assertGreater(
253 content["_admin"]["modified"], now, "Wrong modification time"
254 )
255 self.assertEqual(content["name"], new_name, "Wrong project name")
256 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
257 with self.subTest(i=2):
258 new_name = "other-project-name"
259 quotas = {"baditems": randint(0, 100)}
260 self.auth.get_project_list.side_effect = [[proj], []]
261 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
262 self.topic.edit(
263 self.fake_session, pid, {"name": new_name, "quotas": quotas}
264 )
265 self.assertEqual(
266 e.exception.http_code,
267 HTTPStatus.UNPROCESSABLE_ENTITY,
268 "Wrong HTTP status code",
269 )
270 self.assertIn(
271 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
272 "baditems"
273 ),
274 norm(str(e.exception)),
275 "Wrong exception text",
276 )
277
278 def test_conflict_on_new(self):
279 with self.subTest(i=1):
280 rollback = []
281 pid = str(uuid4())
282 with self.assertRaises(
283 EngineException, msg="Accepted uuid as project name"
284 ) as e:
285 self.topic.new(rollback, self.fake_session, {"name": pid})
286 self.assertEqual(len(rollback), 0, "Wrong rollback length")
287 self.assertEqual(
288 e.exception.http_code,
289 HTTPStatus.UNPROCESSABLE_ENTITY,
290 "Wrong HTTP status code",
291 )
292 self.assertIn(
293 "project name '{}' cannot have an uuid format".format(pid),
294 norm(str(e.exception)),
295 "Wrong exception text",
296 )
297 with self.subTest(i=2):
298 rollback = []
299 self.auth.get_project_list.return_value = [
300 {"_id": test_pid, "name": self.test_name}
301 ]
302 with self.assertRaises(
303 EngineException, msg="Accepted existing project name"
304 ) as e:
305 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
306 self.assertEqual(len(rollback), 0, "Wrong rollback length")
307 self.assertEqual(
308 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
309 )
310 self.assertIn(
311 "project '{}' exists".format(self.test_name),
312 norm(str(e.exception)),
313 "Wrong exception text",
314 )
315
316 def test_conflict_on_edit(self):
317 with self.subTest(i=1):
318 self.auth.get_project_list.return_value = [
319 {"_id": test_pid, "name": self.test_name}
320 ]
321 new_name = str(uuid4())
322 with self.assertRaises(
323 EngineException, msg="Accepted uuid as project name"
324 ) as e:
325 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
326 self.assertEqual(
327 e.exception.http_code,
328 HTTPStatus.UNPROCESSABLE_ENTITY,
329 "Wrong HTTP status code",
330 )
331 self.assertIn(
332 "project name '{}' cannot have an uuid format".format(new_name),
333 norm(str(e.exception)),
334 "Wrong exception text",
335 )
336 with self.subTest(i=2):
337 pid = str(uuid4())
338 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
339 with self.assertRaises(
340 EngineException, msg="Accepted renaming of project 'admin'"
341 ) as e:
342 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
343 self.assertEqual(
344 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
345 )
346 self.assertIn(
347 "you cannot rename project 'admin'",
348 norm(str(e.exception)),
349 "Wrong exception text",
350 )
351 with self.subTest(i=3):
352 new_name = "new-project-name"
353 self.auth.get_project_list.side_effect = [
354 [{"_id": test_pid, "name": self.test_name}],
355 [{"_id": str(uuid4()), "name": new_name}],
356 ]
357 with self.assertRaises(
358 EngineException, msg="Accepted existing project name"
359 ) as e:
360 self.topic.edit(self.fake_session, pid, {"name": new_name})
361 self.assertEqual(
362 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
363 )
364 self.assertIn(
365 "project '{}' is already used".format(new_name),
366 norm(str(e.exception)),
367 "Wrong exception text",
368 )
369
370 def test_delete_project(self):
371 with self.subTest(i=1):
372 pid = str(uuid4())
373 self.auth.get_project.return_value = {
374 "_id": pid,
375 "name": "other-project-name",
376 }
377 self.auth.delete_project.return_value = {"deleted": 1}
378 self.auth.get_user_list.return_value = []
379 self.db.get_list.return_value = []
380 rc = self.topic.delete(self.fake_session, pid)
381 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
382 self.assertEqual(
383 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
384 )
385 self.assertEqual(
386 self.auth.delete_project.call_args[0][0],
387 pid,
388 "Wrong project identifier",
389 )
390
391 def test_conflict_on_del(self):
392 with self.subTest(i=1):
393 self.auth.get_project.return_value = {
394 "_id": test_pid,
395 "name": self.test_name,
396 }
397 with self.assertRaises(
398 EngineException, msg="Accepted deletion of own project"
399 ) as e:
400 self.topic.delete(self.fake_session, self.test_name)
401 self.assertEqual(
402 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
403 )
404 self.assertIn(
405 "you cannot delete your own project",
406 norm(str(e.exception)),
407 "Wrong exception text",
408 )
409 with self.subTest(i=2):
410 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
411 with self.assertRaises(
412 EngineException, msg="Accepted deletion of project 'admin'"
413 ) as e:
414 self.topic.delete(self.fake_session, "admin")
415 self.assertEqual(
416 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
417 )
418 self.assertIn(
419 "you cannot delete project 'admin'",
420 norm(str(e.exception)),
421 "Wrong exception text",
422 )
423 with self.subTest(i=3):
424 pid = str(uuid4())
425 name = "other-project-name"
426 self.auth.get_project.return_value = {"_id": pid, "name": name}
427 self.auth.get_user_list.return_value = [
428 {
429 "_id": str(uuid4()),
430 "username": self.test_name,
431 "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
432 }
433 ]
434 with self.assertRaises(
435 EngineException, msg="Accepted deletion of used project"
436 ) as e:
437 self.topic.delete(self.fake_session, pid)
438 self.assertEqual(
439 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
440 )
441 self.assertIn(
442 "project '{}' ({}) is being used by user '{}'".format(
443 name, pid, self.test_name
444 ),
445 norm(str(e.exception)),
446 "Wrong exception text",
447 )
448 with self.subTest(i=4):
449 self.auth.get_user_list.return_value = []
450 self.db.get_list.return_value = [
451 {
452 "_id": str(uuid4()),
453 "id": self.test_name,
454 "_admin": {"projects_read": [pid], "projects_write": []},
455 }
456 ]
457 with self.assertRaises(
458 EngineException, msg="Accepted deletion of used project"
459 ) as e:
460 self.topic.delete(self.fake_session, pid)
461 self.assertEqual(
462 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
463 )
464 self.assertIn(
465 "project '{}' ({}) is being used by {} '{}'".format(
466 name, pid, "vnf descriptor", self.test_name
467 ),
468 norm(str(e.exception)),
469 "Wrong exception text",
470 )
471
472
473 class Test_RoleTopicAuth(TestCase):
474 @classmethod
475 def setUpClass(cls):
476 cls.test_name = "test-role-topic"
477 cls.test_operations = ["tokens:get"]
478
479 def setUp(self):
480 self.db = Mock(dbbase.DbBase())
481 self.fs = Mock(fsbase.FsBase())
482 self.msg = Mock(msgbase.MsgBase())
483 self.auth = Mock(authconn.Authconn(None, None, None))
484 self.auth.role_permissions = self.test_operations
485 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
486 self.fake_session = {
487 "username": test_name,
488 "project_id": (test_pid,),
489 "method": None,
490 "admin": True,
491 "force": False,
492 "public": False,
493 "allow_show_user_project_role": True,
494 }
495 self.topic.check_quota = Mock(return_value=None) # skip quota
496
497 def test_new_role(self):
498 with self.subTest(i=1):
499 rollback = []
500 rid1 = str(uuid4())
501 perms_in = {"tokens": True}
502 perms_out = {"default": False, "admin": False, "tokens": True}
503 self.auth.get_role_list.return_value = []
504 self.auth.create_role.return_value = rid1
505 rid2, oid = self.topic.new(
506 rollback,
507 self.fake_session,
508 {"name": self.test_name, "permissions": perms_in},
509 )
510 self.assertEqual(len(rollback), 1, "Wrong rollback length")
511 self.assertEqual(rid2, rid1, "Wrong project identifier")
512 content = self.auth.create_role.call_args[0][0]
513 self.assertEqual(content["name"], self.test_name, "Wrong role name")
514 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
515 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
516 self.assertEqual(
517 content["_admin"]["modified"],
518 content["_admin"]["created"],
519 "Wrong modification time",
520 )
521 with self.subTest(i=2):
522 rollback = []
523 with self.assertRaises(
524 EngineException, msg="Accepted wrong permissions"
525 ) as e:
526 self.topic.new(
527 rollback,
528 self.fake_session,
529 {"name": "other-role-name", "permissions": {"projects": True}},
530 )
531 self.assertEqual(len(rollback), 0, "Wrong rollback length")
532 self.assertEqual(
533 e.exception.http_code,
534 HTTPStatus.UNPROCESSABLE_ENTITY,
535 "Wrong HTTP status code",
536 )
537 self.assertIn(
538 "invalid permission '{}'".format("projects"),
539 norm(str(e.exception)),
540 "Wrong exception text",
541 )
542
543 def test_edit_role(self):
544 now = time()
545 rid = str(uuid4())
546 role = {
547 "_id": rid,
548 "name": self.test_name,
549 "permissions": {"tokens": True},
550 "_admin": {"created": now, "modified": now},
551 }
552 with self.subTest(i=1):
553 self.auth.get_role_list.side_effect = [[role], []]
554 self.auth.get_role.return_value = role
555 new_name = "new-role-name"
556 perms_in = {"tokens": False, "tokens:get": True}
557 perms_out = {
558 "default": False,
559 "admin": False,
560 "tokens": False,
561 "tokens:get": True,
562 }
563 self.topic.edit(
564 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
565 )
566 content = self.auth.update_role.call_args[0][0]
567 self.assertEqual(content["_id"], rid, "Wrong role identifier")
568 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
569 self.assertGreater(
570 content["_admin"]["modified"], now, "Wrong modification time"
571 )
572 self.assertEqual(content["name"], new_name, "Wrong role name")
573 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
574 with self.subTest(i=2):
575 new_name = "other-role-name"
576 perms_in = {"tokens": False, "tokens:post": True}
577 self.auth.get_role_list.side_effect = [[role], []]
578 with self.assertRaises(
579 EngineException, msg="Accepted wrong permissions"
580 ) as e:
581 self.topic.edit(
582 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
583 )
584 self.assertEqual(
585 e.exception.http_code,
586 HTTPStatus.UNPROCESSABLE_ENTITY,
587 "Wrong HTTP status code",
588 )
589 self.assertIn(
590 "invalid permission '{}'".format("tokens:post"),
591 norm(str(e.exception)),
592 "Wrong exception text",
593 )
594
595 def test_delete_role(self):
596 with self.subTest(i=1):
597 rid = str(uuid4())
598 role = {"_id": rid, "name": "other-role-name"}
599 self.auth.get_role_list.return_value = [role]
600 self.auth.get_role.return_value = role
601 self.auth.delete_role.return_value = {"deleted": 1}
602 self.auth.get_user_list.return_value = []
603 rc = self.topic.delete(self.fake_session, rid)
604 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
605 self.assertEqual(
606 self.auth.get_role_list.call_args[0][0]["_id"],
607 rid,
608 "Wrong role identifier",
609 )
610 self.assertEqual(
611 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
612 )
613 self.assertEqual(
614 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
615 )
616
617 def test_conflict_on_new(self):
618 with self.subTest(i=1):
619 rollback = []
620 rid = str(uuid4())
621 with self.assertRaises(
622 EngineException, msg="Accepted uuid as role name"
623 ) as e:
624 self.topic.new(rollback, self.fake_session, {"name": rid})
625 self.assertEqual(len(rollback), 0, "Wrong rollback length")
626 self.assertEqual(
627 e.exception.http_code,
628 HTTPStatus.UNPROCESSABLE_ENTITY,
629 "Wrong HTTP status code",
630 )
631 self.assertIn(
632 "role name '{}' cannot have an uuid format".format(rid),
633 norm(str(e.exception)),
634 "Wrong exception text",
635 )
636 with self.subTest(i=2):
637 rollback = []
638 self.auth.get_role_list.return_value = [
639 {"_id": str(uuid4()), "name": self.test_name}
640 ]
641 with self.assertRaises(
642 EngineException, msg="Accepted existing role name"
643 ) as e:
644 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
645 self.assertEqual(len(rollback), 0, "Wrong rollback length")
646 self.assertEqual(
647 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
648 )
649 self.assertIn(
650 "role name '{}' exists".format(self.test_name),
651 norm(str(e.exception)),
652 "Wrong exception text",
653 )
654
655 def test_conflict_on_edit(self):
656 rid = str(uuid4())
657 with self.subTest(i=1):
658 self.auth.get_role_list.return_value = [
659 {"_id": rid, "name": self.test_name, "permissions": {}}
660 ]
661 new_name = str(uuid4())
662 with self.assertRaises(
663 EngineException, msg="Accepted uuid as role name"
664 ) as e:
665 self.topic.edit(self.fake_session, rid, {"name": new_name})
666 self.assertEqual(
667 e.exception.http_code,
668 HTTPStatus.UNPROCESSABLE_ENTITY,
669 "Wrong HTTP status code",
670 )
671 self.assertIn(
672 "role name '{}' cannot have an uuid format".format(new_name),
673 norm(str(e.exception)),
674 "Wrong exception text",
675 )
676 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
677 with self.subTest(i=i):
678 rid = str(uuid4())
679 self.auth.get_role.return_value = {
680 "_id": rid,
681 "name": role_name,
682 "permissions": {},
683 }
684 with self.assertRaises(
685 EngineException,
686 msg="Accepted renaming of role '{}'".format(role_name),
687 ) as e:
688 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
689 self.assertEqual(
690 e.exception.http_code,
691 HTTPStatus.FORBIDDEN,
692 "Wrong HTTP status code",
693 )
694 self.assertIn(
695 "you cannot rename role '{}'".format(role_name),
696 norm(str(e.exception)),
697 "Wrong exception text",
698 )
699 with self.subTest(i=i + 1):
700 new_name = "new-role-name"
701 self.auth.get_role_list.side_effect = [
702 [{"_id": rid, "name": self.test_name, "permissions": {}}],
703 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
704 ]
705 self.auth.get_role.return_value = {
706 "_id": rid,
707 "name": self.test_name,
708 "permissions": {},
709 }
710 with self.assertRaises(
711 EngineException, msg="Accepted existing role name"
712 ) as e:
713 self.topic.edit(self.fake_session, rid, {"name": new_name})
714 self.assertEqual(
715 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
716 )
717 self.assertIn(
718 "role name '{}' exists".format(new_name),
719 norm(str(e.exception)),
720 "Wrong exception text",
721 )
722
723 def test_conflict_on_del(self):
724 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
725 with self.subTest(i=i):
726 rid = str(uuid4())
727 role = {"_id": rid, "name": role_name}
728 self.auth.get_role_list.return_value = [role]
729 self.auth.get_role.return_value = role
730 with self.assertRaises(
731 EngineException,
732 msg="Accepted deletion of role '{}'".format(role_name),
733 ) as e:
734 self.topic.delete(self.fake_session, rid)
735 self.assertEqual(
736 e.exception.http_code,
737 HTTPStatus.FORBIDDEN,
738 "Wrong HTTP status code",
739 )
740 self.assertIn(
741 "you cannot delete role '{}'".format(role_name),
742 norm(str(e.exception)),
743 "Wrong exception text",
744 )
745 with self.subTest(i=i + 1):
746 rid = str(uuid4())
747 name = "other-role-name"
748 role = {"_id": rid, "name": name}
749 self.auth.get_role_list.return_value = [role]
750 self.auth.get_role.return_value = role
751 self.auth.get_user_list.return_value = [
752 {
753 "_id": str(uuid4()),
754 "username": self.test_name,
755 "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
756 }
757 ]
758 with self.assertRaises(
759 EngineException, msg="Accepted deletion of used role"
760 ) as e:
761 self.topic.delete(self.fake_session, rid)
762 self.assertEqual(
763 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
764 )
765 self.assertIn(
766 "role '{}' ({}) is being used by user '{}'".format(
767 name, rid, self.test_name
768 ),
769 norm(str(e.exception)),
770 "Wrong exception text",
771 )
772
773
774 class Test_UserTopicAuth(TestCase):
775 @classmethod
776 def setUpClass(cls):
777 cls.test_name = "test-user-topic"
778
779 def setUp(self):
780 self.db = Mock(dbbase.DbBase())
781 self.fs = Mock(fsbase.FsBase())
782 self.msg = Mock(msgbase.MsgBase())
783 self.auth = Mock(authconn.Authconn(None, None, None))
784 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
785 self.fake_session = {
786 "username": test_name,
787 "project_id": (test_pid,),
788 "method": None,
789 "admin": True,
790 "force": False,
791 "public": False,
792 "allow_show_user_project_role": True,
793 }
794 self.topic.check_quota = Mock(return_value=None) # skip quota
795
796 def test_new_user(self):
797 uid1 = str(uuid4())
798 pid = str(uuid4())
799 self.auth.get_user_list.return_value = []
800 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
801 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
802 with self.subTest(i=1):
803 rollback = []
804 rid = str(uuid4())
805 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
806 prms_in = [{"project": "some_project", "role": "some_role"}]
807 prms_out = [{"project": pid, "role": rid}]
808 uid2, oid = self.topic.new(
809 rollback,
810 self.fake_session,
811 {
812 "username": self.test_name,
813 "password": self.test_name,
814 "project_role_mappings": prms_in,
815 },
816 )
817 self.assertEqual(len(rollback), 1, "Wrong rollback length")
818 self.assertEqual(uid2, uid1, "Wrong project identifier")
819 content = self.auth.create_user.call_args[0][0]
820 self.assertEqual(content["username"], self.test_name, "Wrong project name")
821 self.assertEqual(content["password"], self.test_name, "Wrong password")
822 self.assertEqual(
823 content["project_role_mappings"],
824 prms_out,
825 "Wrong project-role mappings",
826 )
827 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
828 self.assertEqual(
829 content["_admin"]["modified"],
830 content["_admin"]["created"],
831 "Wrong modification time",
832 )
833 with self.subTest(i=2):
834 rollback = []
835 def_rid = str(uuid4())
836 def_role = {"_id": def_rid, "name": "project_admin"}
837 self.auth.get_role.return_value = def_role
838 self.auth.get_role_list.return_value = [def_role]
839 prms_out = [{"project": pid, "role": def_rid}]
840 uid2, oid = self.topic.new(
841 rollback,
842 self.fake_session,
843 {
844 "username": self.test_name,
845 "password": self.test_name,
846 "projects": ["some_project"],
847 },
848 )
849 self.assertEqual(len(rollback), 1, "Wrong rollback length")
850 self.assertEqual(uid2, uid1, "Wrong project identifier")
851 content = self.auth.create_user.call_args[0][0]
852 self.assertEqual(content["username"], self.test_name, "Wrong project name")
853 self.assertEqual(content["password"], self.test_name, "Wrong password")
854 self.assertEqual(
855 content["project_role_mappings"],
856 prms_out,
857 "Wrong project-role mappings",
858 )
859 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
860 self.assertEqual(
861 content["_admin"]["modified"],
862 content["_admin"]["created"],
863 "Wrong modification time",
864 )
865 with self.subTest(i=3):
866 rollback = []
867 with self.assertRaises(
868 EngineException, msg="Accepted wrong project-role mappings"
869 ) as e:
870 self.topic.new(
871 rollback,
872 self.fake_session,
873 {
874 "username": "other-project-name",
875 "password": "other-password",
876 "project_role_mappings": [{}],
877 },
878 )
879 self.assertEqual(len(rollback), 0, "Wrong rollback length")
880 self.assertEqual(
881 e.exception.http_code,
882 HTTPStatus.UNPROCESSABLE_ENTITY,
883 "Wrong HTTP status code",
884 )
885 self.assertIn(
886 "format error at '{}' '{}'".format(
887 "project_role_mappings:{}", "'{}' is a required property"
888 ).format(0, "project"),
889 norm(str(e.exception)),
890 "Wrong exception text",
891 )
892 with self.subTest(i=4):
893 rollback = []
894 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
895 self.topic.new(
896 rollback,
897 self.fake_session,
898 {
899 "username": "other-project-name",
900 "password": "other-password",
901 "projects": [],
902 },
903 )
904 self.assertEqual(len(rollback), 0, "Wrong rollback length")
905 self.assertEqual(
906 e.exception.http_code,
907 HTTPStatus.UNPROCESSABLE_ENTITY,
908 "Wrong HTTP status code",
909 )
910 self.assertIn(
911 "format error at '{}' '{}'".format(
912 "projects", "{} is too short"
913 ).format([]),
914 norm(str(e.exception)),
915 "Wrong exception text",
916 )
917
918 def test_edit_user(self):
919 now = time()
920 uid = str(uuid4())
921 pid1 = str(uuid4())
922 rid1 = str(uuid4())
923 prms = [
924 {
925 "project": pid1,
926 "project_name": "project-1",
927 "role": rid1,
928 "role_name": "role-1",
929 }
930 ]
931 user = {
932 "_id": uid,
933 "username": self.test_name,
934 "project_role_mappings": prms,
935 "_admin": {"created": now, "modified": now},
936 }
937 with self.subTest(i=1):
938 self.auth.get_user_list.side_effect = [[user], []]
939 self.auth.get_user.return_value = user
940 pid2 = str(uuid4())
941 rid2 = str(uuid4())
942 self.auth.get_project.side_effect = [
943 {"_id": pid2, "name": "project-2"},
944 {"_id": pid1, "name": "project-1"},
945 ]
946 self.auth.get_role.side_effect = [
947 {"_id": rid2, "name": "role-2"},
948 {"_id": rid1, "name": "role-1"},
949 ]
950 new_name = "new-user-name"
951 new_pasw = "new-password"
952 add_prms = [{"project": pid2, "role": rid2}]
953 rem_prms = [{"project": pid1, "role": rid1}]
954 self.topic.edit(
955 self.fake_session,
956 uid,
957 {
958 "username": new_name,
959 "password": new_pasw,
960 "add_project_role_mappings": add_prms,
961 "remove_project_role_mappings": rem_prms,
962 },
963 )
964 content = self.auth.update_user.call_args[0][0]
965 self.assertEqual(content["_id"], uid, "Wrong user identifier")
966 self.assertEqual(content["username"], new_name, "Wrong user name")
967 self.assertEqual(content["password"], new_pasw, "Wrong user password")
968 self.assertEqual(
969 content["add_project_role_mappings"],
970 add_prms,
971 "Wrong project-role mappings to add",
972 )
973 self.assertEqual(
974 content["remove_project_role_mappings"],
975 prms,
976 "Wrong project-role mappings to remove",
977 )
978 with self.subTest(i=2):
979 new_name = "other-user-name"
980 new_prms = [{}]
981 self.auth.get_role_list.side_effect = [[user], []]
982 self.auth.get_user_list.side_effect = [[user]]
983 with self.assertRaises(
984 EngineException, msg="Accepted wrong project-role mappings"
985 ) as e:
986 self.topic.edit(
987 self.fake_session,
988 uid,
989 {"username": new_name, "project_role_mappings": new_prms},
990 )
991 self.assertEqual(
992 e.exception.http_code,
993 HTTPStatus.UNPROCESSABLE_ENTITY,
994 "Wrong HTTP status code",
995 )
996 self.assertIn(
997 "format error at '{}' '{}'".format(
998 "project_role_mappings:{}", "'{}' is a required property"
999 ).format(0, "project"),
1000 norm(str(e.exception)),
1001 "Wrong exception text",
1002 )
1003 with self.subTest(i=3):
1004 self.auth.get_user_list.side_effect = [[user], []]
1005 self.auth.get_user.return_value = user
1006 old_password = self.test_name
1007 new_pasw = "new-password"
1008 self.topic.edit(
1009 self.fake_session,
1010 uid,
1011 {
1012 "old_password": old_password,
1013 "password": new_pasw,
1014 },
1015 )
1016 content = self.auth.update_user.call_args[0][0]
1017 self.assertEqual(
1018 content["old_password"], old_password, "Wrong old password"
1019 )
1020 self.assertEqual(content["password"], new_pasw, "Wrong user password")
1021
1022 def test_delete_user(self):
1023 with self.subTest(i=1):
1024 uid = str(uuid4())
1025 self.fake_session["username"] = self.test_name
1026 user = user = {
1027 "_id": uid,
1028 "username": "other-user-name",
1029 "project_role_mappings": [],
1030 }
1031 self.auth.get_user.return_value = user
1032 self.auth.delete_user.return_value = {"deleted": 1}
1033 rc = self.topic.delete(self.fake_session, uid)
1034 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
1035 self.assertEqual(
1036 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
1037 )
1038 self.assertEqual(
1039 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
1040 )
1041
1042 def test_conflict_on_new(self):
1043 with self.subTest(i=1):
1044 rollback = []
1045 uid = str(uuid4())
1046 with self.assertRaises(
1047 EngineException, msg="Accepted uuid as username"
1048 ) as e:
1049 self.topic.new(
1050 rollback,
1051 self.fake_session,
1052 {
1053 "username": uid,
1054 "password": self.test_name,
1055 "projects": [test_pid],
1056 },
1057 )
1058 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1059 self.assertEqual(
1060 e.exception.http_code,
1061 HTTPStatus.UNPROCESSABLE_ENTITY,
1062 "Wrong HTTP status code",
1063 )
1064 self.assertIn(
1065 "username '{}' cannot have a uuid format".format(uid),
1066 norm(str(e.exception)),
1067 "Wrong exception text",
1068 )
1069 with self.subTest(i=2):
1070 rollback = []
1071 self.auth.get_user_list.return_value = [
1072 {"_id": str(uuid4()), "username": self.test_name}
1073 ]
1074 with self.assertRaises(
1075 EngineException, msg="Accepted existing username"
1076 ) as e:
1077 self.topic.new(
1078 rollback,
1079 self.fake_session,
1080 {
1081 "username": self.test_name,
1082 "password": self.test_name,
1083 "projects": [test_pid],
1084 },
1085 )
1086 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1087 self.assertEqual(
1088 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1089 )
1090 self.assertIn(
1091 "username '{}' is already used".format(self.test_name),
1092 norm(str(e.exception)),
1093 "Wrong exception text",
1094 )
1095 with self.subTest(i=3):
1096 rollback = []
1097 self.auth.get_user_list.return_value = []
1098 self.auth.get_role_list.side_effect = [[], []]
1099 with self.assertRaises(
1100 AuthconnNotFoundException, msg="Accepted user without default role"
1101 ) as e:
1102 self.topic.new(
1103 rollback,
1104 self.fake_session,
1105 {
1106 "username": self.test_name,
1107 "password": self.test_name,
1108 "projects": [str(uuid4())],
1109 },
1110 )
1111 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1112 self.assertEqual(
1113 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1114 )
1115 self.assertIn(
1116 "can't find default role for user '{}'".format(self.test_name),
1117 norm(str(e.exception)),
1118 "Wrong exception text",
1119 )
1120
1121 def test_conflict_on_edit(self):
1122 uid = str(uuid4())
1123 with self.subTest(i=1):
1124 self.auth.get_user_list.return_value = [
1125 {"_id": uid, "username": self.test_name}
1126 ]
1127 new_name = str(uuid4())
1128 with self.assertRaises(
1129 EngineException, msg="Accepted uuid as username"
1130 ) as e:
1131 self.topic.edit(self.fake_session, uid, {"username": new_name})
1132 self.assertEqual(
1133 e.exception.http_code,
1134 HTTPStatus.UNPROCESSABLE_ENTITY,
1135 "Wrong HTTP status code",
1136 )
1137 self.assertIn(
1138 "username '{}' cannot have an uuid format".format(new_name),
1139 norm(str(e.exception)),
1140 "Wrong exception text",
1141 )
1142 with self.subTest(i=2):
1143 self.auth.get_user_list.return_value = [
1144 {"_id": uid, "username": self.test_name}
1145 ]
1146 self.auth.get_role_list.side_effect = [[], []]
1147 with self.assertRaises(
1148 AuthconnNotFoundException, msg="Accepted user without default role"
1149 ) as e:
1150 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
1151 self.assertEqual(
1152 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1153 )
1154 self.assertIn(
1155 "can't find a default role for user '{}'".format(self.test_name),
1156 norm(str(e.exception)),
1157 "Wrong exception text",
1158 )
1159 with self.subTest(i=3):
1160 admin_uid = str(uuid4())
1161 self.auth.get_user_list.return_value = [
1162 {"_id": admin_uid, "username": "admin"}
1163 ]
1164 with self.assertRaises(
1165 EngineException,
1166 msg="Accepted removing system_admin role from admin user",
1167 ) as e:
1168 self.topic.edit(
1169 self.fake_session,
1170 admin_uid,
1171 {
1172 "remove_project_role_mappings": [
1173 {"project": "admin", "role": "system_admin"}
1174 ]
1175 },
1176 )
1177 self.assertEqual(
1178 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
1179 )
1180 self.assertIn(
1181 "you cannot remove system_admin role from admin user",
1182 norm(str(e.exception)),
1183 "Wrong exception text",
1184 )
1185 with self.subTest(i=4):
1186 new_name = "new-user-name"
1187 self.auth.get_user_list.side_effect = [
1188 [{"_id": uid, "name": self.test_name}],
1189 [{"_id": str(uuid4()), "name": new_name}],
1190 ]
1191 with self.assertRaises(
1192 EngineException, msg="Accepted existing username"
1193 ) as e:
1194 self.topic.edit(self.fake_session, uid, {"username": new_name})
1195 self.assertEqual(
1196 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1197 )
1198 self.assertIn(
1199 "username '{}' is already used".format(new_name),
1200 norm(str(e.exception)),
1201 "Wrong exception text",
1202 )
1203
1204 def test_conflict_on_del(self):
1205 with self.subTest(i=1):
1206 uid = str(uuid4())
1207 self.fake_session["username"] = self.test_name
1208 user = user = {
1209 "_id": uid,
1210 "username": self.test_name,
1211 "project_role_mappings": [],
1212 }
1213 self.auth.get_user.return_value = user
1214 with self.assertRaises(
1215 EngineException, msg="Accepted deletion of own user"
1216 ) as e:
1217 self.topic.delete(self.fake_session, uid)
1218 self.assertEqual(
1219 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1220 )
1221 self.assertIn(
1222 "you cannot delete your own login user",
1223 norm(str(e.exception)),
1224 "Wrong exception text",
1225 )
1226
1227
1228 class Test_CommonVimWimSdn(TestCase):
1229 @classmethod
1230 def setUpClass(cls):
1231 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
1232
1233 def setUp(self):
1234 self.db = Mock(dbbase.DbBase())
1235 self.fs = Mock(fsbase.FsBase())
1236 self.msg = Mock(msgbase.MsgBase())
1237 self.auth = Mock(authconn.Authconn(None, None, None))
1238 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
1239 # Use WIM schemas for testing because they are the simplest
1240 self.topic._send_msg = Mock()
1241 self.topic.topic = "wims"
1242 self.topic.schema_new = validation.wim_account_new_schema
1243 self.topic.schema_edit = validation.wim_account_edit_schema
1244 self.fake_session = {
1245 "username": test_name,
1246 "project_id": (test_pid,),
1247 "method": None,
1248 "admin": True,
1249 "force": False,
1250 "public": False,
1251 "allow_show_user_project_role": True,
1252 }
1253 self.topic.check_quota = Mock(return_value=None) # skip quota
1254
1255 def test_new_cvws(self):
1256 test_url = "http://0.0.0.0:0"
1257 with self.subTest(i=1):
1258 rollback = []
1259 test_type = "fake"
1260 self.db.get_one.return_value = None
1261 self.db.create.side_effect = lambda self, content: content["_id"]
1262 cid, oid = self.topic.new(
1263 rollback,
1264 self.fake_session,
1265 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
1266 )
1267 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1268 args = self.db.create.call_args[0]
1269 content = args[1]
1270 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1271 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1272 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
1273 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
1274 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
1275 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
1276 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
1277 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
1278 self.assertEqual(
1279 content["_admin"]["modified"],
1280 content["_admin"]["created"],
1281 "Wrong modification time",
1282 )
1283 self.assertEqual(
1284 content["_admin"]["operationalState"],
1285 "PROCESSING",
1286 "Wrong operational state",
1287 )
1288 self.assertEqual(
1289 content["_admin"]["projects_read"],
1290 [test_pid],
1291 "Wrong read-only projects",
1292 )
1293 self.assertEqual(
1294 content["_admin"]["projects_write"],
1295 [test_pid],
1296 "Wrong read/write projects",
1297 )
1298 self.assertIsNone(
1299 content["_admin"]["current_operation"], "Wrong current operation"
1300 )
1301 self.assertEqual(
1302 len(content["_admin"]["operations"]), 1, "Wrong number of operations"
1303 )
1304 operation = content["_admin"]["operations"][0]
1305 self.assertEqual(
1306 operation["lcmOperationType"], "create", "Wrong operation type"
1307 )
1308 self.assertEqual(
1309 operation["operationState"], "PROCESSING", "Wrong operation state"
1310 )
1311 self.assertGreater(
1312 operation["startTime"],
1313 content["_admin"]["created"],
1314 "Wrong operation start time",
1315 )
1316 self.assertGreater(
1317 operation["statusEnteredTime"],
1318 content["_admin"]["created"],
1319 "Wrong operation status enter time",
1320 )
1321 self.assertEqual(
1322 operation["detailed-status"], "", "Wrong operation detailed status info"
1323 )
1324 self.assertIsNone(
1325 operation["operationParams"], "Wrong operation parameters"
1326 )
1327 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
1328 # with self.subTest(i=2):
1329 # rollback = []
1330 # test_type = "bad_type"
1331 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
1332 # self.topic.new(rollback, self.fake_session,
1333 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
1334 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
1335 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
1336 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
1337 # norm(str(e.exception)), "Wrong exception text")
1338
1339 def test_conflict_on_new(self):
1340 with self.subTest(i=1):
1341 rollback = []
1342 test_url = "http://0.0.0.0:0"
1343 test_type = "fake"
1344 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
1345 with self.assertRaises(
1346 EngineException, msg="Accepted existing CIM name"
1347 ) as e:
1348 self.topic.new(
1349 rollback,
1350 self.fake_session,
1351 {
1352 "name": self.test_name,
1353 "wim_url": test_url,
1354 "wim_type": test_type,
1355 },
1356 )
1357 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1358 self.assertEqual(
1359 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1360 )
1361 self.assertIn(
1362 "name '{}' already exists for {}".format(
1363 self.test_name, self.topic.topic
1364 ),
1365 norm(str(e.exception)),
1366 "Wrong exception text",
1367 )
1368
1369 def test_edit_cvws(self):
1370 now = time()
1371 cid = str(uuid4())
1372 test_url = "http://0.0.0.0:0"
1373 test_type = "fake"
1374 cvws = {
1375 "_id": cid,
1376 "name": self.test_name,
1377 "wim_url": test_url,
1378 "wim_type": test_type,
1379 "_admin": {
1380 "created": now,
1381 "modified": now,
1382 "operations": [{"lcmOperationType": "create"}],
1383 },
1384 }
1385 with self.subTest(i=1):
1386 new_name = "new-cim-name"
1387 new_url = "https://1.1.1.1:1"
1388 new_type = "onos"
1389 self.db.get_one.side_effect = [cvws, None]
1390 self.db.replace.return_value = {"updated": 1}
1391 # self.db.encrypt.side_effect = [b64str(), b64str()]
1392 self.topic.edit(
1393 self.fake_session,
1394 cid,
1395 {"name": new_name, "wim_url": new_url, "wim_type": new_type},
1396 )
1397 args = self.db.replace.call_args[0]
1398 content = args[2]
1399 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1400 self.assertEqual(args[1], cid, "Wrong CIM identifier")
1401 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1402 self.assertEqual(content["name"], new_name, "Wrong CIM name")
1403 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
1404 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
1405 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
1406 self.assertGreater(
1407 content["_admin"]["modified"],
1408 content["_admin"]["created"],
1409 "Wrong modification time",
1410 )
1411 self.assertEqual(
1412 len(content["_admin"]["operations"]), 2, "Wrong number of operations"
1413 )
1414 operation = content["_admin"]["operations"][1]
1415 self.assertEqual(
1416 operation["lcmOperationType"], "edit", "Wrong operation type"
1417 )
1418 self.assertEqual(
1419 operation["operationState"], "PROCESSING", "Wrong operation state"
1420 )
1421 self.assertGreater(
1422 operation["startTime"],
1423 content["_admin"]["modified"],
1424 "Wrong operation start time",
1425 )
1426 self.assertGreater(
1427 operation["statusEnteredTime"],
1428 content["_admin"]["modified"],
1429 "Wrong operation status enter time",
1430 )
1431 self.assertEqual(
1432 operation["detailed-status"], "", "Wrong operation detailed status info"
1433 )
1434 self.assertIsNone(
1435 operation["operationParams"], "Wrong operation parameters"
1436 )
1437 with self.subTest(i=2):
1438 self.db.get_one.side_effect = [cvws]
1439 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
1440 self.topic.edit(
1441 self.fake_session,
1442 str(uuid4()),
1443 {"name": "new-name", "extra_prop": "anything"},
1444 )
1445 self.assertEqual(
1446 e.exception.http_code,
1447 HTTPStatus.UNPROCESSABLE_ENTITY,
1448 "Wrong HTTP status code",
1449 )
1450 self.assertIn(
1451 "format error '{}'".format(
1452 "additional properties are not allowed ('{}' was unexpected)"
1453 ).format("extra_prop"),
1454 norm(str(e.exception)),
1455 "Wrong exception text",
1456 )
1457
1458 def test_conflict_on_edit(self):
1459 with self.subTest(i=1):
1460 cid = str(uuid4())
1461 new_name = "new-cim-name"
1462 self.db.get_one.side_effect = [
1463 {"_id": cid, "name": self.test_name},
1464 {"_id": str(uuid4()), "name": new_name},
1465 ]
1466 with self.assertRaises(
1467 EngineException, msg="Accepted existing CIM name"
1468 ) as e:
1469 self.topic.edit(self.fake_session, cid, {"name": new_name})
1470 self.assertEqual(
1471 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1472 )
1473 self.assertIn(
1474 "name '{}' already exists for {}".format(new_name, self.topic.topic),
1475 norm(str(e.exception)),
1476 "Wrong exception text",
1477 )
1478
1479 def test_delete_cvws(self):
1480 cid = str(uuid4())
1481 ro_pid = str(uuid4())
1482 rw_pid = str(uuid4())
1483 cvws = {"_id": cid, "name": self.test_name}
1484 self.db.get_list.return_value = []
1485 with self.subTest(i=1):
1486 cvws["_admin"] = {
1487 "projects_read": [test_pid, ro_pid, rw_pid],
1488 "projects_write": [test_pid, rw_pid],
1489 }
1490 self.db.get_one.return_value = cvws
1491 oid = self.topic.delete(self.fake_session, cid)
1492 self.assertIsNone(oid, "Wrong operation identifier")
1493 self.assertEqual(
1494 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1495 )
1496 self.assertEqual(
1497 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1498 )
1499 self.assertEqual(
1500 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1501 )
1502 self.assertEqual(
1503 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1504 )
1505 self.assertEqual(
1506 self.db.set_one.call_args[1]["update_dict"],
1507 None,
1508 "Wrong read-only projects update",
1509 )
1510 self.assertEqual(
1511 self.db.set_one.call_args[1]["pull_list"],
1512 {
1513 "_admin.projects_read": (test_pid,),
1514 "_admin.projects_write": (test_pid,),
1515 },
1516 "Wrong read/write projects update",
1517 )
1518 self.topic._send_msg.assert_not_called()
1519 with self.subTest(i=2):
1520 now = time()
1521 cvws["_admin"] = {
1522 "projects_read": [test_pid],
1523 "projects_write": [test_pid],
1524 "operations": [],
1525 }
1526 self.db.get_one.return_value = cvws
1527 oid = self.topic.delete(self.fake_session, cid)
1528 self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
1529 self.assertEqual(
1530 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1531 )
1532 self.assertEqual(
1533 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1534 )
1535 self.assertEqual(
1536 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1537 )
1538 self.assertEqual(
1539 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier"
1540 )
1541 self.assertEqual(
1542 self.db.set_one.call_args[1]["update_dict"],
1543 {"_admin.to_delete": True},
1544 "Wrong _admin.to_delete update",
1545 )
1546 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
1547 self.assertEqual(
1548 operation["lcmOperationType"], "delete", "Wrong operation type"
1549 )
1550 self.assertEqual(
1551 operation["operationState"], "PROCESSING", "Wrong operation state"
1552 )
1553 self.assertEqual(
1554 operation["detailed-status"], "", "Wrong operation detailed status"
1555 )
1556 self.assertIsNone(
1557 operation["operationParams"], "Wrong operation parameters"
1558 )
1559 self.assertGreater(
1560 operation["startTime"], now, "Wrong operation start time"
1561 )
1562 self.assertGreater(
1563 operation["statusEnteredTime"], now, "Wrong operation status enter time"
1564 )
1565 self.topic._send_msg.assert_called_once_with(
1566 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
1567 )
1568 with self.subTest(i=3):
1569 cvws["_admin"] = {
1570 "projects_read": [],
1571 "projects_write": [],
1572 "operations": [],
1573 }
1574 self.db.get_one.return_value = cvws
1575 self.topic._send_msg.reset_mock()
1576 self.db.get_one.reset_mock()
1577 self.db.del_one.reset_mock()
1578 self.fake_session["force"] = True # to force deletion
1579 self.fake_session["admin"] = True # to force deletion
1580 self.fake_session["project_id"] = [] # to force deletion
1581 oid = self.topic.delete(self.fake_session, cid)
1582 self.assertIsNone(oid, "Wrong operation identifier")
1583 self.assertEqual(
1584 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1585 )
1586 self.assertEqual(
1587 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1588 )
1589 self.assertEqual(
1590 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
1591 )
1592 self.assertEqual(
1593 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1594 )
1595 self.topic._send_msg.assert_called_once_with(
1596 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
1597 )
1598
1599
1600 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_new")
1601 class TestVimAccountTopic(TestCase):
1602 def setUp(self):
1603 self.db = Mock(dbbase.DbBase())
1604 self.fs = Mock(fsbase.FsBase())
1605 self.msg = Mock(msgbase.MsgBase())
1606 self.auth = Mock(authconn.Authconn(None, None, None))
1607 self.topic = VimAccountTopic(self.db, self.fs, self.msg, self.auth)
1608 self.topic.check_quota = Mock(return_value=None) # skip quota
1609
1610 self.fake_session = {
1611 "username": test_name,
1612 "project_id": (test_pid,),
1613 "method": None,
1614 "admin": True,
1615 "force": False,
1616 "public": False,
1617 "allow_show_user_project_role": True,
1618 }
1619
1620 def check_invalid_indata_raises_exception(self, indata, mock_common_vim_wim_sdn):
1621 with self.assertRaises(EngineException) as error:
1622 self.topic.check_conflict_on_new(self.fake_session, indata)
1623 mock_common_vim_wim_sdn.assert_called_with(self.fake_session, indata)
1624 error_msg = "Invalid config for VIM account '{}'.".format(indata["name"])
1625 self.assertEqual(str(error.exception), error_msg)
1626
1627 def test_check_conflict_on_new_vim_type_paas(self, mock_common_vim_wim_sdn):
1628 valid_juju_paas_config = {
1629 "paas_provider": "juju",
1630 "ca_cert_content": "file_content",
1631 "cloud": "microk8s",
1632 "cloud_credentials": "cloud_credential_name",
1633 "authorized_keys": "keys",
1634 }
1635 indata = {
1636 "name": "juju_paas",
1637 "vim_type": "paas",
1638 "description": None,
1639 "vim_url": "http://0.0.0.0:0",
1640 "vim_user": "some_user",
1641 "vim_password": "some_password",
1642 "vim_tenant_name": "null",
1643 "config": valid_juju_paas_config,
1644 }
1645 self.topic.check_conflict_on_new(self.fake_session, indata)
1646 mock_common_vim_wim_sdn.assert_called_once_with(self.fake_session, indata)
1647
1648 def test_check_conflict_on_new_vim_type_paas_config_missing(
1649 self, mock_common_vim_wim_sdn
1650 ):
1651 indata = {
1652 "name": "juju_paas",
1653 "vim_type": "paas",
1654 "description": None,
1655 "vim_url": "http://0.0.0.0:0",
1656 "vim_user": "some_user",
1657 "vim_password": "some_password",
1658 "vim_tenant_name": "null",
1659 }
1660 self.check_invalid_indata_raises_exception(indata, mock_common_vim_wim_sdn)
1661
1662 def test_check_conflict_on_new_vim_type_paas_invalid_config(
1663 self, mock_common_vim_wim_sdn
1664 ):
1665 invalid_configs = [
1666 {
1667 "paas_provider": "some_paas_provider",
1668 "ca_cert_content": "file_content",
1669 "cloud": "microk8s",
1670 "cloud_credentials": "cloud_credential_name",
1671 },
1672 {
1673 "ca_cert_content": "file_content",
1674 "cloud": "microk8s",
1675 "cloud_credentials": "cloud_credential_name",
1676 },
1677 {
1678 "paas_provider": "juju",
1679 "cloud": "microk8s",
1680 "cloud_credentials": "cloud_credential_name",
1681 },
1682 {
1683 "paas_provider": "juju",
1684 "ca_cert_content": "file_content",
1685 "cloud_credentials": "cloud_credential_name",
1686 },
1687 {
1688 "paas_provider": "juju",
1689 "ca_cert_content": "file_content",
1690 "cloud": "microk8s",
1691 },
1692 {
1693 "some_param": None,
1694 },
1695 {},
1696 ]
1697 for config in invalid_configs:
1698 with self.subTest():
1699 indata = {
1700 "name": "juju_paas",
1701 "vim_type": "paas",
1702 "description": None,
1703 "vim_url": "http://0.0.0.0:0",
1704 "vim_user": "some_user",
1705 "vim_password": "some_password",
1706 "vim_tenant_name": "null",
1707 "config": config,
1708 }
1709 self.check_invalid_indata_raises_exception(
1710 indata, mock_common_vim_wim_sdn
1711 )
1712
1713 def test_kafka_message_is_not_sent_if_paas_vim(self, mock_common_vim_wim_sdn):
1714 valid_juju_paas_config = {
1715 "paas_provider": "juju",
1716 "ca_cert_content": "file_content",
1717 "cloud": "microk8s",
1718 "cloud_credentials": "cloud_credential_name",
1719 "authorized_keys": "keys",
1720 }
1721 indata = {
1722 "name": "juju_paas",
1723 "vim_type": "paas",
1724 "description": None,
1725 "vim_url": "http://0.0.0.0:0",
1726 "vim_user": "some_user",
1727 "vim_password": "some_password",
1728 "vim_tenant_name": "null",
1729 "config": valid_juju_paas_config,
1730 }
1731 rollback = []
1732 self.topic.temporal = Mock()
1733
1734 self.topic.new(rollback, self.fake_session, indata)
1735 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1736 self.msg.write.assert_not_called()
1737 self.topic.temporal.start_vim_workflow.assert_called_once()
1738
1739 def test_kafka_message_is_sent_if_not_paas_vim(self, mock_common_vim_wim_sdn):
1740 indata = {
1741 "name": "juju_paas",
1742 "vim_type": "openstack",
1743 "description": None,
1744 "vim_url": "http://0.0.0.0:0",
1745 "vim_user": "some_user",
1746 "vim_password": "some_password",
1747 "vim_tenant_name": "null",
1748 }
1749 rollback = []
1750
1751 self.topic.new(rollback, self.fake_session, indata)
1752 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1753 mock_common_vim_wim_sdn.assert_called_once_with(self.fake_session, indata)
1754 self.msg.write.assert_called_once_with("vim_account", "created", ANY)
1755
1756
1757 if __name__ == "__main__":
1758 unittest.main()