X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fplugins%2FCloudWatch%2Fplugin_alarm.py;h=adc4d29cc79c9f31d57f7928c79a6c30ef576ba6;hb=8b16911cdbe9f76c6d381bd96ee449e87b03541f;hp=eb48208ff148bed2846d367b76052a5cb570caa0;hpb=058ddc52db38655cc3c7a621859794780ee0305b;p=osm%2FMON.git diff --git a/osm_mon/plugins/CloudWatch/plugin_alarm.py b/osm_mon/plugins/CloudWatch/plugin_alarm.py index eb48208..adc4d29 100644 --- a/osm_mon/plugins/CloudWatch/plugin_alarm.py +++ b/osm_mon/plugins/CloudWatch/plugin_alarm.py @@ -33,19 +33,16 @@ from jsmin import jsmin from connection import Connection from metric_alarms import MetricAlarm from metrics import Metrics -from kafka import KafkaConsumer -sys.path.append("../../core/message-bus") +sys.path.append("../../core/message_bus") from producer import KafkaProducer -class Plugin(): +class plugin_alarms(): """Receives Alarm info from MetricAlarm and connects with the consumer/producer""" def __init__ (self): self.conn = Connection() self.metricAlarm = MetricAlarm() self.metric = Metrics() - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - self._consumer = KafkaConsumer(server['topic'], bootstrap_servers=server['server']) - self._consumer.subscribe(['alarm_request']) + self.connection() self.producer = KafkaProducer('') #--------------------------------------------------------------------------------------------------------------------------- def connection(self): @@ -76,118 +73,108 @@ class Plugin(): return self.metric.metricsData(self.cloudwatch_conn,metric_name,period,instance_id) #--------------------------------------------------------------------------------------------------------------------------- - def consumer(self): - """Consume info from the message bus to manage alarms.""" + def alarm_calls(self,message): + """Gets the message from the common consumer""" try: - for message in self._consumer: - # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials' - if message.topic == "alarm_request": - log.info("Action required against: %s" % (message.topic)) + log.info("Action required against: %s" % (message.topic)) + alarm_info = json.loads(message.value) + + if message.key == "create_alarm_request": + alarm_inner_dict = alarm_info['alarm_create_request'] + metric_status = self.check_metric(alarm_inner_dict['metric_name']) + + if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True: + log.debug ("Resource and Metrics exists") + + alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name'] + #Generate a valid response message, send via producer + config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value + + if config_resp == None: + log.debug("Alarm Already exists") + payload = json.dumps(config_resp) + file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) + self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') + + else: + payload = json.dumps(config_resp) + file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) + self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') + log.info("New alarm created with alarm info: %s", config_resp) + + else: + log.error("Resource ID doesn't exists") + + elif message.key == "acknowledge_alarm": + alarm_inner_dict = alarm_info['ack_details'] + + if self.check_resource(alarm_inner_dict['resource_uuid']) == True: alarm_info = json.loads(message.value) + #Generate a valid response message, send via producer + ack_details = self.get_ack_details(alarm_info) + payload = json.dumps(ack_details) + file = open('../../core/models/notify_alarm.json','wb').write((payload)) + self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response') + log.info("Acknowledge sent: %s", ack_details) - if message.key == "create_alarm_request": - if alarm_info['vim_type'] == 'AWS': - alarm_inner_dict = alarm_info['alarm_create_request'] - metric_status = self.check_metric(alarm_inner_dict['metric_name']) - if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True: - log.debug ("Resource and Metrics exists") - - alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name'] - #Generate a valid response message, send via producer - config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value - if config_resp == None: - log.debug("Alarm Already exists") - payload = json.dumps(config_resp) - file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) - self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') - else: - payload = json.dumps(config_resp) - file = open('../../core/models/create_alarm_resp.json','wb').write((payload)) - - self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response') - log.info("New alarm created with alarm info: %s", config_resp) - else: - log.error("Resource ID doesn't exists") - else: - log.error("Plugin inputs are incorrect") - - - elif message.key == "acknowledge_alarm": - alarm_inner_dict = alarm_info['ack_details'] - if alarm_info['vim_type'] == 'AWS': - if self.check_resource(alarm_inner_dict['resource_uuid']) == True: - alarm_info = json.loads(message.value) - #Generate a valid response message, send via producer - ack_details = self.get_ack_details(alarm_info) - payload = json.dumps(ack_details) - file = open('../../core/models/notify_alarm.json','wb').write((payload)) - self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response') - log.info("Acknowledge sent: %s", ack_details) - else: - log.error("Resource ID is Incorrect") - else: - log.error(" VIM type incorrect ") - - - elif message.key == "update_alarm_request": - if alarm_info['vim_type'] == 'AWS': - alarm_inner_dict = alarm_info['alarm_update_request'] - metric_status = self.check_metric(alarm_inner_dict['metric_name']) - - if metric_status['status'] == True: - log.debug ("Resource and Metrics exists") - alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name'] - #Generate a valid response message, send via producer - update_resp = self.update_alarm_configuration(alarm_info) - if update_resp == None: - payload = json.dumps(update_resp) - file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) - self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') - log.debug("Alarm Already exists") - else: - payload = json.dumps(update_resp) - file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) - self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') - log.info("Alarm Updated with alarm info: %s", update_resp) - else: - log.info ("Metric Not Supported") - else: - log.error(" VIM type Incorrect ") - - elif message.key == "delete_alarm_request": - if alarm_info['vim_type'] == 'AWS': - del_info = json.loads(message.value) - #Generate a valid response message, send via producer - del_resp = self.delete_alarm(del_info) - payload = json.dumps(del_resp) - file = open('../../core/models/delete_alarm_resp.json','wb').write((payload)) - self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response') - log.info("Alarm Deleted with alarm info: %s", del_resp) - else: - log.error(" VIM type Incorrect ") - - elif message.key == "alarm_list_request": - alarm_inner_dict = alarm_info['alarm_list_request'] - if alarm_info['vim_type'] == 'AWS': - if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "": - #Generate a valid response message, send via producer - list_resp = self.get_alarms_list(alarm_info)#['alarm_names'] - payload = json.dumps(list_resp) - file = open('../../core/models/list_alarm_resp.json','wb').write((payload)) - self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response') - else: - log.error("Resource ID is Incorrect") - else: - log.error(" VIM type Incorrect ") - - else: - log.debug("Unknown key, no action will be performed") - else: - log.info("Message topic not relevant to this plugin: %s", - message.topic) + log.error("Resource ID is Incorrect") + + + elif message.key == "update_alarm_request": + alarm_inner_dict = alarm_info['alarm_update_request'] + metric_status = self.check_metric(alarm_inner_dict['metric_name']) + + if metric_status['status'] == True: + log.debug ("Resource and Metrics exists") + alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name'] + #Generate a valid response message, send via producer + update_resp = self.update_alarm_configuration(alarm_info) + + if update_resp == None: + payload = json.dumps(update_resp) + file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) + self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') + log.debug("Alarm Already exists") + + else: + payload = json.dumps(update_resp) + file = open('../../core/models/update_alarm_resp.json','wb').write((payload)) + self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response') + log.info("Alarm Updated with alarm info: %s", update_resp) + + else: + log.info ("Metric Not Supported") + + + elif message.key == "delete_alarm_request": + del_info = json.loads(message.value) + #Generate a valid response message, send via producer + del_resp = self.delete_alarm(del_info) + payload = json.dumps(del_resp) + file = open('../../core/models/delete_alarm_resp.json','wb').write((payload)) + self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response') + log.info("Alarm Deleted with alarm info: %s", del_resp) + + + elif message.key == "alarm_list_request": + alarm_inner_dict = alarm_info['alarm_list_request'] + + if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "": + #Generate a valid response message, send via producer + list_resp = self.get_alarms_list(alarm_info)#['alarm_names'] + payload = json.dumps(list_resp) + file = open('../../core/models/list_alarm_resp.json','wb').write((payload)) + self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response') + + else: + log.error("Resource ID is Incorrect") + + else: + log.debug("Unknown key, no action will be performed") + except Exception as e: - log.error("Consumer exception: %s", str(e)) + log.error("Message retrieval exception: %s", str(e)) #--------------------------------------------------------------------------------------------------------------------------- def check_resource(self,resource_uuid): '''Finding Resource with the resource_uuid''' @@ -198,6 +185,7 @@ class Plugin(): #resource_id for instance_id in instances: instance_id = str(instance_id).split(':')[1] + if instance_id == resource_uuid: check_resp['resource_uuid'] = resource_uuid return True @@ -215,36 +203,41 @@ class Plugin(): if metric_name == 'CPU_UTILIZATION': metric_name = 'CPUUtilization' metric_status = True + elif metric_name == 'DISK_READ_OPS': metric_name = 'DiskReadOps' metric_status = True + elif metric_name == 'DISK_WRITE_OPS': metric_name = 'DiskWriteOps' metric_status = True + elif metric_name == 'DISK_READ_BYTES': metric_name = 'DiskReadBytes' metric_status = True + elif metric_name == 'DISK_WRITE_BYTES': metric_name = 'DiskWriteBytes' metric_status = True + elif metric_name == 'PACKETS_RECEIVED': metric_name = 'NetworkPacketsIn' metric_status = True + elif metric_name == 'PACKETS_SENT': metric_name = 'NetworkPacketsOut' metric_status = True + else: metric_name = None metric_status = False check_resp['metric_name'] = metric_name #status + if metric_status == True: check_resp['status'] = True return check_resp + except Exception as e: log.error("Error in Plugin Inputs %s",str(e)) #--------------------------------------------------------------------------------------------------------------------------- - -obj = Plugin() -obj.connection() -obj.consumer()