Adds time.sleep in Kafka validation loop
[osm/MON.git] / osm_mon / plugins / OpenStack / Aodh / notifier.py
index 314548f..71c6c1c 100644 (file)
 """A Webserver to send alarm notifications from Aodh to the SO."""
 import json
 import logging
 """A Webserver to send alarm notifications from Aodh to the SO."""
 import json
 import logging
+import os
+import re
 import sys
 import time
 
 import sys
 import time
 
-import os
 from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
 from six.moves.BaseHTTPServer import HTTPServer
 
 # Initialise a logger for alarm notifier
 from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
 from six.moves.BaseHTTPServer import HTTPServer
 
 # Initialise a logger for alarm notifier
+from osm_mon.core.message_bus.producer import Producer
+from osm_mon.core.settings import Config
+
+cfg = Config.instance()
 
 logging.basicConfig(stream=sys.stdout,
 
 logging.basicConfig(stream=sys.stdout,
-                    format='%(asctime)s %(message)s',
+                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                     datefmt='%m/%d/%Y %I:%M:%S %p',
                     datefmt='%m/%d/%Y %I:%M:%S %p',
-                    level=logging.INFO)
+                    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
 log = logging.getLogger(__name__)
 
 log = logging.getLogger(__name__)
 
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
 sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..', '..')))
 
 from osm_mon.core.database import DatabaseManager
 sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..', '..')))
 
 from osm_mon.core.database import DatabaseManager
-from osm_mon.core.message_bus.producer import KafkaProducer
 
 
-from osm_mon.plugins.OpenStack.common import Common
-from osm_mon.plugins.OpenStack.response import OpenStack_Response
-from osm_mon.plugins.OpenStack.settings import Config
+from osm_mon.plugins.OpenStack.response import OpenStackResponseBuilder
 
 
 class NotifierHandler(BaseHTTPRequestHandler):
 
 
 class NotifierHandler(BaseHTTPRequestHandler):
@@ -61,7 +70,6 @@ class NotifierHandler(BaseHTTPRequestHandler):
     def do_GET(self):
         """Get request functionality."""
         self._set_headers()
     def do_GET(self):
         """Get request functionality."""
         self._set_headers()
-        self.wfile.write("<html><body><h1>hi!</h1></body></html>")
 
     def do_POST(self):
         """POST request function."""
 
     def do_POST(self):
         """POST request function."""
@@ -70,63 +78,52 @@ class NotifierHandler(BaseHTTPRequestHandler):
         # Gets the size of data
         content_length = int(self.headers['Content-Length'])
         post_data = self.rfile.read(content_length)
         # Gets the size of data
         content_length = int(self.headers['Content-Length'])
         post_data = self.rfile.read(content_length)
-        self.wfile.write("<html><body><h1>POST!</h1></body></tml>")
-        log.info("This alarm was triggered: %s", json.loads(post_data))
+        # Python 2/3 string compatibility
+        try:
+            post_data = post_data.decode()
+        except AttributeError:
+            pass
+        log.info("This alarm was triggered: %s", post_data)
 
 
-        # Generate a notify_alarm response for the SO
-        self.notify_alarm(json.loads(post_data))
+        # Send alarm notification to message bus
+        try:
+            self.notify_alarm(json.loads(post_data))
+        except Exception:
+            log.exception("Error notifying alarm")
 
     def notify_alarm(self, values):
 
     def notify_alarm(self, values):
-        """Send a notification response message to the SO."""
-
-        try:
-            # Initialise configuration and authentication for response message
-            config = Config.instance()
-            config.read_environ()
-            response = OpenStack_Response()
-            producer = KafkaProducer('alarm_response')
-
-            database_manager = DatabaseManager()
-
-            alarm_id = values['alarm_id']
-            # Get vim_uuid associated to alarm
-            creds = database_manager.get_credentials_for_alarm_id(alarm_id, 'openstack')
-            auth_token = Common.get_auth_token(creds.uuid)
-            endpoint = Common.get_endpoint("alarming", creds.uuid)
-
-            # If authenticated generate and send response message
-            if auth_token is not None and endpoint is not None:
-                url = "{}/v2/alarms/%s".format(endpoint) % alarm_id
-
-                # Get the resource_id of the triggered alarm
-                result = Common.perform_request(
-                    url, auth_token, req_type="get")
-                alarm_details = json.loads(result.text)
-                gnocchi_rule = alarm_details['gnocchi_resources_threshold_rule']
-                resource_id = gnocchi_rule['resource_id']
-
-                # Process an alarm notification if resource_id is valid
-                if resource_id is not None:
-                    # Get date and time for response message
-                    a_date = time.strftime("%d-%m-%Y") + " " + time.strftime("%X")
-                    # Try generate and send response
-                    try:
-                        resp_message = response.generate_response(
-                            'notify_alarm', a_id=alarm_id,
-                            r_id=resource_id,
-                            sev=values['severity'], date=a_date,
-                            state=values['current'], vim_type="openstack")
-                        producer.notify_alarm(
-                            'notify_alarm', resp_message, 'alarm_response')
-                        log.info("Sent an alarm response to SO: %s", resp_message)
-                    except Exception as exc:
-                        log.exception("Couldn't notify SO of the alarm:")
-                else:
-                    log.warn("No resource_id for alarm; no SO response sent.")
-            else:
-                log.warn("Authentication failure; SO notification not sent.")
-        except:
-            log.exception("Could not notify alarm.")
+        """Sends alarm notification message to bus."""
+
+        # Initialise configuration and authentication for response message
+        response = OpenStackResponseBuilder()
+
+        database_manager = DatabaseManager()
+
+        alarm_id = values['alarm_id']
+        alarm = database_manager.get_alarm(alarm_id, 'openstack')
+        # Process an alarm notification if resource_id is valid
+        # Get date and time for response message
+        a_date = time.strftime("%d-%m-%Y") + " " + time.strftime("%X")
+        # Generate and send response
+        resp_message = response.generate_response(
+            'notify_alarm',
+            alarm_id=alarm_id,
+            vdu_name=alarm.vdu_name,
+            vnf_member_index=alarm.vnf_member_index,
+            ns_id=alarm.ns_id,
+            metric_name=alarm.metric_name,
+            operation=alarm.operation,
+            threshold_value=alarm.threshold,
+            sev=values['severity'],
+            date=a_date,
+            state=values['current'])
+        self._publish_response('notify_alarm', json.dumps(resp_message))
+        log.info("Sent alarm notification: %s", resp_message)
+
+    def _publish_response(self, key: str, msg: str):
+        producer = Producer()
+        producer.send(topic='alarm_response', key=key, value=msg)
+        producer.flush()
 
 
 def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662):
 
 
 def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662):
@@ -134,18 +131,18 @@ def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662):
     try:
         server_address = ('', port)
         httpd = server_class(server_address, handler_class)
     try:
         server_address = ('', port)
         httpd = server_class(server_address, handler_class)
-        print('Starting alarm notifier...')
         log.info("Starting alarm notifier server on port: %s", port)
         httpd.serve_forever()
     except Exception as exc:
         log.info("Starting alarm notifier server on port: %s", port)
         httpd.serve_forever()
     except Exception as exc:
-        log.warn("Failed to start webserver, %s", exc)
+        log.warning("Failed to start webserver, %s", exc)
 
 
 if __name__ == "__main__":
 
 
 if __name__ == "__main__":
-    from sys import argv
-
-    # Runs the webserver
-    if len(argv) == 2:
-        run(port=int(argv[1]))
+    cfg = Config.instance()
+    p = re.compile(':(\d+)', re.IGNORECASE)
+    m = p.search(cfg.OS_NOTIFIER_URI)
+    if m:
+        port = m.group(1)
+        run(port=int(port))
     else:
         run()
     else:
         run()