2 # Copyright 2017 xFlow Research Pvt. Ltd
3 # This file is part of MON module
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact with: wajeeha.hamid@xflowresearch.com
"""AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client."""
# Module metadata: original author and creation date of this plugin.
__author__ = "Wajeeha Hamid"
__date__ = "18-September-2017"
import json
import logging as log
import sys

from connection import Connection
from metric_alarms import MetricAlarm
from metrics import Metrics
# Need to import the producer message bus,not working yet
#from core.message_bus.producerfunct import KafkaProducer
sys.path.append("../../core/message-bus")
from producer import KafkaProducer
from kafka import KafkaConsumer
class plugin_metrics():
    """Serve metric requests from the message bus using AWS CloudWatch.

    The instance consumes messages from the 'metric_request' Kafka topic,
    delegates the requested action to ``Metrics`` and publishes the result
    on the 'metric_response' topic through the producer.
    """

    def __init__(self):
        """Create the AWS connection helper, the metrics helper and the bus clients."""
        self.conn = Connection()
        self.metric = Metrics()

        # Consumer object used to receive the requests sent by the SO.
        self._consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
        self._consumer.subscribe(['metric_request'])

        self.producer = KafkaProducer('')

    def connection(self):
        """Connect the plugin with CloudWatch and EC2.

        On success ``self.cloudwatch_conn`` and ``self.ec2_conn`` are set.
        Any failure is logged and swallowed.
        """
        try:
            self.conn.setEnvironment()
            # NOTE: self.conn is rebound from the Connection object to the
            # mapping returned by connection_instance(); kept as-is so code
            # reading ``self.conn`` afterwards sees the same value as before.
            self.conn = self.conn.connection_instance()
            self.cloudwatch_conn = self.conn['cloudwatch_connection']
            self.ec2_conn = self.conn['ec2_connection']
        except Exception as e:
            # Pass the error as a logging argument so '%s' is really substituted.
            log.error("Failed to Connect with AWS: %s", e)

    def create_metric_request(self, metric_info):
        """Compatible API using normalized parameters.

        Returns whatever ``Metrics.createMetrics`` returns for *metric_info*.
        """
        return self.metric.createMetrics(self.cloudwatch_conn, metric_info)

    def update_metric_request(self, updated_info):
        """Compatible API using normalized parameters.

        Returns whatever ``Metrics.updateMetrics`` returns for *updated_info*.
        """
        return self.metric.updateMetrics(self.cloudwatch_conn, updated_info)

    def delete_metric_request(self, delete_info):
        """Compatible API using normalized parameters.

        Returns whatever ``Metrics.deleteMetrics`` returns for *delete_info*.
        """
        return self.metric.deleteMetrics(self.cloudwatch_conn, delete_info)

    def list_metrics_request(self, list_info):
        """Compatible API using normalized parameters.

        Returns whatever ``Metrics.listMetrics`` returns for *list_info*.
        """
        return self.metric.listMetrics(self.cloudwatch_conn, list_info)

    def read_metrics_data(self, list_info):
        """Compatible API using normalized parameters.

        Read all metric data related to a specified metric and return
        whatever ``Metrics.metricsData`` returns for *list_info*.
        """
        return self.metric.metricsData(self.cloudwatch_conn, list_info)

    def _publish_response(self, key, response, dump_path):
        """Serialise *response* to JSON, save it to *dump_path* and publish it under *key*."""
        payload = json.dumps(response)
        # Text mode: json.dumps returns text, and the context manager makes
        # sure the file handle is closed even if the write fails.
        with open(dump_path, 'w') as dump_file:
            dump_file.write(payload)
        self.producer.create_metrics_resp(key=key, message=payload,
                                          topic='metric_response')

    def consumer(self):
        """Consume requests from the SO and answer them.

        1) Parse each message and trigger the method matching its key and
           topic.
        2) The response from the plugin is saved in JSON format.
        3) The producer then sends the response back to the message bus.

        Returns the response dict of the first create/update/list/read
        request that was served (this ends the loop); delete requests and
        ignored messages do not stop the loop.  Any exception is logged and
        ends the loop, in which case None is returned.
        """
        try:
            for message in self._consumer:
                metric_info = json.loads(message.value)
                metric_response = dict()

                if metric_info['vim_type'] != 'AWS':
                    print("Bad VIM Request")
                    continue
                log.debug("VIM support : AWS")

                # Check the functionality that needs to be performed:
                # topic = 'alarms'/'metrics'/'Access_Credentials'
                if message.topic != "metric_request":
                    log.info("Message topic not relevant to this plugin: %s",
                             message.topic)
                    continue
                log.info("Action required against: %s", message.topic)

                if message.key == "create_metric_request":
                    if self.check_resource(metric_info['metric_create']['resource_uuid']):
                        metric_resp = self.create_metric_request(metric_info['metric_create'])
                        metric_response['schema_version'] = metric_info['schema_version']
                        metric_response['schema_type'] = "create_metric_response"
                        metric_response['metric_create_response'] = metric_resp
                        self._publish_response(
                            'create_metric_response', metric_response,
                            '../../core/models/create_metric_resp.json')

                        log.info("Metric configured: %s", metric_resp)
                        return metric_response

                elif message.key == "update_metric_request":
                    # NOTE(review): the update request is read from the
                    # 'metric_create' key - confirm against the request schema.
                    if self.check_resource(metric_info['metric_create']['resource_uuid']):
                        update_resp = self.update_metric_request(metric_info['metric_create'])
                        metric_response['schema_version'] = metric_info['schema_version']
                        metric_response['schema_type'] = "update_metric_response"
                        metric_response['metric_update_response'] = update_resp
                        self._publish_response(
                            'update_metric_response', metric_response,
                            '../../core/models/update_metric_resp.json')

                        log.info("Metric Updates: %s", metric_response)
                        return metric_response

                elif message.key == "delete_metric_request":
                    if self.check_resource(metric_info['resource_uuid']):
                        del_resp = self.delete_metric_request(metric_info)
                        self._publish_response(
                            'delete_metric_response', del_resp,
                            '../../core/models/delete_metric_resp.json')

                        log.info("Metric Deletion Not supported in AWS : %s", del_resp)

                elif message.key == "list_metric_request":
                    if self.check_resource(metric_info['metrics_list_request']['resource_uuid']):
                        list_resp = self.list_metrics_request(metric_info['metrics_list_request'])
                        metric_response['schema_version'] = metric_info['schema_version']
                        metric_response['schema_type'] = "list_metric_response"
                        metric_response['correlation_id'] = metric_info['metrics_list_request']['correlation_id']
                        metric_response['vim_type'] = metric_info['vim_type']
                        metric_response['metrics_list'] = list_resp
                        self._publish_response(
                            'list_metrics_response', metric_response,
                            '../../core/models/list_metric_resp.json')

                        log.info("Metric List: %s", metric_response)
                        return metric_response

                elif message.key == "read_metric_data_request":
                    if self.check_resource(metric_info['resource_uuid']):
                        data_resp = self.read_metrics_data(metric_info)
                        metric_response['schema_version'] = metric_info['schema_version']
                        # NOTE(review): schema_type is "list_metric_response" and
                        # the id is read from 'correlation_uuid' here - confirm
                        # both against the response schema.
                        metric_response['schema_type'] = "list_metric_response"
                        metric_response['metric_name'] = metric_info['metric_name']
                        metric_response['metric_uuid'] = metric_info['metric_uuid']
                        metric_response['correlation_id'] = metric_info['correlation_uuid']
                        metric_response['resource_uuid'] = metric_info['resource_uuid']
                        metric_response['tenant_uuid'] = metric_info['tenant_uuid']
                        metric_response['metrics_data'] = data_resp
                        self._publish_response(
                            'read_metric_data_response', metric_response,
                            '../../core/models/read_metric_data_resp.json')

                        log.info("Metric Data Response: %s", metric_response)
                        return metric_response

                else:
                    log.debug("Unknown key, no action will be performed")

        except Exception as e:
            log.error("Consumer exception: %s", str(e))

    def check_resource(self, resource_uuid):
        """Check whether *resource_uuid* is one of the EC2 instances.

        Returns True as soon as a matching instance id is found, False when
        none matches or when the lookup fails (the failure is logged).
        """
        try:
            for instance in self.ec2_conn.get_all_instance_status():
                # The instance id is the part after the first ':' in the
                # textual form of each status entry.
                if str(instance).split(':')[1] == resource_uuid:
                    return True
            return False
        except Exception as e:
            log.error("Error in Plugin Inputs %s", str(e))
            return False
# Module-level plugin instance; creating it opens the Kafka consumer and producer.
obj = plugin_metrics()