From: tierno Date: Thu, 12 Sep 2019 09:33:40 +0000 (+0000) Subject: fix issue with ns.NsLcm.ns_params_2_RO. Adding unittest X-Git-Tag: v7.0.0rc1~27 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F44%2F7944%2F1;p=osm%2FLCM.git fix issue with ns.NsLcm.ns_params_2_RO. Adding unittest Change-Id: I5c115c061ee3e67eb80a205135b1b1c364421b65 Signed-off-by: tierno --- diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 8742488..47cff47 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -53,7 +53,7 @@ def get_iterable(in_dict, in_key): def populate_dict(target_dict, key_list, value): """ - Upate target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value. + Update target_dict creating nested dictionaries with the key_list. Last key_list item is assigned the value. Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}} :param target_dict: dictionary to be changed :param key_list: list of keys to insert at target_dict @@ -67,6 +67,21 @@ def populate_dict(target_dict, key_list, value): target_dict[key_list[-1]] = value +def deep_get(target_dict, key_list): + """ + Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None + Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None + :param target_dict: dictionary to be read + :param key_list: list of keys to read from target_dict + :return: The wanted value if exist, None otherwise + """ + for key in key_list: + if not isinstance(target_dict, dict) or key not in target_dict: + return None + target_dict = target_dict[key] + return target_dict + + class NsLcm(LcmBase): timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed total_deploy_timeout = 2 * 3600 # global timeout for deployment @@ -309,7 +324,7 @@ class NsLcm(LcmBase): vdu_needed_access = [] mgmt_cp = None if vnfd.get("vnf-configuration"): - ssh_required = vnfd["vnf-configuration"].get("config-access", {}).get("ssh-access").get("required") + ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) if ssh_required and vnfd.get("mgmt-interface"): if vnfd["mgmt-interface"].get("vdu-id"): vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"]) @@ -318,7 +333,7 @@ class NsLcm(LcmBase): for vdu in vnfd.get("vdu", ()): if vdu.get("vdu-configuration"): - ssh_required = vdu["vdu-configuration"].get("config-access", {}).get("ssh-access").get("required") + ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required")) if ssh_required: vdu_needed_access.append(vdu["id"]) elif mgmt_cp: @@ -700,8 +715,7 @@ class NsLcm(LcmBase): db_nsr_update["detailed-status"] = "creating" db_nsr_update["operational-status"] = "init" - if not db_nsr["_admin"].get("deployed") or not db_nsr["_admin"]["deployed"].get("RO") or \ - not db_nsr["_admin"]["deployed"]["RO"].get("vnfd"): + if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list): populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), []) db_nsr_update["_admin.deployed.RO.vnfd"] = [] @@ -1110,7 +1124,7 @@ class NsLcm(LcmBase): # Crate ns at RO # if present use it unless in error status - RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id") + RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id")) if RO_nsr_id: try: step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO" diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index 4d191ce..aa3933f 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -1131,6 +1131,32 @@ class TestMyNS(asynctest.TestCase): # TODO add more checks of called methods # TODO add a terminate + def test_ns_params_2_RO(self): + vim = self._db_get_list("vim_accounts")[0] + vim_id = vim["_id"] + ro_vim_id = vim["_admin"]["deployed"]["RO"] + ns_params = {"vimAccountId": vim_id} + mgmt_interface = {"cp": "cp"} + vdu = [{"id": "vdu_id", "interface": [{"external-connection-point-ref": "cp"}]}] + vnfd_dict = { + "1": {"vdu": vdu, "mgmt-interface": mgmt_interface}, + "2": {"vdu": vdu, "mgmt-interface": mgmt_interface, "vnf-configuration": None}, + "3": {"vdu": vdu, "mgmt-interface": mgmt_interface, "vnf-configuration": {"config-access": None}}, + "4": {"vdu": vdu, "mgmt-interface": mgmt_interface, + "vnf-configuration": {"config-access": {"ssh-access": None}}}, + "5": {"vdu": vdu, "mgmt-interface": mgmt_interface, + "vnf-configuration": {"config-access": {"ssh-access": {"required": True, "default_user": "U"}}}}, + } + nsd = {"constituent-vnfd": []} + for k in vnfd_dict.keys(): + nsd["constituent-vnfd"].append({"vnfd-id-ref": k, "member-vnf-index": k}) + + n2vc_key_list = ["key"] + ro_ns_params = self.my_ns.ns_params_2_RO(ns_params, nsd, vnfd_dict, n2vc_key_list) + ro_params_expected = {'wim_account': None, "datacenter": ro_vim_id, + "vnfs": {"5": {"vdus": {"vdu_id": {"mgmt_keys": n2vc_key_list}}}}} + self.assertEqual(ro_ns_params, ro_params_expected) + @asynctest.fail_on(active_handles=True) # all async tasks must be completed async def test_scale(self): pass