3 # Copyright 2021 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: fbravo@whitestack.com
33 from bson
.json_util
import dumps
34 from bson
import ObjectId
# Runtime configuration, injected through the container environment.
# Every variable is mandatory: a missing one raises KeyError at import
# time, so a misconfigured deployment fails fast instead of half-running.
mongodb_url = os.environ["MONGODB_URL"]  # MongoDB connection string
target_database = os.environ["TARGET_DATABASE"]  # DB holding the prometheus_jobs collection
prometheus_config_file = os.environ["PROMETHEUS_CONFIG_FILE"]  # live config file written by main_task
prometheus_base_config_file = os.environ["PROMETHEUS_BASE_CONFIG_FILE"]  # base/template config to extend
prometheus_url = os.environ["PROMETHEUS_URL"]  # base URL of the Prometheus server (reload/status API)
def get_jobs(client):
    """Return every document in the prometheus_jobs collection as plain dicts.

    The cursor is serialized with ``bson.json_util.dumps`` so that
    Mongo-specific types (ObjectId, datetimes) survive as Extended JSON
    (e.g. ``{"_id": {"$oid": ...}}``), then parsed back with ``json.loads``.

    :param client: pymongo MongoClient connected to the jobs database.
    :return: list of job documents.
    """
    # NOTE(review): the `def` line was absent from the reviewed view; the
    # signature is reconstructed from the call site `get_jobs(client)` in
    # main_task — confirm against the full file.
    return json.loads(dumps(client[target_database].prometheus_jobs.find({})))
def save_successful_jobs(client, jobs):
    """Flag the given jobs as active in the prometheus_jobs collection.

    Called after the Prometheus configuration has been written, reloaded
    and validated, so the stored jobs are marked as successfully applied.

    :param client: pymongo MongoClient connected to the jobs database.
    :param jobs: iterable of job documents as returned by ``get_jobs``
        (each carries an Extended-JSON ``"_id": {"$oid": ...}``).
    """
    for job in jobs:
        # _id round-tripped through JSON in get_jobs, so rebuild the
        # ObjectId from its $oid string before querying by it.
        client[target_database].prometheus_jobs.update_one(
            {"_id": ObjectId(job["_id"]["$oid"])}, {"$set": {"is_active": True}}
        )
def clean_up_job(prometheus_job):
    """Return a deep copy of *prometheus_job* without internal keys.

    The stored job documents carry bookkeeping fields that must not leak
    into the Prometheus scrape configuration; the input document itself
    is left untouched.
    """
    # Keys used internally by the monitoring module, never by Prometheus.
    internal_keys = ("_id", "is_active", "vnfr_id", "nsr_id")
    sanitized_job = copy.deepcopy(prometheus_job)
    for key in internal_keys:
        sanitized_job.pop(key, None)
    return sanitized_job
def generate_prometheus_config(prometheus_jobs, config_file_path):
    """Build a Prometheus configuration dict from a base file plus jobs.

    Loads the base YAML config, guarantees a ``scrape_configs`` list is
    present, and appends every stored job (stripped of internal keys via
    ``clean_up_job``) to it.

    :param prometheus_jobs: iterable of job documents from MongoDB.
    :param config_file_path: path to the base Prometheus YAML config file.
    :return: dict ready to be dumped as the new Prometheus configuration.
    """
    with open(config_file_path, encoding="utf-8", mode="r") as config_file:
        config_file_yaml = yaml.safe_load(config_file)
    if config_file_yaml is None:
        # Empty base file parses as None: start from a fresh config so the
        # membership checks below operate on a dict.
        config_file_yaml = {}
    if "scrape_configs" not in config_file_yaml:
        config_file_yaml["scrape_configs"] = []

    for prometheus_job in prometheus_jobs:
        cleaned_up_job = clean_up_job(prometheus_job)
        config_file_yaml["scrape_configs"].append(cleaned_up_job)

    return config_file_yaml
async def reload_prometheus_config(prom_url):
    """Ask Prometheus to hot-reload its configuration.

    POSTs to the ``/-/reload`` endpoint, then sleeps a few seconds so the
    reload can take effect before the caller validates the running config.

    :param prom_url: base URL of the Prometheus server.
    :return: True when the reload request was accepted, False otherwise.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(prom_url + "/-/reload") as resp:
            if resp.status > 204:
                # Fix: resp.text() is a coroutine; without await the log
                # printed a coroutine repr instead of the response body.
                print(f"Error while updating prometheus config: {await resp.text()}")
                return False
    # Give Prometheus time to apply the new configuration.
    await asyncio.sleep(5)
    return True
def check_configuration_equal(a_config, b_config):
    """Compare two Prometheus configurations by their scrape job names.

    Only the ordered list of ``job_name`` entries under ``scrape_configs``
    is compared; all other settings are ignored.

    :param a_config: first configuration dict (or None).
    :param b_config: second configuration dict (or None).
    :return: True when both configs define the same jobs in the same order
        (or are equally absent/empty), False otherwise.
    """
    if a_config is None and b_config is None:
        return True
    if a_config is None or b_config is None:
        return False
    if "scrape_configs" not in a_config and "scrape_configs" not in b_config:
        return True
    if "scrape_configs" not in a_config or "scrape_configs" not in b_config:
        return False
    a_jobs = [j["job_name"] for j in a_config["scrape_configs"]]
    b_jobs = [j["job_name"] for j in b_config["scrape_configs"]]
    return a_jobs == b_jobs
async def validate_configuration(prom_url, new_config):
    """Check that Prometheus is actually running *new_config*.

    Fetches the live configuration from the Prometheus HTTP API and
    compares it (by scrape job names) with the configuration that was just
    pushed.  If Prometheus rejected the new configuration it keeps serving
    the old one, which this check detects.

    :param prom_url: base URL of the Prometheus server.
    :param new_config: dict with the configuration that was written.
    :return: True when the live config matches *new_config*.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(prom_url + "/api/v1/status/config") as resp:
            if resp.status > 204:
                # Fix: resp.text() is a coroutine; without await the log
                # printed a coroutine repr instead of the response body.
                print(f"Error while updating prometheus config: {await resp.text()}")
                return False
            current_config = await resp.json()
            # The API returns the running config as a YAML string under
            # data.yaml; parse it before comparing.
            return check_configuration_equal(
                yaml.safe_load(current_config["data"]["yaml"]), new_config
            )
async def main_task(client):
    """Regenerate, write, reload and validate the Prometheus config once.

    Reads all stored jobs from MongoDB, merges them into the base config,
    writes the result to the live config file, asks Prometheus to reload,
    and — when the running config matches — marks the jobs as active.

    :param client: pymongo MongoClient connected to the jobs database.
    """
    stored_jobs = get_jobs(client)
    print(f"Jobs detected : {len(stored_jobs):d}")
    generated_prometheus_config = generate_prometheus_config(
        stored_jobs, prometheus_base_config_file
    )
    print(f"Writing new config file to {prometheus_config_file}")
    # Dump once and reuse (previously serialized twice: once for the log,
    # once for the file).
    new_config_yaml = yaml.safe_dump(generated_prometheus_config)
    print(new_config_yaml)
    # Fix: use a context manager so the file is closed (and flushed) before
    # Prometheus is asked to reload it; "w" mode already truncates, so the
    # explicit truncate(0) is unnecessary.
    with open(prometheus_config_file, "w") as config_file:
        config_file.write(new_config_yaml)
    print("New config written, updating prometheus")
    update_resp = await reload_prometheus_config(prometheus_url)
    is_valid = await validate_configuration(prometheus_url, generated_prometheus_config)
    if update_resp and is_valid:
        print("Prometheus config update successful")
        save_successful_jobs(client, stored_jobs)
    else:
        print(
            "Error while updating prometheus config: "
            "current config doesn't match with updated values"
        )
147 client
= pymongo
.MongoClient(mongodb_url
)
148 print("Created MongoClient to connect to MongoDB!")
150 # Initial loop. First refresh of prometheus config file
151 first_refresh_completed
= False
155 print("Refreshing prometheus config file for first time")
156 await main_task(client
)
157 first_refresh_completed
= True
158 except Exception as error
:
159 print(f
"Error in configuration attempt! Number of tries: {tries}/3")
163 if not first_refresh_completed
:
164 print("Not possible to refresh prometheus config file for first time")
170 # Needs mongodb in replica mode as this feature relies in OpLog
171 change_stream
= client
[target_database
].prometheus_jobs
.watch(
175 # If you want to modify a particular job,
176 # delete and insert it again
177 "operationType": {"$in": ["insert", "delete"]}
183 # Single thread, no race conditions and ops are queued up in order
184 print("Listening to changes in prometheus jobs collection")
185 for change
in change_stream
:
186 print("Change detected, updating prometheus config")
188 await main_task(client
)
190 except Exception as error
:
193 "Detected failure while listening to prometheus jobs collection, "