import os
sys.path.append("/root/MON")
+sys.path.append("../../plugins/CloudWatch")
logging.basicConfig(filename='MON_plugins.log',
format='%(asctime)s %(message)s',
from osm_mon.plugins.OpenStack.common import Common
from osm_mon.plugins.OpenStack.Gnocchi import metrics
+from plugin_alarm import plugin_alarms
+from plugin_metric import plugin_metrics
# Initialize servers
server = {'server': 'localhost:9092'}
# Initialize consumers for alarms and metrics
-common_consumer = KafkaConsumer(group_id='osm_mon',
- bootstrap_servers=server['server'])
+common_consumer = KafkaConsumer(bootstrap_servers=server['server'])
# Create OpenStack alarming and metric instances
auth_token = None
openstack_metrics = metrics.Metrics()
openstack_alarms = alarming.Alarming()
+# Create CloudWatch alarm and metric instances
+cloudwatch_alarms = plugin_alarms()
+cloudwatch_metrics = plugin_metrics()
def get_vim_type(message):
"""Get the vim type that is required by the message."""
if message.topic == "metric_request":
# Check the vim desired by the message
vim_type = get_vim_type(message)
+
if vim_type == "openstack":
log.info("This message is for the OpenStack plugin.")
openstack_metrics.metric_calls(
message, openstack_auth, auth_token)
- elif vim_type == "cloudwatch":
+ elif vim_type == "aws":
+ cloudwatch_metrics.metric_calls(message)
log.info("This message is for the CloudWatch plugin.")
elif vim_type == "vrops":
log.info("This message is for the vROPs plugin.")
- else:
+ else:
log.debug("vim_type is misconfigured or unsupported; %s",
vim_type)
log.info("This message is for the OpenStack plugin.")
openstack_alarms.alarming(message, openstack_auth, auth_token)
- elif vim_type == "cloudwatch":
+ elif vim_type == "aws":
+ cloudwatch_alarms.alarm_calls(message)
log.info("This message is for the CloudWatch plugin.")
elif vim_type == "vrops":
log.info("This message is for the OpenStack plugin.")
auth_token = openstack_auth._authenticate(message=message)
- elif vim_type == "cloudwatch":
+ elif vim_type == "aws":
+ #TODO Access credentials later
log.info("This message is for the CloudWatch plugin.")
elif vim_type == "vrops":
from connection import Connection
from metric_alarms import MetricAlarm
from metrics import Metrics
-from kafka import KafkaConsumer
-sys.path.append("../../core/message-bus")
+sys.path.append("../../core/message_bus")
from producer import KafkaProducer
-class Plugin():
+class plugin_alarms():
"""Receives Alarm info from MetricAlarm and connects with the consumer/producer"""
def __init__ (self):
self.conn = Connection()
self.metricAlarm = MetricAlarm()
self.metric = Metrics()
- server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
- self._consumer = KafkaConsumer(server['topic'], bootstrap_servers=server['server'])
- self._consumer.subscribe(['alarm_request'])
+ self.connection()
self.producer = KafkaProducer('')
#---------------------------------------------------------------------------------------------------------------------------
def connection(self):
return self.metric.metricsData(self.cloudwatch_conn,metric_name,period,instance_id)
#---------------------------------------------------------------------------------------------------------------------------
- def consumer(self):
- """Consume info from the message bus to manage alarms."""
+ def alarm_calls(self,message):
+ """Gets the message from the common consumer"""
try:
- for message in self._consumer:
- # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials'
- if message.topic == "alarm_request":
- log.info("Action required against: %s" % (message.topic))
+ log.info("Action required against: %s" % (message.topic))
+ alarm_info = json.loads(message.value)
+
+ if message.key == "create_alarm_request":
+ alarm_inner_dict = alarm_info['alarm_create_request']
+ metric_status = self.check_metric(alarm_inner_dict['metric_name'])
+
+ if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True:
+ log.debug ("Resource and Metrics exists")
+
+ alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name']
+ #Generate a valid response message, send via producer
+ config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value
+
+ if config_resp == None:
+ log.debug("Alarm Already exists")
+ payload = json.dumps(config_resp)
+ file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
+ self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
+
+ else:
+ payload = json.dumps(config_resp)
+ file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
+ self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
+ log.info("New alarm created with alarm info: %s", config_resp)
+
+ else:
+ log.error("Resource ID doesn't exists")
+
+ elif message.key == "acknowledge_alarm":
+ alarm_inner_dict = alarm_info['ack_details']
+
+ if self.check_resource(alarm_inner_dict['resource_uuid']) == True:
alarm_info = json.loads(message.value)
+ #Generate a valid response message, send via producer
+ ack_details = self.get_ack_details(alarm_info)
+ payload = json.dumps(ack_details)
+ file = open('../../core/models/notify_alarm.json','wb').write((payload))
+ self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response')
+ log.info("Acknowledge sent: %s", ack_details)
- if message.key == "create_alarm_request":
- if alarm_info['vim_type'] == 'AWS':
- alarm_inner_dict = alarm_info['alarm_create_request']
- metric_status = self.check_metric(alarm_inner_dict['metric_name'])
- if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True:
- log.debug ("Resource and Metrics exists")
-
- alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name']
- #Generate a valid response message, send via producer
- config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value
- if config_resp == None:
- log.debug("Alarm Already exists")
- payload = json.dumps(config_resp)
- file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
- self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
- else:
- payload = json.dumps(config_resp)
- file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
-
- self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
- log.info("New alarm created with alarm info: %s", config_resp)
- else:
- log.error("Resource ID doesn't exists")
- else:
- log.error("Plugin inputs are incorrect")
-
-
- elif message.key == "acknowledge_alarm":
- alarm_inner_dict = alarm_info['ack_details']
- if alarm_info['vim_type'] == 'AWS':
- if self.check_resource(alarm_inner_dict['resource_uuid']) == True:
- alarm_info = json.loads(message.value)
- #Generate a valid response message, send via producer
- ack_details = self.get_ack_details(alarm_info)
- payload = json.dumps(ack_details)
- file = open('../../core/models/notify_alarm.json','wb').write((payload))
- self.producer.notify_alarm(key='notify_alarm',message=payload,topic = 'alarm_response')
- log.info("Acknowledge sent: %s", ack_details)
- else:
- log.error("Resource ID is Incorrect")
- else:
- log.error(" VIM type incorrect ")
-
-
- elif message.key == "update_alarm_request":
- if alarm_info['vim_type'] == 'AWS':
- alarm_inner_dict = alarm_info['alarm_update_request']
- metric_status = self.check_metric(alarm_inner_dict['metric_name'])
-
- if metric_status['status'] == True:
- log.debug ("Resource and Metrics exists")
- alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name']
- #Generate a valid response message, send via producer
- update_resp = self.update_alarm_configuration(alarm_info)
- if update_resp == None:
- payload = json.dumps(update_resp)
- file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
- self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
- log.debug("Alarm Already exists")
- else:
- payload = json.dumps(update_resp)
- file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
- self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
- log.info("Alarm Updated with alarm info: %s", update_resp)
- else:
- log.info ("Metric Not Supported")
- else:
- log.error(" VIM type Incorrect ")
-
- elif message.key == "delete_alarm_request":
- if alarm_info['vim_type'] == 'AWS':
- del_info = json.loads(message.value)
- #Generate a valid response message, send via producer
- del_resp = self.delete_alarm(del_info)
- payload = json.dumps(del_resp)
- file = open('../../core/models/delete_alarm_resp.json','wb').write((payload))
- self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response')
- log.info("Alarm Deleted with alarm info: %s", del_resp)
- else:
- log.error(" VIM type Incorrect ")
-
- elif message.key == "alarm_list_request":
- alarm_inner_dict = alarm_info['alarm_list_request']
- if alarm_info['vim_type'] == 'AWS':
- if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "":
- #Generate a valid response message, send via producer
- list_resp = self.get_alarms_list(alarm_info)#['alarm_names']
- payload = json.dumps(list_resp)
- file = open('../../core/models/list_alarm_resp.json','wb').write((payload))
- self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response')
- else:
- log.error("Resource ID is Incorrect")
- else:
- log.error(" VIM type Incorrect ")
-
- else:
- log.debug("Unknown key, no action will be performed")
-
else:
- log.info("Message topic not relevant to this plugin: %s",
- message.topic)
+ log.error("Resource ID is Incorrect")
+
+
+ elif message.key == "update_alarm_request":
+ alarm_inner_dict = alarm_info['alarm_update_request']
+ metric_status = self.check_metric(alarm_inner_dict['metric_name'])
+
+ if metric_status['status'] == True:
+ log.debug ("Resource and Metrics exists")
+ alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name']
+ #Generate a valid response message, send via producer
+ update_resp = self.update_alarm_configuration(alarm_info)
+
+ if update_resp == None:
+ payload = json.dumps(update_resp)
+ file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
+ self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
+ log.debug("Alarm Already exists")
+
+ else:
+ payload = json.dumps(update_resp)
+ file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
+ self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
+ log.info("Alarm Updated with alarm info: %s", update_resp)
+
+ else:
+ log.info ("Metric Not Supported")
+
+
+ elif message.key == "delete_alarm_request":
+ del_info = json.loads(message.value)
+ #Generate a valid response message, send via producer
+ del_resp = self.delete_alarm(del_info)
+ payload = json.dumps(del_resp)
+ file = open('../../core/models/delete_alarm_resp.json','wb').write((payload))
+ self.producer.delete_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response')
+ log.info("Alarm Deleted with alarm info: %s", del_resp)
+
+
+ elif message.key == "alarm_list_request":
+ alarm_inner_dict = alarm_info['alarm_list_request']
+
+ if self.check_resource(alarm_inner_dict['resource_uuid']) == True or alarm_inner_dict['resource_uuid'] == "":
+ #Generate a valid response message, send via producer
+ list_resp = self.get_alarms_list(alarm_info)#['alarm_names']
+ payload = json.dumps(list_resp)
+ file = open('../../core/models/list_alarm_resp.json','wb').write((payload))
+ self.producer.list_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response')
+
+ else:
+ log.error("Resource ID is Incorrect")
+
+ else:
+ log.debug("Unknown key, no action will be performed")
+
except Exception as e:
- log.error("Consumer exception: %s", str(e))
+ log.error("Message retrieval exception: %s", str(e))
#---------------------------------------------------------------------------------------------------------------------------
def check_resource(self,resource_uuid):
'''Finding Resource with the resource_uuid'''
#resource_id
for instance_id in instances:
instance_id = str(instance_id).split(':')[1]
+
if instance_id == resource_uuid:
check_resp['resource_uuid'] = resource_uuid
return True
if metric_name == 'CPU_UTILIZATION':
metric_name = 'CPUUtilization'
metric_status = True
+
elif metric_name == 'DISK_READ_OPS':
metric_name = 'DiskReadOps'
metric_status = True
+
elif metric_name == 'DISK_WRITE_OPS':
metric_name = 'DiskWriteOps'
metric_status = True
+
elif metric_name == 'DISK_READ_BYTES':
metric_name = 'DiskReadBytes'
metric_status = True
+
elif metric_name == 'DISK_WRITE_BYTES':
metric_name = 'DiskWriteBytes'
metric_status = True
+
elif metric_name == 'PACKETS_RECEIVED':
metric_name = 'NetworkPacketsIn'
metric_status = True
+
elif metric_name == 'PACKETS_SENT':
metric_name = 'NetworkPacketsOut'
metric_status = True
+
else:
metric_name = None
metric_status = False
check_resp['metric_name'] = metric_name
#status
+
if metric_status == True:
check_resp['status'] = True
return check_resp
+
except Exception as e:
log.error("Error in Plugin Inputs %s",str(e))
#---------------------------------------------------------------------------------------------------------------------------
-
-obj = Plugin()
-obj.connection()
-obj.consumer()
--- /dev/null
+##
+# Copyright 2017 xFlow Research Pvt. Ltd
+# This file is part of MON module
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: wajeeha.hamid@xflowresearch.com
+##
+
+'''
+AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
+'''
+
+__author__ = "Wajeeha Hamid"
+__date__ = "18-September-2017"
+
+import sys
+import json
+from connection import Connection
+from metric_alarms import MetricAlarm
+from metrics import Metrics
+sys.path.append("../../core/message_bus")
+from producer import KafkaProducer
+import logging as log
+
+class plugin_metrics():
+ """Receives Alarm info from MetricAlarm and connects with the consumer/producer """
+ def __init__ (self):
+ self.conn = Connection()
+ self.metric = Metrics()
+ self.producer = KafkaProducer('')
+ self.connection()
+#---------------------------------------------------------------------------------------------------------------------------
+ def connection(self):
+ try:
+ """Connecting instances with CloudWatch"""
+ self.conn.setEnvironment()
+ self.conn = self.conn.connection_instance()
+ self.cloudwatch_conn = self.conn['cloudwatch_connection']
+ self.ec2_conn = self.conn['ec2_connection']
+
+ except Exception as e:
+ log.error("Failed to Connect with AWS %s: " + str(e))
+#---------------------------------------------------------------------------------------------------------------------------
+ def create_metric_request(self,metric_info):
+ '''Comaptible API using normalized parameters'''
+ metric_resp = self.metric.createMetrics(self.cloudwatch_conn,metric_info)
+ return metric_resp
+#---------------------------------------------------------------------------------------------------------------------------
+ def update_metric_request(self,updated_info):
+ '''Comaptible API using normalized parameters'''
+ update_resp = self.metric.updateMetrics(self.cloudwatch_conn,updated_info)
+ return update_resp
+#---------------------------------------------------------------------------------------------------------------------------
+ def delete_metric_request(self,delete_info):
+ '''Comaptible API using normalized parameters'''
+ del_resp = self.metric.deleteMetrics(self.cloudwatch_conn,delete_info)
+ return del_resp
+#---------------------------------------------------------------------------------------------------------------------------
+ def list_metrics_request(self,list_info):
+ '''Comaptible API using normalized parameters'''
+ list_resp = self.metric.listMetrics(self.cloudwatch_conn,list_info)
+ return list_resp
+#---------------------------------------------------------------------------------------------------------------------------
+ def read_metrics_data(self,list_info):
+ '''Comaptible API using normalized parameters
+ Read all metric data related to a specified metric'''
+ data_resp=self.metric.metricsData(self.cloudwatch_conn,list_info)
+ return data_resp
+#---------------------------------------------------------------------------------------------------------------------------
+
+ def metric_calls(self,message):
+ '''Consumer will consume the message from SO,
+ 1) parse the message and trigger the methods ac
+ cording to keys and topics provided in request.
+
+ 2) The response from plugin is saved in json format.
+
+ 3) The producer object then calls the producer response
+ methods to send the response back to message bus
+ '''
+
+ try:
+ metric_info = json.loads(message.value)
+ metric_response = dict()
+
+ if metric_info['vim_type'] == 'AWS':
+ log.debug ("VIM support : AWS")
+
+ # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials'
+ if message.topic == "metric_request":
+ log.info("Action required against: %s" % (message.topic))
+
+ if message.key == "create_metric_request":
+ if self.check_resource(metric_info['metric_create']['resource_uuid']) == True:
+ metric_resp = self.create_metric_request(metric_info['metric_create']) #alarm_info = message.value
+ metric_response['schema_version'] = metric_info['schema_version']
+ metric_response['schema_type'] = "create_metric_response"
+ metric_response['metric_create_response'] = metric_resp
+ payload = json.dumps(metric_response)
+ file = open('../../core/models/create_metric_resp.json','wb').write((payload))
+ self.producer.create_metrics_resp(key='create_metric_response',message=payload,topic = 'metric_response')
+
+ log.info("Metric configured: %s", metric_resp)
+ return metric_response
+
+ elif message.key == "update_metric_request":
+ if self.check_resource(metric_info['metric_create']['resource_uuid']) == True:
+ update_resp = self.update_metric_request(metric_info['metric_create'])
+ metric_response['schema_version'] = metric_info['schema_version']
+ metric_response['schema_type'] = "update_metric_response"
+ metric_response['metric_update_response'] = update_resp
+ payload = json.dumps(metric_response)
+ file = open('../../core/models/update_metric_resp.json','wb').write((payload))
+ self.producer.update_metric_response(key='update_metric_response',message=payload,topic = 'metric_response')
+
+ log.info("Metric Updates: %s",metric_response)
+ return metric_response
+
+ elif message.key == "delete_metric_request":
+ if self.check_resource(metric_info['resource_uuid']) == True:
+ del_resp=self.delete_metric_request(metric_info)
+ payload = json.dumps(del_resp)
+ file = open('../../core/models/delete_metric_resp.json','wb').write((payload))
+ self.producer.delete_metric_response(key='delete_metric_response',message=payload,topic = 'metric_response')
+
+ log.info("Metric Deletion Not supported in AWS : %s",del_resp)
+ return del_resp
+
+ elif message.key == "list_metric_request":
+ if self.check_resource(metric_info['metrics_list_request']['resource_uuid']) == True:
+ list_resp = self.list_metrics_request(metric_info['metrics_list_request'])
+ metric_response['schema_version'] = metric_info['schema_version']
+ metric_response['schema_type'] = "list_metric_response"
+ metric_response['correlation_id'] = metric_info['metrics_list_request']['correlation_id']
+ metric_response['vim_type'] = metric_info['vim_type']
+ metric_response['metrics_list'] = list_resp
+ payload = json.dumps(metric_response)
+ file = open('../../core/models/list_metric_resp.json','wb').write((payload))
+ self.producer.list_metric_response(key='list_metrics_response',message=payload,topic = 'metric_response')
+
+ log.info("Metric List: %s",metric_response)
+ return metric_response
+
+ elif message.key == "read_metric_data_request":
+ if self.check_resource(metric_info['resource_uuid']) == True:
+ data_resp = self.read_metrics_data(metric_info)
+ metric_response['schema_version'] = metric_info['schema_version']
+ metric_response['schema_type'] = "read_metric_data_response"
+ metric_response['metric_name'] = metric_info['metric_name']
+ metric_response['metric_uuid'] = metric_info['metric_uuid']
+ metric_response['correlation_id'] = metric_info['correlation_uuid']
+ metric_response['resource_uuid'] = metric_info['resource_uuid']
+ metric_response['tenant_uuid'] = metric_info['tenant_uuid']
+ metric_response['metrics_data'] = data_resp
+ payload = json.dumps(metric_response)
+ file = open('../../core/models/read_metric_data_resp.json','wb').write((payload))
+ self.producer.read_metric_data_response(key='read_metric_data_response',message=payload,topic = 'metric_response')
+
+ log.info("Metric Data Response: %s",metric_response)
+ return metric_response
+
+ else:
+ log.debug("Unknown key, no action will be performed")
+ else:
+ log.info("Message topic not relevant to this plugin: %s",
+ message.topic)
+
+ except Exception as e:
+ log.error("Consumer exception: %s", str(e))
+
+#---------------------------------------------------------------------------------------------------------------------------
+ def check_resource(self,resource_uuid):
+
+ '''Checking the resource_uuid is present in EC2 instances'''
+ try:
+ check_resp = dict()
+ instances = self.ec2_conn.get_all_instance_status()
+ status_resource = False
+
+ #resource_id
+ for instance_id in instances:
+ instance_id = str(instance_id).split(':')[1]
+ if instance_id == resource_uuid:
+ check_resp['resource_uuid'] = resource_uuid
+ status_resource = True
+ else:
+ status_resource = False
+
+ #status
+ return status_resource
+
+ except Exception as e:
+ log.error("Error in Plugin Inputs %s",str(e))
+#---------------------------------------------------------------------------------------------------------------------------
+
+++ /dev/null
-##
-# Copyright 2017 xFlow Research Pvt. Ltd
-# This file is part of MON module
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact with: wajeeha.hamid@xflowresearch.com
-##
-
-'''
-AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
-'''
-
-__author__ = "Wajeeha Hamid"
-__date__ = "18-September-2017"
-
-import sys
-import json
-from connection import Connection
-from metric_alarms import MetricAlarm
-from metrics import Metrics
-sys.path.append("../../core/message_bus")
-from producer import KafkaProducer
-from kafka import KafkaConsumer
-import logging as log
-
-class plugin_metrics():
- """Receives Alarm info from MetricAlarm and connects with the consumer/producer """
- def __init__ (self):
- self.conn = Connection()
- self.metric = Metrics()
-
- #server = {'server': 'localhost:9092', 'topic': 'metrics_request'}
- #Initialize a Consumer object to consume message from the SO
- self._consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
- self._consumer.subscribe(['metric_request'])
-
- #producer = KafkaProducer('create_metric_request')
-
- self.producer = KafkaProducer('')
-#---------------------------------------------------------------------------------------------------------------------------
- def connection(self):
- try:
- """Connecting instances with CloudWatch"""
- self.conn.setEnvironment()
- self.conn = self.conn.connection_instance()
- self.cloudwatch_conn = self.conn['cloudwatch_connection']
- self.ec2_conn = self.conn['ec2_connection']
-
- except Exception as e:
- log.error("Failed to Connect with AWS %s: " + str(e))
-#---------------------------------------------------------------------------------------------------------------------------
- def create_metric_request(self,metric_info):
- '''Comaptible API using normalized parameters'''
- metric_resp = self.metric.createMetrics(self.cloudwatch_conn,metric_info)
- return metric_resp
-#---------------------------------------------------------------------------------------------------------------------------
- def update_metric_request(self,updated_info):
- '''Comaptible API using normalized parameters'''
- update_resp = self.metric.updateMetrics(self.cloudwatch_conn,updated_info)
- return update_resp
-#---------------------------------------------------------------------------------------------------------------------------
- def delete_metric_request(self,delete_info):
- '''Comaptible API using normalized parameters'''
- del_resp = self.metric.deleteMetrics(self.cloudwatch_conn,delete_info)
- return del_resp
-#---------------------------------------------------------------------------------------------------------------------------
- def list_metrics_request(self,list_info):
- '''Comaptible API using normalized parameters'''
- list_resp = self.metric.listMetrics(self.cloudwatch_conn,list_info)
- return list_resp
-#---------------------------------------------------------------------------------------------------------------------------
- def read_metrics_data(self,list_info):
- '''Comaptible API using normalized parameters
- Read all metric data related to a specified metric'''
- data_resp=self.metric.metricsData(self.cloudwatch_conn,list_info)
- return data_resp
-#---------------------------------------------------------------------------------------------------------------------------
-
- def consumer(self):
- '''Consumer will consume the message from SO,
- 1) parse the message and trigger the methods ac
- cording to keys and topics provided in request.
-
- 2) The response from plugin is saved in json format.
-
- 3) The producer object then calls the producer response
- methods to send the response back to message bus
- '''
-
- try:
- for message in self._consumer:
- metric_info = json.loads(message.value)
- print metric_info
- metric_response = dict()
-
- if metric_info['vim_type'] == 'AWS':
- log.debug ("VIM support : AWS")
-
- # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials'
- if message.topic == "metric_request":
- log.info("Action required against: %s" % (message.topic))
-
- if message.key == "create_metric_request":
- if self.check_resource(metric_info['metric_create']['resource_uuid']) == True:
- metric_resp = self.create_metric_request(metric_info['metric_create']) #alarm_info = message.value
- metric_response['schema_version'] = metric_info['schema_version']
- metric_response['schema_type'] = "create_metric_response"
- metric_response['metric_create_response'] = metric_resp
- payload = json.dumps(metric_response)
- file = open('../../core/models/create_metric_resp.json','wb').write((payload))
- self.producer.create_metrics_resp(key='create_metric_response',message=payload,topic = 'metric_response')
-
- log.info("Metric configured: %s", metric_resp)
- return metric_response
-
- elif message.key == "update_metric_request":
- if self.check_resource(metric_info['metric_create']['resource_uuid']) == True:
- update_resp = self.update_metric_request(metric_info['metric_create'])
- metric_response['schema_version'] = metric_info['schema_version']
- metric_response['schema_type'] = "update_metric_response"
- metric_response['metric_update_response'] = update_resp
- payload = json.dumps(metric_response)
- print payload
- file = open('../../core/models/update_metric_resp.json','wb').write((payload))
- self.producer.update_metric_response(key='update_metric_response',message=payload,topic = 'metric_response')
-
- log.info("Metric Updates: %s",metric_response)
- return metric_response
-
- elif message.key == "delete_metric_request":
- if self.check_resource(metric_info['resource_uuid']) == True:
- del_resp=self.delete_metric_request(metric_info)
- payload = json.dumps(del_resp)
- file = open('../../core/models/delete_metric_resp.json','wb').write((payload))
- self.producer.delete_metric_response(key='delete_metric_response',message=payload,topic = 'metric_response')
-
- log.info("Metric Deletion Not supported in AWS : %s",del_resp)
- return del_resp
-
- elif message.key == "list_metric_request":
- if self.check_resource(metric_info['metrics_list_request']['resource_uuid']) == True:
- list_resp = self.list_metrics_request(metric_info['metrics_list_request'])
- metric_response['schema_version'] = metric_info['schema_version']
- metric_response['schema_type'] = "list_metric_response"
- metric_response['correlation_id'] = metric_info['metrics_list_request']['correlation_id']
- metric_response['vim_type'] = metric_info['vim_type']
- metric_response['metrics_list'] = list_resp
- payload = json.dumps(metric_response)
- file = open('../../core/models/list_metric_resp.json','wb').write((payload))
- self.producer.list_metric_response(key='list_metrics_response',message=payload,topic = 'metric_response')
-
- log.info("Metric List: %s",metric_response)
- return metric_response
-
- elif message.key == "read_metric_data_request":
- if self.check_resource(metric_info['resource_uuid']) == True:
- data_resp = self.read_metrics_data(metric_info)
- metric_response['schema_version'] = metric_info['schema_version']
- metric_response['schema_type'] = "read_metric_data_response"
- metric_response['metric_name'] = metric_info['metric_name']
- metric_response['metric_uuid'] = metric_info['metric_uuid']
- metric_response['correlation_id'] = metric_info['correlation_uuid']
- metric_response['resource_uuid'] = metric_info['resource_uuid']
- metric_response['tenant_uuid'] = metric_info['tenant_uuid']
- metric_response['metrics_data'] = data_resp
- payload = json.dumps(metric_response)
- file = open('../../core/models/read_metric_data_resp.json','wb').write((payload))
- self.producer.read_metric_data_response(key='read_metric_data_response',message=payload,topic = 'metric_response')
-
- log.info("Metric Data Response: %s",metric_response)
- return metric_response
-
- else:
- log.debug("Unknown key, no action will be performed")
- else:
- log.info("Message topic not relevant to this plugin: %s",
- message.topic)
- else:
- print "Bad VIM Request"
- except Exception as e:
- log.error("Consumer exception: %s", str(e))
-
-#---------------------------------------------------------------------------------------------------------------------------
- def check_resource(self,resource_uuid):
-
- '''Checking the resource_uuid is present in EC2 instances'''
- try:
- check_resp = dict()
- instances = self.ec2_conn.get_all_instance_status()
- status_resource = False
-
- #resource_id
- for instance_id in instances:
- instance_id = str(instance_id).split(':')[1]
- if instance_id == resource_uuid:
- check_resp['resource_uuid'] = resource_uuid
- status_resource = True
- else:
- status_resource = False
-
- #status
- return status_resource
-
- except Exception as e:
- log.error("Error in Plugin Inputs %s",str(e))
-#---------------------------------------------------------------------------------------------------------------------------
-
-obj = plugin_metrics()
-obj.connection()
-obj.consumer()