blob: 8124ce42b7d1ea5290320fc968fc3a173be67b23 [file] [log] [blame]
#! /usr/bin/python3
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Module metadata: author contact and date stamp for this test file.
__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
# NOTE(review): the day component "019" looks like a typo for "19" — confirm before relying on this value.
__date__ = "$2019-10-019"
import unittest
from unittest import TestCase
from unittest.mock import Mock, patch, call
from uuid import uuid4
from http import HTTPStatus
from time import time
from random import randint
from osm_common import dbbase, fsbase, msgbase
from osm_nbi import authconn, validation
from osm_nbi.admin_topics import (
ProjectTopicAuth,
RoleTopicAuth,
UserTopicAuth,
CommonVimWimSdn,
VcaTopic,
)
from osm_nbi.engine import EngineException
from osm_nbi.authconn import AuthconnNotFoundException
# Random project id placed in every fake session's "project_id" tuple and in some mocked project records.
test_pid = str(uuid4())
# Username placed in the fake sessions built by the role and user topic tests.
test_name = "test-user"
def norm(str):
    """Return *str* lowercased, trimmed, with every whitespace run collapsed to one space.

    Used to compare exception messages without depending on their exact
    spacing or capitalisation.
    """
    # A no-argument split() already discards leading and trailing whitespace.
    words = str.split()
    return " ".join(words).lower()
class TestVcaTopic(TestCase):
    """Unit tests for ``VcaTopic`` run against mocked db, fs, msg and auth backends."""

    def setUp(self):
        """Build a fresh ``VcaTopic`` wired to mocks before every test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
    def test_format_on_new(self, mock_super_format_on_new):
        """``format_on_new`` encrypts ``secret`` and ``cacert`` and returns the parent's result."""
        content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        # One return value per expected encrypt() call, in call order.
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_new.return_value = "1234"

        oid = self.vca_topic.format_on_new(content)

        self.assertEqual(oid, "1234")
        self.assertEqual(content["secret"], "secret")
        self.assertEqual(content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_new.assert_called_with(content, None, False)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
    def test_format_on_edit(self, mock_super_format_on_edit):
        """``format_on_edit`` stores the encrypted ``secret`` and ``cacert`` in the final content."""
        edit_content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        final_content = {
            "_id": "id",
            "schema_version": "1.11",
        }
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_edit.return_value = "1234"

        oid = self.vca_topic.format_on_edit(final_content, edit_content)

        self.assertEqual(oid, "1234")
        self.assertEqual(final_content["secret"], "secret")
        self.assertEqual(final_content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_edit.assert_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del(self, mock_check_conflict_on_del):
        """Without ``force`` and without VIM accounts found, the parent check is invoked."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}
        self.db.get_list.return_value = None

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_called_with(session, _id, db_content)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
        """With ``force`` set, neither the database nor the parent check is consulted."""
        session = {
            "project_id": "project-id",
            "force": True,
        }
        _id = "vca-id"
        db_content = {}

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_not_called()
        mock_check_conflict_on_del.assert_not_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
        """A VIM account that references the VCA makes deletion raise a CONFLICT error."""
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}
        # get_list is a list query, so the mock returns a (non-empty) list.
        self.db.get_list.return_value = [{"_id": "vim", "vca": "vca-id"}]

        with self.assertRaises(EngineException) as context:
            self.vca_topic.check_conflict_on_del(session, _id, db_content)

        # Checked after the ``with`` block: a statement following the raising
        # call inside the block would never run.  Fields are compared instead
        # of whole exception objects, which need not compare equal.
        self.assertEqual(context.exception.http_code, HTTPStatus.CONFLICT)
        self.assertIn(
            norm("There is at least one VIM account using this vca"),
            norm(str(context.exception)),
        )
        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_not_called()
class Test_ProjectTopicAuth(TestCase):
    """Unit tests for ``ProjectTopicAuth`` using mocked db, fs, msg and auth backends."""

    @classmethod
    def setUpClass(cls):
        """Set the project name shared by every test in this class."""
        cls.test_name = "test-project-topic"

    def setUp(self):
        """Create fresh mocks, the topic under test and an admin session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": self.test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def _assert_rejected(self, raised, http_code, text):
        """Check the HTTP status and the normalized message of a captured exception.

        :param raised: context object returned by ``assertRaises``
        :param http_code: expected ``http_code`` attribute of the exception
        :param text: lowercase fragment expected inside the normalized message
        """
        self.assertEqual(
            raised.exception.http_code, http_code, "Wrong HTTP status code"
        )
        self.assertIn(text, norm(str(raised.exception)), "Wrong exception text")

    def test_new_project(self):
        """A project with valid quotas is created; unknown quota items are rejected."""
        with self.subTest(i=1):
            rollback = []
            pid1 = str(uuid4())
            self.auth.get_project_list.return_value = []
            self.auth.create_project.return_value = pid1
            # Second element of the returned pair is not checked here.
            pid2, _ = self.topic.new(
                rollback, self.fake_session, {"name": self.test_name, "quotas": {}}
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(pid2, pid1, "Wrong project identifier")
            content = self.auth.create_project.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong project name")
            self.assertEqual(content["quotas"], {}, "Wrong quotas")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {"name": "other-project-name", "quotas": {"baditems": 10}},
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_rejected(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
                    "baditems"
                ),
            )

    def test_edit_project(self):
        """Name and quotas can be edited; unknown quota items are rejected."""
        now = time()
        pid = str(uuid4())
        proj = {
            "_id": pid,
            "name": self.test_name,
            "_admin": {"created": now, "modified": now},
        }
        with self.subTest(i=1):
            # First lookup returns the project, second returns no match.
            self.auth.get_project_list.side_effect = [[proj], []]
            new_name = "new-project-name"
            quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
            self.topic.edit(
                self.fake_session, pid, {"name": new_name, "quotas": quotas}
            )
            _id, content = self.auth.update_project.call_args[0]
            self.assertEqual(_id, pid, "Wrong project identifier")
            self.assertEqual(content["_id"], pid, "Wrong project identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"], now, "Wrong modification time"
            )
            self.assertEqual(content["name"], new_name, "Wrong project name")
            self.assertEqual(content["quotas"], quotas, "Wrong quotas")
        with self.subTest(i=2):
            new_name = "other-project-name"
            quotas = {"baditems": randint(0, 100)}
            self.auth.get_project_list.side_effect = [[proj], []]
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.edit(
                    self.fake_session, pid, {"name": new_name, "quotas": quotas}
                )
            self._assert_rejected(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
                    "baditems"
                ),
            )

    def test_conflict_on_new(self):
        """Creation is rejected for uuid-shaped names and for names already in use."""
        with self.subTest(i=1):
            rollback = []
            pid = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as project name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": pid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_rejected(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "project name '{}' cannot have an uuid format".format(pid),
            )
        with self.subTest(i=2):
            rollback = []
            self.auth.get_project_list.return_value = [
                {"_id": test_pid, "name": self.test_name}
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing project name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_rejected(
                e,
                HTTPStatus.CONFLICT,
                "project '{}' exists".format(self.test_name),
            )

    def test_conflict_on_edit(self):
        """Renaming is rejected for uuid names, for project 'admin' and for names in use."""
        with self.subTest(i=1):
            self.auth.get_project_list.return_value = [
                {"_id": test_pid, "name": self.test_name}
            ]
            new_name = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as project name"
            ) as e:
                self.topic.edit(self.fake_session, test_pid, {"name": new_name})
            self._assert_rejected(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "project name '{}' cannot have an uuid format".format(new_name),
            )
        with self.subTest(i=2):
            pid = str(uuid4())
            self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
            with self.assertRaises(
                EngineException, msg="Accepted renaming of project 'admin'"
            ) as e:
                self.topic.edit(self.fake_session, pid, {"name": "new-name"})
            self._assert_rejected(
                e, HTTPStatus.CONFLICT, "you cannot rename project 'admin'"
            )
        with self.subTest(i=3):
            new_name = "new-project-name"
            # First lookup: the project being edited; second: another project
            # that already owns the requested name.
            self.auth.get_project_list.side_effect = [
                [{"_id": test_pid, "name": self.test_name}],
                [{"_id": str(uuid4()), "name": new_name}],
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing project name"
            ) as e:
                # ``pid`` is the identifier generated in the previous subtest.
                self.topic.edit(self.fake_session, pid, {"name": new_name})
            self._assert_rejected(
                e,
                HTTPStatus.CONFLICT,
                "project '{}' is already used".format(new_name),
            )

    def test_delete_project(self):
        """An unused project is deleted through the auth backend."""
        with self.subTest(i=1):
            pid = str(uuid4())
            self.auth.get_project.return_value = {
                "_id": pid,
                "name": "other-project-name",
            }
            self.auth.delete_project.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = []
            rc = self.topic.delete(self.fake_session, pid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
            self.assertEqual(
                self.auth.get_project.call_args[0][0], pid, "Wrong project identifier"
            )
            self.assertEqual(
                self.auth.delete_project.call_args[0][0],
                pid,
                "Wrong project identifier",
            )

    def test_conflict_on_del(self):
        """Deletion is rejected for the own project, 'admin' and projects still in use."""
        with self.subTest(i=1):
            self.auth.get_project.return_value = {
                "_id": test_pid,
                "name": self.test_name,
            }
            with self.assertRaises(
                EngineException, msg="Accepted deletion of own project"
            ) as e:
                self.topic.delete(self.fake_session, self.test_name)
            self._assert_rejected(
                e, HTTPStatus.CONFLICT, "you cannot delete your own project"
            )
        with self.subTest(i=2):
            self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
            with self.assertRaises(
                EngineException, msg="Accepted deletion of project 'admin'"
            ) as e:
                self.topic.delete(self.fake_session, "admin")
            self._assert_rejected(
                e, HTTPStatus.CONFLICT, "you cannot delete project 'admin'"
            )
        with self.subTest(i=3):
            pid = str(uuid4())
            name = "other-project-name"
            self.auth.get_project.return_value = {"_id": pid, "name": name}
            self.auth.get_user_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "username": self.test_name,
                    "project_role_mappings": [{"project": pid, "role": str(uuid4())}],
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used project"
            ) as e:
                self.topic.delete(self.fake_session, pid)
            self._assert_rejected(
                e,
                HTTPStatus.CONFLICT,
                "project '{}' ({}) is being used by user '{}'".format(
                    name, pid, self.test_name
                ),
            )
        with self.subTest(i=4):
            # Reuses ``pid``/``name`` and the get_project mock from subtest 3;
            # now a database record, rather than a user, references the project.
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "id": self.test_name,
                    "_admin": {"projects_read": [pid], "projects_write": []},
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used project"
            ) as e:
                self.topic.delete(self.fake_session, pid)
            self._assert_rejected(
                e,
                HTTPStatus.CONFLICT,
                "project '{}' ({}) is being used by {} '{}'".format(
                    name, pid, "vnf descriptor", self.test_name
                ),
            )
class Test_RoleTopicAuth(TestCase):
    """Unit tests for ``RoleTopicAuth`` using mocked db, fs, msg and auth backends."""

    @classmethod
    def setUpClass(cls):
        """Set the role name and the permission list shared by every test."""
        cls.test_name = "test-role-topic"
        cls.test_operations = ["tokens:get"]

    def setUp(self):
        """Create fresh mocks, the topic under test and an admin session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.auth.role_permissions = self.test_operations
        self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def _assert_rejected(self, raised, http_code, text):
        """Check the HTTP status and the normalized message of a captured exception.

        :param raised: context object returned by ``assertRaises``
        :param http_code: expected ``http_code`` attribute of the exception
        :param text: lowercase fragment expected inside the normalized message
        """
        self.assertEqual(
            raised.exception.http_code, http_code, "Wrong HTTP status code"
        )
        self.assertIn(text, norm(str(raised.exception)), "Wrong exception text")

    def test_new_role(self):
        """A role with known permissions is created; unknown permissions are rejected."""
        with self.subTest(i=1):
            rollback = []
            rid1 = str(uuid4())
            perms_in = {"tokens": True}
            perms_out = {"default": False, "admin": False, "tokens": True}
            self.auth.get_role_list.return_value = []
            self.auth.create_role.return_value = rid1
            # Second element of the returned pair is not checked here.
            rid2, _ = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "permissions": perms_in},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(rid2, rid1, "Wrong role identifier")
            content = self.auth.create_role.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(
                EngineException, msg="Accepted wrong permissions"
            ) as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {"name": "other-role-name", "permissions": {"projects": True}},
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_rejected(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "invalid permission '{}'".format("projects"),
            )

    def test_edit_role(self):
        """Name and permissions can be edited; unknown permissions are rejected."""
        now = time()
        rid = str(uuid4())
        role = {
            "_id": rid,
            "name": self.test_name,
            "permissions": {"tokens": True},
            "_admin": {"created": now, "modified": now},
        }
        with self.subTest(i=1):
            # First lookup returns the role, second returns no match.
            self.auth.get_role_list.side_effect = [[role], []]
            self.auth.get_role.return_value = role
            new_name = "new-role-name"
            perms_in = {"tokens": False, "tokens:get": True}
            perms_out = {
                "default": False,
                "admin": False,
                "tokens": False,
                "tokens:get": True,
            }
            self.topic.edit(
                self.fake_session, rid, {"name": new_name, "permissions": perms_in}
            )
            content = self.auth.update_role.call_args[0][0]
            self.assertEqual(content["_id"], rid, "Wrong role identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"], now, "Wrong modification time"
            )
            self.assertEqual(content["name"], new_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
        with self.subTest(i=2):
            new_name = "other-role-name"
            perms_in = {"tokens": False, "tokens:post": True}
            self.auth.get_role_list.side_effect = [[role], []]
            with self.assertRaises(
                EngineException, msg="Accepted wrong permissions"
            ) as e:
                self.topic.edit(
                    self.fake_session, rid, {"name": new_name, "permissions": perms_in}
                )
            self._assert_rejected(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "invalid permission '{}'".format("tokens:post"),
            )

    def test_delete_role(self):
        """An unused role is deleted through the auth backend."""
        with self.subTest(i=1):
            rid = str(uuid4())
            role = {"_id": rid, "name": "other-role-name"}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.delete_role.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            rc = self.topic.delete(self.fake_session, rid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
            self.assertEqual(
                self.auth.get_role_list.call_args[0][0]["_id"],
                rid,
                "Wrong role identifier",
            )
            self.assertEqual(
                self.auth.get_role.call_args[0][0], rid, "Wrong role identifier"
            )
            self.assertEqual(
                self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier"
            )

    def test_conflict_on_new(self):
        """Creation is rejected for uuid-shaped names and for names already in use."""
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as role name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": rid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_rejected(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "role name '{}' cannot have an uuid format".format(rid),
            )
        with self.subTest(i=2):
            rollback = []
            self.auth.get_role_list.return_value = [
                {"_id": str(uuid4()), "name": self.test_name}
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing role name"
            ) as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._assert_rejected(
                e,
                HTTPStatus.CONFLICT,
                "role name '{}' exists".format(self.test_name),
            )

    def test_conflict_on_edit(self):
        """Renaming is rejected for uuid names, for built-in roles and for names in use."""
        rid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_role_list.return_value = [
                {"_id": rid, "name": self.test_name, "permissions": {}}
            ]
            new_name = str(uuid4())
            with self.assertRaises(
                EngineException, msg="Accepted uuid as role name"
            ) as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self._assert_rejected(
                e,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "role name '{}' cannot have an uuid format".format(new_name),
            )
        for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
            with self.subTest(i=i):
                rid = str(uuid4())
                self.auth.get_role.return_value = {
                    "_id": rid,
                    "name": role_name,
                    "permissions": {},
                }
                with self.assertRaises(
                    EngineException,
                    msg="Accepted renaming of role '{}'".format(role_name),
                ) as e:
                    self.topic.edit(self.fake_session, rid, {"name": "new-name"})
                self._assert_rejected(
                    e,
                    HTTPStatus.FORBIDDEN,
                    "you cannot rename role '{}'".format(role_name),
                )
        # ``i`` and ``rid`` keep the values from the last loop iteration.
        with self.subTest(i=i + 1):
            new_name = "new-role-name"
            self.auth.get_role_list.side_effect = [
                [{"_id": rid, "name": self.test_name, "permissions": {}}],
                [{"_id": str(uuid4()), "name": new_name, "permissions": {}}],
            ]
            self.auth.get_role.return_value = {
                "_id": rid,
                "name": self.test_name,
                "permissions": {},
            }
            with self.assertRaises(
                EngineException, msg="Accepted existing role name"
            ) as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self._assert_rejected(
                e,
                HTTPStatus.CONFLICT,
                "role name '{}' exists".format(new_name),
            )

    def test_conflict_on_del(self):
        """Deletion is rejected for built-in roles and for roles still mapped to a user."""
        for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
            with self.subTest(i=i):
                rid = str(uuid4())
                role = {"_id": rid, "name": role_name}
                self.auth.get_role_list.return_value = [role]
                self.auth.get_role.return_value = role
                with self.assertRaises(
                    EngineException,
                    msg="Accepted deletion of role '{}'".format(role_name),
                ) as e:
                    self.topic.delete(self.fake_session, rid)
                self._assert_rejected(
                    e,
                    HTTPStatus.FORBIDDEN,
                    "you cannot delete role '{}'".format(role_name),
                )
        # ``i`` keeps the value from the last loop iteration.
        with self.subTest(i=i + 1):
            rid = str(uuid4())
            name = "other-role-name"
            role = {"_id": rid, "name": name}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.get_user_list.return_value = [
                {
                    "_id": str(uuid4()),
                    "username": self.test_name,
                    "project_role_mappings": [{"project": str(uuid4()), "role": rid}],
                }
            ]
            with self.assertRaises(
                EngineException, msg="Accepted deletion of used role"
            ) as e:
                self.topic.delete(self.fake_session, rid)
            self._assert_rejected(
                e,
                HTTPStatus.CONFLICT,
                "role '{}' ({}) is being used by user '{}'".format(
                    name, rid, self.test_name
                ),
            )
class Test_UserTopicAuth(TestCase):
@classmethod
def setUpClass(cls):
    """Set the username shared by every test in this class."""
    cls.test_name = "test-user-topic"
def setUp(self):
    """Create mocked backends, the ``UserTopicAuth`` under test and an admin session."""
    self.db, self.fs, self.msg = (
        Mock(dbbase.DbBase()),
        Mock(fsbase.FsBase()),
        Mock(msgbase.MsgBase()),
    )
    self.auth = Mock(authconn.Authconn(None, None, None))
    self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)

    session = {
        "username": test_name,
        "project_id": (test_pid,),
        "method": None,
    }
    session.update(
        {
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
    )
    self.fake_session = session

    # Quota checking is replaced by a no-op mock.
    self.topic.check_quota = Mock(return_value=None)
def test_new_user(self):
    """User creation with explicit mappings or project lists, plus two schema rejections."""
    uid1 = str(uuid4())
    pid = str(uuid4())
    self.auth.get_user_list.return_value = []
    self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
    self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
    with self.subTest(i=1):
        rollback = []
        rid = str(uuid4())
        self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
        # Mappings are given by name and expected to reach the backend by id.
        prms_in = [{"project": "some_project", "role": "some_role"}]
        prms_out = [{"project": pid, "role": rid}]
        # Second element of the returned pair is not checked here.
        uid2, _ = self.topic.new(
            rollback,
            self.fake_session,
            {
                "username": self.test_name,
                "password": self.test_name,
                "project_role_mappings": prms_in,
            },
        )
        self.assertEqual(len(rollback), 1, "Wrong rollback length")
        self.assertEqual(uid2, uid1, "Wrong user identifier")
        content = self.auth.create_user.call_args[0][0]
        self.assertEqual(content["username"], self.test_name, "Wrong user name")
        self.assertEqual(content["password"], self.test_name, "Wrong password")
        self.assertEqual(
            content["project_role_mappings"],
            prms_out,
            "Wrong project-role mappings",
        )
        self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
        self.assertEqual(
            content["_admin"]["modified"],
            content["_admin"]["created"],
            "Wrong modification time",
        )
    with self.subTest(i=2):
        rollback = []
        def_rid = str(uuid4())
        def_role = {"_id": def_rid, "name": "project_admin"}
        self.auth.get_role.return_value = def_role
        self.auth.get_role_list.return_value = [def_role]
        prms_out = [{"project": pid, "role": def_rid}]
        uid2, _ = self.topic.new(
            rollback,
            self.fake_session,
            {
                "username": self.test_name,
                "password": self.test_name,
                "projects": ["some_project"],
            },
        )
        self.assertEqual(len(rollback), 1, "Wrong rollback length")
        self.assertEqual(uid2, uid1, "Wrong user identifier")
        content = self.auth.create_user.call_args[0][0]
        self.assertEqual(content["username"], self.test_name, "Wrong user name")
        self.assertEqual(content["password"], self.test_name, "Wrong password")
        self.assertEqual(
            content["project_role_mappings"],
            prms_out,
            "Wrong project-role mappings",
        )
        self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
        self.assertEqual(
            content["_admin"]["modified"],
            content["_admin"]["created"],
            "Wrong modification time",
        )
    with self.subTest(i=3):
        rollback = []
        with self.assertRaises(
            EngineException, msg="Accepted wrong project-role mappings"
        ) as e:
            self.topic.new(
                rollback,
                self.fake_session,
                {
                    "username": "other-project-name",
                    "password": "other-password",
                    "project_role_mappings": [{}],
                },
            )
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        self.assertEqual(
            e.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        self.assertIn(
            "format error at '{}' '{}'".format(
                "project_role_mappings:{}", "'{}' is a required property"
            ).format(0, "project"),
            norm(str(e.exception)),
            "Wrong exception text",
        )
    with self.subTest(i=4):
        rollback = []
        with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
            self.topic.new(
                rollback,
                self.fake_session,
                {
                    "username": "other-project-name",
                    "password": "other-password",
                    "projects": [],
                },
            )
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        self.assertEqual(
            e.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        self.assertIn(
            "format error at '{}' '{}'".format(
                "projects", "{} is too short"
            ).format([]),
            norm(str(e.exception)),
            "Wrong exception text",
        )
def test_edit_user(self):
    """Edit name, password and mappings; reject an empty mapping; change a password."""
    created = time()
    user_id = str(uuid4())
    old_project_id = str(uuid4())
    old_role_id = str(uuid4())
    current_mappings = [
        {
            "project": old_project_id,
            "project_name": "project-1",
            "role": old_role_id,
            "role_name": "role-1",
        }
    ]
    user = {
        "_id": user_id,
        "username": self.test_name,
        "project_role_mappings": current_mappings,
        "_admin": {"created": created, "modified": created},
    }

    with self.subTest(i=1):
        self.auth.get_user_list.side_effect = [[user], []]
        self.auth.get_user.return_value = user
        added_project_id = str(uuid4())
        added_role_id = str(uuid4())
        # Lookups are answered in order: the added pair first, then the removed pair.
        self.auth.get_project.side_effect = [
            {"_id": added_project_id, "name": "project-2"},
            {"_id": old_project_id, "name": "project-1"},
        ]
        self.auth.get_role.side_effect = [
            {"_id": added_role_id, "name": "role-2"},
            {"_id": old_role_id, "name": "role-1"},
        ]
        new_username = "new-user-name"
        new_password = "new-password"
        to_add = [{"project": added_project_id, "role": added_role_id}]
        to_remove = [{"project": old_project_id, "role": old_role_id}]
        edit_request = {
            "username": new_username,
            "password": new_password,
            "add_project_role_mappings": to_add,
            "remove_project_role_mappings": to_remove,
        }
        self.topic.edit(self.fake_session, user_id, edit_request)
        sent = self.auth.update_user.call_args[0][0]
        expectations = (
            ("_id", user_id, "Wrong user identifier"),
            ("username", new_username, "Wrong user name"),
            ("password", new_password, "Wrong user password"),
            ("add_project_role_mappings", to_add, "Wrong project-role mappings to add"),
            (
                "remove_project_role_mappings",
                current_mappings,
                "Wrong project-role mappings to remove",
            ),
        )
        for key, expected, failure_text in expectations:
            self.assertEqual(sent[key], expected, failure_text)

    with self.subTest(i=2):
        # NOTE(review): get_role_list is fed the user record here, as in the
        # original test — presumably a leftover; confirm it is intended.
        self.auth.get_role_list.side_effect = [[user], []]
        self.auth.get_user_list.side_effect = [[user]]
        bad_request = {
            "username": "other-user-name",
            "project_role_mappings": [{}],
        }
        with self.assertRaises(
            EngineException, msg="Accepted wrong project-role mappings"
        ) as e:
            self.topic.edit(self.fake_session, user_id, bad_request)
        raised = e.exception
        self.assertEqual(
            raised.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "format error at '{}' '{}'".format(
            "project_role_mappings:{}", "'{}' is a required property"
        ).format(0, "project")
        self.assertIn(expected_text, norm(str(raised)), "Wrong exception text")

    with self.subTest(i=3):
        self.auth.get_user_list.side_effect = [[user], []]
        self.auth.get_user.return_value = user
        previous_password = self.test_name
        replacement_password = "new-password"
        password_change = {
            "old_password": previous_password,
            "password": replacement_password,
        }
        self.topic.edit(self.fake_session, user_id, password_change)
        sent = self.auth.update_user.call_args[0][0]
        self.assertEqual(
            sent["old_password"], previous_password, "Wrong old password"
        )
        self.assertEqual(sent["password"], replacement_password, "Wrong user password")
def test_delete_user(self):
    """A user other than the session's own user is deleted through the auth backend."""
    with self.subTest(i=1):
        uid = str(uuid4())
        self.fake_session["username"] = self.test_name
        # Record returned by the mocked lookup; its username differs from the session's.
        user = {
            "_id": uid,
            "username": "other-user-name",
            "project_role_mappings": [],
        }
        self.auth.get_user.return_value = user
        self.auth.delete_user.return_value = {"deleted": 1}
        rc = self.topic.delete(self.fake_session, uid)
        self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
        self.assertEqual(
            self.auth.get_user.call_args[0][0], uid, "Wrong user identifier"
        )
        self.assertEqual(
            self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier"
        )
def test_conflict_on_new(self):
    """Creation is rejected for uuid usernames, existing usernames and a missing default role."""

    def creation_request(username, projects):
        # Every case in this test uses the class test name as password.
        return {
            "username": username,
            "password": self.test_name,
            "projects": projects,
        }

    def check_failure(raised, status, fragment):
        self.assertEqual(raised.exception.http_code, status, "Wrong HTTP status code")
        self.assertIn(fragment, norm(str(raised.exception)), "Wrong exception text")

    with self.subTest(i=1):
        rollback = []
        uid = str(uuid4())
        with self.assertRaises(
            EngineException, msg="Accepted uuid as username"
        ) as e:
            self.topic.new(
                rollback, self.fake_session, creation_request(uid, [test_pid])
            )
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        check_failure(
            e,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "username '{}' cannot have a uuid format".format(uid),
        )

    with self.subTest(i=2):
        rollback = []
        existing_user = {"_id": str(uuid4()), "username": self.test_name}
        self.auth.get_user_list.return_value = [existing_user]
        with self.assertRaises(
            EngineException, msg="Accepted existing username"
        ) as e:
            self.topic.new(
                rollback,
                self.fake_session,
                creation_request(self.test_name, [test_pid]),
            )
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        check_failure(
            e,
            HTTPStatus.CONFLICT,
            "username '{}' is already used".format(self.test_name),
        )

    with self.subTest(i=3):
        rollback = []
        self.auth.get_user_list.return_value = []
        # Two consecutive role lookups both come back empty.
        self.auth.get_role_list.side_effect = [[], []]
        with self.assertRaises(
            AuthconnNotFoundException, msg="Accepted user without default role"
        ) as e:
            self.topic.new(
                rollback,
                self.fake_session,
                creation_request(self.test_name, [str(uuid4())]),
            )
        self.assertEqual(len(rollback), 0, "Wrong rollback length")
        check_failure(
            e,
            HTTPStatus.NOT_FOUND,
            "can't find default role for user '{}'".format(self.test_name),
        )
def test_conflict_on_edit(self):
    """Verify that user edition is refused for conflicting requests.

    Sub-tests, in order:
      1. renaming to a UUID-formatted username (UNPROCESSABLE_ENTITY),
      2. assigning a project while the mocked role lookup returns nothing
         (NOT_FOUND),
      3. removing the system_admin role from the admin user (FORBIDDEN),
      4. renaming to a username the mocked backend already lists (CONFLICT).
    """
    user_id = str(uuid4())

    with self.subTest(i=1):
        stored_user = {"_id": user_id, "username": self.test_name}
        self.auth.get_user_list.return_value = [stored_user]
        new_name = str(uuid4())
        changes = {"username": new_name}
        with self.assertRaises(
            EngineException, msg="Accepted uuid as username"
        ) as caught:
            self.topic.edit(self.fake_session, user_id, changes)
        failure = caught.exception
        self.assertEqual(
            failure.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        expected_text = "username '{}' cannot have an uuid format".format(new_name)
        self.assertIn(expected_text, norm(str(failure)), "Wrong exception text")

    with self.subTest(i=2):
        stored_user = {"_id": user_id, "username": self.test_name}
        self.auth.get_user_list.return_value = [stored_user]
        # Both role lookups come back empty.
        self.auth.get_role_list.side_effect = [[], []]
        changes = {"projects": [str(uuid4())]}
        with self.assertRaises(
            AuthconnNotFoundException, msg="Accepted user without default role"
        ) as caught:
            self.topic.edit(self.fake_session, user_id, changes)
        failure = caught.exception
        self.assertEqual(
            failure.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
        )
        expected_text = "can't find a default role for user '{}'".format(
            self.test_name
        )
        self.assertIn(expected_text, norm(str(failure)), "Wrong exception text")

    with self.subTest(i=3):
        admin_uid = str(uuid4())
        admin_user = {"_id": admin_uid, "username": "admin"}
        self.auth.get_user_list.return_value = [admin_user]
        changes = {
            "remove_project_role_mappings": [
                {"project": "admin", "role": "system_admin"}
            ]
        }
        with self.assertRaises(
            EngineException,
            msg="Accepted removing system_admin role from admin user",
        ) as caught:
            self.topic.edit(self.fake_session, admin_uid, changes)
        failure = caught.exception
        self.assertEqual(
            failure.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot remove system_admin role from admin user",
            norm(str(failure)),
            "Wrong exception text",
        )

    with self.subTest(i=4):
        new_name = "new-user-name"
        # Successive backend lookups: first the edited user, then another
        # user that already carries the requested name.
        edited_user = {"_id": user_id, "name": self.test_name}
        name_owner = {"_id": str(uuid4()), "name": new_name}
        self.auth.get_user_list.side_effect = [[edited_user], [name_owner]]
        changes = {"username": new_name}
        with self.assertRaises(
            EngineException, msg="Accepted existing username"
        ) as caught:
            self.topic.edit(self.fake_session, user_id, changes)
        failure = caught.exception
        self.assertEqual(
            failure.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        expected_text = "username '{}' is already used".format(new_name)
        self.assertIn(expected_text, norm(str(failure)), "Wrong exception text")
def test_conflict_on_del(self):
    """Verify that a user cannot delete the account it is logged in with.

    The session username is set to the same name as the user returned by the
    mocked ``auth.get_user``; ``delete`` must then raise ``EngineException``
    with HTTP status CONFLICT.
    """
    with self.subTest(i=1):
        uid = str(uuid4())
        # Make the session user and the user to delete share a username.
        self.fake_session["username"] = self.test_name
        user = {
            "_id": uid,
            "username": self.test_name,
            "project_role_mappings": [],
        }
        self.auth.get_user.return_value = user
        with self.assertRaises(
            EngineException, msg="Accepted deletion of own user"
        ) as e:
            self.topic.delete(self.fake_session, uid)
        self.assertEqual(
            e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot delete your own login user",
            norm(str(e.exception)),
            "Wrong exception text",
        )
class Test_CommonVimWimSdn(TestCase):
    """Unit tests for ``CommonVimWimSdn`` run against mocked back-ends.

    The topic under test is configured as ``wims`` with the WIM account
    schemas; database, file system, message bus and auth are all mocks.
    """

    @classmethod
    def setUpClass(cls):
        """Set the item name shared by every test in this class."""
        cls.test_name = "test-cim-topic"  # CIM = Common Infrastructure Manager

    def setUp(self):
        """Build a fresh topic with mocked dependencies and a fake session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
        # Use WIM schemas for testing because they are the simplest
        self.topic._send_msg = Mock()
        self.topic.topic = "wims"
        self.topic.schema_new = validation.wim_account_new_schema
        self.topic.schema_edit = validation.wim_account_edit_schema
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_cvws(self):
        """Create an item and check the document handed to ``db.create``."""
        test_url = "http://0.0.0.0:0"
        with self.subTest(i=1):
            rollback = []
            test_type = "fake"
            # The database lookup finds nothing.
            self.db.get_one.return_value = None
            # Echo back the "_id" of the content being stored; the first
            # positional argument is the topic name and is ignored here.
            self.db.create.side_effect = lambda topic, content: content["_id"]
            cid, oid = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            args = self.db.create.call_args[0]
            content = args[1]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
            self.assertEqual(content["wim_url"], test_url, "Wrong URL")
            self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
            self.assertEqual(
                content["schema_version"], "1.11", "Wrong schema version"
            )
            self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
            self.assertIsNotNone(
                content["_admin"]["created"], "Wrong creation time"
            )
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                content["_admin"]["operationalState"],
                "PROCESSING",
                "Wrong operational state",
            )
            self.assertEqual(
                content["_admin"]["projects_read"],
                [test_pid],
                "Wrong read-only projects",
            )
            self.assertEqual(
                content["_admin"]["projects_write"],
                [test_pid],
                "Wrong read/write projects",
            )
            self.assertIsNone(
                content["_admin"]["current_operation"], "Wrong current operation"
            )
            self.assertEqual(
                len(content["_admin"]["operations"]),
                1,
                "Wrong number of operations",
            )
            operation = content["_admin"]["operations"][0]
            self.assertEqual(
                operation["lcmOperationType"], "create", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertGreater(
                operation["startTime"],
                content["_admin"]["created"],
                "Wrong operation start time",
            )
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["created"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"],
                "",
                "Wrong operation detailed status info",
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        # This test is disabled. From Feature 8030 we admit all WIM/SDN types
        # with self.subTest(i=2):
        #     rollback = []
        #     test_type = "bad_type"
        #     with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
        #         self.topic.new(rollback, self.fake_session,
        #                        {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
        #     self.assertEqual(len(rollback), 0, "Wrong rollback length")
        #     self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        #     self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
        #                   norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_new(self):
        """Creation must fail with CONFLICT when the name already exists."""
        with self.subTest(i=1):
            rollback = []
            test_url = "http://0.0.0.0:0"
            test_type = "fake"
            # The database lookup returns an item carrying the same name.
            self.db.get_one.return_value = {
                "_id": str(uuid4()),
                "name": self.test_name,
            }
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {
                        "name": self.test_name,
                        "wim_url": test_url,
                        "wim_type": test_type,
                    },
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(
                    self.test_name, self.topic.topic
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_edit_cvws(self):
        """Edit an item and check the document handed to ``db.replace``.

        A second sub-test checks that an unknown property in the edit request
        is rejected with UNPROCESSABLE_ENTITY.
        """
        now = time()
        cid = str(uuid4())
        test_url = "http://0.0.0.0:0"
        test_type = "fake"
        cvws = {
            "_id": cid,
            "name": self.test_name,
            "wim_url": test_url,
            "wim_type": test_type,
            "_admin": {
                "created": now,
                "modified": now,
                "operations": [{"lcmOperationType": "create"}],
            },
        }
        with self.subTest(i=1):
            new_name = "new-cim-name"
            new_url = "https://1.1.1.1:1"
            new_type = "onos"
            # First lookup returns the stored item, the second finds nothing.
            self.db.get_one.side_effect = [cvws, None]
            self.db.replace.return_value = {"updated": 1}
            # self.db.encrypt.side_effect = [b64str(), b64str()]
            self.topic.edit(
                self.fake_session,
                cid,
                {"name": new_name, "wim_url": new_url, "wim_type": new_type},
            )
            args = self.db.replace.call_args[0]
            content = args[2]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(args[1], cid, "Wrong CIM identifier")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], new_name, "Wrong CIM name")
            self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
            self.assertEqual(content["wim_url"], new_url, "Wrong URL")
            self.assertEqual(
                content["_admin"]["created"], now, "Wrong creation time"
            )
            self.assertGreater(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                len(content["_admin"]["operations"]),
                2,
                "Wrong number of operations",
            )
            operation = content["_admin"]["operations"][1]
            self.assertEqual(
                operation["lcmOperationType"], "edit", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertGreater(
                operation["startTime"],
                content["_admin"]["modified"],
                "Wrong operation start time",
            )
            self.assertGreater(
                operation["statusEnteredTime"],
                content["_admin"]["modified"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"],
                "",
                "Wrong operation detailed status info",
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        with self.subTest(i=2):
            self.db.get_one.side_effect = [cvws]
            with self.assertRaises(
                EngineException, msg="Accepted wrong property"
            ) as e:
                self.topic.edit(
                    self.fake_session,
                    str(uuid4()),
                    {"name": "new-name", "extra_prop": "anything"},
                )
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            # The outer format() inserts the template, the chained one then
            # fills the remaining '{}' placeholder with the property name.
            self.assertIn(
                "format error '{}'".format(
                    "additional properties are not allowed ('{}' was unexpected)"
                ).format("extra_prop"),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_edit(self):
        """Renaming must fail with CONFLICT when the new name already exists."""
        with self.subTest(i=1):
            cid = str(uuid4())
            new_name = "new-cim-name"
            # First lookup returns the edited item, the second returns another
            # item that already carries the requested name.
            self.db.get_one.side_effect = [
                {"_id": cid, "name": self.test_name},
                {"_id": str(uuid4()), "name": new_name},
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.edit(self.fake_session, cid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(new_name, self.topic.topic),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_delete_cvws(self):
        """Check the three outcomes of ``delete`` exercised by the sub-tests.

        1. Other projects still reference the item: only the session project
           is pulled from the project lists, no operation id is returned and
           no message is sent.
        2. Only the session project references the item: it is flagged
           ``_admin.to_delete``, a "delete" operation is pushed and a
           "delete" message is sent.
        3. No project references it and the session forces deletion: the
           item is removed with ``db.del_one`` and a "deleted" message is
           sent.
        """
        cid = str(uuid4())
        ro_pid = str(uuid4())
        rw_pid = str(uuid4())
        cvws = {"_id": cid, "name": self.test_name}
        self.db.get_list.return_value = []
        with self.subTest(i=1):
            cvws["_admin"] = {
                "projects_read": [test_pid, ro_pid, rw_pid],
                "projects_write": [test_pid, rw_pid],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertIsNone(
                self.db.set_one.call_args[1]["update_dict"],
                "Wrong read-only projects update",
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["pull_list"],
                {
                    "_admin.projects_read": (test_pid,),
                    "_admin.projects_write": (test_pid,),
                },
                "Wrong read/write projects update",
            )
            self.topic._send_msg.assert_not_called()
        with self.subTest(i=2):
            now = time()
            cvws["_admin"] = {
                "projects_read": [test_pid],
                "projects_write": [test_pid],
                "operations": [],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["update_dict"],
                {"_admin.to_delete": True},
                "Wrong _admin.to_delete update",
            )
            operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
            self.assertEqual(
                operation["lcmOperationType"], "delete", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
            self.assertGreater(
                operation["startTime"], now, "Wrong operation start time"
            )
            self.assertGreater(
                operation["statusEnteredTime"],
                now,
                "Wrong operation status enter time",
            )
            self.topic._send_msg.assert_called_once_with(
                "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
            )
        with self.subTest(i=3):
            cvws["_admin"] = {
                "projects_read": [],
                "projects_write": [],
                "operations": [],
            }
            self.db.get_one.return_value = cvws
            # Forget the calls recorded by the previous sub-tests.
            self.topic._send_msg.reset_mock()
            self.db.get_one.reset_mock()
            self.db.del_one.reset_mock()
            self.fake_session["force"] = True  # to force deletion
            self.fake_session["admin"] = True  # to force deletion
            self.fake_session["project_id"] = []  # to force deletion
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.topic._send_msg.assert_called_once_with(
                "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
            )
if __name__ == "__main__":
    # Run every test case of this module when the file is executed directly.
    unittest.main()