From 8b16911cdbe9f76c6d381bd96ee449e87b03541f Mon Sep 17 00:00:00 2001 From: javaid Date: Mon, 11 Dec 2017 11:10:28 +0500 Subject: [PATCH 1/1] Updates (AWS Plugin) code with common producer/consumer Change-Id: I99d3c2b8484ca3d2486549212692e5f0efbb2d97 Signed-off-by: javaid --- osm_mon/core/message_bus/common_consumer | 21 +- osm_mon/plugins/CloudWatch/plugin_alarm.py | 227 +++++++++---------- osm_mon/plugins/CloudWatch/plugin_metric.py | 208 +++++++++++++++++ osm_mon/plugins/CloudWatch/plugin_metrics.py | 223 ------------------ 4 files changed, 333 insertions(+), 346 deletions(-) create mode 100644 osm_mon/plugins/CloudWatch/plugin_metric.py delete mode 100644 osm_mon/plugins/CloudWatch/plugin_metrics.py diff --git a/osm_mon/core/message_bus/common_consumer b/osm_mon/core/message_bus/common_consumer index bb8ce0c..7221d0c 100755 --- a/osm_mon/core/message_bus/common_consumer +++ b/osm_mon/core/message_bus/common_consumer @@ -25,6 +25,7 @@ import sys import os sys.path.append("/root/MON") +sys.path.append("../../plugins/CloudWatch") logging.basicConfig(filename='MON_plugins.log', format='%(asctime)s %(message)s', @@ -39,13 +40,14 @@ from osm_mon.plugins.OpenStack.Aodh import alarming from osm_mon.plugins.OpenStack.common import Common from osm_mon.plugins.OpenStack.Gnocchi import metrics +from plugin_alarm import plugin_alarms +from plugin_metric import plugin_metrics # Initialize servers server = {'server': 'localhost:9092'} # Initialize consumers for alarms and metrics -common_consumer = KafkaConsumer(group_id='osm_mon', - bootstrap_servers=server['server']) +common_consumer = KafkaConsumer(bootstrap_servers=server['server']) # Create OpenStack alarming and metric instances auth_token = None @@ -53,6 +55,9 @@ openstack_auth = Common() openstack_metrics = metrics.Metrics() openstack_alarms = alarming.Alarming() +# Create CloudWatch alarm and metric instances +cloudwatch_alarms = plugin_alarms() +cloudwatch_metrics = plugin_metrics() def get_vim_type(message): """Get the vim type that is required by the message.""" @@ -73,18 +78,20 @@ try: if message.topic == "metric_request": # Check the vim desired by the message vim_type = get_vim_type(message) + if vim_type == "openstack": log.info("This message is for the OpenStack plugin.") openstack_metrics.metric_calls( message, openstack_auth, auth_token) - elif vim_type == "cloudwatch": + elif vim_type == "aws": + cloudwatch_metrics.metric_calls(message) log.info("This message is for the CloudWatch plugin.") elif vim_type == "vrops": log.info("This message is for the vROPs plugin.") - else: + else: log.debug("vim_type is misconfigured or unsupported; %s", vim_type) @@ -95,7 +102,8 @@ try: log.info("This message is for the OpenStack plugin.") openstack_alarms.alarming(message, openstack_auth, auth_token) - elif vim_type == "cloudwatch": + elif vim_type == "aws": + cloudwatch_alarms.alarm_calls(message) log.info("This message is for the CloudWatch plugin.") elif vim_type == "vrops": @@ -112,7 +120,8 @@ try: log.info("This message is for the OpenStack plugin.") auth_token = openstack_auth._authenticate(message=message) - elif vim_type == "cloudwatch": + elif vim_type == "aws": + #TODO Access credentials later log.info("This message is for the CloudWatch plugin.") elif vim_type == "vrops": diff --git a/osm_mon/plugins/CloudWatch/plugin_alarm.py b/osm_mon/plugins/CloudWatch/plugin_alarm.py index eb48208..adc4d29 100644 --- a/osm_mon/plugins/CloudWatch/plugin_alarm.py +++ b/osm_mon/plugins/CloudWatch/plugin_alarm.py @@ -33,19 +33,16 @@ from jsmin import jsmin from connection import Connection from metric_alarms import MetricAlarm from metrics import Metrics -from kafka import KafkaConsumer -sys.path.append("../../core/message-bus") +sys.path.append("../../core/message_bus") from producer import KafkaProducer -class Plugin(): +class plugin_alarms(): """Receives Alarm info from MetricAlarm and connects with the consumer/producer""" def __init__ (self): self.conn = Connection() self.metricAlarm = MetricAlarm() self.metric = Metrics() - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - self._consumer = KafkaConsumer(server['topic'], bootstrap_servers=server['server']) - self._consumer.subscribe(['alarm_request']) + self.connection() self.producer = KafkaProducer('') #--------------------------------------------------------------------------------------------------------------------------- def connection(self): @@ -76,118 +73,108 @@ class Plugin(): return self.metric.metricsData(self.cloudwatch_conn,metric_name,period,instance_id) #--------------------------------------------------------------------------------------------------------------------------- - def consumer(self): - """Consume info from the message bus to manage alarms.""" + def alarm_calls(self,message): + """Gets the message from the common consumer""" try: - for message in self._consumer: - # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials' - if message.topic == "alarm_request": - log.info("Action required against: %s" % (message.topic)) + log.info("Action required against: %s" % (message.topic)) + alarm_info = json.loads(message.value) + + if message.key == "create_alarm_request": + alarm_inner_dict = alarm_info['alarm_create_request'] + metric_status = self.check_metric(alarm_inner_dict['metric_name']) + + if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True: + log.debug ("Resource and Metrics exists") + + alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name'] + #Generate a valid response message, send via producer + config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value + + if config_resp == None: + log.debug("Alarm Already exists") + payload = json.dumps(config_resp) + file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) + self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') + + else: + payload = json.dumps(config_resp) + file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) + self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') + log.info("New alarm created with alarm info: %s", config_resp) + + else: + log.error("Resource ID doesn't exists") + + elif message.key == "acknowledge_alarm": + alarm_inner_dict = alarm_info['ack_details'] + + if self.check_resource(alarm_inner_dict['resource_uuid']) == True: alarm_info = json.loads(message.value) + #Generate a valid response message, send via producer + ack_details = self.get_ack_details(alarm_info) + payload = json.dumps(ack_details) + file = open('../../core/models/notify_alarm.json','wb').write((payload)) + self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response') + log.info("Acknowledge sent: %s", ack_details) - if message.key == "create_alarm_request": - if alarm_info['vim_type'] == 'AWS': - alarm_inner_dict = alarm_info['alarm_create_request'] - metric_status = self.check_metric(alarm_inner_dict['metric_name']) - if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True: - log.debug ("Resource and Metrics exists") - - alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name'] - #Generate a valid response message, send via producer - config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value - if config_resp == None: - log.debug("Alarm Already exists") - payload = json.dumps(config_resp) - file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) - self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') - else: - payload = json.dumps(config_resp) - file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) - - self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') - log.info("New alarm created with alarm info: %s", config_resp) - else: - log.error("Resource ID doesn't exists") - else: - log.error("Plugin inputs are incorrect") - - - elif message.key == "acknowledge_alarm": - alarm_inner_dict = alarm_info['ack_details'] - if alarm_info['vim_type'] == 'AWS': - if self.check_resource(alarm_inner_dict['resource_uuid']) == True: - alarm_info = json.loads(message.value) - #Generate a valid response message, send via producer - ack_details = self.get_ack_details(alarm_info) - payload = json.dumps(ack_details) - file = open('../../core/models/notify_alarm.json','wb').write((payload)) - self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response') - log.info("Acknowledge sent: %s", ack_details) - else: - log.error("Resource ID is Incorrect") - else: - log.error(" VIM type incorrect ") - - - elif message.key == "update_alarm_request": - if alarm_info['vim_type'] == 'AWS': - alarm_inner_dict = alarm_info['alarm_update_request'] - metric_status = self.check_metric(alarm_inner_dict['metric_name']) - - if metric_status['status'] == True: - log.debug ("Resource and Metrics exists") - alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name'] - #Generate a valid response message, send via producer - update_resp = self.update_alarm_configuration(alarm_info) - if update_resp == None: - payload = json.dumps(update_resp) - file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) - self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') - log.debug("Alarm Already exists") - else: - payload = json.dumps(update_resp) - file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) - self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') - log.info("Alarm Updated with alarm info: %s", update_resp) - else: - log.info ("Metric Not Supported") - else: - log.error(" VIM type Incorrect ") - - elif message.key == "delete_alarm_request": - if alarm_info['vim_type'] == 'AWS': - del_info = json.loads(message.value) - #Generate a valid response message, send via producer - del_resp = self.delete_alarm(del_info) - payload = json.dumps(del_resp) - file = open('../../core/models/delete_alarm_resp.json','wb').write((payload)) - self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response') - log.info("Alarm Deleted with alarm info: %s", del_resp) - else: - log.error(" VIM type Incorrect ") - - elif message.key == "alarm_list_request": - alarm_inner_dict = alarm_info['alarm_list_request'] - if alarm_info['vim_type'] == 'AWS': - if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "": - #Generate a valid response message, send via producer - list_resp = self.get_alarms_list(alarm_info)#['alarm_names'] - payload = json.dumps(list_resp) - file = open('../../core/models/list_alarm_resp.json','wb').write((payload)) - self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response') - else: - log.error("Resource ID is Incorrect") - else: - log.error(" VIM type Incorrect ") - - else: - log.debug("Unknown key, no action will be performed") - else: - log.info("Message topic not relevant to this plugin: %s", - message.topic) + log.error("Resource ID is Incorrect") + + + elif message.key == "update_alarm_request": + alarm_inner_dict = alarm_info['alarm_update_request'] + metric_status = self.check_metric(alarm_inner_dict['metric_name']) + + if metric_status['status'] == True: + log.debug ("Resource and Metrics exists") + alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name'] + #Generate a valid response message, send via producer + update_resp = self.update_alarm_configuration(alarm_info) + + if update_resp == None: + payload = json.dumps(update_resp) + file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) + self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') + log.debug("Alarm Already exists") + + else: + payload = json.dumps(update_resp) + file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) + self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') + log.info("Alarm Updated with alarm info: %s", update_resp) + + else: + log.info ("Metric Not Supported") + + + elif message.key == "delete_alarm_request": + del_info = json.loads(message.value) + #Generate a valid response message, send via producer + del_resp = self.delete_alarm(del_info) + payload = json.dumps(del_resp) + file = open('../../core/models/delete_alarm_resp.json','wb').write((payload)) + self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response') + log.info("Alarm Deleted with alarm info: %s", del_resp) + + + elif message.key == "alarm_list_request": + alarm_inner_dict = alarm_info['alarm_list_request'] + + if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "": + #Generate a valid response message, send via producer + list_resp = self.get_alarms_list(alarm_info)#['alarm_names'] + payload = json.dumps(list_resp) + file = open('../../core/models/list_alarm_resp.json','wb').write((payload)) + self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response') + + else: + log.error("Resource ID is Incorrect") + + else: + log.debug("Unknown key, no action will be performed") + except Exception as e: - log.error("Consumer exception: %s", str(e)) + log.error("Message retrieval exception: %s", str(e)) #--------------------------------------------------------------------------------------------------------------------------- def check_resource(self,resource_uuid): '''Finding Resource with the resource_uuid''' @@ -198,6 +185,7 @@ class Plugin(): #resource_id for instance_id in instances: instance_id = str(instance_id).split(':')[1] + if instance_id == resource_uuid: check_resp['resource_uuid'] = resource_uuid return True @@ -215,36 +203,41 @@ class Plugin(): if metric_name == 'CPU_UTILIZATION': metric_name = 'CPUUtilization' metric_status = True + elif metric_name == 'DISK_READ_OPS': metric_name = 'DiskReadOps' metric_status = True + elif metric_name == 'DISK_WRITE_OPS': metric_name = 'DiskWriteOps' metric_status = True + elif metric_name == 'DISK_READ_BYTES': metric_name = 'DiskReadBytes' metric_status = True + elif metric_name == 'DISK_WRITE_BYTES': metric_name = 'DiskWriteBytes' metric_status = True + elif metric_name == 'PACKETS_RECEIVED': metric_name = 'NetworkPacketsIn' metric_status = True + elif metric_name == 'PACKETS_SENT': metric_name = 'NetworkPacketsOut' metric_status = True + else: metric_name = None metric_status = False check_resp['metric_name'] = metric_name #status + if metric_status == True: check_resp['status'] = True return check_resp + except Exception as e: log.error("Error in Plugin Inputs %s",str(e)) #--------------------------------------------------------------------------------------------------------------------------- - -obj = Plugin() -obj.connection() -obj.consumer() diff --git a/osm_mon/plugins/CloudWatch/plugin_metric.py b/osm_mon/plugins/CloudWatch/plugin_metric.py new file mode 100644 index 0000000..6b9598f --- /dev/null +++ b/osm_mon/plugins/CloudWatch/plugin_metric.py @@ -0,0 +1,208 @@ +## +# Copyright 2017 xFlow Research Pvt. Ltd +# This file is part of MON module +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: wajeeha.hamid@xflowresearch.com +## + +''' +AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client +''' + +__author__ = "Wajeeha Hamid" +__date__ = "18-September-2017" + +import sys +import json +from connection import Connection +from metric_alarms import MetricAlarm +from metrics import Metrics +sys.path.append("../../core/message_bus") +from producer import KafkaProducer +import logging as log + +class plugin_metrics(): + """Receives Alarm info from MetricAlarm and connects with the consumer/producer """ + def __init__ (self): + self.conn = Connection() + self.metric = Metrics() + self.producer = KafkaProducer('') + self.connection() +#--------------------------------------------------------------------------------------------------------------------------- + def connection(self): + try: + """Connecting instances with CloudWatch""" + self.conn.setEnvironment() + self.conn = self.conn.connection_instance() + self.cloudwatch_conn = self.conn['cloudwatch_connection'] + self.ec2_conn = self.conn['ec2_connection'] + + except Exception as e: + log.error("Failed to Connect with AWS %s: " + str(e)) +#--------------------------------------------------------------------------------------------------------------------------- + def create_metric_request(self,metric_info): + '''Comaptible API using normalized parameters''' + metric_resp = self.metric.createMetrics(self.cloudwatch_conn,metric_info) + return metric_resp +#--------------------------------------------------------------------------------------------------------------------------- + def update_metric_request(self,updated_info): + '''Comaptible API using normalized parameters''' + update_resp = self.metric.updateMetrics(self.cloudwatch_conn,updated_info) + return update_resp +#--------------------------------------------------------------------------------------------------------------------------- + def delete_metric_request(self,delete_info): + '''Comaptible API using normalized parameters''' + del_resp = self.metric.deleteMetrics(self.cloudwatch_conn,delete_info) + return del_resp +#--------------------------------------------------------------------------------------------------------------------------- + def list_metrics_request(self,list_info): + '''Comaptible API using normalized parameters''' + list_resp = self.metric.listMetrics(self.cloudwatch_conn,list_info) + return list_resp +#--------------------------------------------------------------------------------------------------------------------------- + def read_metrics_data(self,list_info): + '''Comaptible API using normalized parameters + Read all metric data related to a specified metric''' + data_resp=self.metric.metricsData(self.cloudwatch_conn,list_info) + return data_resp +#--------------------------------------------------------------------------------------------------------------------------- + + def metric_calls(self,message): + '''Consumer will consume the message from SO, + 1) parse the message and trigger the methods ac + cording to keys and topics provided in request. + + 2) The response from plugin is saved in json format. + + 3) The producer object then calls the producer response + methods to send the response back to message bus + ''' + + try: + metric_info = json.loads(message.value) + metric_response = dict() + + if metric_info['vim_type'] == 'AWS': + log.debug ("VIM support : AWS") + + # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials' + if message.topic == "metric_request": + log.info("Action required against: %s" % (message.topic)) + + if message.key == "create_metric_request": + if self.check_resource(metric_info['metric_create']['resource_uuid']) == True: + metric_resp = self.create_metric_request(metric_info['metric_create']) #alarm_info = message.value + metric_response['schema_version'] = metric_info['schema_version'] + metric_response['schema_type'] = "create_metric_response" + metric_response['metric_create_response'] = metric_resp + payload = json.dumps(metric_response) + file = open('../../core/models/create_metric_resp.json','wb').write((payload)) + self.producer.create_metrics_resp(key='create_metric_response',message=payload,topic = 'metric_response') + + log.info("Metric configured: %s", metric_resp) + return metric_response + + elif message.key == "update_metric_request": + if self.check_resource(metric_info['metric_create']['resource_uuid']) == True: + update_resp = self.update_metric_request(metric_info['metric_create']) + metric_response['schema_version'] = metric_info['schema_version'] + metric_response['schema_type'] = "update_metric_response" + metric_response['metric_update_response'] = update_resp + payload = json.dumps(metric_response) + file = open('../../core/models/update_metric_resp.json','wb').write((payload)) + self.producer.update_metric_response(key='update_metric_response',message=payload,topic = 'metric_response') + + log.info("Metric Updates: %s",metric_response) + return metric_response + + elif message.key == "delete_metric_request": + if self.check_resource(metric_info['resource_uuid']) == True: + del_resp=self.delete_metric_request(metric_info) + payload = json.dumps(del_resp) + file = open('../../core/models/delete_metric_resp.json','wb').write((payload)) + self.producer.delete_metric_response(key='delete_metric_response',message=payload,topic = 'metric_response') + + log.info("Metric Deletion Not supported in AWS : %s",del_resp) + return del_resp + + elif message.key == "list_metric_request": + if self.check_resource(metric_info['metrics_list_request']['resource_uuid']) == True: + list_resp = self.list_metrics_request(metric_info['metrics_list_request']) + metric_response['schema_version'] = metric_info['schema_version'] + metric_response['schema_type'] = "list_metric_response" + metric_response['correlation_id'] = metric_info['metrics_list_request']['correlation_id'] + metric_response['vim_type'] = metric_info['vim_type'] + metric_response['metrics_list'] = list_resp + payload = json.dumps(metric_response) + file = open('../../core/models/list_metric_resp.json','wb').write((payload)) + self.producer.list_metric_response(key='list_metrics_response',message=payload,topic = 'metric_response') + + log.info("Metric List: %s",metric_response) + return metric_response + + elif message.key == "read_metric_data_request": + if self.check_resource(metric_info['resource_uuid']) == True: + data_resp = self.read_metrics_data(metric_info) + metric_response['schema_version'] = metric_info['schema_version'] + metric_response['schema_type'] = "read_metric_data_response" + metric_response['metric_name'] = metric_info['metric_name'] + metric_response['metric_uuid'] = metric_info['metric_uuid'] + metric_response['correlation_id'] = metric_info['correlation_uuid'] + metric_response['resource_uuid'] = metric_info['resource_uuid'] + metric_response['tenant_uuid'] = metric_info['tenant_uuid'] + metric_response['metrics_data'] = data_resp + payload = json.dumps(metric_response) + file = open('../../core/models/read_metric_data_resp.json','wb').write((payload)) + self.producer.read_metric_data_response(key='read_metric_data_response',message=payload,topic = 'metric_response') + + log.info("Metric Data Response: %s",metric_response) + return metric_response + + else: + log.debug("Unknown key, no action will be performed") + else: + log.info("Message topic not relevant to this plugin: %s", + message.topic) + + except Exception as e: + log.error("Consumer exception: %s", str(e)) + +#--------------------------------------------------------------------------------------------------------------------------- + def check_resource(self,resource_uuid): + + '''Checking the resource_uuid is present in EC2 instances''' + try: + check_resp = dict() + instances = self.ec2_conn.get_all_instance_status() + status_resource = False + + #resource_id + for instance_id in instances: + instance_id = str(instance_id).split(':')[1] + if instance_id == resource_uuid: + check_resp['resource_uuid'] = resource_uuid + status_resource = True + else: + status_resource = False + + #status + return status_resource + + except Exception as e: + log.error("Error in Plugin Inputs %s",str(e)) +#--------------------------------------------------------------------------------------------------------------------------- + diff --git a/osm_mon/plugins/CloudWatch/plugin_metrics.py b/osm_mon/plugins/CloudWatch/plugin_metrics.py deleted file mode 100644 index 72365fe..0000000 --- a/osm_mon/plugins/CloudWatch/plugin_metrics.py +++ /dev/null @@ -1,223 +0,0 @@ -## -# Copyright 2017 xFlow Research Pvt. Ltd -# This file is part of MON module -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# For those usages not covered by the Apache License, Version 2.0 please -# contact with: wajeeha.hamid@xflowresearch.com -## - -''' -AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client -''' - -__author__ = "Wajeeha Hamid" -__date__ = "18-September-2017" - -import sys -import json -from connection import Connection -from metric_alarms import MetricAlarm -from metrics import Metrics -sys.path.append("../../core/message_bus") -from producer import KafkaProducer -from kafka import KafkaConsumer -import logging as log - -class plugin_metrics(): - """Receives Alarm info from MetricAlarm and connects with the consumer/producer """ - def __init__ (self): - self.conn = Connection() - self.metric = Metrics() - - #server = {'server': 'localhost:9092', 'topic': 'metrics_request'} - #Initialize a Consumer object to consume message from the SO - self._consumer = KafkaConsumer(bootstrap_servers='localhost:9092') - self._consumer.subscribe(['metric_request']) - - #producer = KafkaProducer('create_metric_request') - - self.producer = KafkaProducer('') -#--------------------------------------------------------------------------------------------------------------------------- - def connection(self): - try: - """Connecting instances with CloudWatch""" - self.conn.setEnvironment() - self.conn = self.conn.connection_instance() - self.cloudwatch_conn = self.conn['cloudwatch_connection'] - self.ec2_conn = self.conn['ec2_connection'] - - except Exception as e: - log.error("Failed to Connect with AWS %s: " + str(e)) -#--------------------------------------------------------------------------------------------------------------------------- - def create_metric_request(self,metric_info): - '''Comaptible API using normalized parameters''' - metric_resp = self.metric.createMetrics(self.cloudwatch_conn,metric_info) - return metric_resp -#--------------------------------------------------------------------------------------------------------------------------- - def update_metric_request(self,updated_info): - '''Comaptible API using normalized parameters''' - update_resp = self.metric.updateMetrics(self.cloudwatch_conn,updated_info) - return update_resp -#--------------------------------------------------------------------------------------------------------------------------- - def delete_metric_request(self,delete_info): - '''Comaptible API using normalized parameters''' - del_resp = self.metric.deleteMetrics(self.cloudwatch_conn,delete_info) - return del_resp -#--------------------------------------------------------------------------------------------------------------------------- - def list_metrics_request(self,list_info): - '''Comaptible API using normalized parameters''' - list_resp = self.metric.listMetrics(self.cloudwatch_conn,list_info) - return list_resp -#--------------------------------------------------------------------------------------------------------------------------- - def read_metrics_data(self,list_info): - '''Comaptible API using normalized parameters - Read all metric data related to a specified metric''' - data_resp=self.metric.metricsData(self.cloudwatch_conn,list_info) - return data_resp -#--------------------------------------------------------------------------------------------------------------------------- - - def consumer(self): - '''Consumer will consume the message from SO, - 1) parse the message and trigger the methods ac - cording to keys and topics provided in request. - - 2) The response from plugin is saved in json format. - - 3) The producer object then calls the producer response - methods to send the response back to message bus - ''' - - try: - for message in self._consumer: - metric_info = json.loads(message.value) - print metric_info - metric_response = dict() - - if metric_info['vim_type'] == 'AWS': - log.debug ("VIM support : AWS") - - # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials' - if message.topic == "metric_request": - log.info("Action required against: %s" % (message.topic)) - - if message.key == "create_metric_request": - if self.check_resource(metric_info['metric_create']['resource_uuid']) == True: - metric_resp = self.create_metric_request(metric_info['metric_create']) #alarm_info = message.value - metric_response['schema_version'] = metric_info['schema_version'] - metric_response['schema_type'] = "create_metric_response" - metric_response['metric_create_response'] = metric_resp - payload = json.dumps(metric_response) - file = open('../../core/models/create_metric_resp.json','wb').write((payload)) - self.producer.create_metrics_resp(key='create_metric_response',message=payload,topic = 'metric_response') - - log.info("Metric configured: %s", metric_resp) - return metric_response - - elif message.key == "update_metric_request": - if self.check_resource(metric_info['metric_create']['resource_uuid']) == True: - update_resp = self.update_metric_request(metric_info['metric_create']) - metric_response['schema_version'] = metric_info['schema_version'] - metric_response['schema_type'] = "update_metric_response" - metric_response['metric_update_response'] = update_resp - payload = json.dumps(metric_response) - print payload - file = open('../../core/models/update_metric_resp.json','wb').write((payload)) - self.producer.update_metric_response(key='update_metric_response',message=payload,topic = 'metric_response') - - log.info("Metric Updates: %s",metric_response) - return metric_response - - elif message.key == "delete_metric_request": - if self.check_resource(metric_info['resource_uuid']) == True: - del_resp=self.delete_metric_request(metric_info) - payload = json.dumps(del_resp) - file = open('../../core/models/delete_metric_resp.json','wb').write((payload)) - self.producer.delete_metric_response(key='delete_metric_response',message=payload,topic = 'metric_response') - - log.info("Metric Deletion Not supported in AWS : %s",del_resp) - return del_resp - - elif message.key == "list_metric_request": - if self.check_resource(metric_info['metrics_list_request']['resource_uuid']) == True: - list_resp = self.list_metrics_request(metric_info['metrics_list_request']) - metric_response['schema_version'] = metric_info['schema_version'] - metric_response['schema_type'] = "list_metric_response" - metric_response['correlation_id'] = metric_info['metrics_list_request']['correlation_id'] - metric_response['vim_type'] = metric_info['vim_type'] - metric_response['metrics_list'] = list_resp - payload = json.dumps(metric_response) - file = open('../../core/models/list_metric_resp.json','wb').write((payload)) - self.producer.list_metric_response(key='list_metrics_response',message=payload,topic = 'metric_response') - - log.info("Metric List: %s",metric_response) - return metric_response - - elif message.key == "read_metric_data_request": - if self.check_resource(metric_info['resource_uuid']) == True: - data_resp = self.read_metrics_data(metric_info) - metric_response['schema_version'] = metric_info['schema_version'] - metric_response['schema_type'] = "read_metric_data_response" - metric_response['metric_name'] = metric_info['metric_name'] - metric_response['metric_uuid'] = metric_info['metric_uuid'] - metric_response['correlation_id'] = metric_info['correlation_uuid'] - metric_response['resource_uuid'] = metric_info['resource_uuid'] - metric_response['tenant_uuid'] = metric_info['tenant_uuid'] - metric_response['metrics_data'] = data_resp - payload = json.dumps(metric_response) - file = open('../../core/models/read_metric_data_resp.json','wb').write((payload)) - self.producer.read_metric_data_response(key='read_metric_data_response',message=payload,topic = 'metric_response') - - log.info("Metric Data Response: %s",metric_response) - return metric_response - - else: - log.debug("Unknown key, no action will be performed") - else: - log.info("Message topic not relevant to this plugin: %s", - message.topic) - else: - print "Bad VIM Request" - except Exception as e: - log.error("Consumer exception: %s", str(e)) - -#--------------------------------------------------------------------------------------------------------------------------- - def check_resource(self,resource_uuid): - - '''Checking the resource_uuid is present in EC2 instances''' - try: - check_resp = dict() - instances = self.ec2_conn.get_all_instance_status() - status_resource = False - - #resource_id - for instance_id in instances: - instance_id = str(instance_id).split(':')[1] - if instance_id == resource_uuid: - check_resp['resource_uuid'] = resource_uuid - status_resource = True - else: - status_resource = False - - #status - return status_resource - - except Exception as e: - log.error("Error in Plugin Inputs %s",str(e)) -#--------------------------------------------------------------------------------------------------------------------------- - -obj = plugin_metrics() -obj.connection() -obj.consumer() -- 2.25.1