cb04a656089790c245037bf2e3303b2405e85b9d
[osm/MON.git] / plugins / CloudWatch / plugin_metrics.py
1 ##
2 # Copyright 2017 xFlow Research Pvt. Ltd
3 # This file is part of MON module
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact with: wajeeha.hamid@xflowresearch.com
20 ##
21
22 '''
23 AWS-Plugin implements all the methods of MON to interact with AWS using the BOTO client
24 '''
25
26 __author__ = "Wajeeha Hamid"
27 __date__ = "18-September-2017"
28
29 import sys
30 import json
31 from connection import Connection
32 from metric_alarms import MetricAlarm
33 from metrics import Metrics
34 # Need to import the producer message bus,not working yet
35 #from core.message_bus.producerfunct import KafkaProducer
36 sys.path.append("../../core/message-bus")
37 from producer import KafkaProducer
38 from kafka import KafkaConsumer
39 import logging as log
40
class plugin_metrics():
    """Serve metric requests from the message bus using AWS CloudWatch.

    Requests are read from the Kafka topic ``metric_request``, passed to the
    ``Metrics`` helper, and the JSON response is both dumped to a file under
    ``../../core/models/`` and published on the ``metric_response`` topic.
    """

    def __init__(self):
        self.conn = Connection()
        self.metric = Metrics()

        # Defined up front so the attributes exist even when connection()
        # fails or has not been called yet.
        self.cloudwatch_conn = None
        self.ec2_conn = None

        # Consumer object used to read the requests sent by the SO.
        self._consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
        self._consumer.subscribe(['metric_request'])

        self.producer = KafkaProducer('')

    def connection(self):
        """Connect to AWS and keep the CloudWatch and EC2 connection handles.

        Failures are logged, not raised; in that case the handles keep their
        previous values.
        """
        try:
            self.conn.setEnvironment()
            # NOTE(review): self.conn is rebound from the Connection object to
            # the mapping returned by connection_instance(). Kept as-is in case
            # other code reads self.conn[...] afterwards.
            self.conn = self.conn.connection_instance()
            self.cloudwatch_conn = self.conn['cloudwatch_connection']
            self.ec2_conn = self.conn['ec2_connection']
        except Exception as e:
            log.error("Failed to Connect with AWS: %s", str(e))

    def create_metric_request(self, metric_info):
        '''Compatible API using normalized parameters.

        Forwards metric_info to Metrics.createMetrics and returns its result.
        '''
        return self.metric.createMetrics(self.cloudwatch_conn, metric_info)

    def update_metric_request(self, updated_info):
        '''Compatible API using normalized parameters.

        Forwards updated_info to Metrics.updateMetrics and returns its result.
        '''
        return self.metric.updateMetrics(self.cloudwatch_conn, updated_info)

    def delete_metric_request(self, delete_info):
        '''Compatible API using normalized parameters.

        Forwards delete_info to Metrics.deleteMetrics and returns its result.
        '''
        return self.metric.deleteMetrics(self.cloudwatch_conn, delete_info)

    def list_metrics_request(self, list_info):
        '''Compatible API using normalized parameters.

        Forwards list_info to Metrics.listMetrics and returns its result.
        '''
        return self.metric.listMetrics(self.cloudwatch_conn, list_info)

    def read_metrics_data(self, list_info):
        '''Compatible API using normalized parameters.

        Reads all metric data related to a specified metric by forwarding
        list_info to Metrics.metricsData and returning its result.
        '''
        return self.metric.metricsData(self.cloudwatch_conn, list_info)

    def consumer(self):
        '''Consume requests from the SO and answer them.

        For every message read from the consumer:
        1) the JSON value is parsed and the handler matching the message key
           is triggered (only for vim_type 'AWS' on topic 'metric_request');
        2) the response is serialised to JSON and saved to a file;
        3) the producer sends the response back to the message bus.

        Returns the response of the first request that was answered, or None
        when iteration ends or an exception is caught and logged.

        NOTE(review): returning after the first answered request ends the
        consume loop; this is the original behaviour and is preserved here.
        '''
        try:
            for message in self._consumer:
                metric_info = json.loads(message.value)

                if metric_info['vim_type'] != 'AWS':
                    log.warning("Bad VIM Request")
                    continue
                log.debug("VIM support : AWS")

                if message.topic != "metric_request":
                    log.info("Message topic not relevant to this plugin: %s",
                             message.topic)
                    continue
                log.info("Action required against: %s" % (message.topic))

                handled, response = self._handle_request(message.key,
                                                         metric_info)
                if handled:
                    return response
        except Exception as e:
            log.error("Consumer exception: %s", str(e))

    def _publish(self, payload, resp_file, resp_key):
        """Dump the JSON payload to resp_file, then send it on 'metric_response'."""
        with open(resp_file, 'w') as resp_fd:
            resp_fd.write(payload)
        self.producer.create_metrics_resp(key=resp_key, message=payload,
                                          topic='metric_response')

    def _handle_request(self, key, metric_info):
        """Run the handler for one request key.

        Returns a ``(handled, response)`` pair. ``handled`` is False when the
        key is unknown or the referenced resource is not a known instance.
        """
        metric_response = dict()

        if key == "create_metric_request":
            request = metric_info['metric_create']
            if not self.check_resource(request['resource_uuid']):
                return False, None
            metric_resp = self.create_metric_request(request)
            metric_response['schema_version'] = metric_info['schema_version']
            metric_response['schema_type'] = "create_metric_response"
            metric_response['metric_create_response'] = metric_resp
            self._publish(json.dumps(metric_response),
                          '../../core/models/create_metric_resp.json',
                          'create_metric_response')
            log.info("Metric configured: %s", metric_resp)
            return True, metric_response

        if key == "update_metric_request":
            # NOTE(review): update requests are read from the 'metric_create'
            # section, as in the original code - confirm against the schema.
            request = metric_info['metric_create']
            if not self.check_resource(request['resource_uuid']):
                return False, None
            update_resp = self.update_metric_request(request)
            metric_response['schema_version'] = metric_info['schema_version']
            metric_response['schema_type'] = "update_metric_response"
            metric_response['metric_update_response'] = update_resp
            self._publish(json.dumps(metric_response),
                          '../../core/models/update_metric_resp.json',
                          'update_metric_response')
            log.info("Metric Updates: %s", metric_response)
            return True, metric_response

        if key == "delete_metric_request":
            if not self.check_resource(metric_info['resource_uuid']):
                return False, None
            del_resp = self.delete_metric_request(metric_info)
            self._publish(json.dumps(del_resp),
                          '../../core/models/delete_metric_resp.json',
                          'delete_metric_response')
            log.info("Metric Deletion Not supported in AWS : %s", del_resp)
            return True, del_resp

        if key == "list_metric_request":
            request = metric_info['metrics_list_request']
            if not self.check_resource(request['resource_uuid']):
                return False, None
            list_resp = self.list_metrics_request(request)
            metric_response['schema_version'] = metric_info['schema_version']
            metric_response['schema_type'] = "list_metric_response"
            metric_response['correlation_id'] = request['correlation_id']
            metric_response['vim_type'] = metric_info['vim_type']
            metric_response['metrics_list'] = list_resp
            self._publish(json.dumps(metric_response),
                          '../../core/models/list_metric_resp.json',
                          'list_metrics_response')
            log.info("Metric List: %s", metric_response)
            return True, metric_response

        if key == "read_metric_data_request":
            if not self.check_resource(metric_info['resource_uuid']):
                return False, None
            data_resp = self.read_metrics_data(metric_info)
            metric_response['schema_version'] = metric_info['schema_version']
            # Was "list_metric_response" (copy-paste slip); the message key
            # and the dump file are both named read_metric_data_*.
            metric_response['schema_type'] = "read_metric_data_response"
            metric_response['metric_name'] = metric_info['metric_name']
            metric_response['metric_uuid'] = metric_info['metric_uuid']
            metric_response['correlation_id'] = metric_info['correlation_uuid']
            metric_response['resource_uuid'] = metric_info['resource_uuid']
            metric_response['tenant_uuid'] = metric_info['tenant_uuid']
            metric_response['metrics_data'] = data_resp
            self._publish(json.dumps(metric_response),
                          '../../core/models/read_metric_data_resp.json',
                          'read_metric_data_response')
            log.info("Metric Data Response: %s", metric_response)
            return True, metric_response

        log.debug("Unknown key, no action will be performed")
        return False, None

    def check_resource(self, resource_uuid):
        '''Check whether resource_uuid is present in the EC2 instances.

        Each entry returned by get_all_instance_status() is converted with
        str() and the part after the first ':' is compared to resource_uuid.

        Returns True on the first match, False when nothing matches or when
        the lookup fails (the error is logged).
        '''
        try:
            for instance in self.ec2_conn.get_all_instance_status():
                # Return on the first hit so that later, non-matching
                # instances cannot overwrite a positive result.
                if str(instance).split(':')[1] == resource_uuid:
                    return True
            return False
        except Exception as e:
            log.error("Error in Plugin Inputs %s", str(e))
            return False
220 #---------------------------------------------------------------------------------------------------------------------------
221
# Start-up sequence, executed when this module is loaded: build the plugin,
# open the AWS connections, then start consuming metric requests.
obj = plugin_metrics()
obj.connection()
obj.consumer()