AWS plugin—including unit testing files and updated code with error handling
[osm/MON.git] / plugins / CloudWatch / plugin_metrics.py
1 ##
2 # Copyright 2017 xFlow Research Pvt. Ltd
3 # This file is part of MON module
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact with: wajeeha.hamid@xflowresearch.com
20 ##
21
22 '''
23 AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
24 '''
25
26 __author__ = "Wajeeha Hamid"
27 __date__ = "18-September-2017"
28
29 import sys
30 import json
31 from connection import Connection
32 from metric_alarms import MetricAlarm
33 from metrics import Metrics
34 sys.path.append("../../core/message_bus")
35 from producer import KafkaProducer
36 from kafka import KafkaConsumer
37 import logging as log
38
class plugin_metrics():
    """Bridge between the message bus and the CloudWatch metric operations.

    Requests are read from the ``metric_request`` Kafka topic, delegated to
    the ``Metrics`` helper, and the results are published through the
    producer on the ``metric_response`` topic.
    """

    def __init__(self):
        """Create the AWS connection helper, the metric handler and the Kafka endpoints."""
        self.conn = Connection()
        self.metric = Metrics()

        # Consumer for the requests sent by the SO.
        self._consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
        self._consumer.subscribe(['metric_request'])

        self.producer = KafkaProducer('')

    def connection(self):
        """Connect to AWS and keep the CloudWatch and EC2 connection handles.

        Sets ``self.cloudwatch_conn`` and ``self.ec2_conn``. Any failure is
        logged rather than raised, in which case those attributes may be
        left unset.
        """
        try:
            self.conn.setEnvironment()
            # NOTE: self.conn is rebound from the Connection helper to the
            # mapping it returns, so calling connection() a second time
            # operates on that mapping instead of the helper.
            self.conn = self.conn.connection_instance()
            self.cloudwatch_conn = self.conn['cloudwatch_connection']
            self.ec2_conn = self.conn['ec2_connection']
        except Exception as e:
            # Pass the error as a logging argument so the %s placeholder is
            # actually substituted.
            log.error("Failed to Connect with AWS: %s", str(e))

    def create_metric_request(self, metric_info):
        """Compatible API using normalized parameters: create a metric."""
        return self.metric.createMetrics(self.cloudwatch_conn, metric_info)

    def update_metric_request(self, updated_info):
        """Compatible API using normalized parameters: update a metric."""
        return self.metric.updateMetrics(self.cloudwatch_conn, updated_info)

    def delete_metric_request(self, delete_info):
        """Compatible API using normalized parameters: delete a metric."""
        return self.metric.deleteMetrics(self.cloudwatch_conn, delete_info)

    def list_metrics_request(self, list_info):
        """Compatible API using normalized parameters: list metrics."""
        return self.metric.listMetrics(self.cloudwatch_conn, list_info)

    def read_metrics_data(self, list_info):
        """Compatible API using normalized parameters.

        Read all metric data related to a specified metric.
        """
        return self.metric.metricsData(self.cloudwatch_conn, list_info)

    def _publish(self, send, key, payload, dump_path):
        """Write *payload* to *dump_path*, then send it on 'metric_response'.

        *send* is the producer method to call; *payload* is the JSON text.
        """
        # json.dumps emits ASCII by default, so encoding is lossless and keeps
        # the binary-mode write valid on both Python 2 and Python 3.
        with open(dump_path, 'wb') as resp_file:
            resp_file.write(payload.encode('utf-8'))
        send(key=key, message=payload, topic='metric_response')

    def consumer(self):
        """Consume requests from the SO and answer them on the message bus.

        For every message:
          1) parse the JSON value and pick the action from the message
             topic and key;
          2) save the plugin response in JSON format under
             ``../../core/models/``;
          3) send the response back through the producer.

        Messages for another VIM type, another topic, an unknown key, or a
        resource that ``check_resource`` does not find are skipped.

        Returns the response of the first request that is handled, which
        also ends the loop. Returns None if the consumer is exhausted or an
        exception occurs; the exception is logged, not raised.
        """
        try:
            for message in self._consumer:
                metric_info = json.loads(message.value)
                print(metric_info)
                metric_response = dict()

                if metric_info['vim_type'] != 'AWS':
                    print("Bad VIM Request")
                    continue
                log.debug("VIM support : AWS")

                # Only the 'metric_request' topic is served by this plugin.
                if message.topic != "metric_request":
                    log.info("Message topic not relevant to this plugin: %s",
                             message.topic)
                    continue
                log.info("Action required against: %s", message.topic)

                if message.key == "create_metric_request":
                    if self.check_resource(metric_info['metric_create']['resource_uuid']):
                        metric_resp = self.create_metric_request(metric_info['metric_create'])
                        metric_response['schema_version'] = metric_info['schema_version']
                        metric_response['schema_type'] = "create_metric_response"
                        metric_response['metric_create_response'] = metric_resp
                        payload = json.dumps(metric_response)
                        self._publish(self.producer.create_metrics_resp,
                                      'create_metric_response', payload,
                                      '../../core/models/create_metric_resp.json')

                        log.info("Metric configured: %s", metric_resp)
                        return metric_response

                elif message.key == "update_metric_request":
                    # The update request carries its body under 'metric_create'.
                    if self.check_resource(metric_info['metric_create']['resource_uuid']):
                        update_resp = self.update_metric_request(metric_info['metric_create'])
                        metric_response['schema_version'] = metric_info['schema_version']
                        metric_response['schema_type'] = "update_metric_response"
                        metric_response['metric_update_response'] = update_resp
                        payload = json.dumps(metric_response)
                        print(payload)
                        self._publish(self.producer.update_metric_response,
                                      'update_metric_response', payload,
                                      '../../core/models/update_metric_resp.json')

                        log.info("Metric Updates: %s", metric_response)
                        return metric_response

                elif message.key == "delete_metric_request":
                    if self.check_resource(metric_info['resource_uuid']):
                        del_resp = self.delete_metric_request(metric_info)
                        payload = json.dumps(del_resp)
                        self._publish(self.producer.delete_metric_response,
                                      'delete_metric_response', payload,
                                      '../../core/models/delete_metric_resp.json')

                        log.info("Metric Deletion Not supported in AWS : %s", del_resp)
                        return del_resp

                elif message.key == "list_metric_request":
                    if self.check_resource(metric_info['metrics_list_request']['resource_uuid']):
                        list_resp = self.list_metrics_request(metric_info['metrics_list_request'])
                        metric_response['schema_version'] = metric_info['schema_version']
                        metric_response['schema_type'] = "list_metric_response"
                        metric_response['correlation_id'] = metric_info['metrics_list_request']['correlation_id']
                        metric_response['vim_type'] = metric_info['vim_type']
                        metric_response['metrics_list'] = list_resp
                        payload = json.dumps(metric_response)
                        self._publish(self.producer.list_metric_response,
                                      'list_metrics_response', payload,
                                      '../../core/models/list_metric_resp.json')

                        log.info("Metric List: %s", metric_response)
                        return metric_response

                elif message.key == "read_metric_data_request":
                    if self.check_resource(metric_info['resource_uuid']):
                        data_resp = self.read_metrics_data(metric_info)
                        metric_response['schema_version'] = metric_info['schema_version']
                        metric_response['schema_type'] = "read_metric_data_response"
                        metric_response['metric_name'] = metric_info['metric_name']
                        metric_response['metric_uuid'] = metric_info['metric_uuid']
                        metric_response['correlation_id'] = metric_info['correlation_uuid']
                        metric_response['resource_uuid'] = metric_info['resource_uuid']
                        metric_response['tenant_uuid'] = metric_info['tenant_uuid']
                        metric_response['metrics_data'] = data_resp
                        payload = json.dumps(metric_response)
                        self._publish(self.producer.read_metric_data_response,
                                      'read_metric_data_response', payload,
                                      '../../core/models/read_metric_data_resp.json')

                        log.info("Metric Data Response: %s", metric_response)
                        return metric_response

                else:
                    log.debug("Unknown key, no action will be performed")
        except Exception as e:
            log.error("Consumer exception: %s", str(e))

    def check_resource(self, resource_uuid):
        """Return True if *resource_uuid* is among the EC2 instance statuses.

        Returns False when no instance matches, or when the lookup fails;
        the failure is logged.
        """
        try:
            for instance_status in self.ec2_conn.get_all_instance_status():
                # The id is taken from the text after the first ':' of the
                # status object's string form.
                if str(instance_status).split(':')[1] == resource_uuid:
                    # Stop at the first match so that later, non-matching
                    # instances cannot overwrite the result.
                    return True
            return False
        except Exception as e:
            log.error("Error in Plugin Inputs %s", str(e))
            return False
220
# Start-up sequence: build the plugin, open the AWS connections, then run the
# consumer loop. There is no ``if __name__ == "__main__"`` guard, so these
# statements also execute when the module is imported.
obj = plugin_metrics()
obj.connection()
obj.consumer()