X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FMON.git;a=blobdiff_plain;f=osm_mon%2Fevaluator%2Fservice.py;h=de3798bd7fd86e110164829c4d1849c1ca013e49;hp=20bb0adcd3a6ecfd40ff1ab761e53c199f11db97;hb=628df021896fa8775f9743af62a4267b617cc35c;hpb=8283936d7cecd8941116409edbfaceeba6570acb diff --git a/osm_mon/evaluator/service.py b/osm_mon/evaluator/service.py index 20bb0ad..de3798b 100644 --- a/osm_mon/evaluator/service.py +++ b/osm_mon/evaluator/service.py @@ -25,8 +25,6 @@ import multiprocessing from enum import Enum from typing import Tuple, List -from osm_common.dbbase import DbException - from osm_mon.core import database from osm_mon.core.common_db import CommonDbClient from osm_mon.core.config import Config @@ -54,23 +52,14 @@ class EvaluatorService: self.queue = multiprocessing.Queue() def _get_metric_value(self, - nsr_id: str, - vnf_member_index: str, - vdur_name: str, - metric_name: str): - return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name, - nsr_id, - vdur_name, - vnf_member_index) + metric_name: str, + tags: dict): + return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name, tags) def _evaluate_metric(self, - nsr_id: str, - vnf_member_index: str, - vdur_name: str, - metric_name: str, - alarm: Alarm): + alarm: Alarm, tags: dict): log.debug("_evaluate_metric") - metric_value = self._get_metric_value(nsr_id, vnf_member_index, vdur_name, metric_name) + metric_value = self._get_metric_value(alarm.metric, tags) if metric_value is None: log.warning("No metric result for alarm %s", alarm.id) self.queue.put((alarm, AlarmStatus.INSUFFICIENT)) @@ -93,61 +82,19 @@ class EvaluatorService: try: with database.db.atomic(): for alarm in AlarmRepository.list(): - try: - vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index) - except DbException: - log.exception("Error getting vnfr: ") - continue - vnfd = self.common_db.get_vnfd(vnfr['vnfd-id']) - try: - vdur = next(filter(lambda vdur: vdur['name'] == alarm.vdur_name, vnfr['vdur'])) - except StopIteration: - log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id) - continue - vdu = next(filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])) - vnf_monitoring_param = next( - filter(lambda param: param['id'] == alarm.monitoring_param, vnfd['monitoring-param'])) - nsr_id = vnfr['nsr-id-ref'] - vnf_member_index = vnfr['member-vnf-index-ref'] - vdur_name = vdur['name'] - if 'vdu-monitoring-param' in vnf_monitoring_param: - vdu_monitoring_param = next(filter( - lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param'][ - 'vdu-monitoring-param-ref'], vdu['monitoring-param'])) - nfvi_metric = vdu_monitoring_param['nfvi-metric'] - - p = multiprocessing.Process(target=self._evaluate_metric, - args=(nsr_id, - vnf_member_index, - vdur_name, - nfvi_metric, - alarm)) - processes.append(p) - p.start() - if 'vdu-metric' in vnf_monitoring_param: - vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref'] - p = multiprocessing.Process(target=self._evaluate_metric, - args=(nsr_id, - vnf_member_index, - vdur_name, - vnf_metric_name, - alarm)) - processes.append(p) - p.start() - if 'vnf-metric' in vnf_monitoring_param: - vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref'] - p = multiprocessing.Process(target=self._evaluate_metric, - args=(nsr_id, - vnf_member_index, - '', - vnf_metric_name, - alarm)) - processes.append(p) - p.start() + # Tags need to be passed inside a dict to avoid database locking issues related to process forking + tags = {} + for tag in alarm.tags: + tags[tag.name] = tag.value + p = multiprocessing.Process(target=self._evaluate_metric, + args=(alarm, tags)) + processes.append(p) + p.start() for process in processes: process.join(timeout=10) alarms_tuples = [] + log.info("Appending alarms to queue") while not self.queue.empty(): alarms_tuples.append(self.queue.get()) return alarms_tuples