blob: 6517d0bf375b27442fcc7ae7568bc8bbd683666c [file] [log] [blame]
tiernof800c5c2020-06-30 13:24:17 +00001# -*- coding: utf-8 -*-
2
3##
4# Copyright 2020 Telefonica S.A.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17##
18
19import asyncio
20from time import time
21import logging
22import aiohttp
23import yaml
24import os
25from osm_lcm.lcm_utils import LcmException
26from osm_common.dbbase import DbException
bravof922c4172020-11-24 21:21:43 -030027from osm_lcm.data_utils.database.database import Database
tiernob996d942020-07-03 14:52:28 +000028from jinja2 import Template, TemplateError, TemplateNotFound, TemplateSyntaxError
tiernof800c5c2020-06-30 13:24:17 +000029
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"

# Seed for the "admin" collection document with "_id": "prometheus". Prometheus.start() inserts it when
# the document does not exist yet. Prometheus.send_data() writes it (without "_id"/"_admin") to prometheus.yml.
initial_prometheus_data = {
    "_id": "prometheus",
    "_admin": dict(
        locked_at=0,  # 0: no LCM worker holds the update lock
        locked_by=None,
        modified=1593445184,  # 2020-06-29
        created=1593445184,
        version="1.0",  # to allow future version updates
    ),
    # Dictionary at database, keyed by job name. Converted to list before sending to prometheus
    "scrape_configs": dict(
        mon_exporter=dict(
            static_configs=[dict(targets=["mon:8000"])],
            job_name="mon_exporter",
        ),
    ),
    "global": dict(evaluation_interval="15s", scrape_interval="15s"),
    "rule_files": None,
    "alerting": dict(alertmanagers=[dict(static_configs=[dict(targets=None)])]),
}
51
52
class Prometheus:
    """
    Implements a class to update Prometheus.

    The desired configuration is kept in the database ("admin" collection, "_id": "prometheus").
    Updates are serialized among LCM workers through a lock stored in that same document
    ("_admin.locked_at" / "_admin.locked_by"). The configuration is then written to
    ``<path>/prometheus.yml`` and Prometheus is asked to reload it.
    """

    # Seconds after which a lock held by another worker is considered stale and can be taken over
    PROMETHEUS_LOCKED_TIME = 120

    def __init__(self, config, worker_id, loop, logger=None):
        """
        :param config: dictionary with "uri" (Prometheus base URL) and "path" (directory where
            prometheus.yml is written)
        :param worker_id: identifier of this worker, stored as lock owner in the database
        :param loop: asyncio event loop. Kept for backward compatibility only; it is no longer passed
            to asyncio.sleep because that argument was removed in Python 3.10
        :param logger: optional logger. Defaults to the "lcm.prometheus" logger
        """
        self.worker_id = worker_id
        self.db = Database().instance.db
        self.loop = loop
        self.logger = logger or logging.getLogger("lcm.prometheus")
        self.server = config["uri"]
        # URLs are built by plain concatenation ("-/reload", "api/v1/status/config"), so the base URI
        # must end with "/", as is already done for the path
        if not self.server.endswith("/"):
            self.server += "/"
        self.path = config["path"]
        if not self.path.endswith("/"):
            self.path += "/"
        self.cfg_file = self.path + "prometheus.yml"
        self.cfg_file_backup = self.path + "prometheus.yml-backup"

    @staticmethod
    def parse_job(job_data: str, variables: dict) -> dict:
        """
        Render a Jinja2 prometheus job template and load the result as YAML.

        :param job_data: Jinja2 template text of the job
        :param variables: variables used to render the template. None is treated as an empty dict
        :return: the object produced by yaml.safe_load on the rendered text
        :raises LcmException: if the template cannot be rendered or the rendered text is not valid YAML
        """
        try:
            template = Template(job_data)
            job_parsed = template.render(variables or {})
            return yaml.safe_load(job_parsed)
        except (
            TemplateError,
            TemplateNotFound,
            TemplateSyntaxError,
            yaml.YAMLError,
        ) as e:
            raise LcmException(
                "Error parsing Jinja2 to prometheus job. job_data={}, variables={}. Error={}".format(
                    job_data, variables, e
                )
            ) from e

    async def start(self):
        """
        Ensure the prometheus document exists in the database and push its content to Prometheus.

        Database errors are retried, for 4 attempts in total, waiting 5 seconds between attempts.

        :raises LcmException: if the database still fails on the last attempt
        """
        for retry in range(4):
            try:
                # read from database
                prometheus_data = self.db.get_one(
                    "admin", {"_id": "prometheus"}, fail_on_empty=False
                )
                if not prometheus_data:
                    self.logger.info("Init db.admin.prometheus content")
                    self.db.create("admin", initial_prometheus_data)
                # send database config file to prometheus. Ignore loading errors, as prometheus may be starting
                # but at least an initial configuration file is set
                await self.update()
                return
            except DbException as e:
                if retry == 3:
                    raise LcmException(
                        "Max retries trying to init prometheus configuration: {}".format(
                            e
                        )
                    )
                await asyncio.sleep(5)

    @staticmethod
    def _apply_job_changes(scrape_configs: dict, add_jobs: dict = None, remove_jobs: list = None):
        """
        Apply job additions/removals in place to ``scrape_configs`` and build the database changes.

        Additions are applied first and removals afterwards, so a job id present in both ends up removed.
        Such an id is also left out of the "$set" dictionary, so the database update does not set and
        unset the same path.

        :param scrape_configs: dictionary {job_id: job_content}. It is modified in place
        :param add_jobs: dictionary {job_id: job_content} with jobs to add or replace
        :param remove_jobs: iterable of job ids to remove
        :return: tuple (push_dict, pull_dict, log_text_list). push_dict/pull_dict hold
            {"scrape_configs.<job_id>": ...} entries to set/unset, or None when there is nothing of that kind.
            log_text_list holds human readable descriptions of the changes
        """
        push_dict = pull_dict = None
        log_text_list = []
        remove_ids = list(remove_jobs) if remove_jobs else []
        removed = set(remove_ids)
        if add_jobs:
            log_text_list.append("adding jobs: {}".format(list(add_jobs.keys())))
            scrape_configs.update(add_jobs)
            push_dict = {
                "scrape_configs." + job_id: job_data
                for job_id, job_data in add_jobs.items()
                if job_id not in removed
            }
        if remove_ids:
            log_text_list.append("removing jobs: {}".format(remove_ids))
            for job_id in remove_ids:
                scrape_configs.pop(job_id, None)
            pull_dict = {"scrape_configs." + job_id: None for job_id in remove_ids}
        return push_dict, pull_dict, log_text_list

    async def update(self, add_jobs: dict = None, remove_jobs: list = None) -> bool:
        """
        Lock the prometheus database document, apply the job changes, send the result to Prometheus and
        store the changes while unlocking.

        :param add_jobs: dictionary with {job_id_1: job_content, job_id_2: job_content}
        :param remove_jobs: list with jobs to remove [job_id_1, job_id_2]. It can be combined with add_jobs;
            a job id present in both ends up removed
        :return: result. If false prometheus denies this configuration. Exception on error
        :raises LcmException: if the lock cannot be taken/released after 20 attempts
        """
        for retry in range(20):
            result = True
            if retry:  # first time do not wait
                await asyncio.sleep(4 + retry)

            # lock database: only succeeds if nobody holds the lock or it is older than PROMETHEUS_LOCKED_TIME
            now = time()
            if not self.db.set_one(
                "admin",
                q_filter={
                    "_id": "prometheus",
                    "_admin.locked_at.lt": now - self.PROMETHEUS_LOCKED_TIME,
                },
                update_dict={
                    "_admin.locked_at": now,
                    "_admin.locked_by": self.worker_id,
                },
                fail_on_empty=False,
            ):
                continue
            # read database
            prometheus_data = self.db.get_one("admin", {"_id": "prometheus"})
            update_dict = {"_admin.locked_at": 0, "_admin.locked_by": None}

            # Make changes from prometheus_incremental
            push_dict = pull_dict = None
            if add_jobs or remove_jobs:
                push_dict, pull_dict, log_text_list = self._apply_job_changes(
                    prometheus_data["scrape_configs"], add_jobs, remove_jobs
                )
                self.logger.debug("Updating. " + ". ".join(log_text_list))

            if not await self.send_data(prometheus_data):
                self.logger.error(
                    "Cannot update add_jobs: {}. remove_jobs: {}".format(
                        add_jobs, remove_jobs
                    )
                )
                # prometheus rejected the configuration: do not persist the changes
                push_dict = pull_dict = None
                result = False

            # unblock database, storing the changes in the same operation
            if push_dict:
                update_dict.update(push_dict)
            if push_dict or pull_dict:
                update_dict["_admin.modified_at"] = now
            if not self.db.set_one(
                "admin",
                {
                    "_id": "prometheus",
                    "_admin.locked_at": now,
                    "_admin.locked_by": self.worker_id,
                },
                update_dict=update_dict,
                unset=pull_dict,
                fail_on_empty=False,
            ):
                # the lock was lost (e.g. taken over as stale): repeat the whole operation
                continue
            return result
        raise LcmException("Cannot update prometheus database. Reached max retries")

    async def send_data(self, new_config):
        """
        Write a configuration to the prometheus file and ask Prometheus to reload it.

        The previous file is kept as a backup. It is restored when Prometheus cannot be reached or does
        not report the expected jobs as loaded.

        :param new_config: prometheus document as read from the database. "scrape_configs" is a dictionary
            of jobs; the database-only keys "_id" and "_admin" are ignored. The dictionary is not modified
        :return: True if Prometheus loaded the expected jobs, False otherwise (errors are logged)
        """
        restore_backup = False
        # build a new dictionary so the caller's document is left untouched; drop database-only fields
        new_config = {
            key: value
            for key, value in new_config.items()
            if key not in ("_id", "_admin")
        }
        # generate a list with the values of scrape_configs, without the nsr_id metadata
        new_scrape_configs = []
        for scrape_config in new_config["scrape_configs"].values():
            scrape_config = scrape_config.copy()
            scrape_config.pop("nsr_id", None)
            new_scrape_configs.append(scrape_config)
        new_config["scrape_configs"] = new_scrape_configs

        try:
            if os.path.exists(self.cfg_file):
                os.rename(self.cfg_file, self.cfg_file_backup)
                restore_backup = True
            with open(self.cfg_file, "w+") as f:
                yaml.safe_dump(new_config, f, indent=4, default_flow_style=False)
            async with aiohttp.ClientSession() as session:
                async with session.post(self.server + "-/reload") as resp:
                    if resp.status > 204:
                        raise LcmException(await resp.text())
                await asyncio.sleep(5)
                # If prometheus does not admit this configuration, it keeps the old one.
                # Then, to check if the configuration has been accepted, get the configuration from
                # prometheus and compare it with the inserted one
                async with session.get(self.server + "api/v1/status/config") as resp:
                    if resp.status > 204:
                        raise LcmException(await resp.text())
                    current_config = await resp.json()
            if not self._check_configuration_equal(current_config, new_config):
                return False
            restore_backup = False
            return True
        except Exception as e:
            self.logger.error(
                "Error updating configuration url={}: {}".format(self.server, e)
            )
            return False
        finally:
            if restore_backup:
                try:
                    os.rename(self.cfg_file_backup, self.cfg_file)
                except Exception as e:
                    self.logger.critical("Exception while rolling back: {}".format(e))

    def _check_configuration_equal(self, current_config, expected_config):
        """
        Check that Prometheus reports exactly the expected job names, in the same order.

        :param current_config: response of the Prometheus "api/v1/status/config" call; its
            ["data"]["yaml"] entry is parsed as YAML
        :param expected_config: configuration sent, with "scrape_configs" as a list of jobs
        :return: False if the job names differ. True if they match, or if the response cannot be
            parsed (then the configuration cannot be compared and is assumed to be ok)
        """
        try:
            current_config_yaml = yaml.safe_load(current_config["data"]["yaml"])
            current_jobs = [
                j["job_name"] for j in current_config_yaml["scrape_configs"]
            ]
            expected_jobs = [j["job_name"] for j in expected_config["scrape_configs"]]
            if current_jobs == expected_jobs:
                return True
            self.logger.error(
                "Not all jobs have been loaded. Target jobs: {} Loaded jobs: {}".format(
                    expected_jobs, current_jobs
                )
            )
            return False
        except Exception as e:
            self.logger.error(
                "Invalid obtained status from server. Error: '{}'. Obtained data: '{}'".format(
                    e, current_config
                )
            )
            # if format is not understood, cannot be compared, assume it is ok
            return True