b040198c94f1e0623246015baa73408bf701002a
[osm/MON.git] / osm_mon / evaluator / evaluator.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 import json
24 import logging
25 import multiprocessing
26 import time
27
28 from osm_common.dbbase import DbException
29
30 from osm_mon.collector.collector import VIM_COLLECTORS
31 from osm_mon.collector.collectors.juju import VCACollector
32 from osm_mon.core.common_db import CommonDbClient
33 from osm_mon.core.database import DatabaseManager, Alarm
34 from osm_mon.core.message_bus.producer import Producer
35 from osm_mon.core.response import ResponseBuilder
36 from osm_mon.core.settings import Config
37
# Module-level logger, named after this module, shared by everything below.
log = logging.getLogger(__name__)
39
40
class Evaluator:
    """Evaluate stored alarms against freshly collected metrics.

    For every ``Alarm`` row, the evaluator resolves the metric the alarm
    refers to, collects its current value in a worker process, compares it
    with the alarm threshold and publishes a ``notify_alarm`` message on the
    ``alarm_response`` topic for each alarm whose threshold is crossed.
    """

    def __init__(self):
        self.common_db = CommonDbClient()
        self.plugins = []
        self.database_manager = DatabaseManager()
        self.database_manager.create_tables()
        # Worker processes put triggered alarms here; evaluate() drains it.
        self.queue = multiprocessing.Queue()

    @staticmethod
    def _threshold_crossed(operation, value, threshold) -> bool:
        """Return True when *value* violates *threshold* for *operation*.

        *operation* is matched case-insensitively. Only ``GT`` (value strictly
        greater than threshold) and ``LT`` (value strictly lower than
        threshold) are recognised; any other operation never triggers.
        """
        op = operation.upper()
        if op == 'GT':
            return value > threshold
        if op == 'LT':
            return value < threshold
        return False

    def _enqueue_if_triggered(self, metric, alarm: 'Alarm'):
        """Put *alarm* on the result queue if *metric* crosses its threshold."""
        if self._threshold_crossed(alarm.operation, metric.value, alarm.threshold):
            self.queue.put(alarm)

    def _drain_queue(self, sink: list):
        """Move every alarm currently readable from the queue into *sink*."""
        while not self.queue.empty():
            sink.append(self.queue.get())

    def _evaluate_vim_metric(self,
                             nsr_id: str,
                             vnf_member_index: int,
                             vdur_name: str,
                             nfvi_metric_name: str,
                             vim_account_id: str,
                             alarm: 'Alarm'):
        """Collect one NFVI metric through the VIM collector and check *alarm*.

        The collector class is selected from ``VIM_COLLECTORS`` by the VIM type
        of *vim_account_id*; unsupported VIM types are logged and skipped.
        Triggered alarms are put on ``self.queue``.
        """
        vim_type = self.database_manager.get_vim_type(vim_account_id)
        if vim_type not in VIM_COLLECTORS:
            log.debug("vimtype %s is not supported.", vim_type)
            return
        collector = VIM_COLLECTORS[vim_type](vim_account_id)
        metric = collector.collect_one(nsr_id, vnf_member_index, vdur_name, nfvi_metric_name)
        self._enqueue_if_triggered(metric, alarm)

    def _evaluate_vca_metric(self,
                             nsr_id: str,
                             vnf_member_index: int,
                             vdur_name: str,
                             vnf_metric_name: str,
                             alarm: 'Alarm'):
        """Collect one VNF metric through ``VCACollector`` and check *alarm*.

        Triggered alarms are put on ``self.queue``.
        """
        collector = VCACollector()
        metric = collector.collect_one(nsr_id, vnf_member_index, vdur_name, vnf_metric_name)
        self._enqueue_if_triggered(metric, alarm)

    def evaluate_forever(self):
        """Run ``evaluate()`` in an endless loop, pausing between passes.

        Errors raised by a pass are logged and do not stop the loop. The pause
        of ``OSMMON_EVALUATOR_INTERVAL`` is applied after every pass, failed
        or not, so a persistent failure cannot turn into a busy loop.
        """
        log.debug('evaluate_forever')
        cfg = Config.instance()
        while True:
            try:
                self.evaluate()
            except Exception:
                log.exception("Error evaluating alarms")
            time.sleep(cfg.OSMMON_EVALUATOR_INTERVAL)

    def evaluate(self):
        """Evaluate every stored alarm once and notify the triggered ones.

        Each alarm is resolved to its VNFR, VNFD, VDU record and monitoring
        parameter. Alarms whose records cannot be resolved are logged and
        skipped so that one inconsistent alarm does not abort the whole pass.
        Metric collection runs in one worker process per alarm; notification
        runs in one process per triggered alarm.
        """
        processes = []
        for alarm in Alarm.select():
            try:
                vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index)
            except DbException:
                log.exception("Error getting vnfr: ")
                continue
            try:
                vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
            except DbException:
                log.exception("Error getting vnfd: ")
                continue

            vdur = next((item for item in vnfr['vdur'] if item['name'] == alarm.vdur_name), None)
            if vdur is None:
                log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id)
                continue

            vdu = next((item for item in vnfd['vdu'] if item['id'] == vdur['vdu-id-ref']), None)
            if vdu is None:
                log.warning("No vdu found with id %s for alarm %s", vdur['vdu-id-ref'], alarm.id)
                continue

            vnf_monitoring_param = next(
                (param for param in vnfd['monitoring-param'] if param['id'] == alarm.monitoring_param),
                None)
            if vnf_monitoring_param is None:
                log.warning("No monitoring param found with id %s for alarm %s",
                            alarm.monitoring_param, alarm.id)
                continue

            nsr_id = vnfr['nsr-id-ref']
            vnf_member_index = vnfr['member-vnf-index-ref']
            vdur_name = vdur['name']

            if 'vdu-monitoring-param' in vnf_monitoring_param:
                param_ref = vnf_monitoring_param['vdu-monitoring-param']['vdu-monitoring-param-ref']
                vdu_monitoring_param = next(
                    (param for param in vdu['monitoring-param'] if param['id'] == param_ref),
                    None)
                if vdu_monitoring_param is None:
                    log.warning("No vdu monitoring param found with id %s for alarm %s",
                                param_ref, alarm.id)
                    continue
                nfvi_metric = vdu_monitoring_param['nfvi-metric']

                vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index)
                p = multiprocessing.Process(target=self._evaluate_vim_metric,
                                            args=(nsr_id,
                                                  vnf_member_index,
                                                  vdur_name,
                                                  nfvi_metric,
                                                  vim_account_id,
                                                  alarm))
                processes.append(p)
                p.start()
            if 'vdu-metric' in vnf_monitoring_param:
                vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
                p = multiprocessing.Process(target=self._evaluate_vca_metric,
                                            args=(nsr_id,
                                                  vnf_member_index,
                                                  vdur_name,
                                                  vnf_metric_name,
                                                  alarm))
                processes.append(p)
                p.start()
            if 'vnf-metric' in vnf_monitoring_param:
                log.warning("vnf-metric is not currently supported.")

        triggered_alarms = []
        # Keep reading the queue while workers are still running: a worker
        # that has queued data may not exit until that data is consumed, so
        # joining before draining can block (see the multiprocessing
        # programming guidelines on joining processes that use queues).
        for process in processes:
            while process.is_alive():
                self._drain_queue(triggered_alarms)
                process.join(timeout=0.1)
        # Pick up whatever was queued between the last drain and worker exit.
        self._drain_queue(triggered_alarms)

        # Notify exactly once per triggered alarm, each in its own process so
        # a failing notification does not affect the others.
        for alarm in triggered_alarms:
            p = multiprocessing.Process(target=self.notify_alarm,
                                        args=(alarm,))
            p.start()

    def notify_alarm(self, alarm: 'Alarm'):
        """Publish a ``notify_alarm`` message describing the triggered *alarm*.

        The message is built with ``ResponseBuilder``, serialised as JSON and
        sent on the ``alarm_response`` topic with key ``notify_alarm``.
        """
        response = ResponseBuilder()
        # Single strftime call so date and time come from the same instant.
        now = time.strftime("%d-%m-%Y %X")
        # Generate and send response
        resp_message = response.generate_response(
            'notify_alarm',
            alarm_id=alarm.id,
            vdu_name=alarm.vdur_name,
            vnf_member_index=alarm.vnf_member_index,
            ns_id=alarm.nsr_id,
            metric_name=alarm.monitoring_param,
            operation=alarm.operation,
            threshold_value=alarm.threshold,
            sev=alarm.severity,
            status='alarm',
            date=now)
        producer = Producer()
        producer.send(topic='alarm_response', key='notify_alarm', value=json.dumps(resp_message))
        producer.flush()
        log.info("Sent alarm notification: %s", resp_message)
174 producer.flush()
175 log.info("Sent alarm notification: %s", resp_message)