2 # Copyright 2017 xFlow Research Pvt. Ltd
3 # This file is part of MON module
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact with: wajeeha.hamid@xflowresearch.com
23 AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
26 __author__
= "Wajeeha Hamid"
27 __date__
= "18-September-2017"
32 from jsmin
import jsmin
33 from connection
import Connection
34 from metric_alarms
import MetricAlarm
35 from metrics
import Metrics
36 from kafka
import KafkaConsumer
37 sys
.path
.append("../../core/message-bus")
38 from producer
import KafkaProducer
41 """Receives Alarm info from MetricAlarm and connects with the consumer/producer"""
43 self
.conn
= Connection()
44 self
.metricAlarm
= MetricAlarm()
45 self
.metric
= Metrics()
46 server
= {'server': 'localhost:9092', 'topic': 'alarm_request'}
47 self
._consumer
= KafkaConsumer(server
['topic'], bootstrap_servers
=server
['server'])
48 self
._consumer
.subscribe(['alarm_request'])
49 self
.producer
= KafkaProducer('')
50 #---------------------------------------------------------------------------------------------------------------------------
52 """Connecting instances with CloudWatch"""
53 self
.conn
.setEnvironment()
54 self
.conn
= self
.conn
.connection_instance()
55 self
.cloudwatch_conn
= self
.conn
['cloudwatch_connection']
56 self
.ec2_conn
= self
.conn
['ec2_connection']
57 #---------------------------------------------------------------------------------------------------------------------------
def configure_alarm(self, alarm_info):
    """Create an alarm by delegating to ``self.metricAlarm.config_alarm``.

    :param alarm_info: alarm request forwarded unchanged to ``config_alarm``
        (the consumer in this file passes the decoded ``create_alarm_request``
        message).
    :return: whatever ``config_alarm`` returns. The consumer in this file
        treats ``None`` as "Alarm Already exists".
    """
    alarm_id = self.metricAlarm.config_alarm(self.cloudwatch_conn, alarm_info)
    # The result must be handed back: the consumer inspects it and serialises
    # it into the create-alarm response.
    return alarm_id
61 #---------------------------------------------------------------------------------------------------------------------------
def update_alarm_configuration(self, test):
    """Update an alarm by delegating to ``self.metricAlarm.update_alarm``.

    :param test: alarm update request forwarded unchanged to ``update_alarm``
        (the consumer in this file passes the decoded ``update_alarm_request``
        message).
    :return: whatever ``update_alarm`` returns; the consumer in this file
        checks it against ``None`` before publishing the response.
    """
    alarm_id = self.metricAlarm.update_alarm(self.cloudwatch_conn, test)
    # The result must be handed back: the consumer serialises it into the
    # update-alarm response.
    return alarm_id
65 #---------------------------------------------------------------------------------------------------------------------------
def delete_alarm(self, alarm_id):
    """Delete an alarm through ``self.metricAlarm.delete_Alarm``.

    :param alarm_id: value forwarded unchanged to ``delete_Alarm``. Note that
        the consumer in this file passes the whole decoded request message
        here, not a bare identifier.
    :return: the result of ``delete_Alarm``.
    """
    remove = self.metricAlarm.delete_Alarm
    outcome = remove(self.cloudwatch_conn, alarm_id)
    return outcome
68 #---------------------------------------------------------------------------------------------------------------------------
def get_alarms_list(self, instance_id):
    """List alarms through ``self.metricAlarm.alarms_list``.

    :param instance_id: value forwarded unchanged to ``alarms_list``. Note
        that the consumer in this file passes the whole decoded
        ``alarm_list_request`` message here.
    :return: the result of ``alarms_list``.
    """
    lister = self.metricAlarm.alarms_list
    alarms = lister(self.cloudwatch_conn, instance_id)
    return alarms
71 #---------------------------------------------------------------------------------------------------------------------------
def get_ack_details(self, ack_info):
    """Fetch alarm details through ``self.metricAlarm.alarm_details``.

    :param ack_info: acknowledgement request forwarded unchanged to
        ``alarm_details``.
    :return: the result of ``alarm_details``.
    """
    lookup = self.metricAlarm.alarm_details
    details = lookup(self.cloudwatch_conn, ack_info)
    return details
74 #---------------------------------------------------------------------------------------------------------------------------
def get_metrics_data(self, metric_name, period, instance_id):
    """Fetch metric data through ``self.metric.metricsData``.

    All three arguments are forwarded unchanged, after the CloudWatch
    connection, in the order ``metric_name, period, instance_id``.

    :return: the result of ``metricsData``.
    """
    fetch = self.metric.metricsData
    query = (metric_name, period, instance_id)
    return fetch(self.cloudwatch_conn, *query)
77 #---------------------------------------------------------------------------------------------------------------------------
80 """Consume info from the message bus to manage alarms."""
82 for message
in self
._consumer
:
# Check the functionality that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials'
84 if message
.topic
== "alarm_request":
85 log
.info("Action required against: %s" % (message
.topic
))
86 alarm_info
= json
.loads(message
.value
)
88 if message
.key
== "create_alarm_request":
89 if alarm_info
['vim_type'] == 'AWS':
90 alarm_inner_dict
= alarm_info
['alarm_create_request']
91 metric_status
= self
.check_metric(alarm_inner_dict
['metric_name'])
92 if self
.check_resource(alarm_inner_dict
['resource_uuid']) == True and metric_status
['status'] == True:
93 log
.debug ("Resource and Metrics exists")
95 alarm_info
['alarm_create_request']['metric_name'] = metric_status
['metric_name']
96 #Generate a valid response message, send via producer
97 config_resp
= self
.configure_alarm(alarm_info
) #alarm_info = message.value
98 if config_resp
== None:
99 log
.debug("Alarm Already exists")
100 payload
= json
.dumps(config_resp
)
101 file = open('../../core/models/create_alarm_resp.json','wb').write((payload
))
102 self
.producer
.create_alarm_response(key
='create_alarm_response',message
=payload
,topic
= 'alarm_response')
104 payload
= json
.dumps(config_resp
)
105 file = open('../../core/models/create_alarm_resp.json','wb').write((payload
))
107 self
.producer
.create_alarm_response(key
='create_alarm_response',message
=payload
,topic
= 'alarm_response')
108 log
.info("New alarm created with alarm info: %s", config_resp
)
110 log
.error("Resource ID doesn't exists")
112 log
.error("Plugin inputs are incorrect")
115 elif message
.key
== "acknowledge_alarm":
116 alarm_inner_dict
= alarm_info
['ack_details']
117 if alarm_info
['vim_type'] == 'AWS':
118 if self
.check_resource(alarm_inner_dict
['resource_uuid']) == True:
119 alarm_info
= json
.loads(message
.value
)
120 #Generate a valid response message, send via producer
121 ack_details
= self
.get_ack_details(alarm_info
)
122 payload
= json
.dumps(ack_details
)
123 file = open('../../core/models/notify_alarm.json','wb').write((payload
))
124 self
.producer
.notify_alarm(key
='notify_alarm',message
=payload
,topic
= 'alarm_response')
125 log
.info("Acknowledge sent: %s", ack_details
)
127 log
.error("Resource ID is Incorrect")
129 log
.error(" VIM type incorrect ")
132 elif message
.key
== "update_alarm_request":
133 if alarm_info
['vim_type'] == 'AWS':
134 alarm_inner_dict
= alarm_info
['alarm_update_request']
135 metric_status
= self
.check_metric(alarm_inner_dict
['metric_name'])
137 if metric_status
['status'] == True:
138 log
.debug ("Resource and Metrics exists")
139 alarm_info
['alarm_update_request']['metric_name'] = metric_status
['metric_name']
140 #Generate a valid response message, send via producer
141 update_resp
= self
.update_alarm_configuration(alarm_info
)
142 if update_resp
== None:
143 payload
= json
.dumps(update_resp
)
144 file = open('../../core/models/update_alarm_resp.json','wb').write((payload
))
145 self
.producer
.update_alarm_response(key
='update_alarm_response',message
=payload
,topic
= 'alarm_response')
146 log
.debug("Alarm Already exists")
148 payload
= json
.dumps(update_resp
)
149 file = open('../../core/models/update_alarm_resp.json','wb').write((payload
))
150 self
.producer
.update_alarm_response(key
='update_alarm_response',message
=payload
,topic
= 'alarm_response')
151 log
.info("Alarm Updated with alarm info: %s", update_resp
)
153 log
.info ("Metric Not Supported")
155 log
.error(" VIM type Incorrect ")
157 elif message
.key
== "delete_alarm_request":
158 if alarm_info
['vim_type'] == 'AWS':
159 del_info
= json
.loads(message
.value
)
160 #Generate a valid response message, send via producer
161 del_resp
= self
.delete_alarm(del_info
)
162 payload
= json
.dumps(del_resp
)
163 file = open('../../core/models/delete_alarm_resp.json','wb').write((payload
))
164 self
.producer
.delete_alarm_response(key
='delete_alarm_response',message
=payload
,topic
= 'alarm_response')
165 log
.info("Alarm Deleted with alarm info: %s", del_resp
)
167 log
.error(" VIM type Incorrect ")
169 elif message
.key
== "alarm_list_request":
170 alarm_inner_dict
= alarm_info
['alarm_list_request']
171 if alarm_info
['vim_type'] == 'AWS':
172 if self
.check_resource(alarm_inner_dict
['resource_uuid']) == True or alarm_inner_dict
['resource_uuid'] == "":
173 #Generate a valid response message, send via producer
174 list_resp
= self
.get_alarms_list(alarm_info
)#['alarm_names']
175 payload
= json
.dumps(list_resp
)
176 file = open('../../core/models/list_alarm_resp.json','wb').write((payload
))
177 self
.producer
.list_alarm_response(key
='list_alarm_response',message
=payload
,topic
= 'alarm_response')
179 log
.error("Resource ID is Incorrect")
181 log
.error(" VIM type Incorrect ")
184 log
.debug("Unknown key, no action will be performed")
187 log
.info("Message topic not relevant to this plugin: %s",
189 except Exception as e
:
190 log
.error("Consumer exception: %s", str(e
))
191 #---------------------------------------------------------------------------------------------------------------------------
def check_resource(self, resource_uuid):
    """Report whether ``resource_uuid`` matches a known EC2 instance.

    Instances come from ``self.ec2_conn.get_all_instance_status()``. Each
    entry is converted with ``str()`` and the text after the first ``':'``
    is compared with ``resource_uuid``.
    NOTE(review): this assumes ``str(entry)`` has the form
    ``"<label>:<instance-id>"`` -- confirm against the EC2 client in use.

    :param resource_uuid: instance identifier to look for.
    :return: ``True`` when a matching instance is found, ``False`` otherwise
        (including when the lookup raises; the error is logged). Callers in
        this file compare the result with ``True``.
    """
    try:
        instances = self.ec2_conn.get_all_instance_status()
        for instance in instances:
            if str(instance).split(':')[1] == resource_uuid:
                return True
        return False
    except Exception as e:
        # Best-effort check: a failed lookup is reported as "not found"
        # instead of propagating into the consumer loop.
        log.error("Error in Plugin Inputs %s", str(e))
        return False
208 #---------------------------------------------------------------------------------------------------------------------------
def check_metric(self, metric_name):
    """Check whether a metric is supported by AWS and translate its name.

    :param metric_name: MON-side metric name, e.g. ``'CPU_UTILIZATION'``.
    :return: a dict that always has two keys:
        ``'status'`` -- ``True`` when the metric is supported, else ``False``;
        ``'metric_name'`` -- the AWS metric name when supported, otherwise the
        name exactly as it was passed in.
        Callers in this file index both keys, so a dict is returned even for
        unsupported metrics.
    """
    # MON metric name -> AWS metric name.
    supported_metrics = {
        'CPU_UTILIZATION': 'CPUUtilization',
        'DISK_READ_OPS': 'DiskReadOps',
        'DISK_WRITE_OPS': 'DiskWriteOps',
        'DISK_READ_BYTES': 'DiskReadBytes',
        'DISK_WRITE_BYTES': 'DiskWriteBytes',
        'PACKETS_RECEIVED': 'NetworkPacketsIn',
        'PACKETS_SENT': 'NetworkPacketsOut',
    }
    check_resp = {'metric_name': metric_name, 'status': False}
    try:
        aws_name = supported_metrics.get(metric_name)
    except TypeError as e:
        # An unhashable metric_name (e.g. a list) cannot be looked up;
        # log it and report the metric as unsupported.
        log.error("Error in Plugin Inputs %s", str(e))
        return check_resp
    if aws_name is not None:
        check_resp['metric_name'] = aws_name
        check_resp['status'] = True
    return check_resp
246 #---------------------------------------------------------------------------------------------------------------------------