Bug 1830 fixed: maps completed operations to original operation types
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 from unittest import TestCase
22 from unittest.mock import Mock, patch, call
23 from uuid import uuid4
24 from http import HTTPStatus
25 from time import time
26 from random import randint
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import (
30 ProjectTopicAuth,
31 RoleTopicAuth,
32 UserTopicAuth,
33 CommonVimWimSdn,
34 VcaTopic,
35 )
36 from osm_nbi.engine import EngineException
37 from osm_nbi.authconn import AuthconnNotFoundException
38
39
40 test_pid = str(uuid4())
41 test_name = "test-user"
42
43
44 def norm(str):
45 """Normalize string for checking"""
46 return " ".join(str.strip().split()).lower()
47
48
49 class TestVcaTopic(TestCase):
50 def setUp(self):
51 self.db = Mock(dbbase.DbBase())
52 self.fs = Mock(fsbase.FsBase())
53 self.msg = Mock(msgbase.MsgBase())
54 self.auth = Mock(authconn.Authconn(None, None, None))
55 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
56
57 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
58 def test_format_on_new(self, mock_super_format_on_new):
59 content = {
60 "_id": "id",
61 "secret": "encrypted_secret",
62 "cacert": "encrypted_cacert",
63 }
64 self.db.encrypt.side_effect = ["secret", "cacert"]
65 mock_super_format_on_new.return_value = "1234"
66
67 oid = self.vca_topic.format_on_new(content)
68
69 self.assertEqual(oid, "1234")
70 self.assertEqual(content["secret"], "secret")
71 self.assertEqual(content["cacert"], "cacert")
72 self.db.encrypt.assert_has_calls(
73 [
74 call("encrypted_secret", schema_version="1.11", salt="id"),
75 call("encrypted_cacert", schema_version="1.11", salt="id"),
76 ]
77 )
78 mock_super_format_on_new.assert_called_with(content, None, False)
79
80 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
81 def test_format_on_edit(self, mock_super_format_on_edit):
82 edit_content = {
83 "_id": "id",
84 "secret": "encrypted_secret",
85 "cacert": "encrypted_cacert",
86 }
87 final_content = {
88 "_id": "id",
89 "schema_version": "1.11",
90 }
91 self.db.encrypt.side_effect = ["secret", "cacert"]
92 mock_super_format_on_edit.return_value = "1234"
93
94 oid = self.vca_topic.format_on_edit(final_content, edit_content)
95
96 self.assertEqual(oid, "1234")
97 self.assertEqual(final_content["secret"], "secret")
98 self.assertEqual(final_content["cacert"], "cacert")
99 self.db.encrypt.assert_has_calls(
100 [
101 call("encrypted_secret", schema_version="1.11", salt="id"),
102 call("encrypted_cacert", schema_version="1.11", salt="id"),
103 ]
104 )
105 mock_super_format_on_edit.assert_called()
106
107 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
108 def test_check_conflict_on_del(self, mock_check_conflict_on_del):
109 session = {
110 "project_id": "project-id",
111 "force": False,
112 }
113 _id = "vca-id"
114 db_content = {}
115
116 self.db.get_list.return_value = None
117
118 self.vca_topic.check_conflict_on_del(session, _id, db_content)
119
120 self.db.get_list.assert_called_with(
121 "vim_accounts",
122 {"vca": _id, "_admin.projects_read.cont": "project-id"},
123 )
124 mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
125
126 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
127 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
128 session = {
129 "project_id": "project-id",
130 "force": True,
131 }
132 _id = "vca-id"
133 db_content = {}
134
135 self.vca_topic.check_conflict_on_del(session, _id, db_content)
136
137 self.db.get_list.assert_not_called()
138 mock_check_conflict_on_del.assert_not_called()
139
140 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
141 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
142 session = {
143 "project_id": "project-id",
144 "force": False,
145 }
146 _id = "vca-id"
147 db_content = {}
148
149 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
150
151 with self.assertRaises(EngineException) as context:
152 self.vca_topic.check_conflict_on_del(session, _id, db_content)
153 self.assertEqual(
154 context.exception,
155 EngineException(
156 "There is at least one VIM account using this vca",
157 http_code=HTTPStatus.CONFLICT,
158 ),
159 )
160
161 self.db.get_list.assert_called_with(
162 "vim_accounts",
163 {"vca": _id, "_admin.projects_read.cont": "project-id"},
164 )
165 mock_check_conflict_on_del.assert_not_called()
166
167
168 class Test_ProjectTopicAuth(TestCase):
169 @classmethod
170 def setUpClass(cls):
171 cls.test_name = "test-project-topic"
172
173 def setUp(self):
174 self.db = Mock(dbbase.DbBase())
175 self.fs = Mock(fsbase.FsBase())
176 self.msg = Mock(msgbase.MsgBase())
177 self.auth = Mock(authconn.Authconn(None, None, None))
178 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
179 self.fake_session = {
180 "username": self.test_name,
181 "project_id": (test_pid,),
182 "method": None,
183 "admin": True,
184 "force": False,
185 "public": False,
186 "allow_show_user_project_role": True,
187 }
188 self.topic.check_quota = Mock(return_value=None) # skip quota
189
190 def test_new_project(self):
191 with self.subTest(i=1):
192 rollback = []
193 pid1 = str(uuid4())
194 self.auth.get_project_list.return_value = []
195 self.auth.create_project.return_value = pid1
196 pid2, oid = self.topic.new(
197 rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
198 )
199 self.assertEqual(len(rollback), 1, "Wrong rollback length")
200 self.assertEqual(pid2, pid1, "Wrong project identifier")
201 content = self.auth.create_project.call_args[0][0]
202 self.assertEqual(content["name"], self.test_name, "Wrong project name")
203 self.assertEqual(content["quotas"], {}, "Wrong quotas")
204 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
205 self.assertEqual(
206 content["_admin"]["modified"],
207 content["_admin"]["created"],
208 "Wrong modification time",
209 )
210 with self.subTest(i=2):
211 rollback = []
212 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
213 self.topic.new(
214 rollback,
215 self.fake_session,
216 {"name": "other-project-name", "quotas": {"baditems": 10}},
217 )
218 self.assertEqual(len(rollback), 0, "Wrong rollback length")
219 self.assertEqual(
220 e.exception.http_code,
221 HTTPStatus.UNPROCESSABLE_ENTITY,
222 "Wrong HTTP status code",
223 )
224 self.assertIn(
225 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
226 "baditems"
227 ),
228 norm(str(e.exception)),
229 "Wrong exception text",
230 )
231
232 def test_edit_project(self):
233 now = time()
234 pid = str(uuid4())
235 proj = {
236 "_id": pid,
237 "name": self.test_name,
238 "_admin": {"created": now, "modified": now},
239 }
240 with self.subTest(i=1):
241 self.auth.get_project_list.side_effect = [[proj], []]
242 new_name = "new-project-name"
243 quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
244 self.topic.edit(
245 self.fake_session, pid, {"name": new_name, "quotas": quotas}
246 )
247 _id, content = self.auth.update_project.call_args[0]
248 self.assertEqual(_id, pid, "Wrong project identifier")
249 self.assertEqual(content["_id"], pid, "Wrong project identifier")
250 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
251 self.assertGreater(
252 content["_admin"]["modified"], now, "Wrong modification time"
253 )
254 self.assertEqual(content["name"], new_name, "Wrong project name")
255 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
256 with self.subTest(i=2):
257 new_name = "other-project-name"
258 quotas = {"baditems": randint(0, 100)}
259 self.auth.get_project_list.side_effect = [[proj], []]
260 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
261 self.topic.edit(
262 self.fake_session, pid, {"name": new_name, "quotas": quotas}
263 )
264 self.assertEqual(
265 e.exception.http_code,
266 HTTPStatus.UNPROCESSABLE_ENTITY,
267 "Wrong HTTP status code",
268 )
269 self.assertIn(
270 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
271 "baditems"
272 ),
273 norm(str(e.exception)),
274 "Wrong exception text",
275 )
276
277 def test_conflict_on_new(self):
278 with self.subTest(i=1):
279 rollback = []
280 pid = str(uuid4())
281 with self.assertRaises(
282 EngineException, msg="Accepted uuid as project name"
283 ) as e:
284 self.topic.new(rollback, self.fake_session, {"name": pid})
285 self.assertEqual(len(rollback), 0, "Wrong rollback length")
286 self.assertEqual(
287 e.exception.http_code,
288 HTTPStatus.UNPROCESSABLE_ENTITY,
289 "Wrong HTTP status code",
290 )
291 self.assertIn(
292 "project name '{}' cannot have an uuid format".format(pid),
293 norm(str(e.exception)),
294 "Wrong exception text",
295 )
296 with self.subTest(i=2):
297 rollback = []
298 self.auth.get_project_list.return_value = [
299 {"_id": test_pid, "name": self.test_name}
300 ]
301 with self.assertRaises(
302 EngineException, msg="Accepted existing project name"
303 ) as e:
304 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
305 self.assertEqual(len(rollback), 0, "Wrong rollback length")
306 self.assertEqual(
307 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
308 )
309 self.assertIn(
310 "project '{}' exists".format(self.test_name),
311 norm(str(e.exception)),
312 "Wrong exception text",
313 )
314
315 def test_conflict_on_edit(self):
316 with self.subTest(i=1):
317 self.auth.get_project_list.return_value = [
318 {"_id": test_pid, "name": self.test_name}
319 ]
320 new_name = str(uuid4())
321 with self.assertRaises(
322 EngineException, msg="Accepted uuid as project name"
323 ) as e:
324 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
325 self.assertEqual(
326 e.exception.http_code,
327 HTTPStatus.UNPROCESSABLE_ENTITY,
328 "Wrong HTTP status code",
329 )
330 self.assertIn(
331 "project name '{}' cannot have an uuid format".format(new_name),
332 norm(str(e.exception)),
333 "Wrong exception text",
334 )
335 with self.subTest(i=2):
336 pid = str(uuid4())
337 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
338 with self.assertRaises(
339 EngineException, msg="Accepted renaming of project 'admin'"
340 ) as e:
341 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
342 self.assertEqual(
343 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
344 )
345 self.assertIn(
346 "you cannot rename project 'admin'",
347 norm(str(e.exception)),
348 "Wrong exception text",
349 )
350 with self.subTest(i=3):
351 new_name = "new-project-name"
352 self.auth.get_project_list.side_effect = [
353 [{"_id": test_pid, "name": self.test_name}],
354 [{"_id": str(uuid4()), "name": new_name}],
355 ]
356 with self.assertRaises(
357 EngineException, msg="Accepted existing project name"
358 ) as e:
359 self.topic.edit(self.fake_session, pid, {"name": new_name})
360 self.assertEqual(
361 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
362 )
363 self.assertIn(
364 "project '{}' is already used".format(new_name),
365 norm(str(e.exception)),
366 "Wrong exception text",
367 )
368
369 def test_delete_project(self):
370 with self.subTest(i=1):
371 pid = str(uuid4())
372 self.auth.get_project.return_value = {
373 "_id": pid,
374 "name": "other-project-name",
375 }
376 self.auth.delete_project.return_value = {"deleted": 1}
377 self.auth.get_user_list.return_value = []
378 self.db.get_list.return_value = []
379 rc = self.topic.delete(self.fake_session, pid)
380 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
381 self.assertEqual(
382 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
383 )
384 self.assertEqual(
385 self.auth.delete_project.call_args[0][0],
386 pid,
387 "Wrong project identifier",
388 )
389
390 def test_conflict_on_del(self):
391 with self.subTest(i=1):
392 self.auth.get_project.return_value = {
393 "_id": test_pid,
394 "name": self.test_name,
395 }
396 with self.assertRaises(
397 EngineException, msg="Accepted deletion of own project"
398 ) as e:
399 self.topic.delete(self.fake_session, self.test_name)
400 self.assertEqual(
401 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
402 )
403 self.assertIn(
404 "you cannot delete your own project",
405 norm(str(e.exception)),
406 "Wrong exception text",
407 )
408 with self.subTest(i=2):
409 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
410 with self.assertRaises(
411 EngineException, msg="Accepted deletion of project 'admin'"
412 ) as e:
413 self.topic.delete(self.fake_session, "admin")
414 self.assertEqual(
415 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
416 )
417 self.assertIn(
418 "you cannot delete project 'admin'",
419 norm(str(e.exception)),
420 "Wrong exception text",
421 )
422 with self.subTest(i=3):
423 pid = str(uuid4())
424 name = "other-project-name"
425 self.auth.get_project.return_value = {"_id": pid, "name": name}
426 self.auth.get_user_list.return_value = [
427 {
428 "_id": str(uuid4()),
429 "username": self.test_name,
430 "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
431 }
432 ]
433 with self.assertRaises(
434 EngineException, msg="Accepted deletion of used project"
435 ) as e:
436 self.topic.delete(self.fake_session, pid)
437 self.assertEqual(
438 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
439 )
440 self.assertIn(
441 "project '{}' ({}) is being used by user '{}'".format(
442 name, pid, self.test_name
443 ),
444 norm(str(e.exception)),
445 "Wrong exception text",
446 )
447 with self.subTest(i=4):
448 self.auth.get_user_list.return_value = []
449 self.db.get_list.return_value = [
450 {
451 "_id": str(uuid4()),
452 "id": self.test_name,
453 "_admin": {"projects_read": [pid], "projects_write": []},
454 }
455 ]
456 with self.assertRaises(
457 EngineException, msg="Accepted deletion of used project"
458 ) as e:
459 self.topic.delete(self.fake_session, pid)
460 self.assertEqual(
461 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
462 )
463 self.assertIn(
464 "project '{}' ({}) is being used by {} '{}'".format(
465 name, pid, "vnf descriptor", self.test_name
466 ),
467 norm(str(e.exception)),
468 "Wrong exception text",
469 )
470
471
472 class Test_RoleTopicAuth(TestCase):
473 @classmethod
474 def setUpClass(cls):
475 cls.test_name = "test-role-topic"
476 cls.test_operations = ["tokens:get"]
477
478 def setUp(self):
479 self.db = Mock(dbbase.DbBase())
480 self.fs = Mock(fsbase.FsBase())
481 self.msg = Mock(msgbase.MsgBase())
482 self.auth = Mock(authconn.Authconn(None, None, None))
483 self.auth.role_permissions = self.test_operations
484 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
485 self.fake_session = {
486 "username": test_name,
487 "project_id": (test_pid,),
488 "method": None,
489 "admin": True,
490 "force": False,
491 "public": False,
492 "allow_show_user_project_role": True,
493 }
494 self.topic.check_quota = Mock(return_value=None) # skip quota
495
496 def test_new_role(self):
497 with self.subTest(i=1):
498 rollback = []
499 rid1 = str(uuid4())
500 perms_in = {"tokens": True}
501 perms_out = {"default": False, "admin": False, "tokens": True}
502 self.auth.get_role_list.return_value = []
503 self.auth.create_role.return_value = rid1
504 rid2, oid = self.topic.new(
505 rollback,
506 self.fake_session,
507 {"name": self.test_name, "permissions": perms_in},
508 )
509 self.assertEqual(len(rollback), 1, "Wrong rollback length")
510 self.assertEqual(rid2, rid1, "Wrong project identifier")
511 content = self.auth.create_role.call_args[0][0]
512 self.assertEqual(content["name"], self.test_name, "Wrong role name")
513 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
514 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
515 self.assertEqual(
516 content["_admin"]["modified"],
517 content["_admin"]["created"],
518 "Wrong modification time",
519 )
520 with self.subTest(i=2):
521 rollback = []
522 with self.assertRaises(
523 EngineException, msg="Accepted wrong permissions"
524 ) as e:
525 self.topic.new(
526 rollback,
527 self.fake_session,
528 {"name": "other-role-name", "permissions": {"projects": True}},
529 )
530 self.assertEqual(len(rollback), 0, "Wrong rollback length")
531 self.assertEqual(
532 e.exception.http_code,
533 HTTPStatus.UNPROCESSABLE_ENTITY,
534 "Wrong HTTP status code",
535 )
536 self.assertIn(
537 "invalid permission '{}'".format("projects"),
538 norm(str(e.exception)),
539 "Wrong exception text",
540 )
541
542 def test_edit_role(self):
543 now = time()
544 rid = str(uuid4())
545 role = {
546 "_id": rid,
547 "name": self.test_name,
548 "permissions": {"tokens": True},
549 "_admin": {"created": now, "modified": now},
550 }
551 with self.subTest(i=1):
552 self.auth.get_role_list.side_effect = [[role], []]
553 self.auth.get_role.return_value = role
554 new_name = "new-role-name"
555 perms_in = {"tokens": False, "tokens:get": True}
556 perms_out = {
557 "default": False,
558 "admin": False,
559 "tokens": False,
560 "tokens:get": True,
561 }
562 self.topic.edit(
563 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
564 )
565 content = self.auth.update_role.call_args[0][0]
566 self.assertEqual(content["_id"], rid, "Wrong role identifier")
567 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
568 self.assertGreater(
569 content["_admin"]["modified"], now, "Wrong modification time"
570 )
571 self.assertEqual(content["name"], new_name, "Wrong role name")
572 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
573 with self.subTest(i=2):
574 new_name = "other-role-name"
575 perms_in = {"tokens": False, "tokens:post": True}
576 self.auth.get_role_list.side_effect = [[role], []]
577 with self.assertRaises(
578 EngineException, msg="Accepted wrong permissions"
579 ) as e:
580 self.topic.edit(
581 self.fake_session, rid, {"name": new_name, "permissions": perms_in}
582 )
583 self.assertEqual(
584 e.exception.http_code,
585 HTTPStatus.UNPROCESSABLE_ENTITY,
586 "Wrong HTTP status code",
587 )
588 self.assertIn(
589 "invalid permission '{}'".format("tokens:post"),
590 norm(str(e.exception)),
591 "Wrong exception text",
592 )
593
594 def test_delete_role(self):
595 with self.subTest(i=1):
596 rid = str(uuid4())
597 role = {"_id": rid, "name": "other-role-name"}
598 self.auth.get_role_list.return_value = [role]
599 self.auth.get_role.return_value = role
600 self.auth.delete_role.return_value = {"deleted": 1}
601 self.auth.get_user_list.return_value = []
602 rc = self.topic.delete(self.fake_session, rid)
603 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
604 self.assertEqual(
605 self.auth.get_role_list.call_args[0][0]["_id"],
606 rid,
607 "Wrong role identifier",
608 )
609 self.assertEqual(
610 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
611 )
612 self.assertEqual(
613 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
614 )
615
616 def test_conflict_on_new(self):
617 with self.subTest(i=1):
618 rollback = []
619 rid = str(uuid4())
620 with self.assertRaises(
621 EngineException, msg="Accepted uuid as role name"
622 ) as e:
623 self.topic.new(rollback, self.fake_session, {"name": rid})
624 self.assertEqual(len(rollback), 0, "Wrong rollback length")
625 self.assertEqual(
626 e.exception.http_code,
627 HTTPStatus.UNPROCESSABLE_ENTITY,
628 "Wrong HTTP status code",
629 )
630 self.assertIn(
631 "role name '{}' cannot have an uuid format".format(rid),
632 norm(str(e.exception)),
633 "Wrong exception text",
634 )
635 with self.subTest(i=2):
636 rollback = []
637 self.auth.get_role_list.return_value = [
638 {"_id": str(uuid4()), "name": self.test_name}
639 ]
640 with self.assertRaises(
641 EngineException, msg="Accepted existing role name"
642 ) as e:
643 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
644 self.assertEqual(len(rollback), 0, "Wrong rollback length")
645 self.assertEqual(
646 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
647 )
648 self.assertIn(
649 "role name '{}' exists".format(self.test_name),
650 norm(str(e.exception)),
651 "Wrong exception text",
652 )
653
654 def test_conflict_on_edit(self):
655 rid = str(uuid4())
656 with self.subTest(i=1):
657 self.auth.get_role_list.return_value = [
658 {"_id": rid, "name": self.test_name, "permissions": {}}
659 ]
660 new_name = str(uuid4())
661 with self.assertRaises(
662 EngineException, msg="Accepted uuid as role name"
663 ) as e:
664 self.topic.edit(self.fake_session, rid, {"name": new_name})
665 self.assertEqual(
666 e.exception.http_code,
667 HTTPStatus.UNPROCESSABLE_ENTITY,
668 "Wrong HTTP status code",
669 )
670 self.assertIn(
671 "role name '{}' cannot have an uuid format".format(new_name),
672 norm(str(e.exception)),
673 "Wrong exception text",
674 )
675 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
676 with self.subTest(i=i):
677 rid = str(uuid4())
678 self.auth.get_role.return_value = {
679 "_id": rid,
680 "name": role_name,
681 "permissions": {},
682 }
683 with self.assertRaises(
684 EngineException,
685 msg="Accepted renaming of role '{}'".format(role_name),
686 ) as e:
687 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
688 self.assertEqual(
689 e.exception.http_code,
690 HTTPStatus.FORBIDDEN,
691 "Wrong HTTP status code",
692 )
693 self.assertIn(
694 "you cannot rename role '{}'".format(role_name),
695 norm(str(e.exception)),
696 "Wrong exception text",
697 )
698 with self.subTest(i=i + 1):
699 new_name = "new-role-name"
700 self.auth.get_role_list.side_effect = [
701 [{"_id": rid, "name": self.test_name, "permissions": {}}],
702 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
703 ]
704 self.auth.get_role.return_value = {
705 "_id": rid,
706 "name": self.test_name,
707 "permissions": {},
708 }
709 with self.assertRaises(
710 EngineException, msg="Accepted existing role name"
711 ) as e:
712 self.topic.edit(self.fake_session, rid, {"name": new_name})
713 self.assertEqual(
714 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
715 )
716 self.assertIn(
717 "role name '{}' exists".format(new_name),
718 norm(str(e.exception)),
719 "Wrong exception text",
720 )
721
722 def test_conflict_on_del(self):
723 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
724 with self.subTest(i=i):
725 rid = str(uuid4())
726 role = {"_id": rid, "name": role_name}
727 self.auth.get_role_list.return_value = [role]
728 self.auth.get_role.return_value = role
729 with self.assertRaises(
730 EngineException,
731 msg="Accepted deletion of role '{}'".format(role_name),
732 ) as e:
733 self.topic.delete(self.fake_session, rid)
734 self.assertEqual(
735 e.exception.http_code,
736 HTTPStatus.FORBIDDEN,
737 "Wrong HTTP status code",
738 )
739 self.assertIn(
740 "you cannot delete role '{}'".format(role_name),
741 norm(str(e.exception)),
742 "Wrong exception text",
743 )
744 with self.subTest(i=i + 1):
745 rid = str(uuid4())
746 name = "other-role-name"
747 role = {"_id": rid, "name": name}
748 self.auth.get_role_list.return_value = [role]
749 self.auth.get_role.return_value = role
750 self.auth.get_user_list.return_value = [
751 {
752 "_id": str(uuid4()),
753 "username": self.test_name,
754 "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
755 }
756 ]
757 with self.assertRaises(
758 EngineException, msg="Accepted deletion of used role"
759 ) as e:
760 self.topic.delete(self.fake_session, rid)
761 self.assertEqual(
762 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
763 )
764 self.assertIn(
765 "role '{}' ({}) is being used by user '{}'".format(
766 name, rid, self.test_name
767 ),
768 norm(str(e.exception)),
769 "Wrong exception text",
770 )
771
772
773 class Test_UserTopicAuth(TestCase):
774 @classmethod
775 def setUpClass(cls):
776 cls.test_name = "test-user-topic"
777
778 def setUp(self):
779 self.db = Mock(dbbase.DbBase())
780 self.fs = Mock(fsbase.FsBase())
781 self.msg = Mock(msgbase.MsgBase())
782 self.auth = Mock(authconn.Authconn(None, None, None))
783 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
784 self.fake_session = {
785 "username": test_name,
786 "project_id": (test_pid,),
787 "method": None,
788 "admin": True,
789 "force": False,
790 "public": False,
791 "allow_show_user_project_role": True,
792 }
793 self.topic.check_quota = Mock(return_value=None) # skip quota
794
795 def test_new_user(self):
796 uid1 = str(uuid4())
797 pid = str(uuid4())
798 self.auth.get_user_list.return_value = []
799 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
800 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
801 with self.subTest(i=1):
802 rollback = []
803 rid = str(uuid4())
804 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
805 prms_in = [{"project": "some_project", "role": "some_role"}]
806 prms_out = [{"project": pid, "role": rid}]
807 uid2, oid = self.topic.new(
808 rollback,
809 self.fake_session,
810 {
811 "username": self.test_name,
812 "password": self.test_name,
813 "project_role_mappings": prms_in,
814 },
815 )
816 self.assertEqual(len(rollback), 1, "Wrong rollback length")
817 self.assertEqual(uid2, uid1, "Wrong project identifier")
818 content = self.auth.create_user.call_args[0][0]
819 self.assertEqual(content["username"], self.test_name, "Wrong project name")
820 self.assertEqual(content["password"], self.test_name, "Wrong password")
821 self.assertEqual(
822 content["project_role_mappings"],
823 prms_out,
824 "Wrong project-role mappings",
825 )
826 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
827 self.assertEqual(
828 content["_admin"]["modified"],
829 content["_admin"]["created"],
830 "Wrong modification time",
831 )
832 with self.subTest(i=2):
833 rollback = []
834 def_rid = str(uuid4())
835 def_role = {"_id": def_rid, "name": "project_admin"}
836 self.auth.get_role.return_value = def_role
837 self.auth.get_role_list.return_value = [def_role]
838 prms_out = [{"project": pid, "role": def_rid}]
839 uid2, oid = self.topic.new(
840 rollback,
841 self.fake_session,
842 {
843 "username": self.test_name,
844 "password": self.test_name,
845 "projects": ["some_project"],
846 },
847 )
848 self.assertEqual(len(rollback), 1, "Wrong rollback length")
849 self.assertEqual(uid2, uid1, "Wrong project identifier")
850 content = self.auth.create_user.call_args[0][0]
851 self.assertEqual(content["username"], self.test_name, "Wrong project name")
852 self.assertEqual(content["password"], self.test_name, "Wrong password")
853 self.assertEqual(
854 content["project_role_mappings"],
855 prms_out,
856 "Wrong project-role mappings",
857 )
858 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
859 self.assertEqual(
860 content["_admin"]["modified"],
861 content["_admin"]["created"],
862 "Wrong modification time",
863 )
864 with self.subTest(i=3):
865 rollback = []
866 with self.assertRaises(
867 EngineException, msg="Accepted wrong project-role mappings"
868 ) as e:
869 self.topic.new(
870 rollback,
871 self.fake_session,
872 {
873 "username": "other-project-name",
874 "password": "other-password",
875 "project_role_mappings": [{}],
876 },
877 )
878 self.assertEqual(len(rollback), 0, "Wrong rollback length")
879 self.assertEqual(
880 e.exception.http_code,
881 HTTPStatus.UNPROCESSABLE_ENTITY,
882 "Wrong HTTP status code",
883 )
884 self.assertIn(
885 "format error at '{}' '{}'".format(
886 "project_role_mappings:{}", "'{}' is a required property"
887 ).format(0, "project"),
888 norm(str(e.exception)),
889 "Wrong exception text",
890 )
891 with self.subTest(i=4):
892 rollback = []
893 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
894 self.topic.new(
895 rollback,
896 self.fake_session,
897 {
898 "username": "other-project-name",
899 "password": "other-password",
900 "projects": [],
901 },
902 )
903 self.assertEqual(len(rollback), 0, "Wrong rollback length")
904 self.assertEqual(
905 e.exception.http_code,
906 HTTPStatus.UNPROCESSABLE_ENTITY,
907 "Wrong HTTP status code",
908 )
909 self.assertIn(
910 "format error at '{}' '{}'".format(
911 "projects", "{} is too short"
912 ).format([]),
913 norm(str(e.exception)),
914 "Wrong exception text",
915 )
916
917 def test_edit_user(self):
918 now = time()
919 uid = str(uuid4())
920 pid1 = str(uuid4())
921 rid1 = str(uuid4())
922 prms = [
923 {
924 "project": pid1,
925 "project_name": "project-1",
926 "role": rid1,
927 "role_name": "role-1",
928 }
929 ]
930 user = {
931 "_id": uid,
932 "username": self.test_name,
933 "project_role_mappings": prms,
934 "_admin": {"created": now, "modified": now},
935 }
936 with self.subTest(i=1):
937 self.auth.get_user_list.side_effect = [[user], []]
938 self.auth.get_user.return_value = user
939 pid2 = str(uuid4())
940 rid2 = str(uuid4())
941 self.auth.get_project.side_effect = [
942 {"_id": pid2, "name": "project-2"},
943 {"_id": pid1, "name": "project-1"},
944 ]
945 self.auth.get_role.side_effect = [
946 {"_id": rid2, "name": "role-2"},
947 {"_id": rid1, "name": "role-1"},
948 ]
949 new_name = "new-user-name"
950 new_pasw = "new-password"
951 add_prms = [{"project": pid2, "role": rid2}]
952 rem_prms = [{"project": pid1, "role": rid1}]
953 self.topic.edit(
954 self.fake_session,
955 uid,
956 {
957 "username": new_name,
958 "password": new_pasw,
959 "add_project_role_mappings": add_prms,
960 "remove_project_role_mappings": rem_prms,
961 },
962 )
963 content = self.auth.update_user.call_args[0][0]
964 self.assertEqual(content["_id"], uid, "Wrong user identifier")
965 self.assertEqual(content["username"], new_name, "Wrong user name")
966 self.assertEqual(content["password"], new_pasw, "Wrong user password")
967 self.assertEqual(
968 content["add_project_role_mappings"],
969 add_prms,
970 "Wrong project-role mappings to add",
971 )
972 self.assertEqual(
973 content["remove_project_role_mappings"],
974 prms,
975 "Wrong project-role mappings to remove",
976 )
977 with self.subTest(i=2):
978 new_name = "other-user-name"
979 new_prms = [{}]
980 self.auth.get_role_list.side_effect = [[user], []]
981 self.auth.get_user_list.side_effect = [[user]]
982 with self.assertRaises(
983 EngineException, msg="Accepted wrong project-role mappings"
984 ) as e:
985 self.topic.edit(
986 self.fake_session,
987 uid,
988 {"username": new_name, "project_role_mappings": new_prms},
989 )
990 self.assertEqual(
991 e.exception.http_code,
992 HTTPStatus.UNPROCESSABLE_ENTITY,
993 "Wrong HTTP status code",
994 )
995 self.assertIn(
996 "format error at '{}' '{}'".format(
997 "project_role_mappings:{}", "'{}' is a required property"
998 ).format(0, "project"),
999 norm(str(e.exception)),
1000 "Wrong exception text",
1001 )
1002 with self.subTest(i=3):
1003 self.auth.get_user_list.side_effect = [[user], []]
1004 self.auth.get_user.return_value = user
1005 old_password = self.test_name
1006 new_pasw = "new-password"
1007 self.topic.edit(
1008 self.fake_session,
1009 uid,
1010 {
1011 "old_password": old_password,
1012 "password": new_pasw,
1013 },
1014 )
1015 content = self.auth.update_user.call_args[0][0]
1016 self.assertEqual(content["old_password"], old_password, "Wrong old password")
1017 self.assertEqual(content["password"], new_pasw, "Wrong user password")
1018
1019 def test_delete_user(self):
1020 with self.subTest(i=1):
1021 uid = str(uuid4())
1022 self.fake_session["username"] = self.test_name
1023 user = user = {
1024 "_id": uid,
1025 "username": "other-user-name",
1026 "project_role_mappings": [],
1027 }
1028 self.auth.get_user.return_value = user
1029 self.auth.delete_user.return_value = {"deleted": 1}
1030 rc = self.topic.delete(self.fake_session, uid)
1031 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
1032 self.assertEqual(
1033 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
1034 )
1035 self.assertEqual(
1036 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
1037 )
1038
1039 def test_conflict_on_new(self):
1040 with self.subTest(i=1):
1041 rollback = []
1042 uid = str(uuid4())
1043 with self.assertRaises(
1044 EngineException, msg="Accepted uuid as username"
1045 ) as e:
1046 self.topic.new(
1047 rollback,
1048 self.fake_session,
1049 {
1050 "username": uid,
1051 "password": self.test_name,
1052 "projects": [test_pid],
1053 },
1054 )
1055 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1056 self.assertEqual(
1057 e.exception.http_code,
1058 HTTPStatus.UNPROCESSABLE_ENTITY,
1059 "Wrong HTTP status code",
1060 )
1061 self.assertIn(
1062 "username '{}' cannot have a uuid format".format(uid),
1063 norm(str(e.exception)),
1064 "Wrong exception text",
1065 )
1066 with self.subTest(i=2):
1067 rollback = []
1068 self.auth.get_user_list.return_value = [
1069 {"_id": str(uuid4()), "username": self.test_name}
1070 ]
1071 with self.assertRaises(
1072 EngineException, msg="Accepted existing username"
1073 ) as e:
1074 self.topic.new(
1075 rollback,
1076 self.fake_session,
1077 {
1078 "username": self.test_name,
1079 "password": self.test_name,
1080 "projects": [test_pid],
1081 },
1082 )
1083 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1084 self.assertEqual(
1085 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1086 )
1087 self.assertIn(
1088 "username '{}' is already used".format(self.test_name),
1089 norm(str(e.exception)),
1090 "Wrong exception text",
1091 )
1092 with self.subTest(i=3):
1093 rollback = []
1094 self.auth.get_user_list.return_value = []
1095 self.auth.get_role_list.side_effect = [[], []]
1096 with self.assertRaises(
1097 AuthconnNotFoundException, msg="Accepted user without default role"
1098 ) as e:
1099 self.topic.new(
1100 rollback,
1101 self.fake_session,
1102 {
1103 "username": self.test_name,
1104 "password": self.test_name,
1105 "projects": [str(uuid4())],
1106 },
1107 )
1108 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1109 self.assertEqual(
1110 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1111 )
1112 self.assertIn(
1113 "can't find default role for user '{}'".format(self.test_name),
1114 norm(str(e.exception)),
1115 "Wrong exception text",
1116 )
1117
1118 def test_conflict_on_edit(self):
1119 uid = str(uuid4())
1120 with self.subTest(i=1):
1121 self.auth.get_user_list.return_value = [
1122 {"_id": uid, "username": self.test_name}
1123 ]
1124 new_name = str(uuid4())
1125 with self.assertRaises(
1126 EngineException, msg="Accepted uuid as username"
1127 ) as e:
1128 self.topic.edit(self.fake_session, uid, {"username": new_name})
1129 self.assertEqual(
1130 e.exception.http_code,
1131 HTTPStatus.UNPROCESSABLE_ENTITY,
1132 "Wrong HTTP status code",
1133 )
1134 self.assertIn(
1135 "username '{}' cannot have an uuid format".format(new_name),
1136 norm(str(e.exception)),
1137 "Wrong exception text",
1138 )
1139 with self.subTest(i=2):
1140 self.auth.get_user_list.return_value = [
1141 {"_id": uid, "username": self.test_name}
1142 ]
1143 self.auth.get_role_list.side_effect = [[], []]
1144 with self.assertRaises(
1145 AuthconnNotFoundException, msg="Accepted user without default role"
1146 ) as e:
1147 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
1148 self.assertEqual(
1149 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
1150 )
1151 self.assertIn(
1152 "can't find a default role for user '{}'".format(self.test_name),
1153 norm(str(e.exception)),
1154 "Wrong exception text",
1155 )
1156 with self.subTest(i=3):
1157 admin_uid = str(uuid4())
1158 self.auth.get_user_list.return_value = [
1159 {"_id": admin_uid, "username": "admin"}
1160 ]
1161 with self.assertRaises(
1162 EngineException,
1163 msg="Accepted removing system_admin role from admin user",
1164 ) as e:
1165 self.topic.edit(
1166 self.fake_session,
1167 admin_uid,
1168 {
1169 "remove_project_role_mappings": [
1170 {"project": "admin", "role": "system_admin"}
1171 ]
1172 },
1173 )
1174 self.assertEqual(
1175 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
1176 )
1177 self.assertIn(
1178 "you cannot remove system_admin role from admin user",
1179 norm(str(e.exception)),
1180 "Wrong exception text",
1181 )
1182 with self.subTest(i=4):
1183 new_name = "new-user-name"
1184 self.auth.get_user_list.side_effect = [
1185 [{"_id": uid, "name": self.test_name}],
1186 [{"_id": str(uuid4()), "name": new_name}],
1187 ]
1188 with self.assertRaises(
1189 EngineException, msg="Accepted existing username"
1190 ) as e:
1191 self.topic.edit(self.fake_session, uid, {"username": new_name})
1192 self.assertEqual(
1193 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1194 )
1195 self.assertIn(
1196 "username '{}' is already used".format(new_name),
1197 norm(str(e.exception)),
1198 "Wrong exception text",
1199 )
1200
1201 def test_conflict_on_del(self):
1202 with self.subTest(i=1):
1203 uid = str(uuid4())
1204 self.fake_session["username"] = self.test_name
1205 user = user = {
1206 "_id": uid,
1207 "username": self.test_name,
1208 "project_role_mappings": [],
1209 }
1210 self.auth.get_user.return_value = user
1211 with self.assertRaises(
1212 EngineException, msg="Accepted deletion of own user"
1213 ) as e:
1214 self.topic.delete(self.fake_session, uid)
1215 self.assertEqual(
1216 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1217 )
1218 self.assertIn(
1219 "you cannot delete your own login user",
1220 norm(str(e.exception)),
1221 "Wrong exception text",
1222 )
1223
1224
1225 class Test_CommonVimWimSdn(TestCase):
1226 @classmethod
1227 def setUpClass(cls):
1228 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
1229
1230 def setUp(self):
1231 self.db = Mock(dbbase.DbBase())
1232 self.fs = Mock(fsbase.FsBase())
1233 self.msg = Mock(msgbase.MsgBase())
1234 self.auth = Mock(authconn.Authconn(None, None, None))
1235 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
1236 # Use WIM schemas for testing because they are the simplest
1237 self.topic._send_msg = Mock()
1238 self.topic.topic = "wims"
1239 self.topic.schema_new = validation.wim_account_new_schema
1240 self.topic.schema_edit = validation.wim_account_edit_schema
1241 self.fake_session = {
1242 "username": test_name,
1243 "project_id": (test_pid,),
1244 "method": None,
1245 "admin": True,
1246 "force": False,
1247 "public": False,
1248 "allow_show_user_project_role": True,
1249 }
1250 self.topic.check_quota = Mock(return_value=None) # skip quota
1251
1252 def test_new_cvws(self):
1253 test_url = "http://0.0.0.0:0"
1254 with self.subTest(i=1):
1255 rollback = []
1256 test_type = "fake"
1257 self.db.get_one.return_value = None
1258 self.db.create.side_effect = lambda self, content: content["_id"]
1259 cid, oid = self.topic.new(
1260 rollback,
1261 self.fake_session,
1262 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
1263 )
1264 self.assertEqual(len(rollback), 1, "Wrong rollback length")
1265 args = self.db.create.call_args[0]
1266 content = args[1]
1267 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1268 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1269 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
1270 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
1271 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
1272 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
1273 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
1274 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
1275 self.assertEqual(
1276 content["_admin"]["modified"],
1277 content["_admin"]["created"],
1278 "Wrong modification time",
1279 )
1280 self.assertEqual(
1281 content["_admin"]["operationalState"],
1282 "PROCESSING",
1283 "Wrong operational state",
1284 )
1285 self.assertEqual(
1286 content["_admin"]["projects_read"],
1287 [test_pid],
1288 "Wrong read-only projects",
1289 )
1290 self.assertEqual(
1291 content["_admin"]["projects_write"],
1292 [test_pid],
1293 "Wrong read/write projects",
1294 )
1295 self.assertIsNone(
1296 content["_admin"]["current_operation"], "Wrong current operation"
1297 )
1298 self.assertEqual(
1299 len(content["_admin"]["operations"]), 1, "Wrong number of operations"
1300 )
1301 operation = content["_admin"]["operations"][0]
1302 self.assertEqual(
1303 operation["lcmOperationType"], "create", "Wrong operation type"
1304 )
1305 self.assertEqual(
1306 operation["operationState"], "PROCESSING", "Wrong operation state"
1307 )
1308 self.assertGreater(
1309 operation["startTime"],
1310 content["_admin"]["created"],
1311 "Wrong operation start time",
1312 )
1313 self.assertGreater(
1314 operation["statusEnteredTime"],
1315 content["_admin"]["created"],
1316 "Wrong operation status enter time",
1317 )
1318 self.assertEqual(
1319 operation["detailed-status"], "", "Wrong operation detailed status info"
1320 )
1321 self.assertIsNone(
1322 operation["operationParams"], "Wrong operation parameters"
1323 )
1324 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
1325 # with self.subTest(i=2):
1326 # rollback = []
1327 # test_type = "bad_type"
1328 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
1329 # self.topic.new(rollback, self.fake_session,
1330 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
1331 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
1332 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
1333 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
1334 # norm(str(e.exception)), "Wrong exception text")
1335
1336 def test_conflict_on_new(self):
1337 with self.subTest(i=1):
1338 rollback = []
1339 test_url = "http://0.0.0.0:0"
1340 test_type = "fake"
1341 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
1342 with self.assertRaises(
1343 EngineException, msg="Accepted existing CIM name"
1344 ) as e:
1345 self.topic.new(
1346 rollback,
1347 self.fake_session,
1348 {
1349 "name": self.test_name,
1350 "wim_url": test_url,
1351 "wim_type": test_type,
1352 },
1353 )
1354 self.assertEqual(len(rollback), 0, "Wrong rollback length")
1355 self.assertEqual(
1356 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1357 )
1358 self.assertIn(
1359 "name '{}' already exists for {}".format(
1360 self.test_name, self.topic.topic
1361 ),
1362 norm(str(e.exception)),
1363 "Wrong exception text",
1364 )
1365
1366 def test_edit_cvws(self):
1367 now = time()
1368 cid = str(uuid4())
1369 test_url = "http://0.0.0.0:0"
1370 test_type = "fake"
1371 cvws = {
1372 "_id": cid,
1373 "name": self.test_name,
1374 "wim_url": test_url,
1375 "wim_type": test_type,
1376 "_admin": {
1377 "created": now,
1378 "modified": now,
1379 "operations": [{"lcmOperationType": "create"}],
1380 },
1381 }
1382 with self.subTest(i=1):
1383 new_name = "new-cim-name"
1384 new_url = "https://1.1.1.1:1"
1385 new_type = "onos"
1386 self.db.get_one.side_effect = [cvws, None]
1387 self.db.replace.return_value = {"updated": 1}
1388 # self.db.encrypt.side_effect = [b64str(), b64str()]
1389 self.topic.edit(
1390 self.fake_session,
1391 cid,
1392 {"name": new_name, "wim_url": new_url, "wim_type": new_type},
1393 )
1394 args = self.db.replace.call_args[0]
1395 content = args[2]
1396 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
1397 self.assertEqual(args[1], cid, "Wrong CIM identifier")
1398 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
1399 self.assertEqual(content["name"], new_name, "Wrong CIM name")
1400 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
1401 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
1402 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
1403 self.assertGreater(
1404 content["_admin"]["modified"],
1405 content["_admin"]["created"],
1406 "Wrong modification time",
1407 )
1408 self.assertEqual(
1409 len(content["_admin"]["operations"]), 2, "Wrong number of operations"
1410 )
1411 operation = content["_admin"]["operations"][1]
1412 self.assertEqual(
1413 operation["lcmOperationType"], "edit", "Wrong operation type"
1414 )
1415 self.assertEqual(
1416 operation["operationState"], "PROCESSING", "Wrong operation state"
1417 )
1418 self.assertGreater(
1419 operation["startTime"],
1420 content["_admin"]["modified"],
1421 "Wrong operation start time",
1422 )
1423 self.assertGreater(
1424 operation["statusEnteredTime"],
1425 content["_admin"]["modified"],
1426 "Wrong operation status enter time",
1427 )
1428 self.assertEqual(
1429 operation["detailed-status"], "", "Wrong operation detailed status info"
1430 )
1431 self.assertIsNone(
1432 operation["operationParams"], "Wrong operation parameters"
1433 )
1434 with self.subTest(i=2):
1435 self.db.get_one.side_effect = [cvws]
1436 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
1437 self.topic.edit(
1438 self.fake_session,
1439 str(uuid4()),
1440 {"name": "new-name", "extra_prop": "anything"},
1441 )
1442 self.assertEqual(
1443 e.exception.http_code,
1444 HTTPStatus.UNPROCESSABLE_ENTITY,
1445 "Wrong HTTP status code",
1446 )
1447 self.assertIn(
1448 "format error '{}'".format(
1449 "additional properties are not allowed ('{}' was unexpected)"
1450 ).format("extra_prop"),
1451 norm(str(e.exception)),
1452 "Wrong exception text",
1453 )
1454
1455 def test_conflict_on_edit(self):
1456 with self.subTest(i=1):
1457 cid = str(uuid4())
1458 new_name = "new-cim-name"
1459 self.db.get_one.side_effect = [
1460 {"_id": cid, "name": self.test_name},
1461 {"_id": str(uuid4()), "name": new_name},
1462 ]
1463 with self.assertRaises(
1464 EngineException, msg="Accepted existing CIM name"
1465 ) as e:
1466 self.topic.edit(self.fake_session, cid, {"name": new_name})
1467 self.assertEqual(
1468 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
1469 )
1470 self.assertIn(
1471 "name '{}' already exists for {}".format(new_name, self.topic.topic),
1472 norm(str(e.exception)),
1473 "Wrong exception text",
1474 )
1475
1476 def test_delete_cvws(self):
1477 cid = str(uuid4())
1478 ro_pid = str(uuid4())
1479 rw_pid = str(uuid4())
1480 cvws = {"_id": cid, "name": self.test_name}
1481 self.db.get_list.return_value = []
1482 with self.subTest(i=1):
1483 cvws["_admin"] = {
1484 "projects_read": [test_pid, ro_pid, rw_pid],
1485 "projects_write": [test_pid, rw_pid],
1486 }
1487 self.db.get_one.return_value = cvws
1488 oid = self.topic.delete(self.fake_session, cid)
1489 self.assertIsNone(oid, "Wrong operation identifier")
1490 self.assertEqual(
1491 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1492 )
1493 self.assertEqual(
1494 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1495 )
1496 self.assertEqual(
1497 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1498 )
1499 self.assertEqual(
1500 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1501 )
1502 self.assertEqual(
1503 self.db.set_one.call_args[1]["update_dict"],
1504 None,
1505 "Wrong read-only projects update",
1506 )
1507 self.assertEqual(
1508 self.db.set_one.call_args[1]["pull_list"],
1509 {
1510 "_admin.projects_read": (test_pid,),
1511 "_admin.projects_write": (test_pid,),
1512 },
1513 "Wrong read/write projects update",
1514 )
1515 self.topic._send_msg.assert_not_called()
1516 with self.subTest(i=2):
1517 now = time()
1518 cvws["_admin"] = {
1519 "projects_read": [test_pid],
1520 "projects_write": [test_pid],
1521 "operations": [],
1522 }
1523 self.db.get_one.return_value = cvws
1524 oid = self.topic.delete(self.fake_session, cid)
1525 self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
1526 self.assertEqual(
1527 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1528 )
1529 self.assertEqual(
1530 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1531 )
1532 self.assertEqual(
1533 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
1534 )
1535 self.assertEqual(
1536 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier"
1537 )
1538 self.assertEqual(
1539 self.db.set_one.call_args[1]["update_dict"],
1540 {"_admin.to_delete": True},
1541 "Wrong _admin.to_delete update",
1542 )
1543 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
1544 self.assertEqual(
1545 operation["lcmOperationType"], "delete", "Wrong operation type"
1546 )
1547 self.assertEqual(
1548 operation["operationState"], "PROCESSING", "Wrong operation state"
1549 )
1550 self.assertEqual(
1551 operation["detailed-status"], "", "Wrong operation detailed status"
1552 )
1553 self.assertIsNone(
1554 operation["operationParams"], "Wrong operation parameters"
1555 )
1556 self.assertGreater(
1557 operation["startTime"], now, "Wrong operation start time"
1558 )
1559 self.assertGreater(
1560 operation["statusEnteredTime"], now, "Wrong operation status enter time"
1561 )
1562 self.topic._send_msg.assert_called_once_with(
1563 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
1564 )
1565 with self.subTest(i=3):
1566 cvws["_admin"] = {
1567 "projects_read": [],
1568 "projects_write": [],
1569 "operations": [],
1570 }
1571 self.db.get_one.return_value = cvws
1572 self.topic._send_msg.reset_mock()
1573 self.db.get_one.reset_mock()
1574 self.db.del_one.reset_mock()
1575 self.fake_session["force"] = True # to force deletion
1576 self.fake_session["admin"] = True # to force deletion
1577 self.fake_session["project_id"] = [] # to force deletion
1578 oid = self.topic.delete(self.fake_session, cid)
1579 self.assertIsNone(oid, "Wrong operation identifier")
1580 self.assertEqual(
1581 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
1582 )
1583 self.assertEqual(
1584 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1585 )
1586 self.assertEqual(
1587 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
1588 )
1589 self.assertEqual(
1590 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
1591 )
1592 self.topic._send_msg.assert_called_once_with(
1593 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
1594 )
1595
1596
1597 if __name__ == "__main__":
1598 unittest.main()