Revert "Migrates alarms to MongoDB"
This reverts commit b6233b3d90ca1ba617e476b915fba056cf7891f7.
Change-Id: I82b7c5916cdf7eb2816348c4ea13609d0fddffe6
Signed-off-by: lavado <glavado@whitestack.com>
diff --git a/osm_mon/evaluator/evaluator.py b/osm_mon/evaluator/evaluator.py
index cc9a8ad..2f22625 100644
--- a/osm_mon/evaluator/evaluator.py
+++ b/osm_mon/evaluator/evaluator.py
@@ -28,8 +28,8 @@
import peewee
from osm_mon.core.config import Config
+from osm_mon.core.database import Alarm
from osm_mon.core.message_bus_client import MessageBusClient
-from osm_mon.core.models import Alarm
from osm_mon.core.response import ResponseBuilder
from osm_mon.evaluator.service import EvaluatorService, AlarmStatus
diff --git a/osm_mon/evaluator/service.py b/osm_mon/evaluator/service.py
index b3b0d26..de3798b 100644
--- a/osm_mon/evaluator/service.py
+++ b/osm_mon/evaluator/service.py
@@ -25,9 +25,10 @@
from enum import Enum
from typing import Tuple, List
+from osm_mon.core import database
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
-from osm_mon.core.models import Alarm
+from osm_mon.core.database import Alarm, AlarmRepository
from osm_mon.evaluator.backends.prometheus import PrometheusBackend
log = logging.getLogger(__name__)
@@ -56,11 +57,11 @@
return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name, tags)
def _evaluate_metric(self,
- alarm: Alarm):
+ alarm: Alarm, tags: dict):
log.debug("_evaluate_metric")
- metric_value = self._get_metric_value(alarm.metric, alarm.tags)
+ metric_value = self._get_metric_value(alarm.metric, tags)
if metric_value is None:
- log.warning("No metric result for alarm %s", alarm.uuid)
+ log.warning("No metric result for alarm %s", alarm.id)
self.queue.put((alarm, AlarmStatus.INSUFFICIENT))
else:
if alarm.operation.upper() == 'GT':
@@ -77,15 +78,25 @@
def evaluate_alarms(self) -> List[Tuple[Alarm, AlarmStatus]]:
log.debug('evaluate_alarms')
processes = []
- for alarm in self.common_db.get_alarms():
- p = multiprocessing.Process(target=self._evaluate_metric,
- args=(alarm,))
- processes.append(p)
- p.start()
+ database.db.connect()
+ try:
+ with database.db.atomic():
+ for alarm in AlarmRepository.list():
+ # Tags need to be passed inside a dict to avoid database locking issues related to process forking
+ tags = {}
+ for tag in alarm.tags:
+ tags[tag.name] = tag.value
+ p = multiprocessing.Process(target=self._evaluate_metric,
+ args=(alarm, tags))
+ processes.append(p)
+ p.start()
- for process in processes:
- process.join(timeout=10)
- alarms_tuples = []
- while not self.queue.empty():
- alarms_tuples.append(self.queue.get())
- return alarms_tuples
+ for process in processes:
+ process.join(timeout=10)
+ alarms_tuples = []
+ log.info("Appending alarms to queue")
+ while not self.queue.empty():
+ alarms_tuples.append(self.queue.get())
+ return alarms_tuples
+ finally:
+ database.db.close()