From: hamid Date: Mon, 18 Sep 2017 11:56:59 +0000 (+0500) Subject: Updated Code of AWS plugin including Minor alternations of consumer/producer apps X-Git-Tag: v4.0.0~84^2 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=681877c666be228fe01654b6a02f44c1b9a4fb32;p=osm%2FMON.git Updated Code of AWS plugin including Minor alternations of consumer/producer apps Change-Id: I44606c67ad00b68fd3e95934c2f1cb94eab99a6b Signed-off-by: hamid --- diff --git a/plugins/CloudWatch/connection.py b/plugins/CloudWatch/connection.py index 666879a..547c454 100644 --- a/plugins/CloudWatch/connection.py +++ b/plugins/CloudWatch/connection.py @@ -20,11 +20,11 @@ ## ''' -AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client +Connecting with AWS services --CloudWatch/EC2 using Required keys ''' __author__ = "Wajeeha Hamid" -__date__ = "31-August-2017" +__date__ = "18-September-2017" import sys import os @@ -48,7 +48,7 @@ except: class Connection(): """Connection Establishement with AWS -- VPC/EC2/CloudWatch""" #----------------------------------------------------------------------------------------------------------------------------- - def setEnvironment(self): + def setEnvironment(self): """Credentials for connecting to AWS-CloudWatch""" self.AWS_KEY = os.environ.get("AWS_ACCESS_KEY_ID") @@ -62,13 +62,13 @@ class Connection(): self.vpc_conn = boto.vpc.connect_to_region(self.AWS_REGION, aws_access_key_id=self.AWS_KEY, aws_secret_access_key=self.AWS_SECRET) - print self.vpc_conn + #EC2 Connection self.ec2_conn = boto.ec2.connect_to_region(self.AWS_REGION, aws_access_key_id=self.AWS_KEY, aws_secret_access_key=self.AWS_SECRET) - print self.ec2_conn + """ TODO : Required to add actions against alarms when needed """ #self.sns = connect_to_region(self.AWS_REGION) @@ -80,10 +80,10 @@ class Connection(): self.AWS_REGION, aws_access_key_id=self.AWS_KEY, aws_secret_access_key=self.AWS_SECRET) - - return self.cloudwatch_conn - print "--- Connection Established with AWS ---" - print "\n" + connection_dict = dict() + connection_dict['ec2_connection'] = self.ec2_conn + connection_dict['cloudwatch_connection'] = self.cloudwatch_conn + return connection_dict except Exception as e: log.error("Failed to Connect with AWS %s: ",str(e)) diff --git a/plugins/CloudWatch/metric_alarms.py b/plugins/CloudWatch/metric_alarms.py index c58987d..bd76c81 100644 --- a/plugins/CloudWatch/metric_alarms.py +++ b/plugins/CloudWatch/metric_alarms.py @@ -1,13 +1,41 @@ +## +# Copyright 2017 xFlow Research Pvt. Ltd +# This file is part of MON module +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: wajeeha.hamid@xflowresearch.com +## + +''' Handling of alarms requests via BOTO 2.48 ''' + +__author__ = "Wajeeha Hamid" +__date__ = "18-September-2017" + import sys import os import re import datetime import random +import json import logging as log from random import randint from operator import itemgetter from connection import Connection + try: import boto import boto.ec2 @@ -19,31 +47,35 @@ except: class MetricAlarm(): - """Alarms Functionality Handler -- Cloudwatch """ - - def config_alarm(self,cloudwatch_conn,alarm_info): - """Configure or Create a new alarm""" + """Alarms Functionality Handler -- Carries out alarming requests and responses via BOTO.Cloudwatch """ + def __init__(self): + self.alarm_resp = dict() + self.del_resp = dict() + def config_alarm(self,cloudwatch_conn,create_info): + """Configure or Create a new alarm""" + inner_dict = dict() """ Alarm Name to ID Mapping """ - alarm_id = alarm_info['alarm_name'] + "_" + alarm_info['resource_id'] - if self.is_present(cloudwatch_conn,alarm_id) == True: + alarm_info = create_info['alarm_create_request'] + alarm_id = alarm_info['alarm_name'] + "_" + alarm_info['resource_uuid'] + if self.is_present(cloudwatch_conn,alarm_id)['status'] == True: alarm_id = None log.debug ("Alarm already exists, Try updating the alarm using 'update_alarm_configuration()'") else: try: alarm = boto.ec2.cloudwatch.alarm.MetricAlarm( connection = cloudwatch_conn, - name = alarm_info['alarm_name'] + "_" + alarm_info['resource_id'], - metric = alarm_info['alarm_metric'], - namespace = alarm_info['instance_type'], - statistic = alarm_info['alarm_statistics'], - comparison = alarm_info['alarm_comparison'], - threshold = alarm_info['alarm_threshold'], - period = alarm_info['alarm_period'], - evaluation_periods = alarm_info['alarm_evaluation_period'], - unit=alarm_info['alarm_unit'], - description = alarm_info['alarm_severity'] + ";" + alarm_info['alarm_description'], - dimensions = {'InstanceId':alarm_info['resource_id']}, + name = alarm_info['alarm_name'] + "_" + alarm_info['resource_uuid'], + metric = alarm_info['metric_name'], + namespace = "AWS/EC2", + statistic = alarm_info['statistic'], + comparison = alarm_info['operation'], + threshold = alarm_info['threshold_value'], + period = 60, + evaluation_periods = 1, + unit=alarm_info['unit'], + description = alarm_info['severity'] + ";" + alarm_id + ";" + alarm_info['description'], + dimensions = {'InstanceId':alarm_info['resource_uuid']}, alarm_actions = None, ok_actions = None, insufficient_data_actions = None) @@ -51,126 +83,181 @@ class MetricAlarm(): """Setting Alarm Actions : alarm_actions = ['arn:aws:swf:us-west-2:465479087178:action/actions/AWS_EC2.InstanceId.Stop/1.0']""" - cloudwatch_conn.put_metric_alarm(alarm) + status=cloudwatch_conn.put_metric_alarm(alarm) + log.debug ("Alarm Configured Succesfully") - print "created" - print "\n" + self.alarm_resp['schema_version'] = str(create_info['schema_version']) + self.alarm_resp['schema_type'] = 'create_alarm_response' + + inner_dict['correlation_id'] = str(alarm_info['correlation_id']) + inner_dict['alarm_uuid'] = str(alarm_id) + inner_dict['status'] = status + + self.alarm_resp['alarm_create_response'] = inner_dict + if status == True: + return self.alarm_resp + else: + return None + except Exception as e: log.error("Alarm Configuration Failed: " + str(e)) - return alarm_id + #----------------------------------------------------------------------------------------------------------------------------- - def update_alarm(self,cloudwatch_conn,alarm_info): + def update_alarm(self,cloudwatch_conn,update_info): """Update or reconfigure an alarm""" - + inner_dict = dict() + alarm_info = update_info['alarm_update_request'] + """Alarm Name to ID Mapping""" - alarm_id = alarm_info['alarm_name'] + "_" + alarm_info['resource_id'] + alarm_id = alarm_info['alarm_uuid'] + status = self.is_present(cloudwatch_conn,alarm_id) """Verifying : Alarm exists already""" - if self.is_present(cloudwatch_conn,alarm_id) == False: + if status['status'] == False: alarm_id = None - log.debug("Alarm not found, Try creating the alarm using 'configure_alarm()'") + log.debug("Alarm not found, Try creating the alarm using 'configure_alarm()'") + return alarm_id else: try: alarm = boto.ec2.cloudwatch.alarm.MetricAlarm( - connection = cloudwatch_conn, - name = alarm_info['alarm_name'] + "_" + alarm_info['resource_id'], - metric = alarm_info['alarm_metric'], - namespace = alarm_info['instance_type'], - statistic = alarm_info['alarm_statistics'], - comparison = alarm_info['alarm_comparison'], - threshold = alarm_info['alarm_threshold'], - period = alarm_info['alarm_period'], - evaluation_periods = alarm_info['alarm_evaluation_period'], - unit=alarm_info['alarm_unit'], - description = alarm_info['alarm_severity'] + ";" + alarm_info['alarm_description'], - dimensions = {'InstanceId':alarm_info['resource_id']}, - alarm_actions = None, - ok_actions = None, - insufficient_data_actions = None) - cloudwatch_conn.put_metric_alarm(alarm) + connection = cloudwatch_conn, + name = status['info'].name , + metric = alarm_info['metric_name'], + namespace = "AWS/EC2", + statistic = alarm_info['statistic'], + comparison = alarm_info['operation'], + threshold = alarm_info['threshold_value'], + period = 60, + evaluation_periods = 1, + unit=alarm_info['unit'], + description = alarm_info['severity'] + ";" + alarm_id + ";" + alarm_info['description'], + dimensions = {'InstanceId':str(status['info'].dimensions['InstanceId']).split("'")[1]}, + alarm_actions = None, + ok_actions = None, + insufficient_data_actions = None) + + """Setting Alarm Actions : + alarm_actions = ['arn:aws:swf:us-west-2:465479087178:action/actions/AWS_EC2.InstanceId.Stop/1.0']""" + + status=cloudwatch_conn.put_metric_alarm(alarm) log.debug("Alarm %s Updated ",alarm.name) - print "updated" + self.alarm_resp['schema_version'] = str(update_info['schema_version']) + self.alarm_resp['schema_type'] = 'update_alarm_response' + + inner_dict['correlation_id'] = str(alarm_info['correlation_id']) + inner_dict['alarm_uuid'] = str(alarm_id) + inner_dict['status'] = status + + self.alarm_resp['alarm_update_response'] = inner_dict + return self.alarm_resp except Exception as e: log.error ("Error in Updating Alarm " + str(e)) - return alarm_id + #----------------------------------------------------------------------------------------------------------------------------- - def delete_Alarm(self,cloudwatch_conn,alarm_id): + def delete_Alarm(self,cloudwatch_conn,del_info_all): + """Deletes an Alarm with specified alarm_id""" + inner_dict = dict() + del_info = del_info_all['alarm_delete_request'] + status = self.is_present(cloudwatch_conn,del_info['alarm_uuid']) try: - if self.is_present(cloudwatch_conn,alarm_id) == True: - deleted_alarm=cloudwatch_conn.delete_alarms(alarm_id) - return alarm_id + if status['status'] == True: + del_status=cloudwatch_conn.delete_alarms(status['info'].name) + self.del_resp['schema_version'] = str(del_info_all['schema_version']) + self.del_resp['schema_type'] = 'delete_alarm_response' + inner_dict['correlation_id'] = str(del_info['correlation_id']) + inner_dict['alarm_id'] = str(del_info['alarm_uuid']) + inner_dict['status'] = del_status + self.del_resp['alarm_deletion_response'] = inner_dict + return self.del_resp return None except Exception as e: log.error("Alarm Not Deleted: " + str(e)) #----------------------------------------------------------------------------------------------------------------------------- - def alarms_list(self,cloudwatch_conn,instance_id): + def alarms_list(self,cloudwatch_conn,list_info): - """Get a list of alarms that are present on a particular VM instance""" - try: - log.debug("Getting Alarm list for %s",instance_id) - alarm_dict = dict() - alarm_list = [] + """Get a list of alarms that are present on a particular VIM type""" + alarm_list = [] + alarm_info = dict() + try: #id vim alarms = cloudwatch_conn.describe_alarms() itr = 0 for alarm in alarms: - if str(alarm.dimensions['InstanceId']).split("'")[1] == instance_id: - alarm_list.insert(itr,str(alarm.name)) - itr += 1 - alarm_dict['alarm_names'] = alarm_list - alarm_dict['resource_id'] = instance_id - return alarm_dict + list_info['alarm_list_request']['alarm_uuid'] = str(alarm.description).split(';')[1] + alarm_list.insert(itr,self.alarm_details(cloudwatch_conn,list_info)) + itr += 1 + + alarm_info['schema_version'] = str(list_info['schema_version']) + alarm_info['schema_type'] = 'list_alarm_response' + alarm_info['list_alarm_resp'] = json.dumps(alarm_list) + + return alarm_info except Exception as e: log.error("Error in Getting List : %s",str(e)) #----------------------------------------------------------------------------------------------------------------------------- - def alarm_details(self,cloudwatch_conn,alarm_name): + def alarm_details(self,cloudwatch_conn,ack_info): """Get an individual alarm details specified by alarm_name""" try: - alarms_details=cloudwatch_conn.describe_alarm_history() + alarms_details=cloudwatch_conn.describe_alarm_history() + alarm_details_all = dict() alarm_details_dict = dict() + ack_info_all = ack_info + + + if 'ack_details' in ack_info: + ack_info = ack_info['ack_details'] + elif 'alarm_list_request' in ack_info: + ack_info = ack_info['alarm_list_request'] + is_present = self.is_present(cloudwatch_conn,ack_info['alarm_uuid']) + for itr in range (len(alarms_details)): - if alarms_details[itr].name == alarm_name and 'created' in alarms_details[itr].summary :#name, timestamp, summary - status = alarms_details[itr].summary.split() + if alarms_details[itr].name == is_present['info'].name :#name, timestamp, summary + if 'created' in alarms_details[itr].summary: + alarm_details_dict['status'] = "New" + elif 'updated' in alarms_details[itr].summary: + alarm_details_dict['status'] = "Update" + elif 'deleted' in alarms_details[itr].summary: + alarm_details_dict['status'] = "Canceled" + + status = alarms_details[itr].summary.split() alarms = cloudwatch_conn.describe_alarms() for alarm in alarms: - if alarm.name == alarm_name: - alarm_details_dict['alarm_id'] = alarm_name - alarm_details_dict['resource_id'] = str(alarm.dimensions['InstanceId']).split("'")[1] - alarm_details_dict['severity'] = str(alarm.description) - alarm_details_dict['start_date_time'] = str(alarms_details[x].timestamp) + if str(alarm.description).split(';')[1] == ack_info['alarm_uuid']: + alarm_details_dict['alarm_uuid'] = str(ack_info['alarm_uuid']) + alarm_details_dict['resource_uuid'] = str(alarm.dimensions['InstanceId']).split("'")[1] + alarm_details_dict['description'] = str(alarm.description).split(';')[1] + alarm_details_dict['severity'] = str(alarm.description).split(';')[0] + alarm_details_dict['start_date_time'] = str(alarms_details[itr].timestamp) + alarm_details_dict['vim_type'] = str(ack_info_all['vim_type']) + #TODO : tenant id + if 'ack_details' in ack_info_all: + alarm_details_all['schema_version'] = str(ack_info_all['schema_version']) + alarm_details_all['schema_type'] = 'notify_alarm' + alarm_details_all['notify_details'] = alarm_details_dict + return alarm_details_all - return alarm_details_dict + elif 'alarm_list_request' in ack_info_all: + return alarm_details_dict except Exception as e: log.error("Error getting alarm details: %s",str(e)) #----------------------------------------------------------------------------------------------------------------------------- - def metrics_data(self,cloudwatch_conn,metric_name,instance_id,period,metric_unit): - - """Getting Metrics Stats for an Hour. Time interval can be modified using Timedelta value""" - metric_data= dict() - metric_stats=cloudwatch_conn.get_metric_statistics(period, datetime.datetime.utcnow() - datetime.timedelta(seconds=3600), - datetime.datetime.utcnow(),metric_name,'AWS/EC2', 'Maximum', - dimensions={'InstanceId':instance_id}, unit=metric_unit) - - for itr in range (len(metric_stats)): - metric_data['metric_name'] = metric_name - metric_data['Resource_id'] = instance_id - metric_data['Unit'] = metric_stats[itr]['Unit'] - metric_data['Timestamp'] = metric_stats[itr]['Timestamp'] - return metric_data - -#----------------------------------------------------------------------------------------------------------------------------- - def is_present(self,cloudwatch_conn,alarm_name): - """Finding Alarm exists or not""" + def is_present(self,cloudwatch_conn,alarm_id): + """Finding alarm from already configured alarms""" + alarm_info = dict() try: alarms = cloudwatch_conn.describe_alarms() for alarm in alarms: - if alarm.name == alarm_name: - return True - return False + if str(alarm.description).split(';')[1] == alarm_id: + alarm_info['status'] = True + alarm_info['info'] = alarm + return alarm_info + alarm_info['status'] = False + return alarm_info except Exception as e: log.error("Error Finding Alarm",str(e)) #----------------------------------------------------------------------------------------------------------------------------- + \ No newline at end of file diff --git a/plugins/CloudWatch/metrics.py b/plugins/CloudWatch/metrics.py new file mode 100644 index 0000000..ddda7e4 --- /dev/null +++ b/plugins/CloudWatch/metrics.py @@ -0,0 +1,236 @@ +## +# Copyright 2017 xFlow Research Pvt. Ltd +# This file is part of MON module +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: wajeeha.hamid@xflowresearch.com +## + +''' +AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client +''' + +__author__ = "Wajeeha Hamid" +__date__ = "18-Sept-2017" + +import sys +import datetime +import json +import logging as log + +try: + import boto + import boto.ec2 + import boto.vpc + import boto.ec2.cloudwatch + import boto.ec2.connection +except: + exit("Boto not avialable. Try activating your virtualenv OR `pip install boto`") + + + +class Metrics(): + + def createMetrics(self,cloudwatch_conn,metric_info): + try: + + '''createMetrics will be returning the metric_uuid=0 and + status=True when the metric is supported by AWS''' + + supported=self.check_metric(metric_info['metric_name']) + metric_resp = dict() + if supported['status'] == True: + metric_resp['status'] = True + metric_resp['metric_uuid'] = 0 + else: + metric_resp['status'] = False + metric_resp['metric_uuid'] = None + + metric_resp['resource_uuid'] = metric_info['resource_uuid'] + log.debug("Metrics Configured Succesfully : %s" , metric_resp) + return metric_resp + + except Exception as e: + log.error("Metric Configuration Failed: " + str(e)) +#----------------------------------------------------------------------------------------------------------------------------- + + def metricsData(self,cloudwatch_conn,data_info): + + """Getting Metrics Stats for an Hour.The datapoints are + received after every one minute. + Time interval can be modified using Timedelta value""" + + try: + metric_info = dict() + metric_info_dict = dict() + timestamp_arr = {} + value_arr = {} + + supported=self.check_metric(data_info['metric_name']) + + metric_stats=cloudwatch_conn.get_metric_statistics(60, datetime.datetime.utcnow() - datetime.timedelta(seconds=int(data_info['collection_period'])), + datetime.datetime.utcnow(),supported['metric_name'],'AWS/EC2', 'Maximum', + dimensions={'InstanceId':data_info['resource_uuid']}, unit='Percent') + + index = 0 + for itr in range (len(metric_stats)): + timestamp_arr[index] = str(metric_stats[itr]['Timestamp']) + value_arr[index] = metric_stats[itr]['Maximum'] + index +=1 + + metric_info_dict['time_series'] = timestamp_arr + metric_info_dict['metrics_series'] = value_arr + log.debug("Metrics Data : %s", metric_info_dict) + return metric_info_dict + + except Exception as e: + log.error("Error returning Metrics Data" + str(e)) + +#----------------------------------------------------------------------------------------------------------------------------- + def updateMetrics(self,cloudwatch_conn,metric_info): + + '''updateMetrics will be returning the metric_uuid=0 and + status=True when the metric is supported by AWS''' + try: + supported=self.check_metric(metric_info['metric_name']) + update_resp = dict() + if supported['status'] == True: + update_resp['status'] = True + update_resp['metric_uuid'] = 0 + else: + update_resp['status'] = False + update_resp['metric_uuid'] = None + + update_resp['resource_uuid'] = metric_info['resource_uuid'] + log.debug("Metric Updated : %s", update_resp) + return update_resp + + except Exception as e: + log.error("Error in Update Metrics" + str(e)) +#----------------------------------------------------------------------------------------------------------------------------- + def deleteMetrics(self,cloudwatch_conn,del_info): + + ''' " Not supported in AWS" + Returning the required parameters with status = False''' + try: + + del_resp = dict() + del_resp['schema_version'] = del_info['schema_version'] + del_resp['schema_type'] = "delete_metric_response" + del_resp['metric_name'] = del_info['metric_name'] + del_resp['metric_uuid'] = del_info['metric_uuid'] + del_resp['resource_uuid'] = del_info['resource_uuid'] + # TODO : yet to finalize + del_resp['tenant_uuid'] = del_info['tenant_uuid'] + del_resp['correlation_id'] = del_info['correlation_uuid'] + del_resp['status'] = False + log.info("Metric Deletion Not supported in AWS : %s",del_resp) + return del_resp + + except Exception as e: + log.error(" Metric Deletion Not supported in AWS : " + str(e)) +#------------------------------------------------------------------------------------------------------------------------------------ + + def listMetrics(self,cloudwatch_conn ,list_info): + + '''Returns the list of available AWS/EC2 metrics on which + alarms have been configured and the metrics are being monitored''' + try: + supported = self.check_metric(list_info['metric_name']) + + metrics_list = [] + metrics_data = dict() + metrics_info = dict() + + #To get the list of associated metrics with the alarms + alarms = cloudwatch_conn.describe_alarms() + itr = 0 + if list_info['metric_name'] == None: + for alarm in alarms: + instance_id = str(alarm.dimensions['InstanceId']).split("'")[1] + metrics_info['metric_name'] = str(alarm.metric) + metrics_info['metric_uuid'] = 0 + metrics_info['metric_unit'] = str(alarm.unit) + metrics_info['resource_uuid'] = instance_id + metrics_list.insert(itr,metrics_info) + itr += 1 + else: + for alarm in alarms: + print supported['metric_name'] + if alarm.metric == supported['metric_name']: + instance_id = str(alarm.dimensions['InstanceId']).split("'")[1] + metrics_info['metric_name'] = str(alarm.metric) + metrics_info['metric_uuid'] = 0 + metrics_info['metric_unit'] = str(alarm.unit) + metrics_info['resource_uuid'] = instance_id + metrics_list.insert(itr,metrics_info) + itr += 1 + + log.debug("Metrics List : %s",metrics_list) + return metrics_list + + except Exception as e: + log.error("Error in Getting Metric List " + str(e)) + +#------------------------------------------------------------------------------------------------------------------------------------ + + def check_metric(self,metric_name): + + ''' Checking whether the metric is supported by AWS ''' + try: + check_resp = dict() + #metric_name + if metric_name == 'CPU_UTILIZATION': + metric_name = 'CPUUtilization' + metric_status = True + elif metric_name == 'DISK_READ_OPS': + metric_name = 'DiskReadOps' + metric_status = True + elif metric_name == 'DISK_WRITE_OPS': + metric_name = 'DiskWriteOps' + metric_status = True + elif metric_name == 'DISK_READ_BYTES': + metric_name = 'DiskReadBytes' + metric_status = True + elif metric_name == 'DISK_WRITE_BYTES': + metric_name = 'DiskWriteBytes' + metric_status = True + elif metric_name == 'PACKETS_RECEIVED': + metric_name = 'NetworkPacketsIn' + metric_status = True + elif metric_name == 'PACKETS_SENT': + metric_name = 'NetworkPacketsOut' + metric_status = True + else: + metric_name = None + log.info("Metric Not Supported by AWS plugin ") + metric_status = False + check_resp['metric_name'] = metric_name + #status + if metric_status == True: + check_resp['status'] = True + return check_resp + except Exception as e: + log.error("Error in Plugin Inputs %s",str(e)) +#-------------------------------------------------------------------------------------------------------------------------------------- + + + + + + + + diff --git a/plugins/CloudWatch/plugin.py b/plugins/CloudWatch/plugin.py deleted file mode 100644 index 9f917ae..0000000 --- a/plugins/CloudWatch/plugin.py +++ /dev/null @@ -1,134 +0,0 @@ -## -# Copyright 2017 xFlow Research Pvt. Ltd -# This file is part of MON module -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# For those usages not covered by the Apache License, Version 2.0 please -# contact with: wajeeha.hamid@xflowresearch.com -## - -''' -AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client -''' - -__author__ = "Wajeeha Hamid" -__date__ = "31-August-2017" - -import sys -from connection import Connection -from metric_alarms import MetricAlarm -try: - from kafka import KafkaConsumer - from kafka.errors import KafkaError -except: - exit("Kafka Error. Try activating your Kafka Services") - -class Plugin(): - """Receives Alarm info from MetricAlarm and connects with the consumer/producer """ - def __init__ (self): - self.conn = Connection() - self.metricAlarm = MetricAlarm() - - server = {'server': 'localhost:9092', 'topic': 'alarms'} - #Initialize a Consumer object to consume message from the SO - self._consumer = KafkaConsumer(server['topic'], - group_id='my-group', - bootstrap_servers=server['server']) -#--------------------------------------------------------------------------------------------------------------------------- - def connection(self): - """Connecting instances with CloudWatch""" - self.conn.setEnvironment() - self.cloudwatch_conn = self.conn.connection_instance() -#--------------------------------------------------------------------------------------------------------------------------- - def configure_alarm(self,alarm_info): - - alarm_id = self.metricAlarm.config_alarm(self.cloudwatch_conn,alarm_info) - return alarm_id -#--------------------------------------------------------------------------------------------------------------------------- - def update_alarm_configuration(self,test): - alarm_id = self.metricAlarm.update_alarm(self.cloudwatch_conn,test) - return alarm_id -#--------------------------------------------------------------------------------------------------------------------------- - def delete_alarm(self,alarm_id): - return self.metricAlarm.delete_Alarm(self.cloudwatch_conn,alarm_id) -#--------------------------------------------------------------------------------------------------------------------------- - def get_alarms_list(self,instance_id): - return self.metricAlarm.alarms_list(self.cloudwatch_conn,instance_id) -#--------------------------------------------------------------------------------------------------------------------------- - def get_alarm_details(self,alarm_id): - return self.metricAlarm.alarm_details(self.cloudwatch_conn,alarm_id) -#--------------------------------------------------------------------------------------------------------------------------- - def get_metrics_data(self,metric_name,instance_id,period,metric_unit): - return self.metricAlarm.metrics_data(self.cloudwatch_conn,metric_name,instance_id,period,metric_unit) -#--------------------------------------------------------------------------------------------------------------------------- - def consumer(self,alarm_info): - try: - for message in self._consumer: - - # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials' - if message.topic == "alarms": - log.info("Action required against: %s" % (message.topic)) - - if message.key == "Configure_Alarm": - #alarm_info = json.loads(message.value) - alarm_id = self.configure_alarm(alarm_info) #alarm_info = message.value - log.info("New alarm created with alarmID: %s", alarm_id) - #Keys other than Configure_Alarm and Notify_Alarm are already handled here which are not yet finalized - elif message.key == "Notify_Alarm": - alarm_details = self.get_alarm_details(alarm_info['alarm_name'])#['alarm_id'] - - elif message.key == "Update_Alarm": - alarm_id = self.update_alarm_configuration(alarm_info) - log.info("Alarm Updated with alarmID: %s", alarm_id) - - elif message.key == "Delete_Alarm": - alarm_id = self.delete_alarm(alarm_info['alarm_name']) - log.info("Alarm Deleted with alarmID: %s", alarm_id) - - elif message.key == "Alarms_List": - self.get_alarms_list(alarm_info['resource_id'])#['alarm_names'] - else: - log.debug("Unknown key, no action will be performed") - else: - log.info("Message topic not relevant to this plugin: %s", - message.topic) - except Exception as e: - log.error("Consumer exception: %s", str(e)) -#--------------------------------------------------------------------------------------------------------------------------- -"""For Testing Purpose: Required calls to Trigger defined functions """ -'''obj = Plugin() -obj.connection() -obj.consumer() - -alarm_info = dict() -alarm_info['resource_id'] = 'i-098da78cbd8304e17' -alarm_info['alarm_name'] = 'alarm-6' -alarm_info['alarm_metric'] = 'CPUUtilization' -alarm_info['alarm_severity'] = 'Critical' -alarm_info['instance_type'] = 'AWS/EC2' -alarm_info['alarm_statistics'] = 'Maximum' -alarm_info['alarm_comparison'] = '>=' -alarm_info['alarm_threshold'] = 1.5 -alarm_info['alarm_period'] = 60 -alarm_info['alarm_evaluation_period'] = 1 -alarm_info['alarm_unit'] = None -alarm_info['alarm_description'] = '' - -#obj.configure_alarm(alarm_info) -#obj.update_alarm_configuration(alarm_info) -#obj.delete_alarm('alarm-5|i-098da78cbd8304e17') -#obj.get_alarms_list('i-098da78cbd8304e17')#['alarm_names'] -#obj.get_alarm_details('alarm-5|i-098da78cbd8304e17')#['alarm_id'] -#print obj.get_metrics_data('CPUUtilization','i-09462760703837b26','60',None) ''' \ No newline at end of file diff --git a/plugins/CloudWatch/plugin_alarm.py b/plugins/CloudWatch/plugin_alarm.py new file mode 100644 index 0000000..b3446bd --- /dev/null +++ b/plugins/CloudWatch/plugin_alarm.py @@ -0,0 +1,257 @@ +## +# Copyright 2017 xFlow Research Pvt. Ltd +# This file is part of MON module +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: wajeeha.hamid@xflowresearch.com +## + +''' +AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client +''' + +__author__ = "Wajeeha Hamid" +__date__ = "18-September-2017" + +import sys +import json +import logging as log +from jsmin import jsmin +from connection import Connection +from metric_alarms import MetricAlarm +from metrics import Metrics +from kafka import KafkaConsumer +sys.path.append("../../core/message-bus") +from producer import KafkaProducer + +class Plugin(): + """Receives Alarm info from MetricAlarm and connects with the consumer/producer""" + def __init__ (self): + self.conn = Connection() + self.metricAlarm = MetricAlarm() + self.metric = Metrics() + + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + self._consumer = KafkaConsumer(server['topic'], bootstrap_servers=server['server']) + self._consumer.subscribe(['alarm_request']) + + #self.producer = KafkaProducer('create_alarm_request') + self.producer = KafkaProducer('') + + +#--------------------------------------------------------------------------------------------------------------------------- + def connection(self): + """Connecting instances with CloudWatch""" + self.conn.setEnvironment() + self.conn = self.conn.connection_instance() + self.cloudwatch_conn = self.conn['cloudwatch_connection'] + self.ec2_conn = self.conn['ec2_connection'] +#--------------------------------------------------------------------------------------------------------------------------- + def configure_alarm(self,alarm_info): + alarm_id = self.metricAlarm.config_alarm(self.cloudwatch_conn,alarm_info) + return alarm_id +#--------------------------------------------------------------------------------------------------------------------------- + def update_alarm_configuration(self,test): + alarm_id = self.metricAlarm.update_alarm(self.cloudwatch_conn,test) + return alarm_id +#--------------------------------------------------------------------------------------------------------------------------- + def delete_alarm(self,alarm_id): + return self.metricAlarm.delete_Alarm(self.cloudwatch_conn,alarm_id) +#--------------------------------------------------------------------------------------------------------------------------- + def get_alarms_list(self,instance_id): + return self.metricAlarm.alarms_list(self.cloudwatch_conn,instance_id) +#--------------------------------------------------------------------------------------------------------------------------- + def get_ack_details(self,ack_info): + return self.metricAlarm.alarm_details(self.cloudwatch_conn,ack_info) +#--------------------------------------------------------------------------------------------------------------------------- + def get_metrics_data(self,metric_name,period,instance_id): + return self.metric.metricsData(self.cloudwatch_conn,metric_name,period,instance_id) +#--------------------------------------------------------------------------------------------------------------------------- + + def consumer(self): + """Consume info from the message bus to manage alarms.""" + try: + for message in self._consumer: + # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials' + if message.topic == "alarm_request": + log.info("Action required against: %s" % (message.topic)) + alarm_info = json.loads(message.value) + + if message.key == "create_alarm_request": + if alarm_info['vim_type'] == 'AWS': + alarm_inner_dict = alarm_info['alarm_create_request'] + metric_status = self.check_metric(alarm_inner_dict['metric_name']) + + if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True: + log.debug ("Resource and Metrics exists") + + alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name'] + #Generate a valid response message, send via producer + config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value + if config_resp == None: + log.debug("Alarm Already exists") + payload = json.dumps(config_resp) + file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) + self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') + else: + payload = json.dumps(config_resp) + file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) + + self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') + log.info("New alarm created with alarm info: %s", config_resp) + else: + log.error("Resource ID doesn't exists") + else: + log.error("Plugin inputs are incorrect") + + + elif message.key == "acknowledge_alarm": + alarm_inner_dict = alarm_info['ack_details'] + if alarm_info['vim_type'] == 'AWS': + if self.check_resource(alarm_inner_dict['resource_uuid']) == True: + alarm_info = json.loads(message.value) + #Generate a valid response message, send via producer + ack_details = self.get_ack_details(alarm_info) + payload = json.dumps(ack_details) + file = open('../../core/models/notify_alarm.json','wb').write((payload)) + self.producer.update_alarm_response(key='notify_alarm',message=payload,topic = 'alarm_response') + log.info("Acknowledge sent: %s", ack_details) + else: + log.error("Resource ID is Incorrect") + else: + log.error(" VIM type incorrect ") + + + elif message.key == "update_alarm_request": + if alarm_info['vim_type'] == 'AWS': + alarm_inner_dict = alarm_info['alarm_update_request'] + metric_status = self.check_metric(alarm_inner_dict['metric_name']) + + if metric_status['status'] == True: + log.debug ("Resource and Metrics exists") + alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name'] + #Generate a valid response message, send via producer + update_resp = self.update_alarm_configuration(alarm_info) + if update_resp == None: + payload = json.dumps(update_resp) + file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) + self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') + log.debug("Alarm Already exists") + else: + payload = json.dumps(update_resp) + file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) + self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') + log.info("Alarm Updated with alarm info: %s", update_resp) + else: + log.info ("Metric Not Supported") + else: + log.error(" VIM type Incorrect ") + + elif message.key == "delete_alarm_request": + if alarm_info['vim_type'] == 'AWS': + del_info = json.loads(message.value) + #Generate a valid response message, send via producer + del_resp = self.delete_alarm(del_info) + payload = json.dumps(del_resp) + file = open('../../core/models/delete_alarm_resp.json','wb').write((payload)) + self.producer.update_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response') + log.info("Alarm Deleted with alarm info: %s", del_resp) + else: + log.error(" VIM type Incorrect ") + + elif message.key == "alarm_list_request": + alarm_inner_dict = alarm_info['alarm_list_request'] + if alarm_info['vim_type'] == 'AWS': + if self.check_resource(alarm_inner_dict['resource_uuid']) == True: + #Generate a valid response message, send via producer + list_resp = self.get_alarms_list(alarm_info)#['alarm_names'] + payload = json.dumps(list_resp) + file = open('../../core/models/list_alarm_resp.json','wb').write((payload)) + self.producer.update_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response') + else: + log.error("Resource ID is Incorrect") + else: + log.error(" VIM type Incorrect ") + + else: + log.debug("Unknown key, no action will be performed") + + else: + log.info("Message topic not relevant to this plugin: %s", + message.topic) + except Exception as e: + log.error("Consumer exception: %s", str(e)) +#--------------------------------------------------------------------------------------------------------------------------- + def check_resource(self,resource_uuid): + '''Finding Resource with the resource_uuid''' + try: + check_resp = dict() + instances = self.ec2_conn.get_all_instance_status() + + #resource_id + for instance_id in instances: + instance_id = str(instance_id).split(':')[1] + if instance_id == resource_uuid: + check_resp['resource_uuid'] = resource_uuid + return True + return False + + except Exception as e: + log.error("Error in Plugin Inputs %s",str(e)) +#--------------------------------------------------------------------------------------------------------------------------- + def check_metric(self,metric_name): + ''' Checking whether the metric is supported by AWS ''' + try: + check_resp = dict() + + #metric_name + if metric_name == 'CPU_UTILIZATION': + metric_name = 'CPUUtilization' + metric_status = True + elif metric_name == 'DISK_READ_OPS': + metric_name = 'DiskReadOps' + metric_status = True + elif metric_name == 'DISK_WRITE_OPS': + metric_name = 'DiskWriteOps' + metric_status = True + elif metric_name == 'DISK_READ_BYTES': + metric_name = 'DiskReadBytes' + metric_status = True + elif metric_name == 'DISK_WRITE_BYTES': + metric_name = 'DiskWriteBytes' + metric_status = True + elif metric_name == 'PACKETS_RECEIVED': + metric_name = 'NetworkPacketsIn' + metric_status = True + elif metric_name == 'PACKETS_SENT': + metric_name = 'NetworkPacketsOut' + metric_status = True + else: + metric_name = None + metric_status = False + check_resp['metric_name'] = metric_name + #status + if metric_status == True: + check_resp['status'] = True + return check_resp + except Exception as e: + log.error("Error in Plugin Inputs %s",str(e)) +#--------------------------------------------------------------------------------------------------------------------------- + +obj = Plugin() +obj.connection() +obj.consumer() diff --git a/plugins/CloudWatch/plugin_metrics.py b/plugins/CloudWatch/plugin_metrics.py new file mode 100644 index 0000000..cb04a65 --- /dev/null +++ b/plugins/CloudWatch/plugin_metrics.py @@ -0,0 +1,224 @@ +## +# Copyright 2017 xFlow Research Pvt. Ltd +# This file is part of MON module +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: wajeeha.hamid@xflowresearch.com +## + +''' +AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client +''' + +__author__ = "Wajeeha Hamid" +__date__ = "18-September-2017" + +import sys +import json +from connection import Connection +from metric_alarms import MetricAlarm +from metrics import Metrics +# Need to import the producer message bus,not working yet +#from core.message_bus.producerfunct import KafkaProducer +sys.path.append("../../core/message-bus") +from producer import KafkaProducer +from kafka import KafkaConsumer +import logging as log + +class plugin_metrics(): + """Receives Alarm info from MetricAlarm and connects with the consumer/producer """ + def __init__ (self): + self.conn = Connection() + self.metric = Metrics() + + #server = {'server': 'localhost:9092', 'topic': 'metrics_request'} + #Initialize a Consumer object to consume message from the SO + self._consumer = KafkaConsumer(bootstrap_servers='localhost:9092') + self._consumer.subscribe(['metric_request']) + + #producer = KafkaProducer('create_metric_request') + + self.producer = KafkaProducer('') +#--------------------------------------------------------------------------------------------------------------------------- + def connection(self): + try: + """Connecting instances with CloudWatch""" + self.conn.setEnvironment() + self.conn = self.conn.connection_instance() + self.cloudwatch_conn = self.conn['cloudwatch_connection'] + self.ec2_conn = self.conn['ec2_connection'] + + except Exception as e: + log.error("Failed to Connect with AWS %s: " + str(e)) +#--------------------------------------------------------------------------------------------------------------------------- + def create_metric_request(self,metric_info): + '''Comaptible API using normalized parameters''' + metric_resp = self.metric.createMetrics(self.cloudwatch_conn,metric_info) + return metric_resp +#--------------------------------------------------------------------------------------------------------------------------- + def update_metric_request(self,updated_info): + '''Comaptible API using normalized parameters''' + update_resp = self.metric.updateMetrics(self.cloudwatch_conn,updated_info) + return update_resp +#--------------------------------------------------------------------------------------------------------------------------- + def delete_metric_request(self,delete_info): + '''Comaptible API using normalized parameters''' + del_resp = self.metric.deleteMetrics(self.cloudwatch_conn,delete_info) + return del_resp +#--------------------------------------------------------------------------------------------------------------------------- + def list_metrics_request(self,list_info): + '''Comaptible API using normalized parameters''' + list_resp = self.metric.listMetrics(self.cloudwatch_conn,list_info) + return list_resp +#--------------------------------------------------------------------------------------------------------------------------- + def read_metrics_data(self,list_info): + '''Comaptible API using normalized parameters + Read all metric data related to a specified metric''' + data_resp=self.metric.metricsData(self.cloudwatch_conn,list_info) + return data_resp +#--------------------------------------------------------------------------------------------------------------------------- + + def consumer(self): + '''Consumer will consume the message from SO, + 1) parse the message and trigger the methods ac + cording to keys and topics provided in request. + + 2) The response from plugin is saved in json format. + + 3) The producer object then calls the producer response + methods to send the response back to message bus + ''' + + try: + for message in self._consumer: + + metric_info = json.loads(message.value) + metric_response = dict() + + if metric_info['vim_type'] == 'AWS': + log.debug ("VIM support : AWS") + + # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials' + if message.topic == "metric_request": + log.info("Action required against: %s" % (message.topic)) + + if message.key == "create_metric_request": + if self.check_resource(metric_info['metric_create']['resource_uuid']) == True: + metric_resp = self.create_metric_request(metric_info['metric_create']) #alarm_info = message.value + metric_response['schema_version'] = metric_info['schema_version'] + metric_response['schema_type'] = "create_metric_response" + metric_response['metric_create_response'] = metric_resp + payload = json.dumps(metric_response) + file = open('../../core/models/create_metric_resp.json','wb').write((payload)) + self.producer.create_metrics_resp(key='create_metric_response',message=payload,topic = 'metric_response') + + log.info("Metric configured: %s", metric_resp) + return metric_response + + elif message.key == "update_metric_request": + if self.check_resource(metric_info['metric_create']['resource_uuid']) == True: + update_resp = self.update_metric_request(metric_info['metric_create']) + metric_response['schema_version'] = metric_info['schema_version'] + metric_response['schema_type'] = "update_metric_response" + metric_response['metric_update_response'] = update_resp + payload = json.dumps(metric_response) + file = open('../../core/models/update_metric_resp.json','wb').write((payload)) + self.producer.create_metrics_resp(key='update_metric_response',message=payload,topic = 'metric_response') + + log.info("Metric Updates: %s",metric_response) + return metric_response + + elif message.key == "delete_metric_request": + if self.check_resource(metric_info['resource_uuid']) == True: + del_resp=self.delete_metric_request(metric_info) + payload = json.dumps(del_resp) + file = open('../../core/models/delete_metric_resp.json','wb').write((payload)) + self.producer.create_metrics_resp(key='delete_metric_response',message=payload,topic = 'metric_response') + + log.info("Metric Deletion Not supported in AWS : %s",del_resp) + return del_resp + + elif message.key == "list_metric_request": + if self.check_resource(metric_info['metrics_list_request']['resource_uuid']) == True: + list_resp = self.list_metrics_request(metric_info['metrics_list_request']) + metric_response['schema_version'] = metric_info['schema_version'] + metric_response['schema_type'] = "list_metric_response" + metric_response['correlation_id'] = metric_info['metrics_list_request']['correlation_id'] + metric_response['vim_type'] = metric_info['vim_type'] + metric_response['metrics_list'] = list_resp + payload = json.dumps(metric_response) + file = open('../../core/models/list_metric_resp.json','wb').write((payload)) + self.producer.create_metrics_resp(key='list_metrics_response',message=payload,topic = 'metric_response') + + log.info("Metric List: %s",metric_response) + return metric_response + + elif message.key == "read_metric_data_request": + if self.check_resource(metric_info['resource_uuid']) == True: + data_resp = self.read_metrics_data(metric_info) + metric_response['schema_version'] = metric_info['schema_version'] + metric_response['schema_type'] = "list_metric_response" + metric_response['metric_name'] = metric_info['metric_name'] + metric_response['metric_uuid'] = metric_info['metric_uuid'] + metric_response['correlation_id'] = metric_info['correlation_uuid'] + metric_response['resource_uuid'] = metric_info['resource_uuid'] + metric_response['tenant_uuid'] = metric_info['tenant_uuid'] + metric_response['metrics_data'] = data_resp + payload = json.dumps(metric_response) + + file = open('../../core/models/read_metric_data_resp.json','wb').write((payload)) + self.producer.create_metrics_resp(key='read_metric_data_response',message=payload,topic = 'metric_response') + log.info("Metric Data Response: %s",metric_response) + return metric_response + + else: + log.debug("Unknown key, no action will be performed") + else: + log.info("Message topic not relevant to this plugin: %s", + message.topic) + else: + print "Bad VIM Request" + except Exception as e: + log.error("Consumer exception: %s", str(e)) + +#--------------------------------------------------------------------------------------------------------------------------- + def check_resource(self,resource_uuid): + + '''Checking the resource_uuid is present in EC2 instances''' + try: + check_resp = dict() + instances = self.ec2_conn.get_all_instance_status() + status_resource = False + + #resource_id + for instance_id in instances: + instance_id = str(instance_id).split(':')[1] + if instance_id == resource_uuid: + check_resp['resource_uuid'] = resource_uuid + status_resource = True + else: + status_resource = False + + #status + return status_resource + + except Exception as e: + log.error("Error in Plugin Inputs %s",str(e)) +#--------------------------------------------------------------------------------------------------------------------------- + +obj = plugin_metrics() +obj.connection() +obj.consumer()