X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fplugins%2FOpenStack%2FAodh%2Fnotifier.py;h=a1c85e61f4b49e45bd1e9ba626f6ef0ae30ab47c;hb=93699898c51364cde193d8d441f4aed45670e7bf;hp=3487daae9587c756e976cb1260e706ab47b22239;hpb=c7397b95dbaeebd7d872779eec809daed9e487cc;p=osm%2FMON.git diff --git a/osm_mon/plugins/OpenStack/Aodh/notifier.py b/osm_mon/plugins/OpenStack/Aodh/notifier.py index 3487daa..a1c85e6 100644 --- a/osm_mon/plugins/OpenStack/Aodh/notifier.py +++ b/osm_mon/plugins/OpenStack/Aodh/notifier.py @@ -19,89 +19,129 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: helena.mcgough@intel.com or adrian.hoban@intel.com ## -"""Notifier class for alarm notification response.""" - +# __author__ = Helena McGough +# +"""A Webserver to send alarm notifications from Aodh to the SO.""" import json -import logging as log - -try: - import aodhclient -except ImportError: - log.warn("Failed to import the aodhclient") - - -from core.message_bus.producer import KafkaProducer - -from plugins.OpenStack.Aodh.alarming import Alarming -from plugins.OpenStack.response import OpenStack_Response -from plugins.OpenStack.settings import Config - -__author__ = "Helena McGough" - -ALARM_NAMES = [ - "average_memory_usage_above_threshold", - "disk_read_ops", - "disk_write_ops", - "disk_read_bytes", - "disk_write_bytes", - "net_packets_dropped", - "packets_in_above_threshold", - "packets_out_above_threshold", - "cpu_utilization_above_threshold"] - - -def register_notifier(): - """Run the notifier instance.""" - config = Config.instance() - instance = Notifier(config=config) - instance.config() - instance.notify() - - -class Notifier(object): - """Alarm Notification class.""" - - def __init__(self, config): - """Initialize alarm notifier.""" - log.info("Initialize the notifier for the SO.") - self._config = config - self._response = OpenStack_Response() - self._producer = KafkaProducer("alarm_response") - self._alarming = Alarming() - - def config(self): - """Configure the alarm notifier.""" - log.info("Configure the notifier instance.") - self._config.read_environ("aodh") - - def notify(self): - """Send alarm notifications responses to the SO.""" - log.info("Checking for alarm notifications") - auth_token, endpoint = self._alarming.authenticate() - - while(1): - alarm_list = self._alarming.list_alarms(endpoint, auth_token) - for alarm in json.loads(alarm_list): - alarm_id = alarm['alarm_id'] - alarm_name = alarm['name'] - # Send a notification response to the SO on alarm trigger - if alarm_name in ALARM_NAMES: - alarm_state = self._alarming.get_alarm_state( - endpoint, auth_token, alarm_id) - if alarm_state == "alarm": - # Generate and send an alarm notification response - try: - a_date = alarm['state_timestamp'].replace("T", " ") - rule = alarm['gnocchi_resources_threshold_rule'] - resp_message = self._response.generate_response( - 'notify_alarm', a_id=alarm_id, - r_id=rule['resource_id'], - sev=alarm['severity'], date=a_date, - state=alarm_state, vim_type="OpenStack") - self._producer.notify_alarm( - 'notify_alarm', resp_message, 'alarm_response') - except Exception as exc: - log.warn("Failed to send notify response:%s", exc) - -if aodhclient: - register_notifier() +import logging +import os +import sys +import time + +from six.moves.BaseHTTPServer import BaseHTTPRequestHandler +from six.moves.BaseHTTPServer import HTTPServer + +# Initialise a logger for alarm notifier +from osm_mon.core.message_bus.producer import Producer +from osm_mon.core.settings import Config + +cfg = Config.instance() + +logging.basicConfig(stream=sys.stdout, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%m/%d/%Y %I:%M:%S %p', + level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) +log = logging.getLogger(__name__) + +kafka_logger = logging.getLogger('kafka') +kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) +kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +kafka_handler = logging.StreamHandler(sys.stdout) +kafka_handler.setFormatter(kafka_formatter) +kafka_logger.addHandler(kafka_handler) + +sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..', '..'))) + +from osm_mon.core.database import DatabaseManager + +from osm_mon.plugins.OpenStack.response import OpenStackResponseBuilder + + +class NotifierHandler(BaseHTTPRequestHandler): + """Handler class for alarm_actions triggered by OSM alarms.""" + + def _set_headers(self): + """Set the headers for a request.""" + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + + def do_GET(self): + """Get request functionality.""" + self._set_headers() + + def do_POST(self): + """POST request function.""" + # Gets header and data from the post request and records info + self._set_headers() + # Gets the size of data + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + # Python 2/3 string compatibility + try: + post_data = post_data.decode() + except AttributeError: + pass + log.info("This alarm was triggered: %s", post_data) + + # Send alarm notification to message bus + try: + self.notify_alarm(json.loads(post_data)) + except Exception: + log.exception("Error notifying alarm") + + def notify_alarm(self, values): + """Sends alarm notification message to bus.""" + + # Initialise configuration and authentication for response message + response = OpenStackResponseBuilder() + + database_manager = DatabaseManager() + + alarm_id = values['alarm_id'] + alarm = database_manager.get_alarm(alarm_id, 'openstack') + # Process an alarm notification if resource_id is valid + # Get date and time for response message + a_date = time.strftime("%d-%m-%Y") + " " + time.strftime("%X") + # Generate and send response + resp_message = response.generate_response( + 'notify_alarm', + alarm_id=alarm_id, + vdu_name=alarm.vdu_name, + vnf_member_index=alarm.vnf_member_index, + ns_id=alarm.ns_id, + metric_name=alarm.metric_name, + operation=alarm.operation, + threshold_value=alarm.threshold, + sev=values['severity'], + date=a_date, + state=values['current']) + self._publish_response('notify_alarm', json.dumps(resp_message)) + log.info("Sent alarm notification: %s", resp_message) + + def _publish_response(self, key: str, msg: str): + producer = Producer() + producer.send(topic='alarm_response', key=key, value=msg) + producer.flush() + + +def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662): + """Run the webserver application to retrieve alarm notifications.""" + try: + server_address = ('', port) + httpd = server_class(server_address, handler_class) + print('Starting alarm notifier...') + log.info("Starting alarm notifier server on port: %s", port) + httpd.serve_forever() + except Exception as exc: + log.warning("Failed to start webserver, %s", exc) + + +if __name__ == "__main__": + from sys import argv + + # Runs the webserver + if len(argv) == 2: + run(port=int(argv[1])) + else: + run()