From: delacruzramo Date: Thu, 10 Oct 2019 14:36:40 +0000 (+0200) Subject: Unit tests for admin_topics X-Git-Tag: v7.0.0rc1~19 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F42%2F8042%2F11;p=osm%2FNBI.git Unit tests for admin_topics Change-Id: I2b52b479e981bfa6a9aa1bb94328fff49dd4b262 Signed-off-by: delacruzramo --- diff --git a/osm_nbi/admin_topics.py b/osm_nbi/admin_topics.py index e887afb..ada9c7b 100644 --- a/osm_nbi/admin_topics.py +++ b/osm_nbi/admin_topics.py @@ -780,7 +780,7 @@ class ProjectTopicAuth(ProjectTopic): project_name = edit_content.get("name") if project_name != final_content["name"]: # It is a true renaming if is_valid_uuid(project_name): - raise EngineException("project name '{}' cannot have an uuid format".format(project_name), + raise EngineException("project name '{}' cannot have an uuid format".format(project_name), HTTPStatus.UNPROCESSABLE_ENTITY) if final_content["name"] == "admin": @@ -1011,6 +1011,11 @@ class RoleTopicAuth(BaseTopic): :param indata: data to be inserted :return: None or raises EngineException """ + # check name is not uuid + role_name = indata.get("name") + if is_valid_uuid(role_name): + raise EngineException("role name '{}' cannot have an uuid format".format(role_name), + HTTPStatus.UNPROCESSABLE_ENTITY) # check name not exists name = indata["name"] # if self.db.get_one(self.topic, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False): @@ -1032,6 +1037,17 @@ class RoleTopicAuth(BaseTopic): if "admin" not in final_content["permissions"]: final_content["permissions"]["admin"] = False + # check name is not uuid + role_name = edit_content.get("name") + if is_valid_uuid(role_name): + raise EngineException("role name '{}' cannot have an uuid format".format(role_name), + HTTPStatus.UNPROCESSABLE_ENTITY) + + # Check renaming of admin roles + role = self.auth.get_role(_id) + if role["name"] in ["system_admin", "project_admin"]: + raise EngineException("You cannot rename role '{}'".format(role["name"]), http_code=HTTPStatus.FORBIDDEN) + # check name not exists if "name" in edit_content: role_name = edit_content["name"] diff --git a/osm_nbi/tests/test_admin_topics.py b/osm_nbi/tests/test_admin_topics.py new file mode 100755 index 0000000..739b68c --- /dev/null +++ b/osm_nbi/tests/test_admin_topics.py @@ -0,0 +1,765 @@ +#! /usr/bin/python3 +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com" +__date__ = "$2019-10-019" + +import unittest +from unittest import TestCase +from unittest.mock import Mock +from uuid import uuid4 +from http import HTTPStatus +from time import time +from random import randint +from osm_common import dbbase, fsbase, msgbase +from osm_nbi import authconn, validation +from osm_nbi.admin_topics import ProjectTopicAuth, RoleTopicAuth, UserTopicAuth, CommonVimWimSdn +from osm_nbi.engine import EngineException +from osm_nbi.authconn import AuthconnNotFoundException + + +test_pid = str(uuid4()) +test_name = "test-user" + + +def norm(str): + """Normalize string for checking""" + return ' '.join(str.strip().split()).lower() + + +class Test_ProjectTopicAuth(TestCase): + + @classmethod + def setUpClass(cls): + cls.test_name = "test-project-topic" + + def setUp(self): + self.db = Mock(dbbase.DbBase()) + self.fs = Mock(fsbase.FsBase()) + self.msg = Mock(msgbase.MsgBase()) + self.auth = Mock(authconn.Authconn(None, None, None)) + self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth) + self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None, + "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} + + def test_new_project(self): + with self.subTest(i=1): + rollback = [] + pid1 = str(uuid4()) + self.auth.get_project_list.return_value = [] + self.auth.create_project.return_value = pid1 + pid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "quotas": {}}) + self.assertEqual(len(rollback), 1, "Wrong rollback length") + self.assertEqual(pid2, pid1, "Wrong project identifier") + content = self.auth.create_project.call_args[0][0] + self.assertEqual(content["name"], self.test_name, "Wrong project name") + self.assertEqual(content["quotas"], {}, "Wrong quotas") + self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") + self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") + with self.subTest(i=2): + rollback = [] + with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e: + self.topic.new(rollback, self.fake_session, {"name": "other-project-name", "quotas": {"baditems": 10}}) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'" + .format("baditems"), norm(str(e.exception)), "Wrong exception text") + + def test_edit_project(self): + now = time() + pid = str(uuid4()) + proj = {"_id": pid, "name": self.test_name, "_admin": {"created": now, "modified": now}} + with self.subTest(i=1): + self.auth.get_project_list.side_effect = [[proj], []] + new_name = "new-project-name" + quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)} + self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas}) + _id, content = self.auth.update_project.call_args[0] + self.assertEqual(_id, pid, "Wrong project identifier") + self.assertEqual(content["_id"], pid, "Wrong project identifier") + self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") + self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time") + self.assertEqual(content["name"], new_name, "Wrong project name") + self.assertEqual(content["quotas"], quotas, "Wrong quotas") + with self.subTest(i=2): + new_name = "other-project-name" + quotas = {"baditems": randint(0, 100)} + self.auth.get_project_list.side_effect = [[proj], []] + with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e: + self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas}) + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'" + .format("baditems"), norm(str(e.exception)), "Wrong exception text") + + def test_conflict_on_new(self): + with self.subTest(i=1): + rollback = [] + pid = str(uuid4()) + with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e: + self.topic.new(rollback, self.fake_session, {"name": pid}) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("project name '{}' cannot have an uuid format".format(pid), + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=2): + rollback = [] + self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}] + with self.assertRaises(EngineException, msg="Accepted existing project name") as e: + self.topic.new(rollback, self.fake_session, {"name": self.test_name}) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("project '{}' exists".format(self.test_name), + norm(str(e.exception)), "Wrong exception text") + + def test_conflict_on_edit(self): + with self.subTest(i=1): + self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}] + new_name = str(uuid4()) + with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e: + self.topic.edit(self.fake_session, test_pid, {"name": new_name}) + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("project name '{}' cannot have an uuid format".format(new_name), + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=2): + pid = str(uuid4()) + self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}] + with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as e: + self.topic.edit(self.fake_session, pid, {"name": "new-name"}) + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("you cannot rename project 'admin'", + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=3): + new_name = "new-project-name" + self.auth.get_project_list.side_effect = [[{"_id": test_pid, "name": self.test_name}], + [{"_id": str(uuid4()), "name": new_name}]] + with self.assertRaises(EngineException, msg="Accepted existing project name") as e: + self.topic.edit(self.fake_session, pid, {"name": new_name}) + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("project '{}' is already used".format(new_name), + norm(str(e.exception)), "Wrong exception text") + + def test_delete_project(self): + with self.subTest(i=1): + pid = str(uuid4()) + self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"} + self.auth.delete_project.return_value = {"deleted": 1} + self.auth.get_user_list.return_value = [] + self.db.get_list.return_value = [] + rc = self.topic.delete(self.fake_session, pid) + self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info") + self.assertEqual(self.auth.get_project.call_args[0][0], pid, "Wrong project identifier") + self.assertEqual(self.auth.delete_project.call_args[0][0], pid, "Wrong project identifier") + + def test_conflict_on_del(self): + with self.subTest(i=1): + self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name} + with self.assertRaises(EngineException, msg="Accepted deletion of own project") as e: + self.topic.delete(self.fake_session, self.test_name) + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("you cannot delete your own project", norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=2): + self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"} + with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as e: + self.topic.delete(self.fake_session, "admin") + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("you cannot delete project 'admin'", norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=3): + pid = str(uuid4()) + name = "other-project-name" + self.auth.get_project.return_value = {"_id": pid, "name": name} + self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name, + "project_role_mappings": [{"project": pid, "role": str(uuid4())}]}] + with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e: + self.topic.delete(self.fake_session, pid) + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name), + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=4): + self.auth.get_user_list.return_value = [] + self.db.get_list.return_value = [{"_id": str(uuid4()), "id": self.test_name, + "_admin": {"projects_read": [pid], "projects_write": []}}] + with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e: + self.topic.delete(self.fake_session, pid) + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("project '{}' ({}) is being used by {} '{}'" + .format(name, pid, "vnf descriptor", self.test_name), + norm(str(e.exception)), "Wrong exception text") + + +class Test_RoleTopicAuth(TestCase): + + @classmethod + def setUpClass(cls): + cls.test_name = "test-role-topic" + cls.test_operations = ["tokens:get"] + + def setUp(self): + self.db = Mock(dbbase.DbBase()) + self.fs = Mock(fsbase.FsBase()) + self.msg = Mock(msgbase.MsgBase()) + self.auth = Mock(authconn.Authconn(None, None, None)) + self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth, self.test_operations) + self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None, + "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} + + def test_new_role(self): + with self.subTest(i=1): + rollback = [] + rid1 = str(uuid4()) + perms_in = {"tokens": True} + perms_out = {"default": False, "admin": False, "tokens": True} + self.auth.get_role_list.return_value = [] + self.auth.create_role.return_value = rid1 + rid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in}) + self.assertEqual(len(rollback), 1, "Wrong rollback length") + self.assertEqual(rid2, rid1, "Wrong project identifier") + content = self.auth.create_role.call_args[0][0] + self.assertEqual(content["name"], self.test_name, "Wrong role name") + self.assertEqual(content["permissions"], perms_out, "Wrong permissions") + self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") + self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") + with self.subTest(i=2): + rollback = [] + with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e: + self.topic.new(rollback, self.fake_session, + {"name": "other-role-name", "permissions": {"projects": True}}) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("invalid permission '{}'".format("projects"), + norm(str(e.exception)), "Wrong exception text") + + def test_edit_role(self): + now = time() + rid = str(uuid4()) + role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True}, + "_admin": {"created": now, "modified": now}} + with self.subTest(i=1): + self.auth.get_role_list.side_effect = [[role], []] + self.auth.get_role.return_value = role + new_name = "new-role-name" + perms_in = {"tokens": False, "tokens:get": True} + perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True} + self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in}) + content = self.auth.update_role.call_args[0][0] + self.assertEqual(content["_id"], rid, "Wrong role identifier") + self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") + self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time") + self.assertEqual(content["name"], new_name, "Wrong role name") + self.assertEqual(content["permissions"], perms_out, "Wrong permissions") + with self.subTest(i=2): + new_name = "other-role-name" + perms_in = {"tokens": False, "tokens:post": True} + self.auth.get_role_list.side_effect = [[role], []] + with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e: + self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in}) + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("invalid permission '{}'".format("tokens:post"), + norm(str(e.exception)), "Wrong exception text") + + def test_delete_role(self): + with self.subTest(i=1): + rid = str(uuid4()) + role = {"_id": rid, "name": "other-role-name"} + self.auth.get_role_list.return_value = [role] + self.auth.get_role.return_value = role + self.auth.delete_role.return_value = {"deleted": 1} + self.auth.get_user_list.return_value = [] + rc = self.topic.delete(self.fake_session, rid) + self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info") + self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier") + self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier") + self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier") + + def test_conflict_on_new(self): + with self.subTest(i=1): + rollback = [] + rid = str(uuid4()) + with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e: + self.topic.new(rollback, self.fake_session, {"name": rid}) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("role name '{}' cannot have an uuid format".format(rid), + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=2): + rollback = [] + self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}] + with self.assertRaises(EngineException, msg="Accepted existing role name") as e: + self.topic.new(rollback, self.fake_session, {"name": self.test_name}) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("role name '{}' exists".format(self.test_name), + norm(str(e.exception)), "Wrong exception text") + + def test_conflict_on_edit(self): + rid = str(uuid4()) + with self.subTest(i=1): + self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}] + new_name = str(uuid4()) + with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e: + self.topic.edit(self.fake_session, rid, {"name": new_name}) + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("role name '{}' cannot have an uuid format".format(new_name), + norm(str(e.exception)), "Wrong exception text") + for i, role_name in enumerate(["system_admin", "project_admin"], start=2): + with self.subTest(i=i): + rid = str(uuid4()) + self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}} + with self.assertRaises(EngineException, msg="Accepted renaming of role '{}'".format(role_name)) as e: + self.topic.edit(self.fake_session, rid, {"name": "new-name"}) + self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code") + self.assertIn("you cannot rename role '{}'".format(role_name), + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=i+1): + new_name = "new-role-name" + self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}], + [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]] + self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}} + with self.assertRaises(EngineException, msg="Accepted existing role name") as e: + self.topic.edit(self.fake_session, rid, {"name": new_name}) + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("role name '{}' exists".format(new_name), + norm(str(e.exception)), "Wrong exception text") + + def test_conflict_on_del(self): + for i, role_name in enumerate(["system_admin", "project_admin"], start=1): + with self.subTest(i=i): + rid = str(uuid4()) + role = {"_id": rid, "name": role_name} + self.auth.get_role_list.return_value = [role] + self.auth.get_role.return_value = role + with self.assertRaises(EngineException, msg="Accepted deletion of role '{}'".format(role_name)) as e: + self.topic.delete(self.fake_session, rid) + self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code") + self.assertIn("you cannot delete role '{}'".format(role_name), + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=i+1): + rid = str(uuid4()) + name = "other-role-name" + role = {"_id": rid, "name": name} + self.auth.get_role_list.return_value = [role] + self.auth.get_role.return_value = role + self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name, + "project_role_mappings": [{"project": str(uuid4()), "role": rid}]}] + with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e: + self.topic.delete(self.fake_session, rid) + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name), + norm(str(e.exception)), "Wrong exception text") + + +class Test_UserTopicAuth(TestCase): + + @classmethod + def setUpClass(cls): + cls.test_name = "test-user-topic" + + def setUp(self): + self.db = Mock(dbbase.DbBase()) + self.fs = Mock(fsbase.FsBase()) + self.msg = Mock(msgbase.MsgBase()) + self.auth = Mock(authconn.Authconn(None, None, None)) + self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth) + self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None, + "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} + + def test_new_user(self): + uid1 = str(uuid4()) + pid = str(uuid4()) + self.auth.get_user_list.return_value = [] + self.auth.get_project.return_value = {"_id": pid, "name": "some_project"} + self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name} + with self.subTest(i=1): + rollback = [] + rid = str(uuid4()) + self.auth.get_role.return_value = {"_id": rid, "name": "some_role"} + prms_in = [{"project": "some_project", "role": "some_role"}] + prms_out = [{"project": pid, "role": rid}] + uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name, + "password": self.test_name, + "project_role_mappings": prms_in + }) + self.assertEqual(len(rollback), 1, "Wrong rollback length") + self.assertEqual(uid2, uid1, "Wrong project identifier") + content = self.auth.create_user.call_args[0][0] + self.assertEqual(content["username"], self.test_name, "Wrong project name") + self.assertEqual(content["password"], self.test_name, "Wrong password") + self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings") + self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") + self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") + with self.subTest(i=2): + rollback = [] + def_rid = str(uuid4()) + def_role = {"_id": def_rid, "name": "project_admin"} + self.auth.get_role.return_value = def_role + self.auth.get_role_list.return_value = [def_role] + prms_out = [{"project": pid, "role": def_rid}] + uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name, + "password": self.test_name, + "projects": ["some_project"] + }) + self.assertEqual(len(rollback), 1, "Wrong rollback length") + self.assertEqual(uid2, uid1, "Wrong project identifier") + content = self.auth.create_user.call_args[0][0] + self.assertEqual(content["username"], self.test_name, "Wrong project name") + self.assertEqual(content["password"], self.test_name, "Wrong password") + self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings") + self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") + self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") + with self.subTest(i=3): + rollback = [] + with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e: + self.topic.new(rollback, self.fake_session, {"username": "other-project-name", + "password": "other-password", + "project_role_mappings": [{}] + }) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("format error at '{}' '{}'" + .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"), + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=4): + rollback = [] + with self.assertRaises(EngineException, msg="Accepted wrong projects") as e: + self.topic.new(rollback, self.fake_session, {"username": "other-project-name", + "password": "other-password", + "projects": [] + }) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]), + norm(str(e.exception)), "Wrong exception text") + + def test_edit_user(self): + now = time() + uid = str(uuid4()) + pid1 = str(uuid4()) + rid1 = str(uuid4()) + prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}] + user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms, + "_admin": {"created": now, "modified": now}} + with self.subTest(i=1): + self.auth.get_user_list.side_effect = [[user], []] + self.auth.get_user.return_value = user + pid2 = str(uuid4()) + rid2 = str(uuid4()) + self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"}, + {"_id": pid1, "name": "project-1"}] + self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"}, + {"_id": rid1, "name": "role-1"}] + new_name = "new-user-name" + new_pasw = "new-password" + add_prms = [{"project": pid2, "role": rid2}] + rem_prms = [{"project": pid1, "role": rid1}] + self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw, + "add_project_role_mappings": add_prms, + "remove_project_role_mappings": rem_prms + }) + content = self.auth.update_user.call_args[0][0] + self.assertEqual(content["_id"], uid, "Wrong user identifier") + self.assertEqual(content["username"], new_name, "Wrong user name") + self.assertEqual(content["password"], new_pasw, "Wrong user password") + self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add") + self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove") + with self.subTest(i=2): + new_name = "other-user-name" + new_prms = [{}] + self.auth.get_role_list.side_effect = [[user], []] + with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e: + self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms}) + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("format error at '{}' '{}'" + .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"), + norm(str(e.exception)), "Wrong exception text") + + def test_delete_user(self): + with self.subTest(i=1): + uid = str(uuid4()) + self.fake_session["username"] = self.test_name + user = user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []} + self.auth.get_user.return_value = user + self.auth.delete_user.return_value = {"deleted": 1} + rc = self.topic.delete(self.fake_session, uid) + self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info") + self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier") + self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier") + + def test_conflict_on_new(self): + with self.subTest(i=1): + rollback = [] + uid = str(uuid4()) + with self.assertRaises(EngineException, msg="Accepted uuid as username") as e: + self.topic.new(rollback, self.fake_session, {"username": uid, "password": self.test_name, + "projects": [test_pid]}) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("username '{}' cannot have a uuid format".format(uid), + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=2): + rollback = [] + self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}] + with self.assertRaises(EngineException, msg="Accepted existing username") as e: + self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name, + "projects": [test_pid]}) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("username '{}' is already used".format(self.test_name), + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=3): + rollback = [] + self.auth.get_user_list.return_value = [] + self.auth.get_role_list.side_effect = [[], []] + with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e: + self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name, + "projects": [str(uuid4())]}) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code") + self.assertIn("can't find default role for user '{}'".format(self.test_name), + norm(str(e.exception)), "Wrong exception text") + + def test_conflict_on_edit(self): + uid = str(uuid4()) + with self.subTest(i=1): + self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}] + new_name = str(uuid4()) + with self.assertRaises(EngineException, msg="Accepted uuid as username") as e: + self.topic.edit(self.fake_session, uid, {"username": new_name}) + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("username '{}' cannot have an uuid format".format(new_name), + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=2): + self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}] + self.auth.get_role_list.side_effect = [[], []] + with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e: + self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]}) + self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code") + self.assertIn("can't find a default role for user '{}'".format(self.test_name), + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=3): + admin_uid = str(uuid4()) + self.auth.get_user_list.return_value = [{"_id": admin_uid, "username": "admin"}] + with self.assertRaises(EngineException, msg="Accepted removing system_admin role from admin user") as e: + self.topic.edit(self.fake_session, admin_uid, + {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]}) + self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code") + self.assertIn("you cannot remove system_admin role from admin user", + norm(str(e.exception)), "Wrong exception text") + with self.subTest(i=4): + new_name = "new-user-name" + self.auth.get_user_list.side_effect = [[{"_id": uid, "name": self.test_name}], + [{"_id": str(uuid4()), "name": new_name}]] + with self.assertRaises(EngineException, msg="Accepted existing username") as e: + self.topic.edit(self.fake_session, uid, {"username": new_name}) + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("username '{}' is already used".format(new_name), + norm(str(e.exception)), "Wrong exception text") + + def test_conflict_on_del(self): + with self.subTest(i=1): + uid = str(uuid4()) + self.fake_session["username"] = self.test_name + user = user = {"_id": uid, "username": self.test_name, "project_role_mappings": []} + self.auth.get_user.return_value = user + with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e: + self.topic.delete(self.fake_session, uid) + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("you cannot delete your own login user", norm(str(e.exception)), "Wrong exception text") + + +class Test_CommonVimWimSdn(TestCase): + + @classmethod + def setUpClass(cls): + cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager + + def setUp(self): + self.db = Mock(dbbase.DbBase()) + self.fs = Mock(fsbase.FsBase()) + self.msg = Mock(msgbase.MsgBase()) + self.auth = Mock(authconn.Authconn(None, None, None)) + self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth) + # Use WIM schemas for testing because they are the simplest + self.topic.topic = "wims" + self.topic.schema_new = validation.wim_account_new_schema + self.topic.schema_edit = validation.wim_account_edit_schema + self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None, + "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} + + def test_new_cvws(self): + test_url = "http://0.0.0.0:0" + with self.subTest(i=1): + rollback = [] + test_type = "fake" + self.db.get_one.return_value = None + self.db.create.side_effect = lambda self, content: content["_id"] + cid, oid = self.topic.new(rollback, self.fake_session, + {"name": self.test_name, "wim_url": test_url, "wim_type": test_type}) + self.assertEqual(len(rollback), 1, "Wrong rollback length") + args = self.db.create.call_args[0] + content = args[1] + self.assertEqual(args[0], self.topic.topic, "Wrong topic") + self.assertEqual(content["_id"], cid, "Wrong CIM identifier") + self.assertEqual(content["name"], self.test_name, "Wrong CIM name") + self.assertEqual(content["wim_url"], test_url, "Wrong URL") + self.assertEqual(content["wim_type"], test_type, "Wrong CIM type") + self.assertEqual(content["schema_version"], "1.11", "Wrong schema version") + self.assertEqual(content["op_id"], oid, "Wrong operation identifier") + self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") + self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") + self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state") + self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects") + self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects") + self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation") + self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations") + operation = content["_admin"]["operations"][0] + self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type") + self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state") + self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time") + self.assertGreater(operation["statusEnteredTime"], content["_admin"]["created"], + "Wrong operation status enter time") + self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info") + self.assertIsNone(operation["operationParams"], "Wrong operation parameters") + with self.subTest(i=2): + rollback = [] + test_type = "bad_type" + with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e: + self.topic.new(rollback, self.fake_session, + {"name": self.test_name, "wim_url": test_url, "wim_type": test_type}) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type, ""), + norm(str(e.exception)), "Wrong exception text") + + def test_conflict_on_new(self): + with self.subTest(i=1): + rollback = [] + test_url = "http://0.0.0.0:0" + test_type = "fake" + self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name} + with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e: + self.topic.new(rollback, self.fake_session, + {"name": self.test_name, "wim_url": test_url, "wim_type": test_type}) + self.assertEqual(len(rollback), 0, "Wrong rollback length") + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic), + norm(str(e.exception)), "Wrong exception text") + + def test_edit_cvws(self): + now = time() + cid = str(uuid4()) + test_url = "http://0.0.0.0:0" + test_type = "fake" + cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type, + "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}} + with self.subTest(i=1): + new_name = "new-cim-name" + new_url = "https://1.1.1.1:1" + new_type = "onos" + self.db.get_one.side_effect = [cvws, None] + self.db.replace.return_value = {"updated": 1} + # self.db.encrypt.side_effect = [b64str(), b64str()] + self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type}) + args = self.db.replace.call_args[0] + content = args[2] + self.assertEqual(args[0], self.topic.topic, "Wrong topic") + self.assertEqual(args[1], cid, "Wrong CIM identifier") + self.assertEqual(content["_id"], cid, "Wrong CIM identifier") + self.assertEqual(content["name"], new_name, "Wrong CIM name") + self.assertEqual(content["wim_type"], new_type, "Wrong CIM type") + self.assertEqual(content["wim_url"], new_url, "Wrong URL") + self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") + self.assertGreater(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") + self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations") + operation = content["_admin"]["operations"][1] + self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type") + self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state") + self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time") + self.assertGreater(operation["statusEnteredTime"], content["_admin"]["modified"], + "Wrong operation status enter time") + self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info") + self.assertIsNone(operation["operationParams"], "Wrong operation parameters") + with self.subTest(i=2): + with self.assertRaises(EngineException, msg="Accepted wrong property") as e: + self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"}) + self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") + self.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)"). + format("extra_prop"), + norm(str(e.exception)), "Wrong exception text") + + def test_conflict_on_edit(self): + with self.subTest(i=1): + cid = str(uuid4()) + new_name = "new-cim-name" + self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name}, + {"_id": str(uuid4()), "name": new_name}] + with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e: + self.topic.edit(self.fake_session, cid, {"name": new_name}) + self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") + self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic), + norm(str(e.exception)), "Wrong exception text") + + def test_delete_cvws(self): + cid = str(uuid4()) + ro_pid = str(uuid4()) + rw_pid = str(uuid4()) + cvws = {"_id": cid, "name": self.test_name} + with self.subTest(i=1): + cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]} + self.db.get_one.return_value = cvws + oid = self.topic.delete(self.fake_session, cid) + self.assertIsNone(oid, "Wrong operation identifier") + self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic") + self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier") + self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic") + self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier") + self.assertEqual(self.db.set_one.call_args[1]["update_dict"]["_admin.projects_read"], [ro_pid, rw_pid], + "Wrong read-only projects update") + self.assertEqual(self.db.set_one.call_args[1]["update_dict"]["_admin.projects_write"], [rw_pid], + "Wrong read/write projects update") + with self.subTest(i=3): + now = time() + cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []} + self.db.get_one.return_value = cvws + oid = self.topic.delete(self.fake_session, cid) + self.assertEqual(oid, cid+":0", "Wrong operation identifier") + self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic") + self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier") + self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic") + self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier") + update_dict = self.db.set_one.call_args[1]["update_dict"] + self.assertEqual(update_dict["_admin.projects_read"], [], "Wrong read-only projects update") + self.assertEqual(update_dict["_admin.projects_write"], [], "Wrong read/write projects update") + self.assertEqual(update_dict["_admin.to_delete"], True, "Wrong deletion mark") + operation = self.db.set_one.call_args[1]["push"]["_admin.operations"] + self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type") + self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state") + self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status") + self.assertIsNone(operation["operationParams"], "Wrong operation parameters") + self.assertGreater(operation["startTime"], now, "Wrong operation start time") + self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time") + with self.subTest(i=3): + cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []} + self.db.get_one.return_value = cvws + self.fake_session["force"] = True # to force deletion + oid = self.topic.delete(self.fake_session, cid) + self.assertIsNone(oid, "Wrong operation identifier") + self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic") + self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier") + self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic") + self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier") + + +if __name__ == '__main__': + unittest.main()