X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fns.py;h=62f010f1b25800b297c673eeb10205b881b9e0d7;hb=89f8290b70918e151e6b6653c635ea6a05a22522;hp=5141f6d019cf9f900136144c9231f9593a563dcf;hpb=a43e672b0461451813b5366f05d710072413e23e;p=osm%2FLCM.git diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 5141f6d..62f010f 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -43,6 +43,7 @@ from http import HTTPStatus from time import time from uuid import uuid4 from functools import partial +from random import randint __author__ = "Alfonso Tierno " @@ -51,7 +52,8 @@ class N2VCJujuConnectorLCM(N2VCJujuConnector): async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None, progress_timeout: float = None, total_timeout: float = None, - artifact_path: str = None, vca_type: str = None) -> (str, dict): + config: dict = None, artifact_path: str = None, + vca_type: str = None) -> (str, dict): # admit two new parameters, artifact_path and vca_type if vca_type == "k8s_proxy_charm": ee_id = await self.n2vc.install_k8s_proxy_charm( @@ -88,7 +90,7 @@ class NsLcm(LcmBase): SUBOPERATION_STATUS_SKIP = -3 task_name_deploy_vca = "Deploying VCA" - def __init__(self, db, msg, fs, lcm_tasks, config, loop): + def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -163,6 +165,8 @@ class NsLcm(LcmBase): "helm": self.conn_helm_ee } + self.prometheus = prometheus + # create RO client if self.ng_ro: self.RO = NgRoClient(self.loop, **self.ro_config) @@ -1369,11 +1373,13 @@ class NsLcm(LcmBase): raise LcmException("Configuration aborted because dependent charm/s timeout") async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index, - config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name): + config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name, + ee_config_descriptor): nsr_id = db_nsr["_id"] db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index) vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"] vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index] + osm_config = {"osm": {"ns_id": db_nsr["_id"]}} db_dict = { 'collection': 'nsrs', 'filter': {'_id': nsr_id}, @@ -1388,6 +1394,7 @@ class NsLcm(LcmBase): vnfr_id = None if db_vnfr: vnfr_id = db_vnfr["_id"] + osm_config["osm"]["vnf_id"] = vnfr_id namespace = "{nsi}.{ns}".format( nsi=nsi_id if nsi_id else "", @@ -1401,10 +1408,12 @@ class NsLcm(LcmBase): namespace += ".{}-{}".format(vdu_id, vdu_index or 0) element_type = 'VDU' element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0) + osm_config["osm"]["vdu_id"] = vdu_id elif kdu_name: namespace += ".{}".format(kdu_name) element_type = 'KDU' element_under_configuration = kdu_name + osm_config["osm"]["kdu_name"] = kdu_name # Get artifact path artifact_path = "{}/{}/{}/{}".format( @@ -1436,6 +1445,7 @@ class NsLcm(LcmBase): namespace=namespace, reuse_ee_id=ee_id, db_dict=db_dict, + config=osm_config, artifact_path=artifact_path, vca_type=vca_type) @@ -1637,6 +1647,19 @@ class NsLcm(LcmBase): # TODO register in database that primitive is done + # STEP 7 Configure metrics + if vca_type == "helm": + prometheus_jobs = await self.add_prometheus_metrics( + ee_id=ee_id, + artifact_path=artifact_path, + ee_config_descriptor=ee_config_descriptor, + vnfr_id=vnfr_id, + nsr_id=nsr_id, + target_ip=rw_mgmt_ip, + ) + if prometheus_jobs: + self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs}) + step = "instantiated at VCA" self.logger.debug(logging_text + step) @@ -2596,7 +2619,8 @@ class NsLcm(LcmBase): nslcmop_id=nslcmop_id, stage=stage, vca_type=vca_type, - vca_name=vca_name + vca_name=vca_name, + ee_config_descriptor=ee_item ) ) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc) @@ -2896,6 +2920,9 @@ class NsLcm(LcmBase): db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index) self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}) + if vca_deployed.get("prometheus_jobs") and self.prometheus: + await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"]) + if destroy_ee: await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"]) @@ -3122,7 +3149,7 @@ class NsLcm(LcmBase): self.logger.debug(logging_text + stage[0]) stage[1] = "Looking execution environment that needs terminate." self.logger.debug(logging_text + stage[1]) - self.logger.debug("nsr_deployed: {}".format(nsr_deployed)) + # self.logger.debug("nsr_deployed: {}".format(nsr_deployed)) for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")): self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca)) config_descriptor = None @@ -3143,17 +3170,14 @@ class NsLcm(LcmBase): config_descriptor = kdud.get("kdu-configuration") else: config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration") - # For helm we must destroy_ee vca_type = vca.get("type") exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and vca.get("needed_terminate")) - self.logger.debug("vca type: {}".format(vca_type)) - if not vca_type == "helm": - task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, - vca_index, False, exec_terminate_primitives)) - else: - task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, - vca_index, True, exec_terminate_primitives)) + # For helm we must destroy_ee + destroy_ee = "True" if vca_type == "helm" else "False" + task = asyncio.ensure_future( + self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index, + destroy_ee, exec_terminate_primitives)) tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id")) # wait for pending tasks of terminate primitives @@ -4178,3 +4202,35 @@ class NsLcm(LcmBase): self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale") + + async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip): + if not self.prometheus: + return + # look if exist a file called 'prometheus*.j2' and + artifact_content = self.fs.dir_ls(artifact_path) + job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None) + if not job_file: + return + with self.fs.file_open((artifact_path, job_file), "r") as f: + job_data = f.read() + + # TODO get_service + _, _, service = ee_id.partition(".") # remove prefix "namespace." + host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"]) + host_port = "80" + vnfr_id = vnfr_id.replace("-", "") + variables = { + "JOB_NAME": vnfr_id, + "TARGET_IP": target_ip, + "EXPORTER_POD_IP": host_name, + "EXPORTER_POD_PORT": host_port, + } + job_list = self.prometheus.parse_job(job_data, variables) + # ensure job_name is using the vnfr_id. Adding the metadata nsr_id + for job in job_list: + if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]: + job["job_name"] = vnfr_id + "_" + str(randint(1, 10000)) + job["nsr_id"] = nsr_id + job_dict = {jl["job_name"]: jl for jl in job_list} + if await self.prometheus.update(job_dict): + return list(job_dict.keys())