Merge "Changes in vROPs Plugin. 1.Added specifications for Create, Update, Delete...
[osm/MON.git] / plugins / CloudWatch / plugin_alarm.py
1 ##
2 # Copyright 2017 xFlow Research Pvt. Ltd
3 # This file is part of MON module
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact with: wajeeha.hamid@xflowresearch.com
20 ##
21
22 '''
23 AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
24 '''
25
26 __author__ = "Wajeeha Hamid"
27 __date__ = "18-September-2017"
28
29 import sys
30 import json
31 import logging as log
32 from jsmin import jsmin
33 from connection import Connection
34 from metric_alarms import MetricAlarm
35 from metrics import Metrics
36 from kafka import KafkaConsumer
37 sys.path.append("../../core/message-bus")
38 from producer import KafkaProducer
39
40 class Plugin():
41 """Receives Alarm info from MetricAlarm and connects with the consumer/producer"""
42 def __init__ (self):
43 self.conn = Connection()
44 self.metricAlarm = MetricAlarm()
45 self.metric = Metrics()
46
47 server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
48
49 self._consumer = KafkaConsumer(server['topic'], bootstrap_servers=server['server'])
50 self._consumer.subscribe(['alarm_request'])
51
52 #self.producer = KafkaProducer('create_alarm_request')
53 self.producer = KafkaProducer('')
54
55
56 #---------------------------------------------------------------------------------------------------------------------------
57 def connection(self):
58 """Connecting instances with CloudWatch"""
59 self.conn.setEnvironment()
60 self.conn = self.conn.connection_instance()
61 self.cloudwatch_conn = self.conn['cloudwatch_connection']
62 self.ec2_conn = self.conn['ec2_connection']
63 #---------------------------------------------------------------------------------------------------------------------------
64 def configure_alarm(self,alarm_info):
65 alarm_id = self.metricAlarm.config_alarm(self.cloudwatch_conn,alarm_info)
66 return alarm_id
67 #---------------------------------------------------------------------------------------------------------------------------
68 def update_alarm_configuration(self,test):
69 alarm_id = self.metricAlarm.update_alarm(self.cloudwatch_conn,test)
70 return alarm_id
71 #---------------------------------------------------------------------------------------------------------------------------
72 def delete_alarm(self,alarm_id):
73 return self.metricAlarm.delete_Alarm(self.cloudwatch_conn,alarm_id)
74 #---------------------------------------------------------------------------------------------------------------------------
75 def get_alarms_list(self,instance_id):
76 return self.metricAlarm.alarms_list(self.cloudwatch_conn,instance_id)
77 #---------------------------------------------------------------------------------------------------------------------------
78 def get_ack_details(self,ack_info):
79 return self.metricAlarm.alarm_details(self.cloudwatch_conn,ack_info)
80 #---------------------------------------------------------------------------------------------------------------------------
81 def get_metrics_data(self,metric_name,period,instance_id):
82 return self.metric.metricsData(self.cloudwatch_conn,metric_name,period,instance_id)
83 #---------------------------------------------------------------------------------------------------------------------------
84
85 def consumer(self):
86 """Consume info from the message bus to manage alarms."""
87 try:
88 for message in self._consumer:
89 # Check the Functionlity that needs to be performed: topic = 'alarms'/'metrics'/'Access_Credentials'
90 if message.topic == "alarm_request":
91 log.info("Action required against: %s" % (message.topic))
92 alarm_info = json.loads(message.value)
93
94 if message.key == "create_alarm_request":
95 if alarm_info['vim_type'] == 'AWS':
96 alarm_inner_dict = alarm_info['alarm_create_request']
97 metric_status = self.check_metric(alarm_inner_dict['metric_name'])
98
99 if self.check_resource(alarm_inner_dict['resource_uuid']) == True and metric_status['status'] == True:
100 log.debug ("Resource and Metrics exists")
101
102 alarm_info['alarm_create_request']['metric_name'] = metric_status['metric_name']
103 #Generate a valid response message, send via producer
104 config_resp = self.configure_alarm(alarm_info) #alarm_info = message.value
105 if config_resp == None:
106 log.debug("Alarm Already exists")
107 payload = json.dumps(config_resp)
108 file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
109 self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
110 else:
111 payload = json.dumps(config_resp)
112 file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
113
114 self.producer.create_alarm_response(key='create_alarm_response',message=payload,topic = 'alarm_response')
115 log.info("New alarm created with alarm info: %s", config_resp)
116 else:
117 log.error("Resource ID doesn't exists")
118 else:
119 log.error("Plugin inputs are incorrect")
120
121
122 elif message.key == "acknowledge_alarm":
123 alarm_inner_dict = alarm_info['ack_details']
124 if alarm_info['vim_type'] == 'AWS':
125 if self.check_resource(alarm_inner_dict['resource_uuid']) == True:
126 alarm_info = json.loads(message.value)
127 #Generate a valid response message, send via producer
128 ack_details = self.get_ack_details(alarm_info)
129 payload = json.dumps(ack_details)
130 file = open('../../core/models/notify_alarm.json','wb').write((payload))
131 self.producer.update_alarm_response(key='notify_alarm',message=payload,topic = 'alarm_response')
132 log.info("Acknowledge sent: %s", ack_details)
133 else:
134 log.error("Resource ID is Incorrect")
135 else:
136 log.error(" VIM type incorrect ")
137
138
139 elif message.key == "update_alarm_request":
140 if alarm_info['vim_type'] == 'AWS':
141 alarm_inner_dict = alarm_info['alarm_update_request']
142 metric_status = self.check_metric(alarm_inner_dict['metric_name'])
143
144 if metric_status['status'] == True:
145 log.debug ("Resource and Metrics exists")
146 alarm_info['alarm_update_request']['metric_name'] = metric_status['metric_name']
147 #Generate a valid response message, send via producer
148 update_resp = self.update_alarm_configuration(alarm_info)
149 if update_resp == None:
150 payload = json.dumps(update_resp)
151 file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
152 self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
153 log.debug("Alarm Already exists")
154 else:
155 payload = json.dumps(update_resp)
156 file = open('../../core/models/update_alarm_resp.json','wb').write((payload))
157 self.producer.update_alarm_response(key='update_alarm_response',message=payload,topic = 'alarm_response')
158 log.info("Alarm Updated with alarm info: %s", update_resp)
159 else:
160 log.info ("Metric Not Supported")
161 else:
162 log.error(" VIM type Incorrect ")
163
164 elif message.key == "delete_alarm_request":
165 if alarm_info['vim_type'] == 'AWS':
166 del_info = json.loads(message.value)
167 #Generate a valid response message, send via producer
168 del_resp = self.delete_alarm(del_info)
169 payload = json.dumps(del_resp)
170 file = open('../../core/models/delete_alarm_resp.json','wb').write((payload))
171 self.producer.update_alarm_response(key='delete_alarm_response',message=payload,topic = 'alarm_response')
172 log.info("Alarm Deleted with alarm info: %s", del_resp)
173 else:
174 log.error(" VIM type Incorrect ")
175
176 elif message.key == "alarm_list_request":
177 alarm_inner_dict = alarm_info['alarm_list_request']
178 if alarm_info['vim_type'] == 'AWS':
179 if self.check_resource(alarm_inner_dict['resource_uuid']) == True:
180 #Generate a valid response message, send via producer
181 list_resp = self.get_alarms_list(alarm_info)#['alarm_names']
182 payload = json.dumps(list_resp)
183 file = open('../../core/models/list_alarm_resp.json','wb').write((payload))
184 self.producer.update_alarm_response(key='list_alarm_response',message=payload,topic = 'alarm_response')
185 else:
186 log.error("Resource ID is Incorrect")
187 else:
188 log.error(" VIM type Incorrect ")
189
190 else:
191 log.debug("Unknown key, no action will be performed")
192
193 else:
194 log.info("Message topic not relevant to this plugin: %s",
195 message.topic)
196 except Exception as e:
197 log.error("Consumer exception: %s", str(e))
198 #---------------------------------------------------------------------------------------------------------------------------
199 def check_resource(self,resource_uuid):
200 '''Finding Resource with the resource_uuid'''
201 try:
202 check_resp = dict()
203 instances = self.ec2_conn.get_all_instance_status()
204
205 #resource_id
206 for instance_id in instances:
207 instance_id = str(instance_id).split(':')[1]
208 if instance_id == resource_uuid:
209 check_resp['resource_uuid'] = resource_uuid
210 return True
211 return False
212
213 except Exception as e:
214 log.error("Error in Plugin Inputs %s",str(e))
215 #---------------------------------------------------------------------------------------------------------------------------
216 def check_metric(self,metric_name):
217 ''' Checking whether the metric is supported by AWS '''
218 try:
219 check_resp = dict()
220
221 #metric_name
222 if metric_name == 'CPU_UTILIZATION':
223 metric_name = 'CPUUtilization'
224 metric_status = True
225 elif metric_name == 'DISK_READ_OPS':
226 metric_name = 'DiskReadOps'
227 metric_status = True
228 elif metric_name == 'DISK_WRITE_OPS':
229 metric_name = 'DiskWriteOps'
230 metric_status = True
231 elif metric_name == 'DISK_READ_BYTES':
232 metric_name = 'DiskReadBytes'
233 metric_status = True
234 elif metric_name == 'DISK_WRITE_BYTES':
235 metric_name = 'DiskWriteBytes'
236 metric_status = True
237 elif metric_name == 'PACKETS_RECEIVED':
238 metric_name = 'NetworkPacketsIn'
239 metric_status = True
240 elif metric_name == 'PACKETS_SENT':
241 metric_name = 'NetworkPacketsOut'
242 metric_status = True
243 else:
244 metric_name = None
245 metric_status = False
246 check_resp['metric_name'] = metric_name
247 #status
248 if metric_status == True:
249 check_resp['status'] = True
250 return check_resp
251 except Exception as e:
252 log.error("Error in Plugin Inputs %s",str(e))
253 #---------------------------------------------------------------------------------------------------------------------------
254
255 obj = Plugin()
256 obj.connection()
257 obj.consumer()