Code Coverage

Cobertura Coverage Report > osm_nbi.tests >

test_admin_topics.py

Trend

Classes100%
 
Lines99%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
test_admin_topics.py
100%
1/1
99%
738/740
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
test_admin_topics.py
99%
738/740
N/A

Source

osm_nbi/tests/test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #    http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 1 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 1 __date__ = "$2019-10-019"
19
20 1 import unittest
21 1 import random
22 1 from unittest import TestCase
23 1 from unittest.mock import Mock, patch, call, ANY
24 1 from uuid import uuid4
25 1 from http import HTTPStatus
26 1 from time import time
27 1 from osm_common import dbbase, fsbase, msgbase
28 1 from osm_nbi import authconn, validation
29 1 from osm_nbi.admin_topics import (
30     ProjectTopicAuth,
31     RoleTopicAuth,
32     UserTopicAuth,
33     CommonVimWimSdn,
34     VcaTopic,
35     VimAccountTopic,
36 )
37 1 from osm_nbi.engine import EngineException
38 1 from osm_nbi.authconn import AuthconnNotFoundException
39
40
41 1 test_pid = str(uuid4())
42 1 test_name = "test-user"
43
44
45 1 def norm(str):
46     """Normalize string for checking"""
47 1     return " ".join(str.strip().split()).lower()
48
49
50 1 class TestVcaTopic(TestCase):
51 1     def setUp(self):
52 1         self.db = Mock(dbbase.DbBase())
53 1         self.fs = Mock(fsbase.FsBase())
54 1         self.msg = Mock(msgbase.MsgBase())
55 1         self.auth = Mock(authconn.Authconn(None, None, None))
56 1         self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
57
58 1     @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
59 1     def test_format_on_new(self, mock_super_format_on_new):
60 1         content = {
61             "_id": "id",
62             "secret": "encrypted_secret",
63             "cacert": "encrypted_cacert",
64         }
65 1         self.db.encrypt.side_effect = ["secret", "cacert"]
66 1         mock_super_format_on_new.return_value = "1234"
67
68 1         oid = self.vca_topic.format_on_new(content)
69
70 1         self.assertEqual(oid, "1234")
71 1         self.assertEqual(content["secret"], "secret")
72 1         self.assertEqual(content["cacert"], "cacert")
73 1         self.db.encrypt.assert_has_calls(
74             [
75                 call("encrypted_secret", schema_version="1.11", salt="id"),
76                 call("encrypted_cacert", schema_version="1.11", salt="id"),
77             ]
78         )
79 1         mock_super_format_on_new.assert_called_with(content, None, False)
80
81 1     @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
82 1     def test_format_on_edit(self, mock_super_format_on_edit):
83 1         edit_content = {
84             "_id": "id",
85             "secret": "encrypted_secret",
86             "cacert": "encrypted_cacert",
87         }
88 1         final_content = {
89             "_id": "id",
90             "schema_version": "1.11",
91         }
92 1         self.db.encrypt.side_effect = ["secret", "cacert"]
93 1         mock_super_format_on_edit.return_value = "1234"
94
95 1         oid = self.vca_topic.format_on_edit(final_content, edit_content)
96
97 1         self.assertEqual(oid, "1234")
98 1         self.assertEqual(final_content["secret"], "secret")
99 1         self.assertEqual(final_content["cacert"], "cacert")
100 1         self.db.encrypt.assert_has_calls(
101             [
102                 call("encrypted_secret", schema_version="1.11", salt="id"),
103                 call("encrypted_cacert", schema_version="1.11", salt="id"),
104             ]
105         )
106 1         mock_super_format_on_edit.assert_called()
107
108 1     @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
109 1     def test_check_conflict_on_del(self, mock_check_conflict_on_del):
110 1         session = {
111             "project_id": "project-id",
112             "force": False,
113         }
114 1         _id = "vca-id"
115 1         db_content = {}
116
117 1         self.db.get_list.return_value = None
118
119 1         self.vca_topic.check_conflict_on_del(session, _id, db_content)
120
121 1         self.db.get_list.assert_called_with(
122             "vim_accounts",
123             {"vca": _id, "_admin.projects_read.cont": "project-id"},
124         )
125 1         mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
126
127 1     @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
128 1     def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
129 1         session = {
130             "project_id": "project-id",
131             "force": True,
132         }
133 1         _id = "vca-id"
134 1         db_content = {}
135
136 1         self.vca_topic.check_conflict_on_del(session, _id, db_content)
137
138 1         self.db.get_list.assert_not_called()
139 1         mock_check_conflict_on_del.assert_not_called()
140
141 1     @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
142 1     def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
143 1         session = {
144             "project_id": "project-id",
145             "force": False,
146         }
147 1         _id = "vca-id"
148 1         db_content = {}
149
150 1         self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
151
152 1         with self.assertRaises(EngineException) as context:
153 1             self.vca_topic.check_conflict_on_del(session, _id, db_content)
154 0             self.assertEqual(
155                 context.exception,
156                 EngineException(
157                     "There is at least one VIM account using this vca",
158                     http_code=HTTPStatus.CONFLICT,
159                 ),
160             )
161
162 1         self.db.get_list.assert_called_with(
163             "vim_accounts",
164             {"vca": _id, "_admin.projects_read.cont": "project-id"},
165         )
166 1         mock_check_conflict_on_del.assert_not_called()
167
168
169 1 class Test_ProjectTopicAuth(TestCase):
170 1     @classmethod
171 1     def setUpClass(cls):
172 1         cls.test_name = "test-project-topic"
173
174 1     def setUp(self):
175 1         self.db = Mock(dbbase.DbBase())
176 1         self.fs = Mock(fsbase.FsBase())
177 1         self.msg = Mock(msgbase.MsgBase())
178 1         self.auth = Mock(authconn.Authconn(None, None, None))
179 1         self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
180 1         self.fake_session = {
181             "username": self.test_name,
182             "project_id": (test_pid,),
183             "method": None,
184             "admin": True,
185             "force": False,
186             "public": False,
187             "allow_show_user_project_role": True,
188         }
189 1         self.topic.check_quota = Mock(return_value=None)  # skip quota
190
191 1     def test_new_project(self):
192 1         with self.subTest(i=1):
193 1             rollback = []
194 1             pid1 = str(uuid4())
195 1             self.auth.get_project_list.return_value = []
196 1             self.auth.create_project.return_value = pid1
197 1             pid2, oid = self.topic.new(
198                 rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
199             )
200 1             self.assertEqual(len(rollback), 1, "Wrong rollback length")
201 1             self.assertEqual(pid2, pid1, "Wrong project identifier")
202 1             content = self.auth.create_project.call_args[0][0]
203 1             self.assertEqual(content["name"], self.test_name, "Wrong project name")
204 1             self.assertEqual(content["quotas"], {}, "Wrong quotas")
205 1             self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
206 1             self.assertEqual(
207                 content["_admin"]["modified"],
208                 content["_admin"]["created"],
209                 "Wrong modification time",
210             )
211 1         with self.subTest(i=2):
212 1             rollback = []
213 1             with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
214 1                 self.topic.new(
215                     rollback,
216                     self.fake_session,
217                     {"name": "other-project-name", "quotas": {"baditems": 10}},
218                 )
219 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
220 1             self.assertEqual(
221                 e.exception.http_code,
222                 HTTPStatus.UNPROCESSABLE_ENTITY,
223                 "Wrong HTTP status code",
224             )
225 1             self.assertIn(
226                 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
227                     "baditems"
228                 ),
229                 norm(str(e.exception)),
230                 "Wrong exception text",
231             )
232
233 1     def test_edit_project(self):
234 1         now = time()
235 1         pid = str(uuid4())
236 1         proj = {
237             "_id": pid,
238             "name": self.test_name,
239             "_admin": {"created": now, "modified": now},
240         }
241 1         with self.subTest(i=1):
242 1             self.auth.get_project_list.side_effect = [[proj], []]
243 1             new_name = "new-project-name"
244 1             quotas = {
245                 "vnfds": random.SystemRandom().randint(0, 100),
246                 "nsds": random.SystemRandom().randint(0, 100),
247             }
248 1             self.topic.edit(
249                 self.fake_session, pid, {"name": new_name, "quotas": quotas}
250             )
251 1             _id, content = self.auth.update_project.call_args[0]
252 1             self.assertEqual(_id, pid, "Wrong project identifier")
253 1             self.assertEqual(content["_id"], pid, "Wrong project identifier")
254 1             self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
255 1             self.assertGreater(
256                 content["_admin"]["modified"], now, "Wrong modification time"
257             )
258 1             self.assertEqual(content["name"], new_name, "Wrong project name")
259 1             self.assertEqual(content["quotas"], quotas, "Wrong quotas")
260 1         with self.subTest(i=2):
261 1             new_name = "other-project-name"
262 1             quotas = {"baditems": random.SystemRandom().randint(0, 100)}
263 1             self.auth.get_project_list.side_effect = [[proj], []]
264 1             with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
265 1                 self.topic.edit(
266                     self.fake_session, pid, {"name": new_name, "quotas": quotas}
267                 )
268 1             self.assertEqual(
269                 e.exception.http_code,
270                 HTTPStatus.UNPROCESSABLE_ENTITY,
271                 "Wrong HTTP status code",
272             )
273 1             self.assertIn(
274                 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
275                     "baditems"
276                 ),
277                 norm(str(e.exception)),
278                 "Wrong exception text",
279             )
280
281 1     def test_conflict_on_new(self):
282 1         with self.subTest(i=1):
283 1             rollback = []
284 1             pid = str(uuid4())
285 1             with self.assertRaises(
286                 EngineException, msg="Accepted uuid as project name"
287             ) as e:
288 1                 self.topic.new(rollback, self.fake_session, {"name": pid})
289 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
290 1             self.assertEqual(
291                 e.exception.http_code,
292                 HTTPStatus.UNPROCESSABLE_ENTITY,
293                 "Wrong HTTP status code",
294             )
295 1             self.assertIn(
296                 "project name '{}' cannot have an uuid format".format(pid),
297                 norm(str(e.exception)),
298                 "Wrong exception text",
299             )
300 1         with self.subTest(i=2):
301 1             rollback = []
302 1             self.auth.get_project_list.return_value = [
303                 {"_id": test_pid, "name": self.test_name}
304             ]
305 1             with self.assertRaises(
306                 EngineException, msg="Accepted existing project name"
307             ) as e:
308 1                 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
309 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
310 1             self.assertEqual(
311                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
312             )
313 1             self.assertIn(
314                 "project '{}' exists".format(self.test_name),
315                 norm(str(e.exception)),
316                 "Wrong exception text",
317             )
318
319 1     def test_conflict_on_edit(self):
320 1         with self.subTest(i=1):
321 1             self.auth.get_project_list.return_value = [
322                 {"_id": test_pid, "name": self.test_name}
323             ]
324 1             new_name = str(uuid4())
325 1             with self.assertRaises(
326                 EngineException, msg="Accepted uuid as project name"
327             ) as e:
328 1                 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
329 1             self.assertEqual(
330                 e.exception.http_code,
331                 HTTPStatus.UNPROCESSABLE_ENTITY,
332                 "Wrong HTTP status code",
333             )
334 1             self.assertIn(
335                 "project name '{}' cannot have an uuid format".format(new_name),
336                 norm(str(e.exception)),
337                 "Wrong exception text",
338             )
339 1         with self.subTest(i=2):
340 1             pid = str(uuid4())
341 1             self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
342 1             with self.assertRaises(
343                 EngineException, msg="Accepted renaming of project 'admin'"
344             ) as e:
345 1                 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
346 1             self.assertEqual(
347                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
348             )
349 1             self.assertIn(
350                 "you cannot rename project 'admin'",
351                 norm(str(e.exception)),
352                 "Wrong exception text",
353             )
354 1         with self.subTest(i=3):
355 1             new_name = "new-project-name"
356 1             self.auth.get_project_list.side_effect = [
357                 [{"_id": test_pid, "name": self.test_name}],
358                 [{"_id": str(uuid4()), "name": new_name}],
359             ]
360 1             with self.assertRaises(
361                 EngineException, msg="Accepted existing project name"
362             ) as e:
363 1                 self.topic.edit(self.fake_session, pid, {"name": new_name})
364 1             self.assertEqual(
365                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
366             )
367 1             self.assertIn(
368                 "project '{}' is already used".format(new_name),
369                 norm(str(e.exception)),
370                 "Wrong exception text",
371             )
372
373 1     def test_delete_project(self):
374 1         with self.subTest(i=1):
375 1             pid = str(uuid4())
376 1             self.auth.get_project.return_value = {
377                 "_id": pid,
378                 "name": "other-project-name",
379             }
380 1             self.auth.delete_project.return_value = {"deleted": 1}
381 1             self.auth.get_user_list.return_value = []
382 1             self.db.get_list.return_value = []
383 1             rc = self.topic.delete(self.fake_session, pid)
384 1             self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
385 1             self.assertEqual(
386                 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
387             )
388 1             self.assertEqual(
389                 self.auth.delete_project.call_args[0][0],
390                 pid,
391                 "Wrong project identifier",
392             )
393
394 1     def test_conflict_on_del(self):
395 1         with self.subTest(i=1):
396 1             self.auth.get_project.return_value = {
397                 "_id": test_pid,
398                 "name": self.test_name,
399             }
400 1             with self.assertRaises(
401                 EngineException, msg="Accepted deletion of own project"
402             ) as e:
403 1                 self.topic.delete(self.fake_session, self.test_name)
404 1             self.assertEqual(
405                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
406             )
407 1             self.assertIn(
408                 "you cannot delete your own project",
409                 norm(str(e.exception)),
410                 "Wrong exception text",
411             )
412 1         with self.subTest(i=2):
413 1             self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
414 1             with self.assertRaises(
415                 EngineException, msg="Accepted deletion of project 'admin'"
416             ) as e:
417 1                 self.topic.delete(self.fake_session, "admin")
418 1             self.assertEqual(
419                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
420             )
421 1             self.assertIn(
422                 "you cannot delete project 'admin'",
423                 norm(str(e.exception)),
424                 "Wrong exception text",
425             )
426 1         with self.subTest(i=3):
427 1             pid = str(uuid4())
428 1             name = "other-project-name"
429 1             self.auth.get_project.return_value = {"_id": pid, "name": name}
430 1             self.auth.get_user_list.return_value = [
431                 {
432                     "_id": str(uuid4()),
433                     "username": self.test_name,
434                     "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
435                 }
436             ]
437 1             with self.assertRaises(
438                 EngineException, msg="Accepted deletion of used project"
439             ) as e:
440 1                 self.topic.delete(self.fake_session, pid)
441 1             self.assertEqual(
442                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
443             )
444 1             self.assertIn(
445                 "project '{}' ({}) is being used by user '{}'".format(
446                     name, pid, self.test_name
447                 ),
448                 norm(str(e.exception)),
449                 "Wrong exception text",
450             )
451 1         with self.subTest(i=4):
452 1             self.auth.get_user_list.return_value = []
453 1             self.db.get_list.return_value = [
454                 {
455                     "_id": str(uuid4()),
456                     "id": self.test_name,
457                     "_admin": {"projects_read": [pid], "projects_write": []},
458                 }
459             ]
460 1             with self.assertRaises(
461                 EngineException, msg="Accepted deletion of used project"
462             ) as e:
463 1                 self.topic.delete(self.fake_session, pid)
464 1             self.assertEqual(
465                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
466             )
467 1             self.assertIn(
468                 "project '{}' ({}) is being used by {} '{}'".format(
469                     name, pid, "vnf descriptor", self.test_name
470                 ),
471                 norm(str(e.exception)),
472                 "Wrong exception text",
473             )
474
475
476 1 class Test_RoleTopicAuth(TestCase):
477 1     @classmethod
478 1     def setUpClass(cls):
479 1         cls.test_name = "test-role-topic"
480 1         cls.test_operations = ["tokens:get"]
481
482 1     def setUp(self):
483 1         self.db = Mock(dbbase.DbBase())
484 1         self.fs = Mock(fsbase.FsBase())
485 1         self.msg = Mock(msgbase.MsgBase())
486 1         self.auth = Mock(authconn.Authconn(None, None, None))
487 1         self.auth.role_permissions = self.test_operations
488 1         self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
489 1         self.fake_session = {
490             "username": test_name,
491             "project_id": (test_pid,),
492             "method": None,
493             "admin": True,
494             "force": False,
495             "public": False,
496             "allow_show_user_project_role": True,
497         }
498 1         self.topic.check_quota = Mock(return_value=None)  # skip quota
499
500 1     def test_new_role(self):
501 1         with self.subTest(i=1):
502 1             rollback = []
503 1             rid1 = str(uuid4())
504 1             perms_in = {"tokens": True}
505 1             perms_out = {"default": False, "admin": False, "tokens": True}
506 1             self.auth.get_role_list.return_value = []
507 1             self.auth.create_role.return_value = rid1
508 1             rid2, oid = self.topic.new(
509                 rollback,
510                 self.fake_session,
511                 {"name": self.test_name, "permissions": perms_in},
512             )
513 1             self.assertEqual(len(rollback), 1, "Wrong rollback length")
514 1             self.assertEqual(rid2, rid1, "Wrong project identifier")
515 1             content = self.auth.create_role.call_args[0][0]
516 1             self.assertEqual(content["name"], self.test_name, "Wrong role name")
517 1             self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
518 1             self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
519 1             self.assertEqual(
520                 content["_admin"]["modified"],
521                 content["_admin"]["created"],
522                 "Wrong modification time",
523             )
524 1         with self.subTest(i=2):
525 1             rollback = []
526 1             with self.assertRaises(
527                 EngineException, msg="Accepted wrong permissions"
528             ) as e:
529 1                 self.topic.new(
530                     rollback,
531                     self.fake_session,
532                     {"name": "other-role-name", "permissions": {"projects": True}},
533                 )
534 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
535 1             self.assertEqual(
536                 e.exception.http_code,
537                 HTTPStatus.UNPROCESSABLE_ENTITY,
538                 "Wrong HTTP status code",
539             )
540 1             self.assertIn(
541                 "invalid permission '{}'".format("projects"),
542                 norm(str(e.exception)),
543                 "Wrong exception text",
544             )
545
546 1     def test_edit_role(self):
547 1         now = time()
548 1         rid = str(uuid4())
549 1         role = {
550             "_id": rid,
551             "name": self.test_name,
552             "permissions": {"tokens": True},
553             "_admin": {"created": now, "modified": now},
554         }
555 1         with self.subTest(i=1):
556 1             self.auth.get_role_list.side_effect = [[role], []]
557 1             self.auth.get_role.return_value = role
558 1             new_name = "new-role-name"
559 1             perms_in = {"tokens": False, "tokens:get": True}
560 1             perms_out = {
561                 "default": False,
562                 "admin": False,
563                 "tokens": False,
564                 "tokens:get": True,
565             }
566 1             self.topic.edit(
567                 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
568             )
569 1             content = self.auth.update_role.call_args[0][0]
570 1             self.assertEqual(content["_id"], rid, "Wrong role identifier")
571 1             self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
572 1             self.assertGreater(
573                 content["_admin"]["modified"], now, "Wrong modification time"
574             )
575 1             self.assertEqual(content["name"], new_name, "Wrong role name")
576 1             self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
577 1         with self.subTest(i=2):
578 1             new_name = "other-role-name"
579 1             perms_in = {"tokens": False, "tokens:post": True}
580 1             self.auth.get_role_list.side_effect = [[role], []]
581 1             with self.assertRaises(
582                 EngineException, msg="Accepted wrong permissions"
583             ) as e:
584 1                 self.topic.edit(
585                     self.fake_session, rid, {"name": new_name, "permissions": perms_in}
586                 )
587 1             self.assertEqual(
588                 e.exception.http_code,
589                 HTTPStatus.UNPROCESSABLE_ENTITY,
590                 "Wrong HTTP status code",
591             )
592 1             self.assertIn(
593                 "invalid permission '{}'".format("tokens:post"),
594                 norm(str(e.exception)),
595                 "Wrong exception text",
596             )
597
598 1     def test_delete_role(self):
599 1         with self.subTest(i=1):
600 1             rid = str(uuid4())
601 1             role = {"_id": rid, "name": "other-role-name"}
602 1             self.auth.get_role_list.return_value = [role]
603 1             self.auth.get_role.return_value = role
604 1             self.auth.delete_role.return_value = {"deleted": 1}
605 1             self.auth.get_user_list.return_value = []
606 1             rc = self.topic.delete(self.fake_session, rid)
607 1             self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
608 1             self.assertEqual(
609                 self.auth.get_role_list.call_args[0][0]["_id"],
610                 rid,
611                 "Wrong role identifier",
612             )
613 1             self.assertEqual(
614                 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
615             )
616 1             self.assertEqual(
617                 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
618             )
619
620 1     def test_conflict_on_new(self):
621 1         with self.subTest(i=1):
622 1             rollback = []
623 1             rid = str(uuid4())
624 1             with self.assertRaises(
625                 EngineException, msg="Accepted uuid as role name"
626             ) as e:
627 1                 self.topic.new(rollback, self.fake_session, {"name": rid})
628 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
629 1             self.assertEqual(
630                 e.exception.http_code,
631                 HTTPStatus.UNPROCESSABLE_ENTITY,
632                 "Wrong HTTP status code",
633             )
634 1             self.assertIn(
635                 "role name '{}' cannot have an uuid format".format(rid),
636                 norm(str(e.exception)),
637                 "Wrong exception text",
638             )
639 1         with self.subTest(i=2):
640 1             rollback = []
641 1             self.auth.get_role_list.return_value = [
642                 {"_id": str(uuid4()), "name": self.test_name}
643             ]
644 1             with self.assertRaises(
645                 EngineException, msg="Accepted existing role name"
646             ) as e:
647 1                 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
648 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
649 1             self.assertEqual(
650                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
651             )
652 1             self.assertIn(
653                 "role name '{}' exists".format(self.test_name),
654                 norm(str(e.exception)),
655                 "Wrong exception text",
656             )
657
658 1     def test_conflict_on_edit(self):
659 1         rid = str(uuid4())
660 1         with self.subTest(i=1):
661 1             self.auth.get_role_list.return_value = [
662                 {"_id": rid, "name": self.test_name, "permissions": {}}
663             ]
664 1             new_name = str(uuid4())
665 1             with self.assertRaises(
666                 EngineException, msg="Accepted uuid as role name"
667             ) as e:
668 1                 self.topic.edit(self.fake_session, rid, {"name": new_name})
669 1             self.assertEqual(
670                 e.exception.http_code,
671                 HTTPStatus.UNPROCESSABLE_ENTITY,
672                 "Wrong HTTP status code",
673             )
674 1             self.assertIn(
675                 "role name '{}' cannot have an uuid format".format(new_name),
676                 norm(str(e.exception)),
677                 "Wrong exception text",
678             )
679 1         for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
680 1             with self.subTest(i=i):
681 1                 rid = str(uuid4())
682 1                 self.auth.get_role.return_value = {
683                     "_id": rid,
684                     "name": role_name,
685                     "permissions": {},
686                 }
687 1                 with self.assertRaises(
688                     EngineException,
689                     msg="Accepted renaming of role '{}'".format(role_name),
690                 ) as e:
691 1                     self.topic.edit(self.fake_session, rid, {"name": "new-name"})
692 1                 self.assertEqual(
693                     e.exception.http_code,
694                     HTTPStatus.FORBIDDEN,
695                     "Wrong HTTP status code",
696                 )
697 1                 self.assertIn(
698                     "you cannot rename role '{}'".format(role_name),
699                     norm(str(e.exception)),
700                     "Wrong exception text",
701                 )
702 1         with self.subTest(i=i + 1):
703 1             new_name = "new-role-name"
704 1             self.auth.get_role_list.side_effect = [
705                 [{"_id": rid, "name": self.test_name, "permissions": {}}],
706                 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
707             ]
708 1             self.auth.get_role.return_value = {
709                 "_id": rid,
710                 "name": self.test_name,
711                 "permissions": {},
712             }
713 1             with self.assertRaises(
714                 EngineException, msg="Accepted existing role name"
715             ) as e:
716 1                 self.topic.edit(self.fake_session, rid, {"name": new_name})
717 1             self.assertEqual(
718                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
719             )
720 1             self.assertIn(
721                 "role name '{}' exists".format(new_name),
722                 norm(str(e.exception)),
723                 "Wrong exception text",
724             )
725
726 1     def test_conflict_on_del(self):
727 1         for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
728 1             with self.subTest(i=i):
729 1                 rid = str(uuid4())
730 1                 role = {"_id": rid, "name": role_name}
731 1                 self.auth.get_role_list.return_value = [role]
732 1                 self.auth.get_role.return_value = role
733 1                 with self.assertRaises(
734                     EngineException,
735                     msg="Accepted deletion of role '{}'".format(role_name),
736                 ) as e:
737 1                     self.topic.delete(self.fake_session, rid)
738 1                 self.assertEqual(
739                     e.exception.http_code,
740                     HTTPStatus.FORBIDDEN,
741                     "Wrong HTTP status code",
742                 )
743 1                 self.assertIn(
744                     "you cannot delete role '{}'".format(role_name),
745                     norm(str(e.exception)),
746                     "Wrong exception text",
747                 )
748 1         with self.subTest(i=i + 1):
749 1             rid = str(uuid4())
750 1             name = "other-role-name"
751 1             role = {"_id": rid, "name": name}
752 1             self.auth.get_role_list.return_value = [role]
753 1             self.auth.get_role.return_value = role
754 1             self.auth.get_user_list.return_value = [
755                 {
756                     "_id": str(uuid4()),
757                     "username": self.test_name,
758                     "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
759                 }
760             ]
761 1             with self.assertRaises(
762                 EngineException, msg="Accepted deletion of used role"
763             ) as e:
764 1                 self.topic.delete(self.fake_session, rid)
765 1             self.assertEqual(
766                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
767             )
768 1             self.assertIn(
769                 "role '{}' ({}) is being used by user '{}'".format(
770                     name, rid, self.test_name
771                 ),
772                 norm(str(e.exception)),
773                 "Wrong exception text",
774             )
775
776
777 1 class Test_UserTopicAuth(TestCase):
778 1     @classmethod
779 1     def setUpClass(cls):
780 1         cls.test_name = "test-user-topic"
781
782 1     def setUp(self):
783 1         self.db = Mock(dbbase.DbBase())
784 1         self.fs = Mock(fsbase.FsBase())
785 1         self.msg = Mock(msgbase.MsgBase())
786 1         self.auth = Mock(authconn.Authconn(None, None, None))
787 1         self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
788 1         self.fake_session = {
789             "username": test_name,
790             "project_id": (test_pid,),
791             "method": None,
792             "admin": True,
793             "force": False,
794             "public": False,
795             "allow_show_user_project_role": True,
796         }
797 1         self.topic.check_quota = Mock(return_value=None)  # skip quota
798
799 1     def test_new_user(self):
800 1         uid1 = str(uuid4())
801 1         pid = str(uuid4())
802 1         self.auth.get_user_list.return_value = []
803 1         self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
804 1         self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
805 1         with self.subTest(i=1):
806 1             rollback = []
807 1             rid = str(uuid4())
808 1             self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
809 1             prms_in = [{"project": "some_project", "role": "some_role"}]
810 1             prms_out = [{"project": pid, "role": rid}]
811 1             uid2, oid = self.topic.new(
812                 rollback,
813                 self.fake_session,
814                 {
815                     "username": self.test_name,
816                     "password": self.test_name,
817                     "project_role_mappings": prms_in,
818                 },
819             )
820 1             self.assertEqual(len(rollback), 1, "Wrong rollback length")
821 1             self.assertEqual(uid2, uid1, "Wrong project identifier")
822 1             content = self.auth.create_user.call_args[0][0]
823 1             self.assertEqual(content["username"], self.test_name, "Wrong project name")
824 1             self.assertEqual(content["password"], self.test_name, "Wrong password")
825 1             self.assertEqual(
826                 content["project_role_mappings"],
827                 prms_out,
828                 "Wrong project-role mappings",
829             )
830 1             self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
831 1             self.assertEqual(
832                 content["_admin"]["modified"],
833                 content["_admin"]["created"],
834                 "Wrong modification time",
835             )
836 1         with self.subTest(i=2):
837 1             rollback = []
838 1             def_rid = str(uuid4())
839 1             def_role = {"_id": def_rid, "name": "project_admin"}
840 1             self.auth.get_role.return_value = def_role
841 1             self.auth.get_role_list.return_value = [def_role]
842 1             prms_out = [{"project": pid, "role": def_rid}]
843 1             uid2, oid = self.topic.new(
844                 rollback,
845                 self.fake_session,
846                 {
847                     "username": self.test_name,
848                     "password": self.test_name,
849                     "projects": ["some_project"],
850                 },
851             )
852 1             self.assertEqual(len(rollback), 1, "Wrong rollback length")
853 1             self.assertEqual(uid2, uid1, "Wrong project identifier")
854 1             content = self.auth.create_user.call_args[0][0]
855 1             self.assertEqual(content["username"], self.test_name, "Wrong project name")
856 1             self.assertEqual(content["password"], self.test_name, "Wrong password")
857 1             self.assertEqual(
858                 content["project_role_mappings"],
859                 prms_out,
860                 "Wrong project-role mappings",
861             )
862 1             self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
863 1             self.assertEqual(
864                 content["_admin"]["modified"],
865                 content["_admin"]["created"],
866                 "Wrong modification time",
867             )
868 1         with self.subTest(i=3):
869 1             rollback = []
870 1             with self.assertRaises(
871                 EngineException, msg="Accepted wrong project-role mappings"
872             ) as e:
873 1                 self.topic.new(
874                     rollback,
875                     self.fake_session,
876                     {
877                         "username": "other-project-name",
878                         "password": "other-password",
879                         "project_role_mappings": [{}],
880                     },
881                 )
882 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
883 1             self.assertEqual(
884                 e.exception.http_code,
885                 HTTPStatus.UNPROCESSABLE_ENTITY,
886                 "Wrong HTTP status code",
887             )
888 1             self.assertIn(
889                 "format error at '{}' '{}'".format(
890                     "project_role_mappings:{}", "'{}' is a required property"
891                 ).format(0, "project"),
892                 norm(str(e.exception)),
893                 "Wrong exception text",
894             )
895 1         with self.subTest(i=4):
896 1             rollback = []
897 1             with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
898 1                 self.topic.new(
899                     rollback,
900                     self.fake_session,
901                     {
902                         "username": "other-project-name",
903                         "password": "other-password",
904                         "projects": [],
905                     },
906                 )
907 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
908 1             self.assertEqual(
909                 e.exception.http_code,
910                 HTTPStatus.UNPROCESSABLE_ENTITY,
911                 "Wrong HTTP status code",
912             )
913 1             self.assertIn(
914                 "format error at '{}' '{}'".format(
915                     "projects", "{} is too short"
916                 ).format([]),
917                 norm(str(e.exception)),
918                 "Wrong exception text",
919             )
920
921 1     def test_edit_user(self):
922 1         now = time()
923 1         uid = str(uuid4())
924 1         pid1 = str(uuid4())
925 1         rid1 = str(uuid4())
926 1         prms = [
927             {
928                 "project": pid1,
929                 "project_name": "project-1",
930                 "role": rid1,
931                 "role_name": "role-1",
932             }
933         ]
934 1         user = {
935             "_id": uid,
936             "username": self.test_name,
937             "project_role_mappings": prms,
938             "_admin": {"created": now, "modified": now},
939         }
940 1         with self.subTest(i=1):
941 1             self.auth.get_user_list.side_effect = [[user], []]
942 1             self.auth.get_user.return_value = user
943 1             pid2 = str(uuid4())
944 1             rid2 = str(uuid4())
945 1             self.auth.get_project.side_effect = [
946                 {"_id": pid2, "name": "project-2"},
947                 {"_id": pid1, "name": "project-1"},
948             ]
949 1             self.auth.get_role.side_effect = [
950                 {"_id": rid2, "name": "role-2"},
951                 {"_id": rid1, "name": "role-1"},
952             ]
953 1             new_name = "new-user-name"
954 1             new_pasw = "new-password"
955 1             add_prms = [{"project": pid2, "role": rid2}]
956 1             rem_prms = [{"project": pid1, "role": rid1}]
957 1             self.topic.edit(
958                 self.fake_session,
959                 uid,
960                 {
961                     "username": new_name,
962                     "password": new_pasw,
963                     "add_project_role_mappings": add_prms,
964                     "remove_project_role_mappings": rem_prms,
965                 },
966             )
967 1             content = self.auth.update_user.call_args[0][0]
968 1             self.assertEqual(content["_id"], uid, "Wrong user identifier")
969 1             self.assertEqual(content["username"], new_name, "Wrong user name")
970 1             self.assertEqual(content["password"], new_pasw, "Wrong user password")
971 1             self.assertEqual(
972                 content["add_project_role_mappings"],
973                 add_prms,
974                 "Wrong project-role mappings to add",
975             )
976 1             self.assertEqual(
977                 content["remove_project_role_mappings"],
978                 prms,
979                 "Wrong project-role mappings to remove",
980             )
981 1         with self.subTest(i=2):
982 1             new_name = "other-user-name"
983 1             new_prms = [{}]
984 1             self.auth.get_role_list.side_effect = [[user], []]
985 1             self.auth.get_user_list.side_effect = [[user]]
986 1             with self.assertRaises(
987                 EngineException, msg="Accepted wrong project-role mappings"
988             ) as e:
989 1                 self.topic.edit(
990                     self.fake_session,
991                     uid,
992                     {"username": new_name, "project_role_mappings": new_prms},
993                 )
994 1             self.assertEqual(
995                 e.exception.http_code,
996                 HTTPStatus.UNPROCESSABLE_ENTITY,
997                 "Wrong HTTP status code",
998             )
999 1             self.assertIn(
1000                 "format error at '{}' '{}'".format(
1001                     "project_role_mappings:{}", "'{}' is a required property"
1002                 ).format(0, "project"),
1003                 norm(str(e.exception)),
1004                 "Wrong exception text",
1005             )
1006 1         with self.subTest(i=3):
1007 1             self.auth.get_user_list.side_effect = [[user], []]
1008 1             self.auth.get_user.return_value = user
1009 1             old_password = self.test_name
1010 1             new_pasw = "new-password"
1011 1             self.topic.edit(
1012                 self.fake_session,
1013                 uid,
1014                 {
1015                     "old_password": old_password,
1016                     "password": new_pasw,
1017                 },
1018             )
1019 1             content = self.auth.update_user.call_args[0][0]
1020 1             self.assertEqual(
1021                 content["old_password"], old_password, "Wrong old password"
1022             )
1023 1             self.assertEqual(content["password"], new_pasw, "Wrong user password")
1024
1025 1     def test_delete_user(self):
1026 1         with self.subTest(i=1):
1027 1             uid = str(uuid4())
1028 1             self.fake_session["username"] = self.test_name
1029 1             user = user = {
1030                 "_id": uid,
1031                 "username": "other-user-name",
1032                 "project_role_mappings": [],
1033             }
1034 1             self.auth.get_user.return_value = user
1035 1             self.auth.delete_user.return_value = {"deleted": 1}
1036 1             rc = self.topic.delete(self.fake_session, uid)
1037 1             self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
1038 1             self.assertEqual(
1039                 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
1040             )
1041 1             self.assertEqual(
1042                 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
1043             )
1044
1045 1     def test_conflict_on_new(self):
1046 1         with self.subTest(i=1):
1047 1             rollback = []
1048 1             uid = str(uuid4())
1049 1             with self.assertRaises(
1050                 EngineException, msg="Accepted uuid as username"
1051             ) as e:
1052 1                 self.topic.new(
1053                     rollback,
1054                     self.fake_session,
1055                     {
1056                         "username": uid,
1057                         "password": self.test_name,
1058                         "projects": [test_pid],
1059                     },
1060                 )
1061 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
1062 1             self.assertEqual(
1063                 e.exception.http_code,
1064                 HTTPStatus.UNPROCESSABLE_ENTITY,
1065                 "Wrong HTTP status code",
1066             )
1067 1             self.assertIn(
1068                 "username '{}' cannot have a uuid format".format(uid),
1069                 norm(str(e.exception)),
1070                 "Wrong exception text",
1071             )
1072 1         with self.subTest(i=2):
1073 1             rollback = []
1074 1             self.auth.get_user_list.return_value = [
1075                 {"_id": str(uuid4()), "username": self.test_name}
1076             ]
1077 1             with self.assertRaises(
1078                 EngineException, msg="Accepted existing username"
1079             ) as e:
1080 1                 self.topic.new(
1081                     rollback,
1082                     self.fake_session,
1083                     {
1084                         "username": self.test_name,
1085                         "password": self.test_name,
1086                         "projects": [test_pid],
1087                     },
1088                 )
1089 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
1090 1             self.assertEqual(
1091                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1092             )
1093 1             self.assertIn(
1094                 "username '{}' is already used".format(self.test_name),
1095                 norm(str(e.exception)),
1096                 "Wrong exception text",
1097             )
1098 1         with self.subTest(i=3):
1099 1             rollback = []
1100 1             self.auth.get_user_list.return_value = []
1101 1             self.auth.get_role_list.side_effect = [[], []]
1102 1             with self.assertRaises(
1103                 AuthconnNotFoundException, msg="Accepted user without default role"
1104             ) as e:
1105 1                 self.topic.new(
1106                     rollback,
1107                     self.fake_session,
1108                     {
1109                         "username": self.test_name,
1110                         "password": self.test_name,
1111                         "projects": [str(uuid4())],
1112                     },
1113                 )
1114 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
1115 1             self.assertEqual(
1116                 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1117             )
1118 1             self.assertIn(
1119                 "can't find default role for user '{}'".format(self.test_name),
1120                 norm(str(e.exception)),
1121                 "Wrong exception text",
1122             )
1123
1124 1     def test_conflict_on_edit(self):
1125 1         uid = str(uuid4())
1126 1         with self.subTest(i=1):
1127 1             self.auth.get_user_list.return_value = [
1128                 {"_id": uid, "username": self.test_name}
1129             ]
1130 1             new_name = str(uuid4())
1131 1             with self.assertRaises(
1132                 EngineException, msg="Accepted uuid as username"
1133             ) as e:
1134 1                 self.topic.edit(self.fake_session, uid, {"username": new_name})
1135 1             self.assertEqual(
1136                 e.exception.http_code,
1137                 HTTPStatus.UNPROCESSABLE_ENTITY,
1138                 "Wrong HTTP status code",
1139             )
1140 1             self.assertIn(
1141                 "username '{}' cannot have an uuid format".format(new_name),
1142                 norm(str(e.exception)),
1143                 "Wrong exception text",
1144             )
1145 1         with self.subTest(i=2):
1146 1             self.auth.get_user_list.return_value = [
1147                 {"_id": uid, "username": self.test_name}
1148             ]
1149 1             self.auth.get_role_list.side_effect = [[], []]
1150 1             with self.assertRaises(
1151                 AuthconnNotFoundException, msg="Accepted user without default role"
1152             ) as e:
1153 1                 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
1154 1             self.assertEqual(
1155                 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1156             )
1157 1             self.assertIn(
1158                 "can't find a default role for user '{}'".format(self.test_name),
1159                 norm(str(e.exception)),
1160                 "Wrong exception text",
1161             )
1162 1         with self.subTest(i=3):
1163 1             admin_uid = str(uuid4())
1164 1             self.auth.get_user_list.return_value = [
1165                 {"_id": admin_uid, "username": "admin"}
1166             ]
1167 1             with self.assertRaises(
1168                 EngineException,
1169                 msg="Accepted removing system_admin role from admin user",
1170             ) as e:
1171 1                 self.topic.edit(
1172                     self.fake_session,
1173                     admin_uid,
1174                     {
1175                         "remove_project_role_mappings": [
1176                             {"project": "admin", "role": "system_admin"}
1177                         ]
1178                     },
1179                 )
1180 1             self.assertEqual(
1181                 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
1182             )
1183 1             self.assertIn(
1184                 "you cannot remove system_admin role from admin user",
1185                 norm(str(e.exception)),
1186                 "Wrong exception text",
1187             )
1188 1         with self.subTest(i=4):
1189 1             new_name = "new-user-name"
1190 1             self.auth.get_user_list.side_effect = [
1191                 [{"_id": uid, "name": self.test_name}],
1192                 [{"_id": str(uuid4()), "name": new_name}],
1193             ]
1194 1             with self.assertRaises(
1195                 EngineException, msg="Accepted existing username"
1196             ) as e:
1197 1                 self.topic.edit(self.fake_session, uid, {"username": new_name})
1198 1             self.assertEqual(
1199                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1200             )
1201 1             self.assertIn(
1202                 "username '{}' is already used".format(new_name),
1203                 norm(str(e.exception)),
1204                 "Wrong exception text",
1205             )
1206
1207 1     def test_conflict_on_del(self):
1208 1         with self.subTest(i=1):
1209 1             uid = str(uuid4())
1210 1             self.fake_session["username"] = self.test_name
1211 1             user = user = {
1212                 "_id": uid,
1213                 "username": self.test_name,
1214                 "project_role_mappings": [],
1215             }
1216 1             self.auth.get_user.return_value = user
1217 1             with self.assertRaises(
1218                 EngineException, msg="Accepted deletion of own user"
1219             ) as e:
1220 1                 self.topic.delete(self.fake_session, uid)
1221 1             self.assertEqual(
1222                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1223             )
1224 1             self.assertIn(
1225                 "you cannot delete your own login user",
1226                 norm(str(e.exception)),
1227                 "Wrong exception text",
1228             )
1229
1230
1231 1 class Test_CommonVimWimSdn(TestCase):
1232 1     @classmethod
1233 1     def setUpClass(cls):
1234 1         cls.test_name = "test-cim-topic"  # CIM = Common Infrastructure Manager
1235
1236 1     def setUp(self):
1237 1         self.db = Mock(dbbase.DbBase())
1238 1         self.fs = Mock(fsbase.FsBase())
1239 1         self.msg = Mock(msgbase.MsgBase())
1240 1         self.auth = Mock(authconn.Authconn(None, None, None))
1241 1         self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
1242         # Use WIM schemas for testing because they are the simplest
1243 1         self.topic._send_msg = Mock()
1244 1         self.topic.topic = "wims"
1245 1         self.topic.schema_new = validation.wim_account_new_schema
1246 1         self.topic.schema_edit = validation.wim_account_edit_schema
1247 1         self.fake_session = {
1248             "username": test_name,
1249             "project_id": (test_pid,),
1250             "method": None,
1251             "admin": True,
1252             "force": False,
1253             "public": False,
1254             "allow_show_user_project_role": True,
1255         }
1256 1         self.topic.check_quota = Mock(return_value=None)  # skip quota
1257
1258 1     def test_new_cvws(self):
1259 1         test_url = "http://0.0.0.0:0"
1260 1         with self.subTest(i=1):
1261 1             rollback = []
1262 1             test_type = "fake"
1263 1             self.db.get_one.return_value = None
1264 1             self.db.create.side_effect = lambda self, content: content["_id"]
1265 1             cid, oid = self.topic.new(
1266                 rollback,
1267                 self.fake_session,
1268                 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
1269             )
1270 1             self.assertEqual(len(rollback), 1, "Wrong rollback length")
1271 1             args = self.db.create.call_args[0]
1272 1             content = args[1]
1273 1             self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1274 1             self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1275 1             self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
1276 1             self.assertEqual(content["wim_url"], test_url, "Wrong URL")
1277 1             self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
1278 1             self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
1279 1             self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
1280 1             self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
1281 1             self.assertEqual(
1282                 content["_admin"]["modified"],
1283                 content["_admin"]["created"],
1284                 "Wrong modification time",
1285             )
1286 1             self.assertEqual(
1287                 content["_admin"]["operationalState"],
1288                 "PROCESSING",
1289                 "Wrong operational state",
1290             )
1291 1             self.assertEqual(
1292                 content["_admin"]["projects_read"],
1293                 [test_pid],
1294                 "Wrong read-only projects",
1295             )
1296 1             self.assertEqual(
1297                 content["_admin"]["projects_write"],
1298                 [test_pid],
1299                 "Wrong read/write projects",
1300             )
1301 1             self.assertIsNone(
1302                 content["_admin"]["current_operation"], "Wrong current operation"
1303             )
1304 1             self.assertEqual(
1305                 len(content["_admin"]["operations"]), 1, "Wrong number of operations"
1306             )
1307 1             operation = content["_admin"]["operations"][0]
1308 1             self.assertEqual(
1309                 operation["lcmOperationType"], "create", "Wrong operation type"
1310             )
1311 1             self.assertEqual(
1312                 operation["operationState"], "PROCESSING", "Wrong operation state"
1313             )
1314 1             self.assertGreater(
1315                 operation["startTime"],
1316                 content["_admin"]["created"],
1317                 "Wrong operation start time",
1318             )
1319 1             self.assertGreater(
1320                 operation["statusEnteredTime"],
1321                 content["_admin"]["created"],
1322                 "Wrong operation status enter time",
1323             )
1324 1             self.assertEqual(
1325                 operation["detailed-status"], "", "Wrong operation detailed status info"
1326             )
1327 1             self.assertIsNone(
1328                 operation["operationParams"], "Wrong operation parameters"
1329             )
1330         # This test is disabled. From Feature 8030 we admit all WIM/SDN types
1331         # with self.subTest(i=2):
1332         #     rollback = []
1333         #     test_type = "bad_type"
1334         #     with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
1335         #         self.topic.new(rollback, self.fake_session,
1336         #                        {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
1337         #     self.assertEqual(len(rollback), 0, "Wrong rollback length")
1338         #     self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
1339         #     self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
1340         #                   norm(str(e.exception)), "Wrong exception text")
1341
1342 1     def test_conflict_on_new(self):
1343 1         with self.subTest(i=1):
1344 1             rollback = []
1345 1             test_url = "http://0.0.0.0:0"
1346 1             test_type = "fake"
1347 1             self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
1348 1             with self.assertRaises(
1349                 EngineException, msg="Accepted existing CIM name"
1350             ) as e:
1351 1                 self.topic.new(
1352                     rollback,
1353                     self.fake_session,
1354                     {
1355                         "name": self.test_name,
1356                         "wim_url": test_url,
1357                         "wim_type": test_type,
1358                     },
1359                 )
1360 1             self.assertEqual(len(rollback), 0, "Wrong rollback length")
1361 1             self.assertEqual(
1362                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1363             )
1364 1             self.assertIn(
1365                 "name '{}' already exists for {}".format(
1366                     self.test_name, self.topic.topic
1367                 ),
1368                 norm(str(e.exception)),
1369                 "Wrong exception text",
1370             )
1371
1372 1     def test_edit_cvws(self):
1373 1         now = time()
1374 1         cid = str(uuid4())
1375 1         test_url = "http://0.0.0.0:0"
1376 1         test_type = "fake"
1377 1         cvws = {
1378             "_id": cid,
1379             "name": self.test_name,
1380             "wim_url": test_url,
1381             "wim_type": test_type,
1382             "_admin": {
1383                 "created": now,
1384                 "modified": now,
1385                 "operations": [{"lcmOperationType": "create"}],
1386             },
1387         }
1388 1         with self.subTest(i=1):
1389 1             new_name = "new-cim-name"
1390 1             new_url = "https://1.1.1.1:1"
1391 1             new_type = "onos"
1392 1             self.db.get_one.side_effect = [cvws, None]
1393 1             self.db.replace.return_value = {"updated": 1}
1394             # self.db.encrypt.side_effect = [b64str(), b64str()]
1395 1             self.topic.edit(
1396                 self.fake_session,
1397                 cid,
1398                 {"name": new_name, "wim_url": new_url, "wim_type": new_type},
1399             )
1400 1             args = self.db.replace.call_args[0]
1401 1             content = args[2]
1402 1             self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1403 1             self.assertEqual(args[1], cid, "Wrong CIM identifier")
1404 1             self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1405 1             self.assertEqual(content["name"], new_name, "Wrong CIM name")
1406 1             self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
1407 1             self.assertEqual(content["wim_url"], new_url, "Wrong URL")
1408 1             self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
1409 1             self.assertGreater(
1410                 content["_admin"]["modified"],
1411                 content["_admin"]["created"],
1412                 "Wrong modification time",
1413             )
1414 1             self.assertEqual(
1415                 len(content["_admin"]["operations"]), 2, "Wrong number of operations"
1416             )
1417 1             operation = content["_admin"]["operations"][1]
1418 1             self.assertEqual(
1419                 operation["lcmOperationType"], "edit", "Wrong operation type"
1420             )
1421 1             self.assertEqual(
1422                 operation["operationState"], "PROCESSING", "Wrong operation state"
1423             )
1424 1             self.assertGreater(
1425                 operation["startTime"],
1426                 content["_admin"]["modified"],
1427                 "Wrong operation start time",
1428             )
1429 1             self.assertGreater(
1430                 operation["statusEnteredTime"],
1431                 content["_admin"]["modified"],
1432                 "Wrong operation status enter time",
1433             )
1434 1             self.assertEqual(
1435                 operation["detailed-status"], "", "Wrong operation detailed status info"
1436             )
1437 1             self.assertIsNone(
1438                 operation["operationParams"], "Wrong operation parameters"
1439             )
1440 1         with self.subTest(i=2):
1441 1             self.db.get_one.side_effect = [cvws]
1442 1             with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
1443 1                 self.topic.edit(
1444                     self.fake_session,
1445                     str(uuid4()),
1446                     {"name": "new-name", "extra_prop": "anything"},
1447                 )
1448 1             self.assertEqual(
1449                 e.exception.http_code,
1450                 HTTPStatus.UNPROCESSABLE_ENTITY,
1451                 "Wrong HTTP status code",
1452             )
1453 1             self.assertIn(
1454                 "format error '{}'".format(
1455                     "additional properties are not allowed ('{}' was unexpected)"
1456                 ).format("extra_prop"),
1457                 norm(str(e.exception)),
1458                 "Wrong exception text",
1459             )
1460
1461 1     def test_conflict_on_edit(self):
1462 1         with self.subTest(i=1):
1463 1             cid = str(uuid4())
1464 1             new_name = "new-cim-name"
1465 1             self.db.get_one.side_effect = [
1466                 {"_id": cid, "name": self.test_name},
1467                 {"_id": str(uuid4()), "name": new_name},
1468             ]
1469 1             with self.assertRaises(
1470                 EngineException, msg="Accepted existing CIM name"
1471             ) as e:
1472 1                 self.topic.edit(self.fake_session, cid, {"name": new_name})
1473 1             self.assertEqual(
1474                 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1475             )
1476 1             self.assertIn(
1477                 "name '{}' already exists for {}".format(new_name, self.topic.topic),
1478                 norm(str(e.exception)),
1479                 "Wrong exception text",
1480             )
1481
1482 1     def test_delete_cvws(self):
1483 1         cid = str(uuid4())
1484 1         ro_pid = str(uuid4())
1485 1         rw_pid = str(uuid4())
1486 1         cvws = {"_id": cid, "name": self.test_name}
1487 1         self.db.get_list.return_value = []
1488 1         with self.subTest(i=1):
1489 1             cvws["_admin"] = {
1490                 "projects_read": [test_pid, ro_pid, rw_pid],
1491                 "projects_write": [test_pid, rw_pid],
1492             }
1493 1             self.db.get_one.return_value = cvws
1494 1             oid = self.topic.delete(self.fake_session, cid)
1495 1             self.assertIsNone(oid, "Wrong operation identifier")
1496 1             self.assertEqual(
1497                 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1498             )
1499 1             self.assertEqual(
1500                 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1501             )
1502 1             self.assertEqual(
1503                 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1504             )
1505 1             self.assertEqual(
1506                 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1507             )
1508 1             self.assertEqual(
1509                 self.db.set_one.call_args[1]["update_dict"],
1510                 None,
1511                 "Wrong read-only projects update",
1512             )
1513 1             self.assertEqual(
1514                 self.db.set_one.call_args[1]["pull_list"],
1515                 {
1516                     "_admin.projects_read": (test_pid,),
1517                     "_admin.projects_write": (test_pid,),
1518                 },
1519                 "Wrong read/write projects update",
1520             )
1521 1             self.topic._send_msg.assert_not_called()
1522 1         with self.subTest(i=2):
1523 1             now = time()
1524 1             cvws["_admin"] = {
1525                 "projects_read": [test_pid],
1526                 "projects_write": [test_pid],
1527                 "operations": [],
1528             }
1529 1             self.db.get_one.return_value = cvws
1530 1             oid = self.topic.delete(self.fake_session, cid)
1531 1             self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
1532 1             self.assertEqual(
1533                 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1534             )
1535 1             self.assertEqual(
1536                 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1537             )
1538 1             self.assertEqual(
1539                 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1540             )
1541 1             self.assertEqual(
1542                 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier"
1543             )
1544 1             self.assertEqual(
1545                 self.db.set_one.call_args[1]["update_dict"],
1546                 {"_admin.to_delete": True},
1547                 "Wrong _admin.to_delete update",
1548             )
1549 1             operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
1550 1             self.assertEqual(
1551                 operation["lcmOperationType"], "delete", "Wrong operation type"
1552             )
1553 1             self.assertEqual(
1554                 operation["operationState"], "PROCESSING", "Wrong operation state"
1555             )
1556 1             self.assertEqual(
1557                 operation["detailed-status"], "", "Wrong operation detailed status"
1558             )
1559 1             self.assertIsNone(
1560                 operation["operationParams"], "Wrong operation parameters"
1561             )
1562 1             self.assertGreater(
1563                 operation["startTime"], now, "Wrong operation start time"
1564             )
1565 1             self.assertGreater(
1566                 operation["statusEnteredTime"], now, "Wrong operation status enter time"
1567             )
1568 1             self.topic._send_msg.assert_called_once_with(
1569                 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
1570             )
1571 1         with self.subTest(i=3):
1572 1             cvws["_admin"] = {
1573                 "projects_read": [],
1574                 "projects_write": [],
1575                 "operations": [],
1576             }
1577 1             self.db.get_one.return_value = cvws
1578 1             self.topic._send_msg.reset_mock()
1579 1             self.db.get_one.reset_mock()
1580 1             self.db.del_one.reset_mock()
1581 1             self.fake_session["force"] = True  # to force deletion
1582 1             self.fake_session["admin"] = True  # to force deletion
1583 1             self.fake_session["project_id"] = []  # to force deletion
1584 1             oid = self.topic.delete(self.fake_session, cid)
1585 1             self.assertIsNone(oid, "Wrong operation identifier")
1586 1             self.assertEqual(
1587                 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1588             )
1589 1             self.assertEqual(
1590                 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1591             )
1592 1             self.assertEqual(
1593                 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
1594             )
1595 1             self.assertEqual(
1596                 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1597             )
1598 1             self.topic._send_msg.assert_called_once_with(
1599                 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
1600             )
1601
1602
1603 1 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_new")
1604 1 class TestVimAccountTopic(TestCase):
1605 1     def setUp(self):
1606 1         self.db = Mock(dbbase.DbBase())
1607 1         self.fs = Mock(fsbase.FsBase())
1608 1         self.msg = Mock(msgbase.MsgBase())
1609 1         self.auth = Mock(authconn.Authconn(None, None, None))
1610 1         self.topic = VimAccountTopic(self.db, self.fs, self.msg, self.auth)
1611 1         self.topic.check_quota = Mock(return_value=None)  # skip quota
1612
1613 1         self.fake_session = {
1614             "username": test_name,
1615             "project_id": (test_pid,),
1616             "method": None,
1617             "admin": True,
1618             "force": False,
1619             "public": False,
1620             "allow_show_user_project_role": True,
1621         }
1622
1623 1     def check_invalid_indata_raises_exception(self, indata, mock_common_vim_wim_sdn):
1624 1         with self.assertRaises(EngineException) as error:
1625 1             self.topic.check_conflict_on_new(self.fake_session, indata)
1626 1         mock_common_vim_wim_sdn.assert_called_with(self.fake_session, indata)
1627 1         error_msg = "Invalid config for VIM account '{}'.".format(indata["name"])
1628 1         self.assertEqual(str(error.exception), error_msg)
1629
1630 1     def test_check_conflict_on_new_vim_type_paas(self, mock_common_vim_wim_sdn):
1631 1         valid_juju_paas_config = {
1632             "paas_provider": "juju",
1633             "ca_cert_content": "file_content",
1634             "cloud": "microk8s",
1635             "cloud_credentials": "cloud_credential_name",
1636             "authorized_keys": "keys",
1637         }
1638 1         indata = {
1639             "name": "juju_paas",
1640             "vim_type": "paas",
1641             "description": None,
1642             "vim_url": "http://0.0.0.0:0",
1643             "vim_user": "some_user",
1644             "vim_password": "some_password",
1645             "vim_tenant_name": "null",
1646             "config": valid_juju_paas_config,
1647         }
1648 1         self.topic.check_conflict_on_new(self.fake_session, indata)
1649 1         mock_common_vim_wim_sdn.assert_called_once_with(self.fake_session, indata)
1650
1651 1     def test_check_conflict_on_new_vim_type_paas_config_missing(
1652         self, mock_common_vim_wim_sdn
1653     ):
1654 1         indata = {
1655             "name": "juju_paas",
1656             "vim_type": "paas",
1657             "description": None,
1658             "vim_url": "http://0.0.0.0:0",
1659             "vim_user": "some_user",
1660             "vim_password": "some_password",
1661             "vim_tenant_name": "null",
1662         }
1663 1         self.check_invalid_indata_raises_exception(indata, mock_common_vim_wim_sdn)
1664
1665 1     def test_check_conflict_on_new_vim_type_paas_invalid_config(
1666         self, mock_common_vim_wim_sdn
1667     ):
1668 1         invalid_configs = [
1669             {
1670                 "paas_provider": "some_paas_provider",
1671                 "ca_cert_content": "file_content",
1672                 "cloud": "microk8s",
1673                 "cloud_credentials": "cloud_credential_name",
1674             },
1675             {
1676                 "ca_cert_content": "file_content",
1677                 "cloud": "microk8s",
1678                 "cloud_credentials": "cloud_credential_name",
1679             },
1680             {
1681                 "paas_provider": "juju",
1682                 "cloud": "microk8s",
1683                 "cloud_credentials": "cloud_credential_name",
1684             },
1685             {
1686                 "paas_provider": "juju",
1687                 "ca_cert_content": "file_content",
1688                 "cloud_credentials": "cloud_credential_name",
1689             },
1690             {
1691                 "paas_provider": "juju",
1692                 "ca_cert_content": "file_content",
1693                 "cloud": "microk8s",
1694             },
1695             {
1696                 "some_param": None,
1697             },
1698             {},
1699         ]
1700 1         for config in invalid_configs:
1701 1             with self.subTest():
1702 1                 indata = {
1703                     "name": "juju_paas",
1704                     "vim_type": "paas",
1705                     "description": None,
1706                     "vim_url": "http://0.0.0.0:0",
1707                     "vim_user": "some_user",
1708                     "vim_password": "some_password",
1709                     "vim_tenant_name": "null",
1710                     "config": config,
1711                 }
1712 1                 self.check_invalid_indata_raises_exception(
1713                     indata, mock_common_vim_wim_sdn
1714                 )
1715
1716 1     def test_kafka_message_is_not_sent_if_paas_vim(self, mock_common_vim_wim_sdn):
1717 1         valid_juju_paas_config = {
1718             "paas_provider": "juju",
1719             "ca_cert_content": "file_content",
1720             "cloud": "microk8s",
1721             "cloud_credentials": "cloud_credential_name",
1722             "authorized_keys": "keys",
1723         }
1724 1         indata = {
1725             "name": "juju_paas",
1726             "vim_type": "paas",
1727             "description": None,
1728             "vim_url": "http://0.0.0.0:0",
1729             "vim_user": "some_user",
1730             "vim_password": "some_password",
1731             "vim_tenant_name": "null",
1732             "config": valid_juju_paas_config,
1733         }
1734 1         rollback = []
1735 1         self.topic.temporal = Mock()
1736
1737 1         self.topic.new(rollback, self.fake_session, indata)
1738 1         self.assertEqual(len(rollback), 1, "Wrong rollback length")
1739 1         self.msg.write.assert_not_called()
1740 1         self.topic.temporal.start_vim_workflow.assert_called_once()
1741
1742 1     def test_kafka_message_is_sent_if_not_paas_vim(self, mock_common_vim_wim_sdn):
1743 1         indata = {
1744             "name": "juju_paas",
1745             "vim_type": "openstack",
1746             "description": None,
1747             "vim_url": "http://0.0.0.0:0",
1748             "vim_user": "some_user",
1749             "vim_password": "some_password",
1750             "vim_tenant_name": "null",
1751         }
1752 1         rollback = []
1753
1754 1         self.topic.new(rollback, self.fake_session, indata)
1755 1         self.assertEqual(len(rollback), 1, "Wrong rollback length")
1756 1         mock_common_vim_wim_sdn.assert_called_once_with(self.fake_session, indata)
1757 1         self.msg.write.assert_called_once_with("vim_account", "created", ANY)
1758
1759
1760 1 if __name__ == "__main__":
1761 0     unittest.main()