X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fplugins%2FCloudWatch%2Fplugin_alarm.py;h=c125bab36e5a5fc7a7953a003250dce3d0470024;hb=93699898c51364cde193d8d441f4aed45670e7bf;hp=eb48208ff148bed2846d367b76052a5cb570caa0;hpb=c7397b95dbaeebd7d872779eec809daed9e487cc;p=osm%2FMON.git diff --git a/osm_mon/plugins/CloudWatch/plugin_alarm.py b/osm_mon/plugins/CloudWatch/plugin_alarm.py index eb48208..c125bab 100644 --- a/osm_mon/plugins/CloudWatch/plugin_alarm.py +++ b/osm_mon/plugins/CloudWatch/plugin_alarm.py @@ -19,232 +19,197 @@ # contact with: wajeeha.hamid@xflowresearch.com ## -''' +""" AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client -''' +""" +from io import UnsupportedOperation + +from osm_mon.core.settings import Config +from osm_mon.plugins.CloudWatch.metric_alarms import MetricAlarm +from osm_mon.plugins.CloudWatch.metrics import Metrics __author__ = "Wajeeha Hamid" -__date__ = "18-September-2017" - -import sys -import json -import logging as log -from jsmin import jsmin -from connection import Connection -from metric_alarms import MetricAlarm -from metrics import Metrics -from kafka import KafkaConsumer -sys.path.append("../../core/message-bus") -from producer import KafkaProducer - -class Plugin(): +__date__ = "18-September-2017" + +import logging + +log = logging.getLogger(__name__) + + +class plugin_alarms: """Receives Alarm info from MetricAlarm and connects with the consumer/producer""" - def __init__ (self): - self.conn = Connection() + + def __init__(self): + self._cfg = Config.instance() self.metricAlarm = MetricAlarm() self.metric = Metrics() - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - self._consumer = KafkaConsumer(server['topic'], bootstrap_servers=server['server']) - self._consumer.subscribe(['alarm_request']) - self.producer = KafkaProducer('') -#--------------------------------------------------------------------------------------------------------------------------- - def connection(self): - """Connecting instances with CloudWatch""" - self.conn.setEnvironment() - self.conn = self.conn.connection_instance() - self.cloudwatch_conn = self.conn['cloudwatch_connection'] - self.ec2_conn = self.conn['ec2_connection'] -#--------------------------------------------------------------------------------------------------------------------------- - def configure_alarm(self,alarm_info): - alarm_id = self.metricAlarm.config_alarm(self.cloudwatch_conn,alarm_info) + + def configure_alarm(self, alarm_info): + alarm_id = self.metricAlarm.config_alarm(self.cloudwatch_conn, alarm_info) return alarm_id -#--------------------------------------------------------------------------------------------------------------------------- - def update_alarm_configuration(self,test): - alarm_id = self.metricAlarm.update_alarm(self.cloudwatch_conn,test) + + def update_alarm_configuration(self, test): + alarm_id = self.metricAlarm.update_alarm(self.cloudwatch_conn, test) return alarm_id -#--------------------------------------------------------------------------------------------------------------------------- - def delete_alarm(self,alarm_id): - return self.metricAlarm.delete_Alarm(self.cloudwatch_conn,alarm_id) -#--------------------------------------------------------------------------------------------------------------------------- - def get_alarms_list(self,instance_id): - return self.metricAlarm.alarms_list(self.cloudwatch_conn,instance_id) -#--------------------------------------------------------------------------------------------------------------------------- - def get_ack_details(self,ack_info): - return self.metricAlarm.alarm_details(self.cloudwatch_conn,ack_info) -#--------------------------------------------------------------------------------------------------------------------------- - def get_metrics_data(self,metric_name,period,instance_id): - return self.metric.metricsData(self.cloudwatch_conn,metric_name,period,instance_id) -#--------------------------------------------------------------------------------------------------------------------------- - - def consumer(self): - """Consume info from the message bus to manage alarms.""" - try: - for message in self._consumer: - # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials' - if message.topic == "alarm_request": - log.info("Action required against: %s" % (message.topic)) - alarm_info = json.loads(message.value) - - if message.key == "create_alarm_request": - if alarm_info['vim_type'] == 'AWS': - alarm_inner_dict = alarm_info['alarm_create_request'] - metric_status = self.check_metric(alarm_inner_dict['metric_name']) - if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True: - log.debug ("Resource and Metrics exists") - - alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name'] - #Generate a valid response message, send via producer - config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value - if config_resp == None: - log.debug("Alarm Already exists") - payload = json.dumps(config_resp) - file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) - self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') - else: - payload = json.dumps(config_resp) - file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) - - self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') - log.info("New alarm created with alarm info: %s", config_resp) - else: - log.error("Resource ID doesn't exists") - else: - log.error("Plugin inputs are incorrect") - - - elif message.key == "acknowledge_alarm": - alarm_inner_dict = alarm_info['ack_details'] - if alarm_info['vim_type'] == 'AWS': - if self.check_resource(alarm_inner_dict['resource_uuid']) == True: - alarm_info = json.loads(message.value) - #Generate a valid response message, send via producer - ack_details = self.get_ack_details(alarm_info) - payload = json.dumps(ack_details) - file = open('../../core/models/notify_alarm.json','wb').write((payload)) - self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response') - log.info("Acknowledge sent: %s", ack_details) - else: - log.error("Resource ID is Incorrect") - else: - log.error(" VIM type incorrect ") - - - elif message.key == "update_alarm_request": - if alarm_info['vim_type'] == 'AWS': - alarm_inner_dict = alarm_info['alarm_update_request'] - metric_status = self.check_metric(alarm_inner_dict['metric_name']) - - if metric_status['status'] == True: - log.debug ("Resource and Metrics exists") - alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name'] - #Generate a valid response message, send via producer - update_resp = self.update_alarm_configuration(alarm_info) - if update_resp == None: - payload = json.dumps(update_resp) - file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) - self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') - log.debug("Alarm Already exists") - else: - payload = json.dumps(update_resp) - file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) - self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') - log.info("Alarm Updated with alarm info: %s", update_resp) - else: - log.info ("Metric Not Supported") - else: - log.error(" VIM type Incorrect ") - - elif message.key == "delete_alarm_request": - if alarm_info['vim_type'] == 'AWS': - del_info = json.loads(message.value) - #Generate a valid response message, send via producer - del_resp = self.delete_alarm(del_info) - payload = json.dumps(del_resp) - file = open('../../core/models/delete_alarm_resp.json','wb').write((payload)) - self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response') - log.info("Alarm Deleted with alarm info: %s", del_resp) - else: - log.error(" VIM type Incorrect ") - - elif message.key == "alarm_list_request": - alarm_inner_dict = alarm_info['alarm_list_request'] - if alarm_info['vim_type'] == 'AWS': - if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "": - #Generate a valid response message, send via producer - list_resp = self.get_alarms_list(alarm_info)#['alarm_names'] - payload = json.dumps(list_resp) - file = open('../../core/models/list_alarm_resp.json','wb').write((payload)) - self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response') - else: - log.error("Resource ID is Incorrect") - else: - log.error(" VIM type Incorrect ") - + + def delete_alarm(self, alarm_id): + return self.metricAlarm.delete_Alarm(self.cloudwatch_conn, alarm_id) + + def get_alarms_list(self, instance_id): + return self.metricAlarm.alarms_list(self.cloudwatch_conn, instance_id) + + def get_ack_details(self, ack_info): + return self.metricAlarm.alarm_details(self.cloudwatch_conn, ack_info) + + def get_metrics_data(self, metric_name, period, instance_id): + # TODO: Investigate and fix this call + return self.metric.metricsData(self.cloudwatch_conn, metric_name, period, instance_id) + + def alarm_calls(self, key: str, alarm_info: dict, aws_conn: dict): + """Gets the message from the common consumer""" + try: + self.cloudwatch_conn = aws_conn['cloudwatch_connection'] + self.ec2_conn = aws_conn['ec2_connection'] + + if key == "create_alarm_request": + alarm_inner_dict = alarm_info['alarm_create_request'] + metric_status = self.check_metric(alarm_inner_dict['metric_name']) + + if self.check_resource(alarm_inner_dict['resource_uuid']) and metric_status['status']: + log.debug("Resource and Metrics exists") + + alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name'] + # Generate a valid response message, send via producer + config_resp = self.configure_alarm(alarm_info) # alarm_info = message.value + + if config_resp is None: + log.debug("Alarm Already exists") + # TODO: This should return a response with status False + return config_resp + else: - log.debug("Unknown key, no action will be performed") - + log.info("New alarm created with alarm info: %s", config_resp) + return config_resp + else: - log.info("Message topic not relevant to this plugin: %s", - message.topic) + log.error("Resource ID doesn't exists") + + elif key == "acknowledge_alarm": + alarm_inner_dict = alarm_info['ack_details'] + + if self.check_resource(alarm_inner_dict['resource_uuid']): + ack_details = self.get_ack_details(alarm_info) + log.info("Acknowledge sent: %s", ack_details) + return ack_details + + else: + log.error("Resource ID is Incorrect") + + elif key == "update_alarm_request": + alarm_inner_dict = alarm_info['alarm_update_request'] + metric_status = self.check_metric(alarm_inner_dict['metric_name']) + + if metric_status['status']: + log.debug("Resource and Metrics exists") + alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name'] + # Generate a valid response message, send via producer + update_resp = self.update_alarm_configuration(alarm_info) + + if update_resp is None: + # TODO: This should return a response with status False + log.debug("Alarm Already exists") + return update_resp + + else: + log.info("Alarm Updated with alarm info: %s", update_resp) + return update_resp + + else: + log.info("Metric Not Supported") + + elif key == "delete_alarm_request": + # Generate a valid response message, send via producer + del_resp = self.delete_alarm(alarm_info) + log.info("Alarm Deleted with alarm info: %s", del_resp) + return del_resp + + elif key == "alarm_list_request": + alarm_inner_dict = alarm_info['alarm_list_request'] + + if self.check_resource(alarm_inner_dict['resource_uuid']) or alarm_inner_dict['resource_uuid'] == "": + # Generate a valid response message, send via producer + list_resp = self.get_alarms_list(alarm_info) # ['alarm_names'] + return list_resp + else: + log.error("Resource ID is Incorrect") + + else: + raise UnsupportedOperation("Unknown key, no action will be performed") + except Exception as e: - log.error("Consumer exception: %s", str(e)) -#--------------------------------------------------------------------------------------------------------------------------- - def check_resource(self,resource_uuid): - '''Finding Resource with the resource_uuid''' + log.error("Message retrieval exception: %s", str(e)) + + def check_resource(self, resource_uuid): + """Finding Resource with the resource_uuid""" try: check_resp = dict() instances = self.ec2_conn.get_all_instance_status() - #resource_id + # resource_id for instance_id in instances: instance_id = str(instance_id).split(':')[1] + if instance_id == resource_uuid: check_resp['resource_uuid'] = resource_uuid - return True + return True return False - except Exception as e: - log.error("Error in Plugin Inputs %s",str(e)) -#--------------------------------------------------------------------------------------------------------------------------- - def check_metric(self,metric_name): - ''' Checking whether the metric is supported by AWS ''' + except Exception as e: + log.error("Error in Plugin Inputs %s", str(e)) + + def check_metric(self, metric_name): + """ Checking whether the metric is supported by AWS """ try: check_resp = dict() - - #metric_name + + # metric_name if metric_name == 'CPU_UTILIZATION': metric_name = 'CPUUtilization' metric_status = True + elif metric_name == 'DISK_READ_OPS': metric_name = 'DiskReadOps' metric_status = True + elif metric_name == 'DISK_WRITE_OPS': metric_name = 'DiskWriteOps' metric_status = True + elif metric_name == 'DISK_READ_BYTES': metric_name = 'DiskReadBytes' metric_status = True + elif metric_name == 'DISK_WRITE_BYTES': metric_name = 'DiskWriteBytes' metric_status = True + elif metric_name == 'PACKETS_RECEIVED': metric_name = 'NetworkPacketsIn' metric_status = True + elif metric_name == 'PACKETS_SENT': metric_name = 'NetworkPacketsOut' metric_status = True + else: metric_name = None metric_status = False check_resp['metric_name'] = metric_name - #status - if metric_status == True: + # status + + if metric_status: check_resp['status'] = True - return check_resp - except Exception as e: - log.error("Error in Plugin Inputs %s",str(e)) -#--------------------------------------------------------------------------------------------------------------------------- - -obj = Plugin() -obj.connection() -obj.consumer() + return check_resp + + except Exception as e: + log.error("Error in Plugin Inputs %s", str(e))