Refactors codebase
[osm/MON.git] / osm_mon / plugins / OpenStack / Aodh / notifier.py
index 3487daa..a1c85e6 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
 ##
-"""Notifier class for alarm notification response."""
-
+# __author__ = Helena McGough
+#
+"""A Webserver to send alarm notifications from Aodh to the SO."""
 import json
-import logging as log
-
-try:
-    import aodhclient
-except ImportError:
-    log.warn("Failed to import the aodhclient")
-
-
-from core.message_bus.producer import KafkaProducer
-
-from plugins.OpenStack.Aodh.alarming import Alarming
-from plugins.OpenStack.response import OpenStack_Response
-from plugins.OpenStack.settings import Config
-
-__author__ = "Helena McGough"
-
-ALARM_NAMES = [
-    "average_memory_usage_above_threshold",
-    "disk_read_ops",
-    "disk_write_ops",
-    "disk_read_bytes",
-    "disk_write_bytes",
-    "net_packets_dropped",
-    "packets_in_above_threshold",
-    "packets_out_above_threshold",
-    "cpu_utilization_above_threshold"]
-
-
-def register_notifier():
-    """Run the notifier instance."""
-    config = Config.instance()
-    instance = Notifier(config=config)
-    instance.config()
-    instance.notify()
-
-
-class Notifier(object):
-    """Alarm Notification class."""
-
-    def __init__(self, config):
-        """Initialize alarm notifier."""
-        log.info("Initialize the notifier for the SO.")
-        self._config = config
-        self._response = OpenStack_Response()
-        self._producer = KafkaProducer("alarm_response")
-        self._alarming = Alarming()
-
-    def config(self):
-        """Configure the alarm notifier."""
-        log.info("Configure the notifier instance.")
-        self._config.read_environ("aodh")
-
-    def notify(self):
-        """Send alarm notifications responses to the SO."""
-        log.info("Checking for alarm notifications")
-        auth_token, endpoint = self._alarming.authenticate()
-
-        while(1):
-            alarm_list = self._alarming.list_alarms(endpoint, auth_token)
-            for alarm in json.loads(alarm_list):
-                alarm_id = alarm['alarm_id']
-                alarm_name = alarm['name']
-                # Send a notification response to the SO on alarm trigger
-                if alarm_name in ALARM_NAMES:
-                    alarm_state = self._alarming.get_alarm_state(
-                        endpoint, auth_token, alarm_id)
-                    if alarm_state == "alarm":
-                        # Generate and send an alarm notification response
-                        try:
-                            a_date = alarm['state_timestamp'].replace("T", " ")
-                            rule = alarm['gnocchi_resources_threshold_rule']
-                            resp_message = self._response.generate_response(
-                                'notify_alarm', a_id=alarm_id,
-                                r_id=rule['resource_id'],
-                                sev=alarm['severity'], date=a_date,
-                                state=alarm_state, vim_type="OpenStack")
-                            self._producer.notify_alarm(
-                                'notify_alarm', resp_message, 'alarm_response')
-                        except Exception as exc:
-                            log.warn("Failed to send notify response:%s", exc)
-
-if aodhclient:
-    register_notifier()
+import logging
+import os
+import sys
+import time
+
+from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
+from six.moves.BaseHTTPServer import HTTPServer
+
+# Initialise a logger for alarm notifier
+from osm_mon.core.message_bus.producer import Producer
+from osm_mon.core.settings import Config
+
+cfg = Config.instance()
+
+logging.basicConfig(stream=sys.stdout,
+                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+                    datefmt='%m/%d/%Y %I:%M:%S %p',
+                    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+log = logging.getLogger(__name__)
+
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
+sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..', '..')))
+
+from osm_mon.core.database import DatabaseManager
+
+from osm_mon.plugins.OpenStack.response import OpenStackResponseBuilder
+
+
+class NotifierHandler(BaseHTTPRequestHandler):
+    """Handler class for alarm_actions triggered by OSM alarms."""
+
+    def _set_headers(self):
+        """Set the headers for a request."""
+        self.send_response(200)
+        self.send_header('Content-type', 'text/html')
+        self.end_headers()
+
+    def do_GET(self):
+        """Get request functionality."""
+        self._set_headers()
+
+    def do_POST(self):
+        """POST request function."""
+        # Gets header and data from the post request and records info
+        self._set_headers()
+        # Gets the size of data
+        content_length = int(self.headers['Content-Length'])
+        post_data = self.rfile.read(content_length)
+        # Python 2/3 string compatibility
+        try:
+            post_data = post_data.decode()
+        except AttributeError:
+            pass
+        log.info("This alarm was triggered: %s", post_data)
+
+        # Send alarm notification to message bus
+        try:
+            self.notify_alarm(json.loads(post_data))
+        except Exception:
+            log.exception("Error notifying alarm")
+
+    def notify_alarm(self, values):
+        """Sends alarm notification message to bus."""
+
+        # Initialise configuration and authentication for response message
+        response = OpenStackResponseBuilder()
+
+        database_manager = DatabaseManager()
+
+        alarm_id = values['alarm_id']
+        alarm = database_manager.get_alarm(alarm_id, 'openstack')
+        # Process an alarm notification if resource_id is valid
+        # Get date and time for response message
+        a_date = time.strftime("%d-%m-%Y") + " " + time.strftime("%X")
+        # Generate and send response
+        resp_message = response.generate_response(
+            'notify_alarm',
+            alarm_id=alarm_id,
+            vdu_name=alarm.vdu_name,
+            vnf_member_index=alarm.vnf_member_index,
+            ns_id=alarm.ns_id,
+            metric_name=alarm.metric_name,
+            operation=alarm.operation,
+            threshold_value=alarm.threshold,
+            sev=values['severity'],
+            date=a_date,
+            state=values['current'])
+        self._publish_response('notify_alarm', json.dumps(resp_message))
+        log.info("Sent alarm notification: %s", resp_message)
+
+    def _publish_response(self, key: str, msg: str):
+        producer = Producer()
+        producer.send(topic='alarm_response', key=key, value=msg)
+        producer.flush()
+
+
+def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662):
+    """Run the webserver application to retrieve alarm notifications."""
+    try:
+        server_address = ('', port)
+        httpd = server_class(server_address, handler_class)
+        print('Starting alarm notifier...')
+        log.info("Starting alarm notifier server on port: %s", port)
+        httpd.serve_forever()
+    except Exception as exc:
+        log.warning("Failed to start webserver, %s", exc)
+
+
+if __name__ == "__main__":
+    from sys import argv
+
+    # Runs the webserver
+    if len(argv) == 2:
+        run(port=int(argv[1]))
+    else:
+        run()