X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fprometheus.py;h=6517d0bf375b27442fcc7ae7568bc8bbd683666c;hb=refs%2Fchanges%2F34%2F10534%2F3;hp=1a4793fdcd9dc289526d2a183b6b2ae8ffb63f66;hpb=a278b841363dfc6f759120406ed19d95c4966e55;p=osm%2FLCM.git diff --git a/osm_lcm/prometheus.py b/osm_lcm/prometheus.py index 1a4793f..6517d0b 100644 --- a/osm_lcm/prometheus.py +++ b/osm_lcm/prometheus.py @@ -24,6 +24,7 @@ import yaml import os from osm_lcm.lcm_utils import LcmException from osm_common.dbbase import DbException +from osm_lcm.data_utils.database.database import Database from jinja2 import Template, TemplateError, TemplateNotFound, TemplateSyntaxError __author__ = "Alfonso Tierno " @@ -35,14 +36,17 @@ initial_prometheus_data = { "locked_by": None, "modified": 1593445184, # 2020-06-29 "created": 1593445184, - "version": "1.0" # to allow future version updates + "version": "1.0", # to allow future version updates }, - 'scrape_configs': { # Dictionary at database. Converted to list before sending to prometheus - 'mon_exporter': {'static_configs': [{'targets': ['mon:8000']}], 'job_name': 'mon_exporter'}, + "scrape_configs": { # Dictionary at database. Converted to list before sending to prometheus + "mon_exporter": { + "static_configs": [{"targets": ["mon:8000"]}], + "job_name": "mon_exporter", + }, }, - 'global': {'evaluation_interval': '15s', 'scrape_interval': '15s'}, - 'rule_files': None, - 'alerting': {'alertmanagers': [{'static_configs': [{'targets': None}]}]} + "global": {"evaluation_interval": "15s", "scrape_interval": "15s"}, + "rule_files": None, + "alerting": {"alertmanagers": [{"static_configs": [{"targets": None}]}]}, } @@ -53,9 +57,9 @@ class Prometheus: PROMETHEUS_LOCKED_TIME = 120 - def __init__(self, config, worker_id, db, loop, logger=None): + def __init__(self, config, worker_id, loop, logger=None): self.worker_id = worker_id - self.db = db + self.db = Database().instance.db self.loop = loop self.logger = logger or logging.getLogger("lcm.prometheus") self.server = config["uri"] @@ -73,15 +77,20 @@ class Prometheus: return yaml.safe_load(job_parsed) except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e: # TODO yaml exceptions - raise LcmException("Error parsing Jinja2 to prometheus job. job_data={}, variables={}. Error={}".format( - job_data, variables, e)) + raise LcmException( + "Error parsing Jinja2 to prometheus job. job_data={}, variables={}. Error={}".format( + job_data, variables, e + ) + ) async def start(self): for retry in range(4): try: # self.logger("Starting prometheus ") # read from database - prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}, fail_on_empty=False) + prometheus_data = self.db.get_one( + "admin", {"_id": "prometheus"}, fail_on_empty=False + ) if not prometheus_data: self.logger.info("Init db.admin.prometheus content") self.db.create("admin", initial_prometheus_data) @@ -91,7 +100,11 @@ class Prometheus: return except DbException as e: if retry == 3: - raise LcmException("Max retries trying to init prometheus configuration: {}".format(e)) + raise LcmException( + "Max retries trying to init prometheus configuration: {}".format( + e + ) + ) await asyncio.sleep(5, loop=self.loop) async def update(self, add_jobs: dict = None, remove_jobs: list = None) -> bool: @@ -101,40 +114,58 @@ class Prometheus: :param remove_jobs: list with jobs to remove [job_id_1, job_id_2] :return: result. If false prometheus denies this configuration. Exception on error """ - for retry in range(4): + for retry in range(20): result = True if retry: # first time do not wait - await asyncio.sleep(self.PROMETHEUS_LOCKED_TIME / 2, loop=self.loop) + await asyncio.sleep(4 + retry, loop=self.loop) # lock database now = time() if not self.db.set_one( - "admin", - q_filter={"_id": "prometheus", "_admin.locked_at.lt": now - self.PROMETHEUS_LOCKED_TIME}, - update_dict={"_admin.locked_at": now, "_admin.locked_by": self.worker_id}, - fail_on_empty=False): + "admin", + q_filter={ + "_id": "prometheus", + "_admin.locked_at.lt": now - self.PROMETHEUS_LOCKED_TIME, + }, + update_dict={ + "_admin.locked_at": now, + "_admin.locked_by": self.worker_id, + }, + fail_on_empty=False, + ): continue # read database prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}) - update_dict = {"_admin.locked_at": 0, - "_admin.locked_by": None} + update_dict = {"_admin.locked_at": 0, "_admin.locked_by": None} # Make changes from prometheus_incremental push_dict = pull_dict = None if add_jobs or remove_jobs: log_text_list = [] if add_jobs: - log_text_list.append("adding jobs: {}".format(list(add_jobs.keys()))) + log_text_list.append( + "adding jobs: {}".format(list(add_jobs.keys())) + ) prometheus_data["scrape_configs"].update(add_jobs) - push_dict = {"scrape_configs." + job_id: job_data for job_id, job_data in add_jobs.items()} + push_dict = { + "scrape_configs." + job_id: job_data + for job_id, job_data in add_jobs.items() + } elif remove_jobs: log_text_list.append("removing jobs: {}".format(list(remove_jobs))) for job_id in remove_jobs: prometheus_data["scrape_configs"].pop(job_id, None) - pull_dict = {"scrape_configs." + job_id: None for job_id in remove_jobs} - self.logger.debug(". ".join(log_text_list)) + pull_dict = { + "scrape_configs." + job_id: None for job_id in remove_jobs + } + self.logger.debug("Updating. " + ". ".join(log_text_list)) if not await self.send_data(prometheus_data): + self.logger.error( + "Cannot update add_jobs: {}. remove_jobs: {}".format( + add_jobs, remove_jobs + ) + ) push_dict = pull_dict = None result = False @@ -144,8 +175,16 @@ class Prometheus: if push_dict or pull_dict: update_dict["_admin.modified_at"] = now if not self.db.set_one( - "admin", {"_id": "prometheus", "_admin.locked_at": now, "_admin.locked_by": self.worker_id}, - update_dict=update_dict, unset=pull_dict, fail_on_empty=False): + "admin", + { + "_id": "prometheus", + "_admin.locked_at": now, + "_admin.locked_by": self.worker_id, + }, + update_dict=update_dict, + unset=pull_dict, + fail_on_empty=False, + ): continue return result raise LcmException("Cannot update prometheus database. Reached max retries") @@ -190,7 +229,9 @@ class Prometheus: restore_backup = False return True except Exception as e: - self.logger.error("Error updating prometheus configuration url={}: {}".format(self.server, e)) + self.logger.error( + "Error updating configuration url={}: {}".format(self.server, e) + ) return False finally: if restore_backup: @@ -203,12 +244,25 @@ class Prometheus: try: # self.logger.debug("Comparing current_config='{}' with expected_config='{}'".format(current_config, # expected_config)) - current_config_yaml = yaml.safe_load(current_config['data']['yaml']) - current_jobs = [j["job_name"] for j in current_config_yaml["scrape_configs"]] + current_config_yaml = yaml.safe_load(current_config["data"]["yaml"]) + current_jobs = [ + j["job_name"] for j in current_config_yaml["scrape_configs"] + ] expected_jobs = [j["job_name"] for j in expected_config["scrape_configs"]] - return current_jobs == expected_jobs + if current_jobs == expected_jobs: + return True + else: + self.logger.error( + "Not all jobs have been loaded. Target jobs: {} Loaded jobs: {}".format( + expected_jobs, current_jobs + ) + ) + return False except Exception as e: - self.logger.error("Invalid obtained prometheus status. Error: '{}'. Obtained data: '{}'".format( - e, current_config)) + self.logger.error( + "Invalid obtained status from server. Error: '{}'. Obtained data: '{}'".format( + e, current_config + ) + ) # if format is not understood, cannot be compared, assume it is ok return True