| delacruzramo | 79e40f4 | 2019-10-10 16:36:40 +0200 | [diff] [blame] | 1 | #! /usr/bin/python3 |
| 2 | # -*- coding: utf-8 -*- |
| 3 | |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| 13 | # implied. |
| 14 | # See the License for the specific language governing permissions and |
| 15 | # limitations under the License. |
| 16 | |
# Module metadata: contact of the original author and a date stamp.
__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
# NOTE(review): the stamp below looks malformed (leading "$", day written as "019");
# left untouched because it is a runtime string - confirm the intended value.
__date__ = "$2019-10-019"
| 19 | |
| 20 | import unittest |
| 21 | from unittest import TestCase |
| David Garcia | ecb4132 | 2021-03-31 19:10:46 +0200 | [diff] [blame^] | 22 | from unittest.mock import Mock, patch, call |
| delacruzramo | 79e40f4 | 2019-10-10 16:36:40 +0200 | [diff] [blame] | 23 | from uuid import uuid4 |
| 24 | from http import HTTPStatus |
| 25 | from time import time |
| 26 | from random import randint |
| 27 | from osm_common import dbbase, fsbase, msgbase |
| 28 | from osm_nbi import authconn, validation |
| David Garcia | ecb4132 | 2021-03-31 19:10:46 +0200 | [diff] [blame^] | 29 | from osm_nbi.admin_topics import ( |
| 30 | ProjectTopicAuth, |
| 31 | RoleTopicAuth, |
| 32 | UserTopicAuth, |
| 33 | CommonVimWimSdn, |
| 34 | VcaTopic, |
| 35 | ) |
| delacruzramo | 79e40f4 | 2019-10-10 16:36:40 +0200 | [diff] [blame] | 36 | from osm_nbi.engine import EngineException |
| 37 | from osm_nbi.authconn import AuthconnNotFoundException |
| 38 | |
| 39 | |
# Module-level fixtures shared by the test classes below.
# Random project id, generated once when the module is imported; used as the
# "project_id" entry of the fake sessions.
test_pid = str(uuid4())
# Username placed in the fake sessions of the role and user test classes.
test_name = "test-user"
| 42 | |
| 43 | |
def norm(str):
    """Return *str* lowercased, trimmed, with every whitespace run squeezed to one space.

    The parameter name shadows the builtin ``str``; it is kept because it is
    part of the function's signature.
    """
    # A no-argument split() already discards leading and trailing whitespace.
    words = str.split()
    return " ".join(words).lower()
| 47 | |
| 48 | |
class TestVcaTopic(TestCase):
    """Unit tests for ``VcaTopic`` running against mocked db, fs, msg and auth backends."""

    def setUp(self):
        """Build a fresh ``VcaTopic`` on top of new mocks before every test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
    def test_format_on_new(self, mock_super_format_on_new):
        """``format_on_new`` stores the values returned by ``db.encrypt`` and returns the parent's result."""
        content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        # One return value per expected encrypt() call, consumed in call order.
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_new.return_value = "1234"

        oid = self.vca_topic.format_on_new(content)

        self.assertEqual(oid, "1234")
        self.assertEqual(content["secret"], "secret")
        self.assertEqual(content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_new.assert_called_with(content, None, False)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
    def test_format_on_edit(self, mock_super_format_on_edit):
        """``format_on_edit`` writes the encrypted secret/cacert into the final content."""
        edit_content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        final_content = {
            "_id": "id",
            "schema_version": "1.11",
        }
        # One return value per expected encrypt() call, consumed in call order.
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_edit.return_value = "1234"

        oid = self.vca_topic.format_on_edit(final_content, edit_content)

        self.assertEqual(oid, "1234")
        self.assertEqual(final_content["secret"], "secret")
        self.assertEqual(final_content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_edit.assert_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del(self, mock_check_conflict_on_del):
        """With no VIM account found, the VIM lookup is done and the parent check is called."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}

        self.db.get_list.return_value = None

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, '_admin.projects_read.cont': 'project-id'},
        )
        mock_check_conflict_on_del.assert_called_with(session, _id, db_content)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
        """With ``force`` set, neither the VIM lookup nor the parent check is performed."""
        session = {
            "project_id": "project-id",
            "force": True,
        }
        _id = "vca-id"
        db_content = {}

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_not_called()
        mock_check_conflict_on_del.assert_not_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
        """A VIM account referencing the VCA makes the deletion fail with HTTP 409."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}

        self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}

        with self.assertRaises(EngineException) as context:
            self.vca_topic.check_conflict_on_del(session, _id, db_content)

        # Verified after the ``with`` block and attribute by attribute: a statement
        # placed after the raising call inside the block never runs, and two
        # exception instances do not compare equal unless the class defines __eq__.
        self.assertEqual(context.exception.http_code, HTTPStatus.CONFLICT)
        self.assertIn(
            "there is at least one vim account using this vca",
            norm(str(context.exception)),
        )

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, '_admin.projects_read.cont': 'project-id'},
        )
        mock_check_conflict_on_del.assert_not_called()
| 166 | |
| 167 | |
class Test_ProjectTopicAuth(TestCase):
    """Exercise ``ProjectTopicAuth`` against mocked db, fs, msg and auth backends."""

    @classmethod
    def setUpClass(cls):
        """Set the name used both for the session user and for projects in these tests."""
        cls.test_name = "test-project-topic"

    def setUp(self):
        """Create the topic under test, a fake admin session, and stub out the quota check."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": self.test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def _assert_engine_error(self, caught, http_code, fragment):
        """Check the HTTP code and the normalized message of a caught ``EngineException``."""
        self.assertEqual(caught.exception.http_code, http_code, "Wrong HTTP status code")
        self.assertIn(fragment, norm(str(caught.exception)), "Wrong exception text")

    def test_new_project(self):
        """Project creation succeeds with empty quotas and is rejected for unknown quota keys."""
        with self.subTest(i=1):
            undo = []
            expected_pid = str(uuid4())
            self.auth.get_project_list.return_value = []
            self.auth.create_project.return_value = expected_pid
            indata = {"name": self.test_name, "quotas": {}}
            returned_pid, _ = self.topic.new(undo, self.fake_session, indata)
            self.assertEqual(len(undo), 1, "Wrong rollback length")
            self.assertEqual(returned_pid, expected_pid, "Wrong project identifier")
            # First positional argument handed to the auth backend
            created = self.auth.create_project.call_args[0][0]
            self.assertEqual(created["name"], self.test_name, "Wrong project name")
            self.assertEqual(created["quotas"], {}, "Wrong quotas")
            self.assertIsNotNone(created["_admin"]["created"], "Wrong creation time")
            self.assertEqual(created["_admin"]["modified"], created["_admin"]["created"],
                             "Wrong modification time")
        with self.subTest(i=2):
            undo = []
            bad_indata = {"name": "other-project-name", "quotas": {"baditems": 10}}
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as caught:
                self.topic.new(undo, self.fake_session, bad_indata)
            self.assertEqual(len(undo), 0, "Wrong rollback length")
            self._assert_engine_error(
                caught,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
                .format("baditems"),
            )

    def test_edit_project(self):
        """Editing updates name and quotas, and rejects unknown quota keys."""
        now = time()
        pid = str(uuid4())
        stored = {"_id": pid, "name": self.test_name, "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            # First lookup returns the stored project, second one finds no name clash
            self.auth.get_project_list.side_effect = [[stored], []]
            renamed = "new-project-name"
            limits = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
            self.topic.edit(self.fake_session, pid, {"name": renamed, "quotas": limits})
            updated_id, updated = self.auth.update_project.call_args[0]
            self.assertEqual(updated_id, pid, "Wrong project identifier")
            self.assertEqual(updated["_id"], pid, "Wrong project identifier")
            self.assertEqual(updated["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(updated["_admin"]["modified"], now, "Wrong modification time")
            self.assertEqual(updated["name"], renamed, "Wrong project name")
            self.assertEqual(updated["quotas"], limits, "Wrong quotas")
        with self.subTest(i=2):
            renamed = "other-project-name"
            limits = {"baditems": randint(0, 100)}
            self.auth.get_project_list.side_effect = [[stored], []]
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as caught:
                self.topic.edit(self.fake_session, pid, {"name": renamed, "quotas": limits})
            self._assert_engine_error(
                caught,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
                .format("baditems"),
            )

    def test_conflict_on_new(self):
        """Creation is refused for uuid-shaped names and for names already in use."""
        with self.subTest(i=1):
            undo = []
            uuid_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as caught:
                self.topic.new(undo, self.fake_session, {"name": uuid_name})
            self.assertEqual(len(undo), 0, "Wrong rollback length")
            self._assert_engine_error(
                caught,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "project name '{}' cannot have an uuid format".format(uuid_name),
            )
        with self.subTest(i=2):
            undo = []
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing project name") as caught:
                self.topic.new(undo, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(undo), 0, "Wrong rollback length")
            self._assert_engine_error(
                caught,
                HTTPStatus.CONFLICT,
                "project '{}' exists".format(self.test_name),
            )

    def test_conflict_on_edit(self):
        """Renaming is refused for uuid-shaped names, for project 'admin', and for names in use."""
        with self.subTest(i=1):
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            uuid_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as caught:
                self.topic.edit(self.fake_session, test_pid, {"name": uuid_name})
            self._assert_engine_error(
                caught,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "project name '{}' cannot have an uuid format".format(uuid_name),
            )
        with self.subTest(i=2):
            admin_pid = str(uuid4())
            self.auth.get_project_list.return_value = [{"_id": admin_pid, "name": "admin"}]
            with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as caught:
                self.topic.edit(self.fake_session, admin_pid, {"name": "new-name"})
            self._assert_engine_error(
                caught,
                HTTPStatus.CONFLICT,
                "you cannot rename project 'admin'",
            )
        with self.subTest(i=3):
            taken_name = "new-project-name"
            current = [{"_id": test_pid, "name": self.test_name}]
            clashing = [{"_id": str(uuid4()), "name": taken_name}]
            self.auth.get_project_list.side_effect = [current, clashing]
            with self.assertRaises(EngineException, msg="Accepted existing project name") as caught:
                # The identifier generated in the previous sub-test is reused here
                self.topic.edit(self.fake_session, admin_pid, {"name": taken_name})
            self._assert_engine_error(
                caught,
                HTTPStatus.CONFLICT,
                "project '{}' is already used".format(taken_name),
            )

    def test_delete_project(self):
        """Deleting an unused project returns the backend result and targets the right id."""
        with self.subTest(i=1):
            pid = str(uuid4())
            self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"}
            self.auth.delete_project.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = []
            outcome = self.topic.delete(self.fake_session, pid)
            self.assertEqual(outcome, {"deleted": 1}, "Wrong project deletion return info")
            looked_up = self.auth.get_project.call_args[0][0]
            self.assertEqual(looked_up, pid, "Wrong project identifier")
            removed = self.auth.delete_project.call_args[0][0]
            self.assertEqual(removed, pid, "Wrong project identifier")

    def test_conflict_on_del(self):
        """Deletion is refused for the own project, for 'admin', and for projects still in use."""
        with self.subTest(i=1):
            self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name}
            with self.assertRaises(EngineException, msg="Accepted deletion of own project") as caught:
                self.topic.delete(self.fake_session, self.test_name)
            self._assert_engine_error(
                caught,
                HTTPStatus.CONFLICT,
                "you cannot delete your own project",
            )
        with self.subTest(i=2):
            self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
            with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as caught:
                self.topic.delete(self.fake_session, "admin")
            self._assert_engine_error(
                caught,
                HTTPStatus.CONFLICT,
                "you cannot delete project 'admin'",
            )
        with self.subTest(i=3):
            pid = str(uuid4())
            name = "other-project-name"
            self.auth.get_project.return_value = {"_id": pid, "name": name}
            member = {
                "_id": str(uuid4()),
                "username": self.test_name,
                "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
            }
            self.auth.get_user_list.return_value = [member]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as caught:
                self.topic.delete(self.fake_session, pid)
            self._assert_engine_error(
                caught,
                HTTPStatus.CONFLICT,
                "project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name),
            )
        with self.subTest(i=4):
            # ``pid`` and ``name`` still refer to the project set up in sub-test 3
            self.auth.get_user_list.return_value = []
            descriptor = {
                "_id": str(uuid4()),
                "id": self.test_name,
                "_admin": {"projects_read": [pid], "projects_write": []},
            }
            self.db.get_list.return_value = [descriptor]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as caught:
                self.topic.delete(self.fake_session, pid)
            self._assert_engine_error(
                caught,
                HTTPStatus.CONFLICT,
                "project '{}' ({}) is being used by {} '{}'"
                .format(name, pid, "vnf descriptor", self.test_name),
            )
| 326 | |
| 327 | |
class Test_RoleTopicAuth(TestCase):
    """Unit tests for ``RoleTopicAuth`` running against mocked db, fs, msg and auth backends."""

    @classmethod
    def setUpClass(cls):
        """Define the role name and the list of operations exposed by the mocked auth backend."""
        cls.test_name = "test-role-topic"
        cls.test_operations = ["tokens:get"]

    def setUp(self):
        """Create the topic under test, a fake admin session, and stub out the quota check."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        # Assigned before the topic is constructed, as in the original set-up order
        self.auth.role_permissions = self.test_operations
        self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_role(self):
        """Role creation fills in default permissions and rejects unknown permissions."""
        with self.subTest(i=1):
            rollback = []
            rid1 = str(uuid4())
            perms_in = {"tokens": True}
            perms_out = {"default": False, "admin": False, "tokens": True}
            self.auth.get_role_list.return_value = []
            self.auth.create_role.return_value = rid1
            rid2, _ = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in})
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(rid2, rid1, "Wrong role identifier")
            # First positional argument handed to the auth backend
            content = self.auth.create_role.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
                self.topic.new(rollback, self.fake_session,
                               {"name": "other-role-name", "permissions": {"projects": True}})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("invalid permission '{}'".format("projects"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_edit_role(self):
        """Editing updates name and permissions, and rejects unknown permissions."""
        now = time()
        rid = str(uuid4())
        role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True},
                "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            # First lookup returns the stored role, second one finds no name clash
            self.auth.get_role_list.side_effect = [[role], []]
            self.auth.get_role.return_value = role
            new_name = "new-role-name"
            perms_in = {"tokens": False, "tokens:get": True}
            perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True}
            self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            content = self.auth.update_role.call_args[0][0]
            self.assertEqual(content["_id"], rid, "Wrong role identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
            self.assertEqual(content["name"], new_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
        with self.subTest(i=2):
            new_name = "other-role-name"
            perms_in = {"tokens": False, "tokens:post": True}
            self.auth.get_role_list.side_effect = [[role], []]
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("invalid permission '{}'".format("tokens:post"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_role(self):
        """Deleting an unused role returns the backend result and targets the right id."""
        with self.subTest(i=1):
            rid = str(uuid4())
            role = {"_id": rid, "name": "other-role-name"}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.delete_role.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            rc = self.topic.delete(self.fake_session, rid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
            self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier")
            self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier")
            self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier")

    def test_conflict_on_new(self):
        """Creation is refused for uuid-shaped names and for names already in use."""
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
                self.topic.new(rollback, self.fake_session, {"name": rid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("role name '{}' cannot have an uuid format".format(rid),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            rollback = []
            self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role name '{}' exists".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Renaming is refused for uuid-shaped names, for the built-in roles, and for names in use."""
        rid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("role name '{}' cannot have an uuid format".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")
        for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
            with self.subTest(i=i):
                rid = str(uuid4())
                self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}}
                with self.assertRaises(EngineException, msg="Accepted renaming of role '{}'".format(role_name)) as e:
                    self.topic.edit(self.fake_session, rid, {"name": "new-name"})
                self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
                self.assertIn("you cannot rename role '{}'".format(role_name),
                              norm(str(e.exception)), "Wrong exception text")
        # ``i`` and ``rid`` keep the values from the last loop iteration
        with self.subTest(i=i+1):
            new_name = "new-role-name"
            self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}],
                                                   [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]]
            self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}}
            with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role name '{}' exists".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_del(self):
        """Deletion is refused for the built-in roles and for roles assigned to a user."""
        for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
            with self.subTest(i=i):
                rid = str(uuid4())
                role = {"_id": rid, "name": role_name}
                self.auth.get_role_list.return_value = [role]
                self.auth.get_role.return_value = role
                with self.assertRaises(EngineException, msg="Accepted deletion of role '{}'".format(role_name)) as e:
                    self.topic.delete(self.fake_session, rid)
                self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
                self.assertIn("you cannot delete role '{}'".format(role_name),
                              norm(str(e.exception)), "Wrong exception text")
        # ``i`` keeps the value from the last loop iteration
        with self.subTest(i=i+1):
            rid = str(uuid4())
            name = "other-role-name"
            role = {"_id": rid, "name": name}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
                                                     "project_role_mappings": [{"project": str(uuid4()), "role": rid}]}]
            with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e:
                self.topic.delete(self.fake_session, rid)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
| 489 | |
| 490 | |
| 491 | class Test_UserTopicAuth(TestCase): |
| 492 | |
@classmethod
def setUpClass(cls):
    """Define the username shared by every test of this class."""
    shared_username = "test-user-topic"
    cls.test_name = shared_username
| 496 | |
def setUp(self):
    """Wire a ``UserTopicAuth`` to mocked backends, prepare a fake admin session, stub the quota check."""
    self.db = Mock(dbbase.DbBase())
    self.fs = Mock(fsbase.FsBase())
    self.msg = Mock(msgbase.MsgBase())
    self.auth = Mock(authconn.Authconn(None, None, None))
    self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
    self.fake_session = {
        "username": test_name,
        "project_id": (test_pid,),
        "method": None,
        "admin": True,
        "force": False,
        "public": False,
        "allow_show_user_project_role": True,
    }
    self.topic.check_quota = Mock(return_value=None)  # skip quota
| delacruzramo | 79e40f4 | 2019-10-10 16:36:40 +0200 | [diff] [blame] | 506 | |
def test_new_user(self):
    """User creation with explicit mappings, with a ``projects`` list, and with invalid input.

    Sub-tests 1 and 2 check the content handed to ``auth.create_user``;
    sub-tests 3 and 4 check that malformed ``project_role_mappings`` and an
    empty ``projects`` list are rejected with HTTP 422 and nothing to roll back.
    """
    uid1 = str(uuid4())
    pid = str(uuid4())
    self.auth.get_user_list.return_value = []
    self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
    self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
    with self.subTest(i=1):
        rollback = []
        rid = str(uuid4())
        self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
        # Names go in; the ids returned by the mocked lookups are expected to come out
        prms_in = [{"project": "some_project", "role": "some_role"}]
        prms_out = [{"project": pid, "role": rid}]
        uid2, _ = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
                                                               "password": self.test_name,
                                                               "project_role_mappings": prms_in
                                                               })
        self.assertEqual(len(rollback), 1, "Wrong rollback length")
        self.assertEqual(uid2, uid1, "Wrong user identifier")
        content = self.auth.create_user.call_args[0][0]
        self.assertEqual(content["username"], self.test_name, "Wrong user name")
        self.assertEqual(content["password"], self.test_name, "Wrong password")
        self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
        self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
        self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
    with self.subTest(i=2):
        rollback = []
        def_rid = str(uuid4())
        def_role = {"_id": def_rid, "name": "project_admin"}
        self.auth.get_role.return_value = def_role
        self.auth.get_role_list.return_value = [def_role]
        # With only ``projects`` given, the mapping is expected to use the mocked role's id
        prms_out = [{"project": pid, "role": def_rid}]
        uid2, _ = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
                                                               "password": self.test_name,
                                                               "projects": ["some_project"]
                                                               })
        self.assertEqual(len(rollback), 1, "Wrong rollback length")
        self.assertEqual(uid2, uid1, "Wrong user identifier")
        content = self.auth.create_user.call_args[0][0]
        self.assertEqual(content["username"], self.test_name, "Wrong user name")
        self.assertEqual(content["password"], self.test_name, "Wrong password")
        self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
        self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
        self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
    with self.subTest(i=3):
        rollback = []
        with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
            self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
                                                         "password": "other-password",
                                                         "project_role_mappings": [{}]
                                                         })
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        # Two-stage format: the first fills the outer template, the second the index and key
        self.assertIn("format error at '{}' '{}'"
                      .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
                      norm(str(e.exception)), "Wrong exception text")
    with self.subTest(i=4):
        rollback = []
        with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
            self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
                                                         "password": "other-password",
                                                         "projects": []
                                                         })
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]),
                      norm(str(e.exception)), "Wrong exception text")
| 573 | |
def test_edit_user(self):
    """Check user edition: one valid update and one malformed request.

    Sub-test 1 renames the user, changes the password and swaps one
    project-role mapping for another, then inspects the first positional
    argument handed to ``auth.update_user``.
    Sub-test 2 sends a mapping list holding an empty entry and expects an
    ``EngineException`` carrying UNPROCESSABLE_ENTITY.
    """
    created_at = time()
    user_id = str(uuid4())
    old_project_id = str(uuid4())
    old_role_id = str(uuid4())
    stored_mappings = [
        {
            "project": old_project_id,
            "project_name": "project-1",
            "role": old_role_id,
            "role_name": "role-1",
        }
    ]
    stored_user = {
        "_id": user_id,
        "username": self.test_name,
        "project_role_mappings": stored_mappings,
        "_admin": {"created": created_at, "modified": created_at},
    }

    with self.subTest(i=1):
        # Canned answers for the mocked auth backend.
        self.auth.get_user_list.side_effect = [[stored_user], []]
        self.auth.get_user.return_value = stored_user
        new_project_id = str(uuid4())
        new_role_id = str(uuid4())
        self.auth.get_project.side_effect = [
            {"_id": new_project_id, "name": "project-2"},
            {"_id": old_project_id, "name": "project-1"},
        ]
        self.auth.get_role.side_effect = [
            {"_id": new_role_id, "name": "role-2"},
            {"_id": old_role_id, "name": "role-1"},
        ]
        updated_name = "new-user-name"
        updated_password = "new-password"
        mappings_to_add = [{"project": new_project_id, "role": new_role_id}]
        mappings_to_remove = [{"project": old_project_id, "role": old_role_id}]
        edit_request = {
            "username": updated_name,
            "password": updated_password,
            "add_project_role_mappings": mappings_to_add,
            "remove_project_role_mappings": mappings_to_remove,
        }
        self.topic.edit(self.fake_session, user_id, edit_request)

        sent = self.auth.update_user.call_args[0][0]
        # The removal list is compared against the stored mappings, i.e. the
        # entries that also carry project/role names.
        expectations = (
            ("_id", user_id, "Wrong user identifier"),
            ("username", updated_name, "Wrong user name"),
            ("password", updated_password, "Wrong user password"),
            ("add_project_role_mappings", mappings_to_add, "Wrong project-role mappings to add"),
            ("remove_project_role_mappings", stored_mappings, "Wrong project-role mappings to remove"),
        )
        for field, wanted, failure_msg in expectations:
            self.assertEqual(sent[field], wanted, failure_msg)

    with self.subTest(i=2):
        self.auth.get_role_list.side_effect = [[stored_user], []]
        self.auth.get_user_list.side_effect = [[stored_user]]
        bad_request = {"username": "other-user-name", "project_role_mappings": [{}]}
        with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as raised:
            self.topic.edit(self.fake_session, user_id, bad_request)
        self.assertEqual(
            raised.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "format error at '{}' '{}'".format(
            "project_role_mappings:{}", "'{}' is a required property"
        ).format(0, "project")
        self.assertIn(expected_text, norm(str(raised.exception)), "Wrong exception text")
| 616 | |
def test_delete_user(self):
    """Check that deleting a user other than the session's own user works.

    The session username is set to ``self.test_name`` while the user
    returned by the mocked backend is named "other-user-name". The test
    then verifies the value returned by ``topic.delete`` and the
    identifier passed to ``auth.get_user`` and ``auth.delete_user``.
    """
    with self.subTest(i=1):
        uid = str(uuid4())
        self.fake_session["username"] = self.test_name
        user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []}
        self.auth.get_user.return_value = user
        self.auth.delete_user.return_value = {"deleted": 1}
        rc = self.topic.delete(self.fake_session, uid)
        self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
        self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier")
        self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier")
| 628 | |
def test_conflict_on_new(self):
    """Check that invalid user creation requests are rejected.

    Three cases are exercised: a username in UUID format, a username the
    mocked backend already lists, and a mocked backend that returns no
    roles. Every case also checks that the rollback list stays empty.
    """
    with self.subTest(i=1):
        undo_log = []
        uuid_name = str(uuid4())
        request = {"username": uuid_name, "password": self.test_name, "projects": [test_pid]}
        with self.assertRaises(EngineException, msg="Accepted uuid as username") as raised:
            self.topic.new(undo_log, self.fake_session, request)
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(
            raised.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "username '{}' cannot have a uuid format".format(uuid_name)
        self.assertIn(expected_text, norm(str(raised.exception)), "Wrong exception text")

    with self.subTest(i=2):
        undo_log = []
        # The backend reports an existing user with the requested name.
        self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}]
        request = {"username": self.test_name, "password": self.test_name, "projects": [test_pid]}
        with self.assertRaises(EngineException, msg="Accepted existing username") as raised:
            self.topic.new(undo_log, self.fake_session, request)
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(raised.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
        expected_text = "username '{}' is already used".format(self.test_name)
        self.assertIn(expected_text, norm(str(raised.exception)), "Wrong exception text")

    with self.subTest(i=3):
        undo_log = []
        # No user clash this time, but the backend has no roles to offer.
        self.auth.get_user_list.return_value = []
        self.auth.get_role_list.side_effect = [[], []]
        with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as raised:
            self.topic.new(
                undo_log,
                self.fake_session,
                {"username": self.test_name, "password": self.test_name, "projects": [str(uuid4())]},
            )
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(raised.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
        expected_text = "can't find default role for user '{}'".format(self.test_name)
        self.assertIn(expected_text, norm(str(raised.exception)), "Wrong exception text")
| 661 | |
def test_conflict_on_edit(self):
    """Check that invalid user edition requests are rejected.

    Four cases are exercised: renaming to a UUID-formatted name, assigning
    projects while the mocked backend returns no roles, removing the
    system_admin role from the "admin" user, and renaming to a name the
    mocked backend reports for a different user.
    """
    user_id = str(uuid4())

    with self.subTest(i=1):
        self.auth.get_user_list.return_value = [{"_id": user_id, "username": self.test_name}]
        uuid_name = str(uuid4())
        with self.assertRaises(EngineException, msg="Accepted uuid as username") as raised:
            self.topic.edit(self.fake_session, user_id, {"username": uuid_name})
        self.assertEqual(
            raised.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "username '{}' cannot have an uuid format".format(uuid_name)
        self.assertIn(expected_text, norm(str(raised.exception)), "Wrong exception text")

    with self.subTest(i=2):
        self.auth.get_user_list.return_value = [{"_id": user_id, "username": self.test_name}]
        self.auth.get_role_list.side_effect = [[], []]
        with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as raised:
            self.topic.edit(self.fake_session, user_id, {"projects": [str(uuid4())]})
        self.assertEqual(raised.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
        expected_text = "can't find a default role for user '{}'".format(self.test_name)
        self.assertIn(expected_text, norm(str(raised.exception)), "Wrong exception text")

    with self.subTest(i=3):
        admin_id = str(uuid4())
        self.auth.get_user_list.return_value = [{"_id": admin_id, "username": "admin"}]
        removal_request = {
            "remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]
        }
        with self.assertRaises(
            EngineException, msg="Accepted removing system_admin role from admin user"
        ) as raised:
            self.topic.edit(self.fake_session, admin_id, removal_request)
        self.assertEqual(raised.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
        self.assertIn(
            "you cannot remove system_admin role from admin user",
            norm(str(raised.exception)),
            "Wrong exception text",
        )

    with self.subTest(i=4):
        taken_name = "new-user-name"
        # First lookup answers with the edited user, second with the name's owner.
        self.auth.get_user_list.side_effect = [
            [{"_id": user_id, "name": self.test_name}],
            [{"_id": str(uuid4()), "name": taken_name}],
        ]
        with self.assertRaises(EngineException, msg="Accepted existing username") as raised:
            self.topic.edit(self.fake_session, user_id, {"username": taken_name})
        self.assertEqual(raised.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
        expected_text = "username '{}' is already used".format(taken_name)
        self.assertIn(expected_text, norm(str(raised.exception)), "Wrong exception text")
| 698 | |
def test_conflict_on_del(self):
    """Check that a user cannot delete their own login user.

    The session username and the username of the user returned by the
    mocked backend are both ``self.test_name``; ``topic.delete`` is
    expected to raise an ``EngineException`` carrying CONFLICT.
    """
    with self.subTest(i=1):
        uid = str(uuid4())
        self.fake_session["username"] = self.test_name
        user = {"_id": uid, "username": self.test_name, "project_role_mappings": []}
        self.auth.get_user.return_value = user
        with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e:
            self.topic.delete(self.fake_session, uid)
        self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
        self.assertIn(
            "you cannot delete your own login user",
            norm(str(e.exception)),
            "Wrong exception text",
        )
| 709 | |
| 710 | |
class Test_CommonVimWimSdn(TestCase):
    """Unit tests for ``CommonVimWimSdn`` using mocked db/fs/msg/auth backends.

    The topic under test is configured in ``setUp`` as "wims" with the WIM
    account schemas. "CIM" in the assertion messages stands for Common
    Infrastructure Manager.
    """

    @classmethod
    def setUpClass(cls):
        """Set the account name shared by all tests of this class."""
        cls.test_name = "test-cim-topic"  # CIM = Common Infrastructure Manager

    def setUp(self):
        """Create a fresh topic with mocked collaborators and a fake session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
        # Use WIM schemas for testing because they are the simplest
        self.topic._send_msg = Mock()
        self.topic.topic = "wims"
        self.topic.schema_new = validation.wim_account_new_schema
        self.topic.schema_edit = validation.wim_account_edit_schema
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_cvws(self):
        """Create an account and check the document handed to ``db.create``."""
        test_url = "http://0.0.0.0:0"
        with self.subTest(i=1):
            rollback = []
            test_type = "fake"
            self.db.get_one.return_value = None
            # db.create receives (table, content) positionally, as the
            # call_args checks below rely on; answer with the content's id.
            self.db.create.side_effect = lambda table, content: content["_id"]
            cid, oid = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            args = self.db.create.call_args[0]
            content = args[1]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
            self.assertEqual(content["wim_url"], test_url, "Wrong URL")
            self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
            self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
            self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
            self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects")
            self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects")
            self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation")
            self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations")
            operation = content["_admin"]["operations"][0]
            self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time")
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["created"],
                "Wrong operation status enter time",
            )
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
        # This test is disabled. From Feature 8030 we admit all WIM/SDN types
        # with self.subTest(i=2):
        #     rollback = []
        #     test_type = "bad_type"
        #     with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
        #         self.topic.new(rollback, self.fake_session,
        #                        {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
        #     self.assertEqual(len(rollback), 0, "Wrong rollback length")
        #     self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        #     self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
        #                   norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_new(self):
        """Creating an account whose name already exists must raise CONFLICT."""
        with self.subTest(i=1):
            rollback = []
            test_url = "http://0.0.0.0:0"
            test_type = "fake"
            self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
            with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn(
                "name '{}' already exists for {}".format(self.test_name, self.topic.topic),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_edit_cvws(self):
        """Edit an account: one valid update and one with an unknown property."""
        now = time()
        cid = str(uuid4())
        test_url = "http://0.0.0.0:0"
        test_type = "fake"
        cvws = {
            "_id": cid,
            "name": self.test_name,
            "wim_url": test_url,
            "wim_type": test_type,
            "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]},
        }
        with self.subTest(i=1):
            new_name = "new-cim-name"
            new_url = "https://1.1.1.1:1"
            new_type = "onos"
            self.db.get_one.side_effect = [cvws, None]
            self.db.replace.return_value = {"updated": 1}
            # self.db.encrypt.side_effect = [b64str(), b64str()]
            self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type})
            args = self.db.replace.call_args[0]
            content = args[2]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(args[1], cid, "Wrong CIM identifier")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], new_name, "Wrong CIM name")
            self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
            self.assertEqual(content["wim_url"], new_url, "Wrong URL")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations")
            operation = content["_admin"]["operations"][1]
            self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time")
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["modified"],
                "Wrong operation status enter time",
            )
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
        with self.subTest(i=2):
            self.db.get_one.side_effect = [cvws]
            with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
                self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn(
                "format error '{}'".format(
                    "additional properties are not allowed ('{}' was unexpected)"
                ).format("extra_prop"),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_edit(self):
        """Renaming an account to a name used by another one must raise CONFLICT."""
        with self.subTest(i=1):
            cid = str(uuid4())
            new_name = "new-cim-name"
            self.db.get_one.side_effect = [
                {"_id": cid, "name": self.test_name},
                {"_id": str(uuid4()), "name": new_name},
            ]
            with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
                self.topic.edit(self.fake_session, cid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn(
                "name '{}' already exists for {}".format(new_name, self.topic.topic),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_delete_cvws(self):
        """Delete an account under three project-ownership situations.

        Sub-test 1: other projects still reference the account; expects a
        ``db.set_one`` call with a ``pull_list`` and no message sent.
        Sub-test 2: only the session project references it; expects the
        account to be flagged ``_admin.to_delete`` with a "delete" operation
        pushed and a "delete" message sent.
        Sub-test 3: forced deletion with no projects; expects ``db.del_one``
        and a "deleted" message.
        """
        cid = str(uuid4())
        ro_pid = str(uuid4())
        rw_pid = str(uuid4())
        cvws = {"_id": cid, "name": self.test_name}
        self.db.get_list.return_value = []
        with self.subTest(i=1):
            cvws["_admin"] = {
                "projects_read": [test_pid, ro_pid, rw_pid],
                "projects_write": [test_pid, rw_pid],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertIsNone(
                self.db.set_one.call_args[1]["update_dict"],
                "Wrong read-only projects update",
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["pull_list"],
                {"_admin.projects_read": (test_pid,), "_admin.projects_write": (test_pid,)},
                "Wrong read/write projects update",
            )
            self.topic._send_msg.assert_not_called()
        with self.subTest(i=2):
            now = time()
            cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(
                self.db.set_one.call_args[1]["update_dict"],
                {"_admin.to_delete": True},
                "Wrong _admin.to_delete update",
            )
            operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
            self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
            self.assertGreater(operation["startTime"], now, "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
            self.topic._send_msg.assert_called_once_with(
                "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
            )
        with self.subTest(i=3):
            cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
            self.db.get_one.return_value = cvws
            # Forget calls recorded by the previous sub-tests.
            self.topic._send_msg.reset_mock()
            self.db.get_one.reset_mock()
            self.db.del_one.reset_mock()
            self.fake_session["force"] = True  # to force deletion
            self.fake_session["admin"] = True  # to force deletion
            self.fake_session["project_id"] = []  # to force deletion
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.topic._send_msg.assert_called_once_with(
                "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
            )
| delacruzramo | 79e40f4 | 2019-10-10 16:36:40 +0200 | [diff] [blame] | 904 | |
| 905 | |
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()