397764f272adc82bed21aca219fe8182e84b9894
[osm/LCM.git] / osm_lcm / prometheus.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from time import time
21 import logging
22 import aiohttp
23 import yaml
24 import os
25 from osm_lcm.lcm_utils import LcmException
26 from osm_common.dbbase import DbException
27
28
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"

# Default content of the "prometheus" document (collection "admin"). Prometheus.start() inserts it when the
# document does not exist yet. Apart from the "_id" and "_admin" bookkeeping fields, the remaining keys form a
# prometheus.yml configuration with one scrape job ("mon_exporter") targeting mon:8000.
initial_prometheus_data = {
    "_id": "prometheus",
    "_admin": {
        "locked_at": 0,  # database lock timestamp; Prometheus.update() resets it to 0 when releasing the lock
        "locked_by": None,  # worker id holding the lock
        "modified": 1593445184,  # 2020-06-29
        "created": 1593445184,
        "version": "1.0",  # to allow future version updates
    },
    "scrape_configs": [
        {
            "static_configs": [
                {"targets": ["mon:8000"]},
            ],
            "job_name": "mon_exporter",
        },
    ],
    "global": {
        "evaluation_interval": "15s",
        "scrape_interval": "15s",
    },
    "rule_files": None,
    "alerting": {
        "alertmanagers": [
            {"static_configs": [{"targets": None}]},
        ],
    },
}
45
46
class Prometheus():
    """
    Implements a class to update Prometheus.

    The configuration is kept in the database (collection "admin", document with _id "prometheus"). Changes are
    serialized among workers with a lock stored in that same document ("_admin.locked_at" / "_admin.locked_by").
    The configuration is written to "<path>/prometheus.yml" and Prometheus is asked to reload it through its
    "/-/reload" HTTP endpoint.
    """

    # Seconds after which a database lock is considered stale and can be taken by another worker
    PROMETHEUS_LOCKED_TIME = 120

    def __init__(self, config, worker_id, db, loop, logger=None):
        """
        :param config: dictionary with "uri" (base URL of the Prometheus server) and "path" (folder where the
            prometheus.yml file is written)
        :param worker_id: identifier of this worker, stored as lock owner in the database
        :param db: database object offering get_one/set_one/create (presumably an osm_common dbbase instance)
        :param loop: asyncio event loop. Stored for backward compatibility; asyncio calls use the running loop
            (the "loop" argument of asyncio.sleep was removed in Python 3.10)
        :param logger: logger to use. By default "lcm.prometheus"
        """
        self.worker_id = worker_id
        self.db = db
        self.loop = loop
        self.logger = logger or logging.getLogger("lcm.prometheus")
        self.server = config["uri"]
        self.path = config["path"]
        if not self.path.endswith("/"):
            self.path += "/"
        self.cfg_file = self.path + "prometheus.yml"
        self.cfg_file_backup = self.path + "prometheus.yml-backup"

    async def start(self):
        """
        Ensure the configuration document exists in the database and send it to Prometheus once.

        The document is created from initial_prometheus_data when missing. Database errors are retried, with up to
        4 attempts in total and 5 seconds between attempts.
        :raises LcmException: if the database keeps failing on the last attempt
        """
        max_attempts = 4
        for retry in range(max_attempts):
            try:
                # fail_on_empty=False: a missing document must return None (so it is created) instead of raising
                prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}, fail_on_empty=False)
                if not prometheus_data:
                    self.logger.info("Init db.admin.prometheus content")
                    self.db.create("admin", initial_prometheus_data)
                # send database config file to prometheus. Ignore loading errors, as prometheus may be starting
                # but at least an initial configuration file is set
                await self.update()
                return
            except DbException as e:
                if retry == max_attempts - 1:
                    raise LcmException("Max retries trying to init prometheus configuration: {}".format(e))
                await asyncio.sleep(5)

    async def update(self, add_jobs=None, remove_jobs=None):
        """
        Apply changes to the configuration stored in the database and send the result to Prometheus.

        The database document is locked by this worker while working. Changes are stored in the database only if
        Prometheus accepted the new configuration.
        :param add_jobs: scrape jobs appended to "scrape_configs". Assumed to be a list of Prometheus scrape job
            dictionaries (job_name, static_configs, ...) - TODO confirm with callers
        :param remove_jobs: jobs to remove. Not implemented yet (TODO); ignored, also when add_jobs is given
        :return: True if Prometheus accepted the configuration, False otherwise
        :raises LcmException: if the database lock cannot be taken/released after all retries
        """
        max_attempts = 4
        for retry in range(max_attempts):
            result = True
            if retry:  # first time do not wait
                await asyncio.sleep(self.PROMETHEUS_LOCKED_TIME / 2)
            # lock database. Only succeeds if the lock is free or older than PROMETHEUS_LOCKED_TIME
            now = time()
            if not self.db.set_one(
                    "admin",
                    q_filter={"_id": "prometheus", "_admin.locked_at.lt": now - self.PROMETHEUS_LOCKED_TIME},
                    update_dict={"_admin.locked_at": now, "_admin.locked_by": self.worker_id},
                    fail_on_empty=False):
                continue
            # read database
            prometheus_data = self.db.get_one("admin", {"_id": "prometheus"})

            # The lock is always released, whether or not there are changes
            update_dict = {"_admin.locked_at": 0, "_admin.locked_by": None}

            # Make changes from prometheus_incremental
            push_list = pull_list = None
            if add_jobs:
                jobs = list(add_jobs)
                # "scrape_configs" is the list of jobs: append in the data to send and in the database
                prometheus_data["scrape_configs"] = (prometheus_data.get("scrape_configs") or []) + jobs
                push_list = {"scrape_configs": jobs}
            elif remove_jobs:
                pass  # TODO

            if not await self.send_data(prometheus_data):
                # Prometheus did not accept the configuration: do not store the changes
                push_list = pull_list = None
                result = False
            if push_list or pull_list:
                update_dict["_admin.modified_at"] = now

            # unblock database (storing the changes). Fails if the lock was taken by another worker meanwhile
            if not self.db.set_one(
                    "admin", {"_id": "prometheus", "_admin.locked_at": now, "_admin.locked_by": self.worker_id},
                    update_dict=update_dict, pull_list=pull_list, push_list=push_list, fail_on_empty=False):
                continue
            return result
        raise LcmException("Cannot update prometheus database. Reached max retries")

    async def send_data(self, new_config):
        """
        Write the configuration to the prometheus.yml file and make Prometheus reload it.

        The previous file is renamed to a backup, which is restored if anything fails or the loaded configuration
        does not match.
        :param new_config: configuration as read from the database. It is not modified; its "_id" and "_admin"
            fields are not written to the file
        :return: True on success, False otherwise. Errors are logged, not raised
        """
        restore_backup = False
        # Prometheus rejects unknown fields in its configuration file, so database bookkeeping fields are dropped
        prometheus_config = self._to_prometheus_config(new_config)
        base_url = self.server.rstrip("/")
        try:
            if os.path.exists(self.cfg_file):
                os.rename(self.cfg_file, self.cfg_file_backup)
                restore_backup = True
            with open(self.cfg_file, "w") as f:
                yaml.dump(prometheus_config, f)
            async with aiohttp.ClientSession() as session:
                async with session.post(base_url + "/-/reload") as resp:
                    if resp.status > 204:
                        raise LcmException(await resp.text())
                # wait before asking Prometheus for the configuration it has loaded
                await asyncio.sleep(5)
                async with session.get(base_url + "/api/v1/status/config") as resp:
                    if resp.status > 204:
                        raise LcmException(await resp.text())
                    current_config = await resp.json()
                    if not self._check_configuration_equal(current_config, prometheus_config):
                        return False
            restore_backup = False
            return True
        except Exception as e:
            self.logger.error("Error updating prometheus configuration {}".format(e))
            return False
        finally:
            if restore_backup:
                try:
                    os.rename(self.cfg_file_backup, self.cfg_file)
                except OSError as e:
                    self.logger.critical("Cannot restore prometheus configuration backup: {}".format(e))

    @staticmethod
    def _to_prometheus_config(db_data):
        """
        Return a shallow copy of the database document without its "_id" and "_admin" fields, i.e. the content
        to be written to prometheus.yml.
        """
        return {key: value for key, value in db_data.items() if key not in ("_id", "_admin")}

    @staticmethod
    def _check_configuration_equal(current_config, new_config):
        """
        Check whether the configuration loaded by Prometheus matches the one sent.

        Not implemented yet: always returns True.
        :param current_config: decoded JSON response of the "/api/v1/status/config" endpoint
        :param new_config: configuration written to prometheus.yml
        """
        # TODO compare and return True if equal
        return True