[MON] Implements multithreading for message consumption
[osm/MON.git] / osm_mon / plugins / CloudWatch / plugin_alarm.py
index eb48208..40e7fe5 100644 (file)
 '''
 AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
 '''
+from io import open
+from osm_mon.core.message_bus.producer import KafkaProducer
+from osm_mon.plugins.CloudWatch.metric_alarms import MetricAlarm
+from osm_mon.plugins.CloudWatch.metrics import Metrics
 
 __author__ = "Wajeeha Hamid"
 __date__   = "18-September-2017"
 
-import sys
 import json
-import logging as log
-from jsmin import jsmin
-from connection import Connection
-from metric_alarms import MetricAlarm
-from metrics import Metrics
-from kafka import KafkaConsumer
-sys.path.append("../../core/message-bus")
-from producer import KafkaProducer
-
-class Plugin():
+import logging
+
+log = logging.getLogger(__name__)
+
+class plugin_alarms():
     """Receives Alarm info from MetricAlarm and connects with the consumer/producer"""
     def __init__ (self): 
-        self.conn = Connection()
         self.metricAlarm = MetricAlarm()
         self.metric = Metrics()
-        server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
-        self._consumer = KafkaConsumer(server['topic'], bootstrap_servers=server['server'])
-        self._consumer.subscribe(['alarm_request'])
         self.producer = KafkaProducer('')     
-#---------------------------------------------------------------------------------------------------------------------------      
-    def connection(self):
-        """Connecting instances with CloudWatch"""
-        self.conn.setEnvironment()
-        self.conn = self.conn.connection_instance()
-        self.cloudwatch_conn = self.conn['cloudwatch_connection']
-        self.ec2_conn = self.conn['ec2_connection']
 #---------------------------------------------------------------------------------------------------------------------------   
     def configure_alarm(self,alarm_info):
         alarm_id = self.metricAlarm.config_alarm(self.cloudwatch_conn,alarm_info)
@@ -76,118 +63,111 @@ class Plugin():
         return self.metric.metricsData(self.cloudwatch_conn,metric_name,period,instance_id)
 #--------------------------------------------------------------------------------------------------------------------------- 
 
-    def consumer(self):
-        """Consume info from the message bus to manage alarms."""
-        try: 
-            for message in self._consumer:
-                # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials'
-                if message.topic == "alarm_request":
-                    log.info("Action required against: %s" % (message.topic))
+    def alarm_calls(self,message,aws_conn):
+        """Gets the message from the common consumer"""
+        try:
+            self.cloudwatch_conn = aws_conn['cloudwatch_connection']
+            self.ec2_conn = aws_conn['ec2_connection'] 
+
+            log.info("Action required against: %s" % (message.topic))
+            alarm_info = json.loads(message.value)
+
+            if message.key == "create_alarm_request":  
+                alarm_inner_dict = alarm_info['alarm_create_request']
+                metric_status = self.check_metric(alarm_inner_dict['metric_name'])                            
+             
+                if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True:
+                    log.debug ("Resource and Metrics exists")
+                
+                    alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name']
+                    #Generate a valid response message, send via producer
+                    config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value
+             
+                    if config_resp == None:
+                        log.debug("Alarm Already exists")
+                        payload = json.dumps(config_resp)                                   
+                        file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
+                        self.producer.publish_alarm_response(key='create_alarm_response',message=payload)
+             
+                    else: 
+                        payload = json.dumps(config_resp)                                
+                        file = open('../../core/models/create_alarm_resp.json','wb').write((payload))                           
+                        self.producer.publish_alarm_response(key='create_alarm_response',message=payload)
+                        log.info("New alarm created with alarm info: %s", config_resp)                           
+             
+                else:
+                    log.error("Resource ID doesn't exists")                
+                
+            elif message.key == "acknowledge_alarm":
+                alarm_inner_dict = alarm_info['ack_details']
+                
+                if self.check_resource(alarm_inner_dict['resource_uuid']) == True: 
                     alarm_info = json.loads(message.value)
+                    #Generate a valid response message, send via producer
+                    ack_details = self.get_ack_details(alarm_info)
+                    payload = json.dumps(ack_details)                                  
+                    file = open('../../core/models/notify_alarm.json','wb').write((payload))
+                    self.producer.notify_alarm(key='notify_alarm',message=payload)
+                    log.info("Acknowledge sent: %s", ack_details)
+
+                else:
+                    log.error("Resource ID is Incorrect")                        
+
+
+            elif message.key == "update_alarm_request":                         
+                alarm_inner_dict = alarm_info['alarm_update_request']
+                metric_status = self.check_metric(alarm_inner_dict['metric_name'])
+                
+                if metric_status['status'] == True:
+                    log.debug ("Resource and Metrics exists")
+                    alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name']
+                    #Generate a valid response message, send via producer
+                    update_resp = self.update_alarm_configuration(alarm_info)
+
+                    if update_resp == None:                                    
+                        payload = json.dumps(update_resp)                                   
+                        file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
+                        self.producer.update_alarm_response(key='update_alarm_response',message=payload)
+                        log.debug("Alarm Already exists")
+
+                    else: 
+                        payload = json.dumps(update_resp)                                   
+                        file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
+                        self.producer.update_alarm_response(key='update_alarm_response',message=payload)
+                        log.info("Alarm Updated with alarm info: %s", update_resp)                           
+
+                else:
+                    log.info ("Metric Not Supported")
+         
+            
+            elif message.key == "delete_alarm_request":  
+                del_info = json.loads(message.value)
+                #Generate a valid response message, send via producer
+                del_resp = self.delete_alarm(del_info)
+                payload = json.dumps(del_resp)                                   
+                file = open('../../core/models/delete_alarm_resp.json','wb').write((payload))
+                self.producer.delete_alarm_response(key='delete_alarm_response',message=payload)
+                log.info("Alarm Deleted with alarm info: %s", del_resp)
+
+       
+            elif message.key == "alarm_list_request":
+                alarm_inner_dict = alarm_info['alarm_list_request']
+                
+                if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "": 
+                    #Generate a valid response message, send via producer
+                    list_resp = self.get_alarms_list(alarm_info)#['alarm_names']
+                    payload = json.dumps(list_resp)                                                                 
+                    file = open('../../core/models/list_alarm_resp.json','wb').write((payload))
+                    self.producer.list_alarm_response(key='list_alarm_response',message=payload)
 
-                    if message.key == "create_alarm_request":  
-                        if alarm_info['vim_type'] == 'AWS':                        
-                            alarm_inner_dict = alarm_info['alarm_create_request']
-                            metric_status = self.check_metric(alarm_inner_dict['metric_name'])                            
-                            if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True:
-                                log.debug ("Resource and Metrics exists")
-                            
-                                alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name']
-                                #Generate a valid response message, send via producer
-                                config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value
-                                if config_resp == None:
-                                    log.debug("Alarm Already exists")
-                                    payload = json.dumps(config_resp)                                   
-                                    file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
-                                    self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
-                                else: 
-                                    payload = json.dumps(config_resp)                                   
-                                    file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
-                                    
-                                    self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
-                                    log.info("New alarm created with alarm info: %s", config_resp)                           
-                            else:
-                                log.error("Resource ID doesn't exists")
-                        else:
-                            log.error("Plugin inputs are incorrect")
-                        
-
-                    elif message.key == "acknowledge_alarm":
-                        alarm_inner_dict = alarm_info['ack_details']
-                        if alarm_info['vim_type'] == 'AWS':
-                            if self.check_resource(alarm_inner_dict['resource_uuid']) == True: 
-                                alarm_info = json.loads(message.value)
-                                #Generate a valid response message, send via producer
-                                ack_details = self.get_ack_details(alarm_info)
-                                payload = json.dumps(ack_details)                                  
-                                file = open('../../core/models/notify_alarm.json','wb').write((payload))
-                                self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response')
-                                log.info("Acknowledge sent: %s", ack_details)
-                            else:
-                                log.error("Resource ID is Incorrect")    
-                        else:
-                            log.error(" VIM type incorrect ")     
-
-
-                    elif message.key == "update_alarm_request":
-                        if alarm_info['vim_type'] == 'AWS':                         
-                            alarm_inner_dict = alarm_info['alarm_update_request']
-                            metric_status = self.check_metric(alarm_inner_dict['metric_name'])
-                            
-                            if metric_status['status'] == True:
-                                log.debug ("Resource and Metrics exists")
-                                alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name']
-                                #Generate a valid response message, send via producer
-                                update_resp = self.update_alarm_configuration(alarm_info)
-                                if update_resp == None:                                    
-                                    payload = json.dumps(update_resp)                                   
-                                    file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
-                                    self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
-                                    log.debug("Alarm Already exists")
-                                else: 
-                                    payload = json.dumps(update_resp)                                   
-                                    file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
-                                    self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
-                                    log.info("Alarm Updated with alarm info: %s", update_resp)                           
-                            else:
-                                log.info ("Metric Not Supported")
-                        else:
-                            log.error(" VIM type Incorrect ")  
-                    
-                    elif message.key == "delete_alarm_request":  
-                        if alarm_info['vim_type'] == 'AWS':
-                            del_info = json.loads(message.value)
-                            #Generate a valid response message, send via producer
-                            del_resp = self.delete_alarm(del_info)
-                            payload = json.dumps(del_resp)                                   
-                            file = open('../../core/models/delete_alarm_resp.json','wb').write((payload))
-                            self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response')
-                            log.info("Alarm Deleted with alarm info: %s", del_resp)
-                        else: 
-                            log.error(" VIM type Incorrect ")     
-               
-                    elif message.key == "alarm_list_request":
-                        alarm_inner_dict = alarm_info['alarm_list_request']
-                        if alarm_info['vim_type'] == 'AWS': 
-                            if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "": 
-                                #Generate a valid response message, send via producer
-                                list_resp = self.get_alarms_list(alarm_info)#['alarm_names']
-                                payload = json.dumps(list_resp)                                                                 
-                                file = open('../../core/models/list_alarm_resp.json','wb').write((payload))
-                                self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response')
-                            else:
-                                log.error("Resource ID is Incorrect")    
-                        else:
-                            log.error(" VIM type Incorrect ") 
-                                
-                    else:
-                        log.debug("Unknown key, no action will be performed")
-                           
                 else:
-                    log.info("Message topic not relevant to this plugin: %s",
-                         message.topic)    
+                    log.error("Resource ID is Incorrect")             
+
+            else:
+                log.debug("Unknown key, no action will be performed")    
+
         except Exception as e:
-                log.error("Consumer exception: %s", str(e))             
+                log.error("Message retrieval exception: %s", str(e))             
 #--------------------------------------------------------------------------------------------------------------------------- 
     def check_resource(self,resource_uuid):
         '''Finding Resource with the resource_uuid'''
@@ -198,6 +178,7 @@ class Plugin():
             #resource_id
             for instance_id in instances:
                 instance_id = str(instance_id).split(':')[1]
+
                 if instance_id == resource_uuid:
                     check_resp['resource_uuid'] = resource_uuid
                     return True 
@@ -215,36 +196,41 @@ class Plugin():
             if metric_name == 'CPU_UTILIZATION':
                 metric_name = 'CPUUtilization'
                 metric_status = True
+
             elif metric_name == 'DISK_READ_OPS':
                 metric_name = 'DiskReadOps'
                 metric_status = True
+
             elif metric_name == 'DISK_WRITE_OPS':
                 metric_name = 'DiskWriteOps'
                 metric_status = True
+
             elif metric_name == 'DISK_READ_BYTES':
                 metric_name = 'DiskReadBytes'
                 metric_status = True
+
             elif metric_name == 'DISK_WRITE_BYTES':
                 metric_name = 'DiskWriteBytes'
                 metric_status = True
+
             elif metric_name == 'PACKETS_RECEIVED':
                 metric_name = 'NetworkPacketsIn'
                 metric_status = True
+
             elif metric_name == 'PACKETS_SENT':
                 metric_name = 'NetworkPacketsOut'
                 metric_status = True
+
             else:
                 metric_name = None
                 metric_status = False
             check_resp['metric_name'] = metric_name
             #status
+
             if metric_status == True:
                 check_resp['status'] = True
                 return check_resp   
+
         except Exception as e: 
             log.error("Error in Plugin Inputs %s",str(e)) 
 #--------------------------------------------------------------------------------------------------------------------------- 
-
-obj = Plugin()
-obj.connection()
-obj.consumer()