Adds support for VNF metric based alarming
[osm/MON.git] / osm_mon / evaluator / evaluator.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 import json
24 import logging
25 import multiprocessing
26 import time
27
28 import requests
29 from osm_common.dbbase import DbException
30
31 from osm_mon.collector.backends.prometheus import OSM_METRIC_PREFIX
32 from osm_mon.core.common_db import CommonDbClient
33 from osm_mon.core.database import DatabaseManager, Alarm
34 from osm_mon.core.message_bus.producer import Producer
35 from osm_mon.core.response import ResponseBuilder
36 from osm_mon.core.settings import Config
37
# Module-level logger, named after this module, used by every Evaluator method.
38 log = logging.getLogger(__name__)
39
40
class Evaluator:
    """Evaluate stored alarms against metric values queried from Prometheus.

    For every ``Alarm`` row the evaluator resolves the metric(s) behind the
    alarm's monitoring param from the VNFR/VNFD descriptors, queries
    Prometheus for each metric in a separate worker process and publishes a
    ``notify_alarm`` message for every alarm whose threshold is crossed.
    """

    # Seconds to wait for Prometheus before giving up on a single query, so
    # that a stalled server cannot block a worker process forever.
    PROMETHEUS_TIMEOUT = 30

    # Seconds to block on the result queue between liveness checks of the
    # worker processes.
    QUEUE_POLL_INTERVAL = 0.1

    def __init__(self):
        self.common_db = CommonDbClient()
        self.plugins = []
        self.database_manager = DatabaseManager()
        self.database_manager.create_tables()
        # Worker processes put triggered alarms here; evaluate() drains it.
        self.queue = multiprocessing.Queue()

    @staticmethod
    def _threshold_crossed(operation, threshold, metric_value):
        """Return True when ``metric_value`` violates the alarm threshold.

        :param operation: comparison name, case-insensitive; only ``GT`` and
            ``LT`` are handled.
        :param threshold: value the metric is compared against.
        :param metric_value: current value of the metric.
        :return: ``metric_value > threshold`` for ``GT``,
            ``metric_value < threshold`` for ``LT``, ``False`` for any other
            operation.
        """
        normalized = operation.upper()
        if normalized == 'GT':
            return metric_value > threshold
        if normalized == 'LT':
            return metric_value < threshold
        return False

    @staticmethod
    def _find_by_key(items, key, value):
        """Return the first mapping in ``items`` whose ``key`` equals ``value``.

        :return: the matching item, or ``None`` when nothing matches (items
            that lack ``key`` are skipped).
        """
        return next(
            (item for item in items if key in item and item[key] == value),
            None)

    def _evaluate_metric(self,
                         nsr_id: str,
                         vnf_member_index: int,
                         vdur_name: str,
                         metric_name: str,
                         alarm: 'Alarm'):
        """Query Prometheus for one metric and enqueue the alarm if triggered.

        Runs inside a worker process started by :meth:`evaluate`. The alarm is
        put on ``self.queue`` when the first sample returned by Prometheus
        crosses ``alarm.threshold`` according to ``alarm.operation``.

        :param nsr_id: value for the ``ns_id`` label of the query.
        :param vnf_member_index: value for the ``vnf_member_index`` label.
        :param vdur_name: value for the ``vdu_name`` label.
        :param metric_name: metric name; ``OSM_METRIC_PREFIX`` is prepended.
        :param alarm: alarm being evaluated.
        """
        log.debug("_evaluate_metric")
        # TODO: Refactor to fit backend plugin model
        cfg = Config.instance()
        query = "{0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
            OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index)
        request_url = cfg.OSMMON_PROMETHEUS_URL + "/api/v1/query"
        log.info("Querying Prometheus: %s", request_url + "?query=" + query)
        try:
            # Passing the query through ``params`` lets requests URL-encode it,
            # so reserved characters in label values cannot corrupt the URL.
            r = requests.get(request_url,
                             params={'query': query},
                             timeout=self.PROMETHEUS_TIMEOUT)
        except requests.RequestException:
            log.exception("Error contacting Prometheus")
            return
        if r.status_code != 200:
            log.warning("Error contacting Prometheus. Got status code %s: %s", r.status_code, r.text)
            return
        json_response = r.json()
        if json_response['status'] != 'success':
            log.warning("Prometheus response is not success. Got status %s", json_response['status'])
            return
        result = json_response['data']['result']
        if not result:
            log.warning("No metric result for alarm %s", alarm.id)
            return
        # Only the first returned series is considered; its 'value' entry is
        # indexed at position 1 to obtain the sample value.
        metric_value = float(result[0]['value'][1])
        log.info("Metric value: %s", metric_value)
        if self._threshold_crossed(alarm.operation, alarm.threshold, metric_value):
            self.queue.put(alarm)

    def evaluate_forever(self):
        """Run :meth:`evaluate` in an endless loop.

        Waits ``OSMMON_EVALUATOR_INTERVAL`` seconds between cycles. Errors
        raised by a cycle are logged and do not stop the loop.
        """
        log.debug('evaluate_forever')
        cfg = Config.instance()
        while True:
            try:
                self.evaluate()
            except Exception:
                log.exception("Error evaluating alarms")
            # Sleep outside the try block so a failing cycle is also followed
            # by a pause instead of being retried in a tight loop.
            time.sleep(cfg.OSMMON_EVALUATOR_INTERVAL)

    def _resolve_alarm_metrics(self, alarm: 'Alarm'):
        """Resolve the Prometheus query labels and metric names for an alarm.

        :param alarm: alarm whose monitoring param is to be resolved.
        :return: tuple ``(nsr_id, vnf_member_index, vdur_name, metric_names)``
            or ``None`` when a required record or descriptor entry cannot be
            found (the reason is logged).
        """
        try:
            vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index)
        except DbException:
            log.exception("Error getting vnfr: ")
            return None
        try:
            vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
        except DbException:
            log.exception("Error getting vnfd: ")
            return None

        vdur = self._find_by_key(vnfr['vdur'], 'name', alarm.vdur_name)
        if vdur is None:
            log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id)
            return None
        vdu = self._find_by_key(vnfd['vdu'], 'id', vdur['vdu-id-ref'])
        if vdu is None:
            log.warning("No vdu found with id %s for alarm %s", vdur['vdu-id-ref'], alarm.id)
            return None
        vnf_monitoring_param = self._find_by_key(
            vnfd.get('monitoring-param', ()), 'id', alarm.monitoring_param)
        if vnf_monitoring_param is None:
            log.warning("No monitoring param found with id %s for alarm %s",
                        alarm.monitoring_param, alarm.id)
            return None

        metric_names = []
        if 'vdu-monitoring-param' in vnf_monitoring_param:
            param_ref = vnf_monitoring_param['vdu-monitoring-param']['vdu-monitoring-param-ref']
            vdu_monitoring_param = self._find_by_key(
                vdu.get('monitoring-param', ()), 'id', param_ref)
            if vdu_monitoring_param is None:
                log.warning("No vdu monitoring param found with id %s for alarm %s",
                            param_ref, alarm.id)
            else:
                metric_names.append(vdu_monitoring_param['nfvi-metric'])
        if 'vdu-metric' in vnf_monitoring_param:
            metric_names.append(vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref'])
        if 'vnf-metric' in vnf_monitoring_param:
            log.warning("vnf-metric is not currently supported.")

        return vnfr['nsr-id-ref'], vnfr['member-vnf-index-ref'], vdur['name'], metric_names

    def _collect_triggered_alarms(self, processes):
        """Wait for the worker processes and return the alarms they enqueued.

        The queue is consumed while the workers are still running: the
        multiprocessing documentation warns that joining a process which has
        put items on a queue can deadlock until those items are consumed.

        :param processes: started worker processes.
        :return: list of alarms taken from ``self.queue``.
        """
        from queue import Empty

        triggered_alarms = []
        running = list(processes)
        while running:
            try:
                triggered_alarms.append(self.queue.get(timeout=self.QUEUE_POLL_INTERVAL))
            except Empty:
                pass
            running = [process for process in running if process.is_alive()]
        for process in processes:
            process.join()
        # Pick up anything enqueued between the last poll and worker exit.
        while True:
            try:
                triggered_alarms.append(self.queue.get_nowait())
            except Empty:
                break
        return triggered_alarms

    def evaluate(self):
        """Run one evaluation cycle over every stored alarm.

        Starts one worker process per resolved metric, collects the alarms
        the workers report as triggered and sends exactly one notification
        per reported alarm. Alarms that cannot be resolved are skipped.
        """
        log.debug('evaluate')
        processes = []
        for alarm in Alarm.select():
            resolved = self._resolve_alarm_metrics(alarm)
            if resolved is None:
                continue
            nsr_id, vnf_member_index, vdur_name, metric_names = resolved
            for metric_name in metric_names:
                p = multiprocessing.Process(target=self._evaluate_metric,
                                            args=(nsr_id,
                                                  vnf_member_index,
                                                  vdur_name,
                                                  metric_name,
                                                  alarm))
                processes.append(p)
                p.start()

        triggered_alarms = self._collect_triggered_alarms(processes)

        # Each alarm is notified once, in its own process; the processes are
        # joined so the cycle ends only after the notifications were sent.
        notifiers = []
        for alarm in triggered_alarms:
            p = multiprocessing.Process(target=self.notify_alarm,
                                        args=(alarm,))
            p.start()
            notifiers.append(p)
        for p in notifiers:
            p.join()

    def notify_alarm(self, alarm: 'Alarm'):
        """Publish a ``notify_alarm`` message for a triggered alarm.

        The message is built with ``ResponseBuilder`` and sent as JSON to the
        ``alarm_response`` topic with key ``notify_alarm``.

        :param alarm: alarm whose threshold was crossed.
        """
        log.debug("notify_alarm")
        response = ResponseBuilder()
        # Single strftime call so date and time come from the same instant.
        now = time.strftime("%d-%m-%Y %X")
        # Generate and send response
        resp_message = response.generate_response(
            'notify_alarm',
            alarm_id=alarm.id,
            vdu_name=alarm.vdur_name,
            vnf_member_index=alarm.vnf_member_index,
            ns_id=alarm.nsr_id,
            metric_name=alarm.monitoring_param,
            operation=alarm.operation,
            threshold_value=alarm.threshold,
            sev=alarm.severity,
            status='alarm',
            date=now)
        producer = Producer()
        producer.send(topic='alarm_response', key='notify_alarm', value=json.dumps(resp_message))
        producer.flush()
        log.info("Sent alarm notification: %s", resp_message)