Implements filebased config, config override through env vars, use of osm
[osm/MON.git] / osm_mon / evaluator / evaluator.py
index b040198..76881b9 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
-import json
+import asyncio
 import logging
 import multiprocessing
 import time
 
+import peewee
+import requests
 from osm_common.dbbase import DbException
 
-from osm_mon.collector.collector import VIM_COLLECTORS
-from osm_mon.collector.collectors.juju import VCACollector
+from osm_mon.collector.backends.prometheus import OSM_METRIC_PREFIX
 from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
 from osm_mon.core.database import DatabaseManager, Alarm
-from osm_mon.core.message_bus.producer import Producer
+from osm_mon.core.message_bus_client import MessageBusClient
 from osm_mon.core.response import ResponseBuilder
-from osm_mon.core.settings import Config
 
 log = logging.getLogger(__name__)
 
 
 class Evaluator:
-    def __init__(self):
-        self.common_db = CommonDbClient()
+    def __init__(self, config: Config, loop=None):
+        self.conf = config
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
+        self.common_db = CommonDbClient(self.conf)
         self.plugins = []
-        self.database_manager = DatabaseManager()
+        self.database_manager = DatabaseManager(self.conf)
         self.database_manager.create_tables()
         self.queue = multiprocessing.Queue()
-
-    def _evaluate_vim_metric(self,
-                             nsr_id: str,
-                             vnf_member_index: int,
-                             vdur_name: str,
-                             nfvi_metric_name: str,
-                             vim_account_id: str,
-                             alarm: Alarm):
-        vim_type = self.database_manager.get_vim_type(vim_account_id)
-        if vim_type in VIM_COLLECTORS:
-            collector = VIM_COLLECTORS[vim_type](vim_account_id)
-            metric = collector.collect_one(nsr_id, vnf_member_index, vdur_name, nfvi_metric_name)
-            if alarm.operation.upper() == 'GT':
-                if metric.value > alarm.threshold:
-                    self.queue.put(alarm)
-            elif alarm.operation.upper() == 'LT':
-                if metric.value < alarm.threshold:
-                    self.queue.put(alarm)
-
+        self.msg_bus = MessageBusClient(config)
+
+    def _evaluate_metric(self,
+                         nsr_id: str,
+                         vnf_member_index: int,
+                         vdur_name: str,
+                         metric_name: str,
+                         alarm: Alarm):
+        log.debug("_evaluate_metric")
+        # TODO: Refactor to fit backend plugin model
+        query_section = "query={0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
+            OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index)
+        request_url = self.conf.get('prometheus', 'url') + "/api/v1/query?" + query_section
+        log.info("Querying Prometheus: %s", request_url)
+        r = requests.get(request_url, timeout=int(self.conf.get('global', 'request_timeout')))
+        if r.status_code == 200:
+            json_response = r.json()
+            if json_response['status'] == 'success':
+                result = json_response['data']['result']
+                if len(result):
+                    metric_value = float(result[0]['value'][1])
+                    log.info("Metric value: %s", metric_value)
+                    if alarm.operation.upper() == 'GT':
+                        if metric_value > alarm.threshold:
+                            self.queue.put(alarm)
+                    elif alarm.operation.upper() == 'LT':
+                        if metric_value < alarm.threshold:
+                            self.queue.put(alarm)
+                else:
+                    log.warning("No metric result for alarm %s", alarm.id)
+            else:
+                log.warning("Prometheus response is not success. Got status %s", json_response['status'])
         else:
-            log.debug("vimtype %s is not supported.", vim_type)
-
-    def _evaluate_vca_metric(self,
-                             nsr_id: str,
-                             vnf_member_index: int,
-                             vdur_name: str,
-                             vnf_metric_name: str,
-                             alarm: Alarm):
-        collector = VCACollector()
-        metric = collector.collect_one(nsr_id, vnf_member_index, vdur_name, vnf_metric_name)
-        if alarm.operation.upper() == 'GT':
-            if metric.value > alarm.threshold:
-                self.queue.put(alarm)
-        elif alarm.operation.upper() == 'LT':
-            if metric.value < alarm.threshold:
-                self.queue.put(alarm)
+            log.warning("Error contacting Prometheus. Got status code %s: %s", r.status_code, r.text)
 
     def evaluate_forever(self):
-        log.debug('collect_forever')
-        cfg = Config.instance()
+        log.debug('evaluate_forever')
         while True:
             try:
                 self.evaluate()
-                time.sleep(cfg.OSMMON_EVALUATOR_INTERVAL)
+                time.sleep(int(self.conf.get('evaluator', 'interval')))
+            except peewee.PeeweeException:
+                log.exception("Database error evaluating alarms: ")
+                raise
             except Exception:
                 log.exception("Error evaluating alarms")
 
     def evaluate(self):
+        log.debug('evaluate')
         processes = []
         for alarm in Alarm.select():
             try:
@@ -118,19 +124,17 @@ class Evaluator:
                         'vdu-monitoring-param-ref'], vdu['monitoring-param']))
                 nfvi_metric = vdu_monitoring_param['nfvi-metric']
 
-                vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index)
-                p = multiprocessing.Process(target=self._evaluate_vim_metric,
+                p = multiprocessing.Process(target=self._evaluate_metric,
                                             args=(nsr_id,
                                                   vnf_member_index,
                                                   vdur_name,
                                                   nfvi_metric,
-                                                  vim_account_id,
                                                   alarm))
                 processes.append(p)
                 p.start()
             if 'vdu-metric' in vnf_monitoring_param:
                 vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
-                p = multiprocessing.Process(target=self._evaluate_vca_metric,
+                p = multiprocessing.Process(target=self._evaluate_metric,
                                             args=(nsr_id,
                                                   vnf_member_index,
                                                   vdur_name,
@@ -139,27 +143,34 @@ class Evaluator:
                 processes.append(p)
                 p.start()
             if 'vnf-metric' in vnf_monitoring_param:
-                log.warning("vnf-metric is not currently supported.")
-                continue
+                vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref']
+                p = multiprocessing.Process(target=self._evaluate_metric,
+                                            args=(nsr_id,
+                                                  vnf_member_index,
+                                                  '',
+                                                  vnf_metric_name,
+                                                  alarm))
+                processes.append(p)
+                p.start()
 
         for process in processes:
-            process.join()
+            process.join(timeout=10)
         triggered_alarms = []
         while not self.queue.empty():
             triggered_alarms.append(self.queue.get())
         for alarm in triggered_alarms:
-            self.notify_alarm(alarm)
             p = multiprocessing.Process(target=self.notify_alarm,
                                         args=(alarm,))
             p.start()
 
     def notify_alarm(self, alarm: Alarm):
+        log.debug("notify_alarm")
         response = ResponseBuilder()
         now = time.strftime("%d-%m-%Y") + " " + time.strftime("%X")
         # Generate and send response
         resp_message = response.generate_response(
             'notify_alarm',
-            alarm_id=alarm.id,
+            alarm_id=alarm.uuid,
             vdu_name=alarm.vdur_name,
             vnf_member_index=alarm.vnf_member_index,
             ns_id=alarm.nsr_id,
@@ -169,7 +180,5 @@ class Evaluator:
             sev=alarm.severity,
             status='alarm',
             date=now)
-        producer = Producer()
-        producer.send(topic='alarm_response', key='notify_alarm', value=json.dumps(resp_message))
-        producer.flush()
+        self.loop.run_until_complete(self.msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message))
         log.info("Sent alarm notification: %s", resp_message)