blob: 57551c7cfaa07f2f4d372f0356124d9b47614626 [file] [log] [blame]
#! /usr/bin/python3
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
__date__ = "$2019-10-019"
import unittest
from unittest import TestCase
from unittest.mock import Mock, patch, call, ANY
from uuid import uuid4
from http import HTTPStatus
from time import time
from random import randint
from osm_common import dbbase, fsbase, msgbase
from osm_nbi import authconn, validation
from osm_nbi.admin_topics import (
ProjectTopicAuth,
RoleTopicAuth,
UserTopicAuth,
CommonVimWimSdn,
VcaTopic,
VimAccountTopic,
)
from osm_nbi.engine import EngineException
from osm_nbi.authconn import AuthconnNotFoundException
test_pid = str(uuid4())
test_name = "test-user"
def norm(str):
    """Return ``str`` lower-cased with every whitespace run collapsed.

    Surrounding whitespace is removed and each inner run of whitespace
    becomes a single space, so texts can be compared regardless of layout.
    """
    words = str.strip().split()
    collapsed = " ".join(words)
    return collapsed.lower()
class TestVcaTopic(TestCase):
    """Unit tests for ``VcaTopic`` running against mocked backends."""

    def setUp(self):
        """Create a ``VcaTopic`` wired to mocked db, fs, msg and auth objects."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
    def test_format_on_new(self, mock_super_format_on_new):
        """``secret`` and ``cacert`` end up holding what ``db.encrypt`` returns."""
        content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        # One canned result per expected ``encrypt`` call, in call order.
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_new.return_value = "1234"

        oid = self.vca_topic.format_on_new(content)

        self.assertEqual(oid, "1234")
        self.assertEqual(content["secret"], "secret")
        self.assertEqual(content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_new.assert_called_with(content, None, False)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
    def test_format_on_edit(self, mock_super_format_on_edit):
        """Edited ``secret``/``cacert`` are stored encrypted in the final content."""
        edit_content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        final_content = {
            "_id": "id",
            "schema_version": "1.11",
        }
        # One canned result per expected ``encrypt`` call, in call order.
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_edit.return_value = "1234"

        oid = self.vca_topic.format_on_edit(final_content, edit_content)

        self.assertEqual(oid, "1234")
        self.assertEqual(final_content["secret"], "secret")
        self.assertEqual(final_content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_edit.assert_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del(self, mock_check_conflict_on_del):
        """Without force, ``vim_accounts`` is queried and the parent check runs."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}
        # A falsy lookup result stands for "no VIM account references this VCA".
        self.db.get_list.return_value = None

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_called_with(session, _id, db_content)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
        """With ``force`` set, neither the db nor the parent check is consulted."""
        session = {
            "project_id": "project-id",
            "force": True,
        }
        _id = "vca-id"
        db_content = {}

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_not_called()
        mock_check_conflict_on_del.assert_not_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
        """A VIM account referencing the VCA makes the deletion check raise."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}
        # A truthy lookup result stands for "a VIM account uses this VCA".
        self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}

        with self.assertRaises(EngineException) as context:
            self.vca_topic.check_conflict_on_del(session, _id, db_content)

        # The exception is inspected after the ``with`` block (statements that
        # follow the raising call inside it never run) and by its attributes,
        # because two distinct exception instances do not compare equal.
        self.assertEqual(context.exception.http_code, HTTPStatus.CONFLICT)
        self.assertIn(
            "there is at least one vim account using this vca",
            norm(str(context.exception)),
        )
        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_not_called()
class Test_ProjectTopicAuth(TestCase):
    """Unit tests for ``ProjectTopicAuth`` running against mocked backends."""

    @classmethod
    def setUpClass(cls):
        """Publish the project name shared by every test of this class."""
        cls.test_name = "test-project-topic"

    def setUp(self):
        """Build the topic on mocked backends together with an admin session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": self.test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_project(self):
        """Create a project with valid quotas, then reject unknown quota items."""
        with self.subTest(i=1):
            rollback = []
            pid1 = str(uuid4())
            self.auth.get_project_list.return_value = []
            self.auth.create_project.return_value = pid1
            # Only the first element of the returned pair is checked here.
            pid2, _ = self.topic.new(
                rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(pid2, pid1, "Wrong project identifier")
            # First positional argument handed to the auth backend.
            content = self.auth.create_project.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong project name")
            self.assertEqual(content["quotas"], {}, "Wrong quotas")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {"name": "other-project-name", "quotas": {"baditems": 10}},
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
                    "baditems"
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_edit_project(self):
        """Rename a project with valid quotas, then reject unknown quota items."""
        now = time()
        pid = str(uuid4())
        proj = {
            "_id": pid,
            "name": self.test_name,
            "_admin": {"created": now, "modified": now},
        }
        with self.subTest(i=1):
            # First lookup finds the project, second finds no name clash.
            self.auth.get_project_list.side_effect = [[proj], []]
            new_name = "new-project-name"
            quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
            self.topic.edit(
                self.fake_session, pid, {"name": new_name, "quotas": quotas}
            )
            _id, content = self.auth.update_project.call_args[0]
            self.assertEqual(_id, pid, "Wrong project identifier")
            self.assertEqual(content["_id"], pid, "Wrong project identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"], now, "Wrong modification time"
            )
            self.assertEqual(content["name"], new_name, "Wrong project name")
            self.assertEqual(content["quotas"], quotas, "Wrong quotas")
        with self.subTest(i=2):
            new_name = "other-project-name"
            quotas = {"baditems": randint(0, 100)}
            self.auth.get_project_list.side_effect = [[proj], []]
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.edit(
                    self.fake_session, pid, {"name": new_name, "quotas": quotas}
                )
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
                    "baditems"
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_new(self):
        """Creation is refused for a uuid-shaped name and for an existing name."""
        with self.subTest(i=1):
            rollback = []
            pid = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as project name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": pid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "project name '{}' cannot have an uuid format".format(pid),
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=2):
            rollback = []
            self.auth.get_project_list.return_value = [
                {"_id": test_pid, "name": self.test_name}
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing project name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "project '{}' exists".format(self.test_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_edit(self):
        """Renaming is refused for uuid names, for 'admin' and for used names."""
        with self.subTest(i=1):
            self.auth.get_project_list.return_value = [
                {"_id": test_pid, "name": self.test_name}
            ]
            new_name = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as project name"
            ) as e:
                self.topic.edit(self.fake_session, test_pid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "project name '{}' cannot have an uuid format".format(new_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=2):
            pid = str(uuid4())
            self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
            with self.assertRaises(
                EngineException, msg="Accepted renaming of project 'admin'"
            ) as e:
                self.topic.edit(self.fake_session, pid, {"name": "new-name"})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "you cannot rename project 'admin'",
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=3):
            new_name = "new-project-name"
            # First lookup returns the edited project, second a different
            # project that already owns the requested name.
            self.auth.get_project_list.side_effect = [
                [{"_id": test_pid, "name": self.test_name}],
                [{"_id": str(uuid4()), "name": new_name}],
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing project name"
            ) as e:
                # Uses the id of the mocked record instead of a variable
                # left over from the previous sub-test.
                self.topic.edit(self.fake_session, test_pid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "project '{}' is already used".format(new_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_delete_project(self):
        """Deleting an unused project forwards its id to the auth backend."""
        with self.subTest(i=1):
            pid = str(uuid4())
            self.auth.get_project.return_value = {
                "_id": pid,
                "name": "other-project-name",
            }
            self.auth.delete_project.return_value = {"deleted": 1}
            # No user and no db item references the project.
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = []
            rc = self.topic.delete(self.fake_session, pid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
            self.assertEqual(
                self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
            )
            self.assertEqual(
                self.auth.delete_project.call_args[0][0],
                pid,
                "Wrong project identifier",
            )

    def test_conflict_on_del(self):
        """Deletion is refused for own, 'admin' and still-referenced projects."""
        with self.subTest(i=1):
            # Same id as the session's project: deleting one's own project.
            self.auth.get_project.return_value = {
                "_id": test_pid,
                "name": self.test_name,
            }
            with self.assertRaises(
                EngineException, msg="Accepted deletion of own project"
            ) as e:
                self.topic.delete(self.fake_session, self.test_name)
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "you cannot delete your own project",
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=2):
            self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
            with self.assertRaises(
                EngineException, msg="Accepted deletion of project 'admin'"
            ) as e:
                self.topic.delete(self.fake_session, "admin")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "you cannot delete project 'admin'",
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=3):
            pid = str(uuid4())
            name = "other-project-name"
            self.auth.get_project.return_value = {"_id": pid, "name": name}
            # A user still mapped to the project.
            self.auth.get_user_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "username": self.test_name,
                    "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used project"
            ) as e:
                self.topic.delete(self.fake_session, pid)
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "project '{}' ({}) is being used by user '{}'".format(
                    name, pid, self.test_name
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=4):
            # Reuses ``pid``, ``name`` and the project mock from sub-test 3;
            # this time a db item, not a user, references the project.
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "id": self.test_name,
                    "_admin": {"projects_read": [pid], "projects_write": []},
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used project"
            ) as e:
                self.topic.delete(self.fake_session, pid)
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "project '{}' ({}) is being used by {} '{}'".format(
                    name, pid, "vnf descriptor", self.test_name
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )
class Test_RoleTopicAuth(TestCase):
    """Unit tests for ``RoleTopicAuth`` running against mocked backends."""

    @classmethod
    def setUpClass(cls):
        """Publish the role name and the operation list shared by all tests."""
        cls.test_name = "test-role-topic"
        cls.test_operations = ["tokens:get"]

    def setUp(self):
        """Build the topic on mocked backends together with an admin session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.auth.role_permissions = self.test_operations
        self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_role(self):
        """Create a role with valid permissions, then reject an unknown one."""
        with self.subTest(i=1):
            rollback = []
            rid1 = str(uuid4())
            perms_in = {"tokens": True}
            # Permissions expected in the content handed to the auth backend.
            perms_out = {"default": False, "admin": False, "tokens": True}
            self.auth.get_role_list.return_value = []
            self.auth.create_role.return_value = rid1
            # Only the first element of the returned pair is checked here.
            rid2, _ = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "permissions": perms_in},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(rid2, rid1, "Wrong role identifier")
            content = self.auth.create_role.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(
                EngineException, msg="Accepted wrong permissions"
            ) as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {"name": "other-role-name", "permissions": {"projects": True}},
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "invalid permission '{}'".format("projects"),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_edit_role(self):
        """Rename a role with valid permissions, then reject an unknown one."""
        now = time()
        rid = str(uuid4())
        role = {
            "_id": rid,
            "name": self.test_name,
            "permissions": {"tokens": True},
            "_admin": {"created": now, "modified": now},
        }
        with self.subTest(i=1):
            # First lookup finds the role, second finds no name clash.
            self.auth.get_role_list.side_effect = [[role], []]
            self.auth.get_role.return_value = role
            new_name = "new-role-name"
            perms_in = {"tokens": False, "tokens:get": True}
            perms_out = {
                "default": False,
                "admin": False,
                "tokens": False,
                "tokens:get": True,
            }
            self.topic.edit(
                self.fake_session, rid, {"name": new_name, "permissions": perms_in}
            )
            content = self.auth.update_role.call_args[0][0]
            self.assertEqual(content["_id"], rid, "Wrong role identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"], now, "Wrong modification time"
            )
            self.assertEqual(content["name"], new_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
        with self.subTest(i=2):
            new_name = "other-role-name"
            # "tokens:post" is not among the operations configured in setUp.
            perms_in = {"tokens": False, "tokens:post": True}
            self.auth.get_role_list.side_effect = [[role], []]
            with self.assertRaises(
                EngineException, msg="Accepted wrong permissions"
            ) as e:
                self.topic.edit(
                    self.fake_session, rid, {"name": new_name, "permissions": perms_in}
                )
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "invalid permission '{}'".format("tokens:post"),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_delete_role(self):
        """Deleting an unused role forwards its id to the auth backend."""
        with self.subTest(i=1):
            rid = str(uuid4())
            role = {"_id": rid, "name": "other-role-name"}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.delete_role.return_value = {"deleted": 1}
            # No user references the role.
            self.auth.get_user_list.return_value = []
            rc = self.topic.delete(self.fake_session, rid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
            self.assertEqual(
                self.auth.get_role_list.call_args[0][0]["_id"],
                rid,
                "Wrong role identifier",
            )
            self.assertEqual(
                self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
            )
            self.assertEqual(
                self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
            )

    def test_conflict_on_new(self):
        """Creation is refused for a uuid-shaped name and for an existing name."""
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as role name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": rid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "role name '{}' cannot have an uuid format".format(rid),
                norm(str(e.exception)),
                "Wrong exception text",
            )
        with self.subTest(i=2):
            rollback = []
            self.auth.get_role_list.return_value = [
                {"_id": str(uuid4()), "name": self.test_name}
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing role name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "role name '{}' exists".format(self.test_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_edit(self):
        """Renaming is refused for uuid names, built-in roles and used names."""
        rid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_role_list.return_value = [
                {"_id": rid, "name": self.test_name, "permissions": {}}
            ]
            new_name = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as role name"
            ) as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "role name '{}' cannot have an uuid format".format(new_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )
        for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
            with self.subTest(i=i):
                rid = str(uuid4())
                self.auth.get_role.return_value = {
                    "_id": rid,
                    "name": role_name,
                    "permissions": {},
                }
                with self.assertRaises(
                    EngineException,
                    msg="Accepted renaming of role '{}'".format(role_name),
                ) as e:
                    self.topic.edit(self.fake_session, rid, {"name": "new-name"})
                self.assertEqual(
                    e.exception.http_code,
                    HTTPStatus.FORBIDDEN,
                    "Wrong HTTP status code",
                )
                self.assertIn(
                    "you cannot rename role '{}'".format(role_name),
                    norm(str(e.exception)),
                    "Wrong exception text",
                )
        # ``i`` and ``rid`` keep the values of the last loop iteration.
        with self.subTest(i=i + 1):
            new_name = "new-role-name"
            # First lookup returns the edited role, second a different role
            # that already owns the requested name.
            self.auth.get_role_list.side_effect = [
                [{"_id": rid, "name": self.test_name, "permissions": {}}],
                [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
            ]
            self.auth.get_role.return_value = {
                "_id": rid,
                "name": self.test_name,
                "permissions": {},
            }
            with self.assertRaises(
                EngineException, msg="Accepted existing role name"
            ) as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "role name '{}' exists".format(new_name),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_del(self):
        """Deletion is refused for built-in roles and for roles still in use."""
        for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
            with self.subTest(i=i):
                rid = str(uuid4())
                role = {"_id": rid, "name": role_name}
                self.auth.get_role_list.return_value = [role]
                self.auth.get_role.return_value = role
                with self.assertRaises(
                    EngineException,
                    msg="Accepted deletion of role '{}'".format(role_name),
                ) as e:
                    self.topic.delete(self.fake_session, rid)
                self.assertEqual(
                    e.exception.http_code,
                    HTTPStatus.FORBIDDEN,
                    "Wrong HTTP status code",
                )
                self.assertIn(
                    "you cannot delete role '{}'".format(role_name),
                    norm(str(e.exception)),
                    "Wrong exception text",
                )
        # ``i`` keeps the value of the last loop iteration.
        with self.subTest(i=i + 1):
            rid = str(uuid4())
            name = "other-role-name"
            role = {"_id": rid, "name": name}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            # A user still mapped to the role.
            self.auth.get_user_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "username": self.test_name,
                    "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used role"
            ) as e:
                self.topic.delete(self.fake_session, rid)
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "role '{}' ({}) is being used by user '{}'".format(
                    name, rid, self.test_name
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )
class Test_UserTopicAuth(TestCase):
@classmethod
def setUpClass(cls):
    """Publish the user name shared by every test of this class."""
    shared_user_name = "test-user-topic"
    cls.test_name = shared_user_name
def setUp(self):
    """Build a ``UserTopicAuth`` on mocked backends plus an admin session."""
    # Backends are mocked from real instances, created in the same order
    # in which they are passed to the topic.
    self.db = Mock(dbbase.DbBase())
    self.fs = Mock(fsbase.FsBase())
    self.msg = Mock(msgbase.MsgBase())
    self.auth = Mock(authconn.Authconn(None, None, None))

    topic_under_test = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
    self.topic = topic_under_test

    admin_session = {
        "username": test_name,
        "project_id": (test_pid,),
        "method": None,
        "admin": True,
        "force": False,
        "public": False,
        "allow_show_user_project_role": True,
    }
    self.fake_session = admin_session

    # Quota checking is replaced by a no-op mock.
    topic_under_test.check_quota = Mock(return_value=None)
def test_new_user(self):
    """Create users from mappings and from projects; reject malformed input.

    Sub-tests 1 and 2 check the content handed to ``auth.create_user``;
    sub-tests 3 and 4 check that invalid ``project_role_mappings`` and an
    empty ``projects`` list raise ``EngineException`` with status 422.
    """
    uid1 = str(uuid4())
    pid = str(uuid4())
    self.auth.get_user_list.return_value = []
    self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
    self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
    with self.subTest(i=1):
        rollback = []
        rid = str(uuid4())
        self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
        # Names go in; the ids returned by the mocked lookups must come out.
        prms_in = [{"project": "some_project", "role": "some_role"}]
        prms_out = [{"project": pid, "role": rid}]
        # Only the first element of the returned pair is checked here.
        uid2, _ = self.topic.new(
            rollback,
            self.fake_session,
            {
                "username": self.test_name,
                "password": self.test_name,
                "project_role_mappings": prms_in,
            },
        )
        self.assertEqual(len(rollback), 1, "Wrong rollback length")
        self.assertEqual(uid2, uid1, "Wrong user identifier")
        content = self.auth.create_user.call_args[0][0]
        self.assertEqual(content["username"], self.test_name, "Wrong user name")
        self.assertEqual(content["password"], self.test_name, "Wrong password")
        self.assertEqual(
            content["project_role_mappings"],
            prms_out,
            "Wrong project-role mappings",
        )
        self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
        self.assertEqual(
            content["_admin"]["modified"],
            content["_admin"]["created"],
            "Wrong modification time",
        )
    with self.subTest(i=2):
        rollback = []
        def_rid = str(uuid4())
        def_role = {"_id": def_rid, "name": "project_admin"}
        self.auth.get_role.return_value = def_role
        self.auth.get_role_list.return_value = [def_role]
        # Only projects are given; the mapping must use the mocked role.
        prms_out = [{"project": pid, "role": def_rid}]
        uid2, _ = self.topic.new(
            rollback,
            self.fake_session,
            {
                "username": self.test_name,
                "password": self.test_name,
                "projects": ["some_project"],
            },
        )
        self.assertEqual(len(rollback), 1, "Wrong rollback length")
        self.assertEqual(uid2, uid1, "Wrong user identifier")
        content = self.auth.create_user.call_args[0][0]
        self.assertEqual(content["username"], self.test_name, "Wrong user name")
        self.assertEqual(content["password"], self.test_name, "Wrong password")
        self.assertEqual(
            content["project_role_mappings"],
            prms_out,
            "Wrong project-role mappings",
        )
        self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
        self.assertEqual(
            content["_admin"]["modified"],
            content["_admin"]["created"],
            "Wrong modification time",
        )
    with self.subTest(i=3):
        rollback = []
        with self.assertRaises(
            EngineException, msg="Accepted wrong project-role mappings"
        ) as e:
            self.topic.new(
                rollback,
                self.fake_session,
                {
                    "username": "other-project-name",
                    "password": "other-password",
                    "project_role_mappings": [{}],
                },
            )
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        self.assertEqual(
            e.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        # Two-step format: the first fills the outer template, the second
        # fills the placeholders carried by the inserted fragments.
        self.assertIn(
            "format error at '{}' '{}'".format(
                "project_role_mappings:{}", "'{}' is a required property"
            ).format(0, "project"),
            norm(str(e.exception)),
            "Wrong exception text",
        )
    with self.subTest(i=4):
        rollback = []
        with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
            self.topic.new(
                rollback,
                self.fake_session,
                {
                    "username": "other-project-name",
                    "password": "other-password",
                    "projects": [],
                },
            )
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        self.assertEqual(
            e.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        self.assertIn(
            "format error at '{}' '{}'".format(
                "projects", "{} is too short"
            ).format([]),
            norm(str(e.exception)),
            "Wrong exception text",
        )
def test_edit_user(self):
    """Edit a user's name, password and mappings; reject malformed mappings.

    Sub-tests 1 and 3 inspect the content handed to ``auth.update_user``;
    sub-test 2 expects ``EngineException`` with status 422.
    """
    created_at = time()
    user_id = str(uuid4())
    old_project_id = str(uuid4())
    old_role_id = str(uuid4())
    current_mappings = [
        {
            "project": old_project_id,
            "project_name": "project-1",
            "role": old_role_id,
            "role_name": "role-1",
        }
    ]
    stored_user = {
        "_id": user_id,
        "username": self.test_name,
        "project_role_mappings": current_mappings,
        "_admin": {"created": created_at, "modified": created_at},
    }

    with self.subTest(i=1):
        self.auth.get_user_list.side_effect = [[stored_user], []]
        self.auth.get_user.return_value = stored_user
        new_project_id = str(uuid4())
        new_role_id = str(uuid4())
        # Lookups are answered in order: the added pair first, then the
        # removed pair.
        self.auth.get_project.side_effect = [
            {"_id": new_project_id, "name": "project-2"},
            {"_id": old_project_id, "name": "project-1"},
        ]
        self.auth.get_role.side_effect = [
            {"_id": new_role_id, "name": "role-2"},
            {"_id": old_role_id, "name": "role-1"},
        ]
        wanted_name = "new-user-name"
        wanted_password = "new-password"
        to_add = [{"project": new_project_id, "role": new_role_id}]
        to_remove = [{"project": old_project_id, "role": old_role_id}]
        changes = {
            "username": wanted_name,
            "password": wanted_password,
            "add_project_role_mappings": to_add,
            "remove_project_role_mappings": to_remove,
        }
        self.topic.edit(self.fake_session, user_id, changes)
        positional = self.auth.update_user.call_args[0]
        sent = positional[0]
        self.assertEqual(sent["_id"], user_id, "Wrong user identifier")
        self.assertEqual(sent["username"], wanted_name, "Wrong user name")
        self.assertEqual(sent["password"], wanted_password, "Wrong user password")
        self.assertEqual(
            sent["add_project_role_mappings"],
            to_add,
            "Wrong project-role mappings to add",
        )
        # Removal is expected in the stored form, including the names.
        self.assertEqual(
            sent["remove_project_role_mappings"],
            current_mappings,
            "Wrong project-role mappings to remove",
        )

    with self.subTest(i=2):
        wanted_name = "other-user-name"
        broken_mappings = [{}]
        self.auth.get_role_list.side_effect = [[stored_user], []]
        self.auth.get_user_list.side_effect = [[stored_user]]
        changes = {"username": wanted_name, "project_role_mappings": broken_mappings}
        with self.assertRaises(
            EngineException, msg="Accepted wrong project-role mappings"
        ) as caught:
            self.topic.edit(self.fake_session, user_id, changes)
        failure = caught.exception
        self.assertEqual(
            failure.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "format error at '{}' '{}'".format(
            "project_role_mappings:{}", "'{}' is a required property"
        ).format(0, "project")
        self.assertIn(expected_text, norm(str(failure)), "Wrong exception text")

    with self.subTest(i=3):
        self.auth.get_user_list.side_effect = [[stored_user], []]
        self.auth.get_user.return_value = stored_user
        previous_password = self.test_name
        wanted_password = "new-password"
        changes = {
            "old_password": previous_password,
            "password": wanted_password,
        }
        self.topic.edit(self.fake_session, user_id, changes)
        positional = self.auth.update_user.call_args[0]
        sent = positional[0]
        self.assertEqual(
            sent["old_password"], previous_password, "Wrong old password"
        )
        self.assertEqual(sent["password"], wanted_password, "Wrong user password")
def test_delete_user(self):
    """Deleting another user forwards the user id to the auth backend."""
    with self.subTest(i=1):
        uid = str(uuid4())
        # The session acts as ``self.test_name``; the deleted user below
        # has a different username.
        self.fake_session["username"] = self.test_name
        user = {
            "_id": uid,
            "username": "other-user-name",
            "project_role_mappings": [],
        }
        self.auth.get_user.return_value = user
        self.auth.delete_user.return_value = {"deleted": 1}
        rc = self.topic.delete(self.fake_session, uid)
        self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
        self.assertEqual(
            self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
        )
        self.assertEqual(
            self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
        )
def test_conflict_on_new(self):
    """User creation fails for uuid names, used names and a missing role.

    Each sub-test also checks that nothing was appended to the rollback
    list passed to ``topic.new``.
    """
    with self.subTest(i=1):
        undo_log = []
        uuid_like_name = str(uuid4())
        request = {
            "username": uuid_like_name,
            "password": self.test_name,
            "projects": [test_pid],
        }
        with self.assertRaises(
            EngineException, msg="Accepted uuid as username"
        ) as caught:
            self.topic.new(undo_log, self.fake_session, request)
        failure = caught.exception
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(
            failure.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "username '{}' cannot have a uuid format".format(
            uuid_like_name
        )
        self.assertIn(expected_text, norm(str(failure)), "Wrong exception text")

    with self.subTest(i=2):
        undo_log = []
        # The backend already knows a user with the requested name.
        self.auth.get_user_list.return_value = [
            {"_id": str(uuid4()), "username": self.test_name}
        ]
        request = {
            "username": self.test_name,
            "password": self.test_name,
            "projects": [test_pid],
        }
        with self.assertRaises(
            EngineException, msg="Accepted existing username"
        ) as caught:
            self.topic.new(undo_log, self.fake_session, request)
        failure = caught.exception
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(
            failure.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        expected_text = "username '{}' is already used".format(self.test_name)
        self.assertIn(expected_text, norm(str(failure)), "Wrong exception text")

    with self.subTest(i=3):
        undo_log = []
        self.auth.get_user_list.return_value = []
        # Both role lookups come back empty.
        self.auth.get_role_list.side_effect = [[], []]
        request = {
            "username": self.test_name,
            "password": self.test_name,
            "projects": [str(uuid4())],
        }
        with self.assertRaises(
            AuthconnNotFoundException, msg="Accepted user without default role"
        ) as caught:
            self.topic.new(undo_log, self.fake_session, request)
        failure = caught.exception
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(
            failure.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
        )
        expected_text = "can't find default role for user '{}'".format(
            self.test_name
        )
        self.assertIn(expected_text, norm(str(failure)), "Wrong exception text")
def test_conflict_on_edit(self):
    """Verify that ``self.topic.edit`` refuses conflicting user changes.

    Four cases are exercised against the mocked auth backend:

    1. renaming to a uuid-formatted username: ``EngineException`` with
       ``UNPROCESSABLE_ENTITY``;
    2. changing projects while the role lookups return nothing:
       ``AuthconnNotFoundException`` with ``NOT_FOUND``;
    3. removing the ``system_admin`` role from the ``admin`` user:
       ``EngineException`` with ``FORBIDDEN``;
    4. renaming to a username that is already listed:
       ``EngineException`` with ``CONFLICT``.
    """
    user_id = str(uuid4())

    # Case 1: the new username looks like a uuid
    with self.subTest(i=1):
        self.auth.get_user_list.return_value = [
            {"_id": user_id, "username": self.test_name}
        ]
        uuid_name = str(uuid4())
        changes = {"username": uuid_name}
        with self.assertRaises(
            EngineException, msg="Accepted uuid as username"
        ) as raised:
            self.topic.edit(self.fake_session, user_id, changes)
        error = raised.exception
        self.assertEqual(
            error.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "username '{}' cannot have an uuid format".format(
            uuid_name
        )
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")

    # Case 2: new project list, but the role lookups come back empty
    with self.subTest(i=2):
        self.auth.get_user_list.return_value = [
            {"_id": user_id, "username": self.test_name}
        ]
        self.auth.get_role_list.side_effect = [[], []]
        changes = {"projects": [str(uuid4())]}
        with self.assertRaises(
            AuthconnNotFoundException, msg="Accepted user without default role"
        ) as raised:
            self.topic.edit(self.fake_session, user_id, changes)
        error = raised.exception
        self.assertEqual(
            error.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
        )
        expected_text = "can't find a default role for user '{}'".format(
            self.test_name
        )
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")

    # Case 3: the admin user must keep its system_admin role
    with self.subTest(i=3):
        admin_uid = str(uuid4())
        self.auth.get_user_list.return_value = [
            {"_id": admin_uid, "username": "admin"}
        ]
        changes = {
            "remove_project_role_mappings": [
                {"project": "admin", "role": "system_admin"}
            ]
        }
        with self.assertRaises(
            EngineException,
            msg="Accepted removing system_admin role from admin user",
        ) as raised:
            self.topic.edit(self.fake_session, admin_uid, changes)
        error = raised.exception
        self.assertEqual(
            error.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot remove system_admin role from admin user",
            norm(str(error)),
            "Wrong exception text",
        )

    # Case 4: the new username is already listed by the auth backend
    with self.subTest(i=4):
        taken_name = "new-user-name"
        # Successive get_user_list calls return these lists in order
        self.auth.get_user_list.side_effect = [
            [{"_id": user_id, "name": self.test_name}],
            [{"_id": str(uuid4()), "name": taken_name}],
        ]
        changes = {"username": taken_name}
        with self.assertRaises(
            EngineException, msg="Accepted existing username"
        ) as raised:
            self.topic.edit(self.fake_session, user_id, changes)
        error = raised.exception
        self.assertEqual(
            error.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        expected_text = "username '{}' is already used".format(taken_name)
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")
def test_conflict_on_del(self):
    """Verify that ``self.topic.delete`` refuses to delete the session's own user.

    The session username is set to the same username as the user record
    returned by the mocked ``auth.get_user``; the deletion is expected to
    raise ``EngineException`` with ``CONFLICT``.
    """
    with self.subTest(i=1):
        uid = str(uuid4())
        # Make the session user and the user to delete the same account
        self.fake_session["username"] = self.test_name
        user = {
            "_id": uid,
            "username": self.test_name,
            "project_role_mappings": [],
        }
        self.auth.get_user.return_value = user
        with self.assertRaises(
            EngineException, msg="Accepted deletion of own user"
        ) as e:
            self.topic.delete(self.fake_session, uid)
        self.assertEqual(
            e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot delete your own login user",
            norm(str(e.exception)),
            "Wrong exception text",
        )
class Test_CommonVimWimSdn(TestCase):
    """Tests for ``CommonVimWimSdn`` driven through mocked db/fs/msg/auth.

    The topic under test is configured with the WIM topic name and WIM
    validation schemas (see ``setUp``).
    """

    @classmethod
    def setUpClass(cls):
        """Define the name shared by all the test items of this class."""
        cls.test_name = "test-cim-topic"  # CIM = Common Infrastructure Manager

    def setUp(self):
        """Build a fresh topic with mocked back-ends and a fake session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
        # Use WIM schemas for testing because they are the simplest
        # _send_msg is replaced by a mock so tests can assert on it
        self.topic._send_msg = Mock()
        self.topic.topic = "wims"
        self.topic.schema_new = validation.wim_account_new_schema
        self.topic.schema_edit = validation.wim_account_edit_schema
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_cvws(self):
        """Create a CIM and check the content handed to ``db.create``."""
        test_url = "http://0.0.0.0:0"
        with self.subTest(i=1):
            rollback = []
            test_type = "fake"
            self.db.get_one.return_value = None
            # db.create is called positionally as (topic, content); echo the
            # generated identifier back as the created id
            self.db.create.side_effect = lambda topic, content: content["_id"]
            cid, oid = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            args = self.db.create.call_args[0]
            content = args[1]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
            self.assertEqual(content["wim_url"], test_url, "Wrong URL")
            self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
            self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
            self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                content["_admin"]["operationalState"],
                "PROCESSING",
                "Wrong operational state",
            )
            self.assertEqual(
                content["_admin"]["projects_read"],
                [test_pid],
                "Wrong read-only projects",
            )
            self.assertEqual(
                content["_admin"]["projects_write"],
                [test_pid],
                "Wrong read/write projects",
            )
            self.assertIsNone(
                content["_admin"]["current_operation"], "Wrong current operation"
            )
            self.assertEqual(
                len(content["_admin"]["operations"]), 1, "Wrong number of operations"
            )
            operation = content["_admin"]["operations"][0]
            self.assertEqual(
                operation["lcmOperationType"], "create", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertGreater(
                operation["startTime"],
                content["_admin"]["created"],
                "Wrong operation start time",
            )
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["created"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status info"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        # This test is disabled. From Feature 8030 we admit all WIM/SDN types
        # with self.subTest(i=2):
        #     rollback = []
        #     test_type = "bad_type"
        #     with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
        #         self.topic.new(rollback, self.fake_session,
        #                        {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
        #     self.assertEqual(len(rollback), 0, "Wrong rollback length")
        #     self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        #     self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
        #                   norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_new(self):
        """Creating a CIM whose name already exists must raise ``CONFLICT``."""
        with self.subTest(i=1):
            rollback = []
            test_url = "http://0.0.0.0:0"
            test_type = "fake"
            # The database reports an existing entry with the same name
            self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {
                        "name": self.test_name,
                        "wim_url": test_url,
                        "wim_type": test_type,
                    },
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(
                    self.test_name, self.topic.topic
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_edit_cvws(self):
        """Edit a CIM and check the content handed to ``db.replace``.

        A second sub-test checks that an unexpected property is rejected
        with ``UNPROCESSABLE_ENTITY``.
        """
        now = time()
        cid = str(uuid4())
        test_url = "http://0.0.0.0:0"
        test_type = "fake"
        cvws = {
            "_id": cid,
            "name": self.test_name,
            "wim_url": test_url,
            "wim_type": test_type,
            "_admin": {
                "created": now,
                "modified": now,
                "operations": [{"lcmOperationType": "create"}],
            },
        }
        with self.subTest(i=1):
            new_name = "new-cim-name"
            new_url = "https://1.1.1.1:1"
            new_type = "onos"
            # First lookup returns the stored item, second one finds no clash
            self.db.get_one.side_effect = [cvws, None]
            self.db.replace.return_value = {"updated": 1}
            # self.db.encrypt.side_effect = [b64str(), b64str()]
            self.topic.edit(
                self.fake_session,
                cid,
                {"name": new_name, "wim_url": new_url, "wim_type": new_type},
            )
            args = self.db.replace.call_args[0]
            content = args[2]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(args[1], cid, "Wrong CIM identifier")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], new_name, "Wrong CIM name")
            self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
            self.assertEqual(content["wim_url"], new_url, "Wrong URL")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                len(content["_admin"]["operations"]), 2, "Wrong number of operations"
            )
            operation = content["_admin"]["operations"][1]
            self.assertEqual(
                operation["lcmOperationType"], "edit", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertGreater(
                operation["startTime"],
                content["_admin"]["modified"],
                "Wrong operation start time",
            )
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["modified"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status info"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        with self.subTest(i=2):
            self.db.get_one.side_effect = [cvws]
            with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
                self.topic.edit(
                    self.fake_session,
                    str(uuid4()),
                    {"name": "new-name", "extra_prop": "anything"},
                )
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "format error '{}'".format(
                    "additional properties are not allowed ('{}' was unexpected)"
                ).format("extra_prop"),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_edit(self):
        """Renaming a CIM to an already existing name must raise ``CONFLICT``."""
        with self.subTest(i=1):
            cid = str(uuid4())
            new_name = "new-cim-name"
            # First lookup returns the edited item, second one the name clash
            self.db.get_one.side_effect = [
                {"_id": cid, "name": self.test_name},
                {"_id": str(uuid4()), "name": new_name},
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.edit(self.fake_session, cid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(new_name, self.topic.topic),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_delete_cvws(self):
        """Delete a CIM under three different project/session set-ups.

        1. Other projects still reference the item: no operation id is
           returned, the session project is pulled via ``db.set_one`` and no
           message is sent.
        2. Only the session project references the item: the item is marked
           ``_admin.to_delete``, a "delete" operation is pushed and a
           "delete" message is sent.
        3. Forced deletion with no projects: ``db.del_one`` is called and a
           "deleted" message is sent.
        """
        cid = str(uuid4())
        ro_pid = str(uuid4())
        rw_pid = str(uuid4())
        cvws = {"_id": cid, "name": self.test_name}
        self.db.get_list.return_value = []
        with self.subTest(i=1):
            cvws["_admin"] = {
                "projects_read": [test_pid, ro_pid, rw_pid],
                "projects_write": [test_pid, rw_pid],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertIsNone(
                self.db.set_one.call_args[1]["update_dict"],
                "Wrong read-only projects update",
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["pull_list"],
                {
                    "_admin.projects_read": (test_pid,),
                    "_admin.projects_write": (test_pid,),
                },
                "Wrong read/write projects update",
            )
            self.topic._send_msg.assert_not_called()
        with self.subTest(i=2):
            now = time()
            cvws["_admin"] = {
                "projects_read": [test_pid],
                "projects_write": [test_pid],
                "operations": [],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["update_dict"],
                {"_admin.to_delete": True},
                "Wrong _admin.to_delete update",
            )
            operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
            self.assertEqual(
                operation["lcmOperationType"], "delete", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
            self.assertGreater(
                operation["startTime"], now, "Wrong operation start time"
            )
            self.assertGreater(
                operation["statusEnteredTime"], now, "Wrong operation status enter time"
            )
            self.topic._send_msg.assert_called_once_with(
                "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
            )
        with self.subTest(i=3):
            cvws["_admin"] = {
                "projects_read": [],
                "projects_write": [],
                "operations": [],
            }
            self.db.get_one.return_value = cvws
            # Forget the calls recorded by the previous sub-tests
            self.topic._send_msg.reset_mock()
            self.db.get_one.reset_mock()
            self.db.del_one.reset_mock()
            self.fake_session["force"] = True  # to force deletion
            self.fake_session["admin"] = True  # to force deletion
            self.fake_session["project_id"] = []  # to force deletion
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.topic._send_msg.assert_called_once_with(
                "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
            )
@patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_new")
class TestVimAccountTopic(TestCase):
    """Tests for ``VimAccountTopic`` with mocked db/fs/msg/auth back-ends.

    The class-level patch replaces ``CommonVimWimSdn.check_conflict_on_new``
    for every test method; the mock is received by each test as the
    ``mock_common_vim_wim_sdn`` argument.
    """

    def setUp(self):
        """Build a fresh topic with mocked back-ends and a fake session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = VimAccountTopic(self.db, self.fs, self.msg, self.auth)
        self.topic.check_quota = Mock(return_value=None)  # skip quota
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }

    def check_invalid_indata_raises_exception(self, indata, mock_common_vim_wim_sdn):
        """Assert that ``check_conflict_on_new`` rejects ``indata``.

        :param indata: VIM account content expected to be invalid; its
            ``"name"`` entry is used to build the expected error message
        :param mock_common_vim_wim_sdn: mock of the patched parent check,
            expected to have been called with the session and ``indata``
        """
        with self.assertRaises(EngineException) as error:
            self.topic.check_conflict_on_new(self.fake_session, indata)
        mock_common_vim_wim_sdn.assert_called_with(self.fake_session, indata)
        error_msg = "Invalid config for VIM account '{}'.".format(indata["name"])
        self.assertEqual(str(error.exception), error_msg)

    def test_check_conflict_on_new_vim_type_paas(self, mock_common_vim_wim_sdn):
        """A PaaS VIM with a complete juju config passes the conflict check."""
        valid_juju_paas_config = {
            "paas_provider": "juju",
            "ca_cert_content": "file_content",
            "cloud": "microk8s",
            "cloud_credentials": "cloud_credential_name",
            "authorized_keys": "keys",
        }
        indata = {
            "name": "juju_paas",
            "vim_type": "paas",
            "description": None,
            "vim_url": "http://0.0.0.0:0",
            "vim_user": "some_user",
            "vim_password": "some_password",
            "vim_tenant_name": "null",
            "config": valid_juju_paas_config,
        }
        self.topic.check_conflict_on_new(self.fake_session, indata)
        mock_common_vim_wim_sdn.assert_called_once_with(self.fake_session, indata)

    def test_check_conflict_on_new_vim_type_paas_config_missing(
        self, mock_common_vim_wim_sdn
    ):
        """A PaaS VIM without any ``config`` entry is rejected."""
        indata = {
            "name": "juju_paas",
            "vim_type": "paas",
            "description": None,
            "vim_url": "http://0.0.0.0:0",
            "vim_user": "some_user",
            "vim_password": "some_password",
            "vim_tenant_name": "null",
        }
        self.check_invalid_indata_raises_exception(indata, mock_common_vim_wim_sdn)

    def test_check_conflict_on_new_vim_type_paas_invalid_config(
        self, mock_common_vim_wim_sdn
    ):
        """Each incomplete or unknown PaaS config is rejected."""
        invalid_configs = [
            {
                "paas_provider": "some_paas_provider",
                "ca_cert_content": "file_content",
                "cloud": "microk8s",
                "cloud_credentials": "cloud_credential_name",
            },
            {
                "ca_cert_content": "file_content",
                "cloud": "microk8s",
                "cloud_credentials": "cloud_credential_name",
            },
            {
                "paas_provider": "juju",
                "cloud": "microk8s",
                "cloud_credentials": "cloud_credential_name",
            },
            {
                "paas_provider": "juju",
                "ca_cert_content": "file_content",
                "cloud_credentials": "cloud_credential_name",
            },
            {
                "paas_provider": "juju",
                "ca_cert_content": "file_content",
                "cloud": "microk8s",
            },
            {
                "some_param": None,
            },
            {},
        ]
        for config in invalid_configs:
            # Pass the config as a sub-test parameter so that a failure
            # report identifies which configuration was accepted
            with self.subTest(config=config):
                indata = {
                    "name": "juju_paas",
                    "vim_type": "paas",
                    "description": None,
                    "vim_url": "http://0.0.0.0:0",
                    "vim_user": "some_user",
                    "vim_password": "some_password",
                    "vim_tenant_name": "null",
                    "config": config,
                }
                self.check_invalid_indata_raises_exception(
                    indata, mock_common_vim_wim_sdn
                )

    def test_kafka_message_is_not_sent_if_paas_vim(self, mock_common_vim_wim_sdn):
        """Creating a PaaS VIM starts a workflow and writes no message."""
        valid_juju_paas_config = {
            "paas_provider": "juju",
            "ca_cert_content": "file_content",
            "cloud": "microk8s",
            "cloud_credentials": "cloud_credential_name",
            "authorized_keys": "keys",
        }
        indata = {
            "name": "juju_paas",
            "vim_type": "paas",
            "description": None,
            "vim_url": "http://0.0.0.0:0",
            "vim_user": "some_user",
            "vim_password": "some_password",
            "vim_tenant_name": "null",
            "config": valid_juju_paas_config,
        }
        rollback = []
        # Replace the temporal client so the workflow start can be asserted
        self.topic.temporal = Mock()

        self.topic.new(rollback, self.fake_session, indata)
        self.assertEqual(len(rollback), 1, "Wrong rollback length")
        self.msg.write.assert_not_called()
        self.topic.temporal.start_vim_workflow.assert_called_once()

    def test_kafka_message_is_sent_if_not_paas_vim(self, mock_common_vim_wim_sdn):
        """Creating a non-PaaS VIM writes a "created" message."""
        indata = {
            "name": "juju_paas",
            "vim_type": "openstack",
            "description": None,
            "vim_url": "http://0.0.0.0:0",
            "vim_user": "some_user",
            "vim_password": "some_password",
            "vim_tenant_name": "null",
        }
        rollback = []

        self.topic.new(rollback, self.fake_session, indata)
        self.assertEqual(len(rollback), 1, "Wrong rollback length")
        mock_common_vim_wim_sdn.assert_called_once_with(self.fake_session, indata)
        self.msg.write.assert_called_once_with("vim_account", "created", ANY)
# Run the whole test module when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()