From: tierno Date: Fri, 3 Jul 2020 14:52:28 +0000 (+0000) Subject: fixing prometheus metric exporter issues X-Git-Tag: release-v9.0-start~53 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=b996d94c0f74b1922d7f93a51ec328c6f370ff86;p=osm%2FLCM.git fixing prometheus metric exporter issues Use the same instance of ns at netslice. Avoid two N2VC connections Change-Id: I346c08c111e5ffc7dbc1768851dc069d2cda10d1 Signed-off-by: tierno --- diff --git a/osm_lcm/lcm.cfg b/osm_lcm/lcm.cfg index 8f141f1..4aadf22 100644 --- a/osm_lcm/lcm.cfg +++ b/osm_lcm/lcm.cfg @@ -84,3 +84,11 @@ message: # loglevel: DEBUG # logfile: /var/log/osm/lcm-message.log group_id: lcm-server + +prometheus: + driver: prometheus + # local file to store the configuration + path: /etc/prometheus + uri: http://prometheus:9090/ + # loglevel: DEBUG + # logfile: /var/log/osm/lcm-message.log diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 5d27277..b914604 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -29,9 +29,7 @@ import logging.handlers import getopt import sys -from osm_lcm import ns -from osm_lcm import vim_sdn -from osm_lcm import netslice +from osm_lcm import ns, prometheus, vim_sdn, netslice from osm_lcm.ng_ro import NgRoException, NgRoClient from osm_lcm.ROclient import ROClient, ROClientException @@ -192,8 +190,13 @@ class Lcm: # contains created tasks/futures to be able to cancel self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger) - self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop) - self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop) + if self.config.get("prometheus"): + self.prometheus = prometheus.Prometheus(self.config["prometheus"], self.worker_id, self.db, self.loop) + else: + self.prometheus = None + self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop, self.prometheus) + self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop, + self.ns) self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop) self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop) self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop) @@ -506,6 +509,10 @@ class Lcm: # check RO version self.loop.run_until_complete(self.check_RO_version()) + # configure Prometheus + if self.prometheus: + self.loop.run_until_complete(self.prometheus.start()) + self.loop.run_until_complete(asyncio.gather( self.kafka_read(), self.kafka_ping() diff --git a/osm_lcm/lcm_helm_conn.py b/osm_lcm/lcm_helm_conn.py index 146da9d..c18405b 100644 --- a/osm_lcm/lcm_helm_conn.py +++ b/osm_lcm/lcm_helm_conn.py @@ -123,6 +123,7 @@ class LCMHelmConn(N2VCConnector): reuse_ee_id: str = None, progress_timeout: float = None, total_timeout: float = None, + config: dict = None, artifact_path: str = None, vca_type: str = None) -> (str, dict): """ @@ -137,8 +138,9 @@ class LCMHelmConn(N2VCConnector): :param str reuse_ee_id: ee id from an older execution. TODO - right now this params is not used :param float progress_timeout: :param float total_timeout: - :param str artifact_path path of package content - :param str vca_type Type of vca, not used as assumed of type helm + :param dict config: General variables to instantiate KDU + :param str artifact_path: path of package content + :param str vca_type: Type of vca, not used as assumed of type helm :returns str, dict: id of the new execution environment including namespace.helm_id and credentials object set to None as all credentials should be osm kubernetes .kubeconfig """ @@ -177,10 +179,16 @@ class LCMHelmConn(N2VCConnector): # Call helm conn install # Obtain system cluster id from database system_cluster_uuid = self._get_system_cluster_id() + # Add parameter osm if exist to global + if config and config.get("osm"): + if not config.get("global"): + config["global"] = {} + config["global"]["osm"] = config.get("osm") self.log.debug("install helm chart: {}".format(full_path)) helm_id = await self._k8sclusterhelm.install(system_cluster_uuid, kdu_model=full_path, namespace=self._KUBECTL_OSM_NAMESPACE, + params=config, db_dict=db_dict, timeout=progress_timeout) diff --git a/osm_lcm/netslice.py b/osm_lcm/netslice.py index 4d731af..6b0a6ca 100644 --- a/osm_lcm/netslice.py +++ b/osm_lcm/netslice.py @@ -17,7 +17,7 @@ import asyncio import logging import logging.handlers import traceback -from osm_lcm import ROclient, ns +from osm_lcm import ROclient from osm_lcm.lcm_utils import LcmException, LcmBase, populate_dict, get_iterable, deep_get from osm_common.dbbase import DbException from time import time @@ -31,7 +31,7 @@ class NetsliceLcm(LcmBase): timeout_nsi_deploy = 2 * 3600 # default global timeout for deployment a nsi - def __init__(self, db, msg, fs, lcm_tasks, config, loop): + def __init__(self, db, msg, fs, lcm_tasks, config, loop, ns): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -41,7 +41,7 @@ class NetsliceLcm(LcmBase): self.logger = logging.getLogger('lcm.netslice') self.loop = loop self.lcm_tasks = lcm_tasks - self.ns = ns.NsLcm(db, msg, fs, lcm_tasks, config, loop) + self.ns = ns self.ro_config = config["ro_config"] self.timeout = config["timeout"] diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 5141f6d..62f010f 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -43,6 +43,7 @@ from http import HTTPStatus from time import time from uuid import uuid4 from functools import partial +from random import randint __author__ = "Alfonso Tierno " @@ -51,7 +52,8 @@ class N2VCJujuConnectorLCM(N2VCJujuConnector): async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None, progress_timeout: float = None, total_timeout: float = None, - artifact_path: str = None, vca_type: str = None) -> (str, dict): + config: dict = None, artifact_path: str = None, + vca_type: str = None) -> (str, dict): # admit two new parameters, artifact_path and vca_type if vca_type == "k8s_proxy_charm": ee_id = await self.n2vc.install_k8s_proxy_charm( @@ -88,7 +90,7 @@ class NsLcm(LcmBase): SUBOPERATION_STATUS_SKIP = -3 task_name_deploy_vca = "Deploying VCA" - def __init__(self, db, msg, fs, lcm_tasks, config, loop): + def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -163,6 +165,8 @@ class NsLcm(LcmBase): "helm": self.conn_helm_ee } + self.prometheus = prometheus + # create RO client if self.ng_ro: self.RO = NgRoClient(self.loop, **self.ro_config) @@ -1369,11 +1373,13 @@ class NsLcm(LcmBase): raise LcmException("Configuration aborted because dependent charm/s timeout") async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index, - config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name): + config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name, + ee_config_descriptor): nsr_id = db_nsr["_id"] db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index) vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"] vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index] + osm_config = {"osm": {"ns_id": db_nsr["_id"]}} db_dict = { 'collection': 'nsrs', 'filter': {'_id': nsr_id}, @@ -1388,6 +1394,7 @@ class NsLcm(LcmBase): vnfr_id = None if db_vnfr: vnfr_id = db_vnfr["_id"] + osm_config["osm"]["vnf_id"] = vnfr_id namespace = "{nsi}.{ns}".format( nsi=nsi_id if nsi_id else "", @@ -1401,10 +1408,12 @@ class NsLcm(LcmBase): namespace += ".{}-{}".format(vdu_id, vdu_index or 0) element_type = 'VDU' element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0) + osm_config["osm"]["vdu_id"] = vdu_id elif kdu_name: namespace += ".{}".format(kdu_name) element_type = 'KDU' element_under_configuration = kdu_name + osm_config["osm"]["kdu_name"] = kdu_name # Get artifact path artifact_path = "{}/{}/{}/{}".format( @@ -1436,6 +1445,7 @@ class NsLcm(LcmBase): namespace=namespace, reuse_ee_id=ee_id, db_dict=db_dict, + config=osm_config, artifact_path=artifact_path, vca_type=vca_type) @@ -1637,6 +1647,19 @@ class NsLcm(LcmBase): # TODO register in database that primitive is done + # STEP 7 Configure metrics + if vca_type == "helm": + prometheus_jobs = await self.add_prometheus_metrics( + ee_id=ee_id, + artifact_path=artifact_path, + ee_config_descriptor=ee_config_descriptor, + vnfr_id=vnfr_id, + nsr_id=nsr_id, + target_ip=rw_mgmt_ip, + ) + if prometheus_jobs: + self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs}) + step = "instantiated at VCA" self.logger.debug(logging_text + step) @@ -2596,7 +2619,8 @@ class NsLcm(LcmBase): nslcmop_id=nslcmop_id, stage=stage, vca_type=vca_type, - vca_name=vca_name + vca_name=vca_name, + ee_config_descriptor=ee_item ) ) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc) @@ -2896,6 +2920,9 @@ class NsLcm(LcmBase): db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index) self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}) + if vca_deployed.get("prometheus_jobs") and self.prometheus: + await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"]) + if destroy_ee: await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"]) @@ -3122,7 +3149,7 @@ class NsLcm(LcmBase): self.logger.debug(logging_text + stage[0]) stage[1] = "Looking execution environment that needs terminate." self.logger.debug(logging_text + stage[1]) - self.logger.debug("nsr_deployed: {}".format(nsr_deployed)) + # self.logger.debug("nsr_deployed: {}".format(nsr_deployed)) for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")): self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca)) config_descriptor = None @@ -3143,17 +3170,14 @@ class NsLcm(LcmBase): config_descriptor = kdud.get("kdu-configuration") else: config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration") - # For helm we must destroy_ee vca_type = vca.get("type") exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and vca.get("needed_terminate")) - self.logger.debug("vca type: {}".format(vca_type)) - if not vca_type == "helm": - task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, - vca_index, False, exec_terminate_primitives)) - else: - task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, - vca_index, True, exec_terminate_primitives)) + # For helm we must destroy_ee + destroy_ee = "True" if vca_type == "helm" else "False" + task = asyncio.ensure_future( + self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index, + destroy_ee, exec_terminate_primitives)) tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id")) # wait for pending tasks of terminate primitives @@ -4178,3 +4202,35 @@ class NsLcm(LcmBase): self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale") + + async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip): + if not self.prometheus: + return + # look if exist a file called 'prometheus*.j2' and + artifact_content = self.fs.dir_ls(artifact_path) + job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None) + if not job_file: + return + with self.fs.file_open((artifact_path, job_file), "r") as f: + job_data = f.read() + + # TODO get_service + _, _, service = ee_id.partition(".") # remove prefix "namespace." + host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"]) + host_port = "80" + vnfr_id = vnfr_id.replace("-", "") + variables = { + "JOB_NAME": vnfr_id, + "TARGET_IP": target_ip, + "EXPORTER_POD_IP": host_name, + "EXPORTER_POD_PORT": host_port, + } + job_list = self.prometheus.parse_job(job_data, variables) + # ensure job_name is using the vnfr_id. Adding the metadata nsr_id + for job in job_list: + if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]: + job["job_name"] = vnfr_id + "_" + str(randint(1, 10000)) + job["nsr_id"] = nsr_id + job_dict = {jl["job_name"]: jl for jl in job_list} + if await self.prometheus.update(job_dict): + return list(job_dict.keys()) diff --git a/osm_lcm/prometheus.py b/osm_lcm/prometheus.py index 397764f..c1f49b0 100644 --- a/osm_lcm/prometheus.py +++ b/osm_lcm/prometheus.py @@ -24,7 +24,7 @@ import yaml import os from osm_lcm.lcm_utils import LcmException from osm_common.dbbase import DbException - +from jinja2 import Template, TemplateError, TemplateNotFound, TemplateSyntaxError __author__ = "Alfonso Tierno " @@ -37,14 +37,16 @@ initial_prometheus_data = { "created": 1593445184, "version": "1.0" # to allow future version updates }, - 'scrape_configs': [{'static_configs': [{'targets': ['mon:8000']}], 'job_name': 'mon_exporter'}], + 'scrape_configs': { # Dictionary at database. Converted to list before sending to prometheus + 'mon_exporter': {'static_configs': [{'targets': ['mon:8000']}], 'job_name': 'mon_exporter'}, + }, 'global': {'evaluation_interval': '15s', 'scrape_interval': '15s'}, 'rule_files': None, 'alerting': {'alertmanagers': [{'static_configs': [{'targets': None}]}]} } -class Prometheus(): +class Prometheus: """ Implements a class to update Prometheus """ @@ -55,7 +57,7 @@ class Prometheus(): self.worker_id = worker_id self.db = db self.loop = loop - self.logger = logger or logging.get_legger("lcm.prometheus") + self.logger = logger or logging.getLogger("lcm.prometheus") self.server = config["uri"] self.path = config["path"] if not self.path.endswith("/"): @@ -63,27 +65,47 @@ class Prometheus(): self.cfg_file = self.path + "prometheus.yml" self.cfg_file_backup = self.path + "prometheus.yml-backup" + @staticmethod + def parse_job(job_data: str, variables: dict) -> dict: + try: + template = Template(job_data) + job_parsed = template.render(variables or {}) + return yaml.safe_load(job_parsed) + except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e: + # TODO yaml exceptions + raise LcmException("Error parsing Jinja2 to prometheus job. job_data={}, variables={}. Error={}".format( + job_data, variables, e)) + async def start(self): for retry in range(4): try: + # self.logger("Starting prometheus ") # read from database - prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}, fail_on_empty=True) + prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}, fail_on_empty=False) if not prometheus_data: self.logger.info("Init db.admin.prometheus content") self.db.create("admin", initial_prometheus_data) # send database config file to prometheus. Ignore loading errors, as prometheus may be starting # but at least an initial configuration file is set await self.update() + return except DbException as e: if retry == 3: raise LcmException("Max retries trying to init prometheus configuration: {}".format(e)) await asyncio.sleep(5, loop=self.loop) - async def update(self, add_jobs=None, remove_jobs=None): + async def update(self, add_jobs: dict = None, remove_jobs: list = None) -> bool: + """ + + :param add_jobs: dictionary with {job_id_1: job_content, job_id_2: job_content} + :param remove_jobs: list with jobs to remove [job_id_1, job_id_2] + :return: result. If false prometheus denies this configuration. Exception on error + """ for retry in range(4): result = True if retry: # first time do not wait await asyncio.sleep(self.PROMETHEUS_LOCKED_TIME / 2, loop=self.loop) + # lock database now = time() if not self.db.set_one( @@ -94,60 +116,99 @@ class Prometheus(): continue # read database prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}) + update_dict = {"_admin.locked_at": 0, + "_admin.locked_by": None} # Make changes from prometheus_incremental - push_list = pull_list = None + push_dict = pull_dict = None if add_jobs or remove_jobs: - update_dict = {"_admin.locked_at": 0, - "_admin.locked_by": None, - "_admin.modified_at": now} + log_text_list = [] if add_jobs: - push_list = {"scrape_configs.static_configs": add_jobs} - prometheus_data["scrape_configs"]["static_configs"] += add_jobs + log_text_list.append("adding jobs: {}".format(list(add_jobs.keys()))) + prometheus_data["scrape_configs"].update(add_jobs) + push_dict = {"scrape_configs." + job_id: job_data for job_id, job_data in add_jobs.items()} elif remove_jobs: - pass # TODO - if not self.send_data(prometheus_data): - push_list = pull_list = None - result = False + log_text_list.append("removing jobs: {}".format(list(remove_jobs))) + for job_id in remove_jobs: + prometheus_data["scrape_configs"].pop(job_id, None) + pull_dict = {"scrape_configs." + job_id: None for job_id in remove_jobs} + self.logger.debug(". ".join(log_text_list)) + + if not await self.send_data(prometheus_data): + push_dict = pull_dict = None + result = False # unblock database + if push_dict: + update_dict.update(push_dict) + if push_dict or pull_dict: + update_dict["_admin.modified_at"] = now if not self.db.set_one( "admin", {"_id": "prometheus", "_admin.locked_at": now, "_admin.locked_by": self.worker_id}, - update_dict=update_dict, pull_list=pull_list, push_list=push_list, fail_on_empty=False): + update_dict=update_dict, unset=pull_dict, fail_on_empty=False): continue return result raise LcmException("Cannot update prometheus database. Reached max retries") async def send_data(self, new_config): restore_backup = False + del new_config["_id"] + del new_config["_admin"] + new_scrape_configs = [] + + # generate a list with the values of scrape_configs + for scrape_config in new_config["scrape_configs"].values(): + scrape_config = scrape_config.copy() + # remove nsr_id metadata from scrape_configs + scrape_config.pop("nsr_id", None) + new_scrape_configs.append(scrape_config) + new_config["scrape_configs"] = new_scrape_configs + try: if os.path.exists(self.cfg_file): os.rename(self.cfg_file, self.cfg_file_backup) restore_backup = True with open(self.cfg_file, "w+") as f: - yaml.dump(new_config, f) + yaml.safe_dump(new_config, f, indent=4, default_flow_style=False) + # self.logger.debug("new configuration: {}".format(yaml.safe_dump(new_config, indent=4, + # default_flow_style=False))) async with aiohttp.ClientSession() as session: - async with session.post(self.server + "/-/reload") as resp: + async with session.post(self.server + "-/reload") as resp: if resp.status > 204: - raise LcmException(resp.text) + raise LcmException(await resp.text()) await asyncio.sleep(5, loop=self.loop) - async with session.get(self.server + "/api/v1/status/config") as resp: + # If prometheus does not admit this configuration, remains with the old one + # Then, to check if the configuration has been accepted, get the configuration from prometheus + # and compares with the inserted one + async with session.get(self.server + "api/v1/status/config") as resp: if resp.status > 204: - raise LcmException(resp.text) - current_config = resp.json() + raise LcmException(await resp.text()) + current_config = await resp.json() if not self._check_configuration_equal(current_config, new_config): return False else: restore_backup = False return True except Exception as e: - self.logger.error("Error updating prometheus configuration {}".format(e)) + self.logger.error("Error updating prometheus configuration url={}: {}".format(self.server, e)) return False finally: if restore_backup: - os.rename(self.cfg_file_backup, self.cfg_file) + try: + os.rename(self.cfg_file_backup, self.cfg_file) + except Exception as e: + self.logger.critical("Exception while rolling back: {}".format(e)) - @staticmethod - def _check_configuration_equal(current_config, new_config): - # TODO compare and return True if equal - return True + def _check_configuration_equal(self, current_config, expected_config): + try: + # self.logger.debug("Comparing current_config='{}' with expected_config='{}'".format(current_config, + # expected_config)) + current_config_yaml = yaml.safe_load(current_config['data']['yaml']) + current_jobs = [j["job_name"] for j in current_config_yaml["scrape_configs"]] + expected_jobs = [j["job_name"] for j in expected_config["scrape_configs"]] + return current_jobs == expected_jobs + except Exception as e: + self.logger.error("Invalid obtained prometheus status. Error: '{}'. Obtained data: '{}'".format( + e, current_config)) + # if format is not understood, cannot be compared, assume it is ok + return True diff --git a/osm_lcm/tests/test_prometheus.py b/osm_lcm/tests/test_prometheus.py new file mode 100644 index 0000000..064ede8 --- /dev/null +++ b/osm_lcm/tests/test_prometheus.py @@ -0,0 +1,130 @@ +## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact: alfonso.tiernosepulveda@telefonica.com +## + +import asynctest +from osm_lcm.prometheus import Prometheus, initial_prometheus_data +from asynctest.mock import Mock +from osm_common.dbmemory import DbMemory + +__author__ = 'Alfonso Tierno ' + + +class TestPrometheus(asynctest.TestCase): + + async def setUp(self): + config = {'uri': 'http:prometheus:9090', + 'path': '/etc/prometheus'} + self.db = Mock(DbMemory()) + self.p = Prometheus(config, worker_id='1', db=self.db, loop=self.loop) + + @asynctest.fail_on(active_handles=True) + async def test_start(self): + # test with database empty + self.db.get_one.return_value = False + self.p.update = asynctest.CoroutineMock() + await self.p.start() + self.db.create.assert_called_once_with('admin', initial_prometheus_data) + self.p.update.assert_called_once_with() + + # test with database not empty + self.db.create.reset_mock() + self.db.get_one.return_value = initial_prometheus_data + self.p.update.reset_mock() + await self.p.start() + self.db.create.assert_not_called() + self.p.update.assert_called_once_with() + + @asynctest.fail_on(active_handles=True) + async def test_update(self): + self.p.PROMETHEUS_LOCKED_TIME = 1 + number_call_set_one = 0 + + def _db_set_one(*args, **kwargs): + # simulated that database is not locked at first call + nonlocal number_call_set_one + + number_call_set_one += 1 + if number_call_set_one == 1: + return + else: + return {'update': 1} + + def _check_set_one_calls(set_one_calls): + # check the three calls to database set_one + self.assertEqual(len(set_one_calls), 3, 'Not called three times to db.set_one, two blocks, one unblock') + self.assertIn('admin', set_one_calls[0][0], 'db.set_one collection should be admin') + first_used_time = set_one_calls[0][1]['update_dict']['_admin.locked_at'] + second_used_time = set_one_calls[1][1]['update_dict']['_admin.locked_at'] + third_used_time = set_one_calls[2][1]['update_dict']['_admin.locked_at'] + self.assertTrue(first_used_time != 0 and second_used_time != 0, 'blocking locked_at time must not be 0') + self.assertGreater(second_used_time, first_used_time, + 'Every blocking try must contain a new locked_at time') + self.assertEqual(third_used_time, 0, 'For unblocking must be set locked_at=0') + + # check add_jobs + number_call_set_one = 0 + self.db.get_one.return_value = initial_prometheus_data + self.db.set_one.side_effect = _db_set_one + self.p.send_data = asynctest.CoroutineMock(return_value=True) + add_jobs = {'job1': {'job_name': 'job1', 'nsr_id': 'nsr_id'}} + await self.p.update(add_jobs=add_jobs) + set_one_calls = self.db.set_one.call_args_list + _check_set_one_calls(set_one_calls) + update_dict = set_one_calls[2][1]['update_dict'] + unset_dict = set_one_calls[2][1]['unset'] + expected_final_set = { + '_admin.locked_at': 0, + '_admin.locked_by': None, + '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at'], + 'scrape_configs.job1': add_jobs['job1']} + self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values') + self.assertEqual(unset_dict, None, 'invalid unset and unlock values') + + # check remove_jobs + number_call_set_one = 0 + remove_jobs = ['job1'] + self.db.set_one.reset_mock() + await self.p.update(remove_jobs=remove_jobs) + set_one_calls = self.db.set_one.call_args_list + _check_set_one_calls(set_one_calls) + update_dict = set_one_calls[2][1]['update_dict'] + unset_dict = set_one_calls[2][1]['unset'] + expected_final_set = { + '_admin.locked_at': 0, + '_admin.locked_by': None, + '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at'] + } + self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values') + self.assertEqual(unset_dict, {'scrape_configs.job1': None}, 'invalid unset and unlock values') + + def test_parse_job(self): + text_to_parse = """ + # yaml format with jinja2 + key1: "parsing var1='{{ var1 }}'" + key2: "parsing var2='{{ var2 }}'" + """ + vars = {'var1': 'VAR1', 'var2': 'VAR2', 'var3': 'VAR3'} + expected = { + 'key1': "parsing var1='VAR1'", + 'key2': "parsing var2='VAR2'" + } + result = self.p.parse_job(text_to_parse, vars) + self.assertEqual(result, expected, 'Error at jinja2 parse') + + +if __name__ == '__main__': + asynctest.main() diff --git a/tox.ini b/tox.ini index 582a897..40b0366 100644 --- a/tox.ini +++ b/tox.ini @@ -48,7 +48,7 @@ commands = flake8 osm_lcm --max-line-length 120 \ [testenv:unittest] basepython = python3 deps = asynctest -commands = python3 -m unittest osm_lcm.tests.test_ns +commands = python3 -m unittest osm_lcm.tests.test_ns osm_lcm.tests.test_prometheus [testenv:build] basepython = python3