X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fplugins%2FCloudWatch%2Fplugin_alarm.py;h=40e7fe55de062f9bd32f12d0f586779d071e7100;hb=53ad3c4c5d87a5b392a74cf386c29c67276ed3cb;hp=eb48208ff148bed2846d367b76052a5cb570caa0;hpb=c7397b95dbaeebd7d872779eec809daed9e487cc;p=osm%2FMON.git diff --git a/osm_mon/plugins/CloudWatch/plugin_alarm.py b/osm_mon/plugins/CloudWatch/plugin_alarm.py index eb48208..40e7fe5 100644 --- a/osm_mon/plugins/CloudWatch/plugin_alarm.py +++ b/osm_mon/plugins/CloudWatch/plugin_alarm.py @@ -22,38 +22,25 @@ ''' AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client ''' +from io import open +from osm_mon.core.message_bus.producer import KafkaProducer +from osm_mon.plugins.CloudWatch.metric_alarms import MetricAlarm +from osm_mon.plugins.CloudWatch.metrics import Metrics __author__ = "Wajeeha Hamid" __date__ = "18-September-2017" -import sys import json -import logging as log -from jsmin import jsmin -from connection import Connection -from metric_alarms import MetricAlarm -from metrics import Metrics -from kafka import KafkaConsumer -sys.path.append("../../core/message-bus") -from producer import KafkaProducer - -class Plugin(): +import logging + +log = logging.getLogger(__name__) + +class plugin_alarms(): """Receives Alarm info from MetricAlarm and connects with the consumer/producer""" def __init__ (self): - self.conn = Connection() self.metricAlarm = MetricAlarm() self.metric = Metrics() - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - self._consumer = KafkaConsumer(server['topic'], bootstrap_servers=server['server']) - self._consumer.subscribe(['alarm_request']) self.producer = KafkaProducer('') -#--------------------------------------------------------------------------------------------------------------------------- - def connection(self): - """Connecting instances with CloudWatch""" - self.conn.setEnvironment() - self.conn = self.conn.connection_instance() - self.cloudwatch_conn = self.conn['cloudwatch_connection'] - self.ec2_conn = self.conn['ec2_connection'] #--------------------------------------------------------------------------------------------------------------------------- def configure_alarm(self,alarm_info): alarm_id = self.metricAlarm.config_alarm(self.cloudwatch_conn,alarm_info) @@ -76,118 +63,111 @@ class Plugin(): return self.metric.metricsData(self.cloudwatch_conn,metric_name,period,instance_id) #--------------------------------------------------------------------------------------------------------------------------- - def consumer(self): - """Consume info from the message bus to manage alarms.""" - try: - for message in self._consumer: - # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials' - if message.topic == "alarm_request": - log.info("Action required against: %s" % (message.topic)) + def alarm_calls(self,message,aws_conn): + """Gets the message from the common consumer""" + try: + self.cloudwatch_conn = aws_conn['cloudwatch_connection'] + self.ec2_conn = aws_conn['ec2_connection'] + + log.info("Action required against: %s" % (message.topic)) + alarm_info = json.loads(message.value) + + if message.key == "create_alarm_request": + alarm_inner_dict = alarm_info['alarm_create_request'] + metric_status = self.check_metric(alarm_inner_dict['metric_name']) + + if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True: + log.debug ("Resource and Metrics exists") + + alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name'] + #Generate a valid response message, send via producer + config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value + + if config_resp == None: + log.debug("Alarm Already exists") + payload = json.dumps(config_resp) + file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) + self.producer.publish_alarm_response(key='create_alarm_response',message=payload) + + else: + payload = json.dumps(config_resp) + file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) + self.producer.publish_alarm_response(key='create_alarm_response',message=payload) + log.info("New alarm created with alarm info: %s", config_resp) + + else: + log.error("Resource ID doesn't exists") + + elif message.key == "acknowledge_alarm": + alarm_inner_dict = alarm_info['ack_details'] + + if self.check_resource(alarm_inner_dict['resource_uuid']) == True: alarm_info = json.loads(message.value) + #Generate a valid response message, send via producer + ack_details = self.get_ack_details(alarm_info) + payload = json.dumps(ack_details) + file = open('../../core/models/notify_alarm.json','wb').write((payload)) + self.producer.notify_alarm(key='notify_alarm',message=payload) + log.info("Acknowledge sent: %s", ack_details) + + else: + log.error("Resource ID is Incorrect") + + + elif message.key == "update_alarm_request": + alarm_inner_dict = alarm_info['alarm_update_request'] + metric_status = self.check_metric(alarm_inner_dict['metric_name']) + + if metric_status['status'] == True: + log.debug ("Resource and Metrics exists") + alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name'] + #Generate a valid response message, send via producer + update_resp = self.update_alarm_configuration(alarm_info) + + if update_resp == None: + payload = json.dumps(update_resp) + file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) + self.producer.update_alarm_response(key='update_alarm_response',message=payload) + log.debug("Alarm Already exists") + + else: + payload = json.dumps(update_resp) + file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) + self.producer.update_alarm_response(key='update_alarm_response',message=payload) + log.info("Alarm Updated with alarm info: %s", update_resp) + + else: + log.info ("Metric Not Supported") + + + elif message.key == "delete_alarm_request": + del_info = json.loads(message.value) + #Generate a valid response message, send via producer + del_resp = self.delete_alarm(del_info) + payload = json.dumps(del_resp) + file = open('../../core/models/delete_alarm_resp.json','wb').write((payload)) + self.producer.delete_alarm_response(key='delete_alarm_response',message=payload) + log.info("Alarm Deleted with alarm info: %s", del_resp) + + + elif message.key == "alarm_list_request": + alarm_inner_dict = alarm_info['alarm_list_request'] + + if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "": + #Generate a valid response message, send via producer + list_resp = self.get_alarms_list(alarm_info)#['alarm_names'] + payload = json.dumps(list_resp) + file = open('../../core/models/list_alarm_resp.json','wb').write((payload)) + self.producer.list_alarm_response(key='list_alarm_response',message=payload) - if message.key == "create_alarm_request": - if alarm_info['vim_type'] == 'AWS': - alarm_inner_dict = alarm_info['alarm_create_request'] - metric_status = self.check_metric(alarm_inner_dict['metric_name']) - if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True: - log.debug ("Resource and Metrics exists") - - alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name'] - #Generate a valid response message, send via producer - config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value - if config_resp == None: - log.debug("Alarm Already exists") - payload = json.dumps(config_resp) - file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) - self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') - else: - payload = json.dumps(config_resp) - file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) - - self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') - log.info("New alarm created with alarm info: %s", config_resp) - else: - log.error("Resource ID doesn't exists") - else: - log.error("Plugin inputs are incorrect") - - - elif message.key == "acknowledge_alarm": - alarm_inner_dict = alarm_info['ack_details'] - if alarm_info['vim_type'] == 'AWS': - if self.check_resource(alarm_inner_dict['resource_uuid']) == True: - alarm_info = json.loads(message.value) - #Generate a valid response message, send via producer - ack_details = self.get_ack_details(alarm_info) - payload = json.dumps(ack_details) - file = open('../../core/models/notify_alarm.json','wb').write((payload)) - self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response') - log.info("Acknowledge sent: %s", ack_details) - else: - log.error("Resource ID is Incorrect") - else: - log.error(" VIM type incorrect ") - - - elif message.key == "update_alarm_request": - if alarm_info['vim_type'] == 'AWS': - alarm_inner_dict = alarm_info['alarm_update_request'] - metric_status = self.check_metric(alarm_inner_dict['metric_name']) - - if metric_status['status'] == True: - log.debug ("Resource and Metrics exists") - alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name'] - #Generate a valid response message, send via producer - update_resp = self.update_alarm_configuration(alarm_info) - if update_resp == None: - payload = json.dumps(update_resp) - file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) - self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') - log.debug("Alarm Already exists") - else: - payload = json.dumps(update_resp) - file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) - self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') - log.info("Alarm Updated with alarm info: %s", update_resp) - else: - log.info ("Metric Not Supported") - else: - log.error(" VIM type Incorrect ") - - elif message.key == "delete_alarm_request": - if alarm_info['vim_type'] == 'AWS': - del_info = json.loads(message.value) - #Generate a valid response message, send via producer - del_resp = self.delete_alarm(del_info) - payload = json.dumps(del_resp) - file = open('../../core/models/delete_alarm_resp.json','wb').write((payload)) - self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response') - log.info("Alarm Deleted with alarm info: %s", del_resp) - else: - log.error(" VIM type Incorrect ") - - elif message.key == "alarm_list_request": - alarm_inner_dict = alarm_info['alarm_list_request'] - if alarm_info['vim_type'] == 'AWS': - if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "": - #Generate a valid response message, send via producer - list_resp = self.get_alarms_list(alarm_info)#['alarm_names'] - payload = json.dumps(list_resp) - file = open('../../core/models/list_alarm_resp.json','wb').write((payload)) - self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response') - else: - log.error("Resource ID is Incorrect") - else: - log.error(" VIM type Incorrect ") - - else: - log.debug("Unknown key, no action will be performed") - else: - log.info("Message topic not relevant to this plugin: %s", - message.topic) + log.error("Resource ID is Incorrect") + + else: + log.debug("Unknown key, no action will be performed") + except Exception as e: - log.error("Consumer exception: %s", str(e)) + log.error("Message retrieval exception: %s", str(e)) #--------------------------------------------------------------------------------------------------------------------------- def check_resource(self,resource_uuid): '''Finding Resource with the resource_uuid''' @@ -198,6 +178,7 @@ class Plugin(): #resource_id for instance_id in instances: instance_id = str(instance_id).split(':')[1] + if instance_id == resource_uuid: check_resp['resource_uuid'] = resource_uuid return True @@ -215,36 +196,41 @@ class Plugin(): if metric_name == 'CPU_UTILIZATION': metric_name = 'CPUUtilization' metric_status = True + elif metric_name == 'DISK_READ_OPS': metric_name = 'DiskReadOps' metric_status = True + elif metric_name == 'DISK_WRITE_OPS': metric_name = 'DiskWriteOps' metric_status = True + elif metric_name == 'DISK_READ_BYTES': metric_name = 'DiskReadBytes' metric_status = True + elif metric_name == 'DISK_WRITE_BYTES': metric_name = 'DiskWriteBytes' metric_status = True + elif metric_name == 'PACKETS_RECEIVED': metric_name = 'NetworkPacketsIn' metric_status = True + elif metric_name == 'PACKETS_SENT': metric_name = 'NetworkPacketsOut' metric_status = True + else: metric_name = None metric_status = False check_resp['metric_name'] = metric_name #status + if metric_status == True: check_resp['status'] = True return check_resp + except Exception as e: log.error("Error in Plugin Inputs %s",str(e)) #--------------------------------------------------------------------------------------------------------------------------- - -obj = Plugin() -obj.connection() -obj.consumer()