[osm/LCM.git] / osm_lcm / prometheus.py
# -*- coding: utf-8 -*-

##
# Copyright 2020 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##

import asyncio
from time import time
import logging
import aiohttp
import yaml
import os
from osm_lcm.lcm_utils import LcmException
from osm_common.dbbase import DbException
from osm_lcm.data_utils.database.database import Database
from jinja2 import Template, TemplateError, TemplateNotFound, TemplateSyntaxError

__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"

initial_prometheus_data = {
    "_id": "prometheus",
    "_admin": {
        "locked_at": 0,
        "locked_by": None,
        "modified": 1593445184,  # 2020-06-29
        "created": 1593445184,
        "version": "1.0"  # to allow future version updates
    },
    'scrape_configs': {  # stored as a dictionary in the database; converted to a list before sending to prometheus
        'mon_exporter': {'static_configs': [{'targets': ['mon:8000']}], 'job_name': 'mon_exporter'},
    },
    'global': {'evaluation_interval': '15s', 'scrape_interval': '15s'},
    'rule_files': None,
    'alerting': {'alertmanagers': [{'static_configs': [{'targets': None}]}]}
}
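# Illustrative note (not part of the original module): send_data() below drops the
# "_id" and "_admin" keys, flattens the "scrape_configs" dictionary into a list and
# writes the rest with yaml.safe_dump(), so the initial content above ends up in
# prometheus.yml roughly as (key order and exact indentation come from yaml.safe_dump):
#
#     alerting:
#       alertmanagers:
#       - static_configs:
#         - targets: null
#     global:
#       evaluation_interval: 15s
#       scrape_interval: 15s
#     rule_files: null
#     scrape_configs:
#     - job_name: mon_exporter
#       static_configs:
#       - targets:
#         - mon:8000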


class Prometheus:
    """
    Keeps the prometheus scrape configuration stored at the database in sync with the
    prometheus.yml file and asks the prometheus server to reload it
    """

    PROMETHEUS_LOCKED_TIME = 120

    def __init__(self, config, worker_id, loop, logger=None):
        self.worker_id = worker_id
        self.db = Database().instance.db
        self.loop = loop
        self.logger = logger or logging.getLogger("lcm.prometheus")
        self.server = config["uri"]
        self.path = config["path"]
        if not self.path.endswith("/"):
            self.path += "/"
        self.cfg_file = self.path + "prometheus.yml"
        self.cfg_file_backup = self.path + "prometheus.yml-backup"

    @staticmethod
    def parse_job(job_data: str, variables: dict) -> dict:
        """Render the Jinja2 job template with the given variables and load the result as a YAML dictionary"""
        try:
            template = Template(job_data)
            job_parsed = template.render(variables or {})
            return yaml.safe_load(job_parsed)
        except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
            # TODO yaml exceptions
            raise LcmException("Error parsing Jinja2 to prometheus job. job_data={}, variables={}. Error={}".format(
                job_data, variables, e))
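    # Illustrative example (not part of the original module) of how parse_job() above is
    # meant to be used; the template text and variable names are hypothetical:
    #
    #     job_template = 'job_name: {{ JOB_NAME }}\nstatic_configs:\n- targets: ["{{ TARGET }}"]'
    #     Prometheus.parse_job(job_template, {"JOB_NAME": "vnf_metrics", "TARGET": "mon:8000"})
    #     # -> {'job_name': 'vnf_metrics', 'static_configs': [{'targets': ['mon:8000']}]}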

    async def start(self):
        for retry in range(4):
            try:
                # self.logger("Starting prometheus ")
                # read from database
                prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}, fail_on_empty=False)
                if not prometheus_data:
                    self.logger.info("Init db.admin.prometheus content")
                    self.db.create("admin", initial_prometheus_data)
                # Send the database configuration to prometheus. Loading errors are ignored: prometheus may
                # still be starting, but at least an initial configuration file is written
                await self.update()
                return
            except DbException as e:
                if retry == 3:
                    raise LcmException("Max retries trying to init prometheus configuration: {}".format(e))
                await asyncio.sleep(5, loop=self.loop)

    async def update(self, add_jobs: dict = None, remove_jobs: list = None) -> bool:
        """
        Update the scrape jobs stored at the database and send the new configuration to prometheus

        :param add_jobs: dictionary with the jobs to add: {job_id_1: job_content, job_id_2: job_content}
        :param remove_jobs: list with the job ids to remove: [job_id_1, job_id_2]
        :return: True on success; False if prometheus rejects the new configuration. Raises LcmException on error
        """
        for retry in range(20):
            result = True
            if retry:  # first time do not wait
                await asyncio.sleep(4 + retry, loop=self.loop)

            # lock database
            now = time()
            if not self.db.set_one(
                    "admin",
                    q_filter={"_id": "prometheus", "_admin.locked_at.lt": now - self.PROMETHEUS_LOCKED_TIME},
                    update_dict={"_admin.locked_at": now, "_admin.locked_by": self.worker_id},
                    fail_on_empty=False):
                continue
            # read database
            prometheus_data = self.db.get_one("admin", {"_id": "prometheus"})
            update_dict = {"_admin.locked_at": 0,
                           "_admin.locked_by": None}

            # apply the incremental changes (jobs to add or remove)
            push_dict = pull_dict = None
            if add_jobs or remove_jobs:
                log_text_list = []
                if add_jobs:
                    log_text_list.append("adding jobs: {}".format(list(add_jobs.keys())))
                    prometheus_data["scrape_configs"].update(add_jobs)
                    push_dict = {"scrape_configs." + job_id: job_data for job_id, job_data in add_jobs.items()}
                elif remove_jobs:  # note: remove_jobs is ignored when add_jobs is also provided
                    log_text_list.append("removing jobs: {}".format(list(remove_jobs)))
                    for job_id in remove_jobs:
                        prometheus_data["scrape_configs"].pop(job_id, None)
                    pull_dict = {"scrape_configs." + job_id: None for job_id in remove_jobs}
                self.logger.debug("Updating. " + ". ".join(log_text_list))

            if not await self.send_data(prometheus_data):
                self.logger.error("Cannot update add_jobs: {}. remove_jobs: {}".format(add_jobs, remove_jobs))
                push_dict = pull_dict = None
                result = False

            # unlock database
            if push_dict:
                update_dict.update(push_dict)
            if push_dict or pull_dict:
                update_dict["_admin.modified_at"] = now
            if not self.db.set_one(
                    "admin", {"_id": "prometheus", "_admin.locked_at": now, "_admin.locked_by": self.worker_id},
                    update_dict=update_dict, unset=pull_dict, fail_on_empty=False):
                continue
            return result
        raise LcmException("Cannot update prometheus database. Reached max retries")
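    # Illustrative example (not part of the original module) of an update() call on a
    # Prometheus instance (here called prometheus); the job id, target and nsr_id values
    # are hypothetical. Each job has the same shape as the "scrape_configs" entries above;
    # the optional "nsr_id" metadata is kept only in the database and is stripped by
    # send_data() before writing prometheus.yml:
    #
    #     await prometheus.update(add_jobs={
    #         "vnf_metrics_job": {
    #             "job_name": "vnf_metrics_job",
    #             "static_configs": [{"targets": ["vnf-exporter:9100"]}],
    #             "nsr_id": "<ns-record-id>",
    #         }
    #     })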

    async def send_data(self, new_config):
        restore_backup = False
        del new_config["_id"]
        del new_config["_admin"]
        new_scrape_configs = []

        # generate a list with the values of scrape_configs
        for scrape_config in new_config["scrape_configs"].values():
            scrape_config = scrape_config.copy()
            # remove nsr_id metadata from scrape_configs
            scrape_config.pop("nsr_id", None)
            new_scrape_configs.append(scrape_config)
        new_config["scrape_configs"] = new_scrape_configs

        try:
            if os.path.exists(self.cfg_file):
                os.rename(self.cfg_file, self.cfg_file_backup)
                restore_backup = True
            with open(self.cfg_file, "w+") as f:
                yaml.safe_dump(new_config, f, indent=4, default_flow_style=False)
            # self.logger.debug("new configuration: {}".format(yaml.safe_dump(new_config, indent=4,
            #                                                                 default_flow_style=False)))
            async with aiohttp.ClientSession() as session:
                async with session.post(self.server + "-/reload") as resp:
                    if resp.status > 204:
                        raise LcmException(await resp.text())
                await asyncio.sleep(5, loop=self.loop)
                # If prometheus does not admit this configuration, it keeps the old one.
                # To check whether the configuration has been accepted, get the configuration back
                # from prometheus and compare it with the one just inserted
                async with session.get(self.server + "api/v1/status/config") as resp:
                    if resp.status > 204:
                        raise LcmException(await resp.text())
                    current_config = await resp.json()
                    if not self._check_configuration_equal(current_config, new_config):
                        return False
                    else:
                        restore_backup = False
            return True
        except Exception as e:
            self.logger.error("Error updating configuration url={}: {}".format(self.server, e))
            return False
        finally:
            if restore_backup:
                try:
                    os.rename(self.cfg_file_backup, self.cfg_file)
                except Exception as e:
                    self.logger.critical("Exception while rolling back: {}".format(e))

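    # Illustrative note (not part of the original module): the reply consumed by
    # _check_configuration_equal() below comes from prometheus' GET api/v1/status/config,
    # which wraps the currently loaded configuration as a YAML string in a JSON envelope,
    # roughly:
    #
    #     {
    #         "status": "success",
    #         "data": {
    #             "yaml": "global:\n  scrape_interval: 15s\n...\nscrape_configs:\n- job_name: mon_exporter\n..."
    #         }
    #     }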
    def _check_configuration_equal(self, current_config, expected_config):
        try:
            # self.logger.debug("Comparing current_config='{}' with expected_config='{}'".format(current_config,
            #                                                                                     expected_config))
            current_config_yaml = yaml.safe_load(current_config['data']['yaml'])
            current_jobs = [j["job_name"] for j in current_config_yaml["scrape_configs"]]
            expected_jobs = [j["job_name"] for j in expected_config["scrape_configs"]]
            if current_jobs == expected_jobs:
                return True
            else:
                self.logger.error("Not all jobs have been loaded. Target jobs: {} Loaded jobs: {}".format(
                    expected_jobs, current_jobs))
                return False
        except Exception as e:
            self.logger.error("Invalid obtained status from server. Error: '{}'. Obtained data: '{}'".format(
                e, current_config))
            # if format is not understood, cannot be compared, assume it is ok
            return True
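
# Illustrative usage sketch (not part of the original module). It assumes the Database
# singleton has already been initialized elsewhere in LCM; the uri/path values and the
# worker id are hypothetical examples:
#
#     loop = asyncio.get_event_loop()
#     prometheus = Prometheus(
#         config={"uri": "http://prometheus:9090/", "path": "/etc/prometheus/"},
#         worker_id="lcm-worker-0",
#         loop=loop,
#     )
#     loop.run_until_complete(prometheus.start())
#     # later, scrape jobs can be added or removed:
#     # loop.run_until_complete(prometheus.update(remove_jobs=["vnf_metrics_job"]))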