Added a Common KafkaConsumer for all of the plugins
[osm/MON.git] / plugins / OpenStack / Aodh / alarming.py
index d409d71..2343372 100644 (file)
 """Carry out alarming requests via Aodh API."""
 
 import json
+
 import logging
-log = logging.getLogger(__name__)
 
 from core.message_bus.producer import KafkaProducer
 
-from kafka import KafkaConsumer
-
-from plugins.OpenStack.common import Common
 from plugins.OpenStack.response import OpenStack_Response
+from plugins.OpenStack.settings import Config
 
 __author__ = "Helena McGough"
 
+log = logging.getLogger(__name__)
+
 ALARM_NAMES = {
     "average_memory_usage_above_threshold": "average_memory_utilization",
     "disk_read_ops": "disk_read_ops",
@@ -65,14 +65,14 @@ class Alarming(object):
 
     def __init__(self):
         """Create the OpenStack alarming instance."""
-        self._common = Common()
+        # Initialize configuration and notifications
+        config = Config.instance()
+        config.read_environ("aodh")
 
-        # TODO(mcgoughh): Remove hardcoded kafkaconsumer
-        # Initialize a generic consumer object to consume message from the SO
-        server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
-        self._consumer = KafkaConsumer(server['topic'],
-                                       group_id='osm_mon',
-                                       bootstrap_servers=server['server'])
+        # Initialise authentication for API requests
+        self.auth_token = None
+        self.endpoint = None
+        self.common = None
 
         # Use the Response class to generate valid json response messages
         self._response = OpenStack_Response()
@@ -80,121 +80,126 @@ class Alarming(object):
         # Initializer a producer to send responses back to SO
         self._producer = KafkaProducer("alarm_response")
 
-    def alarming(self):
+    def alarming(self, message, common, auth_token):
         """Consume info from the message bus to manage alarms."""
-        # Check the alarming functionlity that needs to be performed
-        for message in self._consumer:
-
-            values = json.loads(message.value)
-            vim_type = values['vim_type'].lower()
-
-            if vim_type == "openstack":
-                log.info("Alarm action required: %s" % (message.topic))
-
-                # Generate and auth_token and endpoint for request
-                auth_token, endpoint = self.authenticate()
-
-                if message.key == "create_alarm_request":
-                    # Configure/Update an alarm
-                    alarm_details = values['alarm_create_request']
-
-                    alarm_id, alarm_status = self.configure_alarm(
-                        endpoint, auth_token, alarm_details)
-
-                    # Generate a valid response message, send via producer
-                    try:
-                        if alarm_status is True:
-                            log.info("Alarm successfully created")
-
-                        resp_message = self._response.generate_response(
-                            'create_alarm_response', status=alarm_status,
-                            alarm_id=alarm_id,
-                            cor_id=alarm_details['correlation_id'])
-                        log.info("Response Message: %s", resp_message)
-                        self._producer.create_alarm_response(
-                            'create_alarm_resonse', resp_message,
-                            'alarm_response')
-                    except Exception as exc:
-                        log.warn("Response creation failed: %s", exc)
-
-                elif message.key == "list_alarm_request":
-                    # Check for a specifed: alarm_name, resource_uuid, severity
-                    # and generate the appropriate list
-                    list_details = values['alarm_list_request']
-
-                    alarm_list = self.list_alarms(
-                        endpoint, auth_token, list_details)
-
-                    try:
-                        # Generate and send a list response back
-                        resp_message = self._response.generate_response(
-                            'list_alarm_response', alarm_list=alarm_list,
-                            cor_id=list_details['correlation_id'])
-                        log.info("Response Message: %s", resp_message)
-                        self._producer.list_alarm_response(
-                            'list_alarm_response', resp_message,
-                            'alarm_response')
-                    except Exception as exc:
-                        log.warn("Failed to send a valid response back.")
-
-                elif message.key == "delete_alarm_request":
-                    request_details = values['alarm_delete_request']
-                    alarm_id = request_details['alarm_uuid']
-
-                    resp_status = self.delete_alarm(
-                        endpoint, auth_token, alarm_id)
-
-                    # Generate and send a response message
-                    try:
-                        resp_message = self._response.generate_response(
-                            'delete_alarm_response', alarm_id=alarm_id,
-                            status=resp_status,
-                            cor_id=request_details['correlation_id'])
-                        log.info("Response message: %s", resp_message)
-                        self._producer.delete_alarm_response(
-                            'delete_alarm_response', resp_message,
-                            'alarm_response')
-                    except Exception as exc:
-                        log.warn("Failed to create delete reponse:%s", exc)
-
-                elif message.key == "acknowledge_alarm":
-                    # Acknowledge that an alarm has been dealt with by the SO
-                    alarm_id = values['ack_details']['alarm_uuid']
-
-                    response = self.update_alarm_state(
-                        endpoint, auth_token, alarm_id)
-
-                    # Log if an alarm was reset
-                    if response is True:
-                        log.info("Acknowledged the alarm and cleared it.")
-                    else:
-                        log.warn("Failed to acknowledge/clear the alarm.")
-
-                elif message.key == "update_alarm_request":
-                    # Update alarm configurations
-                    alarm_details = values['alarm_update_request']
-
-                    alarm_id, status = self.update_alarm(
-                        endpoint, auth_token, alarm_details)
-
-                    # Generate a response for an update request
-                    try:
-                        resp_message = self._response.generate_response(
-                            'update_alarm_response', alarm_id=alarm_id,
-                            cor_id=alarm_details['correlation_id'],
-                            status=status)
-                        log.info("Response message: %s", resp_message)
-                        self._producer.update_alarm_response(
-                            'update_alarm_response', resp_message,
-                            'alarm_response')
-                    except Exception as exc:
-                        log.warn("Failed to send an update response:%s", exc)
+        values = json.loads(message.value)
+        self.common = common
 
-                else:
-                    log.debug("Unknown key, no action will be performed")
+        log.info("OpenStack alarm action required.")
+
+        # Generate and auth_token and endpoint for request
+        if auth_token is not None:
+            if self.auth_token != auth_token:
+                log.info("Auth_token for alarming set by access_credentials.")
+                self.auth_token = auth_token
             else:
-                log.info("Message topic not relevant to this plugin: %s",
-                         message.topic)
+                log.info("Auth_token has not been updated.")
+        else:
+            log.info("Using environment variables to set auth_token for Aodh.")
+            self.auth_token = self.common._authenticate()
+
+        if self.endpoint is None:
+            log.info("Generating a new endpoint for Aodh.")
+            self.endpoint = self.common.get_endpoint("alarming")
+
+        if message.key == "create_alarm_request":
+            # Configure/Update an alarm
+            alarm_details = values['alarm_create_request']
+
+            alarm_id, alarm_status = self.configure_alarm(
+                self.endpoint, self.auth_token, alarm_details)
+
+            # Generate a valid response message, send via producer
+            try:
+                if alarm_status is True:
+                    log.info("Alarm successfully created")
+
+                resp_message = self._response.generate_response(
+                    'create_alarm_response', status=alarm_status,
+                    alarm_id=alarm_id,
+                    cor_id=alarm_details['correlation_id'])
+                log.info("Response Message: %s", resp_message)
+                self._producer.create_alarm_response(
+                    'create_alarm_resonse', resp_message,
+                    'alarm_response')
+            except Exception as exc:
+                log.warn("Response creation failed: %s", exc)
+
+        elif message.key == "list_alarm_request":
+            # Check for a specifed: alarm_name, resource_uuid, severity
+            # and generate the appropriate list
+            list_details = values['alarm_list_request']
+
+            alarm_list = self.list_alarms(
+                self.endpoint, self.auth_token, list_details)
+
+            try:
+                # Generate and send a list response back
+                resp_message = self._response.generate_response(
+                    'list_alarm_response', alarm_list=alarm_list,
+                    cor_id=list_details['correlation_id'])
+                log.info("Response Message: %s", resp_message)
+                self._producer.list_alarm_response(
+                    'list_alarm_response', resp_message,
+                    'alarm_response')
+            except Exception as exc:
+                log.warn("Failed to send a valid response back.")
+
+        elif message.key == "delete_alarm_request":
+            request_details = values['alarm_delete_request']
+            alarm_id = request_details['alarm_uuid']
+
+            resp_status = self.delete_alarm(
+                self.endpoint, self.auth_token, alarm_id)
+
+            # Generate and send a response message
+            try:
+                resp_message = self._response.generate_response(
+                    'delete_alarm_response', alarm_id=alarm_id,
+                    status=resp_status,
+                    cor_id=request_details['correlation_id'])
+                log.info("Response message: %s", resp_message)
+                self._producer.delete_alarm_response(
+                    'delete_alarm_response', resp_message,
+                    'alarm_response')
+            except Exception as exc:
+                log.warn("Failed to create delete reponse:%s", exc)
+
+        elif message.key == "acknowledge_alarm":
+            # Acknowledge that an alarm has been dealt with by the SO
+            alarm_id = values['ack_details']['alarm_uuid']
+
+            response = self.update_alarm_state(
+                self.endpoint, self.auth_token, alarm_id)
+
+            # Log if an alarm was reset
+            if response is True:
+                log.info("Acknowledged the alarm and cleared it.")
+            else:
+                log.warn("Failed to acknowledge/clear the alarm.")
+
+        elif message.key == "update_alarm_request":
+            # Update alarm configurations
+            alarm_details = values['alarm_update_request']
+
+            alarm_id, status = self.update_alarm(
+                self.endpoint, self.auth_token, alarm_details)
+
+            # Generate a response for an update request
+            try:
+                resp_message = self._response.generate_response(
+                    'update_alarm_response', alarm_id=alarm_id,
+                    cor_id=alarm_details['correlation_id'],
+                    status=status)
+                log.info("Response message: %s", resp_message)
+                self._producer.update_alarm_response(
+                    'update_alarm_response', resp_message,
+                    'alarm_response')
+            except Exception as exc:
+                log.warn("Failed to send an update response:%s", exc)
+
+        else:
+            log.debug("Unknown key, no action will be performed")
 
         return
 
@@ -222,7 +227,7 @@ class Alarming(object):
                 # Create the alarm if metric is available
                 payload = self.check_payload(values, metric_name, resource_id,
                                              alarm_name)
-                new_alarm = self._common._perform_request(
+                new_alarm = self.common._perform_request(
                     url, auth_token, req_type="post", payload=payload)
                 return json.loads(new_alarm.text)['alarm_id'], True
             else:
@@ -238,7 +243,7 @@ class Alarming(object):
         url = "{}/v2/alarms/%s".format(endpoint) % (alarm_id)
 
         try:
-            result = self._common._perform_request(
+            result = self.common._perform_request(
                 url, auth_token, req_type="delete")
             if str(result.status_code) == "404":
                 log.info("Alarm doesn't exist: %s", result.status_code)
@@ -257,7 +262,12 @@ class Alarming(object):
         a_list, name_list, sev_list, res_list = [], [], [], []
 
         # TODO(mcgoughh): for now resource_id is a mandatory field
-        resource = list_details['resource_uuid']
+        # Check for a reqource is
+        try:
+            resource = list_details['resource_uuid']
+        except KeyError as exc:
+            log.warn("Resource id not specified for list request: %s", exc)
+            return None
 
         # Checking what fields are specified for a list request
         try:
@@ -278,7 +288,7 @@ class Alarming(object):
 
         # Perform the request to get the desired list
         try:
-            result = self._common._perform_request(
+            result = self.common._perform_request(
                 url, auth_token, req_type="get")
 
             if result is not None:
@@ -333,7 +343,7 @@ class Alarming(object):
         payload = json.dumps("ok")
 
         try:
-            self._common._perform_request(
+            self.common._perform_request(
                 url, auth_token, req_type="put", payload=payload)
             return True
         except Exception as exc:
@@ -347,7 +357,7 @@ class Alarming(object):
 
         # Gets current configurations about the alarm
         try:
-            result = self._common._perform_request(
+            result = self.common._perform_request(
                 url, auth_token, req_type="get")
             alarm_name = json.loads(result.text)['name']
             rule = json.loads(result.text)['gnocchi_resources_threshold_rule']
@@ -366,7 +376,7 @@ class Alarming(object):
         # Updates the alarm configurations with the valid payload
         if payload is not None:
             try:
-                update_alarm = self._common._perform_request(
+                update_alarm = self.common._perform_request(
                     url, auth_token, req_type="put", payload=payload)
 
                 return json.loads(update_alarm.text)['alarm_id'], True
@@ -406,23 +416,12 @@ class Alarming(object):
             log.warn("Alarm is not configured correctly: %s", exc)
         return None
 
-    def authenticate(self):
-        """Generate an authentication token and endpoint for alarm request."""
-        try:
-            # Check for a tenant_id
-            auth_token = self._common._authenticate()
-            endpoint = self._common.get_endpoint("alarming")
-            return auth_token, endpoint
-        except Exception as exc:
-            log.warn("Authentication to Keystone failed:%s", exc)
-        return None, None
-
     def get_alarm_state(self, endpoint, auth_token, alarm_id):
         """Get the state of the alarm."""
         url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id
 
         try:
-            alarm_state = self._common._perform_request(
+            alarm_state = self.common._perform_request(
                 url, auth_token, req_type="get")
             return json.loads(alarm_state.text)
         except Exception as exc:
@@ -432,10 +431,10 @@ class Alarming(object):
     def check_for_metric(self, auth_token, m_name, r_id):
         """Check for the alarm metric."""
         try:
-            endpoint = self._common.get_endpoint("metric")
+            endpoint = self.common.get_endpoint("metric")
 
             url = "{}/v1/metric/".format(endpoint)
-            metric_list = self._common._perform_request(
+            metric_list = self.common._perform_request(
                 url, auth_token, req_type="get")
 
             for metric in json.loads(metric_list.text):