Implements filebased config, config override through env vars, use of osm
[osm/MON.git] / osm_mon / evaluator / evaluator.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 import asyncio
24 import logging
25 import multiprocessing
26 import time
27
28 import peewee
29 import requests
30 from osm_common.dbbase import DbException
31
32 from osm_mon.collector.backends.prometheus import OSM_METRIC_PREFIX
33 from osm_mon.core.common_db import CommonDbClient
34 from osm_mon.core.config import Config
35 from osm_mon.core.database import DatabaseManager, Alarm
36 from osm_mon.core.message_bus_client import MessageBusClient
37 from osm_mon.core.response import ResponseBuilder
38
39 log = logging.getLogger(__name__)
40
41
class Evaluator:
    """Evaluate stored alarms against metrics queried from Prometheus.

    For every alarm returned by ``Alarm.select()`` the evaluator resolves the
    metric(s) behind the alarm's monitoring param from the VNF record and VNF
    descriptor, queries Prometheus for the current value in a worker process
    and publishes a ``notify_alarm`` message on the message bus for each alarm
    whose threshold is crossed.
    """

    def __init__(self, config: Config, loop=None):
        """Create the clients used by the evaluator.

        Args:
            config: Configuration object handed to the common DB client, the
                database manager and the message bus client, and read for the
                ``prometheus``, ``global`` and ``evaluator`` settings.
            loop: Event loop used to publish notifications. When falsy, the
                loop returned by ``asyncio.get_event_loop()`` is used.
        """
        self.conf = config
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.common_db = CommonDbClient(self.conf)
        self.plugins = []
        self.database_manager = DatabaseManager(self.conf)
        self.database_manager.create_tables()
        # Worker processes hand triggered alarms back to the parent through this queue.
        self.queue = multiprocessing.Queue()
        self.msg_bus = MessageBusClient(config)

    @staticmethod
    def _find_by(items, key: str, value):
        """Return the first element of ``items`` whose ``key`` entry equals ``value``, or None."""
        return next((item for item in items if item[key] == value), None)

    @staticmethod
    def _threshold_crossed(operation: str, threshold: float, metric_value: float) -> bool:
        """Tell whether ``metric_value`` violates ``threshold`` for the given operation.

        ``operation`` is compared case-insensitively. Only ``GT`` (strictly
        greater) and ``LT`` (strictly lower) are recognised; any other
        operation never triggers.
        """
        operation = operation.upper()
        if operation == 'GT':
            return metric_value > threshold
        if operation == 'LT':
            return metric_value < threshold
        return False

    @staticmethod
    def _metric_targets(vnf_monitoring_param: dict, vdu: dict, vdur_name: str) -> list:
        """List the ``(vdur_name, metric_name)`` pairs to query for a VNF monitoring param.

        The three kinds of reference are checked independently, so a param that
        carries more than one of them yields more than one pair. VNF-level
        metrics are queried with an empty vdu name.

        Raises:
            ValueError: if the param references a VDU monitoring param that is
                not present in ``vdu['monitoring-param']``.
        """
        targets = []
        if 'vdu-monitoring-param' in vnf_monitoring_param:
            param_ref = vnf_monitoring_param['vdu-monitoring-param']['vdu-monitoring-param-ref']
            vdu_monitoring_param = next(
                (param for param in vdu['monitoring-param'] if param['id'] == param_ref), None)
            if vdu_monitoring_param is None:
                raise ValueError("no vdu monitoring param with id {}".format(param_ref))
            targets.append((vdur_name, vdu_monitoring_param['nfvi-metric']))
        if 'vdu-metric' in vnf_monitoring_param:
            targets.append((vdur_name, vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']))
        if 'vnf-metric' in vnf_monitoring_param:
            targets.append(('', vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref']))
        return targets

    def _evaluate_metric(self,
                         nsr_id: str,
                         vnf_member_index: int,
                         vdur_name: str,
                         metric_name: str,
                         alarm: Alarm):
        """Query Prometheus for one metric and enqueue ``alarm`` if its threshold is crossed.

        Used as the target of a worker process: the outcome is reported by
        putting ``alarm`` on ``self.queue``; nothing is returned.

        Args:
            nsr_id: Value matched against the ``ns_id`` label.
            vnf_member_index: Value matched against the ``vnf_member_index`` label.
            vdur_name: Value matched against the ``vdu_name`` label.
            metric_name: Metric name; ``OSM_METRIC_PREFIX`` is prepended to it.
            alarm: Alarm providing ``operation``, ``threshold`` and ``id``.
        """
        log.debug("_evaluate_metric")
        # TODO: Refactor to fit backend plugin model
        query = "{0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
            OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index)
        endpoint = self.conf.get('prometheus', 'url') + "/api/v1/query"
        # The log line keeps the readable, unencoded form of the URL.
        log.info("Querying Prometheus: %s", endpoint + "?query=" + query)
        try:
            # Passing the query through ``params`` lets requests URL-encode it, so
            # label values containing '&', '#', '+' or spaces cannot corrupt the request.
            r = requests.get(endpoint,
                             params={'query': query},
                             timeout=int(self.conf.get('global', 'request_timeout')))
        except requests.RequestException:
            # Without this the worker dies with a bare traceback on stderr.
            log.exception("Error querying Prometheus")
            return
        if r.status_code != 200:
            log.warning("Error contacting Prometheus. Got status code %s: %s", r.status_code, r.text)
            return
        json_response = r.json()
        if json_response['status'] != 'success':
            log.warning("Prometheus response is not success. Got status %s", json_response['status'])
            return
        result = json_response['data']['result']
        if not result:
            log.warning("No metric result for alarm %s", alarm.id)
            return
        # Only the first returned series is considered; element 1 of 'value' is the sample value.
        metric_value = float(result[0]['value'][1])
        log.info("Metric value: %s", metric_value)
        if self._threshold_crossed(alarm.operation, alarm.threshold, metric_value):
            self.queue.put(alarm)

    def evaluate_forever(self):
        """Run ``evaluate`` in an endless loop, pausing ``evaluator.interval`` seconds between runs.

        Raises:
            peewee.PeeweeException: database errors are logged and re-raised,
                which ends the loop.
            Exception: whatever reading or converting the ``evaluator.interval``
                setting raises; it is no longer swallowed by the loop.
        """
        log.debug('evaluate_forever')
        while True:
            try:
                self.evaluate()
            except peewee.PeeweeException:
                log.exception("Database error evaluating alarms: ")
                raise
            except Exception:
                log.exception("Error evaluating alarms")
            # Sleep after failed runs too: otherwise a persistent error makes the
            # loop spin without pause and flood the log.
            time.sleep(int(self.conf.get('evaluator', 'interval')))

    def evaluate(self):
        """Evaluate every alarm once and notify the ones that triggered.

        One worker process is started per metric to query. An alarm whose
        records or descriptors cannot be resolved is logged and skipped, so it
        does not prevent the remaining alarms from being evaluated.
        """
        log.debug('evaluate')
        processes = []
        for alarm in Alarm.select():
            try:
                vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index)
            except DbException:
                log.exception("Error getting vnfr: ")
                continue
            try:
                vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
            except DbException:
                log.exception("Error getting vnfd: ")
                continue
            vdur = self._find_by(vnfr['vdur'], 'name', alarm.vdur_name)
            if vdur is None:
                log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id)
                continue
            vdu = self._find_by(vnfd['vdu'], 'id', vdur['vdu-id-ref'])
            if vdu is None:
                log.warning("No vdu found with id %s for alarm %s", vdur['vdu-id-ref'], alarm.id)
                continue
            vnf_monitoring_param = self._find_by(vnfd['monitoring-param'], 'id', alarm.monitoring_param)
            if vnf_monitoring_param is None:
                log.warning("No monitoring param found with id %s for alarm %s",
                            alarm.monitoring_param, alarm.id)
                continue
            try:
                targets = self._metric_targets(vnf_monitoring_param, vdu, vdur['name'])
            except ValueError as error:
                log.warning("Skipping alarm %s: %s", alarm.id, error)
                continue
            nsr_id = vnfr['nsr-id-ref']
            vnf_member_index = vnfr['member-vnf-index-ref']
            for target_vdur_name, metric_name in targets:
                p = multiprocessing.Process(target=self._evaluate_metric,
                                            args=(nsr_id,
                                                  vnf_member_index,
                                                  target_vdur_name,
                                                  metric_name,
                                                  alarm))
                processes.append(p)
                p.start()

        # Wait at most 10 seconds per worker. A worker that is still running is
        # left alone; whatever it enqueues is collected by a later call.
        for process in processes:
            process.join(timeout=10)
        triggered_alarms = []
        while not self.queue.empty():
            triggered_alarms.append(self.queue.get())
        for alarm in triggered_alarms:
            p = multiprocessing.Process(target=self.notify_alarm,
                                        args=(alarm,))
            p.start()

    def notify_alarm(self, alarm: Alarm):
        """Publish a ``notify_alarm`` message for ``alarm`` on the ``alarm_response`` topic.

        Args:
            alarm: Triggered alarm whose fields populate the notification.
        """
        log.debug("notify_alarm")
        response = ResponseBuilder()
        # A single strftime call, so the date and time parts come from the same instant.
        now = time.strftime("%d-%m-%Y %X")
        # Generate and send response
        resp_message = response.generate_response(
            'notify_alarm',
            alarm_id=alarm.uuid,
            vdu_name=alarm.vdur_name,
            vnf_member_index=alarm.vnf_member_index,
            ns_id=alarm.nsr_id,
            metric_name=alarm.monitoring_param,
            operation=alarm.operation,
            threshold_value=alarm.threshold,
            sev=alarm.severity,
            status='alarm',
            date=now)
        self.loop.run_until_complete(self.msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message))
        log.info("Sent alarm notification: %s", resp_message)