X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=osm_mon%2Fevaluator%2Fservice.py;h=de3798bd7fd86e110164829c4d1849c1ca013e49;hb=628df021896fa8775f9743af62a4267b617cc35c;hp=b3b0d265a18a1c9aca463875fa33812007e08bc6;hpb=8c43ba0ce4a3910a350e3e74fe24770f04918f1e;p=osm%2FMON.git diff --git a/osm_mon/evaluator/service.py b/osm_mon/evaluator/service.py index b3b0d26..de3798b 100644 --- a/osm_mon/evaluator/service.py +++ b/osm_mon/evaluator/service.py @@ -25,9 +25,10 @@ import multiprocessing from enum import Enum from typing import Tuple, List +from osm_mon.core import database from osm_mon.core.common_db import CommonDbClient from osm_mon.core.config import Config -from osm_mon.core.models import Alarm +from osm_mon.core.database import Alarm, AlarmRepository from osm_mon.evaluator.backends.prometheus import PrometheusBackend log = logging.getLogger(__name__) @@ -56,11 +57,11 @@ class EvaluatorService: return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name, tags) def _evaluate_metric(self, - alarm: Alarm): + alarm: Alarm, tags: dict): log.debug("_evaluate_metric") - metric_value = self._get_metric_value(alarm.metric, alarm.tags) + metric_value = self._get_metric_value(alarm.metric, tags) if metric_value is None: - log.warning("No metric result for alarm %s", alarm.uuid) + log.warning("No metric result for alarm %s", alarm.id) self.queue.put((alarm, AlarmStatus.INSUFFICIENT)) else: if alarm.operation.upper() == 'GT': @@ -77,15 +78,25 @@ class EvaluatorService: def evaluate_alarms(self) -> List[Tuple[Alarm, AlarmStatus]]: log.debug('evaluate_alarms') processes = [] - for alarm in self.common_db.get_alarms(): - p = multiprocessing.Process(target=self._evaluate_metric, - args=(alarm,)) - processes.append(p) - p.start() - - for process in processes: - process.join(timeout=10) - alarms_tuples = [] - while not self.queue.empty(): - alarms_tuples.append(self.queue.get()) - return alarms_tuples + database.db.connect() + try: + with database.db.atomic(): + for alarm in AlarmRepository.list(): + # Tags need to be passed inside a dict to avoid database locking issues related to process forking + tags = {} + for tag in alarm.tags: + tags[tag.name] = tag.value + p = multiprocessing.Process(target=self._evaluate_metric, + args=(alarm, tags)) + processes.append(p) + p.start() + + for process in processes: + process.join(timeout=10) + alarms_tuples = [] + log.info("Appending alarms to queue") + while not self.queue.empty(): + alarms_tuples.append(self.queue.get()) + return alarms_tuples + finally: + database.db.close()