Readds plugins code and respective tests
[osm/MON.git] / osm_mon / plugins / OpenStack / Aodh / notifier.py
index c43f238..698784a 100644 (file)
@@ -25,6 +25,7 @@
 import json
 import logging
 import os
+import re
 import sys
 import time
 
@@ -32,6 +33,7 @@ from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
 from six.moves.BaseHTTPServer import HTTPServer
 
 # Initialise a logger for alarm notifier
+from osm_mon.core.message_bus.producer import Producer
 from osm_mon.core.settings import Config
 
 cfg = Config.instance()
@@ -42,12 +44,18 @@ logging.basicConfig(stream=sys.stdout,
                     level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
 log = logging.getLogger(__name__)
 
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
 sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..', '..')))
 
 from osm_mon.core.database import DatabaseManager
-from osm_mon.core.message_bus.producer import KafkaProducer
 
-from osm_mon.plugins.OpenStack.response import OpenStack_Response
+from osm_mon.plugins.OpenStack.response import OpenStackResponseBuilder
 
 
 class NotifierHandler(BaseHTTPRequestHandler):
@@ -87,8 +95,7 @@ class NotifierHandler(BaseHTTPRequestHandler):
         """Sends alarm notification message to bus."""
 
         # Initialise configuration and authentication for response message
-        response = OpenStack_Response()
-        producer = KafkaProducer('alarm_response')
+        response = OpenStackResponseBuilder()
 
         database_manager = DatabaseManager()
 
@@ -100,27 +107,30 @@ class NotifierHandler(BaseHTTPRequestHandler):
         # Generate and send response
         resp_message = response.generate_response(
             'notify_alarm',
-            a_id=alarm_id,
-            vdu_name=alarm.vdu_name,
+            alarm_id=alarm_id,
+            vdu_name=alarm.vdur_name,
             vnf_member_index=alarm.vnf_member_index,
-            ns_id=alarm.ns_id,
-            metric_name=alarm.metric_name,
+            ns_id=alarm.nsr_id,
+            metric_name=alarm.monitoring_param,
             operation=alarm.operation,
             threshold_value=alarm.threshold,
             sev=values['severity'],
             date=a_date,
             state=values['current'])
-        producer.publish_alarm_response(
-            'notify_alarm', resp_message)
+        self._publish_response('notify_alarm', json.dumps(resp_message))
         log.info("Sent alarm notification: %s", resp_message)
 
+    def _publish_response(self, key: str, msg: str):
+        producer = Producer()
+        producer.send(topic='alarm_response', key=key, value=msg)
+        producer.flush()
+
 
 def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662):
     """Run the webserver application to retrieve alarm notifications."""
     try:
         server_address = ('', port)
         httpd = server_class(server_address, handler_class)
-        print('Starting alarm notifier...')
         log.info("Starting alarm notifier server on port: %s", port)
         httpd.serve_forever()
     except Exception as exc:
@@ -128,10 +138,11 @@ def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662):
 
 
 if __name__ == "__main__":
-    from sys import argv
-
-    # Runs the webserver
-    if len(argv) == 2:
-        run(port=int(argv[1]))
+    cfg = Config.instance()
+    p = re.compile(':(\d+)', re.IGNORECASE)
+    m = p.search(cfg.OS_NOTIFIER_URI)
+    if m:
+        port = m.group(1)
+        run(port=int(port))
     else:
         run()