Obtain services ip after adding kdu 55/9355/8
authorlloretgalleg <illoret@indra.es>
Wed, 8 Jul 2020 07:53:22 +0000 (07:53 +0000)
committertierno <alfonso.tiernosepulveda@telefonica.com>
Mon, 13 Jul 2020 10:56:46 +0000 (12:56 +0200)
Change-Id: I3641a5d3b91adf0987010a6a37aa8a0d97499312
Signed-off-by: lloretgalleg <illoret@indra.es>
osm_lcm/ns.py
osm_lcm/tests/test_ns.py

index 0d84119..5c2309a 100644 (file)
@@ -42,7 +42,7 @@ from copy import copy, deepcopy
 from http import HTTPStatus
 from time import time
 from uuid import uuid4
-from functools import partial
+
 from random import randint
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
@@ -2395,24 +2395,70 @@ class NsLcm(LcmBase):
             self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
             return False
 
-    def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None):
-        """
-        callback for kdu install intended to store the returned kdu_instance at database
-        :return: None
-        """
-        db_update = {}
+    async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdur: dict, kdud: dict,
+                           vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
+
         try:
-            result = task.result()
-            if on_done:
-                db_update[on_done] = str(result)
+            k8sclustertype = k8s_instance_info["k8scluster-type"]
+            # Instantiate kdu
+            db_dict_install = {"collection": "nsrs",
+                               "filter": {"_id": nsr_id},
+                               "path": nsr_db_path}
+
+            kdu_instance = await self.k8scluster_map[k8sclustertype].install(
+                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+                kdu_model=k8s_instance_info["kdu-model"],
+                atomic=True,
+                params=k8params,
+                db_dict=db_dict_install,
+                timeout=timeout,
+                kdu_name=k8s_instance_info["kdu-name"],
+                namespace=k8s_instance_info["namespace"])
+            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
+
+            # Obtain services to obtain management service ip
+            services = await self.k8scluster_map[k8sclustertype].get_services(
+                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+                kdu_instance=kdu_instance,
+                namespace=k8s_instance_info["namespace"])
+
+            # Obtain management service info (if exists)
+            if services:
+                vnfr_update_dict = {"kdur.{}.services".format(kdu_index): services}
+                mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
+                for mgmt_service in mgmt_services:
+                    for service in services:
+                        if service["name"].startswith(mgmt_service["name"]):
+                            # Mgmt service found, Obtain service ip
+                            ip = service.get("external_ip", service.get("cluster_ip"))
+                            if isinstance(ip, list) and len(ip) == 1:
+                                ip = ip[0]
+
+                            vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
+
+                            # Check if must update also mgmt ip at the vnf
+                            service_external_cp = mgmt_service.get("external-connection-point-ref")
+                            if service_external_cp:
+                                if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
+                                    vnfr_update_dict["ip-address"] = ip
+
+                            break
+                    else:
+                        self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
+
+                self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
+
         except Exception as e:
-            if on_exc:
-                db_update[on_exc] = str(e)
-        if db_update:
+            # Prepare update db with error and raise exception
             try:
-                self.update_db_2(item, _id, db_update)
+                self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
             except Exception:
+                # ignore to keep original exception
                 pass
+            # reraise original error
+            raise
+
+        return kdu_instance
 
     async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
         # Launch kdus if present in the descriptor
@@ -2443,9 +2489,11 @@ class NsLcm(LcmBase):
             updated_cluster_list = []
 
             for vnfr_data in db_vnfrs.values():
-                for kdur in get_iterable(vnfr_data, "kdur"):
+                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
+                    # Step 0: Prepare and set parameters
                     desc_params = self._format_additional_params(kdur.get("additionalParams"))
                     vnfd_id = vnfr_data.get('vnfd-id')
+                    kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"])
                     namespace = kdur.get("k8s-namespace")
                     if kdur.get("helm-chart"):
                         kdumodel = kdur["helm-chart"]
@@ -2475,6 +2523,7 @@ class NsLcm(LcmBase):
                     step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                     cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)
 
+                    # Synchronize  repos
                     if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
                         del_repo_list, added_repo_dict = await asyncio.ensure_future(
                             self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
@@ -2488,33 +2537,23 @@ class NsLcm(LcmBase):
                             self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
                         updated_cluster_list.append(cluster_uuid)
 
+                    # Instantiate kdu
                     step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                               kdur["kdu-name"], k8s_cluster_id)
-
-                    k8s_instace_info = {"kdu-instance": None,
-                                        "k8scluster-uuid": cluster_uuid,
-                                        "k8scluster-type": k8sclustertype,
-                                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
-                                        "kdu-name": kdur["kdu-name"],
-                                        "kdu-model": kdumodel,
-                                        "namespace": namespace}
+                    k8s_instance_info = {"kdu-instance": None,
+                                         "k8scluster-uuid": cluster_uuid,
+                                         "k8scluster-type": k8sclustertype,
+                                         "member-vnf-index": vnfr_data["member-vnf-index-ref"],
+                                         "kdu-name": kdur["kdu-name"],
+                                         "kdu-model": kdumodel,
+                                         "namespace": namespace}
                     db_path = "_admin.deployed.K8s.{}".format(index)
-                    db_nsr_update[db_path] = k8s_instace_info
+                    db_nsr_update[db_path] = k8s_instance_info
                     self.update_db_2("nsrs", nsr_id, db_nsr_update)
 
-                    db_dict = {"collection": "nsrs",
-                               "filter": {"_id": nsr_id},
-                               "path": db_path}
-
                     task = asyncio.ensure_future(
-                        self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
-                                                                    atomic=True, params=desc_params,
-                                                                    db_dict=db_dict, timeout=600,
-                                                                    kdu_name=kdur["kdu-name"], namespace=namespace))
-
-                    task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id,
-                                                   on_done=db_path + ".kdu-instance",
-                                                   on_exc=db_path + ".detailed-status"))
+                        self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdur, kdud, db_vnfds[vnfd_id],
+                                          k8s_instance_info, k8params=desc_params, timeout=600))
                     self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                     task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
 
index e17b325..fe2b652 100644 (file)
@@ -542,6 +542,7 @@ class TestMyNS(asynctest.TestCase):
         logging_text = "KDU"
         self.my_ns.k8sclusterhelm.install = asynctest.CoroutineMock(return_value="k8s_id")
         self.my_ns.k8sclusterhelm.synchronize_repos = asynctest.CoroutineMock(return_value=("", ""))
+        self.my_ns.k8sclusterhelm.get_services = asynctest.CoroutineMock(return_value=([]))
         await self.my_ns.deploy_kdus(logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_register)
         await asyncio.wait(list(task_register.keys()), timeout=100)
         db_nsr = self.db.get_list("nsrs")[1]