from threading import Lock
from time import time
from traceback import format_exc as traceback_format_exc
-from typing import Any, Dict, Tuple
+from typing import Any, Dict, Tuple, Type
from uuid import uuid4
from cryptography.hazmat.backends import default_backend as crypto_default_backend
msglocal,
version as common_version,
)
-from osm_common.dbbase import DbException
-from osm_common.fsbase import FsException
+from osm_common.dbbase import DbBase, DbException
+from osm_common.fsbase import FsBase, FsException
from osm_common.msgbase import MsgException
from osm_ng_ro.ns_thread import deep_get, NsWorker, NsWorkerException
from osm_ng_ro.validation import deploy_schema, validate_input
for target_id in vims_to_unload:
self._unload_vim(target_id)
- def _get_cloud_init(self, where):
- """
- Not used as cloud init content is provided in the http body. This method reads cloud init from a file
- :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
- :return:
+ @staticmethod
+ def _get_cloud_init(
+ db: Type[DbBase],
+ fs: Type[FsBase],
+ location: str,
+ ) -> str:
+ """This method reads cloud init from a file.
+
+ Note: Not used as cloud init content is provided in the http body.
+
+ Args:
+ db (Type[DbBase]): [description]
+ fs (Type[FsBase]): [description]
+ location (str): can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
+
+ Raises:
+ NsException: [description]
+ NsException: [description]
+
+ Returns:
+ str: [description]
"""
- vnfd_id, _, other = where.partition(":")
+ vnfd_id, _, other = location.partition(":")
_type, _, name = other.partition(":")
- vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ vnfd = db.get_one("vnfds", {"_id": vnfd_id})
if _type == "file":
base_folder = vnfd["_admin"]["storage"]
base_folder["folder"], base_folder["pkg-dir"], name
)
- if not self.fs:
+ if not fs:
raise NsException(
"Cannot read file '{}'. Filesystem not loaded, change configuration at storage.driver".format(
cloud_init_file
)
)
- with self.fs.file_open(cloud_init_file, "r") as ci_file:
+ with fs.file_open(cloud_init_file, "r") as ci_file:
cloud_init_content = ci_file.read()
elif _type == "vdu":
cloud_init_content = vnfd["vdu"][int(name)]["cloud-init"]
else:
- raise NsException("Mismatch descriptor for cloud init: {}".format(where))
+ raise NsException("Mismatch descriptor for cloud init: {}".format(location))
return cloud_init_content
- def _parse_jinja2(self, cloud_init_content, params, context):
+ @staticmethod
+ def _parse_jinja2(
+ cloud_init_content: str,
+ params: Dict[str, Any],
+ context: str,
+ ) -> str:
+ """Function that processes the cloud init to replace Jinja2 encoded parameters.
+
+ Args:
+ cloud_init_content (str): [description]
+ params (Dict[str, Any]): [description]
+ context (str): [description]
+
+ Raises:
+ NsException: [description]
+ NsException: [description]
+
+ Returns:
+ str: [description]
+ """
try:
env = Environment(undefined=StrictUndefined)
template = env.from_string(cloud_init_content)
indata: Dict[str, Any],
vim_info: Dict[str, Any],
target_record_id: str,
+ **kwargs: Dict[str, Any],
) -> Dict[str, Any]:
"""Function to process VDU image parameters.
indata: Dict[str, Any],
vim_info: Dict[str, Any],
target_record_id: str,
+ **kwargs: Dict[str, Any],
) -> Dict[str, Any]:
"""[summary]
indata: Dict[str, Any],
vim_info: Dict[str, Any],
target_record_id: str,
+ **kwargs: Dict[str, Any],
) -> Dict[str, Any]:
"""Function to process network parameters.
return extra_dict
+ @staticmethod
+ def _process_vdu_params(
+ target_vdu: Dict[str, Any],
+ indata: Dict[str, Any],
+ vim_info: Dict[str, Any],
+ target_record_id: str,
+ **kwargs: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """Function to process VDU parameters.
+
+ Args:
+ target_vdu (Dict[str, Any]): [description]
+ indata (Dict[str, Any]): [description]
+ vim_info (Dict[str, Any]): [description]
+ target_record_id (str): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ vnfr_id = kwargs.get("vnfr_id")
+ nsr_id = kwargs.get("nsr_id")
+ vnfr = kwargs.get("vnfr")
+ vdu2cloud_init = kwargs.get("vdu2cloud_init")
+ tasks_by_target_record_id = kwargs.get("tasks_by_target_record_id")
+ logger = kwargs.get("logger")
+ db = kwargs.get("db")
+ fs = kwargs.get("fs")
+ ro_nsr_public_key = kwargs.get("ro_nsr_public_key")
+
+ vnf_preffix = "vnfrs:{}".format(vnfr_id)
+ ns_preffix = "nsrs:{}".format(nsr_id)
+ image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
+ flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"]
+ extra_dict = {"depends_on": [image_text, flavor_text]}
+ net_list = []
+
+ for iface_index, interface in enumerate(target_vdu["interfaces"]):
+ if interface.get("ns-vld-id"):
+ net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
+ elif interface.get("vnf-vld-id"):
+ net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
+ else:
+ logger.error(
+ "Interface {} from vdu {} not connected to any vld".format(
+ iface_index, target_vdu["vdu-name"]
+ )
+ )
+
+ continue # interface not connected to any vld
+
+ extra_dict["depends_on"].append(net_text)
+
+ if "port-security-enabled" in interface:
+ interface["port_security"] = interface.pop("port-security-enabled")
+
+ if "port-security-disable-strategy" in interface:
+ interface["port_security_disable_strategy"] = interface.pop(
+ "port-security-disable-strategy"
+ )
+
+ net_item = {
+ x: v
+ for x, v in interface.items()
+ if x
+ in (
+ "name",
+ "vpci",
+ "port_security",
+ "port_security_disable_strategy",
+ "floating_ip",
+ )
+ }
+ net_item["net_id"] = "TASK-" + net_text
+ net_item["type"] = "virtual"
+
+ # TODO mac_address: used for SR-IOV ifaces #TODO for other types
+ # TODO floating_ip: True/False (or it can be None)
+ if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
+ # mark the net create task as type data
+ if deep_get(
+ tasks_by_target_record_id,
+ net_text,
+ "params",
+ "net_type",
+ ):
+ tasks_by_target_record_id[net_text]["params"]["net_type"] = "data"
+
+ net_item["use"] = "data"
+ net_item["model"] = interface["type"]
+ net_item["type"] = interface["type"]
+ elif (
+ interface.get("type") == "OM-MGMT"
+ or interface.get("mgmt-interface")
+ or interface.get("mgmt-vnf")
+ ):
+ net_item["use"] = "mgmt"
+ else:
+ # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
+ net_item["use"] = "bridge"
+ net_item["model"] = interface.get("type")
+
+ if interface.get("ip-address"):
+ net_item["ip_address"] = interface["ip-address"]
+
+ if interface.get("mac-address"):
+ net_item["mac_address"] = interface["mac-address"]
+
+ net_list.append(net_item)
+
+ if interface.get("mgmt-vnf"):
+ extra_dict["mgmt_vnf_interface"] = iface_index
+ elif interface.get("mgmt-interface"):
+ extra_dict["mgmt_vdu_interface"] = iface_index
+
+ # cloud config
+ cloud_config = {}
+
+ if target_vdu.get("cloud-init"):
+ if target_vdu["cloud-init"] not in vdu2cloud_init:
+ vdu2cloud_init[target_vdu["cloud-init"]] = Ns._get_cloud_init(
+ db=db,
+ fs=fs,
+ location=target_vdu["cloud-init"],
+ )
+
+ cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]]
+ cloud_config["user-data"] = Ns._parse_jinja2(
+ cloud_init_content=cloud_content_,
+ params=target_vdu.get("additionalParams"),
+ context=target_vdu["cloud-init"],
+ )
+
+ if target_vdu.get("boot-data-drive"):
+ cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive")
+
+ ssh_keys = []
+
+ if target_vdu.get("ssh-keys"):
+ ssh_keys += target_vdu.get("ssh-keys")
+
+ if target_vdu.get("ssh-access-required"):
+ ssh_keys.append(ro_nsr_public_key)
+
+ if ssh_keys:
+ cloud_config["key-pairs"] = ssh_keys
+
+ disk_list = None
+ if target_vdu.get("virtual-storages"):
+ disk_list = [
+ {"size": disk["size-of-storage"]}
+ for disk in target_vdu["virtual-storages"]
+ if disk.get("type-of-storage")
+ == "persistent-storage:persistent-storage"
+ ]
+
+ extra_dict["params"] = {
+ "name": "{}-{}-{}-{}".format(
+ indata["name"][:16],
+ vnfr["member-vnf-index-ref"][:16],
+ target_vdu["vdu-name"][:32],
+ target_vdu.get("count-index") or 0,
+ ),
+ "description": target_vdu["vdu-name"],
+ "start": True,
+ "image_id": "TASK-" + image_text,
+ "flavor_id": "TASK-" + flavor_text,
+ "net_list": net_list,
+ "cloud_config": cloud_config or None,
+ "disk_list": disk_list,
+ "availability_zone_index": None, # TODO
+ "availability_zone_list": None, # TODO
+ }
+
+ return extra_dict
+
def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
validate_input(indata, deploy_schema)
index += 1
- def _process_vdu_params(target_vdu, indata, vim_info, target_record_id):
- nonlocal vnfr_id
- nonlocal nsr_id
- nonlocal vnfr
- nonlocal vdu2cloud_init
- nonlocal tasks_by_target_record_id
-
- vnf_preffix = "vnfrs:{}".format(vnfr_id)
- ns_preffix = "nsrs:{}".format(nsr_id)
- image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
- flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"]
- extra_dict = {"depends_on": [image_text, flavor_text]}
- net_list = []
-
- for iface_index, interface in enumerate(target_vdu["interfaces"]):
- if interface.get("ns-vld-id"):
- net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
- elif interface.get("vnf-vld-id"):
- net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
- else:
- self.logger.error(
- "Interface {} from vdu {} not connected to any vld".format(
- iface_index, target_vdu["vdu-name"]
- )
- )
-
- continue # interface not connected to any vld
-
- extra_dict["depends_on"].append(net_text)
-
- if "port-security-enabled" in interface:
- interface["port_security"] = interface.pop(
- "port-security-enabled"
- )
-
- if "port-security-disable-strategy" in interface:
- interface["port_security_disable_strategy"] = interface.pop(
- "port-security-disable-strategy"
- )
-
- net_item = {
- x: v
- for x, v in interface.items()
- if x
- in (
- "name",
- "vpci",
- "port_security",
- "port_security_disable_strategy",
- "floating_ip",
- )
- }
- net_item["net_id"] = "TASK-" + net_text
- net_item["type"] = "virtual"
-
- # TODO mac_address: used for SR-IOV ifaces #TODO for other types
- # TODO floating_ip: True/False (or it can be None)
- if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
- # mark the net create task as type data
- if deep_get(
- tasks_by_target_record_id, net_text, "params", "net_type"
- ):
- tasks_by_target_record_id[net_text]["params"][
- "net_type"
- ] = "data"
-
- net_item["use"] = "data"
- net_item["model"] = interface["type"]
- net_item["type"] = interface["type"]
- elif (
- interface.get("type") == "OM-MGMT"
- or interface.get("mgmt-interface")
- or interface.get("mgmt-vnf")
- ):
- net_item["use"] = "mgmt"
- else:
- # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
- net_item["use"] = "bridge"
- net_item["model"] = interface.get("type")
-
- if interface.get("ip-address"):
- net_item["ip_address"] = interface["ip-address"]
-
- if interface.get("mac-address"):
- net_item["mac_address"] = interface["mac-address"]
-
- net_list.append(net_item)
-
- if interface.get("mgmt-vnf"):
- extra_dict["mgmt_vnf_interface"] = iface_index
- elif interface.get("mgmt-interface"):
- extra_dict["mgmt_vdu_interface"] = iface_index
-
- # cloud config
- cloud_config = {}
-
- if target_vdu.get("cloud-init"):
- if target_vdu["cloud-init"] not in vdu2cloud_init:
- vdu2cloud_init[target_vdu["cloud-init"]] = self._get_cloud_init(
- target_vdu["cloud-init"]
- )
-
- cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]]
- cloud_config["user-data"] = self._parse_jinja2(
- cloud_content_,
- target_vdu.get("additionalParams"),
- target_vdu["cloud-init"],
- )
-
- if target_vdu.get("boot-data-drive"):
- cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive")
-
- ssh_keys = []
-
- if target_vdu.get("ssh-keys"):
- ssh_keys += target_vdu.get("ssh-keys")
-
- if target_vdu.get("ssh-access-required"):
- ssh_keys.append(ro_nsr_public_key)
-
- if ssh_keys:
- cloud_config["key-pairs"] = ssh_keys
-
- disk_list = None
- if target_vdu.get("virtual-storages"):
- disk_list = [
- {"size": disk["size-of-storage"]}
- for disk in target_vdu["virtual-storages"]
- if disk.get("type-of-storage")
- == "persistent-storage:persistent-storage"
- ]
-
- extra_dict["params"] = {
- "name": "{}-{}-{}-{}".format(
- indata["name"][:16],
- vnfr["member-vnf-index-ref"][:16],
- target_vdu["vdu-name"][:32],
- target_vdu.get("count-index") or 0,
- ),
- "description": target_vdu["vdu-name"],
- "start": True,
- "image_id": "TASK-" + image_text,
- "flavor_id": "TASK-" + flavor_text,
- "net_list": net_list,
- "cloud_config": cloud_config or None,
- "disk_list": disk_list,
- "availability_zone_index": None, # TODO
- "availability_zone_list": None, # TODO
- }
-
- return extra_dict
-
def _process_items(
target_list,
existing_list,
item_ = "sdn_net"
target_record_id += ".sdn"
+ kwargs = {}
+ if process_params == Ns._process_vdu_params:
+ kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "vnfr": vnfr,
+ "vdu2cloud_init": vdu2cloud_init,
+ "tasks_by_target_record_id": tasks_by_target_record_id,
+ "logger": self.logger,
+ "db": self.db,
+ "fs": self.fs,
+ "ro_nsr_public_key": ro_nsr_public_key,
+ }
+ )
+
extra_dict = process_params(
- target_item, indata, target_viminfo, target_record_id
+ target_item,
+ indata,
+ target_viminfo,
+ target_record_id,
+ **kwargs,
)
self._assign_vim(target_vim)
db_update=db_vnfrs_update[vnfr["_id"]],
db_path="vdur",
item="vdu",
- process_params=_process_vdu_params,
+ process_params=Ns._process_vdu_params,
)
for db_task in db_new_tasks:
#######################################################################################
import unittest
-from unittest.mock import Mock, patch
+from unittest.mock import MagicMock, Mock, patch
-from osm_ng_ro.ns import Ns
+from jinja2 import TemplateError, TemplateNotFound, UndefinedError
+from osm_ng_ro.ns import Ns, NsException
__author__ = "Eduardo Sousa"
self.assertDictEqual(expected_result, result)
self.assertTrue(ip_profile_to_ro.called)
+
+ def test__get_cloud_init_exception(self):
+ db_mock = MagicMock(name="database mock")
+ fs_mock = None
+
+ location = ""
+
+ with self.assertRaises(NsException):
+ Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+ def test__get_cloud_init_file_fs_exception(self):
+ db_mock = MagicMock(name="database mock")
+ fs_mock = None
+
+ location = "vnfr_id_123456:file:test_file"
+ db_mock.get_one.return_value = {
+ "_admin": {
+ "storage": {
+ "folder": "/home/osm",
+ "pkg-dir": "vnfr_test_dir",
+ },
+ },
+ }
+
+ with self.assertRaises(NsException):
+ Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+ def test__get_cloud_init_file(self):
+ db_mock = MagicMock(name="database mock")
+ fs_mock = MagicMock(name="filesystem mock")
+ file_mock = MagicMock(name="file mock")
+
+ location = "vnfr_id_123456:file:test_file"
+ cloud_init_content = "this is a cloud init file content"
+
+ db_mock.get_one.return_value = {
+ "_admin": {
+ "storage": {
+ "folder": "/home/osm",
+ "pkg-dir": "vnfr_test_dir",
+ },
+ },
+ }
+ fs_mock.file_open.return_value = file_mock
+ file_mock.__enter__.return_value.read.return_value = cloud_init_content
+
+ result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+ self.assertEqual(cloud_init_content, result)
+
+ def test__get_cloud_init_vdu(self):
+ db_mock = MagicMock(name="database mock")
+ fs_mock = None
+
+ location = "vnfr_id_123456:vdu:0"
+ cloud_init_content = "this is a cloud init file content"
+
+ db_mock.get_one.return_value = {
+ "vdu": {
+ 0: {
+ "cloud-init": cloud_init_content,
+ },
+ },
+ }
+
+ result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+ self.assertEqual(cloud_init_content, result)
+
+ @patch("jinja2.Environment.__init__")
+ def test__parse_jinja2_undefined_error(self, env_mock: Mock):
+ cloud_init_content = None
+ params = None
+ context = None
+
+ env_mock.side_effect = UndefinedError("UndefinedError occurred.")
+
+ with self.assertRaises(NsException):
+ Ns._parse_jinja2(
+ cloud_init_content=cloud_init_content, params=params, context=context
+ )
+
+ @patch("jinja2.Environment.__init__")
+ def test__parse_jinja2_template_error(self, env_mock: Mock):
+ cloud_init_content = None
+ params = None
+ context = None
+
+ env_mock.side_effect = TemplateError("TemplateError occurred.")
+
+ with self.assertRaises(NsException):
+ Ns._parse_jinja2(
+ cloud_init_content=cloud_init_content, params=params, context=context
+ )
+
+ @patch("jinja2.Environment.__init__")
+ def test__parse_jinja2_template_not_found(self, env_mock: Mock):
+ cloud_init_content = None
+ params = None
+ context = None
+
+ env_mock.side_effect = TemplateNotFound("TemplateNotFound occurred.")
+
+ with self.assertRaises(NsException):
+ Ns._parse_jinja2(
+ cloud_init_content=cloud_init_content, params=params, context=context
+ )
+
+ def test__parse_jinja2(self):
+ pass
+
+ def test__process_vdu_params_empty_kargs(self):
+ pass
+
+ def test__process_vdu_params_interface_ns_vld_id(self):
+ pass
+
+ def test__process_vdu_params_interface_vnf_vld_id(self):
+ pass
+
+ def test__process_vdu_params_interface_unknown(self):
+ pass
+
+ def test__process_vdu_params_interface_port_security_enabled(self):
+ pass
+
+ def test__process_vdu_params_interface_port_security_disable_strategy(self):
+ pass
+
+ def test__process_vdu_params_interface_sriov(self):
+ pass
+
+ def test__process_vdu_params_interface_pci_passthrough(self):
+ pass
+
+ def test__process_vdu_params_interface_om_mgmt(self):
+ pass
+
+ def test__process_vdu_params_interface_mgmt_interface(self):
+ pass
+
+ def test__process_vdu_params_interface_mgmt_vnf(self):
+ pass
+
+ def test__process_vdu_params_interface_bridge(self):
+ pass
+
+ def test__process_vdu_params_interface_ip_address(self):
+ pass
+
+ def test__process_vdu_params_interface_mac_address(self):
+ pass
+
+ def test__process_vdu_params_vdu_cloud_init_missing(self):
+ pass
+
+ def test__process_vdu_params_vdu_cloud_init_present(self):
+ pass
+
+ def test__process_vdu_params_vdu_boot_data_drive(self):
+ pass
+
+ def test__process_vdu_params_vdu_ssh_keys(self):
+ pass
+
+ def test__process_vdu_params_vdu_ssh_access_required(self):
+ pass
+
+ def test__process_vdu_params_vdu_virtual_storages(self):
+ pass
+
+ def test__process_vdu_params(self):
+ pass
#######################################################################################
import logging
-from types import ModuleType
import unittest
-from unittest.mock import Mock, patch
+from unittest.mock import MagicMock, patch
from osm_ng_ro.ns_thread import VimInteractionNet
+from osm_ro_plugin.vimconn import VimConnConnectionException, VimConnException
class TestVimInteractionNet(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
+ def setUp(self):
module_name = "osm_ro_plugin"
- osm_ro_plugin = ModuleType(module_name)
- osm_ro_plugin.vimconn = Mock(name=module_name + ".vimconn")
- osm_ro_plugin.vimconn.VimConnector = Mock(
- name=module_name + "vimconn.VimConnector"
- )
- osm_ro_plugin.vimconn.VimConnException = Mock(
- name=module_name + ".vimconn.VimConnException"
- )
- cls.target_vim = osm_ro_plugin.vimconn.VimConnector
- cls.VimConnException = osm_ro_plugin.vimconn.VimConnException
- cls.task_depends = None
-
- @classmethod
- def tearDownClass(cls):
- del cls.target_vim
- del cls.task_depends
- del cls.VimConnException
+ self.target_vim = MagicMock(name=f"{module_name}.vimconn.VimConnector")
+ self.task_depends = None
+
+ patches = [patch(f"{module_name}.vimconn.VimConnector", self.target_vim)]
+
+ # Enabling mocks and add cleanups
+ for mock in patches:
+ mock.start()
+ self.addCleanup(mock.stop)
def test__mgmt_net_id_in_find_params_mgmt_several_vim_nets(self):
"""
"target_record": "test_target_record",
"target_record_id": "test_target_record_id",
# values coming from extra_dict
- "params": "",
+ "params": {},
"find_params": {
"mgmt": True,
"name": "some_mgmt_name",
"item": "test_item",
"target_record": "test_target_record",
"target_record_id": "test_target_record_id",
- "params": "",
+ "params": {},
# values coming from extra_dict
"find_params": {
"mgmt": True,
"target_record": "test_target_record",
"target_record_id": "test_target_record_id",
# values coming from extra_dict
- "params": "",
+ "params": {},
"find_params": {
"mgmt": True,
"name": "some_mgmt_name",
"target_record": "test_target_record",
"target_record_id": "test_target_record_id",
# values coming from extra_dict
- "params": "",
+ "params": {},
"find_params": {
"mgmt": True,
"name": "some_mgmt_name",
"target_record": "test_target_record",
"target_record_id": "test_target_record_id",
# values coming from extra_dict
- "params": "",
+ "params": {},
"find_params": {
"filter_dict": {
"name": "some-network-name",
"target_record": "test_target_record",
"target_record_id": "test_target_record_id",
# values coming from extra_dict
- "params": "test_params",
+ "params": {
+ "net_name": "test_params",
+ },
"find_params": {
"filter_dict": {
"name": "some-network-name",
"target_record": "test_target_record",
"target_record_id": "test_target_record_id",
# values coming from extra_dict
- "params": "",
+ "params": {},
"find_params": {
"filter_dict": {
"name": "some-network-name",
"target_record": "test_target_record",
"target_record_id": "test_target_record_id",
# values coming from extra_dict
- "params": "",
+ "params": {},
"find_params": {
"mgmt": True,
"name": "some_mgmt_name",
"target_record": "test_target_record",
"target_record_id": "test_target_record_id",
# values coming from extra_dict
- "params": "",
+ "params": {},
"depends_on": "test_depends_on",
},
},
}
task_index = "task_index_12"
- with self.assertRaises(TypeError):
+ self.target_vim.new_network.side_effect = VimConnConnectionException(
+ "VimConnConnectionException occurred."
+ )
+ with self.assertLogs() as captured:
instance.new(ro_task, task_index, self.task_depends)
+ self.assertEqual(captured.records[0].levelname, "ERROR")
def test__refresh_ro_task_vim_status_active(self):
"""
self.assertEqual(result[0], task_status)
self.assertDictEqual(result[1], ro_vim_item_update)
- def test__refresh_ro_task_VimConnException_occured(self):
+ def test__refresh_ro_task_VimConnException_occurred(self):
"""
vimconn.VimConnException has occured
"""
"to_check_at": 1637324200.994312,
"tasks": {},
}
- self.target_vim.refresh_nets_status.side_effect = Mock(
- side_effect=self.VimConnException("VimConnException occured")
+ self.target_vim.refresh_nets_status.side_effect = VimConnException(
+ "VimConnException occurred."
)
- with self.assertRaises(TypeError):
+ with self.assertLogs() as captured:
instance.refresh(ro_task)
- self.target_vim.refresh_nets_status.side_effect = None
+ self.assertEqual(captured.records[0].levelname, "ERROR")
def test__refresh_ro_task_vim_status_deleted(self):
"""