blob: a672cb22dc0b207657c435701508a62ca6f59bc2 [file] [log] [blame]
Benjamin Diaz51f44862018-11-15 10:27:12 -03001# -*- coding: utf-8 -*-
2
3# Copyright 2018 Whitestack, LLC
4# *************************************************************
5
6# This file is part of OSM Monitoring module
7# All Rights Reserved to Whitestack, LLC
8
9# Licensed under the Apache License, Version 2.0 (the "License"); you may
10# not use this file except in compliance with the License. You may obtain
11# a copy of the License at
12
13# http://www.apache.org/licenses/LICENSE-2.0
14
15# Unless required by applicable law or agreed to in writing, software
16# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18# License for the specific language governing permissions and limitations
19# under the License.
20# For those usages not covered by the Apache License, Version 2.0 please
21# contact: bdiaz@whitestack.com or glavado@whitestack.com
22##
23import json
24import logging
25import multiprocessing
26import time
27
Benjamin Diaz058d51d2018-11-20 14:01:43 -030028import requests
Benjamin Diaz51f44862018-11-15 10:27:12 -030029from osm_common.dbbase import DbException
30
Benjamin Diaz058d51d2018-11-20 14:01:43 -030031from osm_mon.collector.backends.prometheus import OSM_METRIC_PREFIX
Benjamin Diaz51f44862018-11-15 10:27:12 -030032from osm_mon.core.common_db import CommonDbClient
33from osm_mon.core.database import DatabaseManager, Alarm
34from osm_mon.core.message_bus.producer import Producer
35from osm_mon.core.response import ResponseBuilder
36from osm_mon.core.settings import Config
37
# Logger for this module; named after the module so records can be filtered per component.
log = logging.getLogger(name=__name__)
39
40
class Evaluator:
    """Evaluate stored alarms against Prometheus and publish notifications.

    For every ``Alarm`` row, the evaluator resolves the VNF record and
    descriptor from the common DB and works out which metric(s) the alarm's
    monitoring parameter refers to. It queries Prometheus for each metric in
    a child process. When the alarm's threshold condition holds, it publishes
    a ``notify_alarm`` message on the ``alarm_response`` topic.
    """

    # Seconds to wait for Prometheus before giving up on a query. Without a
    # timeout a hung backend would block ``evaluate``, which waits for every
    # query process to finish.
    PROMETHEUS_TIMEOUT = 10

    # Seconds to block on the result queue between liveness checks while
    # query processes are still running.
    _QUEUE_POLL_INTERVAL = 0.1

    def __init__(self):
        self.common_db = CommonDbClient()
        self.plugins = []
        self.database_manager = DatabaseManager()
        self.database_manager.create_tables()
        # Query processes put triggered Alarm objects on this queue.
        self.queue = multiprocessing.Queue()

    def _evaluate_metric(self,
                         nsr_id: str,
                         vnf_member_index: int,
                         vdur_name: str,
                         metric_name: str,
                         alarm: Alarm):
        """Query Prometheus for one metric and enqueue ``alarm`` if it fires.

        Runs in a child process started by ``evaluate``. Only the first
        series of the query result is compared against ``alarm.threshold``
        using ``alarm.operation`` ('GT' or 'LT', case-insensitive). Failures
        are logged and leave the queue untouched.

        :param nsr_id: NS instance id used as the ``ns_id`` label.
        :param vnf_member_index: VNF member index used as a label.
        :param vdur_name: VDU name label ('' for VNF-level metrics).
        :param metric_name: metric name without ``OSM_METRIC_PREFIX``.
        :param alarm: alarm being evaluated; put on ``self.queue`` if triggered.
        """
        log.debug("_evaluate_metric")
        # TODO: Refactor to fit backend plugin model
        cfg = Config.instance()
        query = "{0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
            OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index)
        request_url = cfg.OSMMON_PROMETHEUS_URL + "/api/v1/query"
        log.info("Querying Prometheus: %s?query=%s", request_url, query)
        try:
            # Pass the query via ``params`` so it is URL-encoded properly,
            # whatever characters the label values contain.
            r = requests.get(request_url,
                             params={'query': query},
                             timeout=self.PROMETHEUS_TIMEOUT)
        except requests.exceptions.RequestException:
            log.exception("Error contacting Prometheus")
            return
        if r.status_code != 200:
            log.warning("Error contacting Prometheus. Got status code %s: %s", r.status_code, r.text)
            return
        json_response = r.json()
        if json_response['status'] != 'success':
            log.warning("Prometheus response is not success. Got status %s", json_response['status'])
            return
        result = json_response['data']['result']
        if not result:
            log.warning("No metric result for alarm %s", alarm.id)
            return
        metric_value = float(result[0]['value'][1])
        log.info("Metric value: %s", metric_value)
        operation = alarm.operation.upper()
        if operation == 'GT':
            triggered = metric_value > alarm.threshold
        elif operation == 'LT':
            triggered = metric_value < alarm.threshold
        else:
            log.warning("Unsupported operation %s for alarm %s", alarm.operation, alarm.id)
            triggered = False
        if triggered:
            self.queue.put(alarm)

    def evaluate_forever(self):
        """Run ``evaluate`` every ``OSMMON_EVALUATOR_INTERVAL`` seconds, forever.

        Errors from a cycle are logged and the loop continues.
        """
        log.debug('evaluate_forever')
        cfg = Config.instance()
        while True:
            try:
                self.evaluate()
            except Exception:
                log.exception("Error evaluating alarms")
            # Sleep after failures too, so a persistent error does not turn
            # this loop into a busy loop.
            time.sleep(cfg.OSMMON_EVALUATOR_INTERVAL)

    def evaluate(self):
        """Evaluate every stored alarm once and notify the triggered ones.

        Each metric query runs in its own child process. Notifications are
        sent from separate child processes, which this method does not wait
        for.
        """
        log.debug('evaluate')
        processes = []
        for alarm in Alarm.select():
            processes.extend(self._start_alarm_evaluation(alarm))
        for alarm in self._collect_triggered_alarms(processes):
            # Notify exactly once per triggered alarm.
            p = multiprocessing.Process(target=self.notify_alarm,
                                        args=(alarm,))
            p.start()

    def _start_alarm_evaluation(self, alarm: Alarm) -> list:
        """Resolve the metric(s) behind ``alarm`` and start one query process each.

        Returns the started processes. The list is empty when the alarm
        cannot be resolved against the common DB; the reason is logged, and
        other alarms are unaffected.
        """
        try:
            vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index)
        except DbException:
            log.exception("Error getting vnfr: ")
            return []
        try:
            vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
        except DbException:
            log.exception("Error getting vnfd: ")
            return []
        vdur = next((v for v in vnfr.get('vdur', []) if v['name'] == alarm.vdur_name), None)
        if vdur is None:
            log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id)
            return []
        vdu = next((v for v in vnfd.get('vdu', []) if v['id'] == vdur['vdu-id-ref']), None)
        if vdu is None:
            log.warning("No vdu found with id %s for alarm %s", vdur['vdu-id-ref'], alarm.id)
            return []
        vnf_monitoring_param = next(
            (param for param in vnfd.get('monitoring-param', [])
             if param['id'] == alarm.monitoring_param),
            None)
        if vnf_monitoring_param is None:
            log.warning("No monitoring-param found with id %s for alarm %s",
                        alarm.monitoring_param, alarm.id)
            return []

        nsr_id = vnfr['nsr-id-ref']
        vnf_member_index = vnfr['member-vnf-index-ref']
        vdur_name = vdur['name']
        processes = []
        if 'vdu-monitoring-param' in vnf_monitoring_param:
            param_ref = vnf_monitoring_param['vdu-monitoring-param']['vdu-monitoring-param-ref']
            vdu_monitoring_param = next(
                (param for param in vdu.get('monitoring-param', []) if param['id'] == param_ref),
                None)
            if vdu_monitoring_param is None:
                log.warning("No vdu monitoring-param found with id %s for alarm %s",
                            param_ref, alarm.id)
            else:
                processes.append(self._start_metric_evaluation(
                    nsr_id, vnf_member_index, vdur_name,
                    vdu_monitoring_param['nfvi-metric'], alarm))
        if 'vdu-metric' in vnf_monitoring_param:
            vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
            processes.append(self._start_metric_evaluation(
                nsr_id, vnf_member_index, vdur_name, vnf_metric_name, alarm))
        if 'vnf-metric' in vnf_monitoring_param:
            vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref']
            # VNF-level metrics are not tied to a VDU, so the vdu_name label is empty.
            processes.append(self._start_metric_evaluation(
                nsr_id, vnf_member_index, '', vnf_metric_name, alarm))
        return processes

    def _start_metric_evaluation(self,
                                 nsr_id: str,
                                 vnf_member_index: int,
                                 vdur_name: str,
                                 metric_name: str,
                                 alarm: Alarm):
        """Start and return a child process running ``_evaluate_metric``."""
        p = multiprocessing.Process(target=self._evaluate_metric,
                                    args=(nsr_id,
                                          vnf_member_index,
                                          vdur_name,
                                          metric_name,
                                          alarm))
        p.start()
        return p

    def _collect_triggered_alarms(self, processes: list) -> list:
        """Wait for ``processes`` and return every alarm they put on the queue.

        The queue is drained while the children are still running. A child
        that has put data on a ``multiprocessing.Queue`` does not exit until
        that data is flushed to the underlying pipe. Joining before draining
        can therefore deadlock once the pipe buffer fills.
        """
        from queue import Empty

        triggered_alarms = []
        while any(process.is_alive() for process in processes):
            try:
                triggered_alarms.append(self.queue.get(timeout=self._QUEUE_POLL_INTERVAL))
            except Empty:
                pass
        for process in processes:
            process.join()
        # Every child has exited, so anything it put is already in the pipe.
        while not self.queue.empty():
            triggered_alarms.append(self.queue.get())
        return triggered_alarms

    def notify_alarm(self, alarm: Alarm):
        """Publish a ``notify_alarm`` message for ``alarm`` on ``alarm_response``."""
        log.debug("notify_alarm")
        response = ResponseBuilder()
        # One strftime call so the date and time come from the same instant.
        now = time.strftime("%d-%m-%Y %X")
        # Generate and send response
        resp_message = response.generate_response(
            'notify_alarm',
            alarm_id=alarm.uuid,
            vdu_name=alarm.vdur_name,
            vnf_member_index=alarm.vnf_member_index,
            ns_id=alarm.nsr_id,
            metric_name=alarm.monitoring_param,
            operation=alarm.operation,
            threshold_value=alarm.threshold,
            sev=alarm.severity,
            status='alarm',
            date=now)
        producer = Producer()
        producer.send(topic='alarm_response', key='notify_alarm', value=json.dumps(resp_message))
        producer.flush()
        log.info("Sent alarm notification: %s", resp_message)