Changes insufficient data validation in alarms to account for metric results 0.0
[osm/MON.git] / osm_mon / evaluator / service.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 import logging
24 import multiprocessing
25 from enum import Enum
26 from typing import Tuple, List
27
28 from osm_common.dbbase import DbException
29
30 from osm_mon.core import database
31 from osm_mon.core.common_db import CommonDbClient
32 from osm_mon.core.config import Config
33 from osm_mon.core.database import Alarm, AlarmRepository
34 from osm_mon.evaluator.backends.prometheus import PrometheusBackend
35
36 log = logging.getLogger(__name__)
37
38 BACKENDS = {
39 'prometheus': PrometheusBackend
40 }
41
42
43 class AlarmStatus(Enum):
44 ALARM = 'alarm'
45 OK = 'ok'
46 INSUFFICIENT = 'insufficient-data'
47
48
49 class EvaluatorService:
50
51 def __init__(self, config: Config):
52 self.conf = config
53 self.common_db = CommonDbClient(self.conf)
54 self.queue = multiprocessing.Queue()
55
56 def _get_metric_value(self,
57 nsr_id: str,
58 vnf_member_index: str,
59 vdur_name: str,
60 metric_name: str):
61 return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name,
62 nsr_id,
63 vdur_name,
64 vnf_member_index)
65
66 def _evaluate_metric(self,
67 nsr_id: str,
68 vnf_member_index: str,
69 vdur_name: str,
70 metric_name: str,
71 alarm: Alarm):
72 log.debug("_evaluate_metric")
73 metric_value = self._get_metric_value(nsr_id, vnf_member_index, vdur_name, metric_name)
74 if metric_value is None:
75 log.warning("No metric result for alarm %s", alarm.id)
76 self.queue.put((alarm, AlarmStatus.INSUFFICIENT))
77 else:
78 if alarm.operation.upper() == 'GT':
79 if metric_value > alarm.threshold:
80 self.queue.put((alarm, AlarmStatus.ALARM))
81 else:
82 self.queue.put((alarm, AlarmStatus.OK))
83 elif alarm.operation.upper() == 'LT':
84 if metric_value < alarm.threshold:
85 self.queue.put((alarm, AlarmStatus.ALARM))
86 else:
87 self.queue.put((alarm, AlarmStatus.OK))
88
89 def evaluate_alarms(self) -> List[Tuple[Alarm, AlarmStatus]]:
90 log.debug('evaluate_alarms')
91 processes = []
92 database.db.connect()
93 try:
94 with database.db.atomic():
95 for alarm in AlarmRepository.list():
96 try:
97 vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index)
98 except DbException:
99 log.exception("Error getting vnfr: ")
100 continue
101 vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
102 try:
103 vdur = next(filter(lambda vdur: vdur['name'] == alarm.vdur_name, vnfr['vdur']))
104 except StopIteration:
105 log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id)
106 continue
107 vdu = next(filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']))
108 vnf_monitoring_param = next(
109 filter(lambda param: param['id'] == alarm.monitoring_param, vnfd['monitoring-param']))
110 nsr_id = vnfr['nsr-id-ref']
111 vnf_member_index = vnfr['member-vnf-index-ref']
112 vdur_name = vdur['name']
113 if 'vdu-monitoring-param' in vnf_monitoring_param:
114 vdu_monitoring_param = next(filter(
115 lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param'][
116 'vdu-monitoring-param-ref'], vdu['monitoring-param']))
117 nfvi_metric = vdu_monitoring_param['nfvi-metric']
118
119 p = multiprocessing.Process(target=self._evaluate_metric,
120 args=(nsr_id,
121 vnf_member_index,
122 vdur_name,
123 nfvi_metric,
124 alarm))
125 processes.append(p)
126 p.start()
127 if 'vdu-metric' in vnf_monitoring_param:
128 vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
129 p = multiprocessing.Process(target=self._evaluate_metric,
130 args=(nsr_id,
131 vnf_member_index,
132 vdur_name,
133 vnf_metric_name,
134 alarm))
135 processes.append(p)
136 p.start()
137 if 'vnf-metric' in vnf_monitoring_param:
138 vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref']
139 p = multiprocessing.Process(target=self._evaluate_metric,
140 args=(nsr_id,
141 vnf_member_index,
142 '',
143 vnf_metric_name,
144 alarm))
145 processes.append(p)
146 p.start()
147
148 for process in processes:
149 process.join(timeout=10)
150 alarms_tuples = []
151 while not self.queue.empty():
152 alarms_tuples.append(self.queue.get())
153 return alarms_tuples
154 finally:
155 database.db.close()