# Copyright 2021 Whitestack, LLC
# *************************************************************
#
# This file is part of OSM Monitoring module
# All Rights Reserved to Whitestack, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: fbravo@whitestack.com
import asyncio
import copy
import json
import os
import time

from bson import ObjectId
from bson.json_util import dumps
# Runtime configuration, read once at import time.  Every variable is
# required: a missing one raises KeyError immediately, so the process
# fails fast instead of running half-configured.
_environ = os.environ
mongodb_url = _environ["MONGODB_URL"]
target_database = _environ["TARGET_DATABASE"]
prometheus_config_file = _environ["PROMETHEUS_CONFIG_FILE"]
prometheus_base_config_file = _environ["PROMETHEUS_BASE_CONFIG_FILE"]
prometheus_url = _environ["PROMETHEUS_URL"]
def get_jobs(client):
    """Return every document from the prometheus_jobs collection as plain dicts.

    Round-trips the pymongo cursor through bson.json_util.dumps + json.loads so
    BSON types become JSON-safe structures (an ObjectId surfaces as
    {"$oid": "..."} — see save_successful_jobs, which relies on that shape).

    NOTE(review): the `def` line was lost in the mangled source; the signature
    is reconstructed from the call site in main_task (`get_jobs(client)`).
    """
    return json.loads(dumps(client[target_database].prometheus_jobs.find({})))
def save_successful_jobs(client, jobs):
    """Mark each job in *jobs* as active in MongoDB.

    Each job dict must carry its id in extended-JSON form
    (job["_id"]["$oid"]), as produced by get_jobs().

    NOTE(review): the `for job in jobs:` line and closing paren were lost in
    the mangled source and are reconstructed here (`job` is unbound otherwise).
    """
    for job in jobs:
        client[target_database].prometheus_jobs.update_one(
            {"_id": ObjectId(job["_id"]["$oid"])}, {"$set": {"is_active": True}}
        )
def clean_up_job(prometheus_job):
    """Return a copy of *prometheus_job* without OSM-internal bookkeeping keys.

    The input dict is left untouched; the returned copy is suitable for
    insertion into a Prometheus scrape_configs list.
    """
    sanitized = copy.deepcopy(prometheus_job)
    # Drop the MongoDB id and OSM-internal tracking fields.
    for internal_key in ("_id", "is_active", "vnfr_id", "nsr_id"):
        sanitized.pop(internal_key, None)
    return sanitized
def generate_prometheus_config(prometheus_jobs, config_file_path):
    """Merge *prometheus_jobs* into the base Prometheus config file.

    Loads the YAML base config at *config_file_path*, then appends every
    (cleaned) job whose job_name is not already present among the base
    scrape_configs.  Returns the merged config as a dict; the file on disk
    is not modified.

    NOTE(review): the mangled source lost the empty-file fallback body and
    the duplicate-skip guard; both are reconstructed from the surrounding
    structure (the `job_to_be_added` flag and the `is None:` check).
    """
    with open(config_file_path, encoding="utf-8", mode="r") as config_file:
        config_file_yaml = yaml.safe_load(config_file)
    if config_file_yaml is None:
        # Empty base file: start from a fresh config dict.
        config_file_yaml = {}
    if "scrape_configs" not in config_file_yaml:
        config_file_yaml["scrape_configs"] = []

    # Build the lookup once instead of rescanning scrape_configs per job.
    existing_job_names = {
        sc.get("job_name") for sc in config_file_yaml["scrape_configs"]
    }

    for prometheus_job in prometheus_jobs:
        cleaned_up_job = clean_up_job(prometheus_job)
        # Skip jobs that already exist in the base config (match by job_name).
        if cleaned_up_job.get("job_name") not in existing_job_names:
            config_file_yaml["scrape_configs"].append(cleaned_up_job)

    return config_file_yaml
async def reload_prometheus_config(prom_url):
    """POST to Prometheus' /-/reload endpoint and report success.

    Returns True when Prometheus accepted the reload (HTTP status <= 204),
    False otherwise.  Sleeps 5s after a successful request so Prometheus has
    time to apply the new configuration before it gets validated.

    NOTE(review): the status check and return lines were lost in the mangled
    source; reconstructed to mirror validate_configuration's `resp.status >
    204` pattern.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(prom_url + "/-/reload") as resp:
            if resp.status > 204:
                # Bug fix: resp.text() is a coroutine and must be awaited,
                # otherwise the log line shows a coroutine repr.
                print(f"Error while updating prometheus config: {await resp.text()}")
                return False
        await asyncio.sleep(5)
        return True
def check_configuration_equal(a_config, b_config):
    """Compare two Prometheus configs by their scrape job names.

    Configs are equal when both are None, both lack "scrape_configs", or
    their scrape_configs carry the same job_name values in the same order.
    Returns a bool.

    NOTE(review): the four guard-clause return statements were lost in the
    mangled source; reconstructed from the symmetric and/or structure of the
    conditions (and/None -> True, or/None -> False, same for key presence).
    """
    if a_config is None and b_config is None:
        return True
    if a_config is None or b_config is None:
        return False
    if "scrape_configs" not in a_config and "scrape_configs" not in b_config:
        return True
    if "scrape_configs" not in a_config or "scrape_configs" not in b_config:
        return False
    a_jobs = [job["job_name"] for job in a_config["scrape_configs"]]
    b_jobs = [job["job_name"] for job in b_config["scrape_configs"]]

    # NOTE(review): comparison is order-sensitive; identical job sets in a
    # different order compare unequal.  Kept as-is to preserve behavior.
    return a_jobs == b_jobs
async def validate_configuration(prom_url, new_config):
    """Check that Prometheus' live configuration matches *new_config*.

    Fetches /api/v1/status/config and compares its scrape job names against
    new_config via check_configuration_equal.  Returns False when the
    endpoint answers with an error status.

    NOTE(review): the `return False` on the error branch and the closing
    paren of the final call were lost in the mangled source; reconstructed.
    """
    async with aiohttp.ClientSession() as session:
        # Gets the configuration from prometheus
        # and compares with the inserted one.
        # If prometheus does not admit this configuration,
        # the old one will remain.
        async with session.get(prom_url + "/api/v1/status/config") as resp:
            if resp.status > 204:
                # Bug fix: resp.text() is a coroutine and must be awaited,
                # otherwise the log line shows a coroutine repr.
                print(f"Error while updating prometheus config: {await resp.text()}")
                return False
            current_config = await resp.json()
            return check_configuration_equal(
                yaml.safe_load(current_config["data"]["yaml"]), new_config
            )
async def main_task(client):
    """Regenerate the Prometheus config from MongoDB and trigger a reload.

    Reads all stored jobs, merges them into the base config, rewrites the
    config file, asks Prometheus to reload, and — only when the reload
    succeeded AND the live config matches — marks the jobs as active.

    NOTE(review): the closing paren of the generate call, the file close,
    and the `else:` wrapper around the final error print were lost in the
    mangled source; reconstructed.
    """
    stored_jobs = get_jobs(client)
    print(f"Jobs detected : {len(stored_jobs):d}")
    generated_prometheus_config = generate_prometheus_config(
        stored_jobs, prometheus_base_config_file
    )
    print(f"Writing new config file to {prometheus_config_file}")
    # Serialize once; the original called yaml.safe_dump twice.
    dumped_config = yaml.safe_dump(generated_prometheus_config)
    # Context manager guarantees the file is closed even if a write fails
    # (the original left the handle open on error).
    with open(prometheus_config_file, "w") as config_file:
        config_file.truncate(0)
        print(dumped_config)
        config_file.write(dumped_config)
    print("New config written, updating prometheus")
    update_resp = await reload_prometheus_config(prometheus_url)
    is_valid = await validate_configuration(prometheus_url, generated_prometheus_config)
    if update_resp and is_valid:
        print("Prometheus config update successful")
        save_successful_jobs(client, stored_jobs)
    else:
        print(
            "Error while updating prometheus config: "
            "current config doesn't match with updated values"
        )
158 client
= pymongo
.MongoClient(mongodb_url
)
159 print("Created MongoClient to connect to MongoDB!")
161 # Initial loop. First refresh of prometheus config file
162 first_refresh_completed
= False
166 print("Refreshing prometheus config file for first time")
167 await main_task(client
)
168 first_refresh_completed
= True
169 except Exception as error
:
170 print(f
"Error in configuration attempt! Number of tries: {tries}/3")
174 if not first_refresh_completed
:
175 print("Not possible to refresh prometheus config file for first time")
181 # Needs mongodb in replica mode as this feature relies in OpLog
182 change_stream
= client
[target_database
].prometheus_jobs
.watch(
186 # If you want to modify a particular job,
187 # delete and insert it again
188 "operationType": {"$in": ["insert", "delete"]}
194 # Single thread, no race conditions and ops are queued up in order
195 print("Listening to changes in prometheus jobs collection")
196 for change
in change_stream
:
197 print("Change detected, updating prometheus config")
199 await main_task(client
)
201 except Exception as error
:
204 "Detected failure while listening to prometheus jobs collection, "