Refactors codebase
[osm/MON.git] / osm_mon / plugins / OpenStack / Aodh / notifier.py
index 92674cb..a1c85e6 100644 (file)
@@ -32,19 +32,29 @@ from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
 from six.moves.BaseHTTPServer import HTTPServer
 
 # Initialise a logger for alarm notifier
+from osm_mon.core.message_bus.producer import Producer
+from osm_mon.core.settings import Config
+
+cfg = Config.instance()
 
 logging.basicConfig(stream=sys.stdout,
-                    format='%(asctime)s %(message)s',
+                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                     datefmt='%m/%d/%Y %I:%M:%S %p',
-                    level=logging.INFO)
+                    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
 log = logging.getLogger(__name__)
 
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
 sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..', '..')))
 
 from osm_mon.core.database import DatabaseManager
-from osm_mon.core.message_bus.producer import KafkaProducer
 
-from osm_mon.plugins.OpenStack.response import OpenStack_Response
+from osm_mon.plugins.OpenStack.response import OpenStackResponseBuilder
 
 
 class NotifierHandler(BaseHTTPRequestHandler):
@@ -72,11 +82,11 @@ class NotifierHandler(BaseHTTPRequestHandler):
             post_data = post_data.decode()
         except AttributeError:
             pass
-        log.info("This alarm was triggered: %s", json.dumps(post_data))
+        log.info("This alarm was triggered: %s", post_data)
 
         # Send alarm notification to message bus
         try:
-            self.notify_alarm(json.dumps(post_data))
+            self.notify_alarm(json.loads(post_data))
         except Exception:
             log.exception("Error notifying alarm")
 
@@ -84,8 +94,7 @@ class NotifierHandler(BaseHTTPRequestHandler):
         """Sends alarm notification message to bus."""
 
         # Initialise configuration and authentication for response message
-        response = OpenStack_Response()
-        producer = KafkaProducer('alarm_response')
+        response = OpenStackResponseBuilder()
 
         database_manager = DatabaseManager()
 
@@ -97,7 +106,7 @@ class NotifierHandler(BaseHTTPRequestHandler):
         # Generate and send response
         resp_message = response.generate_response(
             'notify_alarm',
-            a_id=alarm_id,
+            alarm_id=alarm_id,
             vdu_name=alarm.vdu_name,
             vnf_member_index=alarm.vnf_member_index,
             ns_id=alarm.ns_id,
@@ -107,10 +116,14 @@ class NotifierHandler(BaseHTTPRequestHandler):
             sev=values['severity'],
             date=a_date,
             state=values['current'])
-        producer.publish_alarm_response(
-            'notify_alarm', resp_message)
+        self._publish_response('notify_alarm', json.dumps(resp_message))
         log.info("Sent alarm notification: %s", resp_message)
 
+    def _publish_response(self, key: str, msg: str):
+        producer = Producer()
+        producer.send(topic='alarm_response', key=key, value=msg)
+        producer.flush()
+
 
 def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662):
     """Run the webserver application to retrieve alarm notifications."""