X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fns.py;h=5c2309adea97681a3fceb0dc8b04452b54f900de;hb=80ad921cf8875a4fa018b9a99275115b9f26d834;hp=0d841191aedde64e1dc54cfd34f3efd5864f2a22;hpb=f36326cff016a5acb0e38b5ea47db95d0164b1fa;p=osm%2FLCM.git diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 0d84119..5c2309a 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -42,7 +42,7 @@ from copy import copy, deepcopy from http import HTTPStatus from time import time from uuid import uuid4 -from functools import partial + from random import randint __author__ = "Alfonso Tierno " @@ -2395,24 +2395,70 @@ class NsLcm(LcmBase): self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e)) return False - def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None): - """ - callback for kdu install intended to store the returned kdu_instance at database - :return: None - """ - db_update = {} + async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdur: dict, kdud: dict, + vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600): + try: - result = task.result() - if on_done: - db_update[on_done] = str(result) + k8sclustertype = k8s_instance_info["k8scluster-type"] + # Instantiate kdu + db_dict_install = {"collection": "nsrs", + "filter": {"_id": nsr_id}, + "path": nsr_db_path} + + kdu_instance = await self.k8scluster_map[k8sclustertype].install( + cluster_uuid=k8s_instance_info["k8scluster-uuid"], + kdu_model=k8s_instance_info["kdu-model"], + atomic=True, + params=k8params, + db_dict=db_dict_install, + timeout=timeout, + kdu_name=k8s_instance_info["kdu-name"], + namespace=k8s_instance_info["namespace"]) + self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}) + + # Obtain services to obtain management service ip + services = await self.k8scluster_map[k8sclustertype].get_services( + cluster_uuid=k8s_instance_info["k8scluster-uuid"], + kdu_instance=kdu_instance, + namespace=k8s_instance_info["namespace"]) + + # Obtain management service info (if exists) + if services: + vnfr_update_dict = {"kdur.{}.services".format(kdu_index): services} + mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")] + for mgmt_service in mgmt_services: + for service in services: + if service["name"].startswith(mgmt_service["name"]): + # Mgmt service found, Obtain service ip + ip = service.get("external_ip", service.get("cluster_ip")) + if isinstance(ip, list) and len(ip) == 1: + ip = ip[0] + + vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip + + # Check if must update also mgmt ip at the vnf + service_external_cp = mgmt_service.get("external-connection-point-ref") + if service_external_cp: + if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp: + vnfr_update_dict["ip-address"] = ip + + break + else: + self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"])) + + self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict) + except Exception as e: - if on_exc: - db_update[on_exc] = str(e) - if db_update: + # Prepare update db with error and raise exception try: - self.update_db_2(item, _id, db_update) + self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}) except Exception: + # ignore to keep original exception pass + # reraise original error + raise + + return kdu_instance async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info): # Launch kdus if present in the descriptor @@ -2443,9 +2489,11 @@ class NsLcm(LcmBase): updated_cluster_list = [] for vnfr_data in db_vnfrs.values(): - for kdur in get_iterable(vnfr_data, "kdur"): + for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")): + # Step 0: Prepare and set parameters desc_params = self._format_additional_params(kdur.get("additionalParams")) vnfd_id = vnfr_data.get('vnfd-id') + kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"]) namespace = kdur.get("k8s-namespace") if kdur.get("helm-chart"): kdumodel = kdur["helm-chart"] @@ -2475,6 +2523,7 @@ class NsLcm(LcmBase): step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id) cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype) + # Synchronize repos if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list: del_repo_list, added_repo_dict = await asyncio.ensure_future( self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid)) @@ -2488,33 +2537,23 @@ class NsLcm(LcmBase): self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset) updated_cluster_list.append(cluster_uuid) + # Instantiate kdu step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"], k8s_cluster_id) - - k8s_instace_info = {"kdu-instance": None, - "k8scluster-uuid": cluster_uuid, - "k8scluster-type": k8sclustertype, - "member-vnf-index": vnfr_data["member-vnf-index-ref"], - "kdu-name": kdur["kdu-name"], - "kdu-model": kdumodel, - "namespace": namespace} + k8s_instance_info = {"kdu-instance": None, + "k8scluster-uuid": cluster_uuid, + "k8scluster-type": k8sclustertype, + "member-vnf-index": vnfr_data["member-vnf-index-ref"], + "kdu-name": kdur["kdu-name"], + "kdu-model": kdumodel, + "namespace": namespace} db_path = "_admin.deployed.K8s.{}".format(index) - db_nsr_update[db_path] = k8s_instace_info + db_nsr_update[db_path] = k8s_instance_info self.update_db_2("nsrs", nsr_id, db_nsr_update) - db_dict = {"collection": "nsrs", - "filter": {"_id": nsr_id}, - "path": db_path} - task = asyncio.ensure_future( - self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, - atomic=True, params=desc_params, - db_dict=db_dict, timeout=600, - kdu_name=kdur["kdu-name"], namespace=namespace)) - - task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id, - on_done=db_path + ".kdu-instance", - on_exc=db_path + ".detailed-status")) + self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdur, kdud, db_vnfds[vnfd_id], + k8s_instance_info, k8params=desc_params, timeout=600)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task) task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])