Updates (AWS Plugin) code with common producer/consumer 68/5768/1
authorjavaid <usman.javaid@xflowresearch.com>
Mon, 11 Dec 2017 06:10:28 +0000 (11:10 +0500)
committerjavaid <usman.javaid@xflowresearch.com>
Mon, 11 Dec 2017 06:21:04 +0000 (11:21 +0500)
Change-Id: I99d3c2b8484ca3d2486549212692e5f0efbb2d97
Signed-off-by: javaid <usman.javaid@xflowresearch.com>
osm_mon/core/message_bus/common_consumer
osm_mon/plugins/CloudWatch/plugin_alarm.py
osm_mon/plugins/CloudWatch/plugin_metric.py [new file with mode: 0644]
osm_mon/plugins/CloudWatch/plugin_metrics.py [deleted file]

index bb8ce0c..7221d0c 100755 (executable)
@@ -25,6 +25,7 @@ import sys
 import os
 
 sys.path.append("/root/MON")
+sys.path.append("../../plugins/CloudWatch")
 
 logging.basicConfig(filename='MON_plugins.log',
                     format='%(asctime)s %(message)s',
@@ -39,13 +40,14 @@ from osm_mon.plugins.OpenStack.Aodh import alarming
 from osm_mon.plugins.OpenStack.common import Common
 from osm_mon.plugins.OpenStack.Gnocchi import metrics
 
+from plugin_alarm import plugin_alarms
+from plugin_metric import plugin_metrics
 
 # Initialize servers
 server = {'server': 'localhost:9092'}
 
 # Initialize consumers for alarms and metrics
-common_consumer = KafkaConsumer(group_id='osm_mon',
-                                bootstrap_servers=server['server'])
+common_consumer = KafkaConsumer(bootstrap_servers=server['server'])
 
 # Create OpenStack alarming and metric instances
 auth_token = None
@@ -53,6 +55,9 @@ openstack_auth = Common()
 openstack_metrics = metrics.Metrics()
 openstack_alarms = alarming.Alarming()
 
+# Create CloudWatch alarm and metric instances
+cloudwatch_alarms = plugin_alarms()
+cloudwatch_metrics = plugin_metrics()
 
 def get_vim_type(message):
     """Get the vim type that is required by the message."""
@@ -73,18 +78,20 @@ try:
         if message.topic == "metric_request":
             # Check the vim desired by the message
             vim_type = get_vim_type(message)
+            
             if vim_type == "openstack":
                 log.info("This message is for the OpenStack plugin.")
                 openstack_metrics.metric_calls(
                     message, openstack_auth, auth_token)
 
-            elif vim_type == "cloudwatch":
+            elif vim_type == "aws":
+                cloudwatch_metrics.metric_calls(message)
                 log.info("This message is for the CloudWatch plugin.")
 
             elif vim_type == "vrops":
                 log.info("This message is for the vROPs plugin.")
 
-            else:
+            else:   
                 log.debug("vim_type is misconfigured or unsupported; %s",
                           vim_type)
 
@@ -95,7 +102,8 @@ try:
                 log.info("This message is for the OpenStack plugin.")
                 openstack_alarms.alarming(message, openstack_auth, auth_token)
 
-            elif vim_type == "cloudwatch":
+            elif vim_type == "aws":
+                cloudwatch_alarms.alarm_calls(message)
                 log.info("This message is for the CloudWatch plugin.")
 
             elif vim_type == "vrops":
@@ -112,7 +120,8 @@ try:
                 log.info("This message is for the OpenStack plugin.")
                 auth_token = openstack_auth._authenticate(message=message)
 
-            elif vim_type == "cloudwatch":
+            elif vim_type == "aws":
+                #TODO Access credentials later
                 log.info("This message is for the CloudWatch plugin.")
 
             elif vim_type == "vrops":
index eb48208..adc4d29 100644 (file)
@@ -33,19 +33,16 @@ from jsmin import jsmin
 from connection import Connection
 from metric_alarms import MetricAlarm
 from metrics import Metrics
-from kafka import KafkaConsumer
-sys.path.append("../../core/message-bus")
+sys.path.append("../../core/message_bus")
 from producer import KafkaProducer
 
-class Plugin():
+class plugin_alarms():
     """Receives Alarm info from MetricAlarm and connects with the consumer/producer"""
     def __init__ (self): 
         self.conn = Connection()
         self.metricAlarm = MetricAlarm()
         self.metric = Metrics()
-        server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
-        self._consumer = KafkaConsumer(server['topic'], bootstrap_servers=server['server'])
-        self._consumer.subscribe(['alarm_request'])
+        self.connection()
         self.producer = KafkaProducer('')     
 #---------------------------------------------------------------------------------------------------------------------------      
     def connection(self):
@@ -76,118 +73,108 @@ class Plugin():
         return self.metric.metricsData(self.cloudwatch_conn,metric_name,period,instance_id)
 #--------------------------------------------------------------------------------------------------------------------------- 
 
-    def consumer(self):
-        """Consume info from the message bus to manage alarms."""
+    def alarm_calls(self,message):
+        """Gets the message from the common consumer"""
         try: 
-            for message in self._consumer:
-                # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials'
-                if message.topic == "alarm_request":
-                    log.info("Action required against: %s" % (message.topic))
+            log.info("Action required against: %s" % (message.topic))
+            alarm_info = json.loads(message.value)
+
+            if message.key == "create_alarm_request":  
+                alarm_inner_dict = alarm_info['alarm_create_request']
+                metric_status = self.check_metric(alarm_inner_dict['metric_name'])                            
+             
+                if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True:
+                    log.debug ("Resource and Metrics exists")
+                
+                    alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name']
+                    #Generate a valid response message, send via producer
+                    config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value
+             
+                    if config_resp == None:
+                        log.debug("Alarm Already exists")
+                        payload = json.dumps(config_resp)                                   
+                        file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
+                        self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
+             
+                    else: 
+                        payload = json.dumps(config_resp)                                
+                        file = open('../../core/models/create_alarm_resp.json','wb').write((payload))                           
+                        self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
+                        log.info("New alarm created with alarm info: %s", config_resp)                           
+             
+                else:
+                    log.error("Resource ID doesn't exists")                
+                
+            elif message.key == "acknowledge_alarm":
+                alarm_inner_dict = alarm_info['ack_details']
+                
+                if self.check_resource(alarm_inner_dict['resource_uuid']) == True: 
                     alarm_info = json.loads(message.value)
+                    #Generate a valid response message, send via producer
+                    ack_details = self.get_ack_details(alarm_info)
+                    payload = json.dumps(ack_details)                                  
+                    file = open('../../core/models/notify_alarm.json','wb').write((payload))
+                    self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response')
+                    log.info("Acknowledge sent: %s", ack_details)
 
-                    if message.key == "create_alarm_request":  
-                        if alarm_info['vim_type'] == 'AWS':                        
-                            alarm_inner_dict = alarm_info['alarm_create_request']
-                            metric_status = self.check_metric(alarm_inner_dict['metric_name'])                            
-                            if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True:
-                                log.debug ("Resource and Metrics exists")
-                            
-                                alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name']
-                                #Generate a valid response message, send via producer
-                                config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value
-                                if config_resp == None:
-                                    log.debug("Alarm Already exists")
-                                    payload = json.dumps(config_resp)                                   
-                                    file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
-                                    self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
-                                else: 
-                                    payload = json.dumps(config_resp)                                   
-                                    file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
-                                    
-                                    self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
-                                    log.info("New alarm created with alarm info: %s", config_resp)                           
-                            else:
-                                log.error("Resource ID doesn't exists")
-                        else:
-                            log.error("Plugin inputs are incorrect")
-                        
-
-                    elif message.key == "acknowledge_alarm":
-                        alarm_inner_dict = alarm_info['ack_details']
-                        if alarm_info['vim_type'] == 'AWS':
-                            if self.check_resource(alarm_inner_dict['resource_uuid']) == True: 
-                                alarm_info = json.loads(message.value)
-                                #Generate a valid response message, send via producer
-                                ack_details = self.get_ack_details(alarm_info)
-                                payload = json.dumps(ack_details)                                  
-                                file = open('../../core/models/notify_alarm.json','wb').write((payload))
-                                self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response')
-                                log.info("Acknowledge sent: %s", ack_details)
-                            else:
-                                log.error("Resource ID is Incorrect")    
-                        else:
-                            log.error(" VIM type incorrect ")     
-
-
-                    elif message.key == "update_alarm_request":
-                        if alarm_info['vim_type'] == 'AWS':                         
-                            alarm_inner_dict = alarm_info['alarm_update_request']
-                            metric_status = self.check_metric(alarm_inner_dict['metric_name'])
-                            
-                            if metric_status['status'] == True:
-                                log.debug ("Resource and Metrics exists")
-                                alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name']
-                                #Generate a valid response message, send via producer
-                                update_resp = self.update_alarm_configuration(alarm_info)
-                                if update_resp == None:                                    
-                                    payload = json.dumps(update_resp)                                   
-                                    file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
-                                    self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
-                                    log.debug("Alarm Already exists")
-                                else: 
-                                    payload = json.dumps(update_resp)                                   
-                                    file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
-                                    self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
-                                    log.info("Alarm Updated with alarm info: %s", update_resp)                           
-                            else:
-                                log.info ("Metric Not Supported")
-                        else:
-                            log.error(" VIM type Incorrect ")  
-                    
-                    elif message.key == "delete_alarm_request":  
-                        if alarm_info['vim_type'] == 'AWS':
-                            del_info = json.loads(message.value)
-                            #Generate a valid response message, send via producer
-                            del_resp = self.delete_alarm(del_info)
-                            payload = json.dumps(del_resp)                                   
-                            file = open('../../core/models/delete_alarm_resp.json','wb').write((payload))
-                            self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response')
-                            log.info("Alarm Deleted with alarm info: %s", del_resp)
-                        else: 
-                            log.error(" VIM type Incorrect ")     
-               
-                    elif message.key == "alarm_list_request":
-                        alarm_inner_dict = alarm_info['alarm_list_request']
-                        if alarm_info['vim_type'] == 'AWS': 
-                            if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "": 
-                                #Generate a valid response message, send via producer
-                                list_resp = self.get_alarms_list(alarm_info)#['alarm_names']
-                                payload = json.dumps(list_resp)                                                                 
-                                file = open('../../core/models/list_alarm_resp.json','wb').write((payload))
-                                self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response')
-                            else:
-                                log.error("Resource ID is Incorrect")    
-                        else:
-                            log.error(" VIM type Incorrect ") 
-                                
-                    else:
-                        log.debug("Unknown key, no action will be performed")
-                           
                 else:
-                    log.info("Message topic not relevant to this plugin: %s",
-                         message.topic)    
+                    log.error("Resource ID is Incorrect")                        
+
+
+            elif message.key == "update_alarm_request":                         
+                alarm_inner_dict = alarm_info['alarm_update_request']
+                metric_status = self.check_metric(alarm_inner_dict['metric_name'])
+                
+                if metric_status['status'] == True:
+                    log.debug ("Resource and Metrics exists")
+                    alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name']
+                    #Generate a valid response message, send via producer
+                    update_resp = self.update_alarm_configuration(alarm_info)
+
+                    if update_resp == None:                                    
+                        payload = json.dumps(update_resp)                                   
+                        file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
+                        self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
+                        log.debug("Alarm Already exists")
+
+                    else: 
+                        payload = json.dumps(update_resp)                                   
+                        file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
+                        self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
+                        log.info("Alarm Updated with alarm info: %s", update_resp)                           
+
+                else:
+                    log.info ("Metric Not Supported")
+         
+            
+            elif message.key == "delete_alarm_request":  
+                del_info = json.loads(message.value)
+                #Generate a valid response message, send via producer
+                del_resp = self.delete_alarm(del_info)
+                payload = json.dumps(del_resp)                                   
+                file = open('../../core/models/delete_alarm_resp.json','wb').write((payload))
+                self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response')
+                log.info("Alarm Deleted with alarm info: %s", del_resp)
+
+       
+            elif message.key == "alarm_list_request":
+                alarm_inner_dict = alarm_info['alarm_list_request']
+                
+                if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "": 
+                    #Generate a valid response message, send via producer
+                    list_resp = self.get_alarms_list(alarm_info)#['alarm_names']
+                    payload = json.dumps(list_resp)                                                                 
+                    file = open('../../core/models/list_alarm_resp.json','wb').write((payload))
+                    self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response')
+
+                else:
+                    log.error("Resource ID is Incorrect")             
+
+            else:
+                log.debug("Unknown key, no action will be performed")    
+
         except Exception as e:
-                log.error("Consumer exception: %s", str(e))             
+                log.error("Message retrieval exception: %s", str(e))             
 #--------------------------------------------------------------------------------------------------------------------------- 
     def check_resource(self,resource_uuid):
         '''Finding Resource with the resource_uuid'''
@@ -198,6 +185,7 @@ class Plugin():
             #resource_id
             for instance_id in instances:
                 instance_id = str(instance_id).split(':')[1]
+
                 if instance_id == resource_uuid:
                     check_resp['resource_uuid'] = resource_uuid
                     return True 
@@ -215,36 +203,41 @@ class Plugin():
             if metric_name == 'CPU_UTILIZATION':
                 metric_name = 'CPUUtilization'
                 metric_status = True
+
             elif metric_name == 'DISK_READ_OPS':
                 metric_name = 'DiskReadOps'
                 metric_status = True
+
             elif metric_name == 'DISK_WRITE_OPS':
                 metric_name = 'DiskWriteOps'
                 metric_status = True
+
             elif metric_name == 'DISK_READ_BYTES':
                 metric_name = 'DiskReadBytes'
                 metric_status = True
+
             elif metric_name == 'DISK_WRITE_BYTES':
                 metric_name = 'DiskWriteBytes'
                 metric_status = True
+
             elif metric_name == 'PACKETS_RECEIVED':
                 metric_name = 'NetworkPacketsIn'
                 metric_status = True
+
             elif metric_name == 'PACKETS_SENT':
                 metric_name = 'NetworkPacketsOut'
                 metric_status = True
+
             else:
                 metric_name = None
                 metric_status = False
             check_resp['metric_name'] = metric_name
             #status
+
             if metric_status == True:
                 check_resp['status'] = True
                 return check_resp   
+
         except Exception as e: 
             log.error("Error in Plugin Inputs %s",str(e)) 
 #--------------------------------------------------------------------------------------------------------------------------- 
-
-obj = Plugin()
-obj.connection()
-obj.consumer()
diff --git a/osm_mon/plugins/CloudWatch/plugin_metric.py b/osm_mon/plugins/CloudWatch/plugin_metric.py
new file mode 100644 (file)
index 0000000..6b9598f
--- /dev/null
@@ -0,0 +1,208 @@
+##
+# Copyright 2017 xFlow Research Pvt. Ltd
+# This file is part of MON module
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: wajeeha.hamid@xflowresearch.com
+##
+
+'''
+AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
+'''
+
+__author__ = "Wajeeha Hamid"
+__date__   = "18-September-2017"
+
+import sys
+import json
+from connection import Connection
+from metric_alarms import MetricAlarm
+from metrics import Metrics
+sys.path.append("../../core/message_bus")
+from producer import KafkaProducer
+import logging as log
+
+class plugin_metrics():
+    """Receives Alarm info from MetricAlarm and connects with the consumer/producer """
+    def __init__ (self): 
+        self.conn = Connection()
+        self.metric = Metrics()
+        self.producer = KafkaProducer('')
+        self.connection()
+#---------------------------------------------------------------------------------------------------------------------------      
+    def connection(self):
+        try:
+            """Connecting instances with CloudWatch"""
+            self.conn.setEnvironment()
+            self.conn = self.conn.connection_instance()
+            self.cloudwatch_conn = self.conn['cloudwatch_connection']
+            self.ec2_conn = self.conn['ec2_connection']
+
+        except Exception as e:
+            log.error("Failed to Connect with AWS %s: " + str(e))
+#---------------------------------------------------------------------------------------------------------------------------   
+    def create_metric_request(self,metric_info):
+        '''Comaptible API using normalized parameters'''
+        metric_resp = self.metric.createMetrics(self.cloudwatch_conn,metric_info)
+        return metric_resp
+#---------------------------------------------------------------------------------------------------------------------------          
+    def update_metric_request(self,updated_info):
+        '''Comaptible API using normalized parameters'''
+        update_resp = self.metric.updateMetrics(self.cloudwatch_conn,updated_info)
+        return update_resp
+#---------------------------------------------------------------------------------------------------------------------------            
+    def delete_metric_request(self,delete_info):
+        '''Comaptible API using normalized parameters'''
+        del_resp = self.metric.deleteMetrics(self.cloudwatch_conn,delete_info)
+        return del_resp
+#---------------------------------------------------------------------------------------------------------------------------  
+    def list_metrics_request(self,list_info):
+        '''Comaptible API using normalized parameters'''
+        list_resp = self.metric.listMetrics(self.cloudwatch_conn,list_info)
+        return list_resp
+#---------------------------------------------------------------------------------------------------------------------------                        
+    def read_metrics_data(self,list_info):
+        '''Comaptible API using normalized parameters
+        Read all metric data related to a specified metric'''
+        data_resp=self.metric.metricsData(self.cloudwatch_conn,list_info)
+        return data_resp
+#--------------------------------------------------------------------------------------------------------------------------- 
+
+    def metric_calls(self,message):
+        '''Consumer will consume the message from SO,
+        1) parse the message and trigger the methods ac
+        cording to keys and topics provided in request.
+
+        2) The response from plugin is saved in json format.
+
+        3) The producer object then calls the producer response
+        methods to send the response back to message bus
+        '''
+        
+        try:
+            metric_info = json.loads(message.value)
+            metric_response = dict()
+
+            if metric_info['vim_type'] == 'AWS':
+                log.debug ("VIM support : AWS")
+
+            # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials'
+                if message.topic == "metric_request":
+                    log.info("Action required against: %s" % (message.topic))
+
+                    if message.key == "create_metric_request":                            
+                        if self.check_resource(metric_info['metric_create']['resource_uuid']) == True:
+                            metric_resp = self.create_metric_request(metric_info['metric_create']) #alarm_info = message.value
+                            metric_response['schema_version'] = metric_info['schema_version']
+                            metric_response['schema_type']    = "create_metric_response"
+                            metric_response['metric_create_response'] = metric_resp
+                            payload = json.dumps(metric_response)                                                                  
+                            file = open('../../core/models/create_metric_resp.json','wb').write((payload))
+                            self.producer.create_metrics_resp(key='create_metric_response',message=payload,topic = 'metric_response')
+                            
+                            log.info("Metric configured: %s", metric_resp)
+                            return metric_response
+                            
+                    elif message.key == "update_metric_request":
+                        if self.check_resource(metric_info['metric_create']['resource_uuid']) == True:
+                            update_resp = self.update_metric_request(metric_info['metric_create'])
+                            metric_response['schema_version'] = metric_info['schema_version']
+                            metric_response['schema_type'] = "update_metric_response"
+                            metric_response['metric_update_response'] = update_resp
+                            payload = json.dumps(metric_response)                                                                                               
+                            file = open('../../core/models/update_metric_resp.json','wb').write((payload))
+                            self.producer.update_metric_response(key='update_metric_response',message=payload,topic = 'metric_response')
+
+                            log.info("Metric Updates: %s",metric_response)
+                            return metric_response
+                            
+                    elif message.key == "delete_metric_request":
+                        if self.check_resource(metric_info['resource_uuid']) == True:
+                            del_resp=self.delete_metric_request(metric_info)
+                            payload = json.dumps(del_resp)                                                                                               
+                            file = open('../../core/models/delete_metric_resp.json','wb').write((payload))
+                            self.producer.delete_metric_response(key='delete_metric_response',message=payload,topic = 'metric_response')
+
+                            log.info("Metric Deletion Not supported in AWS : %s",del_resp)
+                            return del_resp
+
+                    elif message.key == "list_metric_request": 
+                        if self.check_resource(metric_info['metrics_list_request']['resource_uuid']) == True:
+                            list_resp = self.list_metrics_request(metric_info['metrics_list_request'])
+                            metric_response['schema_version'] = metric_info['schema_version']
+                            metric_response['schema_type'] = "list_metric_response"
+                            metric_response['correlation_id'] = metric_info['metrics_list_request']['correlation_id']
+                            metric_response['vim_type'] = metric_info['vim_type']
+                            metric_response['metrics_list'] = list_resp
+                            payload = json.dumps(metric_response)                                                                                                
+                            file = open('../../core/models/list_metric_resp.json','wb').write((payload))
+                            self.producer.list_metric_response(key='list_metrics_response',message=payload,topic = 'metric_response')
+
+                            log.info("Metric List: %s",metric_response)
+                            return metric_response
+
+                    elif message.key == "read_metric_data_request":
+                        if self.check_resource(metric_info['resource_uuid']) == True:
+                            data_resp = self.read_metrics_data(metric_info)
+                            metric_response['schema_version'] = metric_info['schema_version']
+                            metric_response['schema_type'] = "read_metric_data_response"
+                            metric_response['metric_name'] = metric_info['metric_name']
+                            metric_response['metric_uuid'] = metric_info['metric_uuid']
+                            metric_response['correlation_id'] = metric_info['correlation_uuid']
+                            metric_response['resource_uuid'] = metric_info['resource_uuid']
+                            metric_response['tenant_uuid'] = metric_info['tenant_uuid']
+                            metric_response['metrics_data'] = data_resp
+                            payload = json.dumps(metric_response)                                                                
+                            file = open('../../core/models/read_metric_data_resp.json','wb').write((payload))
+                            self.producer.read_metric_data_response(key='read_metric_data_response',message=payload,topic = 'metric_response')
+                            
+                            log.info("Metric Data Response: %s",metric_response)
+                            return metric_response 
+
+                    else:
+                        log.debug("Unknown key, no action will be performed")
+                else:
+                    log.info("Message topic not relevant to this plugin: %s",
+                         message.topic)
+            
+        except Exception as e:
+            log.error("Consumer exception: %s", str(e))
+
+#---------------------------------------------------------------------------------------------------------------------------
+    def check_resource(self,resource_uuid):
+
+        '''Checking the resource_uuid is present in EC2 instances'''
+        try:
+            check_resp = dict()
+            instances = self.ec2_conn.get_all_instance_status()
+            status_resource = False
+
+            #resource_id
+            for instance_id in instances:
+                instance_id = str(instance_id).split(':')[1]
+                if instance_id == resource_uuid:
+                    check_resp['resource_uuid'] = resource_uuid
+                    status_resource = True
+                else:
+                    status_resource = False
+
+            #status
+            return status_resource
+
+        except Exception as e: 
+            log.error("Error in Plugin Inputs %s",str(e))          
+#---------------------------------------------------------------------------------------------------------------------------   
+
diff --git a/osm_mon/plugins/CloudWatch/plugin_metrics.py b/osm_mon/plugins/CloudWatch/plugin_metrics.py
deleted file mode 100644 (file)
index 72365fe..0000000
+++ /dev/null
@@ -1,223 +0,0 @@
-##
-# Copyright 2017 xFlow Research Pvt. Ltd
-# This file is part of MON module
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact with: wajeeha.hamid@xflowresearch.com
-##
-
-'''
-AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
-'''
-
-__author__ = "Wajeeha Hamid"
-__date__   = "18-September-2017"
-
-import sys
-import json
-from connection import Connection
-from metric_alarms import MetricAlarm
-from metrics import Metrics
-sys.path.append("../../core/message_bus")
-from producer import KafkaProducer
-from kafka import KafkaConsumer
-import logging as log
-
-class plugin_metrics():
-    """Receives Alarm info from MetricAlarm and connects with the consumer/producer """
-    def __init__ (self): 
-        self.conn = Connection()
-        self.metric = Metrics()
-
-        #server = {'server': 'localhost:9092', 'topic': 'metrics_request'}
-    #Initialize a Consumer object to consume message from the SO    
-        self._consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
-        self._consumer.subscribe(['metric_request'])
-
-        #producer = KafkaProducer('create_metric_request')
-
-        self.producer = KafkaProducer('')
-#---------------------------------------------------------------------------------------------------------------------------      
-    def connection(self):
-        try:
-            """Connecting instances with CloudWatch"""
-            self.conn.setEnvironment()
-            self.conn = self.conn.connection_instance()
-            self.cloudwatch_conn = self.conn['cloudwatch_connection']
-            self.ec2_conn = self.conn['ec2_connection']
-
-        except Exception as e:
-            log.error("Failed to Connect with AWS %s: " + str(e))
-#---------------------------------------------------------------------------------------------------------------------------   
-    def create_metric_request(self,metric_info):
-        '''Comaptible API using normalized parameters'''
-        metric_resp = self.metric.createMetrics(self.cloudwatch_conn,metric_info)
-        return metric_resp
-#---------------------------------------------------------------------------------------------------------------------------          
-    def update_metric_request(self,updated_info):
-        '''Comaptible API using normalized parameters'''
-        update_resp = self.metric.updateMetrics(self.cloudwatch_conn,updated_info)
-        return update_resp
-#---------------------------------------------------------------------------------------------------------------------------            
-    def delete_metric_request(self,delete_info):
-        '''Comaptible API using normalized parameters'''
-        del_resp = self.metric.deleteMetrics(self.cloudwatch_conn,delete_info)
-        return del_resp
-#---------------------------------------------------------------------------------------------------------------------------  
-    def list_metrics_request(self,list_info):
-        '''Comaptible API using normalized parameters'''
-        list_resp = self.metric.listMetrics(self.cloudwatch_conn,list_info)
-        return list_resp
-#---------------------------------------------------------------------------------------------------------------------------                        
-    def read_metrics_data(self,list_info):
-        '''Comaptible API using normalized parameters
-        Read all metric data related to a specified metric'''
-        data_resp=self.metric.metricsData(self.cloudwatch_conn,list_info)
-        return data_resp
-#--------------------------------------------------------------------------------------------------------------------------- 
-
-    def consumer(self):
-        '''Consumer will consume the message from SO,
-        1) parse the message and trigger the methods ac
-        cording to keys and topics provided in request.
-
-        2) The response from plugin is saved in json format.
-
-        3) The producer object then calls the producer response
-        methods to send the response back to message bus
-        '''
-        
-        try:
-            for message in self._consumer:
-                metric_info = json.loads(message.value)
-                print metric_info
-                metric_response = dict()
-  
-                if metric_info['vim_type'] == 'AWS':
-                    log.debug ("VIM support : AWS")
-
-                # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials'
-                    if message.topic == "metric_request":
-                        log.info("Action required against: %s" % (message.topic))
-
-                        if message.key == "create_metric_request":                            
-                            if self.check_resource(metric_info['metric_create']['resource_uuid']) == True:
-                                metric_resp = self.create_metric_request(metric_info['metric_create']) #alarm_info = message.value
-                                metric_response['schema_version'] = metric_info['schema_version']
-                                metric_response['schema_type']    = "create_metric_response"
-                                metric_response['metric_create_response'] = metric_resp
-                                payload = json.dumps(metric_response)                                                                  
-                                file = open('../../core/models/create_metric_resp.json','wb').write((payload))
-                                self.producer.create_metrics_resp(key='create_metric_response',message=payload,topic = 'metric_response')
-                                
-                                log.info("Metric configured: %s", metric_resp)
-                                return metric_response
-                                
-                        elif message.key == "update_metric_request":
-                            if self.check_resource(metric_info['metric_create']['resource_uuid']) == True:
-                                update_resp = self.update_metric_request(metric_info['metric_create'])
-                                metric_response['schema_version'] = metric_info['schema_version']
-                                metric_response['schema_type'] = "update_metric_response"
-                                metric_response['metric_update_response'] = update_resp
-                                payload = json.dumps(metric_response)  
-                                print payload                                                                                               
-                                file = open('../../core/models/update_metric_resp.json','wb').write((payload))
-                                self.producer.update_metric_response(key='update_metric_response',message=payload,topic = 'metric_response')
-
-                                log.info("Metric Updates: %s",metric_response)
-                                return metric_response
-                                
-                        elif message.key == "delete_metric_request":
-                            if self.check_resource(metric_info['resource_uuid']) == True:
-                                del_resp=self.delete_metric_request(metric_info)
-                                payload = json.dumps(del_resp)                                                                                               
-                                file = open('../../core/models/delete_metric_resp.json','wb').write((payload))
-                                self.producer.delete_metric_response(key='delete_metric_response',message=payload,topic = 'metric_response')
-
-                                log.info("Metric Deletion Not supported in AWS : %s",del_resp)
-                                return del_resp
-
-                        elif message.key == "list_metric_request": 
-                            if self.check_resource(metric_info['metrics_list_request']['resource_uuid']) == True:
-                                list_resp = self.list_metrics_request(metric_info['metrics_list_request'])
-                                metric_response['schema_version'] = metric_info['schema_version']
-                                metric_response['schema_type'] = "list_metric_response"
-                                metric_response['correlation_id'] = metric_info['metrics_list_request']['correlation_id']
-                                metric_response['vim_type'] = metric_info['vim_type']
-                                metric_response['metrics_list'] = list_resp
-                                payload = json.dumps(metric_response)                                                                                                
-                                file = open('../../core/models/list_metric_resp.json','wb').write((payload))
-                                self.producer.list_metric_response(key='list_metrics_response',message=payload,topic = 'metric_response')
-
-                                log.info("Metric List: %s",metric_response)
-                                return metric_response
-
-                        elif message.key == "read_metric_data_request":
-                            if self.check_resource(metric_info['resource_uuid']) == True:
-                                data_resp = self.read_metrics_data(metric_info)
-                                metric_response['schema_version'] = metric_info['schema_version']
-                                metric_response['schema_type'] = "read_metric_data_response"
-                                metric_response['metric_name'] = metric_info['metric_name']
-                                metric_response['metric_uuid'] = metric_info['metric_uuid']
-                                metric_response['correlation_id'] = metric_info['correlation_uuid']
-                                metric_response['resource_uuid'] = metric_info['resource_uuid']
-                                metric_response['tenant_uuid'] = metric_info['tenant_uuid']
-                                metric_response['metrics_data'] = data_resp
-                                payload = json.dumps(metric_response)                                                                
-                                file = open('../../core/models/read_metric_data_resp.json','wb').write((payload))
-                                self.producer.read_metric_data_response(key='read_metric_data_response',message=payload,topic = 'metric_response')
-                                
-                                log.info("Metric Data Response: %s",metric_response)
-                                return metric_response 
-
-                        else:
-                            log.debug("Unknown key, no action will be performed")
-                    else:
-                        log.info("Message topic not relevant to this plugin: %s",
-                             message.topic)
-            else:
-               print "Bad VIM Request"
-        except Exception as e:
-            log.error("Consumer exception: %s", str(e))
-
-#---------------------------------------------------------------------------------------------------------------------------
-    def check_resource(self,resource_uuid):
-
-        '''Checking the resource_uuid is present in EC2 instances'''
-        try:
-            check_resp = dict()
-            instances = self.ec2_conn.get_all_instance_status()
-            status_resource = False
-
-            #resource_id
-            for instance_id in instances:
-                instance_id = str(instance_id).split(':')[1]
-                if instance_id == resource_uuid:
-                    check_resp['resource_uuid'] = resource_uuid
-                    status_resource = True
-                else:
-                    status_resource = False
-
-            #status
-            return status_resource
-
-        except Exception as e: 
-            log.error("Error in Plugin Inputs %s",str(e))          
-#---------------------------------------------------------------------------------------------------------------------------   
-
-obj = plugin_metrics()
-obj.connection() 
-obj.consumer()