Fixing Prometheus metric exporter issues
[osm/LCM.git] / osm_lcm / ns.py
index 5141f6d..62f010f 100644
@@ -43,6 +43,7 @@ from http import HTTPStatus
 from time import time
 from uuid import uuid4
 from functools import partial
+from random import randint
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -51,7 +52,8 @@ class N2VCJujuConnectorLCM(N2VCJujuConnector):
 
     async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
                                            progress_timeout: float = None, total_timeout: float = None,
-                                           artifact_path: str = None, vca_type: str = None) -> (str, dict):
+                                           config: dict = None, artifact_path: str = None,
+                                           vca_type: str = None) -> (str, dict):
-        # admit two new parameters, artifact_path and vca_type
+        # admit three new parameters: config, artifact_path and vca_type
         if vca_type == "k8s_proxy_charm":
             ee_id = await self.n2vc.install_k8s_proxy_charm(
@@ -88,7 +90,7 @@ class NsLcm(LcmBase):
     SUBOPERATION_STATUS_SKIP = -3
     task_name_deploy_vca = "Deploying VCA"
 
-    def __init__(self, db, msg, fs, lcm_tasks, config, loop):
+    def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
         """
         Init, Connect to database, filesystem storage, and messaging
         :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
@@ -163,6 +165,8 @@ class NsLcm(LcmBase):
             "helm": self.conn_helm_ee
         }
 
+        self.prometheus = prometheus
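+        # when provided, the prometheus handler is used to register metric scrape jobs at
+        # instantiation time and to remove them when the corresponding VCA is destroyed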
+
         # create RO client
         if self.ng_ro:
             self.RO = NgRoClient(self.loop, **self.ro_config)
@@ -1369,11 +1373,13 @@ class NsLcm(LcmBase):
         raise LcmException("Configuration aborted because dependent charm/s timeout")
 
     async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
-                               config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name):
+                               config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
+                               ee_config_descriptor):
         nsr_id = db_nsr["_id"]
         db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
         vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
         vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
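+        # OSM identifiers (ns/vnf/vdu/kdu) passed to the execution environment via the "config" parameter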
+        osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
         db_dict = {
             'collection': 'nsrs',
             'filter': {'_id': nsr_id},
@@ -1388,6 +1394,7 @@ class NsLcm(LcmBase):
             vnfr_id = None
             if db_vnfr:
                 vnfr_id = db_vnfr["_id"]
+                osm_config["osm"]["vnf_id"] = vnfr_id
 
             namespace = "{nsi}.{ns}".format(
                 nsi=nsi_id if nsi_id else "",
@@ -1401,10 +1408,12 @@ class NsLcm(LcmBase):
                     namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                     element_type = 'VDU'
                     element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+                    osm_config["osm"]["vdu_id"] = vdu_id
                 elif kdu_name:
                     namespace += ".{}".format(kdu_name)
                     element_type = 'KDU'
                     element_under_configuration = kdu_name
+                    osm_config["osm"]["kdu_name"] = kdu_name
 
             # Get artifact path
             artifact_path = "{}/{}/{}/{}".format(
@@ -1436,6 +1445,7 @@ class NsLcm(LcmBase):
                     namespace=namespace,
                     reuse_ee_id=ee_id,
                     db_dict=db_dict,
+                    config=osm_config,
                     artifact_path=artifact_path,
                     vca_type=vca_type)
 
@@ -1637,6 +1647,19 @@ class NsLcm(LcmBase):
 
                 # TODO register in database that primitive is done
 
+            # STEP 7 Configure metrics
+            if vca_type == "helm":
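+                # render the prometheus job template shipped with the charm artifact, register the
+                # resulting scrape jobs and keep their names in the VCA entry for later removal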
+                prometheus_jobs = await self.add_prometheus_metrics(
+                    ee_id=ee_id,
+                    artifact_path=artifact_path,
+                    ee_config_descriptor=ee_config_descriptor,
+                    vnfr_id=vnfr_id,
+                    nsr_id=nsr_id,
+                    target_ip=rw_mgmt_ip,
+                )
+                if prometheus_jobs:
+                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
+
             step = "instantiated at VCA"
             self.logger.debug(logging_text + step)
 
@@ -2596,7 +2619,8 @@ class NsLcm(LcmBase):
                     nslcmop_id=nslcmop_id,
                     stage=stage,
                     vca_type=vca_type,
-                    vca_name=vca_name
+                    vca_name=vca_name,
+                    ee_config_descriptor=ee_item
                 )
             )
             self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
@@ -2896,6 +2920,9 @@ class NsLcm(LcmBase):
                 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
                 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
 
+        if vca_deployed.get("prometheus_jobs") and self.prometheus:
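+            # deregister the scrape jobs that were added for this VCA at instantiation time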
+            await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
+
         if destroy_ee:
             await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
 
@@ -3122,7 +3149,7 @@ class NsLcm(LcmBase):
             self.logger.debug(logging_text + stage[0])
             stage[1] = "Looking execution environment that needs terminate."
             self.logger.debug(logging_text + stage[1])
             self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
             for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                 self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca))
                 config_descriptor = None
@@ -3143,17 +3170,14 @@ class NsLcm(LcmBase):
                         config_descriptor = kdud.get("kdu-configuration")
                 else:
                     config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
-                # For helm we must destroy_ee
                 vca_type = vca.get("type")
                 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                              vca.get("needed_terminate"))
-                self.logger.debug("vca type: {}".format(vca_type))
-                if not vca_type == "helm":
-                    task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
-                                                                   vca_index, False, exec_terminate_primitives))
-                else:
-                    task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
-                                                                   vca_index, True, exec_terminate_primitives))
+                # For helm we must destroy_ee
+                destroy_ee = (vca_type == "helm")
+                task = asyncio.ensure_future(
+                    self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
+                                      destroy_ee, exec_terminate_primitives))
                 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
 
             # wait for pending tasks of terminate primitives
@@ -4178,3 +4202,35 @@ class NsLcm(LcmBase):
                     self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
             self.logger.debug(logging_text + "Exit")
             self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
+
+    async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
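+        """Look for a prometheus job template in the artifact, render it and register the jobs.
+
+        Returns the list of job names added, or None when prometheus is not configured, no
+        template is found, or the update does not succeed.
+        """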
+        if not self.prometheus:
+            return
+        # look for a file named 'prometheus*.j2' in the artifact and use it as the scrape-job template
+        artifact_content = self.fs.dir_ls(artifact_path)
+        job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
+        if not job_file:
+            return
+        with self.fs.file_open((artifact_path, job_file), "r") as f:
+            job_data = f.read()
+
+        # TODO get_service
+        _, _, service = ee_id.partition(".")  # strip the "namespace." prefix
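+        # the exporter is exposed as the k8s service "<ee service>-<metric-service>", assumed to listen on port 80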
+        host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
+        host_port = "80"
+        vnfr_id = vnfr_id.replace("-", "")
+        variables = {
+            "JOB_NAME": vnfr_id,
+            "TARGET_IP": target_ip,
+            "EXPORTER_POD_IP": host_name,
+            "EXPORTER_POD_PORT": host_port,
+        }
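+        # parse_job is expected to render the Jinja2 template with these variables and return a list of job dicts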
+        job_list = self.prometheus.parse_job(job_data, variables)
+        # ensure each job_name is based on the vnfr_id and add the nsr_id as metadata
+        for job in job_list:
+            if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
+                job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+            job["nsr_id"] = nsr_id
+        job_dict = {jl["job_name"]: jl for jl in job_list}
+        if await self.prometheus.update(job_dict):
+            return list(job_dict.keys())