Fix installation of Kubernetes metrics server by updating the URL
[osm/devops.git] / docker / Prometheus / src / app.py
1 #!/usr/bin/env python
2
3 # Copyright 2021 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: fbravo@whitestack.com
23 ##
24
25 import aiohttp
26 import asyncio
27 from bson.json_util import dumps
28 from bson import ObjectId
29 import copy
30 from datetime import datetime
31 import json
32 import os
33 import pymongo
34 import time
35 import yaml
36
# Env variables
# Connection string for the MongoDB instance that stores OSM state.
mongodb_url = os.environ["MONGODB_URL"]
# Database that holds the "prometheus_jobs" and "alerts" collections.
target_database = os.environ["TARGET_DATABASE"]
# Scrape-config file that Prometheus actually loads (generated by this service).
prometheus_config_file = os.environ["PROMETHEUS_CONFIG_FILE"]
# Read-only base scrape-config template merged with the stored jobs.
prometheus_base_config_file = os.environ["PROMETHEUS_BASE_CONFIG_FILE"]
# Alert-rules file that Prometheus loads (generated by this service).
prometheus_alerts_file = os.environ["PROMETHEUS_ALERTS_FILE"]
# Read-only base alert-rules template; alerts support is skipped if absent.
prometheus_base_alerts_file = os.environ["PROMETHEUS_BASE_ALERTS_FILE"]

# Base URL of the Prometheus server (used for /-/reload and the status API).
prometheus_url = os.environ["PROMETHEUS_URL"]
46
47
def get_jobs(client):
    """Return every stored Prometheus job document as plain JSON structures.

    The cursor is round-tripped through BSON ``dumps`` so Mongo-specific
    types (ObjectId, dates) become JSON-compatible dicts such as
    ``{"$oid": ...}``.
    """
    cursor = client[target_database].prometheus_jobs.find({})
    return json.loads(dumps(cursor))
50
51
def get_alerts(client):
    """Return alert documents that carry a Prometheus rule definition.

    Only documents with a ``prometheus_config`` field are relevant for
    rule-file generation; the rest are filtered out server-side.
    """
    query = {"prometheus_config": {"$exists": True}}
    cursor = client[target_database].alerts.find(query)
    return json.loads(dumps(cursor))
54
55
def save_successful_jobs(client, jobs):
    """Flag each given job document as active in MongoDB.

    ``jobs`` are JSON-decoded documents, so the ObjectId arrives in the
    extended-JSON form ``{"_id": {"$oid": ...}}`` and must be rebuilt
    before it can be used as a query key.
    """
    collection = client[target_database].prometheus_jobs
    for job in jobs:
        job_id = ObjectId(job["_id"]["$oid"])
        collection.update_one({"_id": job_id}, {"$set": {"is_active": True}})
61
62
def clean_up_job(prometheus_job):
    """Return a deep copy of *prometheus_job* without internal bookkeeping keys.

    The stripped keys are managed by this service (Mongo id, activation flag,
    VNF/NS record references) and must not leak into the Prometheus scrape
    configuration. The input document is left unmodified.
    """
    internal_keys = {"_id", "is_active", "vnfr_id", "nsr_id"}
    return {
        key: copy.deepcopy(value)
        for key, value in prometheus_job.items()
        if key not in internal_keys
    }
71
72
def generate_prometheus_config(prometheus_jobs, config_file_path):
    """Merge stored Prometheus jobs into the base scrape configuration.

    Loads the base config at *config_file_path* and appends every job from
    *prometheus_jobs* whose ``job_name`` is not already present. Prometheus
    rejects a configuration containing duplicate ``job_name`` entries, so
    duplicates are skipped — both against the base file and among the
    stored jobs themselves (the original implementation only checked
    against the base file, so two stored jobs sharing a name were both
    appended).

    Returns the merged configuration as a dict ready for ``yaml.safe_dump``.
    """
    with open(config_file_path, encoding="utf-8", mode="r") as config_file:
        config_file_yaml = yaml.safe_load(config_file)
    if config_file_yaml is None:
        config_file_yaml = {}
    if "scrape_configs" not in config_file_yaml:
        config_file_yaml["scrape_configs"] = []

    # Set of already-used names: O(1) membership instead of rescanning the
    # scrape_configs list for every stored job.
    known_job_names = {
        sc.get("job_name") for sc in config_file_yaml["scrape_configs"]
    }

    for prometheus_job in prometheus_jobs:
        cleaned_up_job = clean_up_job(prometheus_job)
        job_name = cleaned_up_job.get("job_name")
        if job_name in known_job_names:
            continue
        known_job_names.add(job_name)
        config_file_yaml["scrape_configs"].append(cleaned_up_job)

    return config_file_yaml
97
98
def generate_prometheus_alerts(prometheus_alerts, config_file_path):
    """Build the Prometheus alert-rules document from stored alert docs.

    Loads the base rules file at *config_file_path* and, when at least one
    alert provides a ``prometheus_config`` rule, appends them all as a new
    timestamped group. Returns the resulting dict ready for
    ``yaml.safe_dump``.
    """
    with open(config_file_path, encoding="utf-8", mode="r") as base_file:
        alerts_yaml = yaml.safe_load(base_file)
    if alerts_yaml is None:
        alerts_yaml = {}
    if "groups" not in alerts_yaml:
        alerts_yaml["groups"] = []

    # A timestamped group name keeps each regeneration distinguishable.
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")
    rules = [
        alert["prometheus_config"]
        for alert in prometheus_alerts
        if "prometheus_config" in alert
    ]
    if rules:
        alerts_yaml["groups"].append(
            {"name": f"_osm_alert_rules_{stamp}_", "rules": rules}
        )

    return alerts_yaml
120
121
async def reload_prometheus_config(prom_url):
    """Ask Prometheus to hot-reload its configuration.

    POSTs to the ``/-/reload`` lifecycle endpoint. Returns True on success,
    False on an HTTP error. On success, sleeps 5 seconds so Prometheus can
    finish applying the new configuration before it is validated.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(prom_url + "/-/reload") as resp:
            if resp.status > 204:
                # Bug fix: resp.text() is a coroutine and must be awaited —
                # the original f-string printed a coroutine object instead of
                # the response body.
                body = await resp.text()
                print(f"Error while updating prometheus config: {body}")
                return False
        await asyncio.sleep(5)
    return True
130
131
def check_configuration_equal(a_config, b_config):
    """Compare two Prometheus configs by their scrape job names only.

    Two configs are considered equal when both are None, both lack a
    ``scrape_configs`` section, or their ``job_name`` sequences match in
    order. Anything asymmetric (one None, one missing the section) is
    unequal.
    """
    if a_config is None or b_config is None:
        # Equal only when both are absent.
        return a_config is None and b_config is None

    a_has_jobs = "scrape_configs" in a_config
    b_has_jobs = "scrape_configs" in b_config
    if not (a_has_jobs and b_has_jobs):
        # True when both omit the section, False when only one does.
        return a_has_jobs == b_has_jobs

    names_a = [entry["job_name"] for entry in a_config["scrape_configs"]]
    names_b = [entry["job_name"] for entry in b_config["scrape_configs"]]
    return names_a == names_b
145
146
async def validate_configuration(prom_url, new_config):
    """Check that Prometheus is serving the configuration we just wrote.

    Fetches the live configuration from ``/api/v1/status/config`` and
    compares its scrape job names against *new_config*. If Prometheus
    refused the new file, it keeps serving the old one and this returns
    False.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(prom_url + "/api/v1/status/config") as resp:
            if resp.status > 204:
                # Bug fix: resp.text() is a coroutine and must be awaited —
                # the original f-string printed a coroutine object instead of
                # the response body.
                body = await resp.text()
                print(f"Error while updating prometheus config: {body}")
                return False
            current_config = await resp.json()
            return check_configuration_equal(
                yaml.safe_load(current_config["data"]["yaml"]), new_config
            )
161
162
async def main_task(client):
    """Regenerate the Prometheus files from MongoDB and trigger a reload.

    Writes the merged scrape configuration (and, when the base alerts file
    exists, the alert rules), asks Prometheus to reload, validates that the
    live configuration matches the generated one, and only then marks the
    stored jobs as active.
    """
    stored_jobs = get_jobs(client)
    print(f"Jobs detected: {len(stored_jobs):d}")
    generated_prometheus_config = generate_prometheus_config(
        stored_jobs, prometheus_base_config_file
    )
    print(f"Writing new config file to {prometheus_config_file}")
    # Fix: use a context manager so the handle is closed even if dumping or
    # writing raises; mode "w" already truncates, so the explicit
    # truncate(0) was redundant. Dump once instead of twice.
    dumped_config = yaml.safe_dump(generated_prometheus_config)
    print(dumped_config)
    with open(prometheus_config_file, mode="w", encoding="utf-8") as config_file:
        config_file.write(dumped_config)

    if os.path.isfile(prometheus_base_alerts_file):
        stored_alerts = get_alerts(client)
        print(f"Alerts read: {len(stored_alerts):d}")
        generated_prometheus_alerts = generate_prometheus_alerts(
            stored_alerts, prometheus_base_alerts_file
        )
        print(f"Writing new alerts file to {prometheus_alerts_file}")
        dumped_alerts = yaml.safe_dump(generated_prometheus_alerts)
        print(dumped_alerts)
        with open(prometheus_alerts_file, mode="w", encoding="utf-8") as alerts_file:
            alerts_file.write(dumped_alerts)

    print("New config written, updating prometheus")
    update_resp = await reload_prometheus_config(prometheus_url)
    is_valid = await validate_configuration(prometheus_url, generated_prometheus_config)
    if update_resp and is_valid:
        print("Prometheus config update successful")
        save_successful_jobs(client, stored_jobs)
    else:
        print(
            "Error while updating prometheus config: "
            "current config doesn't match with updated values"
        )
200
201
async def main():
    """Connect to MongoDB, do an initial refresh, then react to change streams."""
    client = pymongo.MongoClient(mongodb_url)
    print("Created MongoClient to connect to MongoDB!")

    # Initial loop: first refresh of the Prometheus config file, up to 3 tries.
    first_refresh_completed = False
    tries = 1
    while tries <= 3 and not first_refresh_completed:
        try:
            print("Generating prometheus config files")
            await main_task(client)
            first_refresh_completed = True
        except Exception as error:
            print(f"Error in configuration attempt! Number of tries: {tries}/3")
            print(error)
            # Bug fix: time.sleep() inside a coroutine blocks the whole
            # event loop; use asyncio.sleep() instead.
            await asyncio.sleep(5)
        tries += 1
    if not first_refresh_completed:
        print("Not possible to refresh prometheus config file for first time")
        return

    # Main loop
    while True:
        try:
            # Needs MongoDB in replica-set mode, as change streams rely on the OpLog.
            change_stream = client[target_database].watch(
                [
                    {
                        "$match": {
                            "operationType": {"$in": ["insert", "delete"]},
                            "ns.coll": {"$in": ["prometheus_jobs", "alerts"]},
                        }
                    }
                ]
            )

            # Single thread: no race conditions, ops are queued up in order.
            # NOTE(review): pymongo change-stream iteration is synchronous and
            # blocks the event loop between events — consider motor if other
            # coroutines ever need to run concurrently. TODO confirm.
            print("Listening to changes in prometheus jobs and alerts collections")
            for _ in change_stream:
                print("Changes detected, updating prometheus config")
                await main_task(client)
                print()
        except Exception as error:
            print(error)
            print(
                "Detected failure while listening to prometheus jobs collection, "
                "retrying..."
            )
            await asyncio.sleep(5)
251
252
253 asyncio.run(main())