X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=osm_mon%2Fevaluator%2Fservice.py;h=ae6191ba3dec1fe596ec2ae77bc5298ff8229ae0;hb=refs%2Fchanges%2F00%2F12200%2F2;hp=20bb0adcd3a6ecfd40ff1ab761e53c199f11db97;hpb=a98b854eae15a3a86604bccaf6d84e012b583657;p=osm%2FMON.git diff --git a/osm_mon/evaluator/service.py b/osm_mon/evaluator/service.py index 20bb0ad..ae6191b 100644 --- a/osm_mon/evaluator/service.py +++ b/osm_mon/evaluator/service.py @@ -25,131 +25,92 @@ import multiprocessing from enum import Enum from typing import Tuple, List -from osm_common.dbbase import DbException - -from osm_mon.core import database from osm_mon.core.common_db import CommonDbClient from osm_mon.core.config import Config -from osm_mon.core.database import Alarm, AlarmRepository +from osm_mon.core.models import Alarm from osm_mon.evaluator.backends.prometheus import PrometheusBackend log = logging.getLogger(__name__) -BACKENDS = { - 'prometheus': PrometheusBackend -} +BACKENDS = {"prometheus": PrometheusBackend} class AlarmStatus(Enum): - ALARM = 'alarm' - OK = 'ok' - INSUFFICIENT = 'insufficient-data' + ALARM = "alarm" + OK = "ok" + INSUFFICIENT = "insufficient-data" + DISABLED = "disabled" class EvaluatorService: - def __init__(self, config: Config): self.conf = config self.common_db = CommonDbClient(self.conf) self.queue = multiprocessing.Queue() - def _get_metric_value(self, - nsr_id: str, - vnf_member_index: str, - vdur_name: str, - metric_name: str): - return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name, - nsr_id, - vdur_name, - vnf_member_index) - - def _evaluate_metric(self, - nsr_id: str, - vnf_member_index: str, - vdur_name: str, - metric_name: str, - alarm: Alarm): + def _get_metric_value(self, metric_name: str, tags: dict): + return BACKENDS[self.conf.get("evaluator", "backend")]( + self.conf + ).get_metric_value(metric_name, tags) + + def _evaluate_metric(self, alarm: Alarm): + """Method to evaluate a metric value comparing it against an alarm threshold. + + Args: + alarm (Alarm): the alarm with the threshold to compare the metric against + """ + log.debug("_evaluate_metric") - metric_value = self._get_metric_value(nsr_id, vnf_member_index, vdur_name, metric_name) - if metric_value is None: - log.warning("No metric result for alarm %s", alarm.id) - self.queue.put((alarm, AlarmStatus.INSUFFICIENT)) - else: - if alarm.operation.upper() == 'GT': - if metric_value > alarm.threshold: - self.queue.put((alarm, AlarmStatus.ALARM)) - else: - self.queue.put((alarm, AlarmStatus.OK)) - elif alarm.operation.upper() == 'LT': - if metric_value < alarm.threshold: + metric_value = self._get_metric_value(alarm.metric, alarm.tags) + if alarm.alarm_status.upper() != AlarmStatus.DISABLED.value.upper(): + if metric_value is None: + log.warning("No metric result for alarm %s", alarm.uuid) + self.queue.put((alarm, AlarmStatus.INSUFFICIENT)) + else: + if ( + (alarm.operation.upper() == "GT" and metric_value > alarm.threshold) + or ( + alarm.operation.upper() == "LT" + and metric_value < alarm.threshold + ) + or ( + alarm.operation.upper() == "GE" + and metric_value >= alarm.threshold + ) + or ( + alarm.operation.upper() == "LE" + and metric_value <= alarm.threshold + ) + or ( + alarm.operation.upper() == "EQ" + and metric_value == alarm.threshold + ) + or ( + alarm.operation.upper() == "NE" + and metric_value != alarm.threshold + ) + ): self.queue.put((alarm, AlarmStatus.ALARM)) - else: + elif alarm.operation.upper() in ("GT", "LT", "GE", "LE", "EQ", "NE"): self.queue.put((alarm, AlarmStatus.OK)) + def update_alarm_status(self, alarm_state, uuid): + alarm_data = self.common_db.get_alarm_by_uuid(uuid) + if alarm_data.get("alarm_status").upper() != AlarmStatus.DISABLED.value.upper(): + self.common_db.update_alarm_status(alarm_state, uuid) + return + def evaluate_alarms(self) -> List[Tuple[Alarm, AlarmStatus]]: - log.debug('evaluate_alarms') + log.debug("evaluate_alarms") processes = [] - database.db.connect() - try: - with database.db.atomic(): - for alarm in AlarmRepository.list(): - try: - vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index) - except DbException: - log.exception("Error getting vnfr: ") - continue - vnfd = self.common_db.get_vnfd(vnfr['vnfd-id']) - try: - vdur = next(filter(lambda vdur: vdur['name'] == alarm.vdur_name, vnfr['vdur'])) - except StopIteration: - log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id) - continue - vdu = next(filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])) - vnf_monitoring_param = next( - filter(lambda param: param['id'] == alarm.monitoring_param, vnfd['monitoring-param'])) - nsr_id = vnfr['nsr-id-ref'] - vnf_member_index = vnfr['member-vnf-index-ref'] - vdur_name = vdur['name'] - if 'vdu-monitoring-param' in vnf_monitoring_param: - vdu_monitoring_param = next(filter( - lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param'][ - 'vdu-monitoring-param-ref'], vdu['monitoring-param'])) - nfvi_metric = vdu_monitoring_param['nfvi-metric'] - - p = multiprocessing.Process(target=self._evaluate_metric, - args=(nsr_id, - vnf_member_index, - vdur_name, - nfvi_metric, - alarm)) - processes.append(p) - p.start() - if 'vdu-metric' in vnf_monitoring_param: - vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref'] - p = multiprocessing.Process(target=self._evaluate_metric, - args=(nsr_id, - vnf_member_index, - vdur_name, - vnf_metric_name, - alarm)) - processes.append(p) - p.start() - if 'vnf-metric' in vnf_monitoring_param: - vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref'] - p = multiprocessing.Process(target=self._evaluate_metric, - args=(nsr_id, - vnf_member_index, - '', - vnf_metric_name, - alarm)) - processes.append(p) - p.start() - - for process in processes: - process.join(timeout=10) - alarms_tuples = [] - while not self.queue.empty(): - alarms_tuples.append(self.queue.get()) - return alarms_tuples - finally: - database.db.close() + for alarm in self.common_db.get_alarms(): + p = multiprocessing.Process(target=self._evaluate_metric, args=(alarm,)) + processes.append(p) + p.start() + + for process in processes: + process.join(timeout=10) + alarms_tuples = [] + while not self.queue.empty(): + alarms_tuples.append(self.queue.get()) + return alarms_tuples