Optimization of "list_metrics" function
[osm/MON.git] / osm_mon / plugins / OpenStack / Gnocchi / metrics.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
21 ##
22 """Carry out OpenStack metric requests via Gnocchi API."""
23
24 import datetime
25 import json
26 import logging
27 import time
28
29 import six
30 import yaml
31
32 from osm_mon.core.message_bus.producer import KafkaProducer
33 from osm_mon.core.settings import Config
34 from osm_mon.plugins.OpenStack.common import Common
35 from osm_mon.plugins.OpenStack.response import OpenStack_Response
36
37 log = logging.getLogger(__name__)
38
39 METRIC_MAPPINGS = {
40 "average_memory_utilization": "memory.usage",
41 "disk_read_ops": "disk.read.requests",
42 "disk_write_ops": "disk.write.requests",
43 "disk_read_bytes": "disk.read.bytes",
44 "disk_write_bytes": "disk.write.bytes",
45 "packets_dropped": "interface.if_dropped",
46 "packets_received": "interface.if_packets",
47 "packets_sent": "interface.if_packets",
48 "cpu_utilization": "cpu_util",
49 }
50
51 PERIOD_MS = {
52 "HR": 3600000,
53 "DAY": 86400000,
54 "WEEK": 604800000,
55 "MONTH": 2629746000,
56 "YEAR": 31556952000
57 }
58
59
60 class Metrics(object):
61 """OpenStack metric requests performed via the Gnocchi API."""
62
63 def __init__(self):
64 """Initialize the metric actions."""
65 # Configure an instance of the OpenStack metric plugin
66 config = Config.instance()
67 config.read_environ()
68
69 # Initialise authentication for API requests
70 self._common = Common()
71
72 # Use the Response class to generate valid json response messages
73 self._response = OpenStack_Response()
74
75 # Initializer a producer to send responses back to SO
76 self._producer = KafkaProducer("metric_response")
77
78 def metric_calls(self, message, vim_uuid):
79 """Consume info from the message bus to manage metric requests."""
80 try:
81 values = json.loads(message.value)
82 except ValueError:
83 values = yaml.safe_load(message.value)
84 log.info("OpenStack metric action required.")
85
86 auth_token = Common.get_auth_token(vim_uuid)
87
88 endpoint = Common.get_endpoint("metric", vim_uuid)
89
90 if 'metric_name' in values and values['metric_name'] not in METRIC_MAPPINGS.keys():
91 raise ValueError('Metric ' + values['metric_name'] + ' is not supported.')
92
93 if message.key == "create_metric_request":
94 # Configure metric
95 metric_details = values['metric_create_request']
96 metric_id, resource_id, status = self.configure_metric(
97 endpoint, auth_token, metric_details)
98
99 # Generate and send a create metric response
100 try:
101 resp_message = self._response.generate_response(
102 'create_metric_response', status=status,
103 cor_id=metric_details['correlation_id'],
104 metric_id=metric_id, r_id=resource_id)
105 log.info("Response messages: %s", resp_message)
106 self._producer.create_metrics_resp(
107 'create_metric_response', resp_message)
108 except Exception as exc:
109 log.warning("Failed to create response: %s", exc)
110
111 elif message.key == "read_metric_data_request":
112 # Read all metric data related to a specified metric
113 timestamps, metric_data = self.read_metric_data(
114 endpoint, auth_token, values)
115
116 # Generate and send a response message
117 try:
118
119 metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']],
120 values['resource_uuid'])
121 resp_message = self._response.generate_response(
122 'read_metric_data_response',
123 m_id=metric_id,
124 m_name=values['metric_name'],
125 r_id=values['resource_uuid'],
126 cor_id=values['correlation_id'],
127 times=timestamps, metrics=metric_data)
128 log.info("Response message: %s", resp_message)
129 self._producer.read_metric_data_response(
130 'read_metric_data_response', resp_message)
131 except Exception as exc:
132 log.warning("Failed to send read metric response:%s", exc)
133
134 elif message.key == "delete_metric_request":
135 # delete the specified metric in the request
136 metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']],
137 values['resource_uuid'])
138 status = self.delete_metric(
139 endpoint, auth_token, metric_id)
140
141 # Generate and send a response message
142 try:
143 resp_message = self._response.generate_response(
144 'delete_metric_response', m_id=metric_id,
145 m_name=values['metric_name'],
146 status=status, r_id=values['resource_uuid'],
147 cor_id=values['correlation_id'])
148 log.info("Response message: %s", resp_message)
149 self._producer.delete_metric_response(
150 'delete_metric_response', resp_message)
151 except Exception as exc:
152 log.warning("Failed to send delete response:%s", exc)
153
154 elif message.key == "update_metric_request":
155 # Gnocchi doesn't support configuration updates
156 # Log and send a response back to this effect
157 log.warning("Gnocchi doesn't support metric configuration\
158 updates.")
159 req_details = values['metric_create_request']
160 metric_name = req_details['metric_name']
161 resource_id = req_details['resource_uuid']
162 metric_id = self.get_metric_id(
163 endpoint, auth_token, metric_name, resource_id)
164
165 # Generate and send a response message
166 try:
167 resp_message = self._response.generate_response(
168 'update_metric_response', status=False,
169 cor_id=req_details['correlation_id'],
170 r_id=resource_id, m_id=metric_id)
171 log.info("Response message: %s", resp_message)
172 self._producer.update_metric_response(
173 'update_metric_response', resp_message)
174 except Exception as exc:
175 log.exception("Failed to send an update response:")
176
177 elif message.key == "list_metric_request":
178 list_details = values['metrics_list_request']
179
180 metric_list = self.list_metrics(
181 endpoint, auth_token, list_details)
182
183 # Generate and send a response message
184 try:
185 resp_message = self._response.generate_response(
186 'list_metric_response', m_list=metric_list,
187 cor_id=list_details['correlation_id'])
188 log.info("Response message: %s", resp_message)
189 self._producer.list_metric_response(
190 'list_metric_response', resp_message)
191 except Exception as exc:
192 log.warning("Failed to send a list response:%s", exc)
193
194 else:
195 log.warning("Unknown key, no action will be performed.")
196
197 return
198
199 def configure_metric(self, endpoint, auth_token, values):
200 """Create the new metric in Gnocchi."""
201 try:
202 resource_id = values['resource_uuid']
203 except KeyError:
204 log.warning("Resource is not defined correctly.")
205 return None, None, False
206
207 # Check/Normalize metric name
208 norm_name, metric_name = self.get_metric_name(values)
209 if metric_name is None:
210 log.warning("This metric is not supported by this plugin.")
211 return None, resource_id, False
212
213 # Check for an existing metric for this resource
214 metric_id = self.get_metric_id(
215 endpoint, auth_token, metric_name, resource_id)
216
217 if metric_id is None:
218 # Try appending metric to existing resource
219 try:
220 base_url = "{}/v1/resource/generic/%s/metric"
221 res_url = base_url.format(endpoint) % resource_id
222 payload = {metric_name: {'archive_policy_name': 'high',
223 'unit': values['metric_unit']}}
224 result = Common.perform_request(
225 res_url, auth_token, req_type="post",
226 payload=json.dumps(payload, sort_keys=True))
227 # Get id of newly created metric
228 for row in json.loads(result.text):
229 if row['name'] == metric_name:
230 metric_id = row['id']
231 log.info("Appended metric to existing resource.")
232
233 return metric_id, resource_id, True
234 except Exception as exc:
235 # Gnocchi version of resource does not exist creating a new one
236 log.info("Failed to append metric to existing resource:%s",
237 exc)
238 try:
239 url = "{}/v1/resource/generic".format(endpoint)
240 metric = {'name': metric_name,
241 'archive_policy_name': 'high',
242 'unit': values['metric_unit'], }
243
244 resource_payload = json.dumps({'id': resource_id,
245 'metrics': {
246 metric_name: metric}}, sort_keys=True)
247
248 resource = Common.perform_request(
249 url, auth_token, req_type="post",
250 payload=resource_payload)
251
252 # Return the newly created resource_id for creating alarms
253 new_resource_id = json.loads(resource.text)['id']
254 log.info("Created new resource for metric: %s",
255 new_resource_id)
256
257 metric_id = self.get_metric_id(
258 endpoint, auth_token, metric_name, new_resource_id)
259
260 return metric_id, new_resource_id, True
261 except Exception as exc:
262 log.warning("Failed to create a new resource:%s", exc)
263 return None, None, False
264
265 else:
266 log.info("This metric already exists for this resource.")
267
268 return metric_id, resource_id, False
269
270 def delete_metric(self, endpoint, auth_token, metric_id):
271 """Delete metric."""
272 url = "{}/v1/metric/%s".format(endpoint) % metric_id
273
274 try:
275 result = Common.perform_request(
276 url, auth_token, req_type="delete")
277 if str(result.status_code) == "404":
278 log.warning("Failed to delete the metric.")
279 return False
280 else:
281 return True
282 except Exception as exc:
283 log.warning("Failed to carry out delete metric request:%s", exc)
284 return False
285
286 def list_metrics(self, endpoint, auth_token, values):
287 """List all metrics."""
288
289 # Check for a specified list
290 try:
291 # Check if the metric_name was specified for the list
292 if values['metric_name']:
293 metric_name = values['metric_name'].lower()
294 if metric_name not in METRIC_MAPPINGS.keys():
295 log.warning("This metric is not supported, won't be listed.")
296 metric_name = None
297 else:
298 metric_name = None
299 except KeyError as exc:
300 log.info("Metric name is not specified: %s", exc)
301 metric_name = None
302
303 try:
304 resource = values['resource_uuid']
305 except KeyError as exc:
306 log.info("Resource is not specified:%s", exc)
307 resource = None
308
309 try:
310 if resource:
311 url = "{}/v1/resource/generic/{}".format(endpoint, resource)
312 result = Common.perform_request(
313 url, auth_token, req_type="get")
314 resource_data = json.loads(result.text)
315 metrics = resource_data['metrics']
316
317 if metric_name:
318 if metrics.get(METRIC_MAPPINGS[metric_name]):
319 metric_id = metrics[METRIC_MAPPINGS[metric_name]]
320 url = "{}/v1/metric/{}".format(endpoint, metric_id)
321 result = Common.perform_request(
322 url, auth_token, req_type="get")
323 metric_list = json.loads(result.text)
324 log.info("Returning an %s resource list for %s metrics",
325 metric_name, resource)
326 return metric_list
327 else:
328 log.info("Metric {} not found for {} resource".format(metric_name, resource))
329 return None
330 else:
331 metric_list = []
332 for k, v in metrics.items():
333 url = "{}/v1/metric/{}".format(endpoint, v)
334 result = Common.perform_request(
335 url, auth_token, req_type="get")
336 metric = json.loads(result.text)
337 metric_list.append(metric)
338 if metric_list:
339 log.info("Return a list of %s resource metrics", resource)
340 return metric_list
341
342 else:
343 log.info("There are no metrics available")
344 return []
345 else:
346 url = "{}/v1/metric?sort=name:asc".format(endpoint)
347 result = Common.perform_request(
348 url, auth_token, req_type="get")
349 metrics = []
350 metrics_partial = json.loads(result.text)
351 for metric in metrics_partial:
352 metrics.append(metric)
353
354 while len(json.loads(result.text)) > 0:
355 last_metric_id = metrics_partial[-1]['id']
356 url = "{}/v1/metric?sort=name:asc&marker={}".format(endpoint, last_metric_id)
357 result = Common.perform_request(
358 url, auth_token, req_type="get")
359 if len(json.loads(result.text)) > 0:
360 metrics_partial = json.loads(result.text)
361 for metric in metrics_partial:
362 metrics.append(metric)
363
364 if metrics is not None:
365 # Format the list response
366 if metric_name is not None:
367 metric_list = self.response_list(
368 metrics, metric_name=metric_name)
369 log.info("Returning a list of %s metrics", metric_name)
370 else:
371 metric_list = self.response_list(metrics)
372 log.info("Returning a complete list of metrics")
373 return metric_list
374 else:
375 log.info("There are no metrics available")
376 return []
377 except Exception as exc:
378 log.warning("Failed to generate any metric list. %s", exc)
379 return None
380
381 def get_metric_id(self, endpoint, auth_token, metric_name, resource_id):
382 """Check if the desired metric already exists for the resource."""
383 url = "{}/v1/resource/generic/%s".format(endpoint) % resource_id
384 try:
385 # Try return the metric id if it exists
386 result = Common.perform_request(
387 url, auth_token, req_type="get")
388 return json.loads(result.text)['metrics'][metric_name]
389 except Exception:
390 log.info("Metric doesn't exist. No metric_id available")
391 return None
392
393 def get_metric_name(self, values):
394 """Check metric name configuration and normalize."""
395 metric_name = None
396 try:
397 # Normalize metric name
398 metric_name = values['metric_name'].lower()
399 return metric_name, METRIC_MAPPINGS[metric_name]
400 except KeyError:
401 log.info("Metric name %s is invalid.", metric_name)
402 return metric_name, None
403
404 def read_metric_data(self, endpoint, auth_token, values):
405 """Collect metric measures over a specified time period."""
406 timestamps = []
407 data = []
408 try:
409 # get metric_id
410 metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']],
411 values['resource_uuid'])
412 # Try and collect measures
413 collection_unit = values['collection_unit'].upper()
414 collection_period = values['collection_period']
415
416 # Define the start and end time based on configurations
417 # FIXME: Local timezone may differ from timezone set in Gnocchi, causing discrepancies in measures
418 stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X")
419 end_time = int(round(time.time() * 1000))
420 if collection_unit == 'YEAR':
421 diff = PERIOD_MS[collection_unit]
422 else:
423 diff = collection_period * PERIOD_MS[collection_unit]
424 s_time = (end_time - diff) / 1000.0
425 start_time = datetime.datetime.fromtimestamp(s_time).strftime(
426 '%Y-%m-%dT%H:%M:%S.%f')
427 base_url = "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s"
428 url = base_url.format(endpoint) % {
429 "0": metric_id, "1": start_time, "2": stop_time}
430
431 # Perform metric data request
432 metric_data = Common.perform_request(
433 url, auth_token, req_type="get")
434
435 # Generate a list of the requested timestamps and data
436 for r in json.loads(metric_data.text):
437 timestamp = r[0].replace("T", " ")
438 timestamps.append(timestamp)
439 data.append(r[2])
440
441 return timestamps, data
442 except Exception as exc:
443 log.warning("Failed to gather specified measures: %s", exc)
444 return timestamps, data
445
446 def response_list(self, metric_list, metric_name=None, resource=None):
447 """Create the appropriate lists for a list response."""
448 resp_list, name_list, res_list = [], [], []
449
450 # Create required lists
451 for row in metric_list:
452 # Only list OSM metrics
453 name = None
454 if row['name'] in METRIC_MAPPINGS.values():
455 for k, v in six.iteritems(METRIC_MAPPINGS):
456 if row['name'] == v:
457 name = k
458 metric = {"metric_name": name,
459 "metric_uuid": row['id'],
460 "metric_unit": row['unit'],
461 "resource_uuid": row['resource_id']}
462 resp_list.append(metric)
463 # Generate metric_name specific list
464 if metric_name is not None and name is not None:
465 if metric_name in METRIC_MAPPINGS.keys() and row['name'] == METRIC_MAPPINGS[metric_name]:
466 metric = {"metric_name": metric_name,
467 "metric_uuid": row['id'],
468 "metric_unit": row['unit'],
469 "resource_uuid": row['resource_id']}
470 name_list.append(metric)
471 # Generate resource specific list
472 if resource is not None and name is not None:
473 if row['resource_id'] == resource:
474 metric = {"metric_name": name,
475 "metric_uuid": row['id'],
476 "metric_unit": row['unit'],
477 "resource_uuid": row['resource_id']}
478 res_list.append(metric)
479
480 # Join required lists
481 if metric_name is not None and resource is not None:
482 # Return intersection of res_list and name_list
483 return [i for i in res_list for j in name_list if i['metric_uuid'] == j['metric_uuid']]
484 elif metric_name is not None:
485 return name_list
486 elif resource is not None:
487 return res_list
488 else:
489 return resp_list