3 # Copyright 2021 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: fbravo@whitestack.com
33 from bson
.json_util
import dumps
34 from bson
import ObjectId
37 mongodb_url
= os
.environ
['MONGODB_URL']
38 target_database
= os
.environ
['TARGET_DATABASE']
39 prometheus_config_file
= os
.environ
['PROMETHEUS_CONFIG_FILE']
40 prometheus_url
= os
.environ
['PROMETHEUS_URL']
43 return json
.loads(dumps(client
[target_database
].prometheus_jobs
.find({})))
45 def save_successful_jobs(client
, jobs
):
47 client
[target_database
].prometheus_jobs
.update_one({
48 "_id": ObjectId(job
["_id"]["$oid"])
55 def clean_up_job(prometheus_job
):
56 cleaned_prometheus_job
= copy
.deepcopy(prometheus_job
)
57 #take out _id and internal keys
58 cleaned_prometheus_job
.pop('_id', None)
59 cleaned_prometheus_job
.pop('is_active', None)
60 cleaned_prometheus_job
.pop('vnfr_id', None)
61 cleaned_prometheus_job
.pop('nsr_id', None)
62 return cleaned_prometheus_job
64 def generate_prometheus_config(prometheus_jobs
, config_file_path
):
65 config_file
= open(config_file_path
, encoding
='utf-8', mode
='r')
66 config_file_contents
= config_file
.read()
69 config_file_yaml
= yaml
.load(config_file_contents
, yaml
.FullLoader
)
70 if config_file_yaml
is None:
73 if len(prometheus_jobs
) == 0:
74 config_file_yaml
['scrape_configs'] = []
75 return config_file_yaml
77 config_file_yaml
['scrape_configs'] = []
79 for prometheus_job
in prometheus_jobs
:
80 cleaned_up_job
= clean_up_job(prometheus_job
)
81 config_file_yaml
['scrape_configs'].append(cleaned_up_job
)
83 return config_file_yaml
85 async def reload_prometheus_config(the_prometheus_url
):
86 async with aiohttp
.ClientSession() as session
:
87 async with session
.post(the_prometheus_url
+ "/-/reload") as resp
:
89 print(f
"Error while updating prometheus config : {resp.text()}")
91 await asyncio
.sleep(5)
94 def check_configuration_equal(a_config
, b_config
):
95 if a_config
is None and b_config
is None:
97 if a_config
is None or b_config
is None:
99 if not "scrape_configs" in a_config
and not "scrape_configs" in b_config
:
101 if not "scrape_configs" in a_config
or not "scrape_configs" in b_config
:
103 a_jobs
= [j
["job_name"] for j
in a_config
["scrape_configs"]]
104 b_jobs
= [j
["job_name"] for j
in b_config
["scrape_configs"]]
106 return a_jobs
== b_jobs
108 async def validate_configuration(the_prometheus_url
, new_config
):
109 async with aiohttp
.ClientSession() as session
:
110 # If prometheus does not admit this configuration, remains with the old one
111 # Then, to check if the configuration has been accepted, get the configuration from prometheus
112 # and compares with the inserted one
113 async with session
.get(the_prometheus_url
+ "/api/v1/status/config") as resp
:
114 if resp
.status
> 204:
115 print(f
"Error while updating prometheus config : {resp.text()}")
117 current_config
= await resp
.json()
118 return check_configuration_equal(yaml
.load(current_config
["data"]["yaml"], yaml
.FullLoader
), new_config
)
120 async def main_task(client
):
121 stored_jobs
= get_jobs(client
)
122 print(f
"Jobs detected : {len(stored_jobs):d}")
123 generated_prometheus_config
= generate_prometheus_config(stored_jobs
, prometheus_config_file
)
124 print(f
"Writing new config file to {prometheus_config_file}")
125 config_file
= open(prometheus_config_file
, "w")
126 config_file
.truncate(0)
127 config_file
.write(yaml
.dump(generated_prometheus_config
))
129 print("New config written, updating prometheus")
130 update_resp
= await reload_prometheus_config(prometheus_url
)
131 is_valid
= await validate_configuration(prometheus_url
, generated_prometheus_config
)
132 if update_resp
and is_valid
:
133 print("Prometheus config update successful")
134 save_successful_jobs(client
, stored_jobs
)
136 print("Error while updating prometheus config: current config doesn't match with updated values")
139 client
= pymongo
.MongoClient(mongodb_url
)
140 print('Connected to MongoDB!')
143 print('Refreshing prometheus config file for first time')
144 await main_task(client
)
145 except Exception as error
:
146 print("Error in first configuration attempt!")
151 #Needs mongodb in replica mode as this feature relies in OpLog
152 change_stream
= client
[target_database
].prometheus_jobs
.watch([{
154 #If you want to modify a particular job, delete and insert it again
155 'operationType': { '$in': ['insert', 'delete'] }
159 #Single thread, no race conditions and ops are queued up in order
160 print("Listening to changes in prometheus jobs collection")
161 for change
in change_stream
:
162 print("Change detected, updating prometheus config")
163 await main_task(client
)
165 except Exception as error
:
167 print("Detected failure while listening to prometheus jobs collection, retrying...")