From efa4c2b1e2baaf564d4550a45265b12330df5970 Mon Sep 17 00:00:00 2001 From: garciadeblas Date: Wed, 7 Sep 2022 22:35:53 +0200 Subject: [PATCH 1/1] Flake8 compliance in Prometheus/src/app.py Change-Id: I1f09864d979d0d300ad85d794e1a3bb9bebacf68 Signed-off-by: garciadeblas --- docker/Prometheus/src/app.py | 241 +++++++++++++++++++---------------- 1 file changed, 133 insertions(+), 108 deletions(-) diff --git a/docker/Prometheus/src/app.py b/docker/Prometheus/src/app.py index fc1e2bb1..43cca085 100755 --- a/docker/Prometheus/src/app.py +++ b/docker/Prometheus/src/app.py @@ -33,138 +33,163 @@ import time from bson.json_util import dumps from bson import ObjectId -#Env variables -mongodb_url = os.environ['MONGODB_URL'] -target_database = os.environ['TARGET_DATABASE'] -prometheus_config_file = os.environ['PROMETHEUS_CONFIG_FILE'] -prometheus_url = os.environ['PROMETHEUS_URL'] +# Env variables +mongodb_url = os.environ["MONGODB_URL"] +target_database = os.environ["TARGET_DATABASE"] +prometheus_config_file = os.environ["PROMETHEUS_CONFIG_FILE"] +prometheus_url = os.environ["PROMETHEUS_URL"] + def get_jobs(client): - return json.loads(dumps(client[target_database].prometheus_jobs.find({}))) + return json.loads(dumps(client[target_database].prometheus_jobs.find({}))) + def save_successful_jobs(client, jobs): - for job in jobs: - client[target_database].prometheus_jobs.update_one({ - "_id": ObjectId(job["_id"]["$oid"]) - }, { - "$set": { - "is_active": True - } - }) + for job in jobs: + client[target_database].prometheus_jobs.update_one( + {"_id": ObjectId(job["_id"]["$oid"])}, + {"$set": {"is_active": True}} + ) + def clean_up_job(prometheus_job): - cleaned_prometheus_job = copy.deepcopy(prometheus_job) - #take out _id and internal keys - cleaned_prometheus_job.pop('_id', None) - cleaned_prometheus_job.pop('is_active', None) - cleaned_prometheus_job.pop('vnfr_id', None) - cleaned_prometheus_job.pop('nsr_id', None) - return cleaned_prometheus_job + cleaned_prometheus_job = copy.deepcopy(prometheus_job) + # take out _id and internal keys + cleaned_prometheus_job.pop("_id", None) + cleaned_prometheus_job.pop("is_active", None) + cleaned_prometheus_job.pop("vnfr_id", None) + cleaned_prometheus_job.pop("nsr_id", None) + return cleaned_prometheus_job + def generate_prometheus_config(prometheus_jobs, config_file_path): - config_file = open(config_file_path, encoding='utf-8', mode='r') - config_file_contents = config_file.read() - config_file.close() + config_file = open(config_file_path, encoding="utf-8", mode="r") + config_file_contents = config_file.read() + config_file.close() - config_file_yaml = yaml.load(config_file_contents, yaml.FullLoader) - if config_file_yaml is None: - config_file_yaml = {} + config_file_yaml = yaml.load(config_file_contents, yaml.FullLoader) + if config_file_yaml is None: + config_file_yaml = {} + + if len(prometheus_jobs) == 0: + config_file_yaml["scrape_configs"] = [] + return config_file_yaml + + config_file_yaml["scrape_configs"] = [] + + for prometheus_job in prometheus_jobs: + cleaned_up_job = clean_up_job(prometheus_job) + config_file_yaml["scrape_configs"].append(cleaned_up_job) - if len(prometheus_jobs) == 0: - config_file_yaml['scrape_configs'] = [] return config_file_yaml - config_file_yaml['scrape_configs'] = [] - for prometheus_job in prometheus_jobs: - cleaned_up_job = clean_up_job(prometheus_job) - config_file_yaml['scrape_configs'].append(cleaned_up_job) +async def reload_prometheus_config(prom_url): + async with aiohttp.ClientSession() as session: + async with session.post(prom_url + "/-/reload") as resp: + if resp.status > 204: + print(f"Error while updating prometheus config: {resp.text()}") + return False + await asyncio.sleep(5) + return True - return config_file_yaml -async def reload_prometheus_config(the_prometheus_url): - async with aiohttp.ClientSession() as session: - async with session.post(the_prometheus_url + "/-/reload") as resp: - if resp.status > 204: - print(f"Error while updating prometheus config : {resp.text()}") +def check_configuration_equal(a_config, b_config): + if a_config is None and b_config is None: + return True + if a_config is None or b_config is None: return False - await asyncio.sleep(5) - return True + if "scrape_configs" not in a_config and "scrape_configs" not in b_config: + return True + if "scrape_configs" not in a_config or "scrape_configs" not in b_config: + return False + a_jobs = [j["job_name"] for j in a_config["scrape_configs"]] + b_jobs = [j["job_name"] for j in b_config["scrape_configs"]] + + return a_jobs == b_jobs + + +async def validate_configuration(prom_url, new_config): + async with aiohttp.ClientSession() as session: + # Gets the configuration from prometheus + # and compares with the inserted one + # If prometheus does not admit this configuration, + # the old one will remain + async with session.get(prom_url + "/api/v1/status/config") as resp: + if resp.status > 204: + print(f"Error while updating prometheus config: {resp.text()}") + return False + current_config = await resp.json() + return check_configuration_equal( + yaml.safe_load(current_config["data"]["yaml"]), new_config + ) -def check_configuration_equal(a_config, b_config): - if a_config is None and b_config is None: - return True - if a_config is None or b_config is None: - return False - if not "scrape_configs" in a_config and not "scrape_configs" in b_config: - return True - if not "scrape_configs" in a_config or not "scrape_configs" in b_config: - return False - a_jobs = [j["job_name"] for j in a_config["scrape_configs"]] - b_jobs = [j["job_name"] for j in b_config["scrape_configs"]] - - return a_jobs == b_jobs - -async def validate_configuration(the_prometheus_url, new_config): - async with aiohttp.ClientSession() as session: - # If prometheus does not admit this configuration, remains with the old one - # Then, to check if the configuration has been accepted, get the configuration from prometheus - # and compares with the inserted one - async with session.get(the_prometheus_url + "/api/v1/status/config") as resp: - if resp.status > 204: - print(f"Error while updating prometheus config : {resp.text()}") - return False - current_config = await resp.json() - return check_configuration_equal(yaml.load(current_config["data"]["yaml"], yaml.FullLoader), new_config) async def main_task(client): - stored_jobs = get_jobs(client) - print(f"Jobs detected : {len(stored_jobs):d}") - generated_prometheus_config = generate_prometheus_config(stored_jobs, prometheus_config_file) - print(f"Writing new config file to {prometheus_config_file}") - config_file = open(prometheus_config_file, "w") - config_file.truncate(0) - config_file.write(yaml.dump(generated_prometheus_config)) - config_file.close() - print("New config written, updating prometheus") - update_resp = await reload_prometheus_config(prometheus_url) - is_valid = await validate_configuration(prometheus_url, generated_prometheus_config) - if update_resp and is_valid: - print("Prometheus config update successful") - save_successful_jobs(client, stored_jobs) - else: - print("Error while updating prometheus config: current config doesn't match with updated values") + stored_jobs = get_jobs(client) + print(f"Jobs detected : {len(stored_jobs):d}") + generated_prometheus_config = generate_prometheus_config( + stored_jobs, prometheus_config_file + ) + print(f"Writing new config file to {prometheus_config_file}") + config_file = open(prometheus_config_file, "w") + config_file.truncate(0) + config_file.write(yaml.dump(generated_prometheus_config)) + config_file.close() + print("New config written, updating prometheus") + update_resp = await reload_prometheus_config(prometheus_url) + is_valid = await validate_configuration( + prometheus_url, generated_prometheus_config + ) + if update_resp and is_valid: + print("Prometheus config update successful") + save_successful_jobs(client, stored_jobs) + else: + print( + "Error while updating prometheus config: " + "current config doesn't match with updated values" + ) -async def main(): - client = pymongo.MongoClient(mongodb_url) - print('Connected to MongoDB!') - try: - print('Refreshing prometheus config file for first time') - await main_task(client) - except Exception as error: - print("Error in first configuration attempt!") - print(error) +async def main(): + client = pymongo.MongoClient(mongodb_url) + print("Connected to MongoDB!") - while(True): try: - #Needs mongodb in replica mode as this feature relies in OpLog - change_stream = client[target_database].prometheus_jobs.watch([{ - '$match': { - #If you want to modify a particular job, delete and insert it again - 'operationType': { '$in': ['insert', 'delete'] } - } - }]) - - #Single thread, no race conditions and ops are queued up in order - print("Listening to changes in prometheus jobs collection") - for change in change_stream: - print("Change detected, updating prometheus config") + print("Refreshing prometheus config file for first time") await main_task(client) - print() except Exception as error: - print(error) - print("Detected failure while listening to prometheus jobs collection, retrying...") - time.sleep(5) + print("Error in first configuration attempt!") + print(error) + + while True: + try: + # Needs mongodb in replica mode as this feature relies in OpLog + change_stream = client[target_database].prometheus_jobs.watch( + [ + { + "$match": { + # If you want to modify a particular job, + # delete and insert it again + "operationType": {"$in": ["insert", "delete"]} + } + } + ] + ) + + # Single thread, no race conditions and ops are queued up in order + print("Listening to changes in prometheus jobs collection") + for change in change_stream: + print("Change detected, updating prometheus config") + await main_task(client) + print() + except Exception as error: + print(error) + print( + "Detected failure while listening to prometheus jobs collection, " + "retrying..." + ) + time.sleep(5) + asyncio.run(main()) -- 2.25.1