[MON] Implements multithreading for message consumption
[osm/MON.git] / osm_mon / plugins / CloudWatch / plugin_alarm.py
index adc4d29..40e7fe5 100644 (file)
 '''
 AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
 '''
+from io import open
+from osm_mon.core.message_bus.producer import KafkaProducer
+from osm_mon.plugins.CloudWatch.metric_alarms import MetricAlarm
+from osm_mon.plugins.CloudWatch.metrics import Metrics
 
 __author__ = "Wajeeha Hamid"
 __date__   = "18-September-2017"
 
-import sys
 import json
-import logging as log
-from jsmin import jsmin
-from connection import Connection
-from metric_alarms import MetricAlarm
-from metrics import Metrics
-sys.path.append("../../core/message_bus")
-from producer import KafkaProducer
+import logging
+
+log = logging.getLogger(__name__)
 
 class plugin_alarms():
     """Receives Alarm info from MetricAlarm and connects with the consumer/producer"""
     def __init__ (self): 
-        self.conn = Connection()
         self.metricAlarm = MetricAlarm()
         self.metric = Metrics()
-        self.connection()
         self.producer = KafkaProducer('')     
-#---------------------------------------------------------------------------------------------------------------------------      
-    def connection(self):
-        """Connecting instances with CloudWatch"""
-        self.conn.setEnvironment()
-        self.conn = self.conn.connection_instance()
-        self.cloudwatch_conn = self.conn['cloudwatch_connection']
-        self.ec2_conn = self.conn['ec2_connection']
 #---------------------------------------------------------------------------------------------------------------------------   
     def configure_alarm(self,alarm_info):
         alarm_id = self.metricAlarm.config_alarm(self.cloudwatch_conn,alarm_info)
@@ -73,9 +63,12 @@ class plugin_alarms():
         return self.metric.metricsData(self.cloudwatch_conn,metric_name,period,instance_id)
 #--------------------------------------------------------------------------------------------------------------------------- 
 
-    def alarm_calls(self,message):
+    def alarm_calls(self,message,aws_conn):
         """Gets the message from the common consumer"""
-        try: 
+        try:
+            self.cloudwatch_conn = aws_conn['cloudwatch_connection']
+            self.ec2_conn = aws_conn['ec2_connection'] 
+
             log.info("Action required against: %s" % (message.topic))
             alarm_info = json.loads(message.value)
 
@@ -94,12 +87,12 @@ class plugin_alarms():
                         log.debug("Alarm Already exists")
                         payload = json.dumps(config_resp)                                   
                         file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
-                        self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
+                        self.producer.publish_alarm_response(key='create_alarm_response',message=payload)
              
                     else: 
                         payload = json.dumps(config_resp)                                
                         file = open('../../core/models/create_alarm_resp.json','wb').write((payload))                           
-                        self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
+                        self.producer.publish_alarm_response(key='create_alarm_response',message=payload)
                         log.info("New alarm created with alarm info: %s", config_resp)                           
              
                 else:
@@ -114,7 +107,7 @@ class plugin_alarms():
                     ack_details = self.get_ack_details(alarm_info)
                     payload = json.dumps(ack_details)                                  
                     file = open('../../core/models/notify_alarm.json','wb').write((payload))
-                    self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response')
+                    self.producer.notify_alarm(key='notify_alarm',message=payload)
                     log.info("Acknowledge sent: %s", ack_details)
 
                 else:
@@ -134,13 +127,13 @@ class plugin_alarms():
                     if update_resp == None:                                    
                         payload = json.dumps(update_resp)                                   
                         file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
-                        self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
+                        self.producer.update_alarm_response(key='update_alarm_response',message=payload)
                         log.debug("Alarm Already exists")
 
                     else: 
                         payload = json.dumps(update_resp)                                   
                         file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
-                        self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
+                        self.producer.update_alarm_response(key='update_alarm_response',message=payload)
                         log.info("Alarm Updated with alarm info: %s", update_resp)                           
 
                 else:
@@ -153,7 +146,7 @@ class plugin_alarms():
                 del_resp = self.delete_alarm(del_info)
                 payload = json.dumps(del_resp)                                   
                 file = open('../../core/models/delete_alarm_resp.json','wb').write((payload))
-                self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response')
+                self.producer.delete_alarm_response(key='delete_alarm_response',message=payload)
                 log.info("Alarm Deleted with alarm info: %s", del_resp)
 
        
@@ -165,7 +158,7 @@ class plugin_alarms():
                     list_resp = self.get_alarms_list(alarm_info)#['alarm_names']
                     payload = json.dumps(list_resp)                                                                 
                     file = open('../../core/models/list_alarm_resp.json','wb').write((payload))
-                    self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response')
+                    self.producer.list_alarm_response(key='list_alarm_response',message=payload)
 
                 else:
                     log.error("Resource ID is Incorrect")