fixing prometheus metric exporter issues 12/9312/4
authortierno <alfonso.tiernosepulveda@telefonica.com>
Fri, 3 Jul 2020 14:52:28 +0000 (14:52 +0000)
committertierno <alfonso.tiernosepulveda@telefonica.com>
Thu, 9 Jul 2020 16:03:55 +0000 (16:03 +0000)
Use the same instance of ns at netslice. Avoid two N2VC connections

Change-Id: I346c08c111e5ffc7dbc1768851dc069d2cda10d1
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
osm_lcm/lcm.cfg
osm_lcm/lcm.py
osm_lcm/lcm_helm_conn.py
osm_lcm/netslice.py
osm_lcm/ns.py
osm_lcm/prometheus.py
osm_lcm/tests/test_prometheus.py [new file with mode: 0644]
tox.ini

index 8f141f1..4aadf22 100644 (file)
@@ -84,3 +84,11 @@ message:
     # loglevel: DEBUG
     # logfile:  /var/log/osm/lcm-message.log
     group_id: lcm-server
+
+prometheus:
+    driver:   prometheus
+    # local file to store the configuration
+    path:     /etc/prometheus
+    uri:     http://prometheus:9090/
+    # loglevel: DEBUG
+    # logfile:  /var/log/osm/lcm-message.log
index 5d27277..b914604 100644 (file)
@@ -29,9 +29,7 @@ import logging.handlers
 import getopt
 import sys
 
-from osm_lcm import ns
-from osm_lcm import vim_sdn
-from osm_lcm import netslice
+from osm_lcm import ns, prometheus, vim_sdn, netslice
 from osm_lcm.ng_ro import NgRoException, NgRoClient
 from osm_lcm.ROclient import ROClient, ROClientException
 
@@ -192,8 +190,13 @@ class Lcm:
         # contains created tasks/futures to be able to cancel
         self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)
 
-        self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
-        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
+        if self.config.get("prometheus"):
+            self.prometheus = prometheus.Prometheus(self.config["prometheus"], self.worker_id, self.db, self.loop)
+        else:
+            self.prometheus = None
+        self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop, self.prometheus)
+        self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop,
+                                             self.ns)
         self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
         self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
         self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
@@ -506,6 +509,10 @@ class Lcm:
         # check RO version
         self.loop.run_until_complete(self.check_RO_version())
 
+        # configure Prometheus
+        if self.prometheus:
+            self.loop.run_until_complete(self.prometheus.start())
+
         self.loop.run_until_complete(asyncio.gather(
             self.kafka_read(),
             self.kafka_ping()
index 146da9d..c18405b 100644 (file)
@@ -123,6 +123,7 @@ class LCMHelmConn(N2VCConnector):
                                            reuse_ee_id: str = None,
                                            progress_timeout: float = None,
                                            total_timeout: float = None,
+                                           config: dict = None,
                                            artifact_path: str = None,
                                            vca_type: str = None) -> (str, dict):
         """
@@ -137,8 +138,9 @@ class LCMHelmConn(N2VCConnector):
         :param str reuse_ee_id: ee id from an older execution. TODO - right now this params is not used
         :param float progress_timeout:
         :param float total_timeout:
-        :param str artifact_path  path of package content
-        :param str vca_type  Type of vca, not used as assumed of type helm
+        :param dict config:  General variables to instantiate KDU
+        :param str artifact_path:  path of package content
+        :param str vca_type:  Type of vca, not used as assumed of type helm
         :returns str, dict: id of the new execution environment including namespace.helm_id
         and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
         """
@@ -177,10 +179,16 @@ class LCMHelmConn(N2VCConnector):
             # Call helm conn install
             # Obtain system cluster id from database
             system_cluster_uuid = self._get_system_cluster_id()
+            # Add parameter osm if exist to global
+            if config and config.get("osm"):
+                if not config.get("global"):
+                    config["global"] = {}
+                config["global"]["osm"] = config.get("osm")
 
             self.log.debug("install helm chart: {}".format(full_path))
             helm_id = await self._k8sclusterhelm.install(system_cluster_uuid, kdu_model=full_path,
                                                          namespace=self._KUBECTL_OSM_NAMESPACE,
+                                                         params=config,
                                                          db_dict=db_dict,
                                                          timeout=progress_timeout)
 
index 4d731af..6b0a6ca 100644 (file)
@@ -17,7 +17,7 @@ import asyncio
 import logging
 import logging.handlers
 import traceback
-from osm_lcm import ROclient, ns
+from osm_lcm import ROclient
 from osm_lcm.lcm_utils import LcmException, LcmBase, populate_dict, get_iterable, deep_get
 from osm_common.dbbase import DbException
 from time import time
@@ -31,7 +31,7 @@ class NetsliceLcm(LcmBase):
 
     timeout_nsi_deploy = 2 * 3600  # default global timeout for deployment a nsi
 
-    def __init__(self, db, msg, fs, lcm_tasks, config, loop):
+    def __init__(self, db, msg, fs, lcm_tasks, config, loop, ns):
         """
         Init, Connect to database, filesystem storage, and messaging
         :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
@@ -41,7 +41,7 @@ class NetsliceLcm(LcmBase):
         self.logger = logging.getLogger('lcm.netslice')
         self.loop = loop
         self.lcm_tasks = lcm_tasks
-        self.ns = ns.NsLcm(db, msg, fs, lcm_tasks, config, loop)
+        self.ns = ns
         self.ro_config = config["ro_config"]
         self.timeout = config["timeout"]
 
index 5141f6d..62f010f 100644 (file)
@@ -43,6 +43,7 @@ from http import HTTPStatus
 from time import time
 from uuid import uuid4
 from functools import partial
+from random import randint
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -51,7 +52,8 @@ class N2VCJujuConnectorLCM(N2VCJujuConnector):
 
     async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
                                            progress_timeout: float = None, total_timeout: float = None,
-                                           artifact_path: str = None, vca_type: str = None) -> (str, dict):
+                                           config: dict = None, artifact_path: str = None,
+                                           vca_type: str = None) -> (str, dict):
         # admit two new parameters, artifact_path and vca_type
         if vca_type == "k8s_proxy_charm":
             ee_id = await self.n2vc.install_k8s_proxy_charm(
@@ -88,7 +90,7 @@ class NsLcm(LcmBase):
     SUBOPERATION_STATUS_SKIP = -3
     task_name_deploy_vca = "Deploying VCA"
 
-    def __init__(self, db, msg, fs, lcm_tasks, config, loop):
+    def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
         """
         Init, Connect to database, filesystem storage, and messaging
         :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
@@ -163,6 +165,8 @@ class NsLcm(LcmBase):
             "helm": self.conn_helm_ee
         }
 
+        self.prometheus = prometheus
+
         # create RO client
         if self.ng_ro:
             self.RO = NgRoClient(self.loop, **self.ro_config)
@@ -1369,11 +1373,13 @@ class NsLcm(LcmBase):
         raise LcmException("Configuration aborted because dependent charm/s timeout")
 
     async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
-                               config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name):
+                               config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
+                               ee_config_descriptor):
         nsr_id = db_nsr["_id"]
         db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
         vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
         vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
+        osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
         db_dict = {
             'collection': 'nsrs',
             'filter': {'_id': nsr_id},
@@ -1388,6 +1394,7 @@ class NsLcm(LcmBase):
             vnfr_id = None
             if db_vnfr:
                 vnfr_id = db_vnfr["_id"]
+                osm_config["osm"]["vnf_id"] = vnfr_id
 
             namespace = "{nsi}.{ns}".format(
                 nsi=nsi_id if nsi_id else "",
@@ -1401,10 +1408,12 @@ class NsLcm(LcmBase):
                     namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                     element_type = 'VDU'
                     element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+                    osm_config["osm"]["vdu_id"] = vdu_id
                 elif kdu_name:
                     namespace += ".{}".format(kdu_name)
                     element_type = 'KDU'
                     element_under_configuration = kdu_name
+                    osm_config["osm"]["kdu_name"] = kdu_name
 
             # Get artifact path
             artifact_path = "{}/{}/{}/{}".format(
@@ -1436,6 +1445,7 @@ class NsLcm(LcmBase):
                     namespace=namespace,
                     reuse_ee_id=ee_id,
                     db_dict=db_dict,
+                    config=osm_config,
                     artifact_path=artifact_path,
                     vca_type=vca_type)
 
@@ -1637,6 +1647,19 @@ class NsLcm(LcmBase):
 
                 # TODO register in database that primitive is done
 
+            # STEP 7 Configure metrics
+            if vca_type == "helm":
+                prometheus_jobs = await self.add_prometheus_metrics(
+                    ee_id=ee_id,
+                    artifact_path=artifact_path,
+                    ee_config_descriptor=ee_config_descriptor,
+                    vnfr_id=vnfr_id,
+                    nsr_id=nsr_id,
+                    target_ip=rw_mgmt_ip,
+                )
+                if prometheus_jobs:
+                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
+
             step = "instantiated at VCA"
             self.logger.debug(logging_text + step)
 
@@ -2596,7 +2619,8 @@ class NsLcm(LcmBase):
                     nslcmop_id=nslcmop_id,
                     stage=stage,
                     vca_type=vca_type,
-                    vca_name=vca_name
+                    vca_name=vca_name,
+                    ee_config_descriptor=ee_item
                 )
             )
             self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
@@ -2896,6 +2920,9 @@ class NsLcm(LcmBase):
                 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
                 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
 
+        if vca_deployed.get("prometheus_jobs") and self.prometheus:
+            await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
+
         if destroy_ee:
             await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
 
@@ -3122,7 +3149,7 @@ class NsLcm(LcmBase):
             self.logger.debug(logging_text + stage[0])
             stage[1] = "Looking execution environment that needs terminate."
             self.logger.debug(logging_text + stage[1])
-            self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
+            self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
             for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                 self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca))
                 config_descriptor = None
@@ -3143,17 +3170,14 @@ class NsLcm(LcmBase):
                         config_descriptor = kdud.get("kdu-configuration")
                 else:
                     config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
-                # For helm we must destroy_ee
                 vca_type = vca.get("type")
                 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                              vca.get("needed_terminate"))
-                self.logger.debug("vca type: {}".format(vca_type))
-                if not vca_type == "helm":
-                    task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
-                                                                   vca_index, False, exec_terminate_primitives))
-                else:
-                    task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
-                                                                   vca_index, True, exec_terminate_primitives))
+                # For helm we must destroy_ee
+                destroy_ee = "True" if vca_type == "helm" else "False"
+                task = asyncio.ensure_future(
+                    self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
+                                      destroy_ee, exec_terminate_primitives))
                 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
 
             # wait for pending tasks of terminate primitives
@@ -4178,3 +4202,35 @@ class NsLcm(LcmBase):
                     self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
             self.logger.debug(logging_text + "Exit")
             self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
+
+    async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
+        if not self.prometheus:
+            return
+        # look if exist a file called 'prometheus*.j2' and
+        artifact_content = self.fs.dir_ls(artifact_path)
+        job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
+        if not job_file:
+            return
+        with self.fs.file_open((artifact_path, job_file), "r") as f:
+            job_data = f.read()
+
+        # TODO get_service
+        _, _, service = ee_id.partition(".")   # remove prefix   "namespace."
+        host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
+        host_port = "80"
+        vnfr_id = vnfr_id.replace("-", "")
+        variables = {
+            "JOB_NAME": vnfr_id,
+            "TARGET_IP": target_ip,
+            "EXPORTER_POD_IP": host_name,
+            "EXPORTER_POD_PORT": host_port,
+        }
+        job_list = self.prometheus.parse_job(job_data, variables)
+        # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
+        for job in job_list:
+            if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
+                job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+            job["nsr_id"] = nsr_id
+        job_dict = {jl["job_name"]: jl for jl in job_list}
+        if await self.prometheus.update(job_dict):
+            return list(job_dict.keys())
index 397764f..c1f49b0 100644 (file)
@@ -24,7 +24,7 @@ import yaml
 import os
 from osm_lcm.lcm_utils import LcmException
 from osm_common.dbbase import DbException
-
+from jinja2 import Template, TemplateError, TemplateNotFound, TemplateSyntaxError
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -37,14 +37,16 @@ initial_prometheus_data = {
         "created": 1593445184,
         "version": "1.0"    # to allow future version updates
     },
-    'scrape_configs': [{'static_configs': [{'targets': ['mon:8000']}], 'job_name': 'mon_exporter'}],
+    'scrape_configs': {   # Dictionary at database. Converted to list before sending to prometheus
+        'mon_exporter': {'static_configs': [{'targets': ['mon:8000']}], 'job_name': 'mon_exporter'},
+    },
     'global': {'evaluation_interval': '15s', 'scrape_interval': '15s'},
     'rule_files': None,
     'alerting': {'alertmanagers': [{'static_configs': [{'targets': None}]}]}
 }
 
 
-class Prometheus():
+class Prometheus:
     """
     Implements a class to update Prometheus
     """
@@ -55,7 +57,7 @@ class Prometheus():
         self.worker_id = worker_id
         self.db = db
         self.loop = loop
-        self.logger = logger or logging.get_legger("lcm.prometheus")
+        self.logger = logger or logging.getLogger("lcm.prometheus")
         self.server = config["uri"]
         self.path = config["path"]
         if not self.path.endswith("/"):
@@ -63,27 +65,47 @@ class Prometheus():
         self.cfg_file = self.path + "prometheus.yml"
         self.cfg_file_backup = self.path + "prometheus.yml-backup"
 
+    @staticmethod
+    def parse_job(job_data: str, variables: dict) -> dict:
+        try:
+            template = Template(job_data)
+            job_parsed = template.render(variables or {})
+            return yaml.safe_load(job_parsed)
+        except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
+            # TODO yaml exceptions
+            raise LcmException("Error parsing Jinja2 to prometheus job. job_data={}, variables={}. Error={}".format(
+                job_data, variables, e))
+
     async def start(self):
         for retry in range(4):
             try:
+                # self.logger("Starting prometheus ")
                 # read from database
-                prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}, fail_on_empty=True)
+                prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}, fail_on_empty=False)
                 if not prometheus_data:
                     self.logger.info("Init db.admin.prometheus content")
                     self.db.create("admin", initial_prometheus_data)
                 # send database config file to prometheus. Ignore loading errors, as prometheus may be starting
                 # but at least an initial configuration file is set
                 await self.update()
+                return
             except DbException as e:
                 if retry == 3:
                     raise LcmException("Max retries trying to init prometheus configuration: {}".format(e))
                 await asyncio.sleep(5, loop=self.loop)
 
-    async def update(self, add_jobs=None, remove_jobs=None):
+    async def update(self, add_jobs: dict = None, remove_jobs: list = None) -> bool:
+        """
+
+        :param add_jobs: dictionary with {job_id_1: job_content, job_id_2: job_content}
+        :param remove_jobs: list with jobs to remove [job_id_1, job_id_2]
+        :return: result. If false prometheus denies this configuration. Exception on error
+        """
         for retry in range(4):
             result = True
             if retry:  # first time do not wait
                 await asyncio.sleep(self.PROMETHEUS_LOCKED_TIME / 2, loop=self.loop)
+
             # lock database
             now = time()
             if not self.db.set_one(
@@ -94,60 +116,99 @@ class Prometheus():
                 continue
             # read database
             prometheus_data = self.db.get_one("admin", {"_id": "prometheus"})
+            update_dict = {"_admin.locked_at": 0,
+                           "_admin.locked_by": None}
 
             # Make changes from prometheus_incremental
-            push_list = pull_list = None
+            push_dict = pull_dict = None
             if add_jobs or remove_jobs:
-                update_dict = {"_admin.locked_at": 0,
-                               "_admin.locked_by": None,
-                               "_admin.modified_at": now}
+                log_text_list = []
                 if add_jobs:
-                    push_list = {"scrape_configs.static_configs": add_jobs}
-                    prometheus_data["scrape_configs"]["static_configs"] += add_jobs
+                    log_text_list.append("adding jobs: {}".format(list(add_jobs.keys())))
+                    prometheus_data["scrape_configs"].update(add_jobs)
+                    push_dict = {"scrape_configs." + job_id: job_data for job_id, job_data in add_jobs.items()}
                 elif remove_jobs:
-                    pass    # TODO
-                if not self.send_data(prometheus_data):
-                    push_list = pull_list = None
-                    result = False
+                    log_text_list.append("removing jobs: {}".format(list(remove_jobs)))
+                    for job_id in remove_jobs:
+                        prometheus_data["scrape_configs"].pop(job_id, None)
+                    pull_dict = {"scrape_configs." + job_id: None for job_id in remove_jobs}
+                self.logger.debug(". ".join(log_text_list))
+
+            if not await self.send_data(prometheus_data):
+                push_dict = pull_dict = None
+                result = False
 
             # unblock database
+            if push_dict:
+                update_dict.update(push_dict)
+            if push_dict or pull_dict:
+                update_dict["_admin.modified_at"] = now
             if not self.db.set_one(
                     "admin", {"_id": "prometheus", "_admin.locked_at": now, "_admin.locked_by": self.worker_id},
-                    update_dict=update_dict, pull_list=pull_list, push_list=push_list, fail_on_empty=False):
+                    update_dict=update_dict, unset=pull_dict, fail_on_empty=False):
                 continue
             return result
         raise LcmException("Cannot update prometheus database. Reached max retries")
 
     async def send_data(self, new_config):
         restore_backup = False
+        del new_config["_id"]
+        del new_config["_admin"]
+        new_scrape_configs = []
+        
+        # generate a list with the values of scrape_configs
+        for scrape_config in new_config["scrape_configs"].values():
+            scrape_config = scrape_config.copy()
+            # remove nsr_id metadata from scrape_configs
+            scrape_config.pop("nsr_id", None)
+            new_scrape_configs.append(scrape_config)
+        new_config["scrape_configs"] = new_scrape_configs
+
         try:
             if os.path.exists(self.cfg_file):
                 os.rename(self.cfg_file, self.cfg_file_backup)
                 restore_backup = True
             with open(self.cfg_file, "w+") as f:
-                yaml.dump(new_config, f)
+                yaml.safe_dump(new_config, f, indent=4, default_flow_style=False)
+            # self.logger.debug("new configuration: {}".format(yaml.safe_dump(new_config, indent=4,
+            #                                                                 default_flow_style=False)))
             async with aiohttp.ClientSession() as session:
-                async with session.post(self.server + "/-/reload") as resp:
+                async with session.post(self.server + "-/reload") as resp:
                     if resp.status > 204:
-                        raise LcmException(resp.text)
+                        raise LcmException(await resp.text())
                 await asyncio.sleep(5, loop=self.loop)
-                async with session.get(self.server + "/api/v1/status/config") as resp:
+                # If prometheus does not admit this configuration, remains with the old one
+                # Then, to check if the configuration has been accepted, get the configuration from prometheus
+                # and compares with the inserted one
+                async with session.get(self.server + "api/v1/status/config") as resp:
                     if resp.status > 204:
-                        raise LcmException(resp.text)
-                    current_config = resp.json()
+                        raise LcmException(await resp.text())
+                    current_config = await resp.json()
                     if not self._check_configuration_equal(current_config, new_config):
                         return False
                     else:
                         restore_backup = False
             return True
         except Exception as e:
-            self.logger.error("Error updating prometheus configuration {}".format(e))
+            self.logger.error("Error updating prometheus configuration url={}: {}".format(self.server, e))
             return False
         finally:
             if restore_backup:
-                os.rename(self.cfg_file_backup, self.cfg_file)
+                try:
+                    os.rename(self.cfg_file_backup, self.cfg_file)
+                except Exception as e:
+                    self.logger.critical("Exception while rolling back: {}".format(e))
 
-    @staticmethod
-    def _check_configuration_equal(current_config, new_config):
-        # TODO compare and return True if equal
-        return True
+    def _check_configuration_equal(self, current_config, expected_config):
+        try:
+            # self.logger.debug("Comparing current_config='{}' with expected_config='{}'".format(current_config,
+            #                                                                                    expected_config))
+            current_config_yaml = yaml.safe_load(current_config['data']['yaml'])
+            current_jobs = [j["job_name"] for j in current_config_yaml["scrape_configs"]]
+            expected_jobs = [j["job_name"] for j in expected_config["scrape_configs"]]
+            return current_jobs == expected_jobs
+        except Exception as e:
+            self.logger.error("Invalid obtained prometheus status. Error: '{}'. Obtained data: '{}'".format(
+                e, current_config))
+            # if format is not understood, cannot be compared, assume it is ok
+            return True
diff --git a/osm_lcm/tests/test_prometheus.py b/osm_lcm/tests/test_prometheus.py
new file mode 100644 (file)
index 0000000..064ede8
--- /dev/null
@@ -0,0 +1,130 @@
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: alfonso.tiernosepulveda@telefonica.com
+##
+
+import asynctest
+from osm_lcm.prometheus import Prometheus, initial_prometheus_data
+from asynctest.mock import Mock
+from osm_common.dbmemory import DbMemory
+
+__author__ = 'Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>'
+
+
+class TestPrometheus(asynctest.TestCase):
+
+    async def setUp(self):
+        config = {'uri': 'http:prometheus:9090',
+                  'path': '/etc/prometheus'}
+        self.db = Mock(DbMemory())
+        self.p = Prometheus(config, worker_id='1', db=self.db, loop=self.loop)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_start(self):
+        # test with database empty
+        self.db.get_one.return_value = False
+        self.p.update = asynctest.CoroutineMock()
+        await self.p.start()
+        self.db.create.assert_called_once_with('admin', initial_prometheus_data)
+        self.p.update.assert_called_once_with()
+
+        # test with database not empty
+        self.db.create.reset_mock()
+        self.db.get_one.return_value = initial_prometheus_data
+        self.p.update.reset_mock()
+        await self.p.start()
+        self.db.create.assert_not_called()
+        self.p.update.assert_called_once_with()
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_update(self):
+        self.p.PROMETHEUS_LOCKED_TIME = 1
+        number_call_set_one = 0
+
+        def _db_set_one(*args, **kwargs):
+            # simulated that database is not locked at first call
+            nonlocal number_call_set_one
+
+            number_call_set_one += 1
+            if number_call_set_one == 1:
+                return
+            else:
+                return {'update': 1}
+
+        def _check_set_one_calls(set_one_calls):
+            # check the three calls to database set_one
+            self.assertEqual(len(set_one_calls), 3, 'Not called three times to db.set_one, two blocks, one unblock')
+            self.assertIn('admin', set_one_calls[0][0], 'db.set_one collection should be admin')
+            first_used_time = set_one_calls[0][1]['update_dict']['_admin.locked_at']
+            second_used_time = set_one_calls[1][1]['update_dict']['_admin.locked_at']
+            third_used_time = set_one_calls[2][1]['update_dict']['_admin.locked_at']
+            self.assertTrue(first_used_time != 0 and second_used_time != 0, 'blocking locked_at time must not be 0')
+            self.assertGreater(second_used_time, first_used_time,
+                               'Every blocking try must contain a new locked_at time')
+            self.assertEqual(third_used_time, 0, 'For unblocking must be set locked_at=0')
+
+        # check add_jobs
+        number_call_set_one = 0
+        self.db.get_one.return_value = initial_prometheus_data
+        self.db.set_one.side_effect = _db_set_one
+        self.p.send_data = asynctest.CoroutineMock(return_value=True)
+        add_jobs = {'job1': {'job_name': 'job1', 'nsr_id': 'nsr_id'}}
+        await self.p.update(add_jobs=add_jobs)
+        set_one_calls = self.db.set_one.call_args_list
+        _check_set_one_calls(set_one_calls)
+        update_dict = set_one_calls[2][1]['update_dict']
+        unset_dict = set_one_calls[2][1]['unset']
+        expected_final_set = {
+            '_admin.locked_at': 0,
+            '_admin.locked_by': None,
+            '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at'],
+            'scrape_configs.job1': add_jobs['job1']}
+        self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
+        self.assertEqual(unset_dict, None, 'invalid unset and unlock values')
+
+        # check remove_jobs
+        number_call_set_one = 0
+        remove_jobs = ['job1']
+        self.db.set_one.reset_mock()
+        await self.p.update(remove_jobs=remove_jobs)
+        set_one_calls = self.db.set_one.call_args_list
+        _check_set_one_calls(set_one_calls)
+        update_dict = set_one_calls[2][1]['update_dict']
+        unset_dict = set_one_calls[2][1]['unset']
+        expected_final_set = {
+            '_admin.locked_at': 0,
+            '_admin.locked_by': None,
+            '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at']
+        }
+        self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
+        self.assertEqual(unset_dict, {'scrape_configs.job1': None}, 'invalid unset and unlock values')
+
+    def test_parse_job(self):
+        text_to_parse = """
+            # yaml format with jinja2
+            key1: "parsing var1='{{ var1 }}'"
+            key2: "parsing var2='{{ var2 }}'"
+        """
+        vars = {'var1': 'VAR1', 'var2': 'VAR2', 'var3': 'VAR3'}
+        expected = {
+            'key1': "parsing var1='VAR1'",
+            'key2': "parsing var2='VAR2'"
+        }
+        result = self.p.parse_job(text_to_parse, vars)
+        self.assertEqual(result, expected, 'Error at jinja2 parse')
+
+
+if __name__ == '__main__':
+    asynctest.main()
diff --git a/tox.ini b/tox.ini
index 582a897..40b0366 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -48,7 +48,7 @@ commands = flake8 osm_lcm --max-line-length 120 \
 [testenv:unittest]
 basepython = python3
 deps = asynctest
-commands = python3 -m unittest osm_lcm.tests.test_ns
+commands = python3 -m unittest osm_lcm.tests.test_ns osm_lcm.tests.test_prometheus
 
 [testenv:build]
 basepython = python3