Revert "Migrates alarms to MongoDB"
[osm/MON.git] / osm_mon / evaluator / service.py
index 0868e32..de3798b 100644 (file)
@@ -25,8 +25,6 @@ import multiprocessing
 from enum import Enum
 from typing import Tuple, List
 
-from osm_common.dbbase import DbException
-
 from osm_mon.core import database
 from osm_mon.core.common_db import CommonDbClient
 from osm_mon.core.config import Config
@@ -54,22 +52,15 @@ class EvaluatorService:
         self.queue = multiprocessing.Queue()
 
     def _get_metric_value(self,
-                          nsr_id: str,
-                          vnf_member_index: int,
-                          vdur_name: str,
-                          metric_name: str):
-        return BACKENDS[self.conf.get('evaluator', 'backend')]().get_metric_value(metric_name, nsr_id, vdur_name,
-                                                                                  vnf_member_index)
+                          metric_name: str,
+                          tags: dict):
+        return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name, tags)
 
     def _evaluate_metric(self,
-                         nsr_id: str,
-                         vnf_member_index: int,
-                         vdur_name: str,
-                         metric_name: str,
-                         alarm: Alarm):
+                         alarm: Alarm, tags: dict):
         log.debug("_evaluate_metric")
-        metric_value = self._get_metric_value(nsr_id, vnf_member_index, vdur_name, metric_name)
-        if not metric_value:
+        metric_value = self._get_metric_value(alarm.metric, tags)
+        if metric_value is None:
             log.warning("No metric result for alarm %s", alarm.id)
             self.queue.put((alarm, AlarmStatus.INSUFFICIENT))
         else:
@@ -91,61 +82,19 @@ class EvaluatorService:
         try:
             with database.db.atomic():
                 for alarm in AlarmRepository.list():
-                    try:
-                        vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index)
-                    except DbException:
-                        log.exception("Error getting vnfr: ")
-                        continue
-                    vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
-                    try:
-                        vdur = next(filter(lambda vdur: vdur['name'] == alarm.vdur_name, vnfr['vdur']))
-                    except StopIteration:
-                        log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id)
-                        continue
-                    vdu = next(filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']))
-                    vnf_monitoring_param = next(
-                        filter(lambda param: param['id'] == alarm.monitoring_param, vnfd['monitoring-param']))
-                    nsr_id = vnfr['nsr-id-ref']
-                    vnf_member_index = vnfr['member-vnf-index-ref']
-                    vdur_name = vdur['name']
-                    if 'vdu-monitoring-param' in vnf_monitoring_param:
-                        vdu_monitoring_param = next(filter(
-                            lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param'][
-                                'vdu-monitoring-param-ref'], vdu['monitoring-param']))
-                        nfvi_metric = vdu_monitoring_param['nfvi-metric']
-
-                        p = multiprocessing.Process(target=self._evaluate_metric,
-                                                    args=(nsr_id,
-                                                          vnf_member_index,
-                                                          vdur_name,
-                                                          nfvi_metric,
-                                                          alarm))
-                        processes.append(p)
-                        p.start()
-                    if 'vdu-metric' in vnf_monitoring_param:
-                        vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
-                        p = multiprocessing.Process(target=self._evaluate_metric,
-                                                    args=(nsr_id,
-                                                          vnf_member_index,
-                                                          vdur_name,
-                                                          vnf_metric_name,
-                                                          alarm))
-                        processes.append(p)
-                        p.start()
-                    if 'vnf-metric' in vnf_monitoring_param:
-                        vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref']
-                        p = multiprocessing.Process(target=self._evaluate_metric,
-                                                    args=(nsr_id,
-                                                          vnf_member_index,
-                                                          '',
-                                                          vnf_metric_name,
-                                                          alarm))
-                        processes.append(p)
-                        p.start()
+                    # Tags need to be passed inside a dict to avoid database locking issues related to process forking
+                    tags = {}
+                    for tag in alarm.tags:
+                        tags[tag.name] = tag.value
+                    p = multiprocessing.Process(target=self._evaluate_metric,
+                                                args=(alarm, tags))
+                    processes.append(p)
+                    p.start()
 
                 for process in processes:
                     process.join(timeout=10)
                 alarms_tuples = []
+                log.info("Appending alarms to queue")
                 while not self.queue.empty():
                     alarms_tuples.append(self.queue.get())
                 return alarms_tuples