3 # Copyright 2021 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: fbravo@whitestack.com
33 from bson
.json_util
import dumps
34 from bson
import ObjectId
37 mongodb_url
= os
.environ
["MONGODB_URL"]
38 target_database
= os
.environ
["TARGET_DATABASE"]
39 prometheus_config_file
= os
.environ
["PROMETHEUS_CONFIG_FILE"]
40 prometheus_base_config_file
= os
.environ
["PROMETHEUS_BASE_CONFIG_FILE"]
41 prometheus_url
= os
.environ
["PROMETHEUS_URL"]
45 return json
.loads(dumps(client
[target_database
].prometheus_jobs
.find({})))
48 def save_successful_jobs(client
, jobs
):
50 client
[target_database
].prometheus_jobs
.update_one(
51 {"_id": ObjectId(job
["_id"]["$oid"])},
52 {"$set": {"is_active": True}}
56 def clean_up_job(prometheus_job
):
57 cleaned_prometheus_job
= copy
.deepcopy(prometheus_job
)
58 # take out _id and internal keys
59 cleaned_prometheus_job
.pop("_id", None)
60 cleaned_prometheus_job
.pop("is_active", None)
61 cleaned_prometheus_job
.pop("vnfr_id", None)
62 cleaned_prometheus_job
.pop("nsr_id", None)
63 return cleaned_prometheus_job
66 def generate_prometheus_config(prometheus_jobs
, config_file_path
):
67 with
open(config_file_path
, encoding
="utf-8", mode
="r") as config_file
:
68 config_file_yaml
= yaml
.safe_load(config_file
)
69 if config_file_yaml
is None:
71 if "scrape_configs" not in config_file_yaml
:
72 config_file_yaml
["scrape_configs"] = []
74 for prometheus_job
in prometheus_jobs
:
75 cleaned_up_job
= clean_up_job(prometheus_job
)
76 config_file_yaml
["scrape_configs"].append(cleaned_up_job
)
78 return config_file_yaml
81 async def reload_prometheus_config(prom_url
):
82 async with aiohttp
.ClientSession() as session
:
83 async with session
.post(prom_url
+ "/-/reload") as resp
:
85 print(f
"Error while updating prometheus config: {resp.text()}")
87 await asyncio
.sleep(5)
91 def check_configuration_equal(a_config
, b_config
):
92 if a_config
is None and b_config
is None:
94 if a_config
is None or b_config
is None:
96 if "scrape_configs" not in a_config
and "scrape_configs" not in b_config
:
98 if "scrape_configs" not in a_config
or "scrape_configs" not in b_config
:
100 a_jobs
= [j
["job_name"] for j
in a_config
["scrape_configs"]]
101 b_jobs
= [j
["job_name"] for j
in b_config
["scrape_configs"]]
103 return a_jobs
== b_jobs
106 async def validate_configuration(prom_url
, new_config
):
107 async with aiohttp
.ClientSession() as session
:
108 # Gets the configuration from prometheus
109 # and compares with the inserted one
110 # If prometheus does not admit this configuration,
111 # the old one will remain
112 async with session
.get(prom_url
+ "/api/v1/status/config") as resp
:
113 if resp
.status
> 204:
114 print(f
"Error while updating prometheus config: {resp.text()}")
116 current_config
= await resp
.json()
117 return check_configuration_equal(
118 yaml
.safe_load(current_config
["data"]["yaml"]), new_config
122 async def main_task(client
):
123 stored_jobs
= get_jobs(client
)
124 print(f
"Jobs detected : {len(stored_jobs):d}")
125 generated_prometheus_config
= generate_prometheus_config(
126 stored_jobs
, prometheus_base_config_file
128 print(f
"Writing new config file to {prometheus_config_file}")
129 config_file
= open(prometheus_config_file
, "w")
130 config_file
.truncate(0)
131 config_file
.write(yaml
.safe_dump(generated_prometheus_config
))
133 print("New config written, updating prometheus")
134 update_resp
= await reload_prometheus_config(prometheus_url
)
135 is_valid
= await validate_configuration(
136 prometheus_url
, generated_prometheus_config
138 if update_resp
and is_valid
:
139 print("Prometheus config update successful")
140 save_successful_jobs(client
, stored_jobs
)
143 "Error while updating prometheus config: "
144 "current config doesn't match with updated values"
149 client
= pymongo
.MongoClient(mongodb_url
)
150 print("Connected to MongoDB!")
153 print("Refreshing prometheus config file for first time")
154 await main_task(client
)
155 except Exception as error
:
156 print("Error in first configuration attempt!")
161 # Needs mongodb in replica mode as this feature relies in OpLog
162 change_stream
= client
[target_database
].prometheus_jobs
.watch(
166 # If you want to modify a particular job,
167 # delete and insert it again
168 "operationType": {"$in": ["insert", "delete"]}
174 # Single thread, no race conditions and ops are queued up in order
175 print("Listening to changes in prometheus jobs collection")
176 for change
in change_stream
:
177 print("Change detected, updating prometheus config")
178 await main_task(client
)
180 except Exception as error
:
183 "Detected failure while listening to prometheus jobs collection, "