X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fprometheus.py;h=c1f49b08a9fb746b3c04c292f70f0bbbe8b717ee;hb=89f8290b70918e151e6b6653c635ea6a05a22522;hp=397764f272adc82bed21aca219fe8182e84b9894;hpb=a43e672b0461451813b5366f05d710072413e23e;p=osm%2FLCM.git

diff --git a/osm_lcm/prometheus.py b/osm_lcm/prometheus.py
index 397764f..c1f49b0 100644
--- a/osm_lcm/prometheus.py
+++ b/osm_lcm/prometheus.py
@@ -24,7 +24,7 @@ import yaml
 import os
 from osm_lcm.lcm_utils import LcmException
 from osm_common.dbbase import DbException
-
+from jinja2 import Template, TemplateError, TemplateNotFound, TemplateSyntaxError

 __author__ = "Alfonso Tierno "

@@ -37,14 +37,16 @@ initial_prometheus_data = {
         "created": 1593445184,
         "version": "1.0"    # to allow future version updates
     },
-    'scrape_configs': [{'static_configs': [{'targets': ['mon:8000']}], 'job_name': 'mon_exporter'}],
+    'scrape_configs': {   # Dictionary at database. Converted to list before sending to prometheus
+        'mon_exporter': {'static_configs': [{'targets': ['mon:8000']}], 'job_name': 'mon_exporter'},
+    },
     'global': {'evaluation_interval': '15s', 'scrape_interval': '15s'},
     'rule_files': None,
     'alerting': {'alertmanagers': [{'static_configs': [{'targets': None}]}]}
 }


-class Prometheus():
+class Prometheus:
     """
     Implements a class to update Prometheus
     """
@@ -55,7 +57,7 @@ class Prometheus():
         self.worker_id = worker_id
         self.db = db
         self.loop = loop
-        self.logger = logger or logging.get_legger("lcm.prometheus")
+        self.logger = logger or logging.getLogger("lcm.prometheus")
         self.server = config["uri"]
         self.path = config["path"]
         if not self.path.endswith("/"):
@@ -63,27 +65,47 @@ class Prometheus():
         self.cfg_file = self.path + "prometheus.yml"
         self.cfg_file_backup = self.path + "prometheus.yml-backup"

+    @staticmethod
+    def parse_job(job_data: str, variables: dict) -> dict:
+        try:
+            template = Template(job_data)
+            job_parsed = template.render(variables or {})
+            return yaml.safe_load(job_parsed)
+        except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
+            # TODO yaml exceptions
+            raise LcmException("Error parsing Jinja2 to prometheus job. job_data={}, variables={}. Error={}".format(
+                job_data, variables, e))
+
     async def start(self):
         for retry in range(4):
             try:
+                # self.logger("Starting prometheus ")
                 # read from database
-                prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}, fail_on_empty=True)
+                prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}, fail_on_empty=False)
                 if not prometheus_data:
                     self.logger.info("Init db.admin.prometheus content")
                     self.db.create("admin", initial_prometheus_data)
                 # send database config file to prometheus. Ignore loading errors, as prometheus may be starting
                 # but at least an initial configuration file is set
                 await self.update()
+                return
             except DbException as e:
                 if retry == 3:
                     raise LcmException("Max retries trying to init prometheus configuration: {}".format(e))
                 await asyncio.sleep(5, loop=self.loop)

-    async def update(self, add_jobs=None, remove_jobs=None):
+    async def update(self, add_jobs: dict = None, remove_jobs: list = None) -> bool:
+        """
+
+        :param add_jobs: dictionary with {job_id_1: job_content, job_id_2: job_content}
+        :param remove_jobs: list with jobs to remove [job_id_1, job_id_2]
+        :return: result. If false prometheus denies this configuration. Exception on error
+        """
         for retry in range(4):
             result = True
             if retry:  # first time do not wait
                 await asyncio.sleep(self.PROMETHEUS_LOCKED_TIME / 2, loop=self.loop)

+            # lock database
             now = time()
             if not self.db.set_one(
@@ -94,60 +116,99 @@ class Prometheus():
                 continue
             # read database
             prometheus_data = self.db.get_one("admin", {"_id": "prometheus"})
+            update_dict = {"_admin.locked_at": 0,
+                           "_admin.locked_by": None}

             # Make changes from prometheus_incremental
-            push_list = pull_list = None
+            push_dict = pull_dict = None
             if add_jobs or remove_jobs:
-                update_dict = {"_admin.locked_at": 0,
-                               "_admin.locked_by": None,
-                               "_admin.modified_at": now}
+                log_text_list = []
                 if add_jobs:
-                    push_list = {"scrape_configs.static_configs": add_jobs}
-                    prometheus_data["scrape_configs"]["static_configs"] += add_jobs
+                    log_text_list.append("adding jobs: {}".format(list(add_jobs.keys())))
+                    prometheus_data["scrape_configs"].update(add_jobs)
+                    push_dict = {"scrape_configs." + job_id: job_data for job_id, job_data in add_jobs.items()}
                 elif remove_jobs:
-                    pass    # TODO
-                if not self.send_data(prometheus_data):
-                    push_list = pull_list = None
-                    result = False
+                    log_text_list.append("removing jobs: {}".format(list(remove_jobs)))
+                    for job_id in remove_jobs:
+                        prometheus_data["scrape_configs"].pop(job_id, None)
+                    pull_dict = {"scrape_configs." + job_id: None for job_id in remove_jobs}
+                self.logger.debug(". ".join(log_text_list))
+
+            if not await self.send_data(prometheus_data):
+                push_dict = pull_dict = None
+                result = False

             # unblock database
+            if push_dict:
+                update_dict.update(push_dict)
+            if push_dict or pull_dict:
+                update_dict["_admin.modified_at"] = now
             if not self.db.set_one(
                     "admin", {"_id": "prometheus", "_admin.locked_at": now, "_admin.locked_by": self.worker_id},
-                    update_dict=update_dict, pull_list=pull_list, push_list=push_list, fail_on_empty=False):
+                    update_dict=update_dict, unset=pull_dict, fail_on_empty=False):
                 continue
             return result
         raise LcmException("Cannot update prometheus database. Reached max retries")

     async def send_data(self, new_config):
         restore_backup = False
+        del new_config["_id"]
+        del new_config["_admin"]
+        new_scrape_configs = []
+
+        # generate a list with the values of scrape_configs
+        for scrape_config in new_config["scrape_configs"].values():
+            scrape_config = scrape_config.copy()
+            # remove nsr_id metadata from scrape_configs
+            scrape_config.pop("nsr_id", None)
+            new_scrape_configs.append(scrape_config)
+        new_config["scrape_configs"] = new_scrape_configs
+
         try:
             if os.path.exists(self.cfg_file):
                 os.rename(self.cfg_file, self.cfg_file_backup)
                 restore_backup = True
             with open(self.cfg_file, "w+") as f:
-                yaml.dump(new_config, f)
+                yaml.safe_dump(new_config, f, indent=4, default_flow_style=False)
+            # self.logger.debug("new configuration: {}".format(yaml.safe_dump(new_config, indent=4,
+            #                                                                 default_flow_style=False)))
             async with aiohttp.ClientSession() as session:
-                async with session.post(self.server + "/-/reload") as resp:
+                async with session.post(self.server + "-/reload") as resp:
                     if resp.status > 204:
-                        raise LcmException(resp.text)
+                        raise LcmException(await resp.text())
                 await asyncio.sleep(5, loop=self.loop)
-                async with session.get(self.server + "/api/v1/status/config") as resp:
+                # If prometheus does not admit this configuration, remains with the old one
+                # Then, to check if the configuration has been accepted, get the configuration from prometheus
+                # and compares with the inserted one
+                async with session.get(self.server + "api/v1/status/config") as resp:
                     if resp.status > 204:
-                        raise LcmException(resp.text)
-                    current_config = resp.json()
+                        raise LcmException(await resp.text())
+                    current_config = await resp.json()
                     if not self._check_configuration_equal(current_config, new_config):
                         return False
                     else:
                         restore_backup = False
             return True
         except Exception as e:
-            self.logger.error("Error updating prometheus configuration {}".format(e))
+            self.logger.error("Error updating prometheus configuration url={}: {}".format(self.server, e))
             return False
         finally:
             if restore_backup:
-                os.rename(self.cfg_file_backup, self.cfg_file)
+                try:
+                    os.rename(self.cfg_file_backup, self.cfg_file)
+                except Exception as e:
+                    self.logger.critical("Exception while rolling back: {}".format(e))

-    @staticmethod
-    def _check_configuration_equal(current_config, new_config):
-        # TODO compare and return True if equal
-        return True
+    def _check_configuration_equal(self, current_config, expected_config):
+        try:
+            # self.logger.debug("Comparing current_config='{}' with expected_config='{}'".format(current_config,
+            #                                                                                    expected_config))
+            current_config_yaml = yaml.safe_load(current_config['data']['yaml'])
+            current_jobs = [j["job_name"] for j in current_config_yaml["scrape_configs"]]
+            expected_jobs = [j["job_name"] for j in expected_config["scrape_configs"]]
+            return current_jobs == expected_jobs
+        except Exception as e:
+            self.logger.error("Invalid obtained prometheus status. Error: '{}'. Obtained data: '{}'".format(
+                e, current_config))
+            # if format is not understood, cannot be compared, assume it is ok
+            return True
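
The diff above moves the Prometheus jobs kept in the LCM database from a single list under scrape_configs to a dictionary keyed by job id, adds a Jinja2-based parse_job() helper, and has send_data() flatten that dictionary back into the list Prometheus expects (dropping the LCM-only nsr_id field) before writing prometheus.yml and POSTing to the -/reload endpoint. Below is a minimal, self-contained sketch of that parse-and-flatten flow outside LCM; the job template, its variables and the target address are hypothetical illustration values, not data taken from the commit.

# Sketch only: mirrors Prometheus.parse_job (Jinja2 render + yaml.safe_load) and the
# scrape_configs dict-to-list conversion performed in send_data. Requires jinja2 and PyYAML.
import yaml
from jinja2 import Template

# Hypothetical Jinja2 job definition, as a descriptor might provide it
job_template = """
job_name: vnf_metrics_{{ nsr_id }}
nsr_id: "{{ nsr_id }}"
static_configs:
  - targets: ["{{ exporter_host }}:9100"]
"""
variables = {"nsr_id": "1234-abcd", "exporter_host": "10.0.0.10"}   # hypothetical values

# Equivalent of Prometheus.parse_job(job_template, variables)
job = yaml.safe_load(Template(job_template).render(variables or {}))

# In the database each job is stored under its own key, i.e. scrape_configs.<job_id>
scrape_configs_db = {job["job_name"]: job}

# Before writing prometheus.yml, send_data() converts the dictionary to a list and
# strips the nsr_id metadata that only LCM cares about
scrape_configs_list = []
for cfg in scrape_configs_db.values():
    cfg = cfg.copy()
    cfg.pop("nsr_id", None)
    scrape_configs_list.append(cfg)

print(yaml.safe_dump({"scrape_configs": scrape_configs_list}, default_flow_style=False))

Keeping the jobs as a dictionary is what allows update() to push or unset individual "scrape_configs.<job_id>" paths in the admin collection instead of rewriting one shared list, while send_data() still rebuilds the list form that Prometheus validates through its api/v1/status/config endpoint.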