X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fns.py;h=a83495afdd382f9727fd34d8771387286da50d90;hb=ba89cbb37502265011935fddbc04cb4304b14ca2;hp=0d841191aedde64e1dc54cfd34f3efd5864f2a22;hpb=a27e20a542488a488deec71cda81e2be5b623111;p=osm%2FLCM.git diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 0d84119..a83495a 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -22,7 +22,7 @@ import logging import logging.handlers import traceback import json -from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError +from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError from osm_lcm import ROclient from osm_lcm.ng_ro import NgRoClient, NgRoException @@ -42,7 +42,7 @@ from copy import copy, deepcopy from http import HTTPStatus from time import time from uuid import uuid4 -from functools import partial + from random import randint __author__ = "Alfonso Tierno " @@ -148,7 +148,9 @@ class NsLcm(LcmBase): fs=self.fs, log=self.logger, db=self.db, + loop=self.loop, on_update_db=None, + vca_config=self.vca_config, ) self.k8scluster_map = { @@ -278,6 +280,69 @@ class NsLcm(LcmBase): except Exception as e: self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e)) + @staticmethod + def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id): + try: + env = Environment(undefined=StrictUndefined) + template = env.from_string(cloud_init_text) + return template.render(additional_params or {}) + except UndefinedError as e: + raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-" + "file, must be provided in the instantiation parameters inside the " + "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)) + except (TemplateError, TemplateNotFound) as e: + raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}". + format(vnfd_id, vdu_id, e)) + + def _get_cloud_init(self, vdu, vnfd): + try: + cloud_init_content = cloud_init_file = None + if vdu.get("cloud-init-file"): + base_folder = vnfd["_admin"]["storage"] + cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"], + vdu["cloud-init-file"]) + with self.fs.file_open(cloud_init_file, "r") as ci_file: + cloud_init_content = ci_file.read() + elif vdu.get("cloud-init"): + cloud_init_content = vdu["cloud-init"] + + return cloud_init_content + except FsException as e: + raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}". + format(vnfd["id"], vdu["id"], cloud_init_file, e)) + + def _get_osm_params(self, db_vnfr, vdu_id=None, vdu_count_index=0): + osm_params = {x.replace("-", "_"): db_vnfr[x] for x in ("ip-address", "vim-account-id", "vnfd-id", "vnfd-ref") + if db_vnfr.get(x) is not None} + osm_params["ns_id"] = db_vnfr["nsr-id-ref"] + osm_params["vnf_id"] = db_vnfr["_id"] + osm_params["member_vnf_index"] = db_vnfr["member-vnf-index-ref"] + if db_vnfr.get("vdur"): + osm_params["vdu"] = {} + for vdur in db_vnfr["vdur"]: + vdu = { + "count_index": vdur["count-index"], + "vdu_id": vdur["vdu-id-ref"], + "interfaces": {} + } + if vdur.get("ip-address"): + vdu["ip_address"] = vdur["ip-address"] + for iface in vdur["interfaces"]: + vdu["interfaces"][iface["name"]] = \ + {x.replace("-", "_"): iface[x] for x in ("mac-address", "ip-address", "vnf-vld-id", "name") + if iface.get(x) is not None} + vdu_id_index = "{}-{}".format(vdur["vdu-id-ref"], vdur["count-index"]) + osm_params["vdu"][vdu_id_index] = vdu + if vdu_id: + osm_params["vdu_id"] = vdu_id + osm_params["count_index"] = vdu_count_index + return osm_params + + def _get_vdu_additional_params(self, db_vnfr, vdu_id): + vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]) + additional_params = vdur.get("additionalParams") + return self._format_additional_params(additional_params) + def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None): """ Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd @@ -287,54 +352,23 @@ class NsLcm(LcmBase): :param nsrId: Id of the NSR :return: copy of vnfd """ - try: - vnfd_RO = deepcopy(vnfd) - # remove unused by RO configuration, monitoring, scaling and internal keys - vnfd_RO.pop("_id", None) - vnfd_RO.pop("_admin", None) - vnfd_RO.pop("vnf-configuration", None) - vnfd_RO.pop("monitoring-param", None) - vnfd_RO.pop("scaling-group-descriptor", None) - vnfd_RO.pop("kdu", None) - vnfd_RO.pop("k8s-cluster", None) - if new_id: - vnfd_RO["id"] = new_id - - # parse cloud-init or cloud-init-file with the provided variables using Jinja2 - for vdu in get_iterable(vnfd_RO, "vdu"): - cloud_init_file = None - if vdu.get("cloud-init-file"): - base_folder = vnfd["_admin"]["storage"] - cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"], - vdu["cloud-init-file"]) - with self.fs.file_open(cloud_init_file, "r") as ci_file: - cloud_init_content = ci_file.read() - vdu.pop("cloud-init-file", None) - elif vdu.get("cloud-init"): - cloud_init_content = vdu["cloud-init"] - else: - continue - - env = Environment() - ast = env.parse(cloud_init_content) - mandatory_vars = meta.find_undeclared_variables(ast) - if mandatory_vars: - for var in mandatory_vars: - if not additionalParams or var not in additionalParams.keys(): - raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-" - "file, must be provided in the instantiation parameters inside the " - "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"])) - template = Template(cloud_init_content) - cloud_init_content = template.render(additionalParams or {}) - vdu["cloud-init"] = cloud_init_content - - return vnfd_RO - except FsException as e: - raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}". - format(vnfd["id"], vdu["id"], cloud_init_file, e)) - except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e: - raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}". - format(vnfd["id"], vdu["id"], e)) + vnfd_RO = deepcopy(vnfd) + # remove unused by RO configuration, monitoring, scaling and internal keys + vnfd_RO.pop("_id", None) + vnfd_RO.pop("_admin", None) + vnfd_RO.pop("vnf-configuration", None) + vnfd_RO.pop("monitoring-param", None) + vnfd_RO.pop("scaling-group-descriptor", None) + vnfd_RO.pop("kdu", None) + vnfd_RO.pop("k8s-cluster", None) + if new_id: + vnfd_RO["id"] = new_id + + # parse cloud-init or cloud-init-file with the provided variables using Jinja2 + for vdu in get_iterable(vnfd_RO, "vdu"): + vdu.pop("cloud-init-file", None) + vdu.pop("cloud-init", None) + return vnfd_RO def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list): """ @@ -444,6 +478,25 @@ class NsLcm(LcmBase): populate_dict(RO_ns_params, ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"), n2vc_key_list) + # cloud init + for vdu in get_iterable(vnfd, "vdu"): + cloud_init_text = self._get_cloud_init(vdu, vnfd) + if not cloud_init_text: + continue + for vnf_member in nsd.get("constituent-vnfd"): + if vnf_member["vnfd-id-ref"] != vnfd_ref: + continue + db_vnfr = db_vnfrs[vnf_member["member-vnf-index"]] + additional_params = self._get_vdu_additional_params(db_vnfr, vdu["id"]) or {} + + cloud_init_list = [] + for vdu_index in range(0, int(vdu.get("count", 1))): + additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu["id"], vdu_index) + cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params, vnfd["id"], + vdu["id"])) + populate_dict(RO_ns_params, + ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu["id"], "cloud_init"), + cloud_init_list) if ns_params.get("vduImage"): RO_ns_params["vduImage"] = ns_params["vduImage"] @@ -629,11 +682,21 @@ class NsLcm(LcmBase): continue vdu_id_ref = vdur["vdu-id-ref"] if vdu_create and vdu_create.get(vdu_id_ref): + vdur_copy = deepcopy(vdur) + vdur_copy["status"] = "BUILD" + vdur_copy["status-detailed"] = None + vdur_copy["ip_address"]: None + for iface in vdur_copy["interfaces"]: + iface["ip-address"] = None + iface["mac-address"] = None + iface.pop("mgmt_vnf", None) # only first vdu can be managment of vnf # TODO ALF for index in range(0, vdu_create[vdu_id_ref]): - vdur = deepcopy(vdur) - vdur["_id"] = str(uuid4()) - vdur["count-index"] += 1 - vdurs.insert(vdu_index+1+index, vdur) + vdur_copy["_id"] = str(uuid4()) + vdur_copy["count-index"] += 1 + vdurs.insert(vdu_index+1+index, vdur_copy) + self.logger.debug("scale out, adding vdu={}".format(vdur_copy)) + vdur_copy = deepcopy(vdur_copy) + del vdu_create[vdu_id_ref] if vdu_delete and vdu_delete.get(vdu_id_ref): del vdurs[vdu_index] @@ -1236,6 +1299,34 @@ class NsLcm(LcmBase): self.set_vnfr_at_error(db_vnfrs, str(e)) raise + async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name): + """ + Wait for kdu to be up, get ip address + :param logging_text: prefix use for logging + :param nsr_id: + :param vnfr_id: + :param kdu_name: + :return: IP address + """ + + # self.logger.debug(logging_text + "Starting wait_kdu_up") + nb_tries = 0 + + while nb_tries < 360: + db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id}) + kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None) + if not kdur: + raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)) + if kdur.get("status"): + if kdur["status"] in ("READY", "ENABLED"): + return kdur.get("ip-address") + else: + raise LcmException("target KDU={} is in error state".format(kdu_name)) + + await asyncio.sleep(10, loop=self.loop) + nb_tries += 1 + raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name)) + async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None): """ Wait for ip addres at RO, and optionally, insert public key in virtual machine @@ -1581,8 +1672,11 @@ class NsLcm(LcmBase): # n2vc_redesign STEP 5.1 # wait for RO (ip-address) Insert pub_key into VM if vnfr_id: - rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, - user=user, pub_key=pub_key) + if kdu_name: + rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name) + else: + rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, + vdu_index, user=user, pub_key=pub_key) else: rw_mgmt_ip = None # This is for a NS configuration @@ -1850,11 +1944,11 @@ class NsLcm(LcmBase): # wait for any previous tasks in process await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) - stage[1] = "Sync filesystem from database" + stage[1] = "Sync filesystem from database." self.fs.sync() # TODO, make use of partial sync, only for the needed packages # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds) - stage[1] = "Reading from database" + stage[1] = "Reading from database." # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id db_nsr_update["detailed-status"] = "creating" db_nsr_update["operational-status"] = "init" @@ -1872,7 +1966,7 @@ class NsLcm(LcmBase): ) # read from db: operation - stage[1] = "Getting nslcmop={} from db".format(nslcmop_id) + stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id) db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) ns_params = db_nslcmop.get("operationParams") if ns_params and ns_params.get("timeout_ns_deploy"): @@ -1881,15 +1975,15 @@ class NsLcm(LcmBase): timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy) # read from db: ns - stage[1] = "Getting nsr={} from db".format(nsr_id) + stage[1] = "Getting nsr={} from db.".format(nsr_id) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) - stage[1] = "Getting nsd={} from db".format(db_nsr["nsd-id"]) + stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"]) nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]}) db_nsr["nsd"] = nsd # nsr_name = db_nsr["name"] # TODO short-name?? # read from db: vnf's of this ns - stage[1] = "Getting vnfrs from db" + stage[1] = "Getting vnfrs from db." self.logger.debug(logging_text + stage[1]) db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) @@ -1903,10 +1997,11 @@ class NsLcm(LcmBase): db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf + # if we haven't this vnfd, read it from db if vnfd_id not in db_vnfds: # read from db - stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref) + stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref) self.logger.debug(logging_text + stage[1]) vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) @@ -1939,6 +2034,7 @@ class NsLcm(LcmBase): # set state to INSTANTIATED. When instantiated NBI will not delete directly db_nsr_update["_admin.nsState"] = "INSTANTIATED" self.update_db_2("nsrs", nsr_id, db_nsr_update) + self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}) # n2vc_redesign STEP 2 Deploy Network Scenario stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.' @@ -1947,7 +2043,7 @@ class NsLcm(LcmBase): stage=stage ) - stage[1] = "Deploying KDUs," + stage[1] = "Deploying KDUs." # self.logger.debug(logging_text + "Before deploy_kdus") # Call to deploy_kdus in case exists the "vdu:kdu" param await self.deploy_kdus( @@ -2002,9 +2098,9 @@ class NsLcm(LcmBase): kdu_name = None # Get additional parameters - deploy_params = {} + deploy_params = {"OSM": self._get_osm_params(db_vnfr)} if db_vnfr.get("additionalParamsForVnf"): - deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy()) + deploy_params.update(self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())) descriptor_config = vnfd.get("vnf-configuration") if descriptor_config: @@ -2037,15 +2133,8 @@ class NsLcm(LcmBase): deploy_params_vdu = self._format_additional_params(vdur["additionalParams"]) else: deploy_params_vdu = deploy_params + deploy_params_vdu["OSM"] = self._get_osm_params(db_vnfr, vdu_id, vdu_count_index=0) if descriptor_config: - # look for vdu index in the db_vnfr["vdu"] section - # for vdur_index, vdur in enumerate(db_vnfr["vdur"]): - # if vdur["vdu-id-ref"] == vdu_id: - # break - # else: - # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for " - # "member_vnf_index={}".format(vdu_id, member_vnf_index)) - # vdu_name = vdur.get("name") vdu_name = None kdu_name = None for vdu_index in range(int(vdud.get("count", 1))): @@ -2077,15 +2166,10 @@ class NsLcm(LcmBase): vdu_id = None vdu_index = 0 vdu_name = None - # look for vdu index in the db_vnfr["vdu"] section - # for vdur_index, vdur in enumerate(db_vnfr["vdur"]): - # if vdur["vdu-id-ref"] == vdu_id: - # break - # else: - # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for " - # "member_vnf_index={}".format(vdu_id, member_vnf_index)) - # vdu_name = vdur.get("name") - # vdu_name = None + kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name) + deploy_params_kdu = {"OSM": self._get_osm_params(db_vnfr)} + if kdur.get("additionalParams"): + deploy_params_kdu = self._format_additional_params(kdur["additionalParams"]) self._deploy_n2vc( logging_text=logging_text, @@ -2100,7 +2184,7 @@ class NsLcm(LcmBase): member_vnf_index=member_vnf_index, vdu_index=vdu_index, vdu_name=vdu_name, - deploy_params=deploy_params, + deploy_params=deploy_params_kdu, descriptor_config=descriptor_config, base_folder=base_folder, task_instantiation_info=tasks_dict_info, @@ -2119,9 +2203,9 @@ class NsLcm(LcmBase): vdu_name = None # Get additional parameters - deploy_params = {} + deploy_params = {"OSM": self._get_osm_params(db_vnfr)} if db_nsr.get("additionalParamsForNs"): - deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy()) + deploy_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"].copy())) base_folder = nsd["_admin"]["storage"] self._deploy_n2vc( logging_text=logging_text, @@ -2188,8 +2272,8 @@ class NsLcm(LcmBase): if error_list: error_detail = ". ".join(error_list) self.logger.error(logging_text + error_detail) - error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail) - error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0]) + error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail) + error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0]) db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail db_nslcmop_update["detailed-status"] = error_detail @@ -2295,7 +2379,7 @@ class NsLcm(LcmBase): db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) # for each defined NS relation, find the VCA's related - for r in ns_relations: + for r in ns_relations.copy(): from_vca_ee_id = None to_vca_ee_id = None from_vca_endpoint = None @@ -2340,17 +2424,20 @@ class NsLcm(LcmBase): pass # for each defined VNF relation, find the VCA's related - for r in vnf_relations: + for r in vnf_relations.copy(): from_vca_ee_id = None to_vca_ee_id = None from_vca_endpoint = None to_vca_endpoint = None vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA')) for vca in vca_list: - if vca.get('vdu_id') == r.get('entities')[0].get('id') and vca.get('config_sw_installed'): + key_to_check = "vdu_id" + if vca.get("vdu_id") is None: + key_to_check = "vnfd_id" + if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'): from_vca_ee_id = vca.get('ee_id') from_vca_endpoint = r.get('entities')[0].get('endpoint') - if vca.get('vdu_id') == r.get('entities')[1].get('id') and vca.get('config_sw_installed'): + if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'): to_vca_ee_id = vca.get('ee_id') to_vca_endpoint = r.get('entities')[1].get('endpoint') if from_vca_ee_id and to_vca_ee_id: @@ -2373,11 +2460,11 @@ class NsLcm(LcmBase): if vca.get('vdu_id') == r.get('entities')[0].get('id'): if vca_status.get('status') == 'BROKEN': # peer broken: remove relation from list - ns_relations.remove(r) + vnf_relations.remove(r) if vca.get('vdu_id') == r.get('entities')[1].get('id'): if vca_status.get('status') == 'BROKEN': # peer broken: remove relation from list - ns_relations.remove(r) + vnf_relations.remove(r) except Exception: # ignore pass @@ -2395,41 +2482,115 @@ class NsLcm(LcmBase): self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e)) return False - def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None): - """ - callback for kdu install intended to store the returned kdu_instance at database - :return: None - """ - db_update = {} + async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict, + vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600): + try: - result = task.result() - if on_done: - db_update[on_done] = str(result) + k8sclustertype = k8s_instance_info["k8scluster-type"] + # Instantiate kdu + db_dict_install = {"collection": "nsrs", + "filter": {"_id": nsr_id}, + "path": nsr_db_path} + + kdu_instance = await self.k8scluster_map[k8sclustertype].install( + cluster_uuid=k8s_instance_info["k8scluster-uuid"], + kdu_model=k8s_instance_info["kdu-model"], + atomic=True, + params=k8params, + db_dict=db_dict_install, + timeout=timeout, + kdu_name=k8s_instance_info["kdu-name"], + namespace=k8s_instance_info["namespace"]) + self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}) + + # Obtain services to obtain management service ip + services = await self.k8scluster_map[k8sclustertype].get_services( + cluster_uuid=k8s_instance_info["k8scluster-uuid"], + kdu_instance=kdu_instance, + namespace=k8s_instance_info["namespace"]) + + # Obtain management service info (if exists) + vnfr_update_dict = {} + if services: + vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services + mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")] + for mgmt_service in mgmt_services: + for service in services: + if service["name"].startswith(mgmt_service["name"]): + # Mgmt service found, Obtain service ip + ip = service.get("external_ip", service.get("cluster_ip")) + if isinstance(ip, list) and len(ip) == 1: + ip = ip[0] + + vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip + + # Check if must update also mgmt ip at the vnf + service_external_cp = mgmt_service.get("external-connection-point-ref") + if service_external_cp: + if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp: + vnfr_update_dict["ip-address"] = ip + + break + else: + self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"])) + + vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY" + self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict) + + kdu_config = kdud.get("kdu-configuration") + if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None: + initial_config_primitive_list = kdu_config.get("initial-config-primitive") + initial_config_primitive_list.sort(key=lambda val: int(val["seq"])) + + for initial_config_primitive in initial_config_primitive_list: + primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {}) + + await asyncio.wait_for( + self.k8scluster_map[k8sclustertype].exec_primitive( + cluster_uuid=k8s_instance_info["k8scluster-uuid"], + kdu_instance=kdu_instance, + primitive_name=initial_config_primitive["name"], + params=primitive_params_, db_dict={}), + timeout=timeout) + except Exception as e: - if on_exc: - db_update[on_exc] = str(e) - if db_update: + # Prepare update db with error and raise exception try: - self.update_db_2(item, _id, db_update) + self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}) + self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"}) except Exception: + # ignore to keep original exception pass + # reraise original error + raise + + return kdu_instance async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info): # Launch kdus if present in the descriptor k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}} - def _get_cluster_id(cluster_id, cluster_type): + async def _get_cluster_id(cluster_id, cluster_type): nonlocal k8scluster_id_2_uuic if cluster_id in k8scluster_id_2_uuic[cluster_type]: return k8scluster_id_2_uuic[cluster_type][cluster_id] + # check if K8scluster is creating and wait look if previous tasks in process + task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id) + if task_dependency: + text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id) + self.logger.debug(logging_text + text) + await asyncio.wait(task_dependency, timeout=3600) + db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False) if not db_k8scluster: raise LcmException("K8s cluster {} cannot be found".format(cluster_id)) + k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id")) if not k8s_id: - raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type)) + raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id, + cluster_type)) k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id return k8s_id @@ -2443,9 +2604,11 @@ class NsLcm(LcmBase): updated_cluster_list = [] for vnfr_data in db_vnfrs.values(): - for kdur in get_iterable(vnfr_data, "kdur"): + for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")): + # Step 0: Prepare and set parameters desc_params = self._format_additional_params(kdur.get("additionalParams")) vnfd_id = vnfr_data.get('vnfd-id') + kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"]) namespace = kdur.get("k8s-namespace") if kdur.get("helm-chart"): kdumodel = kdur["helm-chart"] @@ -2473,8 +2636,9 @@ class NsLcm(LcmBase): k8s_cluster_id = kdur["k8s-cluster"]["id"] step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id) - cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype) + cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype) + # Synchronize repos if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list: del_repo_list, added_repo_dict = await asyncio.ensure_future( self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid)) @@ -2488,33 +2652,23 @@ class NsLcm(LcmBase): self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset) updated_cluster_list.append(cluster_uuid) + # Instantiate kdu step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"], k8s_cluster_id) - - k8s_instace_info = {"kdu-instance": None, - "k8scluster-uuid": cluster_uuid, - "k8scluster-type": k8sclustertype, - "member-vnf-index": vnfr_data["member-vnf-index-ref"], - "kdu-name": kdur["kdu-name"], - "kdu-model": kdumodel, - "namespace": namespace} + k8s_instance_info = {"kdu-instance": None, + "k8scluster-uuid": cluster_uuid, + "k8scluster-type": k8sclustertype, + "member-vnf-index": vnfr_data["member-vnf-index-ref"], + "kdu-name": kdur["kdu-name"], + "kdu-model": kdumodel, + "namespace": namespace} db_path = "_admin.deployed.K8s.{}".format(index) - db_nsr_update[db_path] = k8s_instace_info + db_nsr_update[db_path] = k8s_instance_info self.update_db_2("nsrs", nsr_id, db_nsr_update) - db_dict = {"collection": "nsrs", - "filter": {"_id": nsr_id}, - "path": db_path} - task = asyncio.ensure_future( - self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, - atomic=True, params=desc_params, - db_dict=db_dict, timeout=600, - kdu_name=kdur["kdu-name"], namespace=namespace)) - - task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id, - on_done=db_path + ".kdu-instance", - on_exc=db_path + ".detailed-status")) + self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, db_vnfds[vnfd_id], + k8s_instance_info, k8params=desc_params, timeout=600)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task) task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"]) @@ -3153,7 +3307,6 @@ class NsLcm(LcmBase): self.logger.debug(logging_text + stage[1]) # self.logger.debug("nsr_deployed: {}".format(nsr_deployed)) for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")): - self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca)) config_descriptor = None if not vca or not vca.get("ee_id"): continue @@ -3175,8 +3328,11 @@ class NsLcm(LcmBase): vca_type = vca.get("type") exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and vca.get("needed_terminate")) - # For helm we must destroy_ee - destroy_ee = "True" if vca_type == "helm" else "False" + # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are + # pending native charms + destroy_ee = True if vca_type in ("helm", "native_charm") else False + # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format( + # vca_index, vca.get("ee_id"), vca_type, destroy_ee)) task = asyncio.ensure_future( self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index, destroy_ee, exec_terminate_primitives)) @@ -3184,13 +3340,13 @@ class NsLcm(LcmBase): # wait for pending tasks of terminate primitives if tasks_dict_info: - self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...') + self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys()))) error_list = await self._wait_for_tasks(logging_text, tasks_dict_info, min(self.timeout_charm_delete, timeout_ns_terminate), stage, nslcmop_id) + tasks_dict_info.clear() if error_list: return # raise LcmException("; ".join(error_list)) - tasks_dict_info.clear() # remove All execution environments at once stage[0] = "Stage 3/3 delete all." @@ -3263,8 +3419,8 @@ class NsLcm(LcmBase): if error_list: error_detail = "; ".join(error_list) # self.logger.error(logging_text + error_detail) - error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail) - error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0]) + error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail) + error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0]) db_nsr_update["operational-status"] = "failed" db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail @@ -3298,6 +3454,12 @@ class NsLcm(LcmBase): operation_state=nslcmop_operation_state, other_update=db_nslcmop_update, ) + if ns_state == "NOT_INSTANTIATED": + try: + self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"}) + except DbException as e: + self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'. + format(nsr_id, e)) if operation_params: autoremove = operation_params.get("autoremove", False) if nslcmop_operation_state: @@ -3361,8 +3523,7 @@ class NsLcm(LcmBase): self._write_op_status(nslcmop_id, stage) return error_detail_list - @staticmethod - def _map_primitive_params(primitive_desc, params, instantiation_params): + def _map_primitive_params(self, primitive_desc, params, instantiation_params): """ Generates the params to be provided to charm before executing primitive. If user does not provide a parameter, The default-value is used. If it is between < > it look for a value at instantiation_params @@ -3397,11 +3558,20 @@ class NsLcm(LcmBase): width=256) elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "): calculated_params[param_name] = calculated_params[param_name][7:] + if parameter.get("data-type") == "INTEGER": + try: + calculated_params[param_name] = int(calculated_params[param_name]) + except ValueError: # error converting string to int + raise LcmException( + "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"])) + elif parameter.get("data-type") == "BOOLEAN": + calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false') # add always ns_config_info if primitive name is config if primitive_desc["name"] == "config": if "ns_config_info" in instantiation_params: calculated_params["ns_config_info"] = instantiation_params["ns_config_info"] + calculated_params["VCA"] = self.vca_config return calculated_params def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None, @@ -3556,11 +3726,15 @@ class NsLcm(LcmBase): config_primitive_desc = config_primitive break - if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")): - raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ". - format(primitive)) - primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive) - ee_descriptor_id = config_primitive_desc.get("execution-environment-ref") + if not config_primitive_desc: + if not (kdu_name and primitive in ("upgrade", "rollback", "status")): + raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ". + format(primitive)) + primitive_name = primitive + ee_descriptor_id = None + else: + primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive) + ee_descriptor_id = config_primitive_desc.get("execution-environment-ref") if vnf_index: if vdu_id: @@ -3738,7 +3912,6 @@ class NsLcm(LcmBase): scale_process = None old_operational_status = "" old_config_status = "" - vnfr_scaled = False try: # wait for any previous tasks in process step = "Waiting for previous operations to terminate" @@ -3833,8 +4006,25 @@ class NsLcm(LcmBase): vdu_scaling_info["scaling_direction"] = "OUT" vdu_scaling_info["vdu-create"] = {} for vdu_scale_info in scaling_descriptor["vdu"]: + vdud = next(vdu for vdu in db_vnfd.get("vdu") if vdu["id"] == vdu_scale_info["vdu-id-ref"]) + vdu_index = len([x for x in db_vnfr.get("vdur", ()) + if x.get("vdu-id-ref") == vdu_scale_info["vdu-id-ref"] and + x.get("member-vnf-index-ref") == vnf_index]) + cloud_init_text = self._get_cloud_init(vdud, db_vnfd) + if cloud_init_text: + additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {} + cloud_init_list = [] + for x in range(vdu_scale_info.get("count", 1)): + if cloud_init_text: + # TODO Information of its own ip is not available because db_vnfr is not updated. + additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu_scale_info["vdu-id-ref"], + vdu_index + x) + cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params, + db_vnfd["id"], vdud["id"])) RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index, "type": "create", "count": vdu_scale_info.get("count", 1)}) + if cloud_init_list: + RO_scaling_info[-1]["cloud_init"] = cloud_init_list vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1) elif scaling_type == "SCALE_IN": @@ -3983,7 +4173,7 @@ class NsLcm(LcmBase): db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id RO_task_done = False - step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id) + step = detailed_status = "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id) detailed_status_old = None self.logger.debug(logging_text + step) @@ -4003,11 +4193,16 @@ class NsLcm(LcmBase): detailed_status = step + "; {}".format(ns_status_info) elif ns_status == "ACTIVE": RO_task_done = True + self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete) step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id) self.logger.debug(logging_text + step) else: assert False, "ROclient.check_action_status returns unknown {}".format(ns_status) else: + desc = await self.RO.show("ns", RO_nsr_id) + ns_status, ns_status_info = self.RO.check_ns_status(desc) + # deploymentStatus + self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc) if ns_status == "ERROR": raise ROclient.ROClientException(ns_status_info) @@ -4016,15 +4211,7 @@ class NsLcm(LcmBase): elif ns_status == "ACTIVE": step = detailed_status = \ "Waiting for management IP address reported by the VIM. Updating VNFRs" - if not vnfr_scaled: - self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete) - vnfr_scaled = True try: - desc = await self.RO.show("ns", RO_nsr_id) - - # deploymentStatus - self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc) - # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc) self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc) break