Implements filebased config, config override through env vars, use of osm
[osm/MON.git] / osm_mon / evaluator / evaluator.py
index 9dc8c48..76881b9 100644 (file)
@@ -20,7 +20,7 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
-import json
+import asyncio
 import logging
 import multiprocessing
 import time
@@ -31,21 +31,26 @@ from osm_common.dbbase import DbException
 
 from osm_mon.collector.backends.prometheus import OSM_METRIC_PREFIX
 from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
 from osm_mon.core.database import DatabaseManager, Alarm
-from osm_mon.core.message_bus.producer import Producer
+from osm_mon.core.message_bus_client import MessageBusClient
 from osm_mon.core.response import ResponseBuilder
-from osm_mon.core.settings import Config
 
 log = logging.getLogger(__name__)
 
 
 class Evaluator:
-    def __init__(self):
-        self.common_db = CommonDbClient()
+    def __init__(self, config: Config, loop=None):
+        self.conf = config
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
+        self.common_db = CommonDbClient(self.conf)
         self.plugins = []
-        self.database_manager = DatabaseManager()
+        self.database_manager = DatabaseManager(self.conf)
         self.database_manager.create_tables()
         self.queue = multiprocessing.Queue()
+        self.msg_bus = MessageBusClient(config)
 
     def _evaluate_metric(self,
                          nsr_id: str,
@@ -55,12 +60,11 @@ class Evaluator:
                          alarm: Alarm):
         log.debug("_evaluate_metric")
         # TODO: Refactor to fit backend plugin model
-        cfg = Config.instance()
         query_section = "query={0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
             OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index)
-        request_url = cfg.OSMMON_PROMETHEUS_URL + "/api/v1/query?" + query_section
+        request_url = self.conf.get('prometheus', 'url') + "/api/v1/query?" + query_section
         log.info("Querying Prometheus: %s", request_url)
-        r = requests.get(request_url, timeout=cfg.OSMMON_REQUEST_TIMEOUT)
+        r = requests.get(request_url, timeout=int(self.conf.get('global', 'request_timeout')))
         if r.status_code == 200:
             json_response = r.json()
             if json_response['status'] == 'success':
@@ -83,11 +87,10 @@ class Evaluator:
 
     def evaluate_forever(self):
         log.debug('evaluate_forever')
-        cfg = Config.instance()
         while True:
             try:
                 self.evaluate()
-                time.sleep(cfg.OSMMON_EVALUATOR_INTERVAL)
+                time.sleep(int(self.conf.get('evaluator', 'interval')))
             except peewee.PeeweeException:
                 log.exception("Database error evaluating alarms: ")
                 raise
@@ -151,12 +154,11 @@ class Evaluator:
                 p.start()
 
         for process in processes:
-            process.join()
+            process.join(timeout=10)
         triggered_alarms = []
         while not self.queue.empty():
             triggered_alarms.append(self.queue.get())
         for alarm in triggered_alarms:
-            self.notify_alarm(alarm)
             p = multiprocessing.Process(target=self.notify_alarm,
                                         args=(alarm,))
             p.start()
@@ -178,7 +180,5 @@ class Evaluator:
             sev=alarm.severity,
             status='alarm',
             date=now)
-        producer = Producer()
-        producer.send(topic='alarm_response', key='notify_alarm', value=json.dumps(resp_message))
-        producer.flush()
+        self.loop.run_until_complete(self.msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message))
         log.info("Sent alarm notification: %s", resp_message)