From 80ad921cf8875a4fa018b9a99275115b9f26d834 Mon Sep 17 00:00:00 2001 From: lloretgalleg Date: Wed, 8 Jul 2020 07:53:22 +0000 Subject: [PATCH] Obtain services ip after adding kdu Change-Id: I3641a5d3b91adf0987010a6a37aa8a0d97499312 Signed-off-by: lloretgalleg --- osm_lcm/ns.py | 111 ++++++++++++++++++++++++++------------- osm_lcm/tests/test_ns.py | 1 + 2 files changed, 76 insertions(+), 36 deletions(-) diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 0d84119..5c2309a 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -42,7 +42,7 @@ from copy import copy, deepcopy from http import HTTPStatus from time import time from uuid import uuid4 -from functools import partial + from random import randint __author__ = "Alfonso Tierno " @@ -2395,24 +2395,70 @@ class NsLcm(LcmBase): self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e)) return False - def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None): - """ - callback for kdu install intended to store the returned kdu_instance at database - :return: None - """ - db_update = {} + async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdur: dict, kdud: dict, + vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600): + try: - result = task.result() - if on_done: - db_update[on_done] = str(result) + k8sclustertype = k8s_instance_info["k8scluster-type"] + # Instantiate kdu + db_dict_install = {"collection": "nsrs", + "filter": {"_id": nsr_id}, + "path": nsr_db_path} + + kdu_instance = await self.k8scluster_map[k8sclustertype].install( + cluster_uuid=k8s_instance_info["k8scluster-uuid"], + kdu_model=k8s_instance_info["kdu-model"], + atomic=True, + params=k8params, + db_dict=db_dict_install, + timeout=timeout, + kdu_name=k8s_instance_info["kdu-name"], + namespace=k8s_instance_info["namespace"]) + self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}) + + # Obtain services to obtain management service ip + services = await self.k8scluster_map[k8sclustertype].get_services( + cluster_uuid=k8s_instance_info["k8scluster-uuid"], + kdu_instance=kdu_instance, + namespace=k8s_instance_info["namespace"]) + + # Obtain management service info (if exists) + if services: + vnfr_update_dict = {"kdur.{}.services".format(kdu_index): services} + mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")] + for mgmt_service in mgmt_services: + for service in services: + if service["name"].startswith(mgmt_service["name"]): + # Mgmt service found, Obtain service ip + ip = service.get("external_ip", service.get("cluster_ip")) + if isinstance(ip, list) and len(ip) == 1: + ip = ip[0] + + vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip + + # Check if must update also mgmt ip at the vnf + service_external_cp = mgmt_service.get("external-connection-point-ref") + if service_external_cp: + if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp: + vnfr_update_dict["ip-address"] = ip + + break + else: + self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"])) + + self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict) + except Exception as e: - if on_exc: - db_update[on_exc] = str(e) - if db_update: + # Prepare update db with error and raise exception try: - self.update_db_2(item, _id, db_update) + self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}) except Exception: + # ignore to keep original exception pass + # reraise original error + raise + + return kdu_instance async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info): # Launch kdus if present in the descriptor @@ -2443,9 +2489,11 @@ class NsLcm(LcmBase): updated_cluster_list = [] for vnfr_data in db_vnfrs.values(): - for kdur in get_iterable(vnfr_data, "kdur"): + for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")): + # Step 0: Prepare and set parameters desc_params = self._format_additional_params(kdur.get("additionalParams")) vnfd_id = vnfr_data.get('vnfd-id') + kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"]) namespace = kdur.get("k8s-namespace") if kdur.get("helm-chart"): kdumodel = kdur["helm-chart"] @@ -2475,6 +2523,7 @@ class NsLcm(LcmBase): step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id) cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype) + # Synchronize repos if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list: del_repo_list, added_repo_dict = await asyncio.ensure_future( self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid)) @@ -2488,33 +2537,23 @@ class NsLcm(LcmBase): self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset) updated_cluster_list.append(cluster_uuid) + # Instantiate kdu step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"], k8s_cluster_id) - - k8s_instace_info = {"kdu-instance": None, - "k8scluster-uuid": cluster_uuid, - "k8scluster-type": k8sclustertype, - "member-vnf-index": vnfr_data["member-vnf-index-ref"], - "kdu-name": kdur["kdu-name"], - "kdu-model": kdumodel, - "namespace": namespace} + k8s_instance_info = {"kdu-instance": None, + "k8scluster-uuid": cluster_uuid, + "k8scluster-type": k8sclustertype, + "member-vnf-index": vnfr_data["member-vnf-index-ref"], + "kdu-name": kdur["kdu-name"], + "kdu-model": kdumodel, + "namespace": namespace} db_path = "_admin.deployed.K8s.{}".format(index) - db_nsr_update[db_path] = k8s_instace_info + db_nsr_update[db_path] = k8s_instance_info self.update_db_2("nsrs", nsr_id, db_nsr_update) - db_dict = {"collection": "nsrs", - "filter": {"_id": nsr_id}, - "path": db_path} - task = asyncio.ensure_future( - self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, - atomic=True, params=desc_params, - db_dict=db_dict, timeout=600, - kdu_name=kdur["kdu-name"], namespace=namespace)) - - task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id, - on_done=db_path + ".kdu-instance", - on_exc=db_path + ".detailed-status")) + self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdur, kdud, db_vnfds[vnfd_id], + k8s_instance_info, k8params=desc_params, timeout=600)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task) task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"]) diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index e17b325..fe2b652 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -542,6 +542,7 @@ class TestMyNS(asynctest.TestCase): logging_text = "KDU" self.my_ns.k8sclusterhelm.install = asynctest.CoroutineMock(return_value="k8s_id") self.my_ns.k8sclusterhelm.synchronize_repos = asynctest.CoroutineMock(return_value=("", "")) + self.my_ns.k8sclusterhelm.get_services = asynctest.CoroutineMock(return_value=([])) await self.my_ns.deploy_kdus(logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_register) await asyncio.wait(list(task_register.keys()), timeout=100) db_nsr = self.db.get_list("nsrs")[1] -- 2.17.1