From: sousaedu Date: Tue, 7 Dec 2021 15:33:46 +0000 (+0000) Subject: Extracting Ns._process_vdu_params() and creating unit test X-Git-Tag: v12.0.0rc1~41 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F83%2F11483%2F7;p=osm%2FRO.git Extracting Ns._process_vdu_params() and creating unit test Change-Id: Ie5bf2a57d85114d3eacc9a416004423a18d1f27c Signed-off-by: sousaedu --- diff --git a/NG-RO/osm_ng_ro/ns.py b/NG-RO/osm_ng_ro/ns.py index 706454d5..d187dd84 100644 --- a/NG-RO/osm_ng_ro/ns.py +++ b/NG-RO/osm_ng_ro/ns.py @@ -22,7 +22,7 @@ from random import choice as random_choice from threading import Lock from time import time from traceback import format_exc as traceback_format_exc -from typing import Any, Dict, Tuple +from typing import Any, Dict, Tuple, Type from uuid import uuid4 from cryptography.hazmat.backends import default_backend as crypto_default_backend @@ -44,8 +44,8 @@ from osm_common import ( msglocal, version as common_version, ) -from osm_common.dbbase import DbException -from osm_common.fsbase import FsException +from osm_common.dbbase import DbBase, DbException +from osm_common.fsbase import FsBase, FsException from osm_common.msgbase import MsgException from osm_ng_ro.ns_thread import deep_get, NsWorker, NsWorkerException from osm_ng_ro.validation import deploy_schema, validate_input @@ -295,15 +295,31 @@ class Ns(object): for target_id in vims_to_unload: self._unload_vim(target_id) - def _get_cloud_init(self, where): - """ - Not used as cloud init content is provided in the http body. This method reads cloud init from a file - :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex' - :return: + @staticmethod + def _get_cloud_init( + db: Type[DbBase], + fs: Type[FsBase], + location: str, + ) -> str: + """This method reads cloud init from a file. + + Note: Not used as cloud init content is provided in the http body. + + Args: + db (Type[DbBase]): [description] + fs (Type[FsBase]): [description] + location (str): can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex' + + Raises: + NsException: [description] + NsException: [description] + + Returns: + str: [description] """ - vnfd_id, _, other = where.partition(":") + vnfd_id, _, other = location.partition(":") _type, _, name = other.partition(":") - vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) + vnfd = db.get_one("vnfds", {"_id": vnfd_id}) if _type == "file": base_folder = vnfd["_admin"]["storage"] @@ -311,23 +327,42 @@ class Ns(object): base_folder["folder"], base_folder["pkg-dir"], name ) - if not self.fs: + if not fs: raise NsException( "Cannot read file '{}'. Filesystem not loaded, change configuration at storage.driver".format( cloud_init_file ) ) - with self.fs.file_open(cloud_init_file, "r") as ci_file: + with fs.file_open(cloud_init_file, "r") as ci_file: cloud_init_content = ci_file.read() elif _type == "vdu": cloud_init_content = vnfd["vdu"][int(name)]["cloud-init"] else: - raise NsException("Mismatch descriptor for cloud init: {}".format(where)) + raise NsException("Mismatch descriptor for cloud init: {}".format(location)) return cloud_init_content - def _parse_jinja2(self, cloud_init_content, params, context): + @staticmethod + def _parse_jinja2( + cloud_init_content: str, + params: Dict[str, Any], + context: str, + ) -> str: + """Function that processes the cloud init to replace Jinja2 encoded parameters. + + Args: + cloud_init_content (str): [description] + params (Dict[str, Any]): [description] + context (str): [description] + + Raises: + NsException: [description] + NsException: [description] + + Returns: + str: [description] + """ try: env = Environment(undefined=StrictUndefined) template = env.from_string(cloud_init_content) @@ -474,6 +509,7 @@ class Ns(object): indata: Dict[str, Any], vim_info: Dict[str, Any], target_record_id: str, + **kwargs: Dict[str, Any], ) -> Dict[str, Any]: """Function to process VDU image parameters. @@ -711,6 +747,7 @@ class Ns(object): indata: Dict[str, Any], vim_info: Dict[str, Any], target_record_id: str, + **kwargs: Dict[str, Any], ) -> Dict[str, Any]: """[summary] @@ -799,6 +836,7 @@ class Ns(object): indata: Dict[str, Any], vim_info: Dict[str, Any], target_record_id: str, + **kwargs: Dict[str, Any], ) -> Dict[str, Any]: """Function to process network parameters. @@ -868,6 +906,181 @@ class Ns(object): return extra_dict + @staticmethod + def _process_vdu_params( + target_vdu: Dict[str, Any], + indata: Dict[str, Any], + vim_info: Dict[str, Any], + target_record_id: str, + **kwargs: Dict[str, Any], + ) -> Dict[str, Any]: + """Function to process VDU parameters. + + Args: + target_vdu (Dict[str, Any]): [description] + indata (Dict[str, Any]): [description] + vim_info (Dict[str, Any]): [description] + target_record_id (str): [description] + + Returns: + Dict[str, Any]: [description] + """ + vnfr_id = kwargs.get("vnfr_id") + nsr_id = kwargs.get("nsr_id") + vnfr = kwargs.get("vnfr") + vdu2cloud_init = kwargs.get("vdu2cloud_init") + tasks_by_target_record_id = kwargs.get("tasks_by_target_record_id") + logger = kwargs.get("logger") + db = kwargs.get("db") + fs = kwargs.get("fs") + ro_nsr_public_key = kwargs.get("ro_nsr_public_key") + + vnf_preffix = "vnfrs:{}".format(vnfr_id) + ns_preffix = "nsrs:{}".format(nsr_id) + image_text = ns_preffix + ":image." + target_vdu["ns-image-id"] + flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"] + extra_dict = {"depends_on": [image_text, flavor_text]} + net_list = [] + + for iface_index, interface in enumerate(target_vdu["interfaces"]): + if interface.get("ns-vld-id"): + net_text = ns_preffix + ":vld." + interface["ns-vld-id"] + elif interface.get("vnf-vld-id"): + net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"] + else: + logger.error( + "Interface {} from vdu {} not connected to any vld".format( + iface_index, target_vdu["vdu-name"] + ) + ) + + continue # interface not connected to any vld + + extra_dict["depends_on"].append(net_text) + + if "port-security-enabled" in interface: + interface["port_security"] = interface.pop("port-security-enabled") + + if "port-security-disable-strategy" in interface: + interface["port_security_disable_strategy"] = interface.pop( + "port-security-disable-strategy" + ) + + net_item = { + x: v + for x, v in interface.items() + if x + in ( + "name", + "vpci", + "port_security", + "port_security_disable_strategy", + "floating_ip", + ) + } + net_item["net_id"] = "TASK-" + net_text + net_item["type"] = "virtual" + + # TODO mac_address: used for SR-IOV ifaces #TODO for other types + # TODO floating_ip: True/False (or it can be None) + if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"): + # mark the net create task as type data + if deep_get( + tasks_by_target_record_id, + net_text, + "params", + "net_type", + ): + tasks_by_target_record_id[net_text]["params"]["net_type"] = "data" + + net_item["use"] = "data" + net_item["model"] = interface["type"] + net_item["type"] = interface["type"] + elif ( + interface.get("type") == "OM-MGMT" + or interface.get("mgmt-interface") + or interface.get("mgmt-vnf") + ): + net_item["use"] = "mgmt" + else: + # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"): + net_item["use"] = "bridge" + net_item["model"] = interface.get("type") + + if interface.get("ip-address"): + net_item["ip_address"] = interface["ip-address"] + + if interface.get("mac-address"): + net_item["mac_address"] = interface["mac-address"] + + net_list.append(net_item) + + if interface.get("mgmt-vnf"): + extra_dict["mgmt_vnf_interface"] = iface_index + elif interface.get("mgmt-interface"): + extra_dict["mgmt_vdu_interface"] = iface_index + + # cloud config + cloud_config = {} + + if target_vdu.get("cloud-init"): + if target_vdu["cloud-init"] not in vdu2cloud_init: + vdu2cloud_init[target_vdu["cloud-init"]] = Ns._get_cloud_init( + db=db, + fs=fs, + location=target_vdu["cloud-init"], + ) + + cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]] + cloud_config["user-data"] = Ns._parse_jinja2( + cloud_init_content=cloud_content_, + params=target_vdu.get("additionalParams"), + context=target_vdu["cloud-init"], + ) + + if target_vdu.get("boot-data-drive"): + cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive") + + ssh_keys = [] + + if target_vdu.get("ssh-keys"): + ssh_keys += target_vdu.get("ssh-keys") + + if target_vdu.get("ssh-access-required"): + ssh_keys.append(ro_nsr_public_key) + + if ssh_keys: + cloud_config["key-pairs"] = ssh_keys + + disk_list = None + if target_vdu.get("virtual-storages"): + disk_list = [ + {"size": disk["size-of-storage"]} + for disk in target_vdu["virtual-storages"] + if disk.get("type-of-storage") + == "persistent-storage:persistent-storage" + ] + + extra_dict["params"] = { + "name": "{}-{}-{}-{}".format( + indata["name"][:16], + vnfr["member-vnf-index-ref"][:16], + target_vdu["vdu-name"][:32], + target_vdu.get("count-index") or 0, + ), + "description": target_vdu["vdu-name"], + "start": True, + "image_id": "TASK-" + image_text, + "flavor_id": "TASK-" + flavor_text, + "net_list": net_list, + "cloud_config": cloud_config or None, + "disk_list": disk_list, + "availability_zone_index": None, # TODO + "availability_zone_list": None, # TODO + } + + return extra_dict + def deploy(self, session, indata, version, nsr_id, *args, **kwargs): self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata)) validate_input(indata, deploy_schema) @@ -924,158 +1137,6 @@ class Ns(object): index += 1 - def _process_vdu_params(target_vdu, indata, vim_info, target_record_id): - nonlocal vnfr_id - nonlocal nsr_id - nonlocal vnfr - nonlocal vdu2cloud_init - nonlocal tasks_by_target_record_id - - vnf_preffix = "vnfrs:{}".format(vnfr_id) - ns_preffix = "nsrs:{}".format(nsr_id) - image_text = ns_preffix + ":image." + target_vdu["ns-image-id"] - flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"] - extra_dict = {"depends_on": [image_text, flavor_text]} - net_list = [] - - for iface_index, interface in enumerate(target_vdu["interfaces"]): - if interface.get("ns-vld-id"): - net_text = ns_preffix + ":vld." + interface["ns-vld-id"] - elif interface.get("vnf-vld-id"): - net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"] - else: - self.logger.error( - "Interface {} from vdu {} not connected to any vld".format( - iface_index, target_vdu["vdu-name"] - ) - ) - - continue # interface not connected to any vld - - extra_dict["depends_on"].append(net_text) - - if "port-security-enabled" in interface: - interface["port_security"] = interface.pop( - "port-security-enabled" - ) - - if "port-security-disable-strategy" in interface: - interface["port_security_disable_strategy"] = interface.pop( - "port-security-disable-strategy" - ) - - net_item = { - x: v - for x, v in interface.items() - if x - in ( - "name", - "vpci", - "port_security", - "port_security_disable_strategy", - "floating_ip", - ) - } - net_item["net_id"] = "TASK-" + net_text - net_item["type"] = "virtual" - - # TODO mac_address: used for SR-IOV ifaces #TODO for other types - # TODO floating_ip: True/False (or it can be None) - if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"): - # mark the net create task as type data - if deep_get( - tasks_by_target_record_id, net_text, "params", "net_type" - ): - tasks_by_target_record_id[net_text]["params"][ - "net_type" - ] = "data" - - net_item["use"] = "data" - net_item["model"] = interface["type"] - net_item["type"] = interface["type"] - elif ( - interface.get("type") == "OM-MGMT" - or interface.get("mgmt-interface") - or interface.get("mgmt-vnf") - ): - net_item["use"] = "mgmt" - else: - # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"): - net_item["use"] = "bridge" - net_item["model"] = interface.get("type") - - if interface.get("ip-address"): - net_item["ip_address"] = interface["ip-address"] - - if interface.get("mac-address"): - net_item["mac_address"] = interface["mac-address"] - - net_list.append(net_item) - - if interface.get("mgmt-vnf"): - extra_dict["mgmt_vnf_interface"] = iface_index - elif interface.get("mgmt-interface"): - extra_dict["mgmt_vdu_interface"] = iface_index - - # cloud config - cloud_config = {} - - if target_vdu.get("cloud-init"): - if target_vdu["cloud-init"] not in vdu2cloud_init: - vdu2cloud_init[target_vdu["cloud-init"]] = self._get_cloud_init( - target_vdu["cloud-init"] - ) - - cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]] - cloud_config["user-data"] = self._parse_jinja2( - cloud_content_, - target_vdu.get("additionalParams"), - target_vdu["cloud-init"], - ) - - if target_vdu.get("boot-data-drive"): - cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive") - - ssh_keys = [] - - if target_vdu.get("ssh-keys"): - ssh_keys += target_vdu.get("ssh-keys") - - if target_vdu.get("ssh-access-required"): - ssh_keys.append(ro_nsr_public_key) - - if ssh_keys: - cloud_config["key-pairs"] = ssh_keys - - disk_list = None - if target_vdu.get("virtual-storages"): - disk_list = [ - {"size": disk["size-of-storage"]} - for disk in target_vdu["virtual-storages"] - if disk.get("type-of-storage") - == "persistent-storage:persistent-storage" - ] - - extra_dict["params"] = { - "name": "{}-{}-{}-{}".format( - indata["name"][:16], - vnfr["member-vnf-index-ref"][:16], - target_vdu["vdu-name"][:32], - target_vdu.get("count-index") or 0, - ), - "description": target_vdu["vdu-name"], - "start": True, - "image_id": "TASK-" + image_text, - "flavor_id": "TASK-" + flavor_text, - "net_list": net_list, - "cloud_config": cloud_config or None, - "disk_list": disk_list, - "availability_zone_index": None, # TODO - "availability_zone_list": None, # TODO - } - - return extra_dict - def _process_items( target_list, existing_list, @@ -1187,8 +1248,28 @@ class Ns(object): item_ = "sdn_net" target_record_id += ".sdn" + kwargs = {} + if process_params == Ns._process_vdu_params: + kwargs.update( + { + "vnfr_id": vnfr_id, + "nsr_id": nsr_id, + "vnfr": vnfr, + "vdu2cloud_init": vdu2cloud_init, + "tasks_by_target_record_id": tasks_by_target_record_id, + "logger": self.logger, + "db": self.db, + "fs": self.fs, + "ro_nsr_public_key": ro_nsr_public_key, + } + ) + extra_dict = process_params( - target_item, indata, target_viminfo, target_record_id + target_item, + indata, + target_viminfo, + target_record_id, + **kwargs, ) self._assign_vim(target_vim) @@ -1369,7 +1450,7 @@ class Ns(object): db_update=db_vnfrs_update[vnfr["_id"]], db_path="vdur", item="vdu", - process_params=_process_vdu_params, + process_params=Ns._process_vdu_params, ) for db_task in db_new_tasks: diff --git a/NG-RO/osm_ng_ro/tests/test_ns.py b/NG-RO/osm_ng_ro/tests/test_ns.py index 34590c12..bcd5d46d 100644 --- a/NG-RO/osm_ng_ro/tests/test_ns.py +++ b/NG-RO/osm_ng_ro/tests/test_ns.py @@ -16,9 +16,10 @@ ####################################################################################### import unittest -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch -from osm_ng_ro.ns import Ns +from jinja2 import TemplateError, TemplateNotFound, UndefinedError +from osm_ng_ro.ns import Ns, NsException __author__ = "Eduardo Sousa" @@ -2288,3 +2289,176 @@ class TestNs(unittest.TestCase): self.assertDictEqual(expected_result, result) self.assertTrue(ip_profile_to_ro.called) + + def test__get_cloud_init_exception(self): + db_mock = MagicMock(name="database mock") + fs_mock = None + + location = "" + + with self.assertRaises(NsException): + Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location) + + def test__get_cloud_init_file_fs_exception(self): + db_mock = MagicMock(name="database mock") + fs_mock = None + + location = "vnfr_id_123456:file:test_file" + db_mock.get_one.return_value = { + "_admin": { + "storage": { + "folder": "/home/osm", + "pkg-dir": "vnfr_test_dir", + }, + }, + } + + with self.assertRaises(NsException): + Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location) + + def test__get_cloud_init_file(self): + db_mock = MagicMock(name="database mock") + fs_mock = MagicMock(name="filesystem mock") + file_mock = MagicMock(name="file mock") + + location = "vnfr_id_123456:file:test_file" + cloud_init_content = "this is a cloud init file content" + + db_mock.get_one.return_value = { + "_admin": { + "storage": { + "folder": "/home/osm", + "pkg-dir": "vnfr_test_dir", + }, + }, + } + fs_mock.file_open.return_value = file_mock + file_mock.__enter__.return_value.read.return_value = cloud_init_content + + result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location) + + self.assertEqual(cloud_init_content, result) + + def test__get_cloud_init_vdu(self): + db_mock = MagicMock(name="database mock") + fs_mock = None + + location = "vnfr_id_123456:vdu:0" + cloud_init_content = "this is a cloud init file content" + + db_mock.get_one.return_value = { + "vdu": { + 0: { + "cloud-init": cloud_init_content, + }, + }, + } + + result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location) + + self.assertEqual(cloud_init_content, result) + + @patch("jinja2.Environment.__init__") + def test__parse_jinja2_undefined_error(self, env_mock: Mock): + cloud_init_content = None + params = None + context = None + + env_mock.side_effect = UndefinedError("UndefinedError occurred.") + + with self.assertRaises(NsException): + Ns._parse_jinja2( + cloud_init_content=cloud_init_content, params=params, context=context + ) + + @patch("jinja2.Environment.__init__") + def test__parse_jinja2_template_error(self, env_mock: Mock): + cloud_init_content = None + params = None + context = None + + env_mock.side_effect = TemplateError("TemplateError occurred.") + + with self.assertRaises(NsException): + Ns._parse_jinja2( + cloud_init_content=cloud_init_content, params=params, context=context + ) + + @patch("jinja2.Environment.__init__") + def test__parse_jinja2_template_not_found(self, env_mock: Mock): + cloud_init_content = None + params = None + context = None + + env_mock.side_effect = TemplateNotFound("TemplateNotFound occurred.") + + with self.assertRaises(NsException): + Ns._parse_jinja2( + cloud_init_content=cloud_init_content, params=params, context=context + ) + + def test__parse_jinja2(self): + pass + + def test__process_vdu_params_empty_kargs(self): + pass + + def test__process_vdu_params_interface_ns_vld_id(self): + pass + + def test__process_vdu_params_interface_vnf_vld_id(self): + pass + + def test__process_vdu_params_interface_unknown(self): + pass + + def test__process_vdu_params_interface_port_security_enabled(self): + pass + + def test__process_vdu_params_interface_port_security_disable_strategy(self): + pass + + def test__process_vdu_params_interface_sriov(self): + pass + + def test__process_vdu_params_interface_pci_passthrough(self): + pass + + def test__process_vdu_params_interface_om_mgmt(self): + pass + + def test__process_vdu_params_interface_mgmt_interface(self): + pass + + def test__process_vdu_params_interface_mgmt_vnf(self): + pass + + def test__process_vdu_params_interface_bridge(self): + pass + + def test__process_vdu_params_interface_ip_address(self): + pass + + def test__process_vdu_params_interface_mac_address(self): + pass + + def test__process_vdu_params_vdu_cloud_init_missing(self): + pass + + def test__process_vdu_params_vdu_cloud_init_present(self): + pass + + def test__process_vdu_params_vdu_boot_data_drive(self): + pass + + def test__process_vdu_params_vdu_ssh_keys(self): + pass + + def test__process_vdu_params_vdu_ssh_access_required(self): + pass + + def test__process_vdu_params_vdu_virtual_storages(self): + pass + + def test__process_vdu_params(self): + pass diff --git a/NG-RO/osm_ng_ro/tests/test_ns_thread.py b/NG-RO/osm_ng_ro/tests/test_ns_thread.py index 16b98e72..9aa7c18e 100644 --- a/NG-RO/osm_ng_ro/tests/test_ns_thread.py +++ b/NG-RO/osm_ng_ro/tests/test_ns_thread.py @@ -16,34 +16,25 @@ ####################################################################################### import logging -from types import ModuleType import unittest -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, patch from osm_ng_ro.ns_thread import VimInteractionNet +from osm_ro_plugin.vimconn import VimConnConnectionException, VimConnException class TestVimInteractionNet(unittest.TestCase): - @classmethod - def setUpClass(cls): + def setUp(self): module_name = "osm_ro_plugin" - osm_ro_plugin = ModuleType(module_name) - osm_ro_plugin.vimconn = Mock(name=module_name + ".vimconn") - osm_ro_plugin.vimconn.VimConnector = Mock( - name=module_name + "vimconn.VimConnector" - ) - osm_ro_plugin.vimconn.VimConnException = Mock( - name=module_name + ".vimconn.VimConnException" - ) - cls.target_vim = osm_ro_plugin.vimconn.VimConnector - cls.VimConnException = osm_ro_plugin.vimconn.VimConnException - cls.task_depends = None - - @classmethod - def tearDownClass(cls): - del cls.target_vim - del cls.task_depends - del cls.VimConnException + self.target_vim = MagicMock(name=f"{module_name}.vimconn.VimConnector") + self.task_depends = None + + patches = [patch(f"{module_name}.vimconn.VimConnector", self.target_vim)] + + # Enabling mocks and add cleanups + for mock in patches: + mock.start() + self.addCleanup(mock.stop) def test__mgmt_net_id_in_find_params_mgmt_several_vim_nets(self): """ @@ -79,7 +70,7 @@ class TestVimInteractionNet(unittest.TestCase): "target_record": "test_target_record", "target_record_id": "test_target_record_id", # values coming from extra_dict - "params": "", + "params": {}, "find_params": { "mgmt": True, "name": "some_mgmt_name", @@ -139,7 +130,7 @@ class TestVimInteractionNet(unittest.TestCase): "item": "test_item", "target_record": "test_target_record", "target_record_id": "test_target_record_id", - "params": "", + "params": {}, # values coming from extra_dict "find_params": { "mgmt": True, @@ -198,7 +189,7 @@ class TestVimInteractionNet(unittest.TestCase): "target_record": "test_target_record", "target_record_id": "test_target_record_id", # values coming from extra_dict - "params": "", + "params": {}, "find_params": { "mgmt": True, "name": "some_mgmt_name", @@ -259,7 +250,7 @@ class TestVimInteractionNet(unittest.TestCase): "target_record": "test_target_record", "target_record_id": "test_target_record_id", # values coming from extra_dict - "params": "", + "params": {}, "find_params": { "mgmt": True, "name": "some_mgmt_name", @@ -316,7 +307,7 @@ class TestVimInteractionNet(unittest.TestCase): "target_record": "test_target_record", "target_record_id": "test_target_record_id", # values coming from extra_dict - "params": "", + "params": {}, "find_params": { "filter_dict": { "name": "some-network-name", @@ -434,7 +425,9 @@ class TestVimInteractionNet(unittest.TestCase): "target_record": "test_target_record", "target_record_id": "test_target_record_id", # values coming from extra_dict - "params": "test_params", + "params": { + "net_name": "test_params", + }, "find_params": { "filter_dict": { "name": "some-network-name", @@ -489,7 +482,7 @@ class TestVimInteractionNet(unittest.TestCase): "target_record": "test_target_record", "target_record_id": "test_target_record_id", # values coming from extra_dict - "params": "", + "params": {}, "find_params": { "filter_dict": { "name": "some-network-name", @@ -548,7 +541,7 @@ class TestVimInteractionNet(unittest.TestCase): "target_record": "test_target_record", "target_record_id": "test_target_record_id", # values coming from extra_dict - "params": "", + "params": {}, "find_params": { "mgmt": True, "name": "some_mgmt_name", @@ -655,15 +648,19 @@ class TestVimInteractionNet(unittest.TestCase): "target_record": "test_target_record", "target_record_id": "test_target_record_id", # values coming from extra_dict - "params": "", + "params": {}, "depends_on": "test_depends_on", }, }, } task_index = "task_index_12" - with self.assertRaises(TypeError): + self.target_vim.new_network.side_effect = VimConnConnectionException( + "VimConnConnectionException occurred." + ) + with self.assertLogs() as captured: instance.new(ro_task, task_index, self.task_depends) + self.assertEqual(captured.records[0].levelname, "ERROR") def test__refresh_ro_task_vim_status_active(self): """ @@ -823,7 +820,7 @@ class TestVimInteractionNet(unittest.TestCase): self.assertEqual(result[0], task_status) self.assertDictEqual(result[1], ro_vim_item_update) - def test__refresh_ro_task_VimConnException_occured(self): + def test__refresh_ro_task_VimConnException_occurred(self): """ vimconn.VimConnException has occured """ @@ -858,12 +855,12 @@ class TestVimInteractionNet(unittest.TestCase): "to_check_at": 1637324200.994312, "tasks": {}, } - self.target_vim.refresh_nets_status.side_effect = Mock( - side_effect=self.VimConnException("VimConnException occured") + self.target_vim.refresh_nets_status.side_effect = VimConnException( + "VimConnException occurred." ) - with self.assertRaises(TypeError): + with self.assertLogs() as captured: instance.refresh(ro_task) - self.target_vim.refresh_nets_status.side_effect = None + self.assertEqual(captured.records[0].levelname, "ERROR") def test__refresh_ro_task_vim_status_deleted(self): """ diff --git a/releasenotes/notes/extracting_process_vdu_params-0a7602cda0555c80.yaml b/releasenotes/notes/extracting_process_vdu_params-0a7602cda0555c80.yaml new file mode 100644 index 00000000..82ee1bbf --- /dev/null +++ b/releasenotes/notes/extracting_process_vdu_params-0a7602cda0555c80.yaml @@ -0,0 +1,23 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### +--- +other: + - | + Extraction of _process_vdu_params() from being a nested function inside Ns.deploy(). + The _process_vdu_params() function is now a static method inside the Ns class. This + eases the testability of _process_vdu_params(). + With this extraction a unit test was introduced to cover the extracted function.