Addition of PaaS
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 from unittest import TestCase
22 from unittest.mock import Mock, patch, call
23 from uuid import uuid4
24 from http import HTTPStatus
25 from time import time
26 from random import randint
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import (
30 ProjectTopicAuth,
31 RoleTopicAuth,
32 UserTopicAuth,
33 CommonVimWimSdn,
34 VcaTopic,
35 PaasTopic,
36 )
37 from osm_nbi.engine import EngineException
38 from osm_nbi.authconn import AuthconnNotFoundException
39
40
41 test_pid = str(uuid4())
42 test_name = "test-user"
43
44
45 def norm(str):
46 """Normalize string for checking"""
47 return " ".join(str.strip().split()).lower()
48
49
50 class TestVcaTopic(TestCase):
51 def setUp(self):
52 self.db = Mock(dbbase.DbBase())
53 self.fs = Mock(fsbase.FsBase())
54 self.msg = Mock(msgbase.MsgBase())
55 self.auth = Mock(authconn.Authconn(None, None, None))
56 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
57
58 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
59 def test_format_on_new(self, mock_super_format_on_new):
60 content = {
61 "_id": "id",
62 "secret": "encrypted_secret",
63 "cacert": "encrypted_cacert",
64 }
65 self.db.encrypt.side_effect = ["secret", "cacert"]
66 mock_super_format_on_new.return_value = "1234"
67
68 oid = self.vca_topic.format_on_new(content)
69
70 self.assertEqual(oid, "1234")
71 self.assertEqual(content["secret"], "secret")
72 self.assertEqual(content["cacert"], "cacert")
73 self.db.encrypt.assert_has_calls(
74 [
75 call("encrypted_secret", schema_version="1.11", salt="id"),
76 call("encrypted_cacert", schema_version="1.11", salt="id"),
77 ]
78 )
79 mock_super_format_on_new.assert_called_with(content, None, False)
80
81 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
82 def test_format_on_edit(self, mock_super_format_on_edit):
83 edit_content = {
84 "_id": "id",
85 "secret": "encrypted_secret",
86 "cacert": "encrypted_cacert",
87 }
88 final_content = {
89 "_id": "id",
90 "schema_version": "1.11",
91 }
92 self.db.encrypt.side_effect = ["secret", "cacert"]
93 mock_super_format_on_edit.return_value = "1234"
94
95 oid = self.vca_topic.format_on_edit(final_content, edit_content)
96
97 self.assertEqual(oid, "1234")
98 self.assertEqual(final_content["secret"], "secret")
99 self.assertEqual(final_content["cacert"], "cacert")
100 self.db.encrypt.assert_has_calls(
101 [
102 call("encrypted_secret", schema_version="1.11", salt="id"),
103 call("encrypted_cacert", schema_version="1.11", salt="id"),
104 ]
105 )
106 mock_super_format_on_edit.assert_called()
107
108 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
109 def test_check_conflict_on_del(self, mock_check_conflict_on_del):
110 session = {
111 "project_id": "project-id",
112 "force": False,
113 }
114 _id = "vca-id"
115 db_content = {}
116
117 self.db.get_list.return_value = None
118
119 self.vca_topic.check_conflict_on_del(session, _id, db_content)
120
121 self.db.get_list.assert_called_with(
122 "vim_accounts",
123 {"vca": _id, "_admin.projects_read.cont": "project-id"},
124 )
125 mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
126
127 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
128 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
129 session = {
130 "project_id": "project-id",
131 "force": True,
132 }
133 _id = "vca-id"
134 db_content = {}
135
136 self.vca_topic.check_conflict_on_del(session, _id, db_content)
137
138 self.db.get_list.assert_not_called()
139 mock_check_conflict_on_del.assert_not_called()
140
141 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
142 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
143 session = {
144 "project_id": "project-id",
145 "force": False,
146 }
147 _id = "vca-id"
148 db_content = {}
149
150 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
151
152 with self.assertRaises(EngineException) as context:
153 self.vca_topic.check_conflict_on_del(session, _id, db_content)
154 self.assertEqual(
155 context.exception,
156 EngineException(
157 "There is at least one VIM account using this vca",
158 http_code=HTTPStatus.CONFLICT,
159 ),
160 )
161
162 self.db.get_list.assert_called_with(
163 "vim_accounts",
164 {"vca": _id, "_admin.projects_read.cont": "project-id"},
165 )
166 mock_check_conflict_on_del.assert_not_called()
167
168
169 class TestPaaSTopic(TestCase):
170 def setUp(self):
171 self.db = Mock(dbbase.DbBase())
172 self.fs = Mock(fsbase.FsBase())
173 self.msg = Mock(msgbase.MsgBase())
174 self.auth = Mock(authconn.Authconn(None, None, None))
175 self.paas_topic = PaasTopic(self.db, self.fs, self.msg, self.auth)
176
177 def test_format_on_new(self):
178 content = {
179 "_id": "id",
180 "secret": "secret_to_encrypt",
181 }
182 self.db.encrypt.side_effect = ["encrypted_secret"]
183
184 expecte_oid = "id:0"
185 expected_num_operations = 1
186 oid = self.paas_topic.format_on_new(content)
187
188 self.assertEqual(oid, expecte_oid)
189 self.assertEqual(content["secret"], "encrypted_secret")
190 self.assertEqual(content["_admin"]["operationalState"], "PROCESSING")
191 self.assertEqual(content["_admin"]["current_operation"], None)
192 self.assertEqual(len(content["_admin"]["operations"]), expected_num_operations)
193 self.assertEqual(
194 content["_admin"]["operations"][0]["lcmOperationType"], "create"
195 )
196 self.db.encrypt.assert_called_with(
197 "secret_to_encrypt", schema_version="1.11", salt="id"
198 )
199
200 @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
201 def test_check_conflict_on_new(self, mock_get_project_filter):
202 indata = {"name": "new_paas_name"}
203 session = {}
204 mock_get_project_filter.return_value = {}
205 self.db.get_one.return_value = None
206 self.paas_topic.check_conflict_on_new(session, indata)
207
208 @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
209 def test_check_conflict_on_new_raise_exception(self, mock_get_project_filter):
210 indata = {"name": "new_paas_name"}
211 session = {}
212 mock_get_project_filter.return_value = {}
213 self.db.get_one.return_value = ["Found_PaaS"]
214 with self.assertRaises(EngineException):
215 self.paas_topic.check_conflict_on_new(session, indata)
216
217 @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
218 def test_check_conflict_on_edit(self, mock_get_project_filter):
219 edit_content = {"name": "new_paas_name"}
220 final_content = {}
221 session = {"force": None}
222 mock_get_project_filter.return_value = {}
223 self.db.get_one.return_value = None
224 self.paas_topic.check_conflict_on_edit(
225 session, final_content, edit_content, "id"
226 )
227
228 @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
229 def test_check_conflict_on_edit_raise_exception(self, mock_get_project_filter):
230 edit_content = {"name": "new_paas_name"}
231 final_content = {}
232 session = {"force": None}
233 mock_get_project_filter.return_value = {}
234 self.db.get_one.return_value = ["Found_PaaS"]
235 with self.assertRaises(EngineException):
236 self.paas_topic.check_conflict_on_edit(
237 session, final_content, edit_content, "id"
238 )
239
240 def test_format_on_edit(self):
241 edit_content = {
242 "_id": "id",
243 "secret": "secret_to_encrypt",
244 }
245 final_content = {
246 "_id": "id",
247 "_admin": {"operations": [{"lcmOperationType": "create"}]},
248 "schema_version": "1.11",
249 }
250 self.db.encrypt.side_effect = ["encrypted_secret"]
251 expected_oid = "id:1"
252 expected_num_operations = 2
253 print(self.paas_topic.password_to_encrypt)
254 oid = self.paas_topic.format_on_edit(final_content, edit_content)
255
256 self.assertEqual(oid, expected_oid)
257 self.assertEqual(final_content["secret"], "encrypted_secret")
258 self.assertEqual(
259 len(final_content["_admin"]["operations"]), expected_num_operations
260 )
261 self.assertEqual(final_content["_admin"]["operationalState"], "PROCESSING")
262 self.assertEqual(final_content["_admin"]["detailed-status"], "Editing")
263 self.db.encrypt.assert_called_with(
264 "secret_to_encrypt", schema_version="1.11", salt="id"
265 )
266
267
268 class Test_ProjectTopicAuth(TestCase):
269 @classmethod
270 def setUpClass(cls):
271 cls.test_name = "test-project-topic"
272
273 def setUp(self):
274 self.db = Mock(dbbase.DbBase())
275 self.fs = Mock(fsbase.FsBase())
276 self.msg = Mock(msgbase.MsgBase())
277 self.auth = Mock(authconn.Authconn(None, None, None))
278 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
279 self.fake_session = {
280 "username": self.test_name,
281 "project_id": (test_pid,),
282 "method": None,
283 "admin": True,
284 "force": False,
285 "public": False,
286 "allow_show_user_project_role": True,
287 }
288 self.topic.check_quota = Mock(return_value=None) # skip quota
289
290 def test_new_project(self):
291 with self.subTest(i=1):
292 rollback = []
293 pid1 = str(uuid4())
294 self.auth.get_project_list.return_value = []
295 self.auth.create_project.return_value = pid1
296 pid2, oid = self.topic.new(
297 rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
298 )
299 self.assertEqual(len(rollback), 1, "Wrong rollback length")
300 self.assertEqual(pid2, pid1, "Wrong project identifier")
301 content = self.auth.create_project.call_args[0][0]
302 self.assertEqual(content["name"], self.test_name, "Wrong project name")
303 self.assertEqual(content["quotas"], {}, "Wrong quotas")
304 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
305 self.assertEqual(
306 content["_admin"]["modified"],
307 content["_admin"]["created"],
308 "Wrong modification time",
309 )
310 with self.subTest(i=2):
311 rollback = []
312 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
313 self.topic.new(
314 rollback,
315 self.fake_session,
316 {"name": "other-project-name", "quotas": {"baditems": 10}},
317 )
318 self.assertEqual(len(rollback), 0, "Wrong rollback length")
319 self.assertEqual(
320 e.exception.http_code,
321 HTTPStatus.UNPROCESSABLE_ENTITY,
322 "Wrong HTTP status code",
323 )
324 self.assertIn(
325 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
326 "baditems"
327 ),
328 norm(str(e.exception)),
329 "Wrong exception text",
330 )
331
332 def test_edit_project(self):
333 now = time()
334 pid = str(uuid4())
335 proj = {
336 "_id": pid,
337 "name": self.test_name,
338 "_admin": {"created": now, "modified": now},
339 }
340 with self.subTest(i=1):
341 self.auth.get_project_list.side_effect = [[proj], []]
342 new_name = "new-project-name"
343 quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
344 self.topic.edit(
345 self.fake_session, pid, {"name": new_name, "quotas": quotas}
346 )
347 _id, content = self.auth.update_project.call_args[0]
348 self.assertEqual(_id, pid, "Wrong project identifier")
349 self.assertEqual(content["_id"], pid, "Wrong project identifier")
350 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
351 self.assertGreater(
352 content["_admin"]["modified"], now, "Wrong modification time"
353 )
354 self.assertEqual(content["name"], new_name, "Wrong project name")
355 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
356 with self.subTest(i=2):
357 new_name = "other-project-name"
358 quotas = {"baditems": randint(0, 100)}
359 self.auth.get_project_list.side_effect = [[proj], []]
360 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
361 self.topic.edit(
362 self.fake_session, pid, {"name": new_name, "quotas": quotas}
363 )
364 self.assertEqual(
365 e.exception.http_code,
366 HTTPStatus.UNPROCESSABLE_ENTITY,
367 "Wrong HTTP status code",
368 )
369 self.assertIn(
370 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
371 "baditems"
372 ),
373 norm(str(e.exception)),
374 "Wrong exception text",
375 )
376
377 def test_conflict_on_new(self):
378 with self.subTest(i=1):
379 rollback = []
380 pid = str(uuid4())
381 with self.assertRaises(
382 EngineException, msg="Accepted uuid as project name"
383 ) as e:
384 self.topic.new(rollback, self.fake_session, {"name": pid})
385 self.assertEqual(len(rollback), 0, "Wrong rollback length")
386 self.assertEqual(
387 e.exception.http_code,
388 HTTPStatus.UNPROCESSABLE_ENTITY,
389 "Wrong HTTP status code",
390 )
391 self.assertIn(
392 "project name '{}' cannot have an uuid format".format(pid),
393 norm(str(e.exception)),
394 "Wrong exception text",
395 )
396 with self.subTest(i=2):
397 rollback = []
398 self.auth.get_project_list.return_value = [
399 {"_id": test_pid, "name": self.test_name}
400 ]
401 with self.assertRaises(
402 EngineException, msg="Accepted existing project name"
403 ) as e:
404 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
405 self.assertEqual(len(rollback), 0, "Wrong rollback length")
406 self.assertEqual(
407 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
408 )
409 self.assertIn(
410 "project '{}' exists".format(self.test_name),
411 norm(str(e.exception)),
412 "Wrong exception text",
413 )
414
415 def test_conflict_on_edit(self):
416 with self.subTest(i=1):
417 self.auth.get_project_list.return_value = [
418 {"_id": test_pid, "name": self.test_name}
419 ]
420 new_name = str(uuid4())
421 with self.assertRaises(
422 EngineException, msg="Accepted uuid as project name"
423 ) as e:
424 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
425 self.assertEqual(
426 e.exception.http_code,
427 HTTPStatus.UNPROCESSABLE_ENTITY,
428 "Wrong HTTP status code",
429 )
430 self.assertIn(
431 "project name '{}' cannot have an uuid format".format(new_name),
432 norm(str(e.exception)),
433 "Wrong exception text",
434 )
435 with self.subTest(i=2):
436 pid = str(uuid4())
437 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
438 with self.assertRaises(
439 EngineException, msg="Accepted renaming of project 'admin'"
440 ) as e:
441 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
442 self.assertEqual(
443 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
444 )
445 self.assertIn(
446 "you cannot rename project 'admin'",
447 norm(str(e.exception)),
448 "Wrong exception text",
449 )
450 with self.subTest(i=3):
451 new_name = "new-project-name"
452 self.auth.get_project_list.side_effect = [
453 [{"_id": test_pid, "name": self.test_name}],
454 [{"_id": str(uuid4()), "name": new_name}],
455 ]
456 with self.assertRaises(
457 EngineException, msg="Accepted existing project name"
458 ) as e:
459 self.topic.edit(self.fake_session, pid, {"name": new_name})
460 self.assertEqual(
461 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
462 )
463 self.assertIn(
464 "project '{}' is already used".format(new_name),
465 norm(str(e.exception)),
466 "Wrong exception text",
467 )
468
469 def test_delete_project(self):
470 with self.subTest(i=1):
471 pid = str(uuid4())
472 self.auth.get_project.return_value = {
473 "_id": pid,
474 "name": "other-project-name",
475 }
476 self.auth.delete_project.return_value = {"deleted": 1}
477 self.auth.get_user_list.return_value = []
478 self.db.get_list.return_value = []
479 rc = self.topic.delete(self.fake_session, pid)
480 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
481 self.assertEqual(
482 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
483 )
484 self.assertEqual(
485 self.auth.delete_project.call_args[0][0],
486 pid,
487 "Wrong project identifier",
488 )
489
490 def test_conflict_on_del(self):
491 with self.subTest(i=1):
492 self.auth.get_project.return_value = {
493 "_id": test_pid,
494 "name": self.test_name,
495 }
496 with self.assertRaises(
497 EngineException, msg="Accepted deletion of own project"
498 ) as e:
499 self.topic.delete(self.fake_session, self.test_name)
500 self.assertEqual(
501 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
502 )
503 self.assertIn(
504 "you cannot delete your own project",
505 norm(str(e.exception)),
506 "Wrong exception text",
507 )
508 with self.subTest(i=2):
509 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
510 with self.assertRaises(
511 EngineException, msg="Accepted deletion of project 'admin'"
512 ) as e:
513 self.topic.delete(self.fake_session, "admin")
514 self.assertEqual(
515 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
516 )
517 self.assertIn(
518 "you cannot delete project 'admin'",
519 norm(str(e.exception)),
520 "Wrong exception text",
521 )
522 with self.subTest(i=3):
523 pid = str(uuid4())
524 name = "other-project-name"
525 self.auth.get_project.return_value = {"_id": pid, "name": name}
526 self.auth.get_user_list.return_value = [
527 {
528 "_id": str(uuid4()),
529 "username": self.test_name,
530 "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
531 }
532 ]
533 with self.assertRaises(
534 EngineException, msg="Accepted deletion of used project"
535 ) as e:
536 self.topic.delete(self.fake_session, pid)
537 self.assertEqual(
538 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
539 )
540 self.assertIn(
541 "project '{}' ({}) is being used by user '{}'".format(
542 name, pid, self.test_name
543 ),
544 norm(str(e.exception)),
545 "Wrong exception text",
546 )
547 with self.subTest(i=4):
548 self.auth.get_user_list.return_value = []
549 self.db.get_list.return_value = [
550 {
551 "_id": str(uuid4()),
552 "id": self.test_name,
553 "_admin": {"projects_read": [pid], "projects_write": []},
554 }
555 ]
556 with self.assertRaises(
557 EngineException, msg="Accepted deletion of used project"
558 ) as e:
559 self.topic.delete(self.fake_session, pid)
560 self.assertEqual(
561 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
562 )
563 self.assertIn(
564 "project '{}' ({}) is being used by {} '{}'".format(
565 name, pid, "vnf descriptor", self.test_name
566 ),
567 norm(str(e.exception)),
568 "Wrong exception text",
569 )
570
571
572 class Test_RoleTopicAuth(TestCase):
573 @classmethod
574 def setUpClass(cls):
575 cls.test_name = "test-role-topic"
576 cls.test_operations = ["tokens:get"]
577
578 def setUp(self):
579 self.db = Mock(dbbase.DbBase())
580 self.fs = Mock(fsbase.FsBase())
581 self.msg = Mock(msgbase.MsgBase())
582 self.auth = Mock(authconn.Authconn(None, None, None))
583 self.auth.role_permissions = self.test_operations
584 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
585 self.fake_session = {
586 "username": test_name,
587 "project_id": (test_pid,),
588 "method": None,
589 "admin": True,
590 "force": False,
591 "public": False,
592 "allow_show_user_project_role": True,
593 }
594 self.topic.check_quota = Mock(return_value=None) # skip quota
595
596 def test_new_role(self):
597 with self.subTest(i=1):
598 rollback = []
599 rid1 = str(uuid4())
600 perms_in = {"tokens": True}
601 perms_out = {"default": False, "admin": False, "tokens": True}
602 self.auth.get_role_list.return_value = []
603 self.auth.create_role.return_value = rid1
604 rid2, oid = self.topic.new(
605 rollback,
606 self.fake_session,
607 {"name": self.test_name, "permissions": perms_in},
608 )
609 self.assertEqual(len(rollback), 1, "Wrong rollback length")
610 self.assertEqual(rid2, rid1, "Wrong project identifier")
611 content = self.auth.create_role.call_args[0][0]
612 self.assertEqual(content["name"], self.test_name, "Wrong role name")
613 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
614 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
615 self.assertEqual(
616 content["_admin"]["modified"],
617 content["_admin"]["created"],
618 "Wrong modification time",
619 )
620 with self.subTest(i=2):
621 rollback = []
622 with self.assertRaises(
623 EngineException, msg="Accepted wrong permissions"
624 ) as e:
625 self.topic.new(
626 rollback,
627 self.fake_session,
628 {"name": "other-role-name", "permissions": {"projects": True}},
629 )
630 self.assertEqual(len(rollback), 0, "Wrong rollback length")
631 self.assertEqual(
632 e.exception.http_code,
633 HTTPStatus.UNPROCESSABLE_ENTITY,
634 "Wrong HTTP status code",
635 )
636 self.assertIn(
637 "invalid permission '{}'".format("projects"),
638 norm(str(e.exception)),
639 "Wrong exception text",
640 )
641
642 def test_edit_role(self):
643 now = time()
644 rid = str(uuid4())
645 role = {
646 "_id": rid,
647 "name": self.test_name,
648 "permissions": {"tokens": True},
649 "_admin": {"created": now, "modified": now},
650 }
651 with self.subTest(i=1):
652 self.auth.get_role_list.side_effect = [[role], []]
653 self.auth.get_role.return_value = role
654 new_name = "new-role-name"
655 perms_in = {"tokens": False, "tokens:get": True}
656 perms_out = {
657 "default": False,
658 "admin": False,
659 "tokens": False,
660 "tokens:get": True,
661 }
662 self.topic.edit(
663 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
664 )
665 content = self.auth.update_role.call_args[0][0]
666 self.assertEqual(content["_id"], rid, "Wrong role identifier")
667 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
668 self.assertGreater(
669 content["_admin"]["modified"], now, "Wrong modification time"
670 )
671 self.assertEqual(content["name"], new_name, "Wrong role name")
672 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
673 with self.subTest(i=2):
674 new_name = "other-role-name"
675 perms_in = {"tokens": False, "tokens:post": True}
676 self.auth.get_role_list.side_effect = [[role], []]
677 with self.assertRaises(
678 EngineException, msg="Accepted wrong permissions"
679 ) as e:
680 self.topic.edit(
681 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
682 )
683 self.assertEqual(
684 e.exception.http_code,
685 HTTPStatus.UNPROCESSABLE_ENTITY,
686 "Wrong HTTP status code",
687 )
688 self.assertIn(
689 "invalid permission '{}'".format("tokens:post"),
690 norm(str(e.exception)),
691 "Wrong exception text",
692 )
693
694 def test_delete_role(self):
695 with self.subTest(i=1):
696 rid = str(uuid4())
697 role = {"_id": rid, "name": "other-role-name"}
698 self.auth.get_role_list.return_value = [role]
699 self.auth.get_role.return_value = role
700 self.auth.delete_role.return_value = {"deleted": 1}
701 self.auth.get_user_list.return_value = []
702 rc = self.topic.delete(self.fake_session, rid)
703 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
704 self.assertEqual(
705 self.auth.get_role_list.call_args[0][0]["_id"],
706 rid,
707 "Wrong role identifier",
708 )
709 self.assertEqual(
710 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
711 )
712 self.assertEqual(
713 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
714 )
715
716 def test_conflict_on_new(self):
717 with self.subTest(i=1):
718 rollback = []
719 rid = str(uuid4())
720 with self.assertRaises(
721 EngineException, msg="Accepted uuid as role name"
722 ) as e:
723 self.topic.new(rollback, self.fake_session, {"name": rid})
724 self.assertEqual(len(rollback), 0, "Wrong rollback length")
725 self.assertEqual(
726 e.exception.http_code,
727 HTTPStatus.UNPROCESSABLE_ENTITY,
728 "Wrong HTTP status code",
729 )
730 self.assertIn(
731 "role name '{}' cannot have an uuid format".format(rid),
732 norm(str(e.exception)),
733 "Wrong exception text",
734 )
735 with self.subTest(i=2):
736 rollback = []
737 self.auth.get_role_list.return_value = [
738 {"_id": str(uuid4()), "name": self.test_name}
739 ]
740 with self.assertRaises(
741 EngineException, msg="Accepted existing role name"
742 ) as e:
743 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
744 self.assertEqual(len(rollback), 0, "Wrong rollback length")
745 self.assertEqual(
746 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
747 )
748 self.assertIn(
749 "role name '{}' exists".format(self.test_name),
750 norm(str(e.exception)),
751 "Wrong exception text",
752 )
753
754 def test_conflict_on_edit(self):
755 rid = str(uuid4())
756 with self.subTest(i=1):
757 self.auth.get_role_list.return_value = [
758 {"_id": rid, "name": self.test_name, "permissions": {}}
759 ]
760 new_name = str(uuid4())
761 with self.assertRaises(
762 EngineException, msg="Accepted uuid as role name"
763 ) as e:
764 self.topic.edit(self.fake_session, rid, {"name": new_name})
765 self.assertEqual(
766 e.exception.http_code,
767 HTTPStatus.UNPROCESSABLE_ENTITY,
768 "Wrong HTTP status code",
769 )
770 self.assertIn(
771 "role name '{}' cannot have an uuid format".format(new_name),
772 norm(str(e.exception)),
773 "Wrong exception text",
774 )
775 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
776 with self.subTest(i=i):
777 rid = str(uuid4())
778 self.auth.get_role.return_value = {
779 "_id": rid,
780 "name": role_name,
781 "permissions": {},
782 }
783 with self.assertRaises(
784 EngineException,
785 msg="Accepted renaming of role '{}'".format(role_name),
786 ) as e:
787 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
788 self.assertEqual(
789 e.exception.http_code,
790 HTTPStatus.FORBIDDEN,
791 "Wrong HTTP status code",
792 )
793 self.assertIn(
794 "you cannot rename role '{}'".format(role_name),
795 norm(str(e.exception)),
796 "Wrong exception text",
797 )
798 with self.subTest(i=i + 1):
799 new_name = "new-role-name"
800 self.auth.get_role_list.side_effect = [
801 [{"_id": rid, "name": self.test_name, "permissions": {}}],
802 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
803 ]
804 self.auth.get_role.return_value = {
805 "_id": rid,
806 "name": self.test_name,
807 "permissions": {},
808 }
809 with self.assertRaises(
810 EngineException, msg="Accepted existing role name"
811 ) as e:
812 self.topic.edit(self.fake_session, rid, {"name": new_name})
813 self.assertEqual(
814 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
815 )
816 self.assertIn(
817 "role name '{}' exists".format(new_name),
818 norm(str(e.exception)),
819 "Wrong exception text",
820 )
821
822 def test_conflict_on_del(self):
823 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
824 with self.subTest(i=i):
825 rid = str(uuid4())
826 role = {"_id": rid, "name": role_name}
827 self.auth.get_role_list.return_value = [role]
828 self.auth.get_role.return_value = role
829 with self.assertRaises(
830 EngineException,
831 msg="Accepted deletion of role '{}'".format(role_name),
832 ) as e:
833 self.topic.delete(self.fake_session, rid)
834 self.assertEqual(
835 e.exception.http_code,
836 HTTPStatus.FORBIDDEN,
837 "Wrong HTTP status code",
838 )
839 self.assertIn(
840 "you cannot delete role '{}'".format(role_name),
841 norm(str(e.exception)),
842 "Wrong exception text",
843 )
844 with self.subTest(i=i + 1):
845 rid = str(uuid4())
846 name = "other-role-name"
847 role = {"_id": rid, "name": name}
848 self.auth.get_role_list.return_value = [role]
849 self.auth.get_role.return_value = role
850 self.auth.get_user_list.return_value = [
851 {
852 "_id": str(uuid4()),
853 "username": self.test_name,
854 "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
855 }
856 ]
857 with self.assertRaises(
858 EngineException, msg="Accepted deletion of used role"
859 ) as e:
860 self.topic.delete(self.fake_session, rid)
861 self.assertEqual(
862 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
863 )
864 self.assertIn(
865 "role '{}' ({}) is being used by user '{}'".format(
866 name, rid, self.test_name
867 ),
868 norm(str(e.exception)),
869 "Wrong exception text",
870 )
871
872
873 class Test_UserTopicAuth(TestCase):
874 @classmethod
875 def setUpClass(cls):
876 cls.test_name = "test-user-topic"
877
878 def setUp(self):
879 self.db = Mock(dbbase.DbBase())
880 self.fs = Mock(fsbase.FsBase())
881 self.msg = Mock(msgbase.MsgBase())
882 self.auth = Mock(authconn.Authconn(None, None, None))
883 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
884 self.fake_session = {
885 "username": test_name,
886 "project_id": (test_pid,),
887 "method": None,
888 "admin": True,
889 "force": False,
890 "public": False,
891 "allow_show_user_project_role": True,
892 }
893 self.topic.check_quota = Mock(return_value=None) # skip quota
894
895 def test_new_user(self):
896 uid1 = str(uuid4())
897 pid = str(uuid4())
898 self.auth.get_user_list.return_value = []
899 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
900 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
901 with self.subTest(i=1):
902 rollback = []
903 rid = str(uuid4())
904 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
905 prms_in = [{"project": "some_project", "role": "some_role"}]
906 prms_out = [{"project": pid, "role": rid}]
907 uid2, oid = self.topic.new(
908 rollback,
909 self.fake_session,
910 {
911 "username": self.test_name,
912 "password": self.test_name,
913 "project_role_mappings": prms_in,
914 },
915 )
916 self.assertEqual(len(rollback), 1, "Wrong rollback length")
917 self.assertEqual(uid2, uid1, "Wrong project identifier")
918 content = self.auth.create_user.call_args[0][0]
919 self.assertEqual(content["username"], self.test_name, "Wrong project name")
920 self.assertEqual(content["password"], self.test_name, "Wrong password")
921 self.assertEqual(
922 content["project_role_mappings"],
923 prms_out,
924 "Wrong project-role mappings",
925 )
926 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
927 self.assertEqual(
928 content["_admin"]["modified"],
929 content["_admin"]["created"],
930 "Wrong modification time",
931 )
932 with self.subTest(i=2):
933 rollback = []
934 def_rid = str(uuid4())
935 def_role = {"_id": def_rid, "name": "project_admin"}
936 self.auth.get_role.return_value = def_role
937 self.auth.get_role_list.return_value = [def_role]
938 prms_out = [{"project": pid, "role": def_rid}]
939 uid2, oid = self.topic.new(
940 rollback,
941 self.fake_session,
942 {
943 "username": self.test_name,
944 "password": self.test_name,
945 "projects": ["some_project"],
946 },
947 )
948 self.assertEqual(len(rollback), 1, "Wrong rollback length")
949 self.assertEqual(uid2, uid1, "Wrong project identifier")
950 content = self.auth.create_user.call_args[0][0]
951 self.assertEqual(content["username"], self.test_name, "Wrong project name")
952 self.assertEqual(content["password"], self.test_name, "Wrong password")
953 self.assertEqual(
954 content["project_role_mappings"],
955 prms_out,
956 "Wrong project-role mappings",
957 )
958 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
959 self.assertEqual(
960 content["_admin"]["modified"],
961 content["_admin"]["created"],
962 "Wrong modification time",
963 )
964 with self.subTest(i=3):
965 rollback = []
966 with self.assertRaises(
967 EngineException, msg="Accepted wrong project-role mappings"
968 ) as e:
969 self.topic.new(
970 rollback,
971 self.fake_session,
972 {
973 "username": "other-project-name",
974 "password": "other-password",
975 "project_role_mappings": [{}],
976 },
977 )
978 self.assertEqual(len(rollback), 0, "Wrong rollback length")
979 self.assertEqual(
980 e.exception.http_code,
981 HTTPStatus.UNPROCESSABLE_ENTITY,
982 "Wrong HTTP status code",
983 )
984 self.assertIn(
985 "format error at '{}' '{}'".format(
986 "project_role_mappings:{}", "'{}' is a required property"
987 ).format(0, "project"),
988 norm(str(e.exception)),
989 "Wrong exception text",
990 )
991 with self.subTest(i=4):
992 rollback = []
993 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
994 self.topic.new(
995 rollback,
996 self.fake_session,
997 {
998 "username": "other-project-name",
999 "password": "other-password",
1000 "projects": [],
1001 },
1002 )
1003 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1004 self.assertEqual(
1005 e.exception.http_code,
1006 HTTPStatus.UNPROCESSABLE_ENTITY,
1007 "Wrong HTTP status code",
1008 )
1009 self.assertIn(
1010 "format error at '{}' '{}'".format(
1011 "projects", "{} is too short"
1012 ).format([]),
1013 norm(str(e.exception)),
1014 "Wrong exception text",
1015 )
1016
1017 def test_edit_user(self):
1018 now = time()
1019 uid = str(uuid4())
1020 pid1 = str(uuid4())
1021 rid1 = str(uuid4())
1022 prms = [
1023 {
1024 "project": pid1,
1025 "project_name": "project-1",
1026 "role": rid1,
1027 "role_name": "role-1",
1028 }
1029 ]
1030 user = {
1031 "_id": uid,
1032 "username": self.test_name,
1033 "project_role_mappings": prms,
1034 "_admin": {"created": now, "modified": now},
1035 }
1036 with self.subTest(i=1):
1037 self.auth.get_user_list.side_effect = [[user], []]
1038 self.auth.get_user.return_value = user
1039 pid2 = str(uuid4())
1040 rid2 = str(uuid4())
1041 self.auth.get_project.side_effect = [
1042 {"_id": pid2, "name": "project-2"},
1043 {"_id": pid1, "name": "project-1"},
1044 ]
1045 self.auth.get_role.side_effect = [
1046 {"_id": rid2, "name": "role-2"},
1047 {"_id": rid1, "name": "role-1"},
1048 ]
1049 new_name = "new-user-name"
1050 new_pasw = "new-password"
1051 add_prms = [{"project": pid2, "role": rid2}]
1052 rem_prms = [{"project": pid1, "role": rid1}]
1053 self.topic.edit(
1054 self.fake_session,
1055 uid,
1056 {
1057 "username": new_name,
1058 "password": new_pasw,
1059 "add_project_role_mappings": add_prms,
1060 "remove_project_role_mappings": rem_prms,
1061 },
1062 )
1063 content = self.auth.update_user.call_args[0][0]
1064 self.assertEqual(content["_id"], uid, "Wrong user identifier")
1065 self.assertEqual(content["username"], new_name, "Wrong user name")
1066 self.assertEqual(content["password"], new_pasw, "Wrong user password")
1067 self.assertEqual(
1068 content["add_project_role_mappings"],
1069 add_prms,
1070 "Wrong project-role mappings to add",
1071 )
1072 self.assertEqual(
1073 content["remove_project_role_mappings"],
1074 prms,
1075 "Wrong project-role mappings to remove",
1076 )
1077 with self.subTest(i=2):
1078 new_name = "other-user-name"
1079 new_prms = [{}]
1080 self.auth.get_role_list.side_effect = [[user], []]
1081 self.auth.get_user_list.side_effect = [[user]]
1082 with self.assertRaises(
1083 EngineException, msg="Accepted wrong project-role mappings"
1084 ) as e:
1085 self.topic.edit(
1086 self.fake_session,
1087 uid,
1088 {"username": new_name, "project_role_mappings": new_prms},
1089 )
1090 self.assertEqual(
1091 e.exception.http_code,
1092 HTTPStatus.UNPROCESSABLE_ENTITY,
1093 "Wrong HTTP status code",
1094 )
1095 self.assertIn(
1096 "format error at '{}' '{}'".format(
1097 "project_role_mappings:{}", "'{}' is a required property"
1098 ).format(0, "project"),
1099 norm(str(e.exception)),
1100 "Wrong exception text",
1101 )
1102 with self.subTest(i=3):
1103 self.auth.get_user_list.side_effect = [[user], []]
1104 self.auth.get_user.return_value = user
1105 old_password = self.test_name
1106 new_pasw = "new-password"
1107 self.topic.edit(
1108 self.fake_session,
1109 uid,
1110 {
1111 "old_password": old_password,
1112 "password": new_pasw,
1113 },
1114 )
1115 content = self.auth.update_user.call_args[0][0]
1116 self.assertEqual(
1117 content["old_password"], old_password, "Wrong old password"
1118 )
1119 self.assertEqual(content["password"], new_pasw, "Wrong user password")
1120
1121 def test_delete_user(self):
1122 with self.subTest(i=1):
1123 uid = str(uuid4())
1124 self.fake_session["username"] = self.test_name
1125 user = user = {
1126 "_id": uid,
1127 "username": "other-user-name",
1128 "project_role_mappings": [],
1129 }
1130 self.auth.get_user.return_value = user
1131 self.auth.delete_user.return_value = {"deleted": 1}
1132 rc = self.topic.delete(self.fake_session, uid)
1133 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
1134 self.assertEqual(
1135 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
1136 )
1137 self.assertEqual(
1138 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
1139 )
1140
1141 def test_conflict_on_new(self):
1142 with self.subTest(i=1):
1143 rollback = []
1144 uid = str(uuid4())
1145 with self.assertRaises(
1146 EngineException, msg="Accepted uuid as username"
1147 ) as e:
1148 self.topic.new(
1149 rollback,
1150 self.fake_session,
1151 {
1152 "username": uid,
1153 "password": self.test_name,
1154 "projects": [test_pid],
1155 },
1156 )
1157 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1158 self.assertEqual(
1159 e.exception.http_code,
1160 HTTPStatus.UNPROCESSABLE_ENTITY,
1161 "Wrong HTTP status code",
1162 )
1163 self.assertIn(
1164 "username '{}' cannot have a uuid format".format(uid),
1165 norm(str(e.exception)),
1166 "Wrong exception text",
1167 )
1168 with self.subTest(i=2):
1169 rollback = []
1170 self.auth.get_user_list.return_value = [
1171 {"_id": str(uuid4()), "username": self.test_name}
1172 ]
1173 with self.assertRaises(
1174 EngineException, msg="Accepted existing username"
1175 ) as e:
1176 self.topic.new(
1177 rollback,
1178 self.fake_session,
1179 {
1180 "username": self.test_name,
1181 "password": self.test_name,
1182 "projects": [test_pid],
1183 },
1184 )
1185 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1186 self.assertEqual(
1187 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1188 )
1189 self.assertIn(
1190 "username '{}' is already used".format(self.test_name),
1191 norm(str(e.exception)),
1192 "Wrong exception text",
1193 )
1194 with self.subTest(i=3):
1195 rollback = []
1196 self.auth.get_user_list.return_value = []
1197 self.auth.get_role_list.side_effect = [[], []]
1198 with self.assertRaises(
1199 AuthconnNotFoundException, msg="Accepted user without default role"
1200 ) as e:
1201 self.topic.new(
1202 rollback,
1203 self.fake_session,
1204 {
1205 "username": self.test_name,
1206 "password": self.test_name,
1207 "projects": [str(uuid4())],
1208 },
1209 )
1210 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1211 self.assertEqual(
1212 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1213 )
1214 self.assertIn(
1215 "can't find default role for user '{}'".format(self.test_name),
1216 norm(str(e.exception)),
1217 "Wrong exception text",
1218 )
1219
1220 def test_conflict_on_edit(self):
1221 uid = str(uuid4())
1222 with self.subTest(i=1):
1223 self.auth.get_user_list.return_value = [
1224 {"_id": uid, "username": self.test_name}
1225 ]
1226 new_name = str(uuid4())
1227 with self.assertRaises(
1228 EngineException, msg="Accepted uuid as username"
1229 ) as e:
1230 self.topic.edit(self.fake_session, uid, {"username": new_name})
1231 self.assertEqual(
1232 e.exception.http_code,
1233 HTTPStatus.UNPROCESSABLE_ENTITY,
1234 "Wrong HTTP status code",
1235 )
1236 self.assertIn(
1237 "username '{}' cannot have an uuid format".format(new_name),
1238 norm(str(e.exception)),
1239 "Wrong exception text",
1240 )
1241 with self.subTest(i=2):
1242 self.auth.get_user_list.return_value = [
1243 {"_id": uid, "username": self.test_name}
1244 ]
1245 self.auth.get_role_list.side_effect = [[], []]
1246 with self.assertRaises(
1247 AuthconnNotFoundException, msg="Accepted user without default role"
1248 ) as e:
1249 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
1250 self.assertEqual(
1251 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1252 )
1253 self.assertIn(
1254 "can't find a default role for user '{}'".format(self.test_name),
1255 norm(str(e.exception)),
1256 "Wrong exception text",
1257 )
1258 with self.subTest(i=3):
1259 admin_uid = str(uuid4())
1260 self.auth.get_user_list.return_value = [
1261 {"_id": admin_uid, "username": "admin"}
1262 ]
1263 with self.assertRaises(
1264 EngineException,
1265 msg="Accepted removing system_admin role from admin user",
1266 ) as e:
1267 self.topic.edit(
1268 self.fake_session,
1269 admin_uid,
1270 {
1271 "remove_project_role_mappings": [
1272 {"project": "admin", "role": "system_admin"}
1273 ]
1274 },
1275 )
1276 self.assertEqual(
1277 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
1278 )
1279 self.assertIn(
1280 "you cannot remove system_admin role from admin user",
1281 norm(str(e.exception)),
1282 "Wrong exception text",
1283 )
1284 with self.subTest(i=4):
1285 new_name = "new-user-name"
1286 self.auth.get_user_list.side_effect = [
1287 [{"_id": uid, "name": self.test_name}],
1288 [{"_id": str(uuid4()), "name": new_name}],
1289 ]
1290 with self.assertRaises(
1291 EngineException, msg="Accepted existing username"
1292 ) as e:
1293 self.topic.edit(self.fake_session, uid, {"username": new_name})
1294 self.assertEqual(
1295 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1296 )
1297 self.assertIn(
1298 "username '{}' is already used".format(new_name),
1299 norm(str(e.exception)),
1300 "Wrong exception text",
1301 )
1302
1303 def test_conflict_on_del(self):
1304 with self.subTest(i=1):
1305 uid = str(uuid4())
1306 self.fake_session["username"] = self.test_name
1307 user = user = {
1308 "_id": uid,
1309 "username": self.test_name,
1310 "project_role_mappings": [],
1311 }
1312 self.auth.get_user.return_value = user
1313 with self.assertRaises(
1314 EngineException, msg="Accepted deletion of own user"
1315 ) as e:
1316 self.topic.delete(self.fake_session, uid)
1317 self.assertEqual(
1318 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1319 )
1320 self.assertIn(
1321 "you cannot delete your own login user",
1322 norm(str(e.exception)),
1323 "Wrong exception text",
1324 )
1325
1326
1327 class Test_CommonVimWimSdn(TestCase):
1328 @classmethod
1329 def setUpClass(cls):
1330 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
1331
1332 def setUp(self):
1333 self.db = Mock(dbbase.DbBase())
1334 self.fs = Mock(fsbase.FsBase())
1335 self.msg = Mock(msgbase.MsgBase())
1336 self.auth = Mock(authconn.Authconn(None, None, None))
1337 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
1338 # Use WIM schemas for testing because they are the simplest
1339 self.topic._send_msg = Mock()
1340 self.topic.topic = "wims"
1341 self.topic.schema_new = validation.wim_account_new_schema
1342 self.topic.schema_edit = validation.wim_account_edit_schema
1343 self.fake_session = {
1344 "username": test_name,
1345 "project_id": (test_pid,),
1346 "method": None,
1347 "admin": True,
1348 "force": False,
1349 "public": False,
1350 "allow_show_user_project_role": True,
1351 }
1352 self.topic.check_quota = Mock(return_value=None) # skip quota
1353
1354 def test_new_cvws(self):
1355 test_url = "http://0.0.0.0:0"
1356 with self.subTest(i=1):
1357 rollback = []
1358 test_type = "fake"
1359 self.db.get_one.return_value = None
1360 self.db.create.side_effect = lambda self, content: content["_id"]
1361 cid, oid = self.topic.new(
1362 rollback,
1363 self.fake_session,
1364 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
1365 )
1366 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1367 args = self.db.create.call_args[0]
1368 content = args[1]
1369 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1370 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1371 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
1372 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
1373 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
1374 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
1375 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
1376 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
1377 self.assertEqual(
1378 content["_admin"]["modified"],
1379 content["_admin"]["created"],
1380 "Wrong modification time",
1381 )
1382 self.assertEqual(
1383 content["_admin"]["operationalState"],
1384 "PROCESSING",
1385 "Wrong operational state",
1386 )
1387 self.assertEqual(
1388 content["_admin"]["projects_read"],
1389 [test_pid],
1390 "Wrong read-only projects",
1391 )
1392 self.assertEqual(
1393 content["_admin"]["projects_write"],
1394 [test_pid],
1395 "Wrong read/write projects",
1396 )
1397 self.assertIsNone(
1398 content["_admin"]["current_operation"], "Wrong current operation"
1399 )
1400 self.assertEqual(
1401 len(content["_admin"]["operations"]), 1, "Wrong number of operations"
1402 )
1403 operation = content["_admin"]["operations"][0]
1404 self.assertEqual(
1405 operation["lcmOperationType"], "create", "Wrong operation type"
1406 )
1407 self.assertEqual(
1408 operation["operationState"], "PROCESSING", "Wrong operation state"
1409 )
1410 self.assertGreater(
1411 operation["startTime"],
1412 content["_admin"]["created"],
1413 "Wrong operation start time",
1414 )
1415 self.assertGreater(
1416 operation["statusEnteredTime"],
1417 content["_admin"]["created"],
1418 "Wrong operation status enter time",
1419 )
1420 self.assertEqual(
1421 operation["detailed-status"], "", "Wrong operation detailed status info"
1422 )
1423 self.assertIsNone(
1424 operation["operationParams"], "Wrong operation parameters"
1425 )
1426 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
1427 # with self.subTest(i=2):
1428 # rollback = []
1429 # test_type = "bad_type"
1430 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
1431 # self.topic.new(rollback, self.fake_session,
1432 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
1433 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
1434 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
1435 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
1436 # norm(str(e.exception)), "Wrong exception text")
1437
1438 def test_conflict_on_new(self):
1439 with self.subTest(i=1):
1440 rollback = []
1441 test_url = "http://0.0.0.0:0"
1442 test_type = "fake"
1443 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
1444 with self.assertRaises(
1445 EngineException, msg="Accepted existing CIM name"
1446 ) as e:
1447 self.topic.new(
1448 rollback,
1449 self.fake_session,
1450 {
1451 "name": self.test_name,
1452 "wim_url": test_url,
1453 "wim_type": test_type,
1454 },
1455 )
1456 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1457 self.assertEqual(
1458 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1459 )
1460 self.assertIn(
1461 "name '{}' already exists for {}".format(
1462 self.test_name, self.topic.topic
1463 ),
1464 norm(str(e.exception)),
1465 "Wrong exception text",
1466 )
1467
1468 def test_edit_cvws(self):
1469 now = time()
1470 cid = str(uuid4())
1471 test_url = "http://0.0.0.0:0"
1472 test_type = "fake"
1473 cvws = {
1474 "_id": cid,
1475 "name": self.test_name,
1476 "wim_url": test_url,
1477 "wim_type": test_type,
1478 "_admin": {
1479 "created": now,
1480 "modified": now,
1481 "operations": [{"lcmOperationType": "create"}],
1482 },
1483 }
1484 with self.subTest(i=1):
1485 new_name = "new-cim-name"
1486 new_url = "https://1.1.1.1:1"
1487 new_type = "onos"
1488 self.db.get_one.side_effect = [cvws, None]
1489 self.db.replace.return_value = {"updated": 1}
1490 # self.db.encrypt.side_effect = [b64str(), b64str()]
1491 self.topic.edit(
1492 self.fake_session,
1493 cid,
1494 {"name": new_name, "wim_url": new_url, "wim_type": new_type},
1495 )
1496 args = self.db.replace.call_args[0]
1497 content = args[2]
1498 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1499 self.assertEqual(args[1], cid, "Wrong CIM identifier")
1500 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1501 self.assertEqual(content["name"], new_name, "Wrong CIM name")
1502 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
1503 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
1504 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
1505 self.assertGreater(
1506 content["_admin"]["modified"],
1507 content["_admin"]["created"],
1508 "Wrong modification time",
1509 )
1510 self.assertEqual(
1511 len(content["_admin"]["operations"]), 2, "Wrong number of operations"
1512 )
1513 operation = content["_admin"]["operations"][1]
1514 self.assertEqual(
1515 operation["lcmOperationType"], "edit", "Wrong operation type"
1516 )
1517 self.assertEqual(
1518 operation["operationState"], "PROCESSING", "Wrong operation state"
1519 )
1520 self.assertGreater(
1521 operation["startTime"],
1522 content["_admin"]["modified"],
1523 "Wrong operation start time",
1524 )
1525 self.assertGreater(
1526 operation["statusEnteredTime"],
1527 content["_admin"]["modified"],
1528 "Wrong operation status enter time",
1529 )
1530 self.assertEqual(
1531 operation["detailed-status"], "", "Wrong operation detailed status info"
1532 )
1533 self.assertIsNone(
1534 operation["operationParams"], "Wrong operation parameters"
1535 )
1536 with self.subTest(i=2):
1537 self.db.get_one.side_effect = [cvws]
1538 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
1539 self.topic.edit(
1540 self.fake_session,
1541 str(uuid4()),
1542 {"name": "new-name", "extra_prop": "anything"},
1543 )
1544 self.assertEqual(
1545 e.exception.http_code,
1546 HTTPStatus.UNPROCESSABLE_ENTITY,
1547 "Wrong HTTP status code",
1548 )
1549 self.assertIn(
1550 "format error '{}'".format(
1551 "additional properties are not allowed ('{}' was unexpected)"
1552 ).format("extra_prop"),
1553 norm(str(e.exception)),
1554 "Wrong exception text",
1555 )
1556
1557 def test_conflict_on_edit(self):
1558 with self.subTest(i=1):
1559 cid = str(uuid4())
1560 new_name = "new-cim-name"
1561 self.db.get_one.side_effect = [
1562 {"_id": cid, "name": self.test_name},
1563 {"_id": str(uuid4()), "name": new_name},
1564 ]
1565 with self.assertRaises(
1566 EngineException, msg="Accepted existing CIM name"
1567 ) as e:
1568 self.topic.edit(self.fake_session, cid, {"name": new_name})
1569 self.assertEqual(
1570 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1571 )
1572 self.assertIn(
1573 "name '{}' already exists for {}".format(new_name, self.topic.topic),
1574 norm(str(e.exception)),
1575 "Wrong exception text",
1576 )
1577
1578 def test_delete_cvws(self):
1579 cid = str(uuid4())
1580 ro_pid = str(uuid4())
1581 rw_pid = str(uuid4())
1582 cvws = {"_id": cid, "name": self.test_name}
1583 self.db.get_list.return_value = []
1584 with self.subTest(i=1):
1585 cvws["_admin"] = {
1586 "projects_read": [test_pid, ro_pid, rw_pid],
1587 "projects_write": [test_pid, rw_pid],
1588 }
1589 self.db.get_one.return_value = cvws
1590 oid = self.topic.delete(self.fake_session, cid)
1591 self.assertIsNone(oid, "Wrong operation identifier")
1592 self.assertEqual(
1593 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1594 )
1595 self.assertEqual(
1596 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1597 )
1598 self.assertEqual(
1599 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1600 )
1601 self.assertEqual(
1602 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1603 )
1604 self.assertEqual(
1605 self.db.set_one.call_args[1]["update_dict"],
1606 None,
1607 "Wrong read-only projects update",
1608 )
1609 self.assertEqual(
1610 self.db.set_one.call_args[1]["pull_list"],
1611 {
1612 "_admin.projects_read": (test_pid,),
1613 "_admin.projects_write": (test_pid,),
1614 },
1615 "Wrong read/write projects update",
1616 )
1617 self.topic._send_msg.assert_not_called()
1618 with self.subTest(i=2):
1619 now = time()
1620 cvws["_admin"] = {
1621 "projects_read": [test_pid],
1622 "projects_write": [test_pid],
1623 "operations": [],
1624 }
1625 self.db.get_one.return_value = cvws
1626 oid = self.topic.delete(self.fake_session, cid)
1627 self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
1628 self.assertEqual(
1629 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1630 )
1631 self.assertEqual(
1632 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1633 )
1634 self.assertEqual(
1635 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1636 )
1637 self.assertEqual(
1638 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier"
1639 )
1640 self.assertEqual(
1641 self.db.set_one.call_args[1]["update_dict"],
1642 {"_admin.to_delete": True},
1643 "Wrong _admin.to_delete update",
1644 )
1645 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
1646 self.assertEqual(
1647 operation["lcmOperationType"], "delete", "Wrong operation type"
1648 )
1649 self.assertEqual(
1650 operation["operationState"], "PROCESSING", "Wrong operation state"
1651 )
1652 self.assertEqual(
1653 operation["detailed-status"], "", "Wrong operation detailed status"
1654 )
1655 self.assertIsNone(
1656 operation["operationParams"], "Wrong operation parameters"
1657 )
1658 self.assertGreater(
1659 operation["startTime"], now, "Wrong operation start time"
1660 )
1661 self.assertGreater(
1662 operation["statusEnteredTime"], now, "Wrong operation status enter time"
1663 )
1664 self.topic._send_msg.assert_called_once_with(
1665 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
1666 )
1667 with self.subTest(i=3):
1668 cvws["_admin"] = {
1669 "projects_read": [],
1670 "projects_write": [],
1671 "operations": [],
1672 }
1673 self.db.get_one.return_value = cvws
1674 self.topic._send_msg.reset_mock()
1675 self.db.get_one.reset_mock()
1676 self.db.del_one.reset_mock()
1677 self.fake_session["force"] = True # to force deletion
1678 self.fake_session["admin"] = True # to force deletion
1679 self.fake_session["project_id"] = [] # to force deletion
1680 oid = self.topic.delete(self.fake_session, cid)
1681 self.assertIsNone(oid, "Wrong operation identifier")
1682 self.assertEqual(
1683 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1684 )
1685 self.assertEqual(
1686 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1687 )
1688 self.assertEqual(
1689 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
1690 )
1691 self.assertEqual(
1692 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1693 )
1694 self.topic._send_msg.assert_called_once_with(
1695 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
1696 )
1697
1698
1699 if __name__ == "__main__":
1700 unittest.main()