Revert "Migrates alarms to MongoDB"
[osm/MON.git] / osm_mon / evaluator / service.py
index b3b0d26..de3798b 100644 (file)
@@ -25,9 +25,10 @@ import multiprocessing
 from enum import Enum
 from typing import Tuple, List
 
+from osm_mon.core import database
 from osm_mon.core.common_db import CommonDbClient
 from osm_mon.core.config import Config
-from osm_mon.core.models import Alarm
+from osm_mon.core.database import Alarm, AlarmRepository
 from osm_mon.evaluator.backends.prometheus import PrometheusBackend
 
 log = logging.getLogger(__name__)
@@ -56,11 +57,11 @@ class EvaluatorService:
         return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name, tags)
 
     def _evaluate_metric(self,
-                         alarm: Alarm):
+                         alarm: Alarm, tags: dict):
         log.debug("_evaluate_metric")
-        metric_value = self._get_metric_value(alarm.metric, alarm.tags)
+        metric_value = self._get_metric_value(alarm.metric, tags)
         if metric_value is None:
-            log.warning("No metric result for alarm %s", alarm.uuid)
+            log.warning("No metric result for alarm %s", alarm.id)
             self.queue.put((alarm, AlarmStatus.INSUFFICIENT))
         else:
             if alarm.operation.upper() == 'GT':
@@ -77,15 +78,25 @@ class EvaluatorService:
     def evaluate_alarms(self) -> List[Tuple[Alarm, AlarmStatus]]:
         log.debug('evaluate_alarms')
         processes = []
-        for alarm in self.common_db.get_alarms():
-            p = multiprocessing.Process(target=self._evaluate_metric,
-                                        args=(alarm,))
-            processes.append(p)
-            p.start()
-
-        for process in processes:
-            process.join(timeout=10)
-        alarms_tuples = []
-        while not self.queue.empty():
-            alarms_tuples.append(self.queue.get())
-        return alarms_tuples
+        database.db.connect()
+        try:
+            with database.db.atomic():
+                for alarm in AlarmRepository.list():
+                    # Tags need to be passed inside a dict to avoid database locking issues related to process forking
+                    tags = {}
+                    for tag in alarm.tags:
+                        tags[tag.name] = tag.value
+                    p = multiprocessing.Process(target=self._evaluate_metric,
+                                                args=(alarm, tags))
+                    processes.append(p)
+                    p.start()
+
+                for process in processes:
+                    process.join(timeout=10)
+                alarms_tuples = []
+                log.info("Appending alarms to queue")
+                while not self.queue.empty():
+                    alarms_tuples.append(self.queue.get())
+                return alarms_tuples
+        finally:
+            database.db.close()