+async def main_task(client):
+ stored_jobs = get_jobs(client)
+ print(f"Jobs detected: {len(stored_jobs):d}")
+ generated_prometheus_config = generate_prometheus_config(
+ stored_jobs, prometheus_base_config_file
+ )
+ print(f"Writing new config file to {prometheus_config_file}")
+ config_file = open(prometheus_config_file, "w")
+ config_file.truncate(0)
+ print(yaml.safe_dump(generated_prometheus_config))
+ config_file.write(yaml.safe_dump(generated_prometheus_config))
+ config_file.close()
+
+ if os.path.isfile(prometheus_base_alerts_file):
+ stored_alerts = get_alerts(client)
+ print(f"Alerts read: {len(stored_alerts):d}")
+ generated_prometheus_alerts = generate_prometheus_alerts(
+ stored_alerts, prometheus_base_alerts_file
+ )
+ print(f"Writing new alerts file to {prometheus_alerts_file}")
+ config_file = open(prometheus_alerts_file, "w")
+ config_file.truncate(0)
+ print(yaml.safe_dump(generated_prometheus_alerts))
+ config_file.write(yaml.safe_dump(generated_prometheus_alerts))
+ config_file.close()
+
+ print("New config written, updating prometheus")
+ update_resp = await reload_prometheus_config(prometheus_url)
+ is_valid = await validate_configuration(prometheus_url, generated_prometheus_config)
+ if update_resp and is_valid:
+ print("Prometheus config update successful")
+ save_successful_jobs(client, stored_jobs)
+ else:
+ print(
+ "Error while updating prometheus config: "
+ "current config doesn't match with updated values"
+ )
+
+
+async def main():
+ client = pymongo.MongoClient(mongodb_url)
+ print("Created MongoClient to connect to MongoDB!")
+
+ # Initial loop. First refresh of prometheus config file
+ first_refresh_completed = False
+ tries = 1
+ while tries <= 3 and first_refresh_completed == False:
+ try:
+ print("Generating prometheus config files")
+ await main_task(client)
+ first_refresh_completed = True
+ except Exception as error:
+ print(f"Error in configuration attempt! Number of tries: {tries}/3")
+ print(error)
+ time.sleep(5)
+ tries += 1
+ if not first_refresh_completed:
+ print("Not possible to refresh prometheus config file for first time")
+ return
+
+ # Main loop
+ while True:
+ try:
+ # Needs mongodb in replica mode as this feature relies in OpLog
+ change_stream = client[target_database].watch(
+ [
+ {
+ "$match": {
+ "operationType": {"$in": ["insert", "delete"]},
+ "ns.coll": { "$in": ["prometheus_jobs", "alerts"]},
+ }
+ }
+ ]
+ )
+
+ # Single thread, no race conditions and ops are queued up in order
+ print("Listening to changes in prometheus jobs and alerts collections")
+ for change in change_stream:
+ print("Changes detected, updating prometheus config")
+ await main_task(client)
+ print()
+ except Exception as error:
+ print(error)
+ print(
+ "Detected failure while listening to prometheus jobs collection, "
+ "retrying..."
+ )
+ time.sleep(5)