From 569d88829af5fb1f1a8a74660d6e72050f7c6e27 Mon Sep 17 00:00:00 2001
From: bravof
Date: Tue, 11 May 2021 07:38:47 -0400
Subject: [PATCH] feat(prometheus): prometheus config no longer depends on LCM

Change-Id: I444ede724b50142244d2149fc276ae171a72a112
Signed-off-by: bravof
(cherry picked from commit 73bac504fefbcddeed4551d97adc0ea13eebd705)
---
 osm_lcm/lcm.py                   |  23 +--
 osm_lcm/ns.py                    |  42 ++++--
 osm_lcm/prometheus.py            | 252 ++-----------------------------
 osm_lcm/tests/test_prometheus.py | 128 +---------------
 4 files changed, 48 insertions(+), 397 deletions(-)

diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py
index 6202b32..060aa07 100644
--- a/osm_lcm/lcm.py
+++ b/osm_lcm/lcm.py
@@ -29,7 +29,7 @@ import logging.handlers
 import getopt
 import sys
 
-from osm_lcm import ns, prometheus, vim_sdn, netslice
+from osm_lcm import ns, vim_sdn, netslice
 from osm_lcm.ng_ro import NgRoException, NgRoClient
 from osm_lcm.ROclient import ROClient, ROClientException
 
@@ -215,20 +215,6 @@ class Lcm:
         # contains created tasks/futures to be able to cancel
         self.lcm_tasks = TaskRegistry(self.worker_id, self.logger)
 
-        if self.config.get("tsdb") and self.config["tsdb"].get("driver"):
-            if self.config["tsdb"]["driver"] == "prometheus":
-                self.prometheus = prometheus.Prometheus(
-                    self.config["tsdb"], self.worker_id, self.loop
-                )
-            else:
-                raise LcmException(
-                    "Invalid configuration param '{}' at '[tsdb]':'driver'".format(
-                        config["tsdb"]["driver"]
-                    )
-                )
-        else:
-            self.prometheus = None
-
     async def check_RO_version(self):
         tries = 14
         last_error = None
@@ -686,7 +672,7 @@ class Lcm:
         self.loop.run_until_complete(self.check_RO_version())
 
         self.ns = ns.NsLcm(
-            self.msg, self.lcm_tasks, self.config, self.loop, self.prometheus
+            self.msg, self.lcm_tasks, self.config, self.loop
         )
         self.netslice = netslice.NetsliceLcm(
             self.msg, self.lcm_tasks, self.config, self.loop, self.ns
         )
@@ -702,13 +688,10 @@ class Lcm:
             self.msg, self.lcm_tasks, self.config, self.loop
         )
 
-        # configure tsdb prometheus
-        if self.prometheus:
-            self.loop.run_until_complete(self.prometheus.start())
-
         self.loop.run_until_complete(
             asyncio.gather(self.kafka_read(), self.kafka_ping())
         )
+
         # TODO
         # self.logger.debug("Terminating cancelling creation tasks")
         # self.lcm_tasks.cancel("ALL", "create")
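Context for the hunks above: the deleted block in Lcm.__init__ was the only consumer of the tsdb configuration section, and the Prometheus client it built is removed from the start-up sequence as well. A rough sketch of the shape that section had, reconstructed from the removed code and the old Prometheus/test code further down; the values are illustrative, and after this patch nothing in LCM reads them.

    # Sketch only: the "tsdb" settings the removed Lcm.__init__ block expected.
    tsdb_config = {
        "driver": "prometheus",            # the only value the removed check accepted
        "uri": "http://prometheus:9090/",  # server used for the -/reload and status calls
        "path": "/etc/prometheus",         # directory where prometheus.yml was written
    }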
diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py
index 22bc226..7833df0 100644
--- a/osm_lcm/ns.py
+++ b/osm_lcm/ns.py
@@ -96,6 +96,7 @@ from n2vc.n2vc_juju_conn import N2VCJujuConnector
 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
 
 from osm_lcm.lcm_helm_conn import LCMHelmConn
+from osm_lcm.prometheus import parse_job
 
 from copy import copy, deepcopy
 from time import time
@@ -123,7 +124,7 @@ class NsLcm(LcmBase):
     SUBOPERATION_STATUS_SKIP = -3
     task_name_deploy_vca = "Deploying VCA"
 
-    def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
+    def __init__(self, msg, lcm_tasks, config, loop):
         """
         Init, Connect to database, filesystem storage, and messaging
         :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
@@ -200,8 +201,6 @@ class NsLcm(LcmBase):
             "helm-v3": self.conn_helm_ee,
         }
 
-        self.prometheus = prometheus
-
         # create RO client
         self.RO = NgRoClient(self.loop, **self.ro_config)
 
@@ -1987,7 +1986,7 @@ class NsLcm(LcmBase):
 
             # STEP 7 Configure metrics
             if vca_type == "helm" or vca_type == "helm-v3":
-                prometheus_jobs = await self.add_prometheus_metrics(
+                prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                     ee_id=ee_id,
                     artifact_path=artifact_path,
                     ee_config_descriptor=ee_config_descriptor,
@@ -2002,6 +2001,17 @@ class NsLcm(LcmBase):
                         {db_update_entry + "prometheus_jobs": prometheus_jobs},
                     )
 
+                    for job in prometheus_jobs:
+                        self.db.set_one(
+                            "prometheus_jobs",
+                            {
+                                "job_name": job["job_name"]
+                            },
+                            job,
+                            upsert=True,
+                            fail_on_empty=False
+                        )
+
             step = "instantiated at VCA"
             self.logger.debug(logging_text + step)
 
@@ -3946,8 +3956,9 @@ class NsLcm(LcmBase):
                 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )
 
-        if vca_deployed.get("prometheus_jobs") and self.prometheus:
-            await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
+        # Delete Prometheus Jobs if any
+        # This uses NSR_ID, so it will destroy any jobs under this index
+        self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
 
         if destroy_ee:
             await self.vca_map[vca_type].delete_execution_environment(
@@ -6332,11 +6343,15 @@ class NsLcm(LcmBase):
                 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
             )
 
-    async def add_prometheus_metrics(
-        self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
+    async def extract_prometheus_scrape_jobs(
+        self,
+        ee_id,
+        artifact_path,
+        ee_config_descriptor,
+        vnfr_id,
+        nsr_id,
+        target_ip
     ):
-        if not self.prometheus:
-            return
         # look if exist a file called 'prometheus*.j2' and
         artifact_content = self.fs.dir_ls(artifact_path)
         job_file = next(
@@ -6363,7 +6378,7 @@ class NsLcm(LcmBase):
             "EXPORTER_POD_IP": host_name,
             "EXPORTER_POD_PORT": host_port,
         }
-        job_list = self.prometheus.parse_job(job_data, variables)
+        job_list = parse_job(job_data, variables)
         # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
         for job in job_list:
             if (
@@ -6372,9 +6387,8 @@ class NsLcm(LcmBase):
             ):
                 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
             job["nsr_id"] = nsr_id
-        job_dict = {jl["job_name"]: jl for jl in job_list}
-        if await self.prometheus.update(job_dict):
-            return list(job_dict.keys())
+            job["vnfr_id"] = vnfr_id
+        return job_list
 
     def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
         """
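The instantiation path above now only tags each rendered job with nsr_id/vnfr_id and upserts it into the new "prometheus_jobs" collection keyed by job_name, while termination clears everything for the NS with a single del_list() call. A sketch of the document shape this leaves in the collection: only job_name, nsr_id and vnfr_id are guaranteed by the code above; the scrape fields and all values are illustrative, following the scrape-config layout of the removed initial_prometheus_data further down.

    # Illustrative only: one document as extract_prometheus_scrape_jobs() plus the
    # set_one() call above would leave it in the "prometheus_jobs" collection.
    example_job = {
        "job_name": "1b2c3d4e-vnfr_4711",  # made unique per VNFR by the code above
        "nsr_id": "9f8e7d6c-nsr",          # key used by the del_list() cleanup on termination
        "vnfr_id": "1b2c3d4e-vnfr",
        "static_configs": [{"targets": ["10.0.0.5:9100"]}],
    }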
diff --git a/osm_lcm/prometheus.py b/osm_lcm/prometheus.py
index 6517d0b..9ea2989 100644
--- a/osm_lcm/prometheus.py
+++ b/osm_lcm/prometheus.py
@@ -16,253 +16,19 @@
 # under the License.
 ##
 
-import asyncio
-from time import time
-import logging
-import aiohttp
 import yaml
-import os
 
 from osm_lcm.lcm_utils import LcmException
-from osm_common.dbbase import DbException
-from osm_lcm.data_utils.database.database import Database
 from jinja2 import Template, TemplateError, TemplateNotFound, TemplateSyntaxError
 
 __author__ = "Alfonso Tierno "
 
-initial_prometheus_data = {
-    "_id": "prometheus",
-    "_admin": {
-        "locked_at": 0,
-        "locked_by": None,
-        "modified": 1593445184,  # 2020-06-29
-        "created": 1593445184,
-        "version": "1.0",  # to allow future version updates
-    },
-    "scrape_configs": {  # Dictionary at database. Converted to list before sending to prometheus
-        "mon_exporter": {
-            "static_configs": [{"targets": ["mon:8000"]}],
-            "job_name": "mon_exporter",
-        },
-    },
-    "global": {"evaluation_interval": "15s", "scrape_interval": "15s"},
-    "rule_files": None,
-    "alerting": {"alertmanagers": [{"static_configs": [{"targets": None}]}]},
-}
-
-
-class Prometheus:
-    """
-    Implements a class to update Prometheus
-    """
-
-    PROMETHEUS_LOCKED_TIME = 120
-
-    def __init__(self, config, worker_id, loop, logger=None):
-        self.worker_id = worker_id
-        self.db = Database().instance.db
-        self.loop = loop
-        self.logger = logger or logging.getLogger("lcm.prometheus")
-        self.server = config["uri"]
-        self.path = config["path"]
-        if not self.path.endswith("/"):
-            self.path += "/"
-        self.cfg_file = self.path + "prometheus.yml"
-        self.cfg_file_backup = self.path + "prometheus.yml-backup"
-
-    @staticmethod
-    def parse_job(job_data: str, variables: dict) -> dict:
-        try:
-            template = Template(job_data)
-            job_parsed = template.render(variables or {})
-            return yaml.safe_load(job_parsed)
-        except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
-            # TODO yaml exceptions
-            raise LcmException(
-                "Error parsing Jinja2 to prometheus job. job_data={}, variables={}. Error={}".format(
-                    job_data, variables, e
-                )
-            )
-
-    async def start(self):
-        for retry in range(4):
-            try:
-                # self.logger("Starting prometheus ")
-                # read from database
-                prometheus_data = self.db.get_one(
-                    "admin", {"_id": "prometheus"}, fail_on_empty=False
-                )
-                if not prometheus_data:
-                    self.logger.info("Init db.admin.prometheus content")
-                    self.db.create("admin", initial_prometheus_data)
-                # send database config file to prometheus. Ignore loading errors, as prometheus may be starting
-                # but at least an initial configuration file is set
-                await self.update()
-                return
-            except DbException as e:
-                if retry == 3:
-                    raise LcmException(
-                        "Max retries trying to init prometheus configuration: {}".format(
-                            e
-                        )
-                    )
-                await asyncio.sleep(5, loop=self.loop)
-
-    async def update(self, add_jobs: dict = None, remove_jobs: list = None) -> bool:
-        """
-
-        :param add_jobs: dictionary with {job_id_1: job_content, job_id_2: job_content}
-        :param remove_jobs: list with jobs to remove [job_id_1, job_id_2]
-        :return: result. If false prometheus denies this configuration. Exception on error
-        """
-        for retry in range(20):
-            result = True
-            if retry:  # first time do not wait
-                await asyncio.sleep(4 + retry, loop=self.loop)
-
-            # lock database
-            now = time()
-            if not self.db.set_one(
-                "admin",
-                q_filter={
-                    "_id": "prometheus",
-                    "_admin.locked_at.lt": now - self.PROMETHEUS_LOCKED_TIME,
-                },
-                update_dict={
-                    "_admin.locked_at": now,
-                    "_admin.locked_by": self.worker_id,
-                },
-                fail_on_empty=False,
-            ):
-                continue
-            # read database
-            prometheus_data = self.db.get_one("admin", {"_id": "prometheus"})
-            update_dict = {"_admin.locked_at": 0, "_admin.locked_by": None}
-
-            # Make changes from prometheus_incremental
-            push_dict = pull_dict = None
-            if add_jobs or remove_jobs:
-                log_text_list = []
-                if add_jobs:
-                    log_text_list.append(
-                        "adding jobs: {}".format(list(add_jobs.keys()))
-                    )
-                    prometheus_data["scrape_configs"].update(add_jobs)
-                    push_dict = {
-                        "scrape_configs." + job_id: job_data
-                        for job_id, job_data in add_jobs.items()
-                    }
-                elif remove_jobs:
-                    log_text_list.append("removing jobs: {}".format(list(remove_jobs)))
-                    for job_id in remove_jobs:
-                        prometheus_data["scrape_configs"].pop(job_id, None)
-                    pull_dict = {
-                        "scrape_configs." + job_id: None for job_id in remove_jobs
-                    }
-                self.logger.debug("Updating. " + ". ".join(log_text_list))
-
-            if not await self.send_data(prometheus_data):
-                self.logger.error(
-                    "Cannot update add_jobs: {}. remove_jobs: {}".format(
-                        add_jobs, remove_jobs
-                    )
-                )
-                push_dict = pull_dict = None
-                result = False
-
-            # unblock database
-            if push_dict:
-                update_dict.update(push_dict)
-            if push_dict or pull_dict:
-                update_dict["_admin.modified_at"] = now
-            if not self.db.set_one(
-                "admin",
-                {
-                    "_id": "prometheus",
-                    "_admin.locked_at": now,
-                    "_admin.locked_by": self.worker_id,
-                },
-                update_dict=update_dict,
-                unset=pull_dict,
-                fail_on_empty=False,
-            ):
-                continue
-            return result
-        raise LcmException("Cannot update prometheus database. Reached max retries")
-
-    async def send_data(self, new_config):
-        restore_backup = False
-        del new_config["_id"]
-        del new_config["_admin"]
-        new_scrape_configs = []
-
-        # generate a list with the values of scrape_configs
-        for scrape_config in new_config["scrape_configs"].values():
-            scrape_config = scrape_config.copy()
-            # remove nsr_id metadata from scrape_configs
-            scrape_config.pop("nsr_id", None)
-            new_scrape_configs.append(scrape_config)
-        new_config["scrape_configs"] = new_scrape_configs
-
-        try:
-            if os.path.exists(self.cfg_file):
-                os.rename(self.cfg_file, self.cfg_file_backup)
-                restore_backup = True
-            with open(self.cfg_file, "w+") as f:
-                yaml.safe_dump(new_config, f, indent=4, default_flow_style=False)
-            # self.logger.debug("new configuration: {}".format(yaml.safe_dump(new_config, indent=4,
-            #                                                                 default_flow_style=False)))
-            async with aiohttp.ClientSession() as session:
-                async with session.post(self.server + "-/reload") as resp:
-                    if resp.status > 204:
-                        raise LcmException(await resp.text())
-                await asyncio.sleep(5, loop=self.loop)
-                # If prometheus does not admit this configuration, remains with the old one
-                # Then, to check if the configuration has been accepted, get the configuration from prometheus
-                # and compares with the inserted one
-                async with session.get(self.server + "api/v1/status/config") as resp:
-                    if resp.status > 204:
-                        raise LcmException(await resp.text())
-                    current_config = await resp.json()
-                    if not self._check_configuration_equal(current_config, new_config):
-                        return False
-                    else:
-                        restore_backup = False
-            return True
-        except Exception as e:
-            self.logger.error(
-                "Error updating configuration url={}: {}".format(self.server, e)
-            )
-            return False
-        finally:
-            if restore_backup:
-                try:
-                    os.rename(self.cfg_file_backup, self.cfg_file)
-                except Exception as e:
-                    self.logger.critical("Exception while rolling back: {}".format(e))
-
-    def _check_configuration_equal(self, current_config, expected_config):
-        try:
-            # self.logger.debug("Comparing current_config='{}' with expected_config='{}'".format(current_config,
-            #                                                                                    expected_config))
-            current_config_yaml = yaml.safe_load(current_config["data"]["yaml"])
-            current_jobs = [
-                j["job_name"] for j in current_config_yaml["scrape_configs"]
-            ]
-            expected_jobs = [j["job_name"] for j in expected_config["scrape_configs"]]
-            if current_jobs == expected_jobs:
-                return True
-            else:
-                self.logger.error(
-                    "Not all jobs have been loaded. Target jobs: {} Loaded jobs: {}".format(
-                        expected_jobs, current_jobs
-                    )
-                )
-                return False
-        except Exception as e:
-            self.logger.error(
-                "Invalid obtained status from server. Error: '{}'. Obtained data: '{}'".format(
-                    e, current_config
-                )
-            )
-            # if format is not understood, cannot be compared, assume it is ok
-            return True
+
+def parse_job(job_data: str, variables: dict) -> dict:
+    try:
+        template = Template(job_data)
+        job_parsed = template.render(variables or {})
+        return yaml.safe_load(job_parsed)
+    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
+        # TODO yaml exceptions
+        raise LcmException("Error parsing Jinja2 to prometheus job. job_data={}, variables={}. Error={}".format(
+            job_data, variables, e))
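parse_job() is now a module-level helper with no Prometheus or database state behind it: it renders the Jinja2 text and YAML-loads the result (note the -> dict annotation, although the ns.py caller iterates the result as a list when the template's top level is a YAML sequence). A minimal usage sketch; the template text is made up, while the EXPORTER_POD_IP/EXPORTER_POD_PORT variable names are the ones extract_prometheus_scrape_jobs() passes.

    from osm_lcm.prometheus import parse_job

    # Illustrative stand-in for a prometheus*.j2 file shipped inside a VNF package.
    job_template = """
    - job_name: vnf_exporter
      static_configs:
        - targets: ["{{ EXPORTER_POD_IP }}:{{ EXPORTER_POD_PORT }}"]
    """

    jobs = parse_job(
        job_template,
        {"EXPORTER_POD_IP": "10.0.0.5", "EXPORTER_POD_PORT": "9100"},
    )
    # jobs == [{"job_name": "vnf_exporter",
    #           "static_configs": [{"targets": ["10.0.0.5:9100"]}]}]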
diff --git a/osm_lcm/tests/test_prometheus.py b/osm_lcm/tests/test_prometheus.py
index 2eb81f5..67b460d 100644
--- a/osm_lcm/tests/test_prometheus.py
+++ b/osm_lcm/tests/test_prometheus.py
@@ -16,127 +16,12 @@
 ##
 
 import asynctest
-from osm_lcm.prometheus import Prometheus, initial_prometheus_data
-from asynctest.mock import Mock
-from osm_lcm.data_utils.database.database import Database
+from osm_lcm.prometheus import parse_job
 
 __author__ = "Alfonso Tierno "
 
 
 class TestPrometheus(asynctest.TestCase):
-    async def setUp(self):
-        config = {"uri": "http:prometheus:9090", "path": "/etc/prometheus"}
-        # Cleanup singleton Database instance
-        Database.instance = None
-
-        self.db = Mock(Database({"database": {"driver": "memory"}}).instance.db)
-        Database().instance.db = self.db
-        self.p = Prometheus(config, worker_id="1", loop=self.loop)
-
-    @asynctest.fail_on(active_handles=True)
-    async def test_start(self):
-        # test with database empty
-        self.db.get_one.return_value = False
-        self.p.update = asynctest.CoroutineMock()
-        await self.p.start()
-        self.db.create.assert_called_once_with("admin", initial_prometheus_data)
-        self.p.update.assert_called_once_with()
-
-        # test with database not empty
-        self.db.create.reset_mock()
-        self.db.get_one.return_value = initial_prometheus_data
-        self.p.update.reset_mock()
-        await self.p.start()
-        self.db.create.assert_not_called()
-        self.p.update.assert_called_once_with()
-
-    @asynctest.fail_on(active_handles=True)
-    async def test_update(self):
-        self.p.PROMETHEUS_LOCKED_TIME = 1
-        number_call_set_one = 0
-
-        def _db_set_one(*args, **kwargs):
-            # simulated that database is not locked at first call
-            nonlocal number_call_set_one
-
-            number_call_set_one += 1
-            if number_call_set_one == 1:
-                return
-            else:
-                return {"update": 1}
-
-        def _check_set_one_calls(set_one_calls):
-            # check the three calls to database set_one
-            self.assertEqual(
-                len(set_one_calls),
-                3,
-                "Not called three times to db.set_one, two blocks, one unblock",
-            )
-            self.assertIn(
-                "admin", set_one_calls[0][0], "db.set_one collection should be admin"
-            )
-            first_used_time = set_one_calls[0][1]["update_dict"]["_admin.locked_at"]
-            second_used_time = set_one_calls[1][1]["update_dict"]["_admin.locked_at"]
-            third_used_time = set_one_calls[2][1]["update_dict"]["_admin.locked_at"]
-            self.assertTrue(
-                first_used_time != 0 and second_used_time != 0,
-                "blocking locked_at time must not be 0",
-            )
-            self.assertGreater(
-                second_used_time,
-                first_used_time,
-                "Every blocking try must contain a new locked_at time",
-            )
-            self.assertEqual(
-                third_used_time, 0, "For unblocking must be set locked_at=0"
-            )
-
-        # check add_jobs
-        number_call_set_one = 0
-        self.db.get_one.return_value = initial_prometheus_data
-        self.db.set_one.side_effect = _db_set_one
-        self.p.send_data = asynctest.CoroutineMock(return_value=True)
-        add_jobs = {"job1": {"job_name": "job1", "nsr_id": "nsr_id"}}
-        await self.p.update(add_jobs=add_jobs)
-        set_one_calls = self.db.set_one.call_args_list
-        _check_set_one_calls(set_one_calls)
-        update_dict = set_one_calls[2][1]["update_dict"]
-        unset_dict = set_one_calls[2][1]["unset"]
-        expected_final_set = {
-            "_admin.locked_at": 0,
-            "_admin.locked_by": None,
-            "_admin.modified_at": set_one_calls[1][1]["update_dict"][
-                "_admin.locked_at"
-            ],
-            "scrape_configs.job1": add_jobs["job1"],
-        }
-        self.assertEqual(
-            update_dict, expected_final_set, "invalid set and unlock values"
-        )
-        self.assertEqual(unset_dict, None, "invalid unset and unlock values")
-
-        # check remove_jobs
-        number_call_set_one = 0
-        remove_jobs = ["job1"]
-        self.db.set_one.reset_mock()
-        await self.p.update(remove_jobs=remove_jobs)
-        set_one_calls = self.db.set_one.call_args_list
-        _check_set_one_calls(set_one_calls)
-        update_dict = set_one_calls[2][1]["update_dict"]
-        unset_dict = set_one_calls[2][1]["unset"]
-        expected_final_set = {
-            "_admin.locked_at": 0,
-            "_admin.locked_by": None,
-            "_admin.modified_at": set_one_calls[1][1]["update_dict"][
-                "_admin.locked_at"
-            ],
-        }
-        self.assertEqual(
-            update_dict, expected_final_set, "invalid set and unlock values"
-        )
-        self.assertEqual(
-            unset_dict, {"scrape_configs.job1": None}, "invalid unset and unlock values"
-        )
 
     def test_parse_job(self):
         text_to_parse = """
@@ -144,10 +29,13 @@ class TestPrometheus(asynctest.TestCase):
             key1: "parsing var1='{{ var1 }}'"
             key2: "parsing var2='{{ var2 }}'"
         """
-        vars = {"var1": "VAR1", "var2": "VAR2", "var3": "VAR3"}
-        expected = {"key1": "parsing var1='VAR1'", "key2": "parsing var2='VAR2'"}
-        result = self.p.parse_job(text_to_parse, vars)
-        self.assertEqual(result, expected, "Error at jinja2 parse")
+        vars = {'var1': 'VAR1', 'var2': 'VAR2', 'var3': 'VAR3'}
+        expected = {
+            'key1': "parsing var1='VAR1'",
+            'key2': "parsing var2='VAR2'"
+        }
+        result = parse_job(text_to_parse, vars)
+        self.assertEqual(result, expected, 'Error at jinja2 parse')


 if __name__ == "__main__":
-- 
2.25.1
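Note on where the removed behaviour goes: LCM now only writes and deletes job documents, so rendering prometheus.yml and reloading the server (what the deleted Prometheus.send_data() used to do) has to happen in whichever component owns that file. A hypothetical sketch of that consumer side, limited to what the deleted code did, i.e. turning the stored documents into a scrape_configs list and dropping the OSM bookkeeping fields; build_scrape_configs() is not an existing OSM function.

    import yaml


    def build_scrape_configs(prometheus_jobs: list) -> str:
        # Hypothetical consumer-side helper: render the documents LCM now stores in
        # the "prometheus_jobs" collection as the scrape_configs section of
        # prometheus.yml, mirroring the deleted send_data(): list conversion plus
        # removal of the database/bookkeeping fields (_id, _admin, nsr_id, vnfr_id).
        scrape_configs = []
        for job in prometheus_jobs:
            cleaned = {
                k: v
                for k, v in job.items()
                if k not in ("_id", "_admin", "nsr_id", "vnfr_id")
            }
            scrape_configs.append(cleaned)
        return yaml.safe_dump({"scrape_configs": scrape_configs}, default_flow_style=False)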