Flake8 compliance in Prometheus/src/app.py 34/12534/4
authorgarciadeblas <gerardo.garciadeblas@telefonica.com>
Wed, 7 Sep 2022 20:35:53 +0000 (22:35 +0200)
committergarciadeblas <gerardo.garciadeblas@telefonica.com>
Fri, 30 Sep 2022 10:38:20 +0000 (12:38 +0200)
Change-Id: I1f09864d979d0d300ad85d794e1a3bb9bebacf68
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
docker/Prometheus/src/app.py

index fc1e2bb..43cca08 100755 (executable)
@@ -33,138 +33,163 @@ import time
 from bson.json_util import dumps
 from bson import ObjectId
 
-#Env variables
-mongodb_url = os.environ['MONGODB_URL']
-target_database = os.environ['TARGET_DATABASE']
-prometheus_config_file = os.environ['PROMETHEUS_CONFIG_FILE']
-prometheus_url = os.environ['PROMETHEUS_URL']
+# Env variables
+mongodb_url = os.environ["MONGODB_URL"]
+target_database = os.environ["TARGET_DATABASE"]
+prometheus_config_file = os.environ["PROMETHEUS_CONFIG_FILE"]
+prometheus_url = os.environ["PROMETHEUS_URL"]
+
 
 def get_jobs(client):
-  return json.loads(dumps(client[target_database].prometheus_jobs.find({})))
+    return json.loads(dumps(client[target_database].prometheus_jobs.find({})))
+
 
 def save_successful_jobs(client, jobs):
-  for job in jobs:
-    client[target_database].prometheus_jobs.update_one({
-      "_id": ObjectId(job["_id"]["$oid"])
-    }, {
-      "$set": {
-        "is_active": True
-      }
-    })
+    for job in jobs:
+        client[target_database].prometheus_jobs.update_one(
+            {"_id": ObjectId(job["_id"]["$oid"])},
+            {"$set": {"is_active": True}}
+        )
+
 
 def clean_up_job(prometheus_job):
-  cleaned_prometheus_job = copy.deepcopy(prometheus_job)
-  #take out _id and internal keys
-  cleaned_prometheus_job.pop('_id', None)
-  cleaned_prometheus_job.pop('is_active', None)
-  cleaned_prometheus_job.pop('vnfr_id', None)
-  cleaned_prometheus_job.pop('nsr_id', None)
-  return cleaned_prometheus_job
+    cleaned_prometheus_job = copy.deepcopy(prometheus_job)
+    # take out _id and internal keys
+    cleaned_prometheus_job.pop("_id", None)
+    cleaned_prometheus_job.pop("is_active", None)
+    cleaned_prometheus_job.pop("vnfr_id", None)
+    cleaned_prometheus_job.pop("nsr_id", None)
+    return cleaned_prometheus_job
+
 
 def generate_prometheus_config(prometheus_jobs, config_file_path):
-  config_file = open(config_file_path, encoding='utf-8', mode='r')
-  config_file_contents = config_file.read()
-  config_file.close()
+    config_file = open(config_file_path, encoding="utf-8", mode="r")
+    config_file_contents = config_file.read()
+    config_file.close()
 
-  config_file_yaml = yaml.load(config_file_contents, yaml.FullLoader)
-  if config_file_yaml is None:
-    config_file_yaml = {}
+    config_file_yaml = yaml.load(config_file_contents, yaml.FullLoader)
+    if config_file_yaml is None:
+        config_file_yaml = {}
+
+    if len(prometheus_jobs) == 0:
+        config_file_yaml["scrape_configs"] = []
+        return config_file_yaml
+
+    config_file_yaml["scrape_configs"] = []
+
+    for prometheus_job in prometheus_jobs:
+        cleaned_up_job = clean_up_job(prometheus_job)
+        config_file_yaml["scrape_configs"].append(cleaned_up_job)
 
-  if len(prometheus_jobs) == 0:
-    config_file_yaml['scrape_configs'] = []
     return config_file_yaml
 
-  config_file_yaml['scrape_configs'] = []
 
-  for prometheus_job in prometheus_jobs:
-    cleaned_up_job = clean_up_job(prometheus_job)
-    config_file_yaml['scrape_configs'].append(cleaned_up_job)
+async def reload_prometheus_config(prom_url):
+    async with aiohttp.ClientSession() as session:
+        async with session.post(prom_url + "/-/reload") as resp:
+            if resp.status > 204:
+                print(f"Error while updating prometheus config: {resp.text()}")
+                return False
+        await asyncio.sleep(5)
+        return True
 
-  return config_file_yaml
 
-async def reload_prometheus_config(the_prometheus_url):
-  async with aiohttp.ClientSession() as session:
-    async with session.post(the_prometheus_url + "/-/reload") as resp:
-      if resp.status > 204:
-        print(f"Error while updating prometheus config : {resp.text()}")
+def check_configuration_equal(a_config, b_config):
+    if a_config is None and b_config is None:
+        return True
+    if a_config is None or b_config is None:
         return False
-    await asyncio.sleep(5)
-    return True
+    if "scrape_configs" not in a_config and "scrape_configs" not in b_config:
+        return True
+    if "scrape_configs" not in a_config or "scrape_configs" not in b_config:
+        return False
+    a_jobs = [j["job_name"] for j in a_config["scrape_configs"]]
+    b_jobs = [j["job_name"] for j in b_config["scrape_configs"]]
+
+    return a_jobs == b_jobs
+
+
+async def validate_configuration(prom_url, new_config):
+    async with aiohttp.ClientSession() as session:
+        # Gets the configuration from prometheus
+        # and compares with the inserted one
+        # If prometheus does not admit this configuration,
+        # the old one will remain
+        async with session.get(prom_url + "/api/v1/status/config") as resp:
+            if resp.status > 204:
+                print(f"Error while updating prometheus config: {resp.text()}")
+                return False
+            current_config = await resp.json()
+            return check_configuration_equal(
+                yaml.safe_load(current_config["data"]["yaml"]), new_config
+            )
 
-def check_configuration_equal(a_config, b_config):
-  if a_config is None and b_config is None:
-    return True
-  if a_config is None or b_config is None:
-    return False
-  if not "scrape_configs" in a_config and not "scrape_configs" in b_config:
-    return True
-  if not "scrape_configs" in a_config or not "scrape_configs" in b_config:
-    return False
-  a_jobs = [j["job_name"] for j in a_config["scrape_configs"]]
-  b_jobs = [j["job_name"] for j in b_config["scrape_configs"]]
-
-  return a_jobs == b_jobs
-
-async def validate_configuration(the_prometheus_url, new_config):
-  async with aiohttp.ClientSession() as session:
-    # If prometheus does not admit this configuration, remains with the old one
-    # Then, to check if the configuration has been accepted, get the configuration from prometheus
-    # and compares with the inserted one
-    async with session.get(the_prometheus_url + "/api/v1/status/config") as resp:
-        if resp.status > 204:
-          print(f"Error while updating prometheus config : {resp.text()}")
-          return False
-        current_config = await resp.json()
-        return check_configuration_equal(yaml.load(current_config["data"]["yaml"], yaml.FullLoader), new_config)
 
 async def main_task(client):
-  stored_jobs = get_jobs(client)
-  print(f"Jobs detected : {len(stored_jobs):d}")
-  generated_prometheus_config = generate_prometheus_config(stored_jobs, prometheus_config_file)
-  print(f"Writing new config file to {prometheus_config_file}")
-  config_file = open(prometheus_config_file, "w")
-  config_file.truncate(0)
-  config_file.write(yaml.dump(generated_prometheus_config))
-  config_file.close()
-  print("New config written, updating prometheus")
-  update_resp = await reload_prometheus_config(prometheus_url)
-  is_valid = await validate_configuration(prometheus_url, generated_prometheus_config)
-  if update_resp and is_valid:
-    print("Prometheus config update successful")
-    save_successful_jobs(client, stored_jobs)
-  else:
-    print("Error while updating prometheus config: current config doesn't match with updated values")
+    stored_jobs = get_jobs(client)
+    print(f"Jobs detected : {len(stored_jobs):d}")
+    generated_prometheus_config = generate_prometheus_config(
+        stored_jobs, prometheus_config_file
+    )
+    print(f"Writing new config file to {prometheus_config_file}")
+    config_file = open(prometheus_config_file, "w")
+    config_file.truncate(0)
+    config_file.write(yaml.dump(generated_prometheus_config))
+    config_file.close()
+    print("New config written, updating prometheus")
+    update_resp = await reload_prometheus_config(prometheus_url)
+    is_valid = await validate_configuration(
+        prometheus_url, generated_prometheus_config
+    )
+    if update_resp and is_valid:
+        print("Prometheus config update successful")
+        save_successful_jobs(client, stored_jobs)
+    else:
+        print(
+            "Error while updating prometheus config: "
+            "current config doesn't match with updated values"
+        )
 
-async def main():
-  client = pymongo.MongoClient(mongodb_url)
-  print('Connected to MongoDB!')
 
-  try:
-    print('Refreshing prometheus config file for first time')
-    await main_task(client)
-  except Exception as error:
-    print("Error in first configuration attempt!")
-    print(error)
+async def main():
+    client = pymongo.MongoClient(mongodb_url)
+    print("Connected to MongoDB!")
 
-  while(True):
     try:
-      #Needs mongodb in replica mode as this feature relies in OpLog
-      change_stream = client[target_database].prometheus_jobs.watch([{
-        '$match': {
-          #If you want to modify a particular job, delete and insert it again
-          'operationType': { '$in': ['insert', 'delete'] }
-        }
-      }])
-
-      #Single thread, no race conditions and ops are queued up in order
-      print("Listening to changes in prometheus jobs collection")
-      for change in change_stream:
-        print("Change detected, updating prometheus config")
+        print("Refreshing prometheus config file for first time")
         await main_task(client)
-        print()
     except Exception as error:
-      print(error)
-    print("Detected failure while listening to prometheus jobs collection, retrying...")
-    time.sleep(5) 
+        print("Error in first configuration attempt!")
+        print(error)
+
+    while True:
+        try:
+            # Needs mongodb in replica mode as this feature relies in OpLog
+            change_stream = client[target_database].prometheus_jobs.watch(
+                [
+                    {
+                        "$match": {
+                            # If you want to modify a particular job,
+                            # delete and insert it again
+                            "operationType": {"$in": ["insert", "delete"]}
+                        }
+                    }
+                ]
+            )
+
+            # Single thread, no race conditions and ops are queued up in order
+            print("Listening to changes in prometheus jobs collection")
+            for change in change_stream:
+                print("Change detected, updating prometheus config")
+                await main_task(client)
+                print()
+        except Exception as error:
+            print(error)
+        print(
+            "Detected failure while listening to prometheus jobs collection, "
+            "retrying..."
+        )
+        time.sleep(5)
+
 
 asyncio.run(main())