Added a Common KafkaConsumer for all of the plugins
[osm/MON.git] / plugins / OpenStack / Gnocchi / metrics.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
21 ##
22 """Carry out OpenStack metric requests via Gnocchi API."""
23
24 import datetime
25 import json
26 import logging
27
28 import time
29
30 from core.message_bus.producer import KafkaProducer
31
32 from plugins.OpenStack.response import OpenStack_Response
33 from plugins.OpenStack.settings import Config
34
35 __author__ = "Helena McGough"
36
37 log = logging.getLogger(__name__)
38
39 METRIC_MAPPINGS = {
40 "average_memory_utilization": "memory.percent",
41 "disk_read_ops": "disk.disk_ops",
42 "disk_write_ops": "disk.disk_ops",
43 "disk_read_bytes": "disk.disk_octets",
44 "disk_write_bytes": "disk.disk_octets",
45 "packets_dropped": "interface.if_dropped",
46 "packets_received": "interface.if_packets",
47 "packets_sent": "interface.if_packets",
48 "cpu_utilization": "cpu.percent",
49 }
50
51 PERIOD_MS = {
52 "HR": 3600000,
53 "DAY": 86400000,
54 "WEEK": 604800000,
55 "MONTH": 2629746000,
56 "YEAR": 31556952000
57 }
58
59
60 class Metrics(object):
61 """OpenStack metric requests performed via the Gnocchi API."""
62
63 def __init__(self):
64 """Initialize the metric actions."""
65 # Configure an instance of the OpenStack metric plugin
66 config = Config.instance()
67 config.read_environ("gnocchi")
68
69 # Initialise authentication for API requests
70 self.auth_token = None
71 self.endpoint = None
72 self._common = None
73
74 # Use the Response class to generate valid json response messages
75 self._response = OpenStack_Response()
76
77 # Initializer a producer to send responses back to SO
78 self._producer = KafkaProducer("metric_response")
79
80 def metric_calls(self, message, common, auth_token):
81 """Consume info from the message bus to manage metric requests."""
82 values = json.loads(message.value)
83 self._common = common
84 log.info("OpenStack metric action required.")
85
86 # Generate and auth_token and endpoint for request
87 if auth_token is not None:
88 if self.auth_token != auth_token:
89 log.info("Auth_token for metrics set by access_credentials.")
90 self.auth_token = auth_token
91 else:
92 log.info("Auth_token has not been updated.")
93 else:
94 log.info("Using environment variables to set Gnocchi auth_token.")
95 self.auth_token = self._common._authenticate()
96
97 if self.endpoint is None:
98 log.info("Generating a new endpoint for Gnocchi.")
99 self.endpoint = self._common.get_endpoint("metric")
100
101 if message.key == "create_metric_request":
102 # Configure metric
103 metric_details = values['metric_create']
104 metric_id, resource_id, status = self.configure_metric(
105 self.endpoint, self.auth_token, metric_details)
106
107 # Generate and send a create metric response
108 try:
109 resp_message = self._response.generate_response(
110 'create_metric_response', status=status,
111 cor_id=values['correlation_id'],
112 metric_id=metric_id, r_id=resource_id)
113 log.info("Response messages: %s", resp_message)
114 self._producer.create_metrics_resp(
115 'create_metric_response', resp_message,
116 'metric_response')
117 except Exception as exc:
118 log.warn("Failed to create response: %s", exc)
119
120 elif message.key == "read_metric_data_request":
121 # Read all metric data related to a specified metric
122 timestamps, metric_data = self.read_metric_data(
123 self.endpoint, self.auth_token, values)
124
125 # Generate and send a response message
126 try:
127 resp_message = self._response.generate_response(
128 'read_metric_data_response',
129 m_id=values['metric_uuid'],
130 m_name=values['metric_name'],
131 r_id=values['resource_uuid'],
132 cor_id=values['correlation_id'],
133 times=timestamps, metrics=metric_data)
134 log.info("Response message: %s", resp_message)
135 self._producer.read_metric_data_response(
136 'read_metric_data_response', resp_message,
137 'metric_response')
138 except Exception as exc:
139 log.warn("Failed to send read metric response:%s", exc)
140
141 elif message.key == "delete_metric_request":
142 # delete the specified metric in the request
143 metric_id = values['metric_uuid']
144 status = self.delete_metric(
145 self.endpoint, self.auth_token, metric_id)
146
147 # Generate and send a response message
148 try:
149 resp_message = self._response.generate_response(
150 'delete_metric_response', m_id=metric_id,
151 m_name=values['metric_name'],
152 status=status, r_id=values['resource_uuid'],
153 cor_id=values['correlation_id'])
154 log.info("Response message: %s", resp_message)
155 self._producer.delete_metric_response(
156 'delete_metric_response', resp_message,
157 'metric_response')
158 except Exception as exc:
159 log.warn("Failed to send delete response:%s", exc)
160
161 elif message.key == "update_metric_request":
162 # Gnocchi doesn't support configuration updates
163 # Log and send a response back to this effect
164 log.warn("Gnocchi doesn't support metric configuration\
165 updates.")
166 req_details = values['metric_create']
167 metric_name = req_details['metric_name']
168 resource_id = req_details['resource_uuid']
169 metric_id = self.get_metric_id(
170 self.endpoint, self.auth_token, metric_name, resource_id)
171
172 # Generate and send a response message
173 try:
174 resp_message = self._response.generate_response(
175 'update_metric_response', status=False,
176 cor_id=values['correlation_id'],
177 r_id=resource_id, m_id=metric_id)
178 log.info("Response message: %s", resp_message)
179 self._producer.update_metric_response(
180 'update_metric_response', resp_message,
181 'metric_response')
182 except Exception as exc:
183 log.warn("Failed to send an update response:%s", exc)
184
185 elif message.key == "list_metric_request":
186 list_details = values['metrics_list_request']
187
188 metric_list = self.list_metrics(
189 self.endpoint, self.auth_token, list_details)
190
191 # Generate and send a response message
192 try:
193 resp_message = self._response.generate_response(
194 'list_metric_response', m_list=metric_list,
195 cor_id=list_details['correlation_id'])
196 log.info("Response message: %s", resp_message)
197 self._producer.list_metric_response(
198 'list_metric_response', resp_message,
199 'metric_response')
200 except Exception as exc:
201 log.warn("Failed to send a list response:%s", exc)
202
203 else:
204 log.warn("Unknown key, no action will be performed.")
205
206 return
207
208 def configure_metric(self, endpoint, auth_token, values):
209 """Create the new metric in Gnocchi."""
210 try:
211 resource_id = values['resource_uuid']
212 except KeyError:
213 log.warn("Resource is not defined correctly.")
214 return None, None, False
215
216 # Check/Normalize metric name
217 metric_name, norm_name = self.get_metric_name(values)
218 if norm_name is None:
219 log.warn("This metric is not supported by this plugin.")
220 return None, resource_id, False
221
222 # Check for an existing metric for this resource
223 metric_id = self.get_metric_id(
224 endpoint, auth_token, metric_name, resource_id)
225
226 if metric_id is None:
227 # Try appending metric to existing resource
228 try:
229 base_url = "{}/v1/resource/generic/%s/metric"
230 res_url = base_url.format(endpoint) % resource_id
231 payload = {metric_name: {'archive_policy_name': 'high',
232 'unit': values['metric_unit']}}
233 result = self._common._perform_request(
234 res_url, auth_token, req_type="post",
235 payload=json.dumps(payload))
236 # Get id of newly created metric
237 for row in json.loads(result.text):
238 if row['name'] == metric_name:
239 metric_id = row['id']
240 log.info("Appended metric to existing resource.")
241
242 return metric_id, resource_id, True
243 except Exception as exc:
244 # Gnocchi version of resource does not exist creating a new one
245 log.info("Failed to append metric to existing resource:%s",
246 exc)
247 try:
248 url = "{}/v1/resource/generic".format(endpoint)
249 metric = {'name': metric_name,
250 'archive_policy_name': 'high',
251 'unit': values['metric_unit'], }
252
253 resource_payload = json.dumps({'id': resource_id,
254 'metrics': {
255 metric_name: metric}})
256
257 resource = self._common._perform_request(
258 url, auth_token, req_type="post",
259 payload=resource_payload)
260
261 # Return the newly created resource_id for creating alarms
262 new_resource_id = json.loads(resource.text)['id']
263 log.info("Created new resource for metric: %s",
264 new_resource_id)
265
266 metric_id = self.get_metric_id(
267 endpoint, auth_token, metric_name, new_resource_id)
268
269 return metric_id, new_resource_id, True
270 except Exception as exc:
271 log.warn("Failed to create a new resource:%s", exc)
272 return None, None, False
273
274 else:
275 log.info("This metric already exists for this resource.")
276
277 return metric_id, resource_id, False
278
279 def delete_metric(self, endpoint, auth_token, metric_id):
280 """Delete metric."""
281 url = "{}/v1/metric/%s".format(endpoint) % (metric_id)
282
283 try:
284 result = self._common._perform_request(
285 url, auth_token, req_type="delete")
286 if str(result.status_code) == "404":
287 log.warn("Failed to delete the metric.")
288 return False
289 else:
290 return True
291 except Exception as exc:
292 log.warn("Failed to carry out delete metric request:%s", exc)
293 return False
294
295 def list_metrics(self, endpoint, auth_token, values):
296 """List all metrics."""
297 url = "{}/v1/metric/".format(endpoint)
298
299 # Check for a specified list
300 try:
301 # Check if the metric_name was specified for the list
302 metric_name = values['metric_name'].lower()
303 if metric_name not in METRIC_MAPPINGS.keys():
304 log.warn("This metric is not supported, won't be listed.")
305 metric_name = None
306 except KeyError as exc:
307 log.info("Metric name is not specified: %s", exc)
308 metric_name = None
309
310 try:
311 resource = values['resource_uuid']
312 except KeyError as exc:
313 log.info("Resource is not specified:%s", exc)
314 resource = None
315
316 try:
317 result = self._common._perform_request(
318 url, auth_token, req_type="get")
319 metrics = json.loads(result.text)
320
321 if metrics is not None:
322 # Format the list response
323 if metric_name is not None and resource is not None:
324 metric_list = self.response_list(
325 metrics, metric_name=metric_name, resource=resource)
326 log.info("Returning an %s resource list for %s metrics",
327 metric_name, resource)
328 elif metric_name is not None:
329 metric_list = self.response_list(
330 metrics, metric_name=metric_name)
331 log.info("Returning a list of %s metrics", metric_name)
332 elif resource is not None:
333 metric_list = self.response_list(
334 metrics, resource=resource)
335 log.info("Return a list of %s resource metrics", resource)
336 else:
337 metric_list = self.response_list(metrics)
338 log.info("Returning a complete list of metrics")
339
340 return metric_list
341 else:
342 log.info("There are no metrics available")
343 return []
344 except Exception as exc:
345 log.warn("Failed to generate any metric list. %s", exc)
346 return None
347
348 def get_metric_id(self, endpoint, auth_token, metric_name, resource_id):
349 """Check if the desired metric already exists for the resource."""
350 url = "{}/v1/resource/generic/%s".format(endpoint) % resource_id
351
352 try:
353 # Try return the metric id if it exists
354 result = self._common._perform_request(
355 url, auth_token, req_type="get")
356 return json.loads(result.text)['metrics'][metric_name]
357 except Exception:
358 log.info("Metric doesn't exist. No metric_id available")
359 return None
360
361 def get_metric_name(self, values):
362 """Check metric name configuration and normalize."""
363 try:
364 # Normalize metric name
365 metric_name = values['metric_name'].lower()
366 return metric_name, METRIC_MAPPINGS[metric_name]
367 except KeyError:
368 log.info("Metric name %s is invalid.", metric_name)
369 return metric_name, None
370
371 def read_metric_data(self, endpoint, auth_token, values):
372 """Collectd metric measures over a specified time period."""
373 timestamps = []
374 data = []
375 try:
376 # Try and collect measures
377 metric_id = values['metric_uuid']
378 collection_unit = values['collection_unit'].upper()
379 collection_period = values['collection_period']
380
381 # Define the start and end time based on configurations
382 stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X")
383 end_time = int(round(time.time() * 1000))
384 if collection_unit == 'YEAR':
385 diff = PERIOD_MS[collection_unit]
386 else:
387 diff = collection_period * PERIOD_MS[collection_unit]
388 s_time = (end_time - diff) / 1000.0
389 start_time = datetime.datetime.fromtimestamp(s_time).strftime(
390 '%Y-%m-%dT%H:%M:%S.%f')
391 base_url = "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s"
392 url = base_url.format(endpoint) % {
393 "0": metric_id, "1": start_time, "2": stop_time}
394
395 # Perform metric data request
396 metric_data = self._common._perform_request(
397 url, auth_token, req_type="get")
398
399 # Generate a list of the requested timestamps and data
400 for r in json.loads(metric_data.text):
401 timestamp = r[0].replace("T", " ")
402 timestamps.append(timestamp)
403 data.append(r[2])
404
405 return timestamps, data
406 except Exception as exc:
407 log.warn("Failed to gather specified measures: %s", exc)
408 return timestamps, data
409
410 def response_list(self, metric_list, metric_name=None, resource=None):
411 """Create the appropriate lists for a list response."""
412 resp_list, name_list, res_list = [], [], []
413
414 # Create required lists
415 for row in metric_list:
416 # Only list OSM metrics
417 if row['name'] in METRIC_MAPPINGS.keys():
418 metric = {"metric_name": row['name'],
419 "metric_uuid": row['id'],
420 "metric_unit": row['unit'],
421 "resource_uuid": row['resource_id']}
422 resp_list.append(str(metric))
423 # Generate metric_name specific list
424 if metric_name is not None:
425 if row['name'] == metric_name:
426 metric = {"metric_name": row['name'],
427 "metric_uuid": row['id'],
428 "metric_unit": row['unit'],
429 "resource_uuid": row['resource_id']}
430 name_list.append(str(metric))
431 # Generate resource specific list
432 if resource is not None:
433 if row['resource_id'] == resource:
434 metric = {"metric_name": row['name'],
435 "metric_uuid": row['id'],
436 "metric_unit": row['unit'],
437 "resource_uuid": row['resource_id']}
438 res_list.append(str(metric))
439
440 # Join required lists
441 if metric_name is not None and resource is not None:
442 return list(set(res_list).intersection(name_list))
443 elif metric_name is not None:
444 return name_list
445 elif resource is not None:
446 return list(set(res_list).intersection(resp_list))
447 else:
448 return resp_list