3 # Copyright 2021 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: fbravo@whitestack.com
33 from bson
.json_util
import dumps
34 from bson
import ObjectId
# Settings read from the process environment when the module is imported.
# Indexing os.environ raises KeyError if a variable is not set, so a missing
# setting stops the program at start-up.
mongodb_url = os.environ["MONGODB_URL"]
target_database = os.environ["TARGET_DATABASE"]
prometheus_config_file = os.environ["PROMETHEUS_CONFIG_FILE"]
prometheus_url = os.environ["PROMETHEUS_URL"]
def get_jobs(client):
    """Return every document of the ``prometheus_jobs`` collection as JSON data.

    The query result is serialised with ``bson.json_util.dumps`` and parsed
    back with ``json.loads``, so the caller receives plain Python containers.
    The ``_id`` of each document therefore arrives in the ``{"$oid": ...}``
    form that ``save_successful_jobs`` reads.

    Args:
        client: Mongo client; indexed with ``target_database`` to reach the
            collection.

    Returns:
        The decoded JSON value produced from the query result.
    """
    # NOTE(review): the ``def`` line is not present in the damaged source; the
    # name and parameter are taken from the ``get_jobs(client)`` call in
    # ``main_task`` - confirm against the upstream file.
    cursor = client[target_database].prometheus_jobs.find({})
    return json.loads(dumps(cursor))
def save_successful_jobs(client, jobs):
    """Set ``is_active`` to ``True`` on the stored document of every job.

    Args:
        client: Mongo client; indexed with ``target_database`` to reach the
            ``prometheus_jobs`` collection.
        jobs: Iterable of job mappings whose ``_id`` is a mapping holding the
            document id under ``"$oid"``.

    Raises:
        KeyError: If a job has no ``_id`` / ``"$oid"`` entry.
    """
    collection = client[target_database].prometheus_jobs
    for job in jobs:
        # The id arrives as a string, so it is turned back into an ObjectId
        # before it is used in the filter.
        collection.update_one(
            {"_id": ObjectId(job["_id"]["$oid"])},
            {"$set": {"is_active": True}},
        )
def clean_up_job(prometheus_job):
    """Return a deep copy of *prometheus_job* without its internal keys.

    The keys ``_id``, ``is_active``, ``vnfr_id`` and ``nsr_id`` are removed
    from the copy when present; the argument itself is never modified.

    Args:
        prometheus_job: Mapping describing one job.

    Returns:
        An independent copy holding only the remaining keys.
    """
    cleaned_prometheus_job = copy.deepcopy(prometheus_job)
    # take out _id and internal keys
    for internal_key in ("_id", "is_active", "vnfr_id", "nsr_id"):
        cleaned_prometheus_job.pop(internal_key, None)
    return cleaned_prometheus_job
def generate_prometheus_config(prometheus_jobs, config_file_path):
    """Build a configuration mapping from the file on disk and the given jobs.

    The YAML file at *config_file_path* is loaded and its ``scrape_configs``
    entry is replaced by the cleaned-up version of every job in
    *prometheus_jobs* (an empty list when there are no jobs). All other keys
    read from the file are kept. Nothing is written back to disk here.

    Args:
        prometheus_jobs: Iterable of job mappings.
        config_file_path: Path of the YAML file to read, decoded as UTF-8.

    Returns:
        The loaded mapping with ``scrape_configs`` rebuilt.
    """
    # The context manager closes the handle even if reading fails.
    with open(config_file_path, encoding="utf-8", mode="r") as config_file:
        config_file_contents = config_file.read()

    # NOTE(review): FullLoader is kept as in the original call; yaml.safe_load
    # would be the stricter choice if this file can be edited by untrusted
    # parties - confirm before switching.
    config_file_yaml = yaml.load(config_file_contents, yaml.FullLoader)
    if config_file_yaml is None:
        # An empty file loads as None; start from an empty mapping so the key
        # assignment below works.
        config_file_yaml = {}

    config_file_yaml["scrape_configs"] = [
        clean_up_job(prometheus_job) for prometheus_job in prometheus_jobs
    ]
    return config_file_yaml
async def reload_prometheus_config(prom_url):
    """POST to ``<prom_url>/-/reload`` and report whether the request succeeded.

    Args:
        prom_url: Base URL of the Prometheus server, without a trailing slash.

    Returns:
        ``False`` when the response status is above 204 (the response body is
        printed), otherwise ``True`` after a five second pause.
    """
    # NOTE(review): the status guard and the boolean returns are missing from
    # the damaged source; they are rebuilt from the ``resp.status > 204``
    # check in ``validate_configuration`` and from ``main_task`` testing the
    # result for truth - confirm against the upstream file.
    async with aiohttp.ClientSession() as session:
        async with session.post(prom_url + "/-/reload") as resp:
            if resp.status > 204:
                # ``text()`` is a coroutine: it has to be awaited, otherwise
                # the message shows a coroutine object instead of the body.
                error_body = await resp.text()
                print(f"Error while updating prometheus config: {error_body}")
                return False
        # Presumably gives Prometheus time to load the file before the caller
        # inspects the running configuration - TODO confirm.
        await asyncio.sleep(5)
        return True
def check_configuration_equal(a_config, b_config):
    """Tell whether two configurations list the same job names in the same order.

    Only the ``job_name`` of each ``scrape_configs`` entry is compared; every
    other setting is ignored.

    Args:
        a_config: Configuration mapping, or ``None``.
        b_config: Configuration mapping, or ``None``.

    Returns:
        ``True`` when both are ``None``, when neither has ``scrape_configs``,
        or when their job-name lists are equal; ``False`` otherwise.

    Raises:
        KeyError: If a ``scrape_configs`` entry has no ``job_name``.
    """
    if a_config is None and b_config is None:
        return True
    if a_config is None or b_config is None:
        return False
    if "scrape_configs" not in a_config and "scrape_configs" not in b_config:
        return True
    # NOTE(review): a configuration with ``scrape_configs: []`` is treated as
    # different from one without the key - confirm that is intended.
    if "scrape_configs" not in a_config or "scrape_configs" not in b_config:
        return False
    a_jobs = [j["job_name"] for j in a_config["scrape_configs"]]
    b_jobs = [j["job_name"] for j in b_config["scrape_configs"]]

    return a_jobs == b_jobs
async def validate_configuration(prom_url, new_config):
    """Compare the configuration Prometheus reports with *new_config*.

    Args:
        prom_url: Base URL of the Prometheus server, without a trailing slash.
        new_config: Configuration mapping that was just written.

    Returns:
        ``False`` when the status request answers with a status above 204 (the
        response body is printed); otherwise the result of
        ``check_configuration_equal`` on the reported and the new
        configuration.
    """
    async with aiohttp.ClientSession() as session:
        # Gets the configuration from prometheus
        # and compares with the inserted one
        # If prometheus does not admit this configuration,
        # the old one will remain
        async with session.get(prom_url + "/api/v1/status/config") as resp:
            if resp.status > 204:
                # ``text()`` is a coroutine: it has to be awaited, otherwise
                # the message shows a coroutine object instead of the body.
                error_body = await resp.text()
                print(f"Error while updating prometheus config: {error_body}")
                return False
            current_config = await resp.json()
            return check_configuration_equal(
                yaml.safe_load(current_config["data"]["yaml"]), new_config
            )
async def main_task(client):
    """Regenerate the Prometheus config file from the stored jobs and apply it.

    Steps: read the jobs, build the configuration, write it to
    ``prometheus_config_file``, ask Prometheus to reload, then check the
    configuration Prometheus reports. The jobs are flagged as active only when
    both the reload and the check succeed.

    Args:
        client: Mongo client handed to ``get_jobs`` and
            ``save_successful_jobs``.
    """
    stored_jobs = get_jobs(client)
    print(f"Jobs detected : {len(stored_jobs):d}")
    generated_prometheus_config = generate_prometheus_config(
        stored_jobs, prometheus_config_file
    )
    print(f"Writing new config file to {prometheus_config_file}")
    # Mode "w" already empties the file. Leaving the ``with`` block flushes
    # and closes it, so the content is on disk before the reload is requested.
    with open(prometheus_config_file, "w", encoding="utf-8") as config_file:
        config_file.write(yaml.dump(generated_prometheus_config))
    print("New config written, updating prometheus")
    update_resp = await reload_prometheus_config(prometheus_url)
    is_valid = await validate_configuration(
        prometheus_url, generated_prometheus_config
    )
    if update_resp and is_valid:
        print("Prometheus config update successful")
        save_successful_jobs(client, stored_jobs)
    else:
        print(
            "Error while updating prometheus config: "
            "current config doesn't match with updated values"
        )
155 client
= pymongo
.MongoClient(mongodb_url
)
156 print("Connected to MongoDB!")
159 print("Refreshing prometheus config file for first time")
160 await main_task(client
)
161 except Exception as error
:
162 print("Error in first configuration attempt!")
167 # Needs mongodb in replica mode as this feature relies in OpLog
168 change_stream
= client
[target_database
].prometheus_jobs
.watch(
172 # If you want to modify a particular job,
173 # delete and insert it again
174 "operationType": {"$in": ["insert", "delete"]}
180 # Single thread, no race conditions and ops are queued up in order
181 print("Listening to changes in prometheus jobs collection")
182 for change
in change_stream
:
183 print("Change detected, updating prometheus config")
184 await main_task(client
)
186 except Exception as error
:
189 "Detected failure while listening to prometheus jobs collection, "