import unittest
from unittest import TestCase
-from unittest.mock import Mock
+from unittest.mock import Mock, patch, call
from uuid import uuid4
from http import HTTPStatus
from time import time
from random import randint
from osm_common import dbbase, fsbase, msgbase
from osm_nbi import authconn, validation
-from osm_nbi.admin_topics import ProjectTopicAuth, RoleTopicAuth, UserTopicAuth, CommonVimWimSdn
+from osm_nbi.admin_topics import (
+ ProjectTopicAuth,
+ RoleTopicAuth,
+ UserTopicAuth,
+ CommonVimWimSdn,
+ VcaTopic,
+)
from osm_nbi.engine import EngineException
from osm_nbi.authconn import AuthconnNotFoundException
return ' '.join(str.strip().split()).lower()
+class TestVcaTopic(TestCase):
+ def setUp(self):
+ self.db = Mock(dbbase.DbBase())
+ self.fs = Mock(fsbase.FsBase())
+ self.msg = Mock(msgbase.MsgBase())
+ self.auth = Mock(authconn.Authconn(None, None, None))
+ self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
+
+ @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
+ def test_format_on_new(self, mock_super_format_on_new):
+ content = {
+ "_id": "id",
+ "secret": "encrypted_secret",
+ "cacert": "encrypted_cacert",
+ }
+ self.db.encrypt.side_effect = ["secret", "cacert"]
+ mock_super_format_on_new.return_value = "1234"
+
+ oid = self.vca_topic.format_on_new(content)
+
+ self.assertEqual(oid, "1234")
+ self.assertEqual(content["secret"], "secret")
+ self.assertEqual(content["cacert"], "cacert")
+ self.db.encrypt.assert_has_calls(
+ [
+ call("encrypted_secret", schema_version="1.11", salt="id"),
+ call("encrypted_cacert", schema_version="1.11", salt="id"),
+ ]
+ )
+ mock_super_format_on_new.assert_called_with(content, None, False)
+
+ @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
+ def test_format_on_edit(self, mock_super_format_on_edit):
+ edit_content = {
+ "_id": "id",
+ "secret": "encrypted_secret",
+ "cacert": "encrypted_cacert",
+ }
+ final_content = {
+ "_id": "id",
+ "schema_version": "1.11",
+ }
+ self.db.encrypt.side_effect = ["secret", "cacert"]
+ mock_super_format_on_edit.return_value = "1234"
+
+ oid = self.vca_topic.format_on_edit(final_content, edit_content)
+
+ self.assertEqual(oid, "1234")
+ self.assertEqual(final_content["secret"], "secret")
+ self.assertEqual(final_content["cacert"], "cacert")
+ self.db.encrypt.assert_has_calls(
+ [
+ call("encrypted_secret", schema_version="1.11", salt="id"),
+ call("encrypted_cacert", schema_version="1.11", salt="id"),
+ ]
+ )
+ mock_super_format_on_edit.assert_called()
+
+ @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
+ def test_check_conflict_on_del(self, mock_check_conflict_on_del):
+ session = {
+ "project_id": "project-id",
+ "force": False,
+ }
+ _id = "vca-id"
+ db_content = {}
+
+ self.db.get_list.return_value = None
+
+ self.vca_topic.check_conflict_on_del(session, _id, db_content)
+
+ self.db.get_list.assert_called_with(
+ "vim_accounts",
+ {"vca": _id, '_admin.projects_read.cont': 'project-id'},
+ )
+ mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
+
+ @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
+ def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
+ session = {
+ "project_id": "project-id",
+ "force": True,
+ }
+ _id = "vca-id"
+ db_content = {}
+
+ self.vca_topic.check_conflict_on_del(session, _id, db_content)
+
+ self.db.get_list.assert_not_called()
+ mock_check_conflict_on_del.assert_not_called()
+
+ @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
+ def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
+ session = {
+ "project_id": "project-id",
+ "force": False,
+ }
+ _id = "vca-id"
+ db_content = {}
+
+ self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
+
+ with self.assertRaises(EngineException) as context:
+ self.vca_topic.check_conflict_on_del(session, _id, db_content)
+ self.assertEqual(
+ context.exception,
+ EngineException(
+ "There is at least one VIM account using this vca",
+ http_code=HTTPStatus.CONFLICT
+ )
+ )
+
+ self.db.get_list.assert_called_with(
+ "vim_accounts",
+ {"vca": _id, '_admin.projects_read.cont': 'project-id'},
+ )
+ mock_check_conflict_on_del.assert_not_called()
+
+
class Test_ProjectTopicAuth(TestCase):
@classmethod
self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None,
"admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
+ self.topic.check_quota = Mock(return_value=None) # skip quota
def test_new_project(self):
with self.subTest(i=1):
self.fs = Mock(fsbase.FsBase())
self.msg = Mock(msgbase.MsgBase())
self.auth = Mock(authconn.Authconn(None, None, None))
- self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth, self.test_operations)
+ self.auth.role_permissions = self.test_operations
+ self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
"admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
+ self.topic.check_quota = Mock(return_value=None) # skip quota
def test_new_role(self):
with self.subTest(i=1):
self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
"admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
+ self.topic.check_quota = Mock(return_value=None) # skip quota
def test_new_user(self):
uid1 = str(uuid4())
new_name = "other-user-name"
new_prms = [{}]
self.auth.get_role_list.side_effect = [[user], []]
+ self.auth.get_user_list.side_effect = [[user]]
with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms})
self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
self.auth = Mock(authconn.Authconn(None, None, None))
self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
# Use WIM schemas for testing because they are the simplest
+ self.topic._send_msg = Mock()
self.topic.topic = "wims"
self.topic.schema_new = validation.wim_account_new_schema
self.topic.schema_edit = validation.wim_account_edit_schema
self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
"admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
+ self.topic.check_quota = Mock(return_value=None) # skip quota
def test_new_cvws(self):
test_url = "http://0.0.0.0:0"
"Wrong operation status enter time")
self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
- with self.subTest(i=2):
- rollback = []
- test_type = "bad_type"
- with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
- self.topic.new(rollback, self.fake_session,
- {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
- self.assertEqual(len(rollback), 0, "Wrong rollback length")
- self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
- self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type, ""),
- norm(str(e.exception)), "Wrong exception text")
+ # This test is disabled. From Feature 8030 we admit all WIM/SDN types
+ # with self.subTest(i=2):
+ # rollback = []
+ # test_type = "bad_type"
+ # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
+ # self.topic.new(rollback, self.fake_session,
+ # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
+ # self.assertEqual(len(rollback), 0, "Wrong rollback length")
+ # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
+ # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
+ # norm(str(e.exception)), "Wrong exception text")
def test_conflict_on_new(self):
with self.subTest(i=1):
self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
with self.subTest(i=2):
+ self.db.get_one.side_effect = [cvws]
with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
ro_pid = str(uuid4())
rw_pid = str(uuid4())
cvws = {"_id": cid, "name": self.test_name}
+ self.db.get_list.return_value = []
with self.subTest(i=1):
cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]}
self.db.get_one.return_value = cvws
self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
- self.assertEqual(self.db.set_one.call_args[1]["update_dict"]["_admin.projects_read"], [ro_pid, rw_pid],
+ self.assertEqual(self.db.set_one.call_args[1]["update_dict"], None,
"Wrong read-only projects update")
- self.assertEqual(self.db.set_one.call_args[1]["update_dict"]["_admin.projects_write"], [rw_pid],
+ self.assertEqual(self.db.set_one.call_args[1]["pull_list"],
+ {"_admin.projects_read": (test_pid,), "_admin.projects_write": (test_pid,)},
"Wrong read/write projects update")
- with self.subTest(i=3):
+ self.topic._send_msg.assert_not_called()
+ with self.subTest(i=2):
now = time()
cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
self.db.get_one.return_value = cvws
self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier")
- update_dict = self.db.set_one.call_args[1]["update_dict"]
- self.assertEqual(update_dict["_admin.projects_read"], [], "Wrong read-only projects update")
- self.assertEqual(update_dict["_admin.projects_write"], [], "Wrong read/write projects update")
- self.assertEqual(update_dict["_admin.to_delete"], True, "Wrong deletion mark")
+ self.assertEqual(self.db.set_one.call_args[1]["update_dict"], {"_admin.to_delete": True},
+ "Wrong _admin.to_delete update")
operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
self.assertGreater(operation["startTime"], now, "Wrong operation start time")
self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
+ self.topic._send_msg.assert_called_once_with("delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None)
with self.subTest(i=3):
cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
self.db.get_one.return_value = cvws
+ self.topic._send_msg.reset_mock()
+ self.db.get_one.reset_mock()
+ self.db.del_one.reset_mock()
self.fake_session["force"] = True # to force deletion
+ self.fake_session["admin"] = True # to force deletion
+ self.fake_session["project_id"] = [] # to force deletion
oid = self.topic.delete(self.fake_session, cid)
self.assertIsNone(oid, "Wrong operation identifier")
self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
+ self.topic._send_msg.assert_called_once_with("deleted", {"_id": cid, "op_id": None}, not_send_msg=None)
if __name__ == '__main__':