2 # Copyright 2017 xFlow Research Pvt. Ltd
3 # This file is part of MON module
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact with: wajeeha.hamid@xflowresearch.com
23 AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
26 __author__
= "Wajeeha Hamid"
27 __date__
= "18-September-2017"
32 from jsmin
import jsmin
33 from connection
import Connection
34 from metric_alarms
import MetricAlarm
35 from metrics
import Metrics
36 sys
.path
.append("../../core/message_bus")
37 from producer
import KafkaProducer
39 class plugin_alarms():
40 """Receives Alarm info from MetricAlarm and connects with the consumer/producer"""
42 self
.conn
= Connection()
43 self
.metricAlarm
= MetricAlarm()
44 self
.metric
= Metrics()
46 self
.producer
= KafkaProducer('')
47 #---------------------------------------------------------------------------------------------------------------------------
49 """Connecting instances with CloudWatch"""
50 self
.conn
.setEnvironment()
51 self
.conn
= self
.conn
.connection_instance()
52 self
.cloudwatch_conn
= self
.conn
['cloudwatch_connection']
53 self
.ec2_conn
= self
.conn
['ec2_connection']
54 #---------------------------------------------------------------------------------------------------------------------------
55 def configure_alarm(self
,alarm_info
):
56 alarm_id
= self
.metricAlarm
.config_alarm(self
.cloudwatch_conn
,alarm_info
)
58 #---------------------------------------------------------------------------------------------------------------------------
59 def update_alarm_configuration(self
,test
):
60 alarm_id
= self
.metricAlarm
.update_alarm(self
.cloudwatch_conn
,test
)
62 #---------------------------------------------------------------------------------------------------------------------------
63 def delete_alarm(self
,alarm_id
):
64 return self
.metricAlarm
.delete_Alarm(self
.cloudwatch_conn
,alarm_id
)
65 #---------------------------------------------------------------------------------------------------------------------------
66 def get_alarms_list(self
,instance_id
):
67 return self
.metricAlarm
.alarms_list(self
.cloudwatch_conn
,instance_id
)
68 #---------------------------------------------------------------------------------------------------------------------------
69 def get_ack_details(self
,ack_info
):
70 return self
.metricAlarm
.alarm_details(self
.cloudwatch_conn
,ack_info
)
71 #---------------------------------------------------------------------------------------------------------------------------
72 def get_metrics_data(self
,metric_name
,period
,instance_id
):
73 return self
.metric
.metricsData(self
.cloudwatch_conn
,metric_name
,period
,instance_id
)
74 #---------------------------------------------------------------------------------------------------------------------------
76 def alarm_calls(self
,message
):
77 """Gets the message from the common consumer"""
79 log
.info("Action required against: %s" % (message
.topic
))
80 alarm_info
= json
.loads(message
.value
)
82 if message
.key
== "create_alarm_request":
83 alarm_inner_dict
= alarm_info
['alarm_create_request']
84 metric_status
= self
.check_metric(alarm_inner_dict
['metric_name'])
86 if self
.check_resource(alarm_inner_dict
['resource_uuid']) == True and metric_status
['status'] == True:
87 log
.debug ("Resource and Metrics exists")
89 alarm_info
['alarm_create_request']['metric_name'] = metric_status
['metric_name']
90 #Generate a valid response message, send via producer
91 config_resp
= self
.configure_alarm(alarm_info
) #alarm_info = message.value
93 if config_resp
== None:
94 log
.debug("Alarm Already exists")
95 payload
= json
.dumps(config_resp
)
96 file = open('../../core/models/create_alarm_resp.json','wb').write((payload
))
97 self
.producer
.create_alarm_response(key
='create_alarm_response',message
=payload
,topic
= 'alarm_response')
100 payload
= json
.dumps(config_resp
)
101 file = open('../../core/models/create_alarm_resp.json','wb').write((payload
))
102 self
.producer
.create_alarm_response(key
='create_alarm_response',message
=payload
,topic
= 'alarm_response')
103 log
.info("New alarm created with alarm info: %s", config_resp
)
106 log
.error("Resource ID doesn't exists")
108 elif message
.key
== "acknowledge_alarm":
109 alarm_inner_dict
= alarm_info
['ack_details']
111 if self
.check_resource(alarm_inner_dict
['resource_uuid']) == True:
112 alarm_info
= json
.loads(message
.value
)
113 #Generate a valid response message, send via producer
114 ack_details
= self
.get_ack_details(alarm_info
)
115 payload
= json
.dumps(ack_details
)
116 file = open('../../core/models/notify_alarm.json','wb').write((payload
))
117 self
.producer
.notify_alarm(key
='notify_alarm',message
=payload
,topic
= 'alarm_response')
118 log
.info("Acknowledge sent: %s", ack_details
)
121 log
.error("Resource ID is Incorrect")
124 elif message
.key
== "update_alarm_request":
125 alarm_inner_dict
= alarm_info
['alarm_update_request']
126 metric_status
= self
.check_metric(alarm_inner_dict
['metric_name'])
128 if metric_status
['status'] == True:
129 log
.debug ("Resource and Metrics exists")
130 alarm_info
['alarm_update_request']['metric_name'] = metric_status
['metric_name']
131 #Generate a valid response message, send via producer
132 update_resp
= self
.update_alarm_configuration(alarm_info
)
134 if update_resp
== None:
135 payload
= json
.dumps(update_resp
)
136 file = open('../../core/models/update_alarm_resp.json','wb').write((payload
))
137 self
.producer
.update_alarm_response(key
='update_alarm_response',message
=payload
,topic
= 'alarm_response')
138 log
.debug("Alarm Already exists")
141 payload
= json
.dumps(update_resp
)
142 file = open('../../core/models/update_alarm_resp.json','wb').write((payload
))
143 self
.producer
.update_alarm_response(key
='update_alarm_response',message
=payload
,topic
= 'alarm_response')
144 log
.info("Alarm Updated with alarm info: %s", update_resp
)
147 log
.info ("Metric Not Supported")
150 elif message
.key
== "delete_alarm_request":
151 del_info
= json
.loads(message
.value
)
152 #Generate a valid response message, send via producer
153 del_resp
= self
.delete_alarm(del_info
)
154 payload
= json
.dumps(del_resp
)
155 file = open('../../core/models/delete_alarm_resp.json','wb').write((payload
))
156 self
.producer
.delete_alarm_response(key
='delete_alarm_response',message
=payload
,topic
= 'alarm_response')
157 log
.info("Alarm Deleted with alarm info: %s", del_resp
)
160 elif message
.key
== "alarm_list_request":
161 alarm_inner_dict
= alarm_info
['alarm_list_request']
163 if self
.check_resource(alarm_inner_dict
['resource_uuid']) == True or alarm_inner_dict
['resource_uuid'] == "":
164 #Generate a valid response message, send via producer
165 list_resp
= self
.get_alarms_list(alarm_info
)#['alarm_names']
166 payload
= json
.dumps(list_resp
)
167 file = open('../../core/models/list_alarm_resp.json','wb').write((payload
))
168 self
.producer
.list_alarm_response(key
='list_alarm_response',message
=payload
,topic
= 'alarm_response')
171 log
.error("Resource ID is Incorrect")
174 log
.debug("Unknown key, no action will be performed")
176 except Exception as e
:
177 log
.error("Message retrieval exception: %s", str(e
))
178 #---------------------------------------------------------------------------------------------------------------------------
179 def check_resource(self
,resource_uuid
):
180 '''Finding Resource with the resource_uuid'''
183 instances
= self
.ec2_conn
.get_all_instance_status()
186 for instance_id
in instances
:
187 instance_id
= str(instance_id
).split(':')[1]
189 if instance_id
== resource_uuid
:
190 check_resp
['resource_uuid'] = resource_uuid
194 except Exception as e
:
195 log
.error("Error in Plugin Inputs %s",str(e
))
196 #---------------------------------------------------------------------------------------------------------------------------
197 def check_metric(self
,metric_name
):
198 ''' Checking whether the metric is supported by AWS '''
203 if metric_name
== 'CPU_UTILIZATION':
204 metric_name
= 'CPUUtilization'
207 elif metric_name
== 'DISK_READ_OPS':
208 metric_name
= 'DiskReadOps'
211 elif metric_name
== 'DISK_WRITE_OPS':
212 metric_name
= 'DiskWriteOps'
215 elif metric_name
== 'DISK_READ_BYTES':
216 metric_name
= 'DiskReadBytes'
219 elif metric_name
== 'DISK_WRITE_BYTES':
220 metric_name
= 'DiskWriteBytes'
223 elif metric_name
== 'PACKETS_RECEIVED':
224 metric_name
= 'NetworkPacketsIn'
227 elif metric_name
== 'PACKETS_SENT':
228 metric_name
= 'NetworkPacketsOut'
233 metric_status
= False
234 check_resp
['metric_name'] = metric_name
237 if metric_status
== True:
238 check_resp
['status'] = True
241 except Exception as e
:
242 log
.error("Error in Plugin Inputs %s",str(e
))
243 #---------------------------------------------------------------------------------------------------------------------------