fix issue with ns.NsLcm.ns_params_2_RO. Adding unittest
[osm/LCM.git] / osm_lcm / ns.py
index e1c01b3..47cff47 100644 (file)
@@ -53,7 +53,7 @@ def get_iterable(in_dict, in_key):
 
 def populate_dict(target_dict, key_list, value):
     """
-    Upate target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
+    Update target_dict creating nested dictionaries with the key_list. Last key_list item is assigned the value.
     Example target_dict={K: J}; key_list=[a,b,c];  target_dict will be {K: J, a: {b: {c: value}}}
     :param target_dict: dictionary to be changed
     :param key_list: list of keys to insert at target_dict
@@ -67,6 +67,21 @@ def populate_dict(target_dict, key_list, value):
     target_dict[key_list[-1]] = value
 
 
+def deep_get(target_dict, key_list):
+    """
+    Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
+    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
+    :param target_dict: dictionary to be read
+    :param key_list: list of keys to read from  target_dict
+    :return: The wanted value if exist, None otherwise
+    """
+    for key in key_list:
+        if not isinstance(target_dict, dict) or key not in target_dict:
+            return None
+        target_dict = target_dict[key]
+    return target_dict
+
+
 class NsLcm(LcmBase):
     timeout_vca_on_error = 5 * 60   # Time for charm from first time at blocked,error status to mark as failed
     total_deploy_timeout = 2 * 3600   # global timeout for deployment
@@ -304,36 +319,39 @@ class NsLcm(LcmBase):
             "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
             # "scenario": ns_params["nsdId"],
         }
-        if n2vc_key_list:
-            for vnfd_ref, vnfd in vnfd_dict.items():
-                vdu_needed_access = []
-                mgmt_cp = None
-                if vnfd.get("vnf-configuration"):
-                    if vnfd.get("mgmt-interface"):
-                        if vnfd["mgmt-interface"].get("vdu-id"):
-                            vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
-                        elif vnfd["mgmt-interface"].get("cp"):
-                            mgmt_cp = vnfd["mgmt-interface"]["cp"]
-
-                for vdu in vnfd.get("vdu", ()):
-                    if vdu.get("vdu-configuration"):
+        n2vc_key_list = n2vc_key_list or []
+        for vnfd_ref, vnfd in vnfd_dict.items():
+            vdu_needed_access = []
+            mgmt_cp = None
+            if vnfd.get("vnf-configuration"):
+                ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
+                if ssh_required and vnfd.get("mgmt-interface"):
+                    if vnfd["mgmt-interface"].get("vdu-id"):
+                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
+                    elif vnfd["mgmt-interface"].get("cp"):
+                        mgmt_cp = vnfd["mgmt-interface"]["cp"]
+
+            for vdu in vnfd.get("vdu", ()):
+                if vdu.get("vdu-configuration"):
+                    ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
+                    if ssh_required:
                         vdu_needed_access.append(vdu["id"])
-                    elif mgmt_cp:
-                        for vdu_interface in vdu.get("interface"):
-                            if vdu_interface.get("external-connection-point-ref") and \
-                                    vdu_interface["external-connection-point-ref"] == mgmt_cp:
-                                vdu_needed_access.append(vdu["id"])
-                                mgmt_cp = None
-                                break
+                elif mgmt_cp:
+                    for vdu_interface in vdu.get("interface"):
+                        if vdu_interface.get("external-connection-point-ref") and \
+                                vdu_interface["external-connection-point-ref"] == mgmt_cp:
+                            vdu_needed_access.append(vdu["id"])
+                            mgmt_cp = None
+                            break
 
-                if vdu_needed_access:
-                    for vnf_member in nsd.get("constituent-vnfd"):
-                        if vnf_member["vnfd-id-ref"] != vnfd_ref:
-                            continue
-                        for vdu in vdu_needed_access:
-                            populate_dict(RO_ns_params,
-                                          ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
-                                          n2vc_key_list)
+            if vdu_needed_access:
+                for vnf_member in nsd.get("constituent-vnfd"):
+                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
+                        continue
+                    for vdu in vdu_needed_access:
+                        populate_dict(RO_ns_params,
+                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
+                                      n2vc_key_list)
 
         if ns_params.get("vduImage"):
             RO_ns_params["vduImage"] = ns_params["vduImage"]
@@ -697,8 +715,7 @@ class NsLcm(LcmBase):
 
             db_nsr_update["detailed-status"] = "creating"
             db_nsr_update["operational-status"] = "init"
-            if not db_nsr["_admin"].get("deployed") or not db_nsr["_admin"]["deployed"].get("RO") or \
-                    not db_nsr["_admin"]["deployed"]["RO"].get("vnfd"):
+            if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
                 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                 db_nsr_update["_admin.deployed.RO.vnfd"] = []
 
@@ -1107,7 +1124,7 @@ class NsLcm(LcmBase):
 
             # Crate ns at RO
             # if present use it unless in error status
-            RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
+            RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
             if RO_nsr_id:
                 try:
                     step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"