2 # Copyright 2017 xFlow Research Pvt. Ltd
3 # This file is part of MON module
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact with: wajeeha.hamid@xflowresearch.com
23 AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
# Module metadata: author name and date stamp recorded for this plugin.
__author__ = "Wajeeha Hamid"
__date__ = "18-September-2017"
31 from connection
import Connection
32 from metric_alarms
import MetricAlarm
33 from metrics
import Metrics
34 sys
.path
.append("../../core/message_bus")
35 from producer
import KafkaProducer
class plugin_metrics():
    """Handle metric requests from the message bus via the ``Metrics`` helper.

    An instance holds a ``Connection`` object, a ``Metrics`` helper that is
    handed the CloudWatch connection for every call, and a ``KafkaProducer``
    used to publish the responses.  ``metric_calls`` is the entry point that
    routes one consumed message to the matching request method.

    NOTE(review): ``connection()`` sets ``cloudwatch_conn`` and ``ec2_conn``,
    which every request method reads.  Nothing in this class invokes it, so
    confirm the caller does so before the first ``metric_calls``.
    """

    def __init__(self):
        self.conn = Connection()
        self.metric = Metrics()
        self.producer = KafkaProducer('')

    def connection(self):
        """Connecting instances with CloudWatch.

        Replaces ``self.conn`` with the mapping returned by
        ``connection_instance()`` and stores its ``'cloudwatch_connection'``
        and ``'ec2_connection'`` entries on the instance.  Failures are
        logged and swallowed.
        """
        try:
            self.conn.setEnvironment()
            self.conn = self.conn.connection_instance()
            self.cloudwatch_conn = self.conn['cloudwatch_connection']
            self.ec2_conn = self.conn['ec2_connection']
        except Exception as e:
            # Pass the error as a logging argument; concatenating it onto a
            # message that contains "%s" left a literal "%s" in the output.
            log.error("Failed to Connect with AWS: %s", str(e))

    def create_metric_request(self, metric_info):
        '''Compatible API using normalized parameters.

        Returns whatever ``Metrics.createMetrics`` returns.
        '''
        return self.metric.createMetrics(self.cloudwatch_conn, metric_info)

    def update_metric_request(self, updated_info):
        '''Compatible API using normalized parameters.

        Returns whatever ``Metrics.updateMetrics`` returns.
        '''
        return self.metric.updateMetrics(self.cloudwatch_conn, updated_info)

    def delete_metric_request(self, delete_info):
        '''Compatible API using normalized parameters.

        Returns whatever ``Metrics.deleteMetrics`` returns.
        '''
        return self.metric.deleteMetrics(self.cloudwatch_conn, delete_info)

    def list_metrics_request(self, list_info):
        '''Compatible API using normalized parameters.

        Returns whatever ``Metrics.listMetrics`` returns.
        '''
        return self.metric.listMetrics(self.cloudwatch_conn, list_info)

    def read_metrics_data(self, list_info):
        '''Compatible API using normalized parameters.

        Read all metric data related to a specified metric and return
        whatever ``Metrics.metricsData`` returns.
        '''
        return self.metric.metricsData(self.cloudwatch_conn, list_info)

    def metric_calls(self, message):
        '''Consume one message from SO and act on it.

        1) Parse the message and trigger the method that matches the key
           and topic provided in the request.

        2) Save the plugin response in JSON format.

        3) Send the response back to the message bus through the producer.

        Only messages whose ``vim_type`` is ``'AWS'`` and whose topic is
        ``"metric_request"`` are processed.  Returns the response object
        that was published, or ``None`` when nothing was done (other VIM
        type, other topic, unknown key, resource not found, or an error,
        which is logged).
        '''
        try:
            metric_info = json.loads(message.value)

            if metric_info['vim_type'] != 'AWS':
                return None
            log.debug("VIM support : AWS")

            # Check the functionality that needs to be performed:
            # topic = 'alarms'/'metrics'/'Access_Credentials'
            if message.topic != "metric_request":
                log.info("Message topic not relevant to this plugin: %s",
                         message.topic)
                return None
            log.info("Action required against: %s", message.topic)

            handlers = {
                "create_metric_request": self._handle_create,
                "update_metric_request": self._handle_update,
                "delete_metric_request": self._handle_delete,
                "list_metric_request": self._handle_list,
                "read_metric_data_request": self._handle_read,
            }
            handler = handlers.get(message.key)
            if handler is None:
                log.debug("Unknown key, no action will be performed")
                return None
            return handler(metric_info)

        except Exception as e:
            log.error("Consumer exception: %s", str(e))
            return None

    @staticmethod
    def _save_response(path, payload):
        '''Write the JSON text ``payload`` to ``path``, closing the file.'''
        with open(path, 'w') as handle:
            handle.write(payload)

    def _handle_create(self, metric_info):
        '''Serve a create_metric_request; returns the response or None.'''
        request = metric_info['metric_create']
        if not self.check_resource(request['resource_uuid']):
            return None

        metric_resp = self.create_metric_request(request)
        metric_response = {
            'schema_version': metric_info['schema_version'],
            'schema_type': "create_metric_response",
            'metric_create_response': metric_resp,
        }
        payload = json.dumps(metric_response)
        self._save_response('../../core/models/create_metric_resp.json', payload)
        self.producer.create_metrics_resp(key='create_metric_response',
                                          message=payload,
                                          topic='metric_response')
        log.info("Metric configured: %s", metric_resp)
        return metric_response

    def _handle_update(self, metric_info):
        '''Serve an update_metric_request; returns the response or None.'''
        # The update request carries its parameters under 'metric_create'.
        request = metric_info['metric_create']
        if not self.check_resource(request['resource_uuid']):
            return None

        update_resp = self.update_metric_request(request)
        metric_response = {
            'schema_version': metric_info['schema_version'],
            'schema_type': "update_metric_response",
            'metric_update_response': update_resp,
        }
        payload = json.dumps(metric_response)
        self._save_response('../../core/models/update_metric_resp.json', payload)
        self.producer.update_metric_response(key='update_metric_response',
                                             message=payload,
                                             topic='metric_response')
        log.info("Metric Updates: %s", metric_response)
        return metric_response

    def _handle_delete(self, metric_info):
        '''Serve a delete_metric_request; returns the response or None.'''
        if not self.check_resource(metric_info['resource_uuid']):
            return None

        del_resp = self.delete_metric_request(metric_info)
        payload = json.dumps(del_resp)
        self._save_response('../../core/models/delete_metric_resp.json', payload)
        self.producer.delete_metric_response(key='delete_metric_response',
                                             message=payload,
                                             topic='metric_response')
        log.info("Metric Deletion Not supported in AWS : %s", del_resp)
        return del_resp

    def _handle_list(self, metric_info):
        '''Serve a list_metric_request; returns the response or None.'''
        request = metric_info['metrics_list_request']
        if not self.check_resource(request['resource_uuid']):
            return None

        list_resp = self.list_metrics_request(request)
        metric_response = {
            'schema_version': metric_info['schema_version'],
            'schema_type': "list_metric_response",
            'correlation_id': request['correlation_id'],
            'vim_type': metric_info['vim_type'],
            'metrics_list': list_resp,
        }
        payload = json.dumps(metric_response)
        self._save_response('../../core/models/list_metric_resp.json', payload)
        self.producer.list_metric_response(key='list_metrics_response',
                                           message=payload,
                                           topic='metric_response')
        log.info("Metric List: %s", metric_response)
        return metric_response

    def _handle_read(self, metric_info):
        '''Serve a read_metric_data_request; returns the response or None.'''
        if not self.check_resource(metric_info['resource_uuid']):
            return None

        data_resp = self.read_metrics_data(metric_info)
        metric_response = {
            'schema_version': metric_info['schema_version'],
            'schema_type': "read_metric_data_response",
            'metric_name': metric_info['metric_name'],
            'metric_uuid': metric_info['metric_uuid'],
            # The request names this field 'correlation_uuid'.
            'correlation_id': metric_info['correlation_uuid'],
            'resource_uuid': metric_info['resource_uuid'],
            'tenant_uuid': metric_info['tenant_uuid'],
            'metrics_data': data_resp,
        }
        payload = json.dumps(metric_response)
        self._save_response('../../core/models/read_metric_data_resp.json', payload)
        self.producer.read_metric_data_response(key='read_metric_data_response',
                                                message=payload,
                                                topic='metric_response')
        log.info("Metric Data Response: %s", metric_response)
        return metric_response

    def check_resource(self, resource_uuid):
        '''Check whether resource_uuid is present in the EC2 instances.

        Returns True when any entry returned by
        ``ec2_conn.get_all_instance_status()`` carries ``resource_uuid``
        after the first ':' of its string form, otherwise False.  Errors
        are logged and reported as False.
        '''
        try:
            instances = self.ec2_conn.get_all_instance_status()
            # Assumes str(entry) looks like "<label>:<instance-id>" - TODO confirm.
            # Stop at the first match so a later non-matching instance
            # cannot reset the result.
            return any(str(entry).split(':')[1] == resource_uuid
                       for entry in instances)
        except Exception as e:
            log.error("Error in Plugin Inputs %s", str(e))
            return False