X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fns.py;h=4bed2c6523609b3655ce050fa11dc45304d62532;hb=aebd7da39850e4e7edac17417350256a2a571ec4;hp=31c1f6f3dc403eb1a4dc6619207aa6c21178cc1c;hpb=21e42212823a32e845c3d5f38b427f3d3f387984;p=osm%2FLCM.git diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 31c1f6f..4bed2c6 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -42,7 +42,7 @@ from copy import copy, deepcopy from http import HTTPStatus from time import time from uuid import uuid4 -from functools import partial + from random import randint __author__ = "Alfonso Tierno " @@ -56,7 +56,7 @@ class N2VCJujuConnectorLCM(N2VCJujuConnector): vca_type: str = None) -> (str, dict): # admit two new parameters, artifact_path and vca_type if vca_type == "k8s_proxy_charm": - ee_id = await self.n2vc.install_k8s_proxy_charm( + ee_id = await self.install_k8s_proxy_charm( charm_name=artifact_path[artifact_path.rfind("/") + 1:], namespace=namespace, artifact_path=artifact_path, @@ -1560,7 +1560,8 @@ class NsLcm(LcmBase): vca_index=vca_index, vca_type=vca_type) # if SSH access is required, then get execution environment SSH public - if vca_type in ("lxc_proxy_charm", "helm"): # if native charm we have waited already to VM be UP + # if native charm we have waited already to VM be UP + if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm"): pub_key = None user = None # self.logger.debug("get ssh key block") @@ -1902,6 +1903,7 @@ class NsLcm(LcmBase): db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf + # if we haven't this vnfd, read it from db if vnfd_id not in db_vnfds: # read from db @@ -1938,6 +1940,7 @@ class NsLcm(LcmBase): # set state to INSTANTIATED. When instantiated NBI will not delete directly db_nsr_update["_admin.nsState"] = "INSTANTIATED" self.update_db_2("nsrs", nsr_id, db_nsr_update) + self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}) # n2vc_redesign STEP 2 Deploy Network Scenario stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.' @@ -2394,41 +2397,96 @@ class NsLcm(LcmBase): self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e)) return False - def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None): - """ - callback for kdu install intended to store the returned kdu_instance at database - :return: None - """ - db_update = {} + async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdur: dict, kdud: dict, + vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600): + try: - result = task.result() - if on_done: - db_update[on_done] = str(result) + k8sclustertype = k8s_instance_info["k8scluster-type"] + # Instantiate kdu + db_dict_install = {"collection": "nsrs", + "filter": {"_id": nsr_id}, + "path": nsr_db_path} + + kdu_instance = await self.k8scluster_map[k8sclustertype].install( + cluster_uuid=k8s_instance_info["k8scluster-uuid"], + kdu_model=k8s_instance_info["kdu-model"], + atomic=True, + params=k8params, + db_dict=db_dict_install, + timeout=timeout, + kdu_name=k8s_instance_info["kdu-name"], + namespace=k8s_instance_info["namespace"]) + self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}) + + # Obtain services to obtain management service ip + services = await self.k8scluster_map[k8sclustertype].get_services( + cluster_uuid=k8s_instance_info["k8scluster-uuid"], + kdu_instance=kdu_instance, + namespace=k8s_instance_info["namespace"]) + + # Obtain management service info (if exists) + if services: + vnfr_update_dict = {"kdur.{}.services".format(kdu_index): services} + mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")] + for mgmt_service in mgmt_services: + for service in services: + if service["name"].startswith(mgmt_service["name"]): + # Mgmt service found, Obtain service ip + ip = service.get("external_ip", service.get("cluster_ip")) + if isinstance(ip, list) and len(ip) == 1: + ip = ip[0] + + vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip + + # Check if must update also mgmt ip at the vnf + service_external_cp = mgmt_service.get("external-connection-point-ref") + if service_external_cp: + if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp: + vnfr_update_dict["ip-address"] = ip + + break + else: + self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"])) + + self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict) + except Exception as e: - if on_exc: - db_update[on_exc] = str(e) - if db_update: + # Prepare update db with error and raise exception try: - self.update_db_2(item, _id, db_update) + self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}) except Exception: + # ignore to keep original exception pass + # reraise original error + raise + + return kdu_instance async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info): # Launch kdus if present in the descriptor k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}} - def _get_cluster_id(cluster_id, cluster_type): + async def _get_cluster_id(cluster_id, cluster_type): nonlocal k8scluster_id_2_uuic if cluster_id in k8scluster_id_2_uuic[cluster_type]: return k8scluster_id_2_uuic[cluster_type][cluster_id] + # check if K8scluster is creating and wait look if previous tasks in process + task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id) + if task_dependency: + text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id) + self.logger.debug(logging_text + text) + await asyncio.wait(task_dependency, timeout=3600) + db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False) if not db_k8scluster: raise LcmException("K8s cluster {} cannot be found".format(cluster_id)) + k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id")) if not k8s_id: - raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type)) + raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id, + cluster_type)) k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id return k8s_id @@ -2442,9 +2500,11 @@ class NsLcm(LcmBase): updated_cluster_list = [] for vnfr_data in db_vnfrs.values(): - for kdur in get_iterable(vnfr_data, "kdur"): + for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")): + # Step 0: Prepare and set parameters desc_params = self._format_additional_params(kdur.get("additionalParams")) vnfd_id = vnfr_data.get('vnfd-id') + kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"]) namespace = kdur.get("k8s-namespace") if kdur.get("helm-chart"): kdumodel = kdur["helm-chart"] @@ -2472,8 +2532,9 @@ class NsLcm(LcmBase): k8s_cluster_id = kdur["k8s-cluster"]["id"] step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id) - cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype) + cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype) + # Synchronize repos if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list: del_repo_list, added_repo_dict = await asyncio.ensure_future( self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid)) @@ -2487,33 +2548,23 @@ class NsLcm(LcmBase): self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset) updated_cluster_list.append(cluster_uuid) + # Instantiate kdu step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"], k8s_cluster_id) - - k8s_instace_info = {"kdu-instance": None, - "k8scluster-uuid": cluster_uuid, - "k8scluster-type": k8sclustertype, - "member-vnf-index": vnfr_data["member-vnf-index-ref"], - "kdu-name": kdur["kdu-name"], - "kdu-model": kdumodel, - "namespace": namespace} + k8s_instance_info = {"kdu-instance": None, + "k8scluster-uuid": cluster_uuid, + "k8scluster-type": k8sclustertype, + "member-vnf-index": vnfr_data["member-vnf-index-ref"], + "kdu-name": kdur["kdu-name"], + "kdu-model": kdumodel, + "namespace": namespace} db_path = "_admin.deployed.K8s.{}".format(index) - db_nsr_update[db_path] = k8s_instace_info + db_nsr_update[db_path] = k8s_instance_info self.update_db_2("nsrs", nsr_id, db_nsr_update) - db_dict = {"collection": "nsrs", - "filter": {"_id": nsr_id}, - "path": db_path} - task = asyncio.ensure_future( - self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, - atomic=True, params=desc_params, - db_dict=db_dict, timeout=600, - kdu_name=kdur["kdu-name"], namespace=namespace)) - - task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id, - on_done=db_path + ".kdu-instance", - on_exc=db_path + ".detailed-status")) + self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdur, kdud, db_vnfds[vnfd_id], + k8s_instance_info, k8params=desc_params, timeout=600)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task) task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"]) @@ -3174,8 +3225,9 @@ class NsLcm(LcmBase): vca_type = vca.get("type") exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and vca.get("needed_terminate")) - # For helm we must destroy_ee - destroy_ee = "True" if vca_type == "helm" else "False" + # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are + # pending native charms + destroy_ee = "True" if vca_type in ("helm", "native_charm") else "False" task = asyncio.ensure_future( self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index, destroy_ee, exec_terminate_primitives)) @@ -3297,6 +3349,12 @@ class NsLcm(LcmBase): operation_state=nslcmop_operation_state, other_update=db_nslcmop_update, ) + if ns_state == "NOT_INSTANTIATED": + try: + self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"}) + except DbException as e: + self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'. + format(nsr_id, e)) if operation_params: autoremove = operation_params.get("autoremove", False) if nslcmop_operation_state: @@ -3555,11 +3613,15 @@ class NsLcm(LcmBase): config_primitive_desc = config_primitive break - if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")): - raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ". - format(primitive)) - primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive) - ee_descriptor_id = config_primitive_desc.get("execution-environment-ref") + if not config_primitive_desc: + if not (kdu_name and primitive in ("upgrade", "rollback", "status")): + raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ". + format(primitive)) + primitive_name = primitive + ee_descriptor_id = None + else: + primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive) + ee_descriptor_id = config_primitive_desc.get("execution-environment-ref") if vnf_index: if vdu_id: