3 # Copyright 2021 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: fbravo@whitestack.com
27 from bson
.json_util
import dumps
28 from bson
import ObjectId
30 from datetime
import datetime
# Mandatory runtime configuration, read once at import time.
# Using os.environ[...] (not .get) is deliberate: the container must
# fail fast with a KeyError if any of these variables is missing.
mongodb_url = os.environ["MONGODB_URL"]
target_database = os.environ["TARGET_DATABASE"]
prometheus_config_file = os.environ["PROMETHEUS_CONFIG_FILE"]
prometheus_base_config_file = os.environ["PROMETHEUS_BASE_CONFIG_FILE"]
prometheus_alerts_file = os.environ["PROMETHEUS_ALERTS_FILE"]
prometheus_base_alerts_file = os.environ["PROMETHEUS_BASE_ALERTS_FILE"]

prometheus_url = os.environ["PROMETHEUS_URL"]
def get_jobs(client):
    """Return every stored prometheus job as a plain JSON-compatible dict.

    bson.json_util.dumps serializes Mongo-specific types (ObjectId, dates)
    to extended JSON, and json.loads turns that back into plain dicts.
    """
    cursor = client[target_database].prometheus_jobs.find({})
    return json.loads(dumps(cursor))
def get_alerts(client):
    """Return stored alerts that carry a ``prometheus_config`` section.

    Alerts without that key cannot be rendered into prometheus alerting
    rules, so they are filtered out server-side with ``$exists``.
    """
    cursor = client[target_database].alerts.find(
        {"prometheus_config": {"$exists": True}}
    )
    return json.loads(dumps(cursor))
def save_successful_jobs(client, jobs):
    """Flag each of *jobs* as active in the prometheus_jobs collection."""
    collection = client[target_database].prometheus_jobs
    for job in jobs:
        # jobs come from bson json_util dumps, hence the {"$oid": ...} shape
        # that must be converted back into a real ObjectId for the filter.
        collection.update_one(
            {"_id": ObjectId(job["_id"]["$oid"])},
            {"$set": {"is_active": True}},
        )
def clean_up_job(prometheus_job):
    """Return a deep copy of *prometheus_job* stripped of OSM-internal keys.

    The original dict is left untouched; only the copy loses the Mongo
    ``_id`` and the OSM bookkeeping fields.
    """
    cleaned_prometheus_job = copy.deepcopy(prometheus_job)
    # take out _id and internal keys
    for internal_key in ("_id", "is_active", "vnfr_id", "nsr_id"):
        cleaned_prometheus_job.pop(internal_key, None)
    return cleaned_prometheus_job
def generate_prometheus_config(prometheus_jobs, config_file_path):
    """Merge stored prometheus jobs into the base scrape configuration.

    Loads the base YAML at *config_file_path*, then appends every cleaned
    job from *prometheus_jobs* whose ``job_name`` does not already appear
    in the base ``scrape_configs``. Returns the merged YAML structure
    (dict) without writing it anywhere.
    """
    with open(config_file_path, encoding="utf-8", mode="r") as config_file:
        config_file_yaml = yaml.safe_load(config_file)
    if config_file_yaml is None:
        # Empty base file: start from an empty document.
        config_file_yaml = {}
    if "scrape_configs" not in config_file_yaml:
        config_file_yaml["scrape_configs"] = []

    # Build the set of pre-existing job names once, instead of rescanning
    # scrape_configs for every stored job (was O(n*m) with a manual flag).
    # Jobs are deliberately only checked against the base config, not
    # against each other, matching the previous behavior.
    existing_job_names = {
        scrape_config.get("job_name")
        for scrape_config in config_file_yaml["scrape_configs"]
    }
    prometheus_jobs_to_be_added = [
        cleaned_up_job
        for cleaned_up_job in map(clean_up_job, prometheus_jobs)
        if cleaned_up_job.get("job_name") not in existing_job_names
    ]
    config_file_yaml["scrape_configs"].extend(prometheus_jobs_to_be_added)

    return config_file_yaml
def generate_prometheus_alerts(prometheus_alerts, config_file_path):
    """Build the prometheus alert-rules document from the base file.

    Loads the base alerts YAML at *config_file_path* and appends one
    timestamped group containing every ``prometheus_config`` entry found
    in *prometheus_alerts*. Returns the resulting YAML structure (dict).
    """
    with open(config_file_path, encoding="utf-8", mode="r") as config_file:
        config_file_yaml = yaml.safe_load(config_file)
    if config_file_yaml is None:
        config_file_yaml = {}
    config_file_yaml.setdefault("groups", [])

    # Unique group name per refresh so successive reloads never collide.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    group = {
        "name": f"_osm_alert_rules_{timestamp}_",
        "rules": [
            alert["prometheus_config"]
            for alert in prometheus_alerts
            if "prometheus_config" in alert
        ],
    }
    config_file_yaml["groups"].append(group)

    return config_file_yaml
async def reload_prometheus_config(prom_url):
    """POST to prometheus' ``/-/reload`` endpoint and report success.

    Returns True when prometheus accepted the reload, False otherwise.
    Sleeps 5 seconds after a successful trigger so prometheus has time to
    apply the new configuration before callers validate it.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(prom_url + "/-/reload") as resp:
            if resp.status > 204:
                # Bug fix: resp.text() is a coroutine and must be awaited;
                # interpolating it directly printed "<coroutine ...>"
                # instead of the response body.
                body = await resp.text()
                print(f"Error while updating prometheus config: {body}")
                return False
    await asyncio.sleep(5)
    return True
def check_configuration_equal(a_config, b_config):
    """Compare two prometheus configurations by their scrape-job names.

    Two configs are considered equal when both are None, when neither
    defines ``scrape_configs``, or when both list the same ``job_name``
    values in the same order. Any asymmetry makes them unequal.
    """
    if a_config is None or b_config is None:
        # Equal only when both sides are missing.
        return a_config is None and b_config is None
    a_has_jobs = "scrape_configs" in a_config
    b_has_jobs = "scrape_configs" in b_config
    if not (a_has_jobs and b_has_jobs):
        # Equal only when neither side defines scrape_configs.
        return not a_has_jobs and not b_has_jobs
    a_jobs = [job["job_name"] for job in a_config["scrape_configs"]]
    b_jobs = [job["job_name"] for job in b_config["scrape_configs"]]

    return a_jobs == b_jobs
async def validate_configuration(prom_url, new_config):
    """Check that prometheus is now actually running *new_config*.

    Fetches the live configuration from prometheus' status API and
    compares its scrape-job names with *new_config*. Returns False when
    the status endpoint errors or the configurations differ.
    """
    async with aiohttp.ClientSession() as session:
        # Gets the configuration from prometheus
        # and compares with the inserted one
        # If prometheus does not admit this configuration,
        # the old one will remain
        async with session.get(prom_url + "/api/v1/status/config") as resp:
            if resp.status > 204:
                # Bug fix: resp.text() is a coroutine and must be awaited;
                # interpolating it directly printed "<coroutine ...>"
                # instead of the response body.
                body = await resp.text()
                print(f"Error while updating prometheus config: {body}")
                return False
            current_config = await resp.json()
            return check_configuration_equal(
                yaml.safe_load(current_config["data"]["yaml"]), new_config
            )
async def main_task(client):
    """Run one full refresh cycle: render config files, reload prometheus.

    Generates the scrape configuration (and, when a base alerts file
    exists, the alert rules), writes both to their configured paths, asks
    prometheus to reload, validates the live configuration, and finally
    marks the jobs active in MongoDB when everything succeeded.
    """
    stored_jobs = get_jobs(client)
    print(f"Jobs detected: {len(stored_jobs):d}")
    generated_prometheus_config = generate_prometheus_config(
        stored_jobs, prometheus_base_config_file
    )
    print(f"Writing new config file to {prometheus_config_file}")
    # Dump once and reuse for both logging and writing (was serialized
    # twice); "w" mode already truncates, and the context manager closes
    # the handle that was previously leaked.
    config_yaml_text = yaml.safe_dump(generated_prometheus_config)
    print(config_yaml_text)
    with open(prometheus_config_file, "w") as config_file:
        config_file.write(config_yaml_text)

    # Alert rules are optional: only rendered when a base alerts file exists.
    if os.path.isfile(prometheus_base_alerts_file):
        stored_alerts = get_alerts(client)
        print(f"Alerts read: {len(stored_alerts):d}")
        generated_prometheus_alerts = generate_prometheus_alerts(
            stored_alerts, prometheus_base_alerts_file
        )
        print(f"Writing new alerts file to {prometheus_alerts_file}")
        alerts_yaml_text = yaml.safe_dump(generated_prometheus_alerts)
        print(alerts_yaml_text)
        with open(prometheus_alerts_file, "w") as alerts_file:
            alerts_file.write(alerts_yaml_text)

    print("New config written, updating prometheus")
    update_resp = await reload_prometheus_config(prometheus_url)
    is_valid = await validate_configuration(
        prometheus_url, generated_prometheus_config
    )
    if update_resp and is_valid:
        print("Prometheus config update successful")
        save_successful_jobs(client, stored_jobs)
    else:
        print(
            "Error while updating prometheus config: "
            "current config doesn't match with updated values"
        )
203 client
= pymongo
.MongoClient(mongodb_url
)
204 print("Created MongoClient to connect to MongoDB!")
206 # Initial loop. First refresh of prometheus config file
207 first_refresh_completed
= False
209 while tries
<= 3 and first_refresh_completed
== False:
211 print("Generating prometheus config files")
212 await main_task(client
)
213 first_refresh_completed
= True
214 except Exception as error
:
215 print(f
"Error in configuration attempt! Number of tries: {tries}/3")
219 if not first_refresh_completed
:
220 print("Not possible to refresh prometheus config file for first time")
226 # Needs mongodb in replica mode as this feature relies in OpLog
227 change_stream
= client
[target_database
].watch(
231 "operationType": {"$in": ["insert", "delete"]},
232 "ns.coll": { "$in": ["prometheus_jobs", "alerts"]},
238 # Single thread, no race conditions and ops are queued up in order
239 print("Listening to changes in prometheus jobs and alerts collections")
240 for change
in change_stream
:
241 print("Changes detected, updating prometheus config")
242 await main_task(client
)
244 except Exception as error
:
247 "Detected failure while listening to prometheus jobs collection, "