3 # Copyright 2021 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: fbravo@whitestack.com
32 from bson
.json_util
import dumps
33 from bson
import ObjectId
# Runtime settings, read once at import time. Every variable is mandatory:
# a missing one raises KeyError before anything else runs.
mongodb_url = os.environ['MONGODB_URL']  # connection string handed to pymongo.MongoClient
target_database = os.environ['TARGET_DATABASE']  # database holding the prometheus_jobs collection
prometheus_config_file = os.environ['PROMETHEUS_CONFIG_FILE']  # path of the config file that is read and rewritten
prometheus_url = os.environ['PROMETHEUS_URL']  # base URL used for the reload and status requests
def get_jobs(client):
    """Return every document of the ``prometheus_jobs`` collection as plain JSON data.

    The query result is serialised with ``bson.json_util.dumps`` and parsed
    back with ``json.loads``, so BSON-specific values come back in their
    extended-JSON form (an ObjectId becomes ``{"$oid": "..."}``).

    Args:
        client: MongoDB client, indexed by the name in ``target_database``.

    Returns:
        The stored job documents as JSON-compatible Python data.
    """
    jobs_cursor = client[target_database].prometheus_jobs.find({})
    return json.loads(dumps(jobs_cursor))
44 def save_successful_jobs(client
, jobs
):
46 client
[target_database
].prometheus_jobs
.update_one({
47 "_id": ObjectId(job
["_id"]["$oid"])
def clean_up_job(prometheus_job):
    """Return a deep copy of *prometheus_job* without its internal keys.

    The keys ``_id``, ``is_active``, ``vnfr_id`` and ``nsr_id`` are removed
    from the copy when present; a missing key is not an error. The argument
    itself is never modified.

    Args:
        prometheus_job: mapping describing one job.

    Returns:
        A new, independent mapping holding only the remaining keys.
    """
    cleaned_prometheus_job = copy.deepcopy(prometheus_job)
    # take out _id and internal keys
    for internal_key in ('_id', 'is_active', 'vnfr_id', 'nsr_id'):
        cleaned_prometheus_job.pop(internal_key, None)
    return cleaned_prometheus_job
def generate_prometheus_config(prometheus_jobs, config_file_path):
    """Build the Prometheus configuration for the given jobs.

    The file at *config_file_path* is read and parsed as YAML. Its
    ``scrape_configs`` entry is then replaced by the cleaned-up form of every
    job in *prometheus_jobs* (an empty list when there are no jobs). Any other
    top-level key of the parsed file is left as it was. Nothing is written to
    disk here.

    Args:
        prometheus_jobs: sized iterable of job mappings.
        config_file_path: path of the existing configuration file.

    Returns:
        The parsed configuration mapping with ``scrape_configs`` rebuilt.
    """
    # The context manager closes the handle even if reading fails.
    with open(config_file_path, encoding='utf-8', mode='r') as config_file:
        config_file_contents = config_file.read()

    # NOTE(review): yaml.safe_load would likely be sufficient for this file;
    # confirm no non-standard tags are expected before switching loaders.
    config_file_yaml = yaml.load(config_file_contents, yaml.FullLoader)
    if config_file_yaml is None:
        # The loader returned nothing; start from an empty mapping so the
        # assignment below has something to write into.
        config_file_yaml = {}

    # An empty job list naturally yields an empty scrape_configs list.
    config_file_yaml['scrape_configs'] = [
        clean_up_job(prometheus_job) for prometheus_job in prometheus_jobs
    ]
    return config_file_yaml
async def reload_prometheus_config(the_prometheus_url):
    """Ask Prometheus to reload its configuration.

    Sends a POST to ``<the_prometheus_url>/-/reload``. A response status above
    204 is reported on stdout and treated as a failure. Otherwise the
    coroutine waits five seconds before reporting success.

    Args:
        the_prometheus_url: base URL of the Prometheus server, without a
            trailing slash.

    Returns:
        False when the reload request was rejected, True otherwise.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(the_prometheus_url + "/-/reload") as resp:
            if resp.status > 204:
                # text() is a coroutine: without the await the message would
                # show a coroutine object instead of the response body.
                error_body = await resp.text()
                print(f"Error while updating prometheus config : {error_body}")
                return False
        # Presumably gives Prometheus time to apply the reload before the
        # caller validates the active configuration - TODO confirm the delay.
        await asyncio.sleep(5)
        return True
def check_configuration_equal(a_config, b_config):
    """Tell whether two configurations declare the same scrape jobs.

    Only the ``job_name`` of each ``scrape_configs`` entry is compared, in
    order; every other setting is ignored.

    Args:
        a_config: configuration mapping, or None.
        b_config: configuration mapping, or None.

    Returns:
        True when both are None, when neither has ``scrape_configs``, or when
        both list the same job names in the same order; False otherwise.

    Raises:
        KeyError: if a ``scrape_configs`` entry has no ``job_name``.
    """
    if a_config is None and b_config is None:
        return True
    if a_config is None or b_config is None:
        return False

    a_has_jobs = "scrape_configs" in a_config
    b_has_jobs = "scrape_configs" in b_config
    if not a_has_jobs and not b_has_jobs:
        return True
    if not a_has_jobs or not b_has_jobs:
        return False

    a_jobs = [j["job_name"] for j in a_config["scrape_configs"]]
    b_jobs = [j["job_name"] for j in b_config["scrape_configs"]]

    return a_jobs == b_jobs
async def validate_configuration(the_prometheus_url, new_config):
    """Check that Prometheus is running with the configuration just written.

    Fetches ``<the_prometheus_url>/api/v1/status/config``, parses the YAML
    text found under ``data.yaml`` in the JSON response and compares it with
    *new_config* through ``check_configuration_equal``.

    Args:
        the_prometheus_url: base URL of the Prometheus server, without a
            trailing slash.
        new_config: configuration mapping that was written to disk.

    Returns:
        False when the status request answers with a status above 204 or the
        active configuration differs from *new_config*; True otherwise.
    """
    async with aiohttp.ClientSession() as session:
        # If prometheus does not admit this configuration, remains with the old one
        # Then, to check if the configuration has been accepted, get the configuration from prometheus
        # and compares with the inserted one
        async with session.get(the_prometheus_url + "/api/v1/status/config") as resp:
            if resp.status > 204:
                # text() is a coroutine: await it so the body is printed
                # rather than a coroutine object.
                error_body = await resp.text()
                print(f"Error while updating prometheus config : {error_body}")
                # Stop here instead of trying to parse an error response.
                return False
            current_config = await resp.json()
            active_config = yaml.load(current_config["data"]["yaml"], yaml.FullLoader)
            return check_configuration_equal(active_config, new_config)
async def main_task(client):
    """Regenerate the Prometheus config from the stored jobs and apply it.

    Steps: load the jobs, rebuild the configuration from the file named by
    ``prometheus_config_file``, overwrite that file, ask Prometheus to reload,
    then verify that the active configuration matches. The jobs are passed to
    ``save_successful_jobs`` only when both the reload and the verification
    succeed; otherwise an error is printed.

    Args:
        client: MongoDB client used to read and update the jobs.
    """
    stored_jobs = get_jobs(client)
    print(f"Jobs detected : {len(stored_jobs):d}")
    generated_prometheus_config = generate_prometheus_config(stored_jobs, prometheus_config_file)
    print(f"Writing new config file to {prometheus_config_file}")
    # Mode "w" already truncates the file. The context manager flushes and
    # closes it before the reload, so Prometheus never reads a partial file.
    # The encoding matches the one used when the file is read back.
    with open(prometheus_config_file, "w", encoding="utf-8") as config_file:
        config_file.write(yaml.dump(generated_prometheus_config))

    print("New config written, updating prometheus")
    update_resp = await reload_prometheus_config(prometheus_url)
    is_valid = await validate_configuration(prometheus_url, generated_prometheus_config)
    if update_resp and is_valid:
        print("Prometheus config update successful")
        save_successful_jobs(client, stored_jobs)
    else:
        print("Error while updating prometheus config: current config doesn't match with updated values")
138 client
= pymongo
.MongoClient(mongodb_url
)
139 print('Connected to MongoDB!')
141 print('Refreshing prometheus config file for first time')
142 await main_task(client
)
# Needs MongoDB running in replica-set mode, as this feature relies on the oplog
145 change_stream
= client
[target_database
].prometheus_jobs
.watch([{
147 #If you want to modify a particular job, delete and insert it again
148 'operationType': { '$in': ['insert', 'delete'] }
152 #Single thread, no race conditions and ops are queued up in order
153 for change
in change_stream
:
154 print("Change detected, updating prometheus config")
155 await main_task(client
)