1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
25 from osm_lcm
.lcm_utils
import LcmException
26 from osm_common
.dbbase
import DbException
27 from osm_lcm
.data_utils
.database
.database
import Database
28 from jinja2
import Template
, TemplateError
, TemplateNotFound
, TemplateSyntaxError
30 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
32 initial_prometheus_data
= {
37 "modified": 1593445184, # 2020-06-29
38 "created": 1593445184,
39 "version": "1.0" # to allow future version updates
41 'scrape_configs': { # Dictionary at database. Converted to list before sending to prometheus
42 'mon_exporter': {'static_configs': [{'targets': ['mon:8000']}], 'job_name': 'mon_exporter'},
44 'global': {'evaluation_interval': '15s', 'scrape_interval': '15s'},
46 'alerting': {'alertmanagers': [{'static_configs': [{'targets': None}]}]}
52 Implements a class to update Prometheus
    # Seconds after which another worker may take over a stale DB lock: update() only
    # acquires the lock when _admin.locked_at is older than now - PROMETHEUS_LOCKED_TIME
    PROMETHEUS_LOCKED_TIME = 120
57 def __init__(self
, config
, worker_id
, loop
, logger
=None):
58 self
.worker_id
= worker_id
59 self
.db
= Database().instance
.db
61 self
.logger
= logger
or logging
.getLogger("lcm.prometheus")
62 self
.server
= config
["uri"]
63 self
.path
= config
["path"]
64 if not self
.path
.endswith("/"):
66 self
.cfg_file
= self
.path
+ "prometheus.yml"
67 self
.cfg_file_backup
= self
.path
+ "prometheus.yml-backup"
70 def parse_job(job_data
: str, variables
: dict) -> dict:
72 template
= Template(job_data
)
73 job_parsed
= template
.render(variables
or {})
74 return yaml
.safe_load(job_parsed
)
75 except (TemplateError
, TemplateNotFound
, TemplateSyntaxError
) as e
:
76 # TODO yaml exceptions
77 raise LcmException("Error parsing Jinja2 to prometheus job. job_data={}, variables={}. Error={}".format(
78 job_data
, variables
, e
))
80 async def start(self
):
81 for retry
in range(4):
83 # self.logger("Starting prometheus ")
85 prometheus_data
= self
.db
.get_one("admin", {"_id": "prometheus"}, fail_on_empty
=False)
86 if not prometheus_data
:
87 self
.logger
.info("Init db.admin.prometheus content")
88 self
.db
.create("admin", initial_prometheus_data
)
89 # send database config file to prometheus. Ignore loading errors, as prometheus may be starting
90 # but at least an initial configuration file is set
93 except DbException
as e
:
95 raise LcmException("Max retries trying to init prometheus configuration: {}".format(e
))
96 await asyncio
.sleep(5, loop
=self
.loop
)
98 async def update(self
, add_jobs
: dict = None, remove_jobs
: list = None) -> bool:
101 :param add_jobs: dictionary with {job_id_1: job_content, job_id_2: job_content}
102 :param remove_jobs: list with jobs to remove [job_id_1, job_id_2]
103 :return: result. If false prometheus denies this configuration. Exception on error
105 for retry
in range(20):
107 if retry
: # first time do not wait
108 await asyncio
.sleep(4 + retry
, loop
=self
.loop
)
112 if not self
.db
.set_one(
114 q_filter
={"_id": "prometheus", "_admin.locked_at.lt": now
- self
.PROMETHEUS_LOCKED_TIME
},
115 update_dict
={"_admin.locked_at": now
, "_admin.locked_by": self
.worker_id
},
116 fail_on_empty
=False):
119 prometheus_data
= self
.db
.get_one("admin", {"_id": "prometheus"})
120 update_dict
= {"_admin.locked_at": 0,
121 "_admin.locked_by": None}
123 # Make changes from prometheus_incremental
124 push_dict
= pull_dict
= None
125 if add_jobs
or remove_jobs
:
128 log_text_list
.append("adding jobs: {}".format(list(add_jobs
.keys())))
129 prometheus_data
["scrape_configs"].update(add_jobs
)
130 push_dict
= {"scrape_configs." + job_id
: job_data
for job_id
, job_data
in add_jobs
.items()}
132 log_text_list
.append("removing jobs: {}".format(list(remove_jobs
)))
133 for job_id
in remove_jobs
:
134 prometheus_data
["scrape_configs"].pop(job_id
, None)
135 pull_dict
= {"scrape_configs." + job_id
: None for job_id
in remove_jobs
}
136 self
.logger
.debug("Updating. " + ". ".join(log_text_list
))
138 if not await self
.send_data(prometheus_data
):
139 self
.logger
.error("Cannot update add_jobs: {}. remove_jobs: {}".format(add_jobs
, remove_jobs
))
140 push_dict
= pull_dict
= None
145 update_dict
.update(push_dict
)
146 if push_dict
or pull_dict
:
147 update_dict
["_admin.modified_at"] = now
148 if not self
.db
.set_one(
149 "admin", {"_id": "prometheus", "_admin.locked_at": now
, "_admin.locked_by": self
.worker_id
},
150 update_dict
=update_dict
, unset
=pull_dict
, fail_on_empty
=False):
153 raise LcmException("Cannot update prometheus database. Reached max retries")
155 async def send_data(self
, new_config
):
156 restore_backup
= False
157 del new_config
["_id"]
158 del new_config
["_admin"]
159 new_scrape_configs
= []
161 # generate a list with the values of scrape_configs
162 for scrape_config
in new_config
["scrape_configs"].values():
163 scrape_config
= scrape_config
.copy()
164 # remove nsr_id metadata from scrape_configs
165 scrape_config
.pop("nsr_id", None)
166 new_scrape_configs
.append(scrape_config
)
167 new_config
["scrape_configs"] = new_scrape_configs
170 if os
.path
.exists(self
.cfg_file
):
171 os
.rename(self
.cfg_file
, self
.cfg_file_backup
)
172 restore_backup
= True
173 with
open(self
.cfg_file
, "w+") as f
:
174 yaml
.safe_dump(new_config
, f
, indent
=4, default_flow_style
=False)
175 # self.logger.debug("new configuration: {}".format(yaml.safe_dump(new_config, indent=4,
176 # default_flow_style=False)))
177 async with aiohttp
.ClientSession() as session
:
178 async with session
.post(self
.server
+ "-/reload") as resp
:
179 if resp
.status
> 204:
180 raise LcmException(await resp
.text())
181 await asyncio
.sleep(5, loop
=self
.loop
)
182 # If prometheus does not admit this configuration, remains with the old one
183 # Then, to check if the configuration has been accepted, get the configuration from prometheus
184 # and compares with the inserted one
185 async with session
.get(self
.server
+ "api/v1/status/config") as resp
:
186 if resp
.status
> 204:
187 raise LcmException(await resp
.text())
188 current_config
= await resp
.json()
189 if not self
._check
_configuration
_equal
(current_config
, new_config
):
192 restore_backup
= False
194 except Exception as e
:
195 self
.logger
.error("Error updating configuration url={}: {}".format(self
.server
, e
))
200 os
.rename(self
.cfg_file_backup
, self
.cfg_file
)
201 except Exception as e
:
202 self
.logger
.critical("Exception while rolling back: {}".format(e
))
204 def _check_configuration_equal(self
, current_config
, expected_config
):
206 # self.logger.debug("Comparing current_config='{}' with expected_config='{}'".format(current_config,
208 current_config_yaml
= yaml
.safe_load(current_config
['data']['yaml'])
209 current_jobs
= [j
["job_name"] for j
in current_config_yaml
["scrape_configs"]]
210 expected_jobs
= [j
["job_name"] for j
in expected_config
["scrape_configs"]]
211 if current_jobs
== expected_jobs
:
214 self
.logger
.error("Not all jobs have been loaded. Target jobs: {} Loaded jobs: {}".format(
215 expected_jobs
, current_jobs
))
217 except Exception as e
:
218 self
.logger
.error("Invalid obtained status from server. Error: '{}'. Obtained data: '{}'".format(
220 # if format is not understood, cannot be compared, assume it is ok