Refactors alarms to decouple them from VNF-specific data

Alarms now handle the concept of tags instead of having VNF-specific
parameters in the model. This allows for wider use cases of alarms (e.g. fault management).

Change-Id: I2b395c4bb7f72d4fb7c53b75feccd7de00508013
Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
diff --git a/osm_mon/evaluator/backends/base.py b/osm_mon/evaluator/backends/base.py
index 0e9fc0d..5ef1598 100644
--- a/osm_mon/evaluator/backends/base.py
+++ b/osm_mon/evaluator/backends/base.py
@@ -19,6 +19,7 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+
 from osm_mon.core.config import Config
 
 
@@ -26,5 +27,5 @@
     def __init__(self, config: Config):
         pass
 
-    def get_metric_value(self, metric_name, nsr_id, vdur_name, vnf_member_index):
+    def get_metric_value(self, metric_name: str, tags: dict):
         pass
diff --git a/osm_mon/evaluator/backends/prometheus.py b/osm_mon/evaluator/backends/prometheus.py
index 9ff50d6..070cf69 100644
--- a/osm_mon/evaluator/backends/prometheus.py
+++ b/osm_mon/evaluator/backends/prometheus.py
@@ -38,24 +38,36 @@
         super().__init__(config)
         self.conf = config
 
-    def get_metric_value(self, metric_name, nsr_id, vdur_name, vnf_member_index):
-        query_section = "query={0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
-            OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index)
-        request_url = self.conf.get('prometheus', 'url') + "/api/v1/query?" + query_section
+    def get_metric_value(self, metric_name: str, tags: dict):
+        query = self._build_query(metric_name, tags)
+        request_url = self._build_url(query)
         log.info("Querying Prometheus: %s", request_url)
         r = requests.get(request_url, timeout=int(self.conf.get('global', 'request_timeout')))
         if r.status_code == 200:
             json_response = r.json()
             if json_response['status'] == 'success':
-                result = json_response['data']['result']
-                if len(result):
-                    metric_value = float(result[0]['value'][1])
-                    log.info("Metric value: %s", metric_value)
-                    return metric_value
-                else:
-                    return None
+                return self._get_metric_value_from_response(json_response)
             else:
                 log.warning("Prometheus response is not success. Got status %s", json_response['status'])
         else:
             log.warning("Error contacting Prometheus. Got status code %s: %s", r.status_code, r.text)
         return None
+
+    def _build_query(self, metric_name: str, tags: dict) -> str:
+        query_section_tags = []
+        for k, v in tags.items():
+            query_section_tags.append(k + '=\"' + v + '\"')
+        query_section = "query={0}{{{1}}}".format(OSM_METRIC_PREFIX + metric_name, ','.join(query_section_tags))
+        return query_section
+
+    def _build_url(self, query: str):
+        return self.conf.get('prometheus', 'url') + "/api/v1/query?" + query
+
+    def _get_metric_value_from_response(self, json_response):
+        result = json_response['data']['result']
+        if len(result):
+            metric_value = float(result[0]['value'][1])
+            log.info("Metric value: %s", metric_value)
+            return metric_value
+        else:
+            return None
diff --git a/osm_mon/evaluator/evaluator.py b/osm_mon/evaluator/evaluator.py
index d3fdfd5..2f22625 100644
--- a/osm_mon/evaluator/evaluator.py
+++ b/osm_mon/evaluator/evaluator.py
@@ -61,10 +61,14 @@
     def evaluate(self):
         log.debug('evaluate')
         alarms_tuples = self.service.evaluate_alarms()
+        processes = []
         for alarm, status in alarms_tuples:
             p = multiprocessing.Process(target=self.notify_alarm,
                                         args=(alarm, status))
             p.start()
+            processes.append(p)
+        for process in processes:
+            process.join(timeout=10)
 
     def notify_alarm(self, alarm: Alarm, status: AlarmStatus):
         log.debug("notify_alarm")
@@ -74,16 +78,17 @@
 
     def _build_alarm_response(self, alarm: Alarm, status: AlarmStatus):
         response = ResponseBuilder()
+        tags = {}
+        for tag in alarm.tags:
+            tags[tag.name] = tag.value
         now = time.strftime("%d-%m-%Y") + " " + time.strftime("%X")
         return response.generate_response(
             'notify_alarm',
             alarm_id=alarm.uuid,
-            vdu_name=alarm.vdur_name,
-            vnf_member_index=alarm.vnf_member_index,
-            ns_id=alarm.nsr_id,
-            metric_name=alarm.monitoring_param,
+            metric_name=alarm.metric,
             operation=alarm.operation,
             threshold_value=alarm.threshold,
             sev=alarm.severity,
             status=status.value,
-            date=now)
+            date=now,
+            tags=tags)
diff --git a/osm_mon/evaluator/service.py b/osm_mon/evaluator/service.py
index 20bb0ad..de3798b 100644
--- a/osm_mon/evaluator/service.py
+++ b/osm_mon/evaluator/service.py
@@ -25,8 +25,6 @@
 from enum import Enum
 from typing import Tuple, List
 
-from osm_common.dbbase import DbException
-
 from osm_mon.core import database
 from osm_mon.core.common_db import CommonDbClient
 from osm_mon.core.config import Config
@@ -54,23 +52,14 @@
         self.queue = multiprocessing.Queue()
 
     def _get_metric_value(self,
-                          nsr_id: str,
-                          vnf_member_index: str,
-                          vdur_name: str,
-                          metric_name: str):
-        return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name,
-                                                                                           nsr_id,
-                                                                                           vdur_name,
-                                                                                           vnf_member_index)
+                          metric_name: str,
+                          tags: dict):
+        return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name, tags)
 
     def _evaluate_metric(self,
-                         nsr_id: str,
-                         vnf_member_index: str,
-                         vdur_name: str,
-                         metric_name: str,
-                         alarm: Alarm):
+                         alarm: Alarm, tags: dict):
         log.debug("_evaluate_metric")
-        metric_value = self._get_metric_value(nsr_id, vnf_member_index, vdur_name, metric_name)
+        metric_value = self._get_metric_value(alarm.metric, tags)
         if metric_value is None:
             log.warning("No metric result for alarm %s", alarm.id)
             self.queue.put((alarm, AlarmStatus.INSUFFICIENT))
@@ -93,61 +82,19 @@
         try:
             with database.db.atomic():
                 for alarm in AlarmRepository.list():
-                    try:
-                        vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index)
-                    except DbException:
-                        log.exception("Error getting vnfr: ")
-                        continue
-                    vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
-                    try:
-                        vdur = next(filter(lambda vdur: vdur['name'] == alarm.vdur_name, vnfr['vdur']))
-                    except StopIteration:
-                        log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id)
-                        continue
-                    vdu = next(filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']))
-                    vnf_monitoring_param = next(
-                        filter(lambda param: param['id'] == alarm.monitoring_param, vnfd['monitoring-param']))
-                    nsr_id = vnfr['nsr-id-ref']
-                    vnf_member_index = vnfr['member-vnf-index-ref']
-                    vdur_name = vdur['name']
-                    if 'vdu-monitoring-param' in vnf_monitoring_param:
-                        vdu_monitoring_param = next(filter(
-                            lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param'][
-                                'vdu-monitoring-param-ref'], vdu['monitoring-param']))
-                        nfvi_metric = vdu_monitoring_param['nfvi-metric']
-
-                        p = multiprocessing.Process(target=self._evaluate_metric,
-                                                    args=(nsr_id,
-                                                          vnf_member_index,
-                                                          vdur_name,
-                                                          nfvi_metric,
-                                                          alarm))
-                        processes.append(p)
-                        p.start()
-                    if 'vdu-metric' in vnf_monitoring_param:
-                        vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
-                        p = multiprocessing.Process(target=self._evaluate_metric,
-                                                    args=(nsr_id,
-                                                          vnf_member_index,
-                                                          vdur_name,
-                                                          vnf_metric_name,
-                                                          alarm))
-                        processes.append(p)
-                        p.start()
-                    if 'vnf-metric' in vnf_monitoring_param:
-                        vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref']
-                        p = multiprocessing.Process(target=self._evaluate_metric,
-                                                    args=(nsr_id,
-                                                          vnf_member_index,
-                                                          '',
-                                                          vnf_metric_name,
-                                                          alarm))
-                        processes.append(p)
-                        p.start()
+                    # Tags need to be passed inside a dict to avoid database locking issues related to process forking
+                    tags = {}
+                    for tag in alarm.tags:
+                        tags[tag.name] = tag.value
+                    p = multiprocessing.Process(target=self._evaluate_metric,
+                                                args=(alarm, tags))
+                    processes.append(p)
+                    p.start()
 
                 for process in processes:
                     process.join(timeout=10)
                 alarms_tuples = []
+                log.info("Appending alarms to queue")
                 while not self.queue.empty():
                     alarms_tuples.append(self.queue.get())
                 return alarms_tuples