2 # Copyright 2017 xFlow Research Pvt. Ltd
3 # This file is part of MON module
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact with: wajeeha.hamid@xflowresearch.com
23 AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
26 __author__
= "Wajeeha Hamid"
27 __date__
= "18-September-2017"
31 from connection
import Connection
32 from metric_alarms
import MetricAlarm
33 from metrics
import Metrics
34 sys
.path
.append("../../core/message_bus")
35 from producer
import KafkaProducer
36 from kafka
import KafkaConsumer
39 class plugin_metrics():
40 """Receives Alarm info from MetricAlarm and connects with the consumer/producer """
42 self
.conn
= Connection()
43 self
.metric
= Metrics()
45 #server = {'server': 'localhost:9092', 'topic': 'metrics_request'}
46 #Initialize a Consumer object to consume message from the SO
47 self
._consumer
= KafkaConsumer(bootstrap_servers
='localhost:9092')
48 self
._consumer
.subscribe(['metric_request'])
50 #producer = KafkaProducer('create_metric_request')
52 self
.producer
= KafkaProducer('')
53 #---------------------------------------------------------------------------------------------------------------------------
56 """Connecting instances with CloudWatch"""
57 self
.conn
.setEnvironment()
58 self
.conn
= self
.conn
.connection_instance()
59 self
.cloudwatch_conn
= self
.conn
['cloudwatch_connection']
60 self
.ec2_conn
= self
.conn
['ec2_connection']
62 except Exception as e
:
63 log
.error("Failed to Connect with AWS %s: " + str(e
))
64 #---------------------------------------------------------------------------------------------------------------------------
def create_metric_request(self, metric_info):
    """Compatible API using normalized parameters.

    Forwards the request to ``self.metric.createMetrics`` together with the
    CloudWatch connection held in ``self.cloudwatch_conn`` (which must have
    been set up before this is called).

    :param metric_info: metric description handed to the metrics backend;
        the consumer loop passes the ``'metric_create'`` section of the
        incoming request here.
    :returns: whatever ``createMetrics`` returns, unchanged. The consumer
        loop embeds this value in the ``metric_create_response`` field, so
        it has to be returned rather than dropped.
    """
    return self.metric.createMetrics(self.cloudwatch_conn, metric_info)
69 #---------------------------------------------------------------------------------------------------------------------------
def update_metric_request(self, updated_info):
    """Compatible API using normalized parameters.

    Forwards the request to ``self.metric.updateMetrics`` together with the
    CloudWatch connection held in ``self.cloudwatch_conn`` (which must have
    been set up before this is called).

    :param updated_info: metric description handed to the metrics backend.
    :returns: whatever ``updateMetrics`` returns, unchanged. The consumer
        loop embeds this value in the ``metric_update_response`` field, so
        it has to be returned rather than dropped.
    """
    return self.metric.updateMetrics(self.cloudwatch_conn, updated_info)
74 #---------------------------------------------------------------------------------------------------------------------------
def delete_metric_request(self, delete_info):
    """Compatible API using normalized parameters.

    Forwards the request to ``self.metric.deleteMetrics`` together with the
    CloudWatch connection held in ``self.cloudwatch_conn`` (which must have
    been set up before this is called).

    :param delete_info: delete request handed to the metrics backend; the
        consumer loop passes the whole decoded request message here.
    :returns: whatever ``deleteMetrics`` returns, unchanged. The consumer
        loop serialises this value as the delete response, so it has to be
        returned rather than dropped.
    """
    return self.metric.deleteMetrics(self.cloudwatch_conn, delete_info)
79 #---------------------------------------------------------------------------------------------------------------------------
def list_metrics_request(self, list_info):
    """Compatible API using normalized parameters.

    Forwards the request to ``self.metric.listMetrics`` together with the
    CloudWatch connection held in ``self.cloudwatch_conn`` (which must have
    been set up before this is called).

    :param list_info: listing criteria handed to the metrics backend; the
        consumer loop passes the ``'metrics_list_request'`` section of the
        incoming request here.
    :returns: whatever ``listMetrics`` returns, unchanged. The consumer
        loop embeds this value in the ``metrics_list`` field, so it has to
        be returned rather than dropped.
    """
    return self.metric.listMetrics(self.cloudwatch_conn, list_info)
84 #---------------------------------------------------------------------------------------------------------------------------
def read_metrics_data(self, list_info):
    """Compatible API using normalized parameters.

    Read all metric data related to a specified metric by forwarding the
    request to ``self.metric.metricsData`` together with the CloudWatch
    connection held in ``self.cloudwatch_conn`` (which must have been set
    up before this is called).

    :param list_info: read request handed to the metrics backend; the
        consumer loop passes the whole decoded request message here.
    :returns: whatever ``metricsData`` returns, unchanged. The consumer
        loop embeds this value in the ``metrics_data`` field, so it has to
        be returned rather than dropped.
    """
    return self.metric.metricsData(self.cloudwatch_conn, list_info)
90 #---------------------------------------------------------------------------------------------------------------------------
93 '''Consumer will consume the message from SO,
94 1) parse the message and trigger the methods ac
95 cording to keys and topics provided in request.
97 2) The response from plugin is saved in json format.
99 3) The producer object then calls the producer response
100 methods to send the response back to message bus
104 for message
in self
._consumer
:
105 metric_info
= json
.loads(message
.value
)
107 metric_response
= dict()
109 if metric_info
['vim_type'] == 'AWS':
110 log
.debug ("VIM support : AWS")
112 # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials'
113 if message
.topic
== "metric_request":
114 log
.info("Action required against: %s" % (message
.topic
))
116 if message
.key
== "create_metric_request":
117 if self
.check_resource(metric_info
['metric_create']['resource_uuid']) == True:
118 metric_resp
= self
.create_metric_request(metric_info
['metric_create']) #alarm_info = message.value
119 metric_response
['schema_version'] = metric_info
['schema_version']
120 metric_response
['schema_type'] = "create_metric_response"
121 metric_response
['metric_create_response'] = metric_resp
122 payload
= json
.dumps(metric_response
)
123 file = open('../../core/models/create_metric_resp.json','wb').write((payload
))
124 self
.producer
.create_metrics_resp(key
='create_metric_response',message
=payload
,topic
= 'metric_response')
126 log
.info("Metric configured: %s", metric_resp
)
127 return metric_response
129 elif message
.key
== "update_metric_request":
130 if self
.check_resource(metric_info
['metric_create']['resource_uuid']) == True:
131 update_resp
= self
.update_metric_request(metric_info
['metric_create'])
132 metric_response
['schema_version'] = metric_info
['schema_version']
133 metric_response
['schema_type'] = "update_metric_response"
134 metric_response
['metric_update_response'] = update_resp
135 payload
= json
.dumps(metric_response
)
137 file = open('../../core/models/update_metric_resp.json','wb').write((payload
))
138 self
.producer
.update_metric_response(key
='update_metric_response',message
=payload
,topic
= 'metric_response')
140 log
.info("Metric Updates: %s",metric_response
)
141 return metric_response
143 elif message
.key
== "delete_metric_request":
144 if self
.check_resource(metric_info
['resource_uuid']) == True:
145 del_resp
=self
.delete_metric_request(metric_info
)
146 payload
= json
.dumps(del_resp
)
147 file = open('../../core/models/delete_metric_resp.json','wb').write((payload
))
148 self
.producer
.delete_metric_response(key
='delete_metric_response',message
=payload
,topic
= 'metric_response')
150 log
.info("Metric Deletion Not supported in AWS : %s",del_resp
)
153 elif message
.key
== "list_metric_request":
154 if self
.check_resource(metric_info
['metrics_list_request']['resource_uuid']) == True:
155 list_resp
= self
.list_metrics_request(metric_info
['metrics_list_request'])
156 metric_response
['schema_version'] = metric_info
['schema_version']
157 metric_response
['schema_type'] = "list_metric_response"
158 metric_response
['correlation_id'] = metric_info
['metrics_list_request']['correlation_id']
159 metric_response
['vim_type'] = metric_info
['vim_type']
160 metric_response
['metrics_list'] = list_resp
161 payload
= json
.dumps(metric_response
)
162 file = open('../../core/models/list_metric_resp.json','wb').write((payload
))
163 self
.producer
.list_metric_response(key
='list_metrics_response',message
=payload
,topic
= 'metric_response')
165 log
.info("Metric List: %s",metric_response
)
166 return metric_response
168 elif message
.key
== "read_metric_data_request":
169 if self
.check_resource(metric_info
['resource_uuid']) == True:
170 data_resp
= self
.read_metrics_data(metric_info
)
171 metric_response
['schema_version'] = metric_info
['schema_version']
172 metric_response
['schema_type'] = "read_metric_data_response"
173 metric_response
['metric_name'] = metric_info
['metric_name']
174 metric_response
['metric_uuid'] = metric_info
['metric_uuid']
175 metric_response
['correlation_id'] = metric_info
['correlation_uuid']
176 metric_response
['resource_uuid'] = metric_info
['resource_uuid']
177 metric_response
['tenant_uuid'] = metric_info
['tenant_uuid']
178 metric_response
['metrics_data'] = data_resp
179 payload
= json
.dumps(metric_response
)
180 file = open('../../core/models/read_metric_data_resp.json','wb').write((payload
))
181 self
.producer
.read_metric_data_response(key
='read_metric_data_response',message
=payload
,topic
= 'metric_response')
183 log
.info("Metric Data Response: %s",metric_response
)
184 return metric_response
187 log
.debug("Unknown key, no action will be performed")
189 log
.info("Message topic not relevant to this plugin: %s",
192 print "Bad VIM Request"
193 except Exception as e
:
194 log
.error("Consumer exception: %s", str(e
))
196 #---------------------------------------------------------------------------------------------------------------------------
def check_resource(self, resource_uuid):
    """Check whether resource_uuid is present among the EC2 instances.

    Asks ``self.ec2_conn.get_all_instance_status()`` for the known
    instances and compares the identifier extracted from each entry with
    ``resource_uuid``.

    :param resource_uuid: instance identifier taken from the request.
    :returns: ``True`` if any instance matches, ``False`` if none does
        (including when there are no instances) or when the lookup fails.
        Callers compare the result with ``== True``, so returning ``False``
        on error (instead of falling off the end with ``None``) keeps them
        working unchanged.
    """
    try:
        instances = self.ec2_conn.get_all_instance_status()
        # The identifier is the text after the first ':' in str(entry)
        # (presumably of the form "InstanceStatus:<instance-id>" -- verify
        # against the EC2 client in use).
        # any() stops at the first match, so a hit on an earlier instance
        # is no longer overwritten by a later non-matching one.
        return any(
            str(instance).split(':')[1] == resource_uuid
            for instance in instances
        )
    except Exception as e:
        log.error("Error in Plugin Inputs %s", str(e))
        return False
219 #---------------------------------------------------------------------------------------------------------------------------
221 obj
= plugin_metrics()