X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fns.py;h=62f010f1b25800b297c673eeb10205b881b9e0d7;hb=89f8290b70918e151e6b6653c635ea6a05a22522;hp=3951ae568c21bf17544e9f3ac95177c08f4d5822;hpb=eccc21010cd4aedd32763acee92fd63cf061cfd1;p=osm%2FLCM.git diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 3951ae5..62f010f 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -36,15 +36,47 @@ from osm_common.fsbase import FsException from n2vc.n2vc_juju_conn import N2VCJujuConnector from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException +from osm_lcm.lcm_helm_conn import LCMHelmConn + from copy import copy, deepcopy from http import HTTPStatus from time import time from uuid import uuid4 from functools import partial +from random import randint __author__ = "Alfonso Tierno " +class N2VCJujuConnectorLCM(N2VCJujuConnector): + + async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None, + progress_timeout: float = None, total_timeout: float = None, + config: dict = None, artifact_path: str = None, + vca_type: str = None) -> (str, dict): + # admit two new parameters, artifact_path and vca_type + if vca_type == "k8s_proxy_charm": + ee_id = await self.n2vc.install_k8s_proxy_charm( + charm_name=artifact_path[artifact_path.rfind("/") + 1:], + namespace=namespace, + artifact_path=artifact_path, + db_dict=db_dict) + return ee_id, None + else: + return await super().create_execution_environment( + namespace=namespace, db_dict=db_dict, reuse_ee_id=reuse_ee_id, + progress_timeout=progress_timeout, total_timeout=total_timeout) + + async def install_configuration_sw(self, ee_id: str, artifact_path: str, db_dict: dict, + progress_timeout: float = None, total_timeout: float = None, + config: dict = None, num_units: int = 1, vca_type: str = "lxc_proxy_charm"): + if vca_type == "k8s_proxy_charm": + return + return await super().install_configuration_sw( + ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict, progress_timeout=progress_timeout, + total_timeout=total_timeout, config=config, num_units=num_units) + + class NsLcm(LcmBase): timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns @@ -58,7 +90,7 @@ class NsLcm(LcmBase): SUBOPERATION_STATUS_SKIP = -3 task_name_deploy_vca = "Deploying VCA" - def __init__(self, db, msg, fs, lcm_tasks, config, loop): + def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -79,7 +111,7 @@ class NsLcm(LcmBase): self.vca_config = config["VCA"].copy() # create N2VC connector - self.n2vc = N2VCJujuConnector( + self.n2vc = N2VCJujuConnectorLCM( db=self.db, fs=self.fs, log=self.logger, @@ -90,6 +122,17 @@ class NsLcm(LcmBase): on_update_db=self._on_update_n2vc_db ) + self.conn_helm_ee = LCMHelmConn( + db=self.db, + fs=self.fs, + log=self.logger, + loop=self.loop, + url=None, + username=None, + vca_config=self.vca_config, + on_update_db=self._on_update_n2vc_db + ) + self.k8sclusterhelm = K8sHelmConnector( kubectl_command=self.vca_config.get("kubectlpath"), helm_command=self.vca_config.get("helmpath"), @@ -114,6 +157,16 @@ class NsLcm(LcmBase): "juju-bundle": self.k8sclusterjuju, "juju": self.k8sclusterjuju, } + + self.vca_map = { + "lxc_proxy_charm": self.n2vc, + "native_charm": self.n2vc, + "k8s_proxy_charm": self.n2vc, + "helm": self.conn_helm_ee + } + + self.prometheus = prometheus + # create RO client if self.ng_ro: self.RO = NgRoClient(self.loop, **self.ro_config) @@ -1320,11 +1373,13 @@ class NsLcm(LcmBase): raise LcmException("Configuration aborted because dependent charm/s timeout") async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index, - config_descriptor, deploy_params, base_folder, nslcmop_id, stage): + config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name, + ee_config_descriptor): nsr_id = db_nsr["_id"] db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index) vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"] vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index] + osm_config = {"osm": {"ns_id": db_nsr["_id"]}} db_dict = { 'collection': 'nsrs', 'filter': {'_id': nsr_id}, @@ -1339,6 +1394,7 @@ class NsLcm(LcmBase): vnfr_id = None if db_vnfr: vnfr_id = db_vnfr["_id"] + osm_config["osm"]["vnf_id"] = vnfr_id namespace = "{nsi}.{ns}".format( nsi=nsi_id if nsi_id else "", @@ -1352,88 +1408,93 @@ class NsLcm(LcmBase): namespace += ".{}-{}".format(vdu_id, vdu_index or 0) element_type = 'VDU' element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0) + osm_config["osm"]["vdu_id"] = vdu_id elif kdu_name: namespace += ".{}".format(kdu_name) element_type = 'KDU' element_under_configuration = kdu_name + osm_config["osm"]["kdu_name"] = kdu_name # Get artifact path - artifact_path = "{}/{}/charms/{}".format( + artifact_path = "{}/{}/{}/{}".format( base_folder["folder"], base_folder["pkg-dir"], - config_descriptor["juju"]["charm"] + "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts", + vca_name ) - is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None - is_k8s_proxy_charm = False - - if deep_get(config_descriptor, ('juju', 'proxy')) is False: - is_proxy_charm = False - - if deep_get(config_descriptor, ('juju', 'k8s')) is True and is_proxy_charm: - is_k8s_proxy_charm = True + # n2vc_redesign STEP 3.1 - if not is_k8s_proxy_charm: - # n2vc_redesign STEP 3.1 + # find old ee_id if exists + ee_id = vca_deployed.get("ee_id") - # find old ee_id if exists - ee_id = vca_deployed.get("ee_id") + # create or register execution environment in VCA + if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"): - # create or register execution environment in VCA - if is_proxy_charm: + self._write_configuration_status( + nsr_id=nsr_id, + vca_index=vca_index, + status='CREATING', + element_under_configuration=element_under_configuration, + element_type=element_type + ) - self._write_configuration_status( - nsr_id=nsr_id, - vca_index=vca_index, - status='CREATING', - element_under_configuration=element_under_configuration, - element_type=element_type - ) + step = "create execution environment" + self.logger.debug(logging_text + step) + ee_id, credentials = await self.vca_map[vca_type].create_execution_environment( + namespace=namespace, + reuse_ee_id=ee_id, + db_dict=db_dict, + config=osm_config, + artifact_path=artifact_path, + vca_type=vca_type) - step = "create execution environment" - self.logger.debug(logging_text + step) - ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace, - reuse_ee_id=ee_id, - db_dict=db_dict) + elif vca_type == "native_charm": + step = "Waiting to VM being up and getting IP address" + self.logger.debug(logging_text + step) + rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, + user=None, pub_key=None) + credentials = {"hostname": rw_mgmt_ip} + # get username + username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user")) + # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were + # merged. Meanwhile let's get username from initial-config-primitive + if not username and config_descriptor.get("initial-config-primitive"): + for config_primitive in config_descriptor["initial-config-primitive"]: + for param in config_primitive.get("parameter", ()): + if param["name"] == "ssh-username": + username = param["value"] + break + if not username: + raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with " + "'config-access.ssh-access.default-user'") + credentials["username"] = username + # n2vc_redesign STEP 3.2 - else: - step = "Waiting to VM being up and getting IP address" - self.logger.debug(logging_text + step) - rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, - user=None, pub_key=None) - credentials = {"hostname": rw_mgmt_ip} - # get username - username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user")) - # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were - # merged. Meanwhile let's get username from initial-config-primitive - if not username and config_descriptor.get("initial-config-primitive"): - for config_primitive in config_descriptor["initial-config-primitive"]: - for param in config_primitive.get("parameter", ()): - if param["name"] == "ssh-username": - username = param["value"] - break - if not username: - raise LcmException("Cannot determine the username neither with" - "'initial-config-promitive' nor with " - "'config-access.ssh-access.default-user'") - credentials["username"] = username - # n2vc_redesign STEP 3.2 - - self._write_configuration_status( - nsr_id=nsr_id, - vca_index=vca_index, - status='REGISTERING', - element_under_configuration=element_under_configuration, - element_type=element_type - ) + self._write_configuration_status( + nsr_id=nsr_id, + vca_index=vca_index, + status='REGISTERING', + element_under_configuration=element_under_configuration, + element_type=element_type + ) - step = "register execution environment {}".format(credentials) - self.logger.debug(logging_text + step) - ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace, - db_dict=db_dict) + step = "register execution environment {}".format(credentials) + self.logger.debug(logging_text + step) + ee_id = await self.vca_map[vca_type].register_execution_environment( + credentials=credentials, namespace=namespace, db_dict=db_dict) + + # for compatibility with MON/POL modules, the need model and application name at database + # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name" + ee_id_parts = ee_id.split('.') + db_nsr_update = {db_update_entry + "ee_id": ee_id} + if len(ee_id_parts) >= 2: + model_name = ee_id_parts[0] + application_name = ee_id_parts[1] + db_nsr_update[db_update_entry + "model"] = model_name + db_nsr_update[db_update_entry + "application"] = application_name # n2vc_redesign STEP 3.3 - step = "Install configuration Software" self._write_configuration_status( @@ -1442,12 +1503,13 @@ class NsLcm(LcmBase): status='INSTALLING SW', element_under_configuration=element_under_configuration, element_type=element_type, + other_update=db_nsr_update ) # TODO check if already done self.logger.debug(logging_text + step) config = None - if not is_proxy_charm: + if vca_type == "native_charm": initial_config_primitive_list = config_descriptor.get('initial-config-primitive') if initial_config_primitive_list: for primitive in initial_config_primitive_list: @@ -1458,55 +1520,49 @@ class NsLcm(LcmBase): deploy_params ) break - if is_k8s_proxy_charm: - charm_name = deep_get(config_descriptor, ('juju', 'charm')) - self.logger.debug("Installing K8s Proxy Charm: {}".format(charm_name)) - - ee_id = await self.n2vc.install_k8s_proxy_charm( - charm_name=charm_name, - namespace=namespace, - artifact_path=artifact_path, - db_dict=db_dict - ) - else: - num_units = 1 - if is_proxy_charm: - if element_type == "NS": - num_units = db_nsr.get("config-units") or 1 - elif element_type == "VNF": - num_units = db_vnfr.get("config-units") or 1 - elif element_type == "VDU": - for v in db_vnfr["vdur"]: - if vdu_id == v["vdu-id-ref"]: - num_units = v.get("config-units") or 1 - break + num_units = 1 + if vca_type == "lxc_proxy_charm": + if element_type == "NS": + num_units = db_nsr.get("config-units") or 1 + elif element_type == "VNF": + num_units = db_vnfr.get("config-units") or 1 + elif element_type == "VDU": + for v in db_vnfr["vdur"]: + if vdu_id == v["vdu-id-ref"]: + num_units = v.get("config-units") or 1 + break - await self.n2vc.install_configuration_sw( - ee_id=ee_id, - artifact_path=artifact_path, - db_dict=db_dict, - config=config, - num_units=num_units - ) + await self.vca_map[vca_type].install_configuration_sw( + ee_id=ee_id, + artifact_path=artifact_path, + db_dict=db_dict, + config=config, + num_units=num_units, + vca_type=vca_type + ) # write in db flag of configuration_sw already installed self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}) # add relations for this VCA (wait for other peers related with this VCA) - await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index) + await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, + vca_index=vca_index, vca_type=vca_type) # if SSH access is required, then get execution environment SSH public - if is_proxy_charm: # if native charm we have waited already to VM be UP + if vca_type in ("lxc_proxy_charm", "helm"): # if native charm we have waited already to VM be UP pub_key = None user = None + # self.logger.debug("get ssh key block") if deep_get(config_descriptor, ("config-access", "ssh-access", "required")): + # self.logger.debug("ssh key needed") # Needed to inject a ssh key user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user")) step = "Install configuration Software, getting public ssh key" - pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict) + pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict) step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key) else: + # self.logger.debug("no need to get ssh key") step = "Waiting to VM being up and getting IP address" self.logger.debug(logging_text + step) @@ -1577,7 +1633,7 @@ class NsLcm(LcmBase): step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_) self.logger.debug(logging_text + step) - await self.n2vc.exec_primitive( + await self.vca_map[vca_type].exec_primitive( ee_id=ee_id, primitive_name=initial_config_primitive["name"], params_dict=primitive_params_, @@ -1591,6 +1647,19 @@ class NsLcm(LcmBase): # TODO register in database that primitive is done + # STEP 7 Configure metrics + if vca_type == "helm": + prometheus_jobs = await self.add_prometheus_metrics( + ee_id=ee_id, + artifact_path=artifact_path, + ee_config_descriptor=ee_config_descriptor, + vnfr_id=vnfr_id, + nsr_id=nsr_id, + target_ip=rw_mgmt_ip, + ) + if prometheus_jobs: + self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs}) + step = "instantiated at VCA" self.logger.debug(logging_text + step) @@ -1940,7 +2009,7 @@ class NsLcm(LcmBase): deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy()) descriptor_config = vnfd.get("vnf-configuration") - if descriptor_config and descriptor_config.get("juju"): + if descriptor_config: self._deploy_n2vc( logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index), db_nsr=db_nsr, @@ -1970,7 +2039,7 @@ class NsLcm(LcmBase): deploy_params_vdu = self._format_additional_params(vdur["additionalParams"]) else: deploy_params_vdu = deploy_params - if descriptor_config and descriptor_config.get("juju"): + if descriptor_config: # look for vdu index in the db_vnfr["vdu"] section # for vdur_index, vdur in enumerate(db_vnfr["vdur"]): # if vdur["vdu-id-ref"] == vdu_id: @@ -2006,7 +2075,7 @@ class NsLcm(LcmBase): for kdud in get_iterable(vnfd, 'kdu'): kdu_name = kdud["name"] descriptor_config = kdud.get('kdu-configuration') - if descriptor_config and descriptor_config.get("juju"): + if descriptor_config: vdu_id = None vdu_index = 0 vdu_name = None @@ -2165,7 +2234,8 @@ class NsLcm(LcmBase): self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate") - async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool: + async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, + timeout: int = 3600, vca_type: str = None) -> bool: # steps: # 1. find all relations for this VCA @@ -2173,6 +2243,7 @@ class NsLcm(LcmBase): # 3. add relations try: + vca_type = vca_type or "lxc_proxy_charm" # STEP 1: find all relations for this VCA @@ -2243,7 +2314,7 @@ class NsLcm(LcmBase): to_vca_endpoint = r.get('entities')[1].get('endpoint') if from_vca_ee_id and to_vca_ee_id: # add relation - await self.n2vc.add_relation( + await self.vca_map[vca_type].add_relation( ee_id_1=from_vca_ee_id, ee_id_2=to_vca_ee_id, endpoint_1=from_vca_endpoint, @@ -2286,7 +2357,7 @@ class NsLcm(LcmBase): to_vca_endpoint = r.get('entities')[1].get('endpoint') if from_vca_ee_id and to_vca_ee_id: # add relation - await self.n2vc.add_relation( + await self.vca_map[vca_type].add_relation( ee_id_1=from_vca_ee_id, ee_id_2=to_vca_ee_id, endpoint_1=from_vca_endpoint, @@ -2470,62 +2541,91 @@ class NsLcm(LcmBase): # launch instantiate_N2VC in a asyncio task and register task object # Look where information of this charm is at database ._admin.deployed.VCA # if not found, create one entry and update database - # fill db_nsr._admin.deployed.VCA. - vca_index = -1 - for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]): - if not vca_deployed: - continue - if vca_deployed.get("member-vnf-index") == member_vnf_index and \ - vca_deployed.get("vdu_id") == vdu_id and \ - vca_deployed.get("kdu_name") == kdu_name and \ - vca_deployed.get("vdu_count_index", 0) == vdu_index: - break - else: - # not found, create one. - vca_deployed = { - "member-vnf-index": member_vnf_index, - "vdu_id": vdu_id, - "kdu_name": kdu_name, - "vdu_count_index": vdu_index, - "operational-status": "init", # TODO revise - "detailed-status": "", # TODO revise - "step": "initial-deploy", # TODO revise - "vnfd_id": vnfd_id, - "vdu_name": vdu_name, - } - vca_index += 1 - # create VCA and configurationStatus in db - db_dict = { - "_admin.deployed.VCA.{}".format(vca_index): vca_deployed, - "configurationStatus.{}".format(vca_index): dict() - } - self.update_db_2("nsrs", nsr_id, db_dict) - - db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed) + self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)) + if descriptor_config.get("juju"): # There is one execution envioronment of type juju + ee_list = [descriptor_config] + elif descriptor_config.get("execution-environment-list"): + ee_list = descriptor_config.get("execution-environment-list") + else: # other types as script are not supported + ee_list = [] + + for ee_item in ee_list: + self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'), + ee_item.get("helm-chart"))) + if ee_item.get("juju"): + vca_name = ee_item['juju'].get('charm') + vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm" + if ee_item['juju'].get('cloud') == "k8s": + vca_type = "k8s_proxy_charm" + elif ee_item['juju'].get('proxy') is False: + vca_type = "native_charm" + elif ee_item.get("helm-chart"): + vca_name = ee_item['helm-chart'] + vca_type = "helm" + else: + self.logger.debug(logging_text + "skipping non juju neither charm configuration") + continue - # Launch task - task_n2vc = asyncio.ensure_future( - self.instantiate_N2VC( - logging_text=logging_text, - vca_index=vca_index, - nsi_id=nsi_id, - db_nsr=db_nsr, - db_vnfr=db_vnfr, - vdu_id=vdu_id, - kdu_name=kdu_name, - vdu_index=vdu_index, - deploy_params=deploy_params, - config_descriptor=descriptor_config, - base_folder=base_folder, - nslcmop_id=nslcmop_id, - stage=stage + vca_index = -1 + for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]): + if not vca_deployed: + continue + if vca_deployed.get("member-vnf-index") == member_vnf_index and \ + vca_deployed.get("vdu_id") == vdu_id and \ + vca_deployed.get("kdu_name") == kdu_name and \ + vca_deployed.get("vdu_count_index", 0) == vdu_index: + break + else: + # not found, create one. + vca_deployed = { + "member-vnf-index": member_vnf_index, + "vdu_id": vdu_id, + "kdu_name": kdu_name, + "vdu_count_index": vdu_index, + "operational-status": "init", # TODO revise + "detailed-status": "", # TODO revise + "step": "initial-deploy", # TODO revise + "vnfd_id": vnfd_id, + "vdu_name": vdu_name, + "type": vca_type + } + vca_index += 1 + + # create VCA and configurationStatus in db + db_dict = { + "_admin.deployed.VCA.{}".format(vca_index): vca_deployed, + "configurationStatus.{}".format(vca_index): dict() + } + self.update_db_2("nsrs", nsr_id, db_dict) + + db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed) + + # Launch task + task_n2vc = asyncio.ensure_future( + self.instantiate_N2VC( + logging_text=logging_text, + vca_index=vca_index, + nsi_id=nsi_id, + db_nsr=db_nsr, + db_vnfr=db_vnfr, + vdu_id=vdu_id, + kdu_name=kdu_name, + vdu_index=vdu_index, + deploy_params=deploy_params, + config_descriptor=descriptor_config, + base_folder=base_folder, + nslcmop_id=nslcmop_id, + stage=stage, + vca_type=vca_type, + vca_name=vca_name, + ee_config_descriptor=ee_item + ) ) - ) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc) - task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format( - member_vnf_index or "", vdu_id or "") + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc) + task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format( + member_vnf_index or "", vdu_id or "") # Check if this VNFD has a configured terminate action def _has_terminate_config_primitive(self, vnfd): @@ -2752,7 +2852,8 @@ class NsLcm(LcmBase): if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id: return vca["ee_id"] - async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, vca_index, destroy_ee=True): + async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, + vca_index, destroy_ee=True, exec_primitives=True): """ Execute the terminate primitives and destroy the execution environment (if destroy_ee=False :param logging_text: @@ -2761,53 +2862,69 @@ class NsLcm(LcmBase): :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu :param vca_index: index in the database _admin.deployed.VCA :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once + :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has + not executed properly :return: None or exception """ + + self.logger.debug( + logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format( + vca_index, vca_deployed, config_descriptor, destroy_ee + ) + ) + + vca_type = vca_deployed.get("type", "lxc_proxy_charm") + # execute terminate_primitives - terminate_primitives = config_descriptor.get("terminate-config-primitive") - vdu_id = vca_deployed.get("vdu_id") - vdu_count_index = vca_deployed.get("vdu_count_index") - vdu_name = vca_deployed.get("vdu_name") - vnf_index = vca_deployed.get("member-vnf-index") - if terminate_primitives and vca_deployed.get("needed_terminate"): - # Get all 'seq' tags in seq_list, order sequences numerically, ascending. - terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq'])) - for seq in terminate_primitives: - # For each sequence in list, get primitive and call _ns_execute_primitive() - step = "Calling terminate action for vnf_member_index={} primitive={}".format( - vnf_index, seq.get("name")) - self.logger.debug(logging_text + step) - # Create the primitive for each sequence, i.e. "primitive": "touch" - primitive = seq.get('name') - mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index) - # The following 3 parameters are currently set to None for 'terminate': - # vdu_id, vdu_count_index, vdu_name - - # Add sub-operation - self._add_suboperation(db_nslcmop, - vnf_index, - vdu_id, - vdu_count_index, - vdu_name, - primitive, - mapped_primitive_params) - # Sub-operations: Call _ns_execute_primitive() instead of action() - try: - result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive, - mapped_primitive_params) - except LcmException: - # this happens when VCA is not deployed. In this case it is not needed to terminate - continue - result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED'] - if result not in result_ok: - raise LcmException("terminate_primitive {} for vnf_member_index={} fails with " - "error {}".format(seq.get("name"), vnf_index, result_detail)) - # set that this VCA do not need terminated - db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index) - self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}) + if exec_primitives: + terminate_primitives = config_descriptor.get("terminate-config-primitive") + vdu_id = vca_deployed.get("vdu_id") + vdu_count_index = vca_deployed.get("vdu_count_index") + vdu_name = vca_deployed.get("vdu_name") + vnf_index = vca_deployed.get("member-vnf-index") + if terminate_primitives and vca_deployed.get("needed_terminate"): + # Get all 'seq' tags in seq_list, order sequences numerically, ascending. + terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq'])) + for seq in terminate_primitives: + # For each sequence in list, get primitive and call _ns_execute_primitive() + step = "Calling terminate action for vnf_member_index={} primitive={}".format( + vnf_index, seq.get("name")) + self.logger.debug(logging_text + step) + # Create the primitive for each sequence, i.e. "primitive": "touch" + primitive = seq.get('name') + mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index) + # The following 3 parameters are currently set to None for 'terminate': + # vdu_id, vdu_count_index, vdu_name + + # Add sub-operation + self._add_suboperation(db_nslcmop, + vnf_index, + vdu_id, + vdu_count_index, + vdu_name, + primitive, + mapped_primitive_params) + # Sub-operations: Call _ns_execute_primitive() instead of action() + try: + result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive, + mapped_primitive_params, + vca_type=vca_type) + except LcmException: + # this happens when VCA is not deployed. In this case it is not needed to terminate + continue + result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED'] + if result not in result_ok: + raise LcmException("terminate_primitive {} for vnf_member_index={} fails with " + "error {}".format(seq.get("name"), vnf_index, result_detail)) + # set that this VCA do not need terminated + db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index) + self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}) + + if vca_deployed.get("prometheus_jobs") and self.prometheus: + await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"]) if destroy_ee: - await self.n2vc.delete_execution_environment(vca_deployed["ee_id"]) + await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"]) async def _delete_all_N2VC(self, db_nsr: dict): self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING') @@ -3025,42 +3142,53 @@ class NsLcm(LcmBase): # Destroy individual execution environments when there are terminating primitives. # Rest of EE will be deleted at once - if not operation_params.get("skip_terminate_primitives"): - stage[0] = "Stage 2/3 execute terminating primitives." - stage[1] = "Looking execution environment that needs terminate." - self.logger.debug(logging_text + stage[1]) - for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")): - config_descriptor = None - if not vca or not vca.get("ee_id") or not vca.get("needed_terminate"): - continue - if not vca.get("member-vnf-index"): - # ns - config_descriptor = db_nsr.get("ns-configuration") - elif vca.get("vdu_id"): - db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]] - vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None) - if vdud: - config_descriptor = vdud.get("vdu-configuration") - elif vca.get("kdu_name"): - db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]] - kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None) - if kdud: - config_descriptor = kdud.get("kdu-configuration") - else: - config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration") - task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, - vca_index, False)) - tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id")) - - # wait for pending tasks of terminate primitives - if tasks_dict_info: - self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...') - error_list = await self._wait_for_tasks(logging_text, tasks_dict_info, - min(self.timeout_charm_delete, timeout_ns_terminate), - stage, nslcmop_id) - if error_list: - return # raise LcmException("; ".join(error_list)) - tasks_dict_info.clear() + # TODO - check before calling _destroy_N2VC + # if not operation_params.get("skip_terminate_primitives"):# + # or not vca.get("needed_terminate"): + stage[0] = "Stage 2/3 execute terminating primitives." + self.logger.debug(logging_text + stage[0]) + stage[1] = "Looking execution environment that needs terminate." + self.logger.debug(logging_text + stage[1]) + # self.logger.debug("nsr_deployed: {}".format(nsr_deployed)) + for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")): + self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca)) + config_descriptor = None + if not vca or not vca.get("ee_id"): + continue + if not vca.get("member-vnf-index"): + # ns + config_descriptor = db_nsr.get("ns-configuration") + elif vca.get("vdu_id"): + db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]] + vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None) + if vdud: + config_descriptor = vdud.get("vdu-configuration") + elif vca.get("kdu_name"): + db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]] + kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None) + if kdud: + config_descriptor = kdud.get("kdu-configuration") + else: + config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration") + vca_type = vca.get("type") + exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and + vca.get("needed_terminate")) + # For helm we must destroy_ee + destroy_ee = "True" if vca_type == "helm" else "False" + task = asyncio.ensure_future( + self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index, + destroy_ee, exec_terminate_primitives)) + tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id")) + + # wait for pending tasks of terminate primitives + if tasks_dict_info: + self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...') + error_list = await self._wait_for_tasks(logging_text, tasks_dict_info, + min(self.timeout_charm_delete, timeout_ns_terminate), + stage, nslcmop_id) + if error_list: + return # raise LcmException("; ".join(error_list)) + tasks_dict_info.clear() # remove All execution environments at once stage[0] = "Stage 3/3 delete all." @@ -3293,27 +3421,32 @@ class NsLcm(LcmBase): # get ee_id ee_id = vca.get("ee_id") + vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm if not ee_id: raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not " "execution environment" .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index)) - return ee_id + return ee_id, vca_type async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, - retries_interval=30, timeout=None) -> (str, str): + retries_interval=30, timeout=None, + vca_type=None, db_dict=None) -> (str, str): try: if primitive == "config": primitive_params = {"params": primitive_params} + vca_type = vca_type or "lxc_proxy_charm" + while retries >= 0: try: output = await asyncio.wait_for( - self.n2vc.exec_primitive( + self.vca_map[vca_type].exec_primitive( ee_id=ee_id, primitive_name=primitive, params_dict=primitive_params, progress_timeout=self.timeout_progress_primitive, - total_timeout=self.timeout_primitive), + total_timeout=self.timeout_primitive, + db_dict=db_dict), timeout=timeout or self.timeout_primitive) # execution was OK break @@ -3514,14 +3647,20 @@ class NsLcm(LcmBase): detailed_status = '' nslcmop_operation_state = 'FAILED' else: + ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], + member_vnf_index=vnf_index, + vdu_id=vdu_id, + vdu_count_index=vdu_count_index) + db_nslcmop_notif = {"collection": "nslcmops", + "filter": {"_id": nslcmop_id}, + "path": "admin.VCA"} nslcmop_operation_state, detailed_status = await self._ns_execute_primitive( - self._look_for_deployed_vca(nsr_deployed["VCA"], - member_vnf_index=vnf_index, - vdu_id=vdu_id, - vdu_count_index=vdu_count_index), + ee_id, primitive=primitive, primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params), - timeout=timeout_ns_action) + timeout=timeout_ns_action, + vca_type=vca_type, + db_dict=db_nslcmop_notif) db_nslcmop_update["detailed-status"] = detailed_status error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else "" @@ -3783,13 +3922,13 @@ class NsLcm(LcmBase): primitive_params = op.get('primitive_params') self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry". format(vnf_config_primitive)) - # Execute the primitive, either with new (first-time) or registered (retry) args + # Execute the primitive, either with new (first-time) or registered (reintent) args + ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], + member_vnf_index=vnf_index, + vdu_id=None, + vdu_count_index=None) result, result_detail = await self._ns_execute_primitive( - self._look_for_deployed_vca(nsr_deployed["VCA"], - member_vnf_index=vnf_index, - vdu_id=None, - vdu_count_index=None), - vnf_config_primitive, primitive_params) + ee_id, vnf_config_primitive, primitive_params, vca_type) self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format( vnf_config_primitive, result, result_detail)) # Update operationState = COMPLETED | FAILED @@ -3976,13 +4115,13 @@ class NsLcm(LcmBase): primitive_params = op.get('primitive_params') self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry". format(vnf_config_primitive)) - # Execute the primitive, either with new (first-time) or registered (retry) args + # Execute the primitive, either with new (first-time) or registered (reintent) args + ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], + member_vnf_index=vnf_index, + vdu_id=None, + vdu_count_index=None) result, result_detail = await self._ns_execute_primitive( - self._look_for_deployed_vca(nsr_deployed["VCA"], - member_vnf_index=vnf_index, - vdu_id=None, - vdu_count_index=None), - vnf_config_primitive, primitive_params) + ee_id, vnf_config_primitive, primitive_params, vca_type) self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format( vnf_config_primitive, result, result_detail)) # Update operationState = COMPLETED | FAILED @@ -4063,3 +4202,35 @@ class NsLcm(LcmBase): self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale") + + async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip): + if not self.prometheus: + return + # look if exist a file called 'prometheus*.j2' and + artifact_content = self.fs.dir_ls(artifact_path) + job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None) + if not job_file: + return + with self.fs.file_open((artifact_path, job_file), "r") as f: + job_data = f.read() + + # TODO get_service + _, _, service = ee_id.partition(".") # remove prefix "namespace." + host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"]) + host_port = "80" + vnfr_id = vnfr_id.replace("-", "") + variables = { + "JOB_NAME": vnfr_id, + "TARGET_IP": target_ip, + "EXPORTER_POD_IP": host_name, + "EXPORTER_POD_PORT": host_port, + } + job_list = self.prometheus.parse_job(job_data, variables) + # ensure job_name is using the vnfr_id. Adding the metadata nsr_id + for job in job_list: + if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]: + job["job_name"] = vnfr_id + "_" + str(randint(1, 10000)) + job["nsr_id"] = nsr_id + job_dict = {jl["job_name"]: jl for jl in job_list} + if await self.prometheus.update(job_dict): + return list(job_dict.keys())