825671ea348c54463d11e018a4134660b2c744d9
[osm/MON.git] / osm_mon / plugins / OpenStack / Gnocchi / metrics.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
21 ##
22 """Carry out OpenStack metric requests via Gnocchi API."""
23
24 import datetime
25 import json
26 import logging
27 import time
28
29 import six
30 import yaml
31
32 from osm_mon.core.auth import AuthManager
33 from osm_mon.core.message_bus.producer import KafkaProducer
34 from osm_mon.plugins.OpenStack.common import Common
35 from osm_mon.plugins.OpenStack.response import OpenStack_Response
36
37 log = logging.getLogger(__name__)
38
39 METRIC_MAPPINGS = {
40 "average_memory_utilization": "memory.usage",
41 "disk_read_ops": "disk.read.requests",
42 "disk_write_ops": "disk.write.requests",
43 "disk_read_bytes": "disk.read.bytes",
44 "disk_write_bytes": "disk.write.bytes",
45 "packets_dropped": "interface.if_dropped",
46 "packets_received": "interface.if_packets",
47 "packets_sent": "interface.if_packets",
48 "cpu_utilization": "cpu_util",
49 }
50
51 PERIOD_MS = {
52 "HR": 3600000,
53 "DAY": 86400000,
54 "WEEK": 604800000,
55 "MONTH": 2629746000,
56 "YEAR": 31556952000
57 }
58
59
60 class Metrics(object):
61 """OpenStack metric requests performed via the Gnocchi API."""
62
63 def __init__(self):
64 """Initialize the metric actions."""
65
66 # Use the Response class to generate valid json response messages
67 self._response = OpenStack_Response()
68
69 # Initializer a producer to send responses back to SO
70 self._producer = KafkaProducer("metric_response")
71
72 self._auth_manager = AuthManager()
73
74 def metric_calls(self, message, vim_uuid):
75 """Consume info from the message bus to manage metric requests."""
76 log.info("OpenStack metric action required.")
77 try:
78 values = json.loads(message.value)
79 except ValueError:
80 values = yaml.safe_load(message.value)
81
82 if 'metric_name' in values and values['metric_name'] not in METRIC_MAPPINGS.keys():
83 raise ValueError('Metric ' + values['metric_name'] + ' is not supported.')
84
85 verify_ssl = self._auth_manager.is_verify_ssl(vim_uuid)
86
87 endpoint = Common.get_endpoint("metric", vim_uuid, verify_ssl=verify_ssl)
88
89 auth_token = Common.get_auth_token(vim_uuid, verify_ssl=verify_ssl)
90
91 if message.key == "create_metric_request":
92 # Configure metric
93 metric_details = values['metric_create_request']
94 metric_id, resource_id, status = self.configure_metric(
95 endpoint, auth_token, metric_details, verify_ssl)
96
97 # Generate and send a create metric response
98 try:
99 resp_message = self._response.generate_response(
100 'create_metric_response',
101 status=status,
102 cor_id=metric_details['correlation_id'],
103 metric_id=metric_id,
104 r_id=resource_id)
105 log.info("Response messages: %s", resp_message)
106 self._producer.publish_metrics_response(
107 'create_metric_response', resp_message)
108 except Exception as exc:
109 log.warning("Failed to create response: %s", exc)
110
111 elif message.key == "read_metric_data_request":
112 # Read all metric data related to a specified metric
113 timestamps, metric_data = self.read_metric_data(endpoint, auth_token, values, verify_ssl)
114
115 # Generate and send a response message
116 try:
117 metric_id = self.get_metric_id(endpoint,
118 auth_token,
119 METRIC_MAPPINGS[values['metric_name']],
120 values['resource_uuid'],
121 verify_ssl)
122 resp_message = self._response.generate_response(
123 'read_metric_data_response',
124 m_id=metric_id,
125 m_name=values['metric_name'],
126 r_id=values['resource_uuid'],
127 cor_id=values['correlation_id'],
128 times=timestamps,
129 metrics=metric_data)
130 log.info("Response message: %s", resp_message)
131 self._producer.read_metric_data_response(
132 'read_metric_data_response', resp_message)
133 except Exception as exc:
134 log.warning("Failed to create response: %s", exc)
135
136 elif message.key == "delete_metric_request":
137 # delete the specified metric in the request
138 metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']],
139 values['resource_uuid'], verify_ssl)
140 status = self.delete_metric(
141 endpoint, auth_token, metric_id, verify_ssl)
142
143 # Generate and send a response message
144 try:
145 resp_message = self._response.generate_response(
146 'delete_metric_response',
147 m_id=metric_id,
148 m_name=values['metric_name'],
149 status=status,
150 r_id=values['resource_uuid'],
151 cor_id=values['correlation_id'])
152 log.info("Response message: %s", resp_message)
153 self._producer.delete_metric_response(
154 'delete_metric_response', resp_message)
155 except Exception as exc:
156 log.warning("Failed to create response: %s", exc)
157
158 elif message.key == "update_metric_request":
159 # Gnocchi doesn't support configuration updates
160 # Log and send a response back to this effect
161 log.warning("Gnocchi doesn't support metric configuration updates.")
162 req_details = values['metric_create_request']
163 metric_name = req_details['metric_name']
164 resource_id = req_details['resource_uuid']
165 metric_id = self.get_metric_id(endpoint, auth_token, metric_name, resource_id, verify_ssl)
166
167 # Generate and send a response message
168 try:
169 resp_message = self._response.generate_response(
170 'update_metric_response',
171 status=False,
172 cor_id=req_details['correlation_id'],
173 r_id=resource_id,
174 m_id=metric_id)
175 log.info("Response message: %s", resp_message)
176 self._producer.update_metric_response(
177 'update_metric_response', resp_message)
178 except Exception as exc:
179 log.warning("Failed to create response: %s", exc)
180
181 elif message.key == "list_metric_request":
182 list_details = values['metrics_list_request']
183
184 metric_list = self.list_metrics(
185 endpoint, auth_token, list_details, verify_ssl)
186
187 # Generate and send a response message
188 try:
189 resp_message = self._response.generate_response(
190 'list_metric_response',
191 m_list=metric_list,
192 cor_id=list_details['correlation_id'])
193 log.info("Response message: %s", resp_message)
194 self._producer.list_metric_response(
195 'list_metric_response', resp_message)
196 except Exception as exc:
197 log.warning("Failed to create response: %s", exc)
198
199 else:
200 log.warning("Unknown key %s, no action will be performed.", message.key)
201
202 def configure_metric(self, endpoint, auth_token, values, verify_ssl):
203 """Create the new metric in Gnocchi."""
204 try:
205 resource_id = values['resource_uuid']
206 except KeyError:
207 log.warning("resource_uuid field is missing.")
208 return None, None, False
209
210 try:
211 metric_name = values['metric_name'].lower()
212 except KeyError:
213 log.warning("metric_name field is missing.")
214 return None, None, False
215
216 # Check for an existing metric for this resource
217 metric_id = self.get_metric_id(
218 endpoint, auth_token, metric_name, resource_id, verify_ssl)
219
220 if metric_id is None:
221 # Try appending metric to existing resource
222 try:
223 base_url = "{}/v1/resource/generic/%s/metric"
224 res_url = base_url.format(endpoint) % resource_id
225 payload = {metric_name: {'archive_policy_name': 'high',
226 'unit': values['metric_unit']}}
227 result = Common.perform_request(
228 res_url,
229 auth_token,
230 req_type="post",
231 verify_ssl=verify_ssl,
232 payload=json.dumps(payload, sort_keys=True))
233 # Get id of newly created metric
234 for row in json.loads(result.text):
235 if row['name'] == metric_name:
236 metric_id = row['id']
237 log.info("Appended metric to existing resource.")
238
239 return metric_id, resource_id, True
240 except Exception as exc:
241 # Gnocchi version of resource does not exist creating a new one
242 log.info("Failed to append metric to existing resource:%s",
243 exc)
244 try:
245 url = "{}/v1/resource/generic".format(endpoint)
246 metric = {'name': metric_name,
247 'archive_policy_name': 'high',
248 'unit': values['metric_unit'], }
249
250 resource_payload = json.dumps({'id': resource_id,
251 'metrics': {
252 metric_name: metric}}, sort_keys=True)
253
254 resource = Common.perform_request(
255 url,
256 auth_token,
257 req_type="post",
258 payload=resource_payload,
259 verify_ssl=verify_ssl)
260
261 # Return the newly created resource_id for creating alarms
262 new_resource_id = json.loads(resource.text)['id']
263 log.info("Created new resource for metric: %s",
264 new_resource_id)
265
266 metric_id = self.get_metric_id(
267 endpoint, auth_token, metric_name, new_resource_id, verify_ssl)
268
269 return metric_id, new_resource_id, True
270 except Exception as exc:
271 log.warning("Failed to create a new resource: %s", exc)
272 return None, None, False
273
274 else:
275 log.info("This metric already exists for this resource.")
276
277 return metric_id, resource_id, False
278
279 def delete_metric(self, endpoint, auth_token, metric_id, verify_ssl):
280 """Delete metric."""
281 url = "{}/v1/metric/%s".format(endpoint) % metric_id
282
283 try:
284 result = Common.perform_request(
285 url,
286 auth_token,
287 req_type="delete",
288 verify_ssl=verify_ssl)
289 if str(result.status_code) == "404":
290 log.warning("Failed to delete the metric.")
291 return False
292 else:
293 return True
294 except Exception as exc:
295 log.warning("Failed to delete metric: %s", exc)
296 return False
297
298 def list_metrics(self, endpoint, auth_token, values, verify_ssl):
299 """List all metrics."""
300
301 # Check for a specified list
302 metric_name = None
303 if 'metric_name' in values:
304 metric_name = values['metric_name'].lower()
305
306 resource = None
307 if 'resource_uuid' in values:
308 resource = values['resource_uuid']
309
310 try:
311 if resource:
312 url = "{}/v1/resource/generic/{}".format(endpoint, resource)
313 result = Common.perform_request(
314 url, auth_token, req_type="get", verify_ssl=verify_ssl)
315 resource_data = json.loads(result.text)
316 metrics = resource_data['metrics']
317
318 if metric_name:
319 if metrics.get(METRIC_MAPPINGS[metric_name]):
320 metric_id = metrics[METRIC_MAPPINGS[metric_name]]
321 url = "{}/v1/metric/{}".format(endpoint, metric_id)
322 result = Common.perform_request(
323 url, auth_token, req_type="get", verify_ssl=verify_ssl)
324 metric_list = json.loads(result.text)
325 log.info("Returning an %s resource list for %s metrics",
326 metric_name, resource)
327 return metric_list
328 else:
329 log.info("Metric {} not found for {} resource".format(metric_name, resource))
330 return None
331 else:
332 metric_list = []
333 for k, v in metrics.items():
334 url = "{}/v1/metric/{}".format(endpoint, v)
335 result = Common.perform_request(
336 url, auth_token, req_type="get", verify_ssl=verify_ssl)
337 metric = json.loads(result.text)
338 metric_list.append(metric)
339 if metric_list:
340 log.info("Return a list of %s resource metrics", resource)
341 return metric_list
342
343 else:
344 log.info("There are no metrics available")
345 return []
346 else:
347 url = "{}/v1/metric?sort=name:asc".format(endpoint)
348 result = Common.perform_request(
349 url, auth_token, req_type="get", verify_ssl=verify_ssl)
350 metrics = []
351 metrics_partial = json.loads(result.text)
352 for metric in metrics_partial:
353 metrics.append(metric)
354
355 while len(json.loads(result.text)) > 0:
356 last_metric_id = metrics_partial[-1]['id']
357 url = "{}/v1/metric?sort=name:asc&marker={}".format(endpoint, last_metric_id)
358 result = Common.perform_request(
359 url, auth_token, req_type="get", verify_ssl=verify_ssl)
360 if len(json.loads(result.text)) > 0:
361 metrics_partial = json.loads(result.text)
362 for metric in metrics_partial:
363 metrics.append(metric)
364
365 if metrics is not None:
366 # Format the list response
367 if metric_name is not None:
368 metric_list = self.response_list(
369 metrics, metric_name=metric_name)
370 log.info("Returning a list of %s metrics", metric_name)
371 else:
372 metric_list = self.response_list(metrics)
373 log.info("Returning a complete list of metrics")
374 return metric_list
375 else:
376 log.info("There are no metrics available")
377 return []
378 except Exception as exc:
379 log.exception("Failed to list metrics. %s", exc)
380 return None
381
382 def get_metric_id(self, endpoint, auth_token, metric_name, resource_id, verify_ssl):
383 """Check if the desired metric already exists for the resource."""
384 url = "{}/v1/resource/generic/%s".format(endpoint) % resource_id
385 try:
386 # Try return the metric id if it exists
387 result = Common.perform_request(
388 url,
389 auth_token,
390 req_type="get",
391 verify_ssl=verify_ssl)
392 return json.loads(result.text)['metrics'][metric_name]
393 except KeyError:
394 log.warning("Metric doesn't exist. No metric_id available")
395 return None
396
397 def read_metric_data(self, endpoint, auth_token, values, verify_ssl):
398 """Collect metric measures over a specified time period."""
399 timestamps = []
400 data = []
401 try:
402 # get metric_id
403 metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']],
404 values['resource_uuid'], verify_ssl)
405 # Try and collect measures
406 collection_unit = values['collection_unit'].upper()
407 collection_period = values['collection_period']
408
409 # Define the start and end time based on configurations
410 # FIXME: Local timezone may differ from timezone set in Gnocchi, causing discrepancies in measures
411 stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X")
412 end_time = int(round(time.time() * 1000))
413 if collection_unit == 'YEAR':
414 diff = PERIOD_MS[collection_unit]
415 else:
416 diff = collection_period * PERIOD_MS[collection_unit]
417 s_time = (end_time - diff) / 1000.0
418 start_time = datetime.datetime.fromtimestamp(s_time).strftime(
419 '%Y-%m-%dT%H:%M:%S.%f')
420 base_url = "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s"
421 url = base_url.format(endpoint) % {
422 "0": metric_id, "1": start_time, "2": stop_time}
423
424 # Perform metric data request
425 metric_data = Common.perform_request(
426 url,
427 auth_token,
428 req_type="get",
429 verify_ssl=verify_ssl)
430
431 # Generate a list of the requested timestamps and data
432 for r in json.loads(metric_data.text):
433 timestamp = r[0].replace("T", " ")
434 timestamps.append(timestamp)
435 data.append(r[2])
436
437 return timestamps, data
438 except Exception as exc:
439 log.warning("Failed to gather specified measures: %s", exc)
440 return timestamps, data
441
442 def response_list(self, metric_list, metric_name=None, resource=None):
443 """Create the appropriate lists for a list response."""
444 resp_list, name_list, res_list = [], [], []
445
446 # Create required lists
447 for row in metric_list:
448 # Only list OSM metrics
449 name = None
450 if row['name'] in METRIC_MAPPINGS.values():
451 for k, v in six.iteritems(METRIC_MAPPINGS):
452 if row['name'] == v:
453 name = k
454 metric = {"metric_name": name,
455 "metric_uuid": row['id'],
456 "metric_unit": row['unit'],
457 "resource_uuid": row['resource_id']}
458 resp_list.append(metric)
459 # Generate metric_name specific list
460 if metric_name is not None and name is not None:
461 if metric_name in METRIC_MAPPINGS.keys() and row['name'] == METRIC_MAPPINGS[metric_name]:
462 metric = {"metric_name": metric_name,
463 "metric_uuid": row['id'],
464 "metric_unit": row['unit'],
465 "resource_uuid": row['resource_id']}
466 name_list.append(metric)
467 # Generate resource specific list
468 if resource is not None and name is not None:
469 if row['resource_id'] == resource:
470 metric = {"metric_name": name,
471 "metric_uuid": row['id'],
472 "metric_unit": row['unit'],
473 "resource_uuid": row['resource_id']}
474 res_list.append(metric)
475
476 # Join required lists
477 if metric_name is not None and resource is not None:
478 # Return intersection of res_list and name_list
479 return [i for i in res_list for j in name_list if i['metric_uuid'] == j['metric_uuid']]
480 elif metric_name is not None:
481 return name_list
482 elif resource is not None:
483 return res_list
484 else:
485 return resp_list