| delacruzramo | 79e40f4 | 2019-10-10 16:36:40 +0200 | [diff] [blame] | 1 | #! /usr/bin/python3 |
| 2 | # -*- coding: utf-8 -*- |
| 3 | |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| 13 | # implied. |
| 14 | # See the License for the specific language governing permissions and |
| 15 | # limitations under the License. |
| 16 | |
# Module metadata: original author contact and creation-date tag.
__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
# NOTE(review): the day component "019" looks malformed -- confirm the intended date before relying on it.
__date__ = "$2019-10-019"
| 19 | |
| 20 | import unittest |
| 21 | from unittest import TestCase |
| 22 | from unittest.mock import Mock |
| 23 | from uuid import uuid4 |
| 24 | from http import HTTPStatus |
| 25 | from time import time |
| 26 | from random import randint |
| 27 | from osm_common import dbbase, fsbase, msgbase |
| 28 | from osm_nbi import authconn, validation |
| 29 | from osm_nbi.admin_topics import ProjectTopicAuth, RoleTopicAuth, UserTopicAuth, CommonVimWimSdn |
| 30 | from osm_nbi.engine import EngineException |
| 31 | from osm_nbi.authconn import AuthconnNotFoundException |
| 32 | |
| 33 | |
# Module-wide fixtures used to build the fake sessions in the test cases below.
test_name = "test-user"  # user name placed in the fake sessions
test_pid = str(uuid4())  # random project identifier, generated once per test run
| 36 | |
| 37 | |
def norm(str):
    """Return *str* lower-cased, with each run of whitespace collapsed to a single space.

    Leading and trailing whitespace is discarded, which makes exception texts
    comparable regardless of their exact spacing or letter case.
    """
    # split() without arguments already ignores leading/trailing whitespace
    words = str.split()
    return " ".join(words).lower()
| 41 | |
| 42 | |
class Test_ProjectTopicAuth(TestCase):
    """Unit tests for ``ProjectTopicAuth`` running against mocked db, fs, msg and auth back-ends."""

    @classmethod
    def setUpClass(cls):
        """Define the project name shared by every test of this class."""
        cls.test_name = "test-project-topic"

    def setUp(self):
        """Build a fresh topic over mocked back-ends and an admin session before each test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_project(self):
        """Create a project with valid quotas, then check that an unknown quota item is rejected."""
        with self.subTest(i=1):
            rollback = []
            pid1 = str(uuid4())
            self.auth.get_project_list.return_value = []
            self.auth.create_project.return_value = pid1
            # Only the returned identifier is checked; the second returned element is not needed here
            pid2, _ = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "quotas": {}})
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(pid2, pid1, "Wrong project identifier")
            content = self.auth.create_project.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong project name")
            self.assertEqual(content["quotas"], {}, "Wrong quotas")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.new(rollback, self.fake_session, {"name": "other-project-name", "quotas": {"baditems": 10}})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
                          .format("baditems"), norm(str(e.exception)), "Wrong exception text")

    def test_edit_project(self):
        """Rename a project and set its quotas, then check that an unknown quota item is rejected."""
        now = time()
        pid = str(uuid4())
        proj = {"_id": pid, "name": self.test_name, "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            # Successive get_project_list calls return the project first, then an empty list
            self.auth.get_project_list.side_effect = [[proj], []]
            new_name = "new-project-name"
            quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
            self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
            _id, content = self.auth.update_project.call_args[0]
            self.assertEqual(_id, pid, "Wrong project identifier")
            self.assertEqual(content["_id"], pid, "Wrong project identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
            self.assertEqual(content["name"], new_name, "Wrong project name")
            self.assertEqual(content["quotas"], quotas, "Wrong quotas")
        with self.subTest(i=2):
            new_name = "other-project-name"
            quotas = {"baditems": randint(0, 100)}
            self.auth.get_project_list.side_effect = [[proj], []]
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
                          .format("baditems"), norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_new(self):
        """Check that a uuid-formatted name and an already existing name are rejected on creation."""
        with self.subTest(i=1):
            rollback = []
            pid = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
                self.topic.new(rollback, self.fake_session, {"name": pid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("project name '{}' cannot have an uuid format".format(pid),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            rollback = []
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("project '{}' exists".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Check that invalid renamings (uuid name, project 'admin', name already in use) are rejected."""
        with self.subTest(i=1):
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
                self.topic.edit(self.fake_session, test_pid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("project name '{}' cannot have an uuid format".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            pid = str(uuid4())
            self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
            with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as e:
                self.topic.edit(self.fake_session, pid, {"name": "new-name"})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("you cannot rename project 'admin'",
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=3):
            new_name = "new-project-name"
            # First call returns the project being edited, second one a different project owning new_name
            self.auth.get_project_list.side_effect = [[{"_id": test_pid, "name": self.test_name}],
                                                      [{"_id": str(uuid4()), "name": new_name}]]
            with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
                # Edit the project that is actually mocked above instead of the id left over from sub-test 2
                self.topic.edit(self.fake_session, test_pid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("project '{}' is already used".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_project(self):
        """Delete an unused project and check the identifiers handed to the auth back-end."""
        with self.subTest(i=1):
            pid = str(uuid4())
            self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"}
            self.auth.delete_project.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = []
            rc = self.topic.delete(self.fake_session, pid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
            self.assertEqual(self.auth.get_project.call_args[0][0], pid, "Wrong project identifier")
            self.assertEqual(self.auth.delete_project.call_args[0][0], pid, "Wrong project identifier")

    def test_conflict_on_del(self):
        """Check that deleting the own project, project 'admin' or a project still in use is rejected."""
        with self.subTest(i=1):
            self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name}
            with self.assertRaises(EngineException, msg="Accepted deletion of own project") as e:
                self.topic.delete(self.fake_session, self.test_name)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("you cannot delete your own project", norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
            with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as e:
                self.topic.delete(self.fake_session, "admin")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("you cannot delete project 'admin'", norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=3):
            pid = str(uuid4())
            name = "other-project-name"
            self.auth.get_project.return_value = {"_id": pid, "name": name}
            self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
                                                     "project_role_mappings": [{"project": pid, "role": str(uuid4())}]}]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
                self.topic.delete(self.fake_session, pid)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=4):
            # Reuses pid, name and the get_project mock configured in sub-test 3
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = [{"_id": str(uuid4()), "id": self.test_name,
                                              "_admin": {"projects_read": [pid], "projects_write": []}}]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
                self.topic.delete(self.fake_session, pid)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("project '{}' ({}) is being used by {} '{}'"
                          .format(name, pid, "vnf descriptor", self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
| 201 | |
| 202 | |
class Test_RoleTopicAuth(TestCase):
    """Unit tests for ``RoleTopicAuth`` running against mocked db, fs, msg and auth back-ends."""

    @classmethod
    def setUpClass(cls):
        """Define the role name and the operation list shared by every test of this class."""
        cls.test_name = "test-role-topic"
        cls.test_operations = ["tokens:get"]

    def setUp(self):
        """Build a fresh topic over mocked back-ends and an admin session before each test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        # The auth mock exposes test_operations as its role_permissions
        self.auth.role_permissions = self.test_operations
        self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_role(self):
        """Create a role with valid permissions, then check that an invalid permission is rejected."""
        with self.subTest(i=1):
            rollback = []
            rid1 = str(uuid4())
            perms_in = {"tokens": True}
            perms_out = {"default": False, "admin": False, "tokens": True}
            self.auth.get_role_list.return_value = []
            self.auth.create_role.return_value = rid1
            # Only the returned identifier is checked; the second returned element is not needed here
            rid2, _ = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in})
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(rid2, rid1, "Wrong role identifier")
            content = self.auth.create_role.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
                self.topic.new(rollback, self.fake_session,
                               {"name": "other-role-name", "permissions": {"projects": True}})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("invalid permission '{}'".format("projects"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_edit_role(self):
        """Rename a role and change its permissions, then check that an invalid permission is rejected."""
        now = time()
        rid = str(uuid4())
        role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True},
                "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            # Successive get_role_list calls return the role first, then an empty list
            self.auth.get_role_list.side_effect = [[role], []]
            self.auth.get_role.return_value = role
            new_name = "new-role-name"
            perms_in = {"tokens": False, "tokens:get": True}
            perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True}
            self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            content = self.auth.update_role.call_args[0][0]
            self.assertEqual(content["_id"], rid, "Wrong role identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
            self.assertEqual(content["name"], new_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
        with self.subTest(i=2):
            new_name = "other-role-name"
            perms_in = {"tokens": False, "tokens:post": True}
            self.auth.get_role_list.side_effect = [[role], []]
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("invalid permission '{}'".format("tokens:post"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_role(self):
        """Delete an unused role and check the identifiers handed to the auth back-end."""
        with self.subTest(i=1):
            rid = str(uuid4())
            role = {"_id": rid, "name": "other-role-name"}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.delete_role.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            rc = self.topic.delete(self.fake_session, rid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
            self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier")
            self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier")
            self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier")

    def test_conflict_on_new(self):
        """Check that a uuid-formatted name and an already existing name are rejected on creation."""
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
                self.topic.new(rollback, self.fake_session, {"name": rid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("role name '{}' cannot have an uuid format".format(rid),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            rollback = []
            self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role name '{}' exists".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Check that invalid renamings (uuid name, system roles, name already in use) are rejected."""
        rid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("role name '{}' cannot have an uuid format".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")
        for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
            with self.subTest(i=i):
                rid = str(uuid4())
                self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}}
                with self.assertRaises(EngineException, msg="Accepted renaming of role '{}'".format(role_name)) as e:
                    self.topic.edit(self.fake_session, rid, {"name": "new-name"})
                self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
                self.assertIn("you cannot rename role '{}'".format(role_name),
                              norm(str(e.exception)), "Wrong exception text")
        # Last sub-test continues the numbering of the loop above and reuses its last rid
        with self.subTest(i=i+1):
            new_name = "new-role-name"
            # First call returns the role being edited, second one a different role owning new_name
            self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}],
                                                   [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]]
            self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}}
            with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role name '{}' exists".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_del(self):
        """Check that deleting a system role or a role still assigned to a user is rejected."""
        for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
            with self.subTest(i=i):
                rid = str(uuid4())
                role = {"_id": rid, "name": role_name}
                self.auth.get_role_list.return_value = [role]
                self.auth.get_role.return_value = role
                with self.assertRaises(EngineException, msg="Accepted deletion of role '{}'".format(role_name)) as e:
                    self.topic.delete(self.fake_session, rid)
                self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
                self.assertIn("you cannot delete role '{}'".format(role_name),
                              norm(str(e.exception)), "Wrong exception text")
        # Last sub-test continues the numbering of the loop above
        with self.subTest(i=i+1):
            rid = str(uuid4())
            name = "other-role-name"
            role = {"_id": rid, "name": name}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
                                                     "project_role_mappings": [{"project": str(uuid4()), "role": rid}]}]
            with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e:
                self.topic.delete(self.fake_session, rid)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
| 364 | |
| 365 | |
| 366 | class Test_UserTopicAuth(TestCase): |
| 367 | |
| 368 | @classmethod |
| 369 | def setUpClass(cls): |
| 370 | cls.test_name = "test-user-topic" |
| 371 | |
| 372 | def setUp(self): |
| 373 | self.db = Mock(dbbase.DbBase()) |
| 374 | self.fs = Mock(fsbase.FsBase()) |
| 375 | self.msg = Mock(msgbase.MsgBase()) |
| tierno | 9e87a7f | 2020-03-23 09:24:10 +0000 | [diff] [blame] | 376 | self.auth = Mock(authconn.Authconn(None, None, None)) |
| delacruzramo | 79e40f4 | 2019-10-10 16:36:40 +0200 | [diff] [blame] | 377 | self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth) |
| 378 | self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None, |
| 379 | "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} |
| tierno | d774958 | 2020-05-28 10:41:10 +0000 | [diff] [blame] | 380 | self.topic.check_quota = Mock(return_value=None) # skip quota |
| delacruzramo | 79e40f4 | 2019-10-10 16:36:40 +0200 | [diff] [blame] | 381 | |
| 382 | def test_new_user(self): |
| 383 | uid1 = str(uuid4()) |
| 384 | pid = str(uuid4()) |
| 385 | self.auth.get_user_list.return_value = [] |
| 386 | self.auth.get_project.return_value = {"_id": pid, "name": "some_project"} |
| 387 | self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name} |
| 388 | with self.subTest(i=1): |
| 389 | rollback = [] |
| 390 | rid = str(uuid4()) |
| 391 | self.auth.get_role.return_value = {"_id": rid, "name": "some_role"} |
| 392 | prms_in = [{"project": "some_project", "role": "some_role"}] |
| 393 | prms_out = [{"project": pid, "role": rid}] |
| 394 | uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name, |
| 395 | "password": self.test_name, |
| 396 | "project_role_mappings": prms_in |
| 397 | }) |
| 398 | self.assertEqual(len(rollback), 1, "Wrong rollback length") |
| 399 | self.assertEqual(uid2, uid1, "Wrong project identifier") |
| 400 | content = self.auth.create_user.call_args[0][0] |
| 401 | self.assertEqual(content["username"], self.test_name, "Wrong project name") |
| 402 | self.assertEqual(content["password"], self.test_name, "Wrong password") |
| 403 | self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings") |
| 404 | self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") |
| 405 | self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") |
| 406 | with self.subTest(i=2): |
| 407 | rollback = [] |
| 408 | def_rid = str(uuid4()) |
| 409 | def_role = {"_id": def_rid, "name": "project_admin"} |
| 410 | self.auth.get_role.return_value = def_role |
| 411 | self.auth.get_role_list.return_value = [def_role] |
| 412 | prms_out = [{"project": pid, "role": def_rid}] |
| 413 | uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name, |
| 414 | "password": self.test_name, |
| 415 | "projects": ["some_project"] |
| 416 | }) |
| 417 | self.assertEqual(len(rollback), 1, "Wrong rollback length") |
| 418 | self.assertEqual(uid2, uid1, "Wrong project identifier") |
| 419 | content = self.auth.create_user.call_args[0][0] |
| 420 | self.assertEqual(content["username"], self.test_name, "Wrong project name") |
| 421 | self.assertEqual(content["password"], self.test_name, "Wrong password") |
| 422 | self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings") |
| 423 | self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") |
| 424 | self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") |
| 425 | with self.subTest(i=3): |
| 426 | rollback = [] |
| 427 | with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e: |
| 428 | self.topic.new(rollback, self.fake_session, {"username": "other-project-name", |
| 429 | "password": "other-password", |
| 430 | "project_role_mappings": [{}] |
| 431 | }) |
| 432 | self.assertEqual(len(rollback), 0, "Wrong rollback length") |
| 433 | self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") |
| 434 | self.assertIn("format error at '{}' '{}'" |
| 435 | .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"), |
| 436 | norm(str(e.exception)), "Wrong exception text") |
| 437 | with self.subTest(i=4): |
| 438 | rollback = [] |
| 439 | with self.assertRaises(EngineException, msg="Accepted wrong projects") as e: |
| 440 | self.topic.new(rollback, self.fake_session, {"username": "other-project-name", |
| 441 | "password": "other-password", |
| 442 | "projects": [] |
| 443 | }) |
| 444 | self.assertEqual(len(rollback), 0, "Wrong rollback length") |
| 445 | self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") |
| 446 | self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]), |
| 447 | norm(str(e.exception)), "Wrong exception text") |
| 448 | |
| 449 | def test_edit_user(self): |
| 450 | now = time() |
| 451 | uid = str(uuid4()) |
| 452 | pid1 = str(uuid4()) |
| 453 | rid1 = str(uuid4()) |
| 454 | prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}] |
| 455 | user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms, |
| 456 | "_admin": {"created": now, "modified": now}} |
| 457 | with self.subTest(i=1): |
| 458 | self.auth.get_user_list.side_effect = [[user], []] |
| 459 | self.auth.get_user.return_value = user |
| 460 | pid2 = str(uuid4()) |
| 461 | rid2 = str(uuid4()) |
| 462 | self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"}, |
| 463 | {"_id": pid1, "name": "project-1"}] |
| 464 | self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"}, |
| 465 | {"_id": rid1, "name": "role-1"}] |
| 466 | new_name = "new-user-name" |
| 467 | new_pasw = "new-password" |
| 468 | add_prms = [{"project": pid2, "role": rid2}] |
| 469 | rem_prms = [{"project": pid1, "role": rid1}] |
| 470 | self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw, |
| 471 | "add_project_role_mappings": add_prms, |
| 472 | "remove_project_role_mappings": rem_prms |
| 473 | }) |
| 474 | content = self.auth.update_user.call_args[0][0] |
| 475 | self.assertEqual(content["_id"], uid, "Wrong user identifier") |
| 476 | self.assertEqual(content["username"], new_name, "Wrong user name") |
| 477 | self.assertEqual(content["password"], new_pasw, "Wrong user password") |
| 478 | self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add") |
| 479 | self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove") |
| 480 | with self.subTest(i=2): |
| 481 | new_name = "other-user-name" |
| 482 | new_prms = [{}] |
| 483 | self.auth.get_role_list.side_effect = [[user], []] |
| Frank Bryden | deba68e | 2020-07-27 13:55:11 +0000 | [diff] [blame] | 484 | self.auth.get_user_list.side_effect = [[user]] |
| delacruzramo | 79e40f4 | 2019-10-10 16:36:40 +0200 | [diff] [blame] | 485 | with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e: |
| 486 | self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms}) |
| 487 | self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") |
| 488 | self.assertIn("format error at '{}' '{}'" |
| 489 | .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"), |
| 490 | norm(str(e.exception)), "Wrong exception text") |
| 491 | |
| 492 | def test_delete_user(self): |
| 493 | with self.subTest(i=1): |
| 494 | uid = str(uuid4()) |
| 495 | self.fake_session["username"] = self.test_name |
| 496 | user = user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []} |
| 497 | self.auth.get_user.return_value = user |
| 498 | self.auth.delete_user.return_value = {"deleted": 1} |
| 499 | rc = self.topic.delete(self.fake_session, uid) |
| 500 | self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info") |
| 501 | self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier") |
| 502 | self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier") |
| 503 | |
def test_conflict_on_new(self):
    """Check that invalid user creation requests are rejected and leave no rollback entries.

    Three cases are exercised: a username with uuid format, a username that
    get_user_list() reports as existing, and a request for which
    get_role_list() returns no roles.
    """
    # Case 1: the requested username is a uuid string
    with self.subTest(i=1):
        undo_log = []
        uuid_name = str(uuid4())
        payload = {"username": uuid_name, "password": self.test_name, "projects": [test_pid]}
        with self.assertRaises(EngineException, msg="Accepted uuid as username") as ctx:
            self.topic.new(undo_log, self.fake_session, payload)
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(ctx.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        expected_text = "username '{}' cannot have a uuid format".format(uuid_name)
        self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")

    # Case 2: the auth backend already lists a user with the requested name
    with self.subTest(i=2):
        undo_log = []
        self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}]
        payload = {"username": self.test_name, "password": self.test_name, "projects": [test_pid]}
        with self.assertRaises(EngineException, msg="Accepted existing username") as ctx:
            self.topic.new(undo_log, self.fake_session, payload)
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
        expected_text = "username '{}' is already used".format(self.test_name)
        self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")

    # Case 3: no user clashes, but both role lookups come back empty
    with self.subTest(i=3):
        undo_log = []
        self.auth.get_user_list.return_value = []
        self.auth.get_role_list.side_effect = [[], []]
        payload = {"username": self.test_name, "password": self.test_name, "projects": [str(uuid4())]}
        with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as ctx:
            self.topic.new(undo_log, self.fake_session, payload)
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(ctx.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
        expected_text = "can't find default role for user '{}'".format(self.test_name)
        self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")
| 536 | |
def test_conflict_on_edit(self):
    """Check that invalid user edit requests are rejected with the expected error.

    Four cases are exercised: renaming to a uuid-formatted username, assigning
    projects when get_role_list() returns no roles, removing the system_admin
    role from the admin user, and renaming to a username already in use.
    """
    user_id = str(uuid4())

    # Case 1: the new username is a uuid string
    with self.subTest(i=1):
        self.auth.get_user_list.return_value = [{"_id": user_id, "username": self.test_name}]
        uuid_name = str(uuid4())
        with self.assertRaises(EngineException, msg="Accepted uuid as username") as ctx:
            self.topic.edit(self.fake_session, user_id, {"username": uuid_name})
        self.assertEqual(ctx.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        expected_text = "username '{}' cannot have an uuid format".format(uuid_name)
        self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")

    # Case 2: both role lookups come back empty
    with self.subTest(i=2):
        self.auth.get_user_list.return_value = [{"_id": user_id, "username": self.test_name}]
        self.auth.get_role_list.side_effect = [[], []]
        with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as ctx:
            self.topic.edit(self.fake_session, user_id, {"projects": [str(uuid4())]})
        self.assertEqual(ctx.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
        expected_text = "can't find a default role for user '{}'".format(self.test_name)
        self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")

    # Case 3: the edited user is "admin" and the request drops its system_admin role
    with self.subTest(i=3):
        admin_id = str(uuid4())
        self.auth.get_user_list.return_value = [{"_id": admin_id, "username": "admin"}]
        changes = {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]}
        with self.assertRaises(EngineException, msg="Accepted removing system_admin role from admin user") as ctx:
            self.topic.edit(self.fake_session, admin_id, changes)
        self.assertEqual(ctx.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
        self.assertIn("you cannot remove system_admin role from admin user",
                      norm(str(ctx.exception)), "Wrong exception text")

    # Case 4: consecutive get_user_list() calls return the edited user, then another user
    with self.subTest(i=4):
        taken_name = "new-user-name"
        edited_user = {"_id": user_id, "name": self.test_name}
        other_user = {"_id": str(uuid4()), "name": taken_name}
        self.auth.get_user_list.side_effect = [[edited_user], [other_user]]
        with self.assertRaises(EngineException, msg="Accepted existing username") as ctx:
            self.topic.edit(self.fake_session, user_id, {"username": taken_name})
        self.assertEqual(ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
        expected_text = "username '{}' is already used".format(taken_name)
        self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")
| 573 | |
def test_conflict_on_del(self):
    """Check that deleting the user named in the current session is rejected.

    The session username is set to the same name as the user returned by the
    mocked get_user(), and delete() is expected to raise an EngineException
    with HTTP status CONFLICT.
    """
    with self.subTest(i=1):
        uid = str(uuid4())
        # Make the session owner and the user to delete share the same username
        self.fake_session["username"] = self.test_name
        user = {"_id": uid, "username": self.test_name, "project_role_mappings": []}
        self.auth.get_user.return_value = user
        with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e:
            self.topic.delete(self.fake_session, uid)
        self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
        self.assertIn("you cannot delete your own login user", norm(str(e.exception)), "Wrong exception text")
| 584 | |
| 585 | |
class Test_CommonVimWimSdn(TestCase):
    """Unit tests for CommonVimWimSdn running against mocked db, fs, msg and auth objects."""

    @classmethod
    def setUpClass(cls):
        """Define the CIM name shared by all tests of this class."""
        cls.test_name = "test-cim-topic"    # CIM = Common Infrastructure Manager

    def setUp(self):
        """Build a fresh topic on top of mocks before each test.

        The topic is configured as "wims" with the WIM validation schemas, its
        _send_msg and check_quota methods are replaced by mocks, and a fake
        admin session bound to test_pid is prepared.
        """
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
        # Replace message sending by a mock so that tests can inspect the calls
        self.topic._send_msg = Mock()
        # Use WIM schemas for testing because they are the simplest
        self.topic.topic = "wims"
        self.topic.schema_new = validation.wim_account_new_schema
        self.topic.schema_edit = validation.wim_account_edit_schema
        self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_cvws(self):
        """Create a CIM and verify the content passed to db.create() and the rollback list."""
        test_url = "http://0.0.0.0:0"
        with self.subTest(i=1):
            rollback = []
            test_type = "fake"
            self.db.get_one.return_value = None
            # db.create() receives (topic, content) positionally; return the "_id" found in the content
            self.db.create.side_effect = lambda _topic, content: content["_id"]
            cid, oid = self.topic.new(rollback, self.fake_session,
                                      {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            args = self.db.create.call_args[0]
            content = args[1]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
            self.assertEqual(content["wim_url"], test_url, "Wrong URL")
            self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
            self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
            self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
            self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
            self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects")
            self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects")
            self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation")
            self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations")
            operation = content["_admin"]["operations"][0]
            self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], content["_admin"]["created"],
                               "Wrong operation status enter time")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
        # This test is disabled. From Feature 8030 we admit all WIM/SDN types
        # with self.subTest(i=2):
        #     rollback = []
        #     test_type = "bad_type"
        #     with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
        #         self.topic.new(rollback, self.fake_session,
        #                        {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
        #     self.assertEqual(len(rollback), 0, "Wrong rollback length")
        #     self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        #     self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
        #                   norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_new(self):
        """Check that creating a CIM whose name db.get_one() already returns is rejected with CONFLICT."""
        with self.subTest(i=1):
            rollback = []
            test_url = "http://0.0.0.0:0"
            test_type = "fake"
            self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
            with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
                self.topic.new(rollback, self.fake_session,
                               {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic),
                          norm(str(e.exception)), "Wrong exception text")

    def test_edit_cvws(self):
        """Edit a CIM and verify the content passed to db.replace(); then reject an unexpected property."""
        now = time()
        cid = str(uuid4())
        test_url = "http://0.0.0.0:0"
        test_type = "fake"
        cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type,
                "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}}
        with self.subTest(i=1):
            new_name = "new-cim-name"
            new_url = "https://1.1.1.1:1"
            new_type = "onos"
            # First get_one() call returns the stored CIM, the second one returns nothing
            self.db.get_one.side_effect = [cvws, None]
            self.db.replace.return_value = {"updated": 1}
            # self.db.encrypt.side_effect = [b64str(), b64str()]
            self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type})
            args = self.db.replace.call_args[0]
            content = args[2]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(args[1], cid, "Wrong CIM identifier")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], new_name, "Wrong CIM name")
            self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
            self.assertEqual(content["wim_url"], new_url, "Wrong URL")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
            self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations")
            operation = content["_admin"]["operations"][1]
            self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], content["_admin"]["modified"],
                               "Wrong operation status enter time")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
        with self.subTest(i=2):
            self.db.get_one.side_effect = [cvws]
            with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
                self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)").
                          format("extra_prop"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Check that renaming a CIM to a name db.get_one() already returns is rejected with CONFLICT."""
        with self.subTest(i=1):
            cid = str(uuid4())
            new_name = "new-cim-name"
            # First get_one() call returns the edited CIM, the second one a different CIM with the new name
            self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name},
                                           {"_id": str(uuid4()), "name": new_name}]
            with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
                self.topic.edit(self.fake_session, cid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_cvws(self):
        """Delete a CIM in three situations and verify the database and messaging calls.

        1. The CIM also lists other projects: the session project is expected to be
           pulled from the project lists, no operation identifier is returned and
           no message is sent.
        2. The CIM lists only the session project: it is expected to be marked with
           "_admin.to_delete", a "delete" operation is pushed and a "delete"
           message is sent.
        3. The CIM lists no projects and the session forces deletion: db.del_one()
           is expected to be called and a "deleted" message is sent.
        """
        cid = str(uuid4())
        ro_pid = str(uuid4())
        rw_pid = str(uuid4())
        cvws = {"_id": cid, "name": self.test_name}
        self.db.get_list.return_value = []
        with self.subTest(i=1):
            cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]}
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertIsNone(self.db.set_one.call_args[1]["update_dict"],
                              "Wrong read-only projects update")
            self.assertEqual(self.db.set_one.call_args[1]["pull_list"],
                             {"_admin.projects_read": (test_pid,), "_admin.projects_write": (test_pid,)},
                             "Wrong read/write projects update")
            self.topic._send_msg.assert_not_called()
        with self.subTest(i=2):
            now = time()
            cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[1]["update_dict"], {"_admin.to_delete": True},
                             "Wrong _admin.to_delete update")
            operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
            self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
            self.assertGreater(operation["startTime"], now, "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
            self.topic._send_msg.assert_called_once_with("delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None)
        with self.subTest(i=3):
            cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
            self.db.get_one.return_value = cvws
            # Forget the calls recorded by the previous sub-tests before checking the new ones
            self.topic._send_msg.reset_mock()
            self.db.get_one.reset_mock()
            self.db.del_one.reset_mock()
            self.fake_session["force"] = True   # to force deletion
            self.fake_session["admin"] = True   # to force deletion
            self.fake_session["project_id"] = []   # to force deletion
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.topic._send_msg.assert_called_once_with("deleted", {"_id": cid, "op_id": None}, not_send_msg=None)
| 780 | |
# Run all the test cases of this module when the file is executed as a script
if __name__ == "__main__":
    unittest.main()