Merge "Updated kafka version"
[osm/MON.git] / osm_mon / plugins / CloudWatch / plugin_metric.py
1 ##
2 # Copyright 2017 xFlow Research Pvt. Ltd
3 # This file is part of MON module
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact with: wajeeha.hamid@xflowresearch.com
20 ##
21
22 '''
23 AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
24 '''
25
26 __author__ = "Wajeeha Hamid"
27 __date__ = "18-September-2017"
28
29 import sys
30 import json
31 from connection import Connection
32 from metric_alarms import MetricAlarm
33 from metrics import Metrics
34 sys.path.append("../../core/message_bus")
35 from producer import KafkaProducer
36 import logging as log
37
class plugin_metrics():
    """Serve metric requests from the message bus using AWS CloudWatch.

    The class wires together a ``Connection`` (AWS handles), a ``Metrics``
    helper (the CloudWatch operations) and a ``KafkaProducer`` (used to send
    the responses back), and exposes ``metric_calls`` as the entry point for
    consumed messages.
    """

    # Directory (relative to the working directory) holding the response files.
    _RESPONSE_DIR = '../../core/models/'

    def __init__(self):
        self.conn = Connection()
        self.metric = Metrics()
        self.producer = KafkaProducer('')
        # Define the handles up front so the attributes exist even when
        # connection() fails and only logs the error.
        self.cloudwatch_conn = None
        self.ec2_conn = None
        self.connection()

    # -------------------------------------------------------------------------
    def connection(self):
        """Connect the plugin with CloudWatch and EC2.

        On success ``self.cloudwatch_conn`` and ``self.ec2_conn`` are set from
        the mapping returned by ``connection_instance()``. Any failure is
        logged and swallowed, leaving the handles as they were.
        """
        try:
            self.conn.setEnvironment()
            # NOTE: self.conn is rebound from the Connection object to the
            # mapping it returns; this mirrors the original attribute layout.
            self.conn = self.conn.connection_instance()
            self.cloudwatch_conn = self.conn['cloudwatch_connection']
            self.ec2_conn = self.conn['ec2_connection']
        except Exception as e:
            # Pass the error as a logging argument so "%s" is actually filled.
            log.error("Failed to Connect with AWS: %s", str(e))

    # -------------------------------------------------------------------------
    def create_metric_request(self, metric_info):
        """Compatible API using normalized parameters: create a metric."""
        return self.metric.createMetrics(self.cloudwatch_conn, metric_info)

    # -------------------------------------------------------------------------
    def update_metric_request(self, updated_info):
        """Compatible API using normalized parameters: update a metric."""
        return self.metric.updateMetrics(self.cloudwatch_conn, updated_info)

    # -------------------------------------------------------------------------
    def delete_metric_request(self, delete_info):
        """Compatible API using normalized parameters: delete a metric."""
        return self.metric.deleteMetrics(self.cloudwatch_conn, delete_info)

    # -------------------------------------------------------------------------
    def list_metrics_request(self, list_info):
        """Compatible API using normalized parameters: list metrics."""
        return self.metric.listMetrics(self.cloudwatch_conn, list_info)

    # -------------------------------------------------------------------------
    def read_metrics_data(self, list_info):
        """Compatible API using normalized parameters.

        Read all metric data related to a specified metric.
        """
        return self.metric.metricsData(self.cloudwatch_conn, list_info)

    # -------------------------------------------------------------------------
    def _write_response_file(self, file_name, payload):
        """Write the JSON ``payload`` string to ``file_name`` in the response dir.

        Text mode is used because ``json.dumps`` returns ``str``; the ``with``
        block guarantees the handle is closed. I/O errors propagate to the
        caller.
        """
        with open(self._RESPONSE_DIR + file_name, 'w') as resp_file:
            resp_file.write(payload)

    # -------------------------------------------------------------------------
    def metric_calls(self, message):
        """Handle one consumed message and publish the plugin's response.

        1) Parse ``message.value`` as JSON and dispatch on ``message.topic``
           and ``message.key``.
        2) Build the response and serialise it as JSON.
        3) Write it to the matching response file and hand it to the producer.

        Requests are only served when ``vim_type`` is ``'AWS'``, the topic is
        ``'metric_request'`` and ``check_resource`` accepts the resource uuid.

        Returns the response that was published, or ``None`` when nothing was
        done (other VIM type, other topic, unknown key, unknown resource, or
        an exception, which is logged and swallowed).
        """
        try:
            metric_info = json.loads(message.value)
            metric_response = dict()

            if metric_info['vim_type'] != 'AWS':
                return None
            log.debug("VIM support : AWS")

            # Only the 'metric_request' topic is handled by this class.
            if message.topic != "metric_request":
                log.info("Message topic not relevant to this plugin: %s",
                         message.topic)
                return None
            log.info("Action required against: %s", message.topic)

            if message.key == "create_metric_request":
                if self.check_resource(metric_info['metric_create']['resource_uuid']):
                    metric_resp = self.create_metric_request(metric_info['metric_create'])
                    metric_response['schema_version'] = metric_info['schema_version']
                    metric_response['schema_type'] = "create_metric_response"
                    metric_response['metric_create_response'] = metric_resp
                    payload = json.dumps(metric_response)
                    self._write_response_file('create_metric_resp.json', payload)
                    self.producer.create_metrics_resp(
                        key='create_metric_response', message=payload,
                        topic='metric_response')

                    log.info("Metric configured: %s", metric_resp)
                    return metric_response

            elif message.key == "update_metric_request":
                # NOTE(review): the update request is read from the
                # 'metric_create' key, as in the original code - confirm
                # against the request schema.
                if self.check_resource(metric_info['metric_create']['resource_uuid']):
                    update_resp = self.update_metric_request(metric_info['metric_create'])
                    metric_response['schema_version'] = metric_info['schema_version']
                    metric_response['schema_type'] = "update_metric_response"
                    metric_response['metric_update_response'] = update_resp
                    payload = json.dumps(metric_response)
                    self._write_response_file('update_metric_resp.json', payload)
                    self.producer.update_metric_response(
                        key='update_metric_response', message=payload,
                        topic='metric_response')

                    log.info("Metric Updates: %s", metric_response)
                    return metric_response

            elif message.key == "delete_metric_request":
                if self.check_resource(metric_info['resource_uuid']):
                    # The helper's answer is published as-is, without the
                    # schema envelope used by the other branches.
                    del_resp = self.delete_metric_request(metric_info)
                    payload = json.dumps(del_resp)
                    self._write_response_file('delete_metric_resp.json', payload)
                    self.producer.delete_metric_response(
                        key='delete_metric_response', message=payload,
                        topic='metric_response')

                    log.info("Metric Deletion Not supported in AWS : %s", del_resp)
                    return del_resp

            elif message.key == "list_metric_request":
                if self.check_resource(metric_info['metrics_list_request']['resource_uuid']):
                    list_resp = self.list_metrics_request(metric_info['metrics_list_request'])
                    metric_response['schema_version'] = metric_info['schema_version']
                    metric_response['schema_type'] = "list_metric_response"
                    metric_response['correlation_id'] = metric_info['metrics_list_request']['correlation_id']
                    metric_response['vim_type'] = metric_info['vim_type']
                    metric_response['metrics_list'] = list_resp
                    payload = json.dumps(metric_response)
                    self._write_response_file('list_metric_resp.json', payload)
                    self.producer.list_metric_response(
                        key='list_metrics_response', message=payload,
                        topic='metric_response')

                    log.info("Metric List: %s", metric_response)
                    return metric_response

            elif message.key == "read_metric_data_request":
                if self.check_resource(metric_info['resource_uuid']):
                    data_resp = self.read_metrics_data(metric_info)
                    metric_response['schema_version'] = metric_info['schema_version']
                    metric_response['schema_type'] = "read_metric_data_response"
                    metric_response['metric_name'] = metric_info['metric_name']
                    metric_response['metric_uuid'] = metric_info['metric_uuid']
                    # NOTE(review): read from 'correlation_uuid' while the list
                    # branch uses 'correlation_id' - confirm against the schema.
                    metric_response['correlation_id'] = metric_info['correlation_uuid']
                    metric_response['resource_uuid'] = metric_info['resource_uuid']
                    metric_response['tenant_uuid'] = metric_info['tenant_uuid']
                    metric_response['metrics_data'] = data_resp
                    payload = json.dumps(metric_response)
                    self._write_response_file('read_metric_data_resp.json', payload)
                    self.producer.read_metric_data_response(
                        key='read_metric_data_response', message=payload,
                        topic='metric_response')

                    log.info("Metric Data Response: %s", metric_response)
                    return metric_response

            else:
                log.debug("Unknown key, no action will be performed")

        except Exception as e:
            log.error("Consumer exception: %s", str(e))

        return None

    # -------------------------------------------------------------------------
    def check_resource(self, resource_uuid):
        """Check whether ``resource_uuid`` is present among the EC2 instances.

        Each entry returned by ``get_all_instance_status()`` is converted with
        ``str()`` and the text after the first ':' is compared with
        ``resource_uuid``.

        Returns ``True`` as soon as any instance matches, otherwise ``False``.
        Errors are logged and also reported as ``False``.
        """
        try:
            for instance_status in self.ec2_conn.get_all_instance_status():
                instance_id = str(instance_status).split(':')[1]
                if instance_id == resource_uuid:
                    # Stop at the first hit so later instances cannot
                    # overwrite a positive result.
                    return True
            return False

        except Exception as e:
            log.error("Error in Plugin Inputs %s", str(e))
            return False
    # -------------------------------------------------------------------------
208