Extracting Ns._process_vdu_params() and creating unit test 83/11483/7
authorsousaedu <eduardo.sousa@canonical.com>
Tue, 7 Dec 2021 15:33:46 +0000 (15:33 +0000)
committersousaedu <eduardo.sousa@canonical.com>
Tue, 11 Jan 2022 13:14:40 +0000 (13:14 +0000)
Change-Id: Ie5bf2a57d85114d3eacc9a416004423a18d1f27c
Signed-off-by: sousaedu <eduardo.sousa@canonical.com>
NG-RO/osm_ng_ro/ns.py
NG-RO/osm_ng_ro/tests/test_ns.py
NG-RO/osm_ng_ro/tests/test_ns_thread.py
releasenotes/notes/extracting_process_vdu_params-0a7602cda0555c80.yaml [new file with mode: 0644]

index 706454d..d187dd8 100644 (file)
@@ -22,7 +22,7 @@ from random import choice as random_choice
 from threading import Lock
 from time import time
 from traceback import format_exc as traceback_format_exc
-from typing import Any, Dict, Tuple
+from typing import Any, Dict, Tuple, Type
 from uuid import uuid4
 
 from cryptography.hazmat.backends import default_backend as crypto_default_backend
@@ -44,8 +44,8 @@ from osm_common import (
     msglocal,
     version as common_version,
 )
-from osm_common.dbbase import DbException
-from osm_common.fsbase import FsException
+from osm_common.dbbase import DbBase, DbException
+from osm_common.fsbase import FsBase, FsException
 from osm_common.msgbase import MsgException
 from osm_ng_ro.ns_thread import deep_get, NsWorker, NsWorkerException
 from osm_ng_ro.validation import deploy_schema, validate_input
@@ -295,15 +295,31 @@ class Ns(object):
             for target_id in vims_to_unload:
                 self._unload_vim(target_id)
 
-    def _get_cloud_init(self, where):
-        """
-        Not used as cloud init content is provided in the http body. This method reads cloud init from a file
-        :param where: can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
-        :return:
+    @staticmethod
+    def _get_cloud_init(
+        db: Type[DbBase],
+        fs: Type[FsBase],
+        location: str,
+    ) -> str:
+        """This method reads cloud init from a file.
+
+        Note: Not used as cloud init content is provided in the http body.
+
+        Args:
+            db (Type[DbBase]): [description]
+            fs (Type[FsBase]): [description]
+            location (str): can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
+
+        Raises:
+            NsException: [description]
+            NsException: [description]
+
+        Returns:
+            str: [description]
         """
-        vnfd_id, _, other = where.partition(":")
+        vnfd_id, _, other = location.partition(":")
         _type, _, name = other.partition(":")
-        vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+        vnfd = db.get_one("vnfds", {"_id": vnfd_id})
 
         if _type == "file":
             base_folder = vnfd["_admin"]["storage"]
@@ -311,23 +327,42 @@ class Ns(object):
                 base_folder["folder"], base_folder["pkg-dir"], name
             )
 
-            if not self.fs:
+            if not fs:
                 raise NsException(
                     "Cannot read file '{}'. Filesystem not loaded, change configuration at storage.driver".format(
                         cloud_init_file
                     )
                 )
 
-            with self.fs.file_open(cloud_init_file, "r") as ci_file:
+            with fs.file_open(cloud_init_file, "r") as ci_file:
                 cloud_init_content = ci_file.read()
         elif _type == "vdu":
             cloud_init_content = vnfd["vdu"][int(name)]["cloud-init"]
         else:
-            raise NsException("Mismatch descriptor for cloud init: {}".format(where))
+            raise NsException("Mismatch descriptor for cloud init: {}".format(location))
 
         return cloud_init_content
 
-    def _parse_jinja2(self, cloud_init_content, params, context):
+    @staticmethod
+    def _parse_jinja2(
+        cloud_init_content: str,
+        params: Dict[str, Any],
+        context: str,
+    ) -> str:
+        """Function that processes the cloud init to replace Jinja2 encoded parameters.
+
+        Args:
+            cloud_init_content (str): [description]
+            params (Dict[str, Any]): [description]
+            context (str): [description]
+
+        Raises:
+            NsException: [description]
+            NsException: [description]
+
+        Returns:
+            str: [description]
+        """
         try:
             env = Environment(undefined=StrictUndefined)
             template = env.from_string(cloud_init_content)
@@ -474,6 +509,7 @@ class Ns(object):
         indata: Dict[str, Any],
         vim_info: Dict[str, Any],
         target_record_id: str,
+        **kwargs: Dict[str, Any],
     ) -> Dict[str, Any]:
         """Function to process VDU image parameters.
 
@@ -711,6 +747,7 @@ class Ns(object):
         indata: Dict[str, Any],
         vim_info: Dict[str, Any],
         target_record_id: str,
+        **kwargs: Dict[str, Any],
     ) -> Dict[str, Any]:
         """[summary]
 
@@ -799,6 +836,7 @@ class Ns(object):
         indata: Dict[str, Any],
         vim_info: Dict[str, Any],
         target_record_id: str,
+        **kwargs: Dict[str, Any],
     ) -> Dict[str, Any]:
         """Function to process network parameters.
 
@@ -868,6 +906,181 @@ class Ns(object):
 
         return extra_dict
 
+    @staticmethod
+    def _process_vdu_params(
+        target_vdu: Dict[str, Any],
+        indata: Dict[str, Any],
+        vim_info: Dict[str, Any],
+        target_record_id: str,
+        **kwargs: Dict[str, Any],
+    ) -> Dict[str, Any]:
+        """Function to process VDU parameters.
+
+        Args:
+            target_vdu (Dict[str, Any]): [description]
+            indata (Dict[str, Any]): [description]
+            vim_info (Dict[str, Any]): [description]
+            target_record_id (str): [description]
+
+        Returns:
+            Dict[str, Any]: [description]
+        """
+        vnfr_id = kwargs.get("vnfr_id")
+        nsr_id = kwargs.get("nsr_id")
+        vnfr = kwargs.get("vnfr")
+        vdu2cloud_init = kwargs.get("vdu2cloud_init")
+        tasks_by_target_record_id = kwargs.get("tasks_by_target_record_id")
+        logger = kwargs.get("logger")
+        db = kwargs.get("db")
+        fs = kwargs.get("fs")
+        ro_nsr_public_key = kwargs.get("ro_nsr_public_key")
+
+        vnf_preffix = "vnfrs:{}".format(vnfr_id)
+        ns_preffix = "nsrs:{}".format(nsr_id)
+        image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
+        flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"]
+        extra_dict = {"depends_on": [image_text, flavor_text]}
+        net_list = []
+
+        for iface_index, interface in enumerate(target_vdu["interfaces"]):
+            if interface.get("ns-vld-id"):
+                net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
+            elif interface.get("vnf-vld-id"):
+                net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
+            else:
+                logger.error(
+                    "Interface {} from vdu {} not connected to any vld".format(
+                        iface_index, target_vdu["vdu-name"]
+                    )
+                )
+
+                continue  # interface not connected to any vld
+
+            extra_dict["depends_on"].append(net_text)
+
+            if "port-security-enabled" in interface:
+                interface["port_security"] = interface.pop("port-security-enabled")
+
+            if "port-security-disable-strategy" in interface:
+                interface["port_security_disable_strategy"] = interface.pop(
+                    "port-security-disable-strategy"
+                )
+
+            net_item = {
+                x: v
+                for x, v in interface.items()
+                if x
+                in (
+                    "name",
+                    "vpci",
+                    "port_security",
+                    "port_security_disable_strategy",
+                    "floating_ip",
+                )
+            }
+            net_item["net_id"] = "TASK-" + net_text
+            net_item["type"] = "virtual"
+
+            # TODO mac_address: used for  SR-IOV ifaces #TODO for other types
+            # TODO floating_ip: True/False (or it can be None)
+            if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
+                # mark the net create task as type data
+                if deep_get(
+                    tasks_by_target_record_id,
+                    net_text,
+                    "params",
+                    "net_type",
+                ):
+                    tasks_by_target_record_id[net_text]["params"]["net_type"] = "data"
+
+                net_item["use"] = "data"
+                net_item["model"] = interface["type"]
+                net_item["type"] = interface["type"]
+            elif (
+                interface.get("type") == "OM-MGMT"
+                or interface.get("mgmt-interface")
+                or interface.get("mgmt-vnf")
+            ):
+                net_item["use"] = "mgmt"
+            else:
+                # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
+                net_item["use"] = "bridge"
+                net_item["model"] = interface.get("type")
+
+            if interface.get("ip-address"):
+                net_item["ip_address"] = interface["ip-address"]
+
+            if interface.get("mac-address"):
+                net_item["mac_address"] = interface["mac-address"]
+
+            net_list.append(net_item)
+
+            if interface.get("mgmt-vnf"):
+                extra_dict["mgmt_vnf_interface"] = iface_index
+            elif interface.get("mgmt-interface"):
+                extra_dict["mgmt_vdu_interface"] = iface_index
+
+        # cloud config
+        cloud_config = {}
+
+        if target_vdu.get("cloud-init"):
+            if target_vdu["cloud-init"] not in vdu2cloud_init:
+                vdu2cloud_init[target_vdu["cloud-init"]] = Ns._get_cloud_init(
+                    db=db,
+                    fs=fs,
+                    location=target_vdu["cloud-init"],
+                )
+
+            cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]]
+            cloud_config["user-data"] = Ns._parse_jinja2(
+                cloud_init_content=cloud_content_,
+                params=target_vdu.get("additionalParams"),
+                context=target_vdu["cloud-init"],
+            )
+
+        if target_vdu.get("boot-data-drive"):
+            cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive")
+
+        ssh_keys = []
+
+        if target_vdu.get("ssh-keys"):
+            ssh_keys += target_vdu.get("ssh-keys")
+
+        if target_vdu.get("ssh-access-required"):
+            ssh_keys.append(ro_nsr_public_key)
+
+        if ssh_keys:
+            cloud_config["key-pairs"] = ssh_keys
+
+        disk_list = None
+        if target_vdu.get("virtual-storages"):
+            disk_list = [
+                {"size": disk["size-of-storage"]}
+                for disk in target_vdu["virtual-storages"]
+                if disk.get("type-of-storage")
+                == "persistent-storage:persistent-storage"
+            ]
+
+        extra_dict["params"] = {
+            "name": "{}-{}-{}-{}".format(
+                indata["name"][:16],
+                vnfr["member-vnf-index-ref"][:16],
+                target_vdu["vdu-name"][:32],
+                target_vdu.get("count-index") or 0,
+            ),
+            "description": target_vdu["vdu-name"],
+            "start": True,
+            "image_id": "TASK-" + image_text,
+            "flavor_id": "TASK-" + flavor_text,
+            "net_list": net_list,
+            "cloud_config": cloud_config or None,
+            "disk_list": disk_list,
+            "availability_zone_index": None,  # TODO
+            "availability_zone_list": None,  # TODO
+        }
+
+        return extra_dict
+
     def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
         self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
         validate_input(indata, deploy_schema)
@@ -924,158 +1137,6 @@ class Ns(object):
 
                     index += 1
 
-            def _process_vdu_params(target_vdu, indata, vim_info, target_record_id):
-                nonlocal vnfr_id
-                nonlocal nsr_id
-                nonlocal vnfr
-                nonlocal vdu2cloud_init
-                nonlocal tasks_by_target_record_id
-
-                vnf_preffix = "vnfrs:{}".format(vnfr_id)
-                ns_preffix = "nsrs:{}".format(nsr_id)
-                image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
-                flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"]
-                extra_dict = {"depends_on": [image_text, flavor_text]}
-                net_list = []
-
-                for iface_index, interface in enumerate(target_vdu["interfaces"]):
-                    if interface.get("ns-vld-id"):
-                        net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
-                    elif interface.get("vnf-vld-id"):
-                        net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
-                    else:
-                        self.logger.error(
-                            "Interface {} from vdu {} not connected to any vld".format(
-                                iface_index, target_vdu["vdu-name"]
-                            )
-                        )
-
-                        continue  # interface not connected to any vld
-
-                    extra_dict["depends_on"].append(net_text)
-
-                    if "port-security-enabled" in interface:
-                        interface["port_security"] = interface.pop(
-                            "port-security-enabled"
-                        )
-
-                    if "port-security-disable-strategy" in interface:
-                        interface["port_security_disable_strategy"] = interface.pop(
-                            "port-security-disable-strategy"
-                        )
-
-                    net_item = {
-                        x: v
-                        for x, v in interface.items()
-                        if x
-                        in (
-                            "name",
-                            "vpci",
-                            "port_security",
-                            "port_security_disable_strategy",
-                            "floating_ip",
-                        )
-                    }
-                    net_item["net_id"] = "TASK-" + net_text
-                    net_item["type"] = "virtual"
-
-                    # TODO mac_address: used for  SR-IOV ifaces #TODO for other types
-                    # TODO floating_ip: True/False (or it can be None)
-                    if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
-                        # mark the net create task as type data
-                        if deep_get(
-                            tasks_by_target_record_id, net_text, "params", "net_type"
-                        ):
-                            tasks_by_target_record_id[net_text]["params"][
-                                "net_type"
-                            ] = "data"
-
-                        net_item["use"] = "data"
-                        net_item["model"] = interface["type"]
-                        net_item["type"] = interface["type"]
-                    elif (
-                        interface.get("type") == "OM-MGMT"
-                        or interface.get("mgmt-interface")
-                        or interface.get("mgmt-vnf")
-                    ):
-                        net_item["use"] = "mgmt"
-                    else:
-                        # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
-                        net_item["use"] = "bridge"
-                        net_item["model"] = interface.get("type")
-
-                    if interface.get("ip-address"):
-                        net_item["ip_address"] = interface["ip-address"]
-
-                    if interface.get("mac-address"):
-                        net_item["mac_address"] = interface["mac-address"]
-
-                    net_list.append(net_item)
-
-                    if interface.get("mgmt-vnf"):
-                        extra_dict["mgmt_vnf_interface"] = iface_index
-                    elif interface.get("mgmt-interface"):
-                        extra_dict["mgmt_vdu_interface"] = iface_index
-
-                # cloud config
-                cloud_config = {}
-
-                if target_vdu.get("cloud-init"):
-                    if target_vdu["cloud-init"] not in vdu2cloud_init:
-                        vdu2cloud_init[target_vdu["cloud-init"]] = self._get_cloud_init(
-                            target_vdu["cloud-init"]
-                        )
-
-                    cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]]
-                    cloud_config["user-data"] = self._parse_jinja2(
-                        cloud_content_,
-                        target_vdu.get("additionalParams"),
-                        target_vdu["cloud-init"],
-                    )
-
-                if target_vdu.get("boot-data-drive"):
-                    cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive")
-
-                ssh_keys = []
-
-                if target_vdu.get("ssh-keys"):
-                    ssh_keys += target_vdu.get("ssh-keys")
-
-                if target_vdu.get("ssh-access-required"):
-                    ssh_keys.append(ro_nsr_public_key)
-
-                if ssh_keys:
-                    cloud_config["key-pairs"] = ssh_keys
-
-                disk_list = None
-                if target_vdu.get("virtual-storages"):
-                    disk_list = [
-                        {"size": disk["size-of-storage"]}
-                        for disk in target_vdu["virtual-storages"]
-                        if disk.get("type-of-storage")
-                        == "persistent-storage:persistent-storage"
-                    ]
-
-                extra_dict["params"] = {
-                    "name": "{}-{}-{}-{}".format(
-                        indata["name"][:16],
-                        vnfr["member-vnf-index-ref"][:16],
-                        target_vdu["vdu-name"][:32],
-                        target_vdu.get("count-index") or 0,
-                    ),
-                    "description": target_vdu["vdu-name"],
-                    "start": True,
-                    "image_id": "TASK-" + image_text,
-                    "flavor_id": "TASK-" + flavor_text,
-                    "net_list": net_list,
-                    "cloud_config": cloud_config or None,
-                    "disk_list": disk_list,
-                    "availability_zone_index": None,  # TODO
-                    "availability_zone_list": None,  # TODO
-                }
-
-                return extra_dict
-
             def _process_items(
                 target_list,
                 existing_list,
@@ -1187,8 +1248,28 @@ class Ns(object):
                             item_ = "sdn_net"
                             target_record_id += ".sdn"
 
+                        kwargs = {}
+                        if process_params == Ns._process_vdu_params:
+                            kwargs.update(
+                                {
+                                    "vnfr_id": vnfr_id,
+                                    "nsr_id": nsr_id,
+                                    "vnfr": vnfr,
+                                    "vdu2cloud_init": vdu2cloud_init,
+                                    "tasks_by_target_record_id": tasks_by_target_record_id,
+                                    "logger": self.logger,
+                                    "db": self.db,
+                                    "fs": self.fs,
+                                    "ro_nsr_public_key": ro_nsr_public_key,
+                                }
+                            )
+
                         extra_dict = process_params(
-                            target_item, indata, target_viminfo, target_record_id
+                            target_item,
+                            indata,
+                            target_viminfo,
+                            target_record_id,
+                            **kwargs,
                         )
                         self._assign_vim(target_vim)
 
@@ -1369,7 +1450,7 @@ class Ns(object):
                             db_update=db_vnfrs_update[vnfr["_id"]],
                             db_path="vdur",
                             item="vdu",
-                            process_params=_process_vdu_params,
+                            process_params=Ns._process_vdu_params,
                         )
 
                 for db_task in db_new_tasks:
index 34590c1..bcd5d46 100644 (file)
 #######################################################################################
 
 import unittest
-from unittest.mock import Mock, patch
+from unittest.mock import MagicMock, Mock, patch
 
-from osm_ng_ro.ns import Ns
+from jinja2 import TemplateError, TemplateNotFound, UndefinedError
+from osm_ng_ro.ns import Ns, NsException
 
 
 __author__ = "Eduardo Sousa"
@@ -2288,3 +2289,176 @@ class TestNs(unittest.TestCase):
 
         self.assertDictEqual(expected_result, result)
         self.assertTrue(ip_profile_to_ro.called)
+
+    def test__get_cloud_init_exception(self):
+        db_mock = MagicMock(name="database mock")
+        fs_mock = None
+
+        location = ""
+
+        with self.assertRaises(NsException):
+            Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+    def test__get_cloud_init_file_fs_exception(self):
+        db_mock = MagicMock(name="database mock")
+        fs_mock = None
+
+        location = "vnfr_id_123456:file:test_file"
+        db_mock.get_one.return_value = {
+            "_admin": {
+                "storage": {
+                    "folder": "/home/osm",
+                    "pkg-dir": "vnfr_test_dir",
+                },
+            },
+        }
+
+        with self.assertRaises(NsException):
+            Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+    def test__get_cloud_init_file(self):
+        db_mock = MagicMock(name="database mock")
+        fs_mock = MagicMock(name="filesystem mock")
+        file_mock = MagicMock(name="file mock")
+
+        location = "vnfr_id_123456:file:test_file"
+        cloud_init_content = "this is a cloud init file content"
+
+        db_mock.get_one.return_value = {
+            "_admin": {
+                "storage": {
+                    "folder": "/home/osm",
+                    "pkg-dir": "vnfr_test_dir",
+                },
+            },
+        }
+        fs_mock.file_open.return_value = file_mock
+        file_mock.__enter__.return_value.read.return_value = cloud_init_content
+
+        result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+        self.assertEqual(cloud_init_content, result)
+
+    def test__get_cloud_init_vdu(self):
+        db_mock = MagicMock(name="database mock")
+        fs_mock = None
+
+        location = "vnfr_id_123456:vdu:0"
+        cloud_init_content = "this is a cloud init file content"
+
+        db_mock.get_one.return_value = {
+            "vdu": {
+                0: {
+                    "cloud-init": cloud_init_content,
+                },
+            },
+        }
+
+        result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+        self.assertEqual(cloud_init_content, result)
+
+    @patch("jinja2.Environment.__init__")
+    def test__parse_jinja2_undefined_error(self, env_mock: Mock):
+        cloud_init_content = None
+        params = None
+        context = None
+
+        env_mock.side_effect = UndefinedError("UndefinedError occurred.")
+
+        with self.assertRaises(NsException):
+            Ns._parse_jinja2(
+                cloud_init_content=cloud_init_content, params=params, context=context
+            )
+
+    @patch("jinja2.Environment.__init__")
+    def test__parse_jinja2_template_error(self, env_mock: Mock):
+        cloud_init_content = None
+        params = None
+        context = None
+
+        env_mock.side_effect = TemplateError("TemplateError occurred.")
+
+        with self.assertRaises(NsException):
+            Ns._parse_jinja2(
+                cloud_init_content=cloud_init_content, params=params, context=context
+            )
+
+    @patch("jinja2.Environment.__init__")
+    def test__parse_jinja2_template_not_found(self, env_mock: Mock):
+        cloud_init_content = None
+        params = None
+        context = None
+
+        env_mock.side_effect = TemplateNotFound("TemplateNotFound occurred.")
+
+        with self.assertRaises(NsException):
+            Ns._parse_jinja2(
+                cloud_init_content=cloud_init_content, params=params, context=context
+            )
+
+    def test__parse_jinja2(self):
+        pass
+
+    def test__process_vdu_params_empty_kargs(self):
+        pass
+
+    def test__process_vdu_params_interface_ns_vld_id(self):
+        pass
+
+    def test__process_vdu_params_interface_vnf_vld_id(self):
+        pass
+
+    def test__process_vdu_params_interface_unknown(self):
+        pass
+
+    def test__process_vdu_params_interface_port_security_enabled(self):
+        pass
+
+    def test__process_vdu_params_interface_port_security_disable_strategy(self):
+        pass
+
+    def test__process_vdu_params_interface_sriov(self):
+        pass
+
+    def test__process_vdu_params_interface_pci_passthrough(self):
+        pass
+
+    def test__process_vdu_params_interface_om_mgmt(self):
+        pass
+
+    def test__process_vdu_params_interface_mgmt_interface(self):
+        pass
+
+    def test__process_vdu_params_interface_mgmt_vnf(self):
+        pass
+
+    def test__process_vdu_params_interface_bridge(self):
+        pass
+
+    def test__process_vdu_params_interface_ip_address(self):
+        pass
+
+    def test__process_vdu_params_interface_mac_address(self):
+        pass
+
+    def test__process_vdu_params_vdu_cloud_init_missing(self):
+        pass
+
+    def test__process_vdu_params_vdu_cloud_init_present(self):
+        pass
+
+    def test__process_vdu_params_vdu_boot_data_drive(self):
+        pass
+
+    def test__process_vdu_params_vdu_ssh_keys(self):
+        pass
+
+    def test__process_vdu_params_vdu_ssh_access_required(self):
+        pass
+
+    def test__process_vdu_params_vdu_virtual_storages(self):
+        pass
+
+    def test__process_vdu_params(self):
+        pass
index 16b98e7..9aa7c18 100644 (file)
 #######################################################################################
 
 import logging
-from types import ModuleType
 import unittest
-from unittest.mock import Mock, patch
+from unittest.mock import MagicMock, patch
 
 from osm_ng_ro.ns_thread import VimInteractionNet
+from osm_ro_plugin.vimconn import VimConnConnectionException, VimConnException
 
 
 class TestVimInteractionNet(unittest.TestCase):
-    @classmethod
-    def setUpClass(cls):
+    def setUp(self):
         module_name = "osm_ro_plugin"
-        osm_ro_plugin = ModuleType(module_name)
-        osm_ro_plugin.vimconn = Mock(name=module_name + ".vimconn")
-        osm_ro_plugin.vimconn.VimConnector = Mock(
-            name=module_name + "vimconn.VimConnector"
-        )
-        osm_ro_plugin.vimconn.VimConnException = Mock(
-            name=module_name + ".vimconn.VimConnException"
-        )
-        cls.target_vim = osm_ro_plugin.vimconn.VimConnector
-        cls.VimConnException = osm_ro_plugin.vimconn.VimConnException
-        cls.task_depends = None
-
-    @classmethod
-    def tearDownClass(cls):
-        del cls.target_vim
-        del cls.task_depends
-        del cls.VimConnException
+        self.target_vim = MagicMock(name=f"{module_name}.vimconn.VimConnector")
+        self.task_depends = None
+
+        patches = [patch(f"{module_name}.vimconn.VimConnector", self.target_vim)]
+
+        # Enabling mocks and add cleanups
+        for mock in patches:
+            mock.start()
+            self.addCleanup(mock.stop)
 
     def test__mgmt_net_id_in_find_params_mgmt_several_vim_nets(self):
         """
@@ -79,7 +70,7 @@ class TestVimInteractionNet(unittest.TestCase):
                         "target_record": "test_target_record",
                         "target_record_id": "test_target_record_id",
                         # values coming from extra_dict
-                        "params": "",
+                        "params": {},
                         "find_params": {
                             "mgmt": True,
                             "name": "some_mgmt_name",
@@ -139,7 +130,7 @@ class TestVimInteractionNet(unittest.TestCase):
                         "item": "test_item",
                         "target_record": "test_target_record",
                         "target_record_id": "test_target_record_id",
-                        "params": "",
+                        "params": {},
                         # values coming from extra_dict
                         "find_params": {
                             "mgmt": True,
@@ -198,7 +189,7 @@ class TestVimInteractionNet(unittest.TestCase):
                         "target_record": "test_target_record",
                         "target_record_id": "test_target_record_id",
                         # values coming from extra_dict
-                        "params": "",
+                        "params": {},
                         "find_params": {
                             "mgmt": True,
                             "name": "some_mgmt_name",
@@ -259,7 +250,7 @@ class TestVimInteractionNet(unittest.TestCase):
                         "target_record": "test_target_record",
                         "target_record_id": "test_target_record_id",
                         # values coming from extra_dict
-                        "params": "",
+                        "params": {},
                         "find_params": {
                             "mgmt": True,
                             "name": "some_mgmt_name",
@@ -316,7 +307,7 @@ class TestVimInteractionNet(unittest.TestCase):
                         "target_record": "test_target_record",
                         "target_record_id": "test_target_record_id",
                         # values coming from extra_dict
-                        "params": "",
+                        "params": {},
                         "find_params": {
                             "filter_dict": {
                                 "name": "some-network-name",
@@ -434,7 +425,9 @@ class TestVimInteractionNet(unittest.TestCase):
                         "target_record": "test_target_record",
                         "target_record_id": "test_target_record_id",
                         # values coming from extra_dict
-                        "params": "test_params",
+                        "params": {
+                            "net_name": "test_params",
+                        },
                         "find_params": {
                             "filter_dict": {
                                 "name": "some-network-name",
@@ -489,7 +482,7 @@ class TestVimInteractionNet(unittest.TestCase):
                         "target_record": "test_target_record",
                         "target_record_id": "test_target_record_id",
                         # values coming from extra_dict
-                        "params": "",
+                        "params": {},
                         "find_params": {
                             "filter_dict": {
                                 "name": "some-network-name",
@@ -548,7 +541,7 @@ class TestVimInteractionNet(unittest.TestCase):
                         "target_record": "test_target_record",
                         "target_record_id": "test_target_record_id",
                         # values coming from extra_dict
-                        "params": "",
+                        "params": {},
                         "find_params": {
                             "mgmt": True,
                             "name": "some_mgmt_name",
@@ -655,15 +648,19 @@ class TestVimInteractionNet(unittest.TestCase):
                         "target_record": "test_target_record",
                         "target_record_id": "test_target_record_id",
                         # values coming from extra_dict
-                        "params": "",
+                        "params": {},
                         "depends_on": "test_depends_on",
                     },
                 },
             }
 
             task_index = "task_index_12"
-            with self.assertRaises(TypeError):
+            self.target_vim.new_network.side_effect = VimConnConnectionException(
+                "VimConnConnectionException occurred."
+            )
+            with self.assertLogs() as captured:
                 instance.new(ro_task, task_index, self.task_depends)
+                self.assertEqual(captured.records[0].levelname, "ERROR")
 
     def test__refresh_ro_task_vim_status_active(self):
         """
@@ -823,7 +820,7 @@ class TestVimInteractionNet(unittest.TestCase):
             self.assertEqual(result[0], task_status)
             self.assertDictEqual(result[1], ro_vim_item_update)
 
-    def test__refresh_ro_task_VimConnException_occured(self):
+    def test__refresh_ro_task_VimConnException_occurred(self):
         """
         vimconn.VimConnException has occured
         """
@@ -858,12 +855,12 @@ class TestVimInteractionNet(unittest.TestCase):
                 "to_check_at": 1637324200.994312,
                 "tasks": {},
             }
-            self.target_vim.refresh_nets_status.side_effect = Mock(
-                side_effect=self.VimConnException("VimConnException occured")
+            self.target_vim.refresh_nets_status.side_effect = VimConnException(
+                "VimConnException occurred."
             )
-            with self.assertRaises(TypeError):
+            with self.assertLogs() as captured:
                 instance.refresh(ro_task)
-            self.target_vim.refresh_nets_status.side_effect = None
+                self.assertEqual(captured.records[0].levelname, "ERROR")
 
     def test__refresh_ro_task_vim_status_deleted(self):
         """
diff --git a/releasenotes/notes/extracting_process_vdu_params-0a7602cda0555c80.yaml b/releasenotes/notes/extracting_process_vdu_params-0a7602cda0555c80.yaml
new file mode 100644 (file)
index 0000000..82ee1bb
--- /dev/null
@@ -0,0 +1,23 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+---
+other:
+  - |
+    Extraction of _process_vdu_params() from being a nested function inside Ns.deploy().
+    The _process_vdu_params() function is now a static method inside the Ns class. This
+    eases the testability of _process_vdu_params().
+    With this extraction a unit test was introduced to cover the extracted function.