from connection import Connection
from metric_alarms import MetricAlarm
from metrics import Metrics
-from kafka import KafkaConsumer
-sys.path.append("../../core/message-bus")
+sys.path.append("../../core/message_bus")
from producer import KafkaProducer
-class Plugin():
+class plugin_alarms():
"""Receives Alarm info from MetricAlarm and connects with the consumer/producer"""
def __init__ (self):
self.conn = Connection()
self.metricAlarm = MetricAlarm()
self.metric = Metrics()
- server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
- self._consumer = KafkaConsumer(server['topic'], bootstrap_servers=server['server'])
- self._consumer.subscribe(['alarm_request'])
+ self.connection()
self.producer = KafkaProducer('')
#---------------------------------------------------------------------------------------------------------------------------
def connection(self):
return self.metric.metricsData(self.cloudwatch_conn,metric_name,period,instance_id)
#---------------------------------------------------------------------------------------------------------------------------
- def consumer(self):
- """Consume info from the message bus to manage alarms."""
+ def alarm_calls(self,message):
+ """Gets the message from the common consumer"""
try:
- for message in self._consumer:
- # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials'
- if message.topic == "alarm_request":
- log.info("Action required against: %s" % (message.topic))
+ log.info("Action required against: %s" % (message.topic))
+ alarm_info = json.loads(message.value)
+
+ if message.key == "create_alarm_request":
+ alarm_inner_dict = alarm_info['alarm_create_request']
+ metric_status = self.check_metric(alarm_inner_dict['metric_name'])
+
+ if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True:
+ log.debug ("Resource and Metrics exists")
+
+ alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name']
+ #Generate a valid response message, send via producer
+ config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value
+
+ if config_resp == None:
+ log.debug("Alarm Already exists")
+ payload = json.dumps(config_resp)
+ file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
+ self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
+
+ else:
+ payload = json.dumps(config_resp)
+ file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
+ self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
+ log.info("New alarm created with alarm info: %s", config_resp)
+
+ else:
+ log.error("Resource ID doesn't exists")
+
+ elif message.key == "acknowledge_alarm":
+ alarm_inner_dict = alarm_info['ack_details']
+
+ if self.check_resource(alarm_inner_dict['resource_uuid']) == True:
alarm_info = json.loads(message.value)
+ #Generate a valid response message, send via producer
+ ack_details = self.get_ack_details(alarm_info)
+ payload = json.dumps(ack_details)
+ file = open('../../core/models/notify_alarm.json','wb').write((payload))
+ self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response')
+ log.info("Acknowledge sent: %s", ack_details)
- if message.key == "create_alarm_request":
- if alarm_info['vim_type'] == 'AWS':
- alarm_inner_dict = alarm_info['alarm_create_request']
- metric_status = self.check_metric(alarm_inner_dict['metric_name'])
- if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True:
- log.debug ("Resource and Metrics exists")
-
- alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name']
- #Generate a valid response message, send via producer
- config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value
- if config_resp == None:
- log.debug("Alarm Already exists")
- payload = json.dumps(config_resp)
- file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
- self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
- else:
- payload = json.dumps(config_resp)
- file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
-
- self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
- log.info("New alarm created with alarm info: %s", config_resp)
- else:
- log.error("Resource ID doesn't exists")
- else:
- log.error("Plugin inputs are incorrect")
-
-
- elif message.key == "acknowledge_alarm":
- alarm_inner_dict = alarm_info['ack_details']
- if alarm_info['vim_type'] == 'AWS':
- if self.check_resource(alarm_inner_dict['resource_uuid']) == True:
- alarm_info = json.loads(message.value)
- #Generate a valid response message, send via producer
- ack_details = self.get_ack_details(alarm_info)
- payload = json.dumps(ack_details)
- file = open('../../core/models/notify_alarm.json','wb').write((payload))
- self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response')
- log.info("Acknowledge sent: %s", ack_details)
- else:
- log.error("Resource ID is Incorrect")
- else:
- log.error(" VIM type incorrect ")
-
-
- elif message.key == "update_alarm_request":
- if alarm_info['vim_type'] == 'AWS':
- alarm_inner_dict = alarm_info['alarm_update_request']
- metric_status = self.check_metric(alarm_inner_dict['metric_name'])
-
- if metric_status['status'] == True:
- log.debug ("Resource and Metrics exists")
- alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name']
- #Generate a valid response message, send via producer
- update_resp = self.update_alarm_configuration(alarm_info)
- if update_resp == None:
- payload = json.dumps(update_resp)
- file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
- self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
- log.debug("Alarm Already exists")
- else:
- payload = json.dumps(update_resp)
- file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
- self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
- log.info("Alarm Updated with alarm info: %s", update_resp)
- else:
- log.info ("Metric Not Supported")
- else:
- log.error(" VIM type Incorrect ")
-
- elif message.key == "delete_alarm_request":
- if alarm_info['vim_type'] == 'AWS':
- del_info = json.loads(message.value)
- #Generate a valid response message, send via producer
- del_resp = self.delete_alarm(del_info)
- payload = json.dumps(del_resp)
- file = open('../../core/models/delete_alarm_resp.json','wb').write((payload))
- self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response')
- log.info("Alarm Deleted with alarm info: %s", del_resp)
- else:
- log.error(" VIM type Incorrect ")
-
- elif message.key == "alarm_list_request":
- alarm_inner_dict = alarm_info['alarm_list_request']
- if alarm_info['vim_type'] == 'AWS':
- if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "":
- #Generate a valid response message, send via producer
- list_resp = self.get_alarms_list(alarm_info)#['alarm_names']
- payload = json.dumps(list_resp)
- file = open('../../core/models/list_alarm_resp.json','wb').write((payload))
- self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response')
- else:
- log.error("Resource ID is Incorrect")
- else:
- log.error(" VIM type Incorrect ")
-
- else:
- log.debug("Unknown key, no action will be performed")
-
else:
- log.info("Message topic not relevant to this plugin: %s",
- message.topic)
+ log.error("Resource ID is Incorrect")
+
+
+ elif message.key == "update_alarm_request":
+ alarm_inner_dict = alarm_info['alarm_update_request']
+ metric_status = self.check_metric(alarm_inner_dict['metric_name'])
+
+ if metric_status['status'] == True:
+ log.debug ("Resource and Metrics exists")
+ alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name']
+ #Generate a valid response message, send via producer
+ update_resp = self.update_alarm_configuration(alarm_info)
+
+ if update_resp == None:
+ payload = json.dumps(update_resp)
+ file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
+ self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
+ log.debug("Alarm Already exists")
+
+ else:
+ payload = json.dumps(update_resp)
+ file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
+ self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
+ log.info("Alarm Updated with alarm info: %s", update_resp)
+
+ else:
+ log.info ("Metric Not Supported")
+
+
+ elif message.key == "delete_alarm_request":
+ del_info = json.loads(message.value)
+ #Generate a valid response message, send via producer
+ del_resp = self.delete_alarm(del_info)
+ payload = json.dumps(del_resp)
+ file = open('../../core/models/delete_alarm_resp.json','wb').write((payload))
+ self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response')
+ log.info("Alarm Deleted with alarm info: %s", del_resp)
+
+
+ elif message.key == "alarm_list_request":
+ alarm_inner_dict = alarm_info['alarm_list_request']
+
+ if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "":
+ #Generate a valid response message, send via producer
+ list_resp = self.get_alarms_list(alarm_info)#['alarm_names']
+ payload = json.dumps(list_resp)
+ file = open('../../core/models/list_alarm_resp.json','wb').write((payload))
+ self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response')
+
+ else:
+ log.error("Resource ID is Incorrect")
+
+ else:
+ log.debug("Unknown key, no action will be performed")
+
except Exception as e:
- log.error("Consumer exception: %s", str(e))
+ log.error("Message retrieval exception: %s", str(e))
#---------------------------------------------------------------------------------------------------------------------------
def check_resource(self,resource_uuid):
'''Finding Resource with the resource_uuid'''
#resource_id
for instance_id in instances:
instance_id = str(instance_id).split(':')[1]
+
if instance_id == resource_uuid:
check_resp['resource_uuid'] = resource_uuid
return True
if metric_name == 'CPU_UTILIZATION':
metric_name = 'CPUUtilization'
metric_status = True
+
elif metric_name == 'DISK_READ_OPS':
metric_name = 'DiskReadOps'
metric_status = True
+
elif metric_name == 'DISK_WRITE_OPS':
metric_name = 'DiskWriteOps'
metric_status = True
+
elif metric_name == 'DISK_READ_BYTES':
metric_name = 'DiskReadBytes'
metric_status = True
+
elif metric_name == 'DISK_WRITE_BYTES':
metric_name = 'DiskWriteBytes'
metric_status = True
+
elif metric_name == 'PACKETS_RECEIVED':
metric_name = 'NetworkPacketsIn'
metric_status = True
+
elif metric_name == 'PACKETS_SENT':
metric_name = 'NetworkPacketsOut'
metric_status = True
+
else:
metric_name = None
metric_status = False
check_resp['metric_name'] = metric_name
#status
+
if metric_status == True:
check_resp['status'] = True
return check_resp
+
except Exception as e:
log.error("Error in Plugin Inputs %s",str(e))
#---------------------------------------------------------------------------------------------------------------------------
-
-obj = Plugin()
-obj.connection()
-obj.consumer()