Adds support for vdu_name, ns_id and vnf_member_index
[osm/MON.git] / osm_mon / plugins / OpenStack / Gnocchi / metrics.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
21 ##
22 """Carry out OpenStack metric requests via Gnocchi API."""
23
24 import datetime
25 import json
26 import logging
27 import time
28
29 import six
30 import yaml
31
32 from osm_mon.core.message_bus.producer import KafkaProducer
33 from osm_mon.core.settings import Config
34 from osm_mon.plugins.OpenStack.common import Common
35 from osm_mon.plugins.OpenStack.response import OpenStack_Response
36
# Module-level logger shared by every method of the Metrics class.
log = logging.getLogger(__name__)
38
# Maps OSM metric names (keys) to the Gnocchi metric names (values) used in
# REST requests.  Note that "packets_received" and "packets_sent" share the
# same Gnocchi metric, so a reverse lookup by value is ambiguous for them.
METRIC_MAPPINGS = {
    "average_memory_utilization": "memory.usage",
    "disk_read_ops": "disk.read.requests",
    "disk_write_ops": "disk.write.requests",
    "disk_read_bytes": "disk.read.bytes",
    "disk_write_bytes": "disk.write.bytes",
    "packets_dropped": "interface.if_dropped",
    "packets_received": "interface.if_packets",
    "packets_sent": "interface.if_packets",
    "cpu_utilization": "cpu_util",
}
50
# Length of each supported collection unit, in milliseconds.
PERIOD_MS = {
    "HR": 3600000,          # 60 * 60 * 1000
    "DAY": 86400000,        # 24 hours
    "WEEK": 604800000,      # 7 days
    "MONTH": 2629746000,    # 30.436875 days (one twelfth of 365.2425 days)
    "YEAR": 31556952000,    # 365.2425 days
}
58
59
class Metrics(object):
    """OpenStack metric requests performed via the Gnocchi API.

    ``metric_calls`` is the entry point: it decodes a message-bus request,
    dispatches on ``message.key`` and publishes a response through the
    producer.  The other methods each wrap one kind of Gnocchi REST request
    or format its result.
    """

    def __init__(self):
        """Initialize the metric actions."""
        # Configure an instance of the OpenStack metric plugin
        config = Config.instance()
        config.read_environ()

        # Initialise authentication for API requests
        self._common = Common()

        # Use the Response class to generate valid json response messages
        self._response = OpenStack_Response()

        # Initialize a producer to send responses back to SO
        self._producer = KafkaProducer("metric_response")

    def metric_calls(self, message, vim_uuid):
        """Consume info from the message bus to manage metric requests.

        :param message: bus message; ``message.value`` is parsed as JSON and,
            if that fails, as YAML.  ``message.key`` selects the action.
        :param vim_uuid: identifier handed to ``Common`` to obtain the auth
            token and the metric endpoint.
        :raises ValueError: if the request carries a top-level
            ``metric_name`` that is not a key of ``METRIC_MAPPINGS``.

        Failures while building or sending a response are logged, not raised.
        """
        try:
            values = json.loads(message.value)
        except ValueError:
            values = yaml.safe_load(message.value)
        log.info("OpenStack metric action required.")

        auth_token = Common.get_auth_token(vim_uuid)
        endpoint = Common.get_endpoint("metric", vim_uuid)

        if 'metric_name' in values and values['metric_name'] not in METRIC_MAPPINGS:
            raise ValueError('Metric ' + values['metric_name'] + ' is not supported.')

        if message.key == "create_metric_request":
            # Configure metric
            metric_details = values['metric_create_request']
            metric_id, resource_id, status = self.configure_metric(
                endpoint, auth_token, metric_details)

            # Generate and send a create metric response
            try:
                resp_message = self._response.generate_response(
                    'create_metric_response', status=status,
                    cor_id=metric_details['correlation_id'],
                    metric_id=metric_id, r_id=resource_id)
                log.info("Response messages: %s", resp_message)
                self._producer.create_metrics_resp(
                    'create_metric_response', resp_message)
            except Exception as exc:
                log.warning("Failed to create response: %s", exc)

        elif message.key == "read_metric_data_request":
            # Read all metric data related to a specified metric
            timestamps, metric_data = self.read_metric_data(
                endpoint, auth_token, values)

            # Generate and send a response message
            try:
                metric_id = self.get_metric_id(
                    endpoint, auth_token,
                    METRIC_MAPPINGS[values['metric_name']],
                    values['resource_uuid'])
                resp_message = self._response.generate_response(
                    'read_metric_data_response',
                    m_id=metric_id,
                    m_name=values['metric_name'],
                    r_id=values['resource_uuid'],
                    cor_id=values['correlation_id'],
                    times=timestamps, metrics=metric_data)
                log.info("Response message: %s", resp_message)
                self._producer.read_metric_data_response(
                    'read_metric_data_response', resp_message)
            except Exception as exc:
                log.warning("Failed to send read metric response:%s", exc)

        elif message.key == "delete_metric_request":
            # delete the specified metric in the request
            metric_id = self.get_metric_id(
                endpoint, auth_token,
                METRIC_MAPPINGS[values['metric_name']],
                values['resource_uuid'])
            status = self.delete_metric(endpoint, auth_token, metric_id)

            # Generate and send a response message
            try:
                resp_message = self._response.generate_response(
                    'delete_metric_response', m_id=metric_id,
                    m_name=values['metric_name'],
                    status=status, r_id=values['resource_uuid'],
                    cor_id=values['correlation_id'])
                log.info("Response message: %s", resp_message)
                self._producer.delete_metric_response(
                    'delete_metric_response', resp_message)
            except Exception as exc:
                log.warning("Failed to send delete response:%s", exc)

        elif message.key == "update_metric_request":
            # Gnocchi doesn't support configuration updates
            # Log and send a response back to this effect
            log.warning("Gnocchi doesn't support metric configuration updates.")
            # NOTE(review): the update payload is read from the
            # 'metric_create_request' key - confirm against the sender.
            req_details = values['metric_create_request']
            metric_name = req_details['metric_name']
            resource_id = req_details['resource_uuid']
            # get_metric_id looks the metric up by its Gnocchi name, so map
            # the OSM name first (as the read/delete branches do).  Names
            # that are not OSM keys are passed through unchanged.
            metric_id = self.get_metric_id(
                endpoint, auth_token,
                METRIC_MAPPINGS.get(metric_name, metric_name), resource_id)

            # Generate and send a response message
            try:
                resp_message = self._response.generate_response(
                    'update_metric_response', status=False,
                    cor_id=req_details['correlation_id'],
                    r_id=resource_id, m_id=metric_id)
                log.info("Response message: %s", resp_message)
                self._producer.update_metric_response(
                    'update_metric_response', resp_message)
            except Exception:
                log.exception("Failed to send an update response:")

        elif message.key == "list_metric_request":
            list_details = values['metrics_list_request']

            metric_list = self.list_metrics(
                endpoint, auth_token, list_details)

            # Generate and send a response message
            try:
                resp_message = self._response.generate_response(
                    'list_metric_response', m_list=metric_list,
                    cor_id=list_details['correlation_id'])
                log.info("Response message: %s", resp_message)
                self._producer.list_metric_response(
                    'list_metric_response', resp_message)
            except Exception as exc:
                log.warning("Failed to send a list response:%s", exc)

        else:
            log.warning("Unknown key, no action will be performed.")

    def configure_metric(self, endpoint, auth_token, values):
        """Create the new metric in Gnocchi.

        The metric is first appended to the existing generic resource; if
        that fails for any reason a new generic resource carrying the metric
        is created instead.

        :param endpoint: base URL of the Gnocchi API.
        :param auth_token: token passed to ``Common.perform_request``.
        :param values: request dict; reads ``resource_uuid``,
            ``metric_name`` and ``metric_unit``.
        :returns: ``(metric_id, resource_id, status)``.  ``status`` is True
            only when a metric was created by this call; it is False when
            the request is invalid, the metric already exists, or creation
            failed.
        """
        try:
            resource_id = values['resource_uuid']
        except KeyError:
            log.warning("Resource is not defined correctly.")
            return None, None, False

        # Check/Normalize metric name
        _, metric_name = self.get_metric_name(values)
        if metric_name is None:
            log.warning("This metric is not supported by this plugin.")
            return None, resource_id, False

        # Check for an existing metric for this resource
        metric_id = self.get_metric_id(
            endpoint, auth_token, metric_name, resource_id)
        if metric_id is not None:
            log.info("This metric already exists for this resource.")
            return metric_id, resource_id, False

        # Try appending metric to existing resource
        try:
            res_url = "{}/v1/resource/generic/{}/metric".format(
                endpoint, resource_id)
            payload = {metric_name: {'archive_policy_name': 'high',
                                     'unit': values['metric_unit']}}
            result = Common.perform_request(
                res_url, auth_token, req_type="post",
                payload=json.dumps(payload, sort_keys=True))
            # Get id of newly created metric
            for row in json.loads(result.text):
                if row['name'] == metric_name:
                    metric_id = row['id']
            log.info("Appended metric to existing resource.")
            return metric_id, resource_id, True
        except Exception as exc:
            # Gnocchi version of resource does not exist creating a new one
            log.info("Failed to append metric to existing resource:%s", exc)

        try:
            url = "{}/v1/resource/generic".format(endpoint)
            metric = {'name': metric_name,
                      'archive_policy_name': 'high',
                      'unit': values['metric_unit']}
            resource_payload = json.dumps(
                {'id': resource_id, 'metrics': {metric_name: metric}},
                sort_keys=True)

            resource = Common.perform_request(
                url, auth_token, req_type="post",
                payload=resource_payload)

            # Return the newly created resource_id for creating alarms
            new_resource_id = json.loads(resource.text)['id']
            log.info("Created new resource for metric: %s", new_resource_id)

            metric_id = self.get_metric_id(
                endpoint, auth_token, metric_name, new_resource_id)

            return metric_id, new_resource_id, True
        except Exception as exc:
            log.warning("Failed to create a new resource:%s", exc)
            return None, None, False

    def delete_metric(self, endpoint, auth_token, metric_id):
        """Delete metric.

        :returns: False when ``metric_id`` is None, when the request raises,
            or when the response status is 404; True otherwise.
            NOTE(review): statuses other than 404 are reported as success -
            confirm this is intended.
        """
        if metric_id is None:
            # Nothing was resolved for this metric; do not issue a request
            # against ".../metric/None".
            log.warning("Failed to delete the metric.")
            return False

        url = "{}/v1/metric/{}".format(endpoint, metric_id)
        try:
            result = Common.perform_request(
                url, auth_token, req_type="delete")
        except Exception as exc:
            log.warning("Failed to carry out delete metric request:%s", exc)
            return False

        if str(result.status_code) == "404":
            log.warning("Failed to delete the metric.")
            return False
        return True

    def list_metrics(self, endpoint, auth_token, values):
        """List all metrics.

        Pages through ``/v1/metric`` sorted by name, using the id of the last
        metric of each page as the ``marker`` for the next one, until an
        empty page is returned.

        :param values: request dict; optional ``metric_name`` and
            ``resource_uuid`` entries narrow the result.  An unsupported
            ``metric_name`` is ignored (no name filter is applied).
        :returns: list of metric dicts as built by ``response_list``, or
            None if the listing failed.
        """
        # Check if the metric_name was specified for the list
        try:
            metric_name = values['metric_name'].lower()
            if metric_name not in METRIC_MAPPINGS:
                log.warning("This metric is not supported, won't be listed.")
                metric_name = None
        except KeyError as exc:
            log.info("Metric name is not specified: %s", exc)
            metric_name = None

        try:
            resource = values['resource_uuid']
        except KeyError as exc:
            log.info("Resource is not specified:%s", exc)
            resource = None

        try:
            metrics = []
            url = "{}/v1/metric?sort=name:asc".format(endpoint)
            while True:
                result = Common.perform_request(
                    url, auth_token, req_type="get")
                # Parse each page exactly once.
                page = json.loads(result.text)
                if len(page) == 0:
                    break
                metrics.extend(page)
                url = "{}/v1/metric?sort=name:asc&marker={}".format(
                    endpoint, page[-1]['id'])

            if not metrics:
                log.info("There are no metrics available")
                return []

            # Format the list response
            if metric_name is not None and resource is not None:
                metric_list = self.response_list(
                    metrics, metric_name=metric_name, resource=resource)
                log.info("Returning an %s resource list for %s metrics",
                         metric_name, resource)
            elif metric_name is not None:
                metric_list = self.response_list(
                    metrics, metric_name=metric_name)
                log.info("Returning a list of %s metrics", metric_name)
            elif resource is not None:
                metric_list = self.response_list(
                    metrics, resource=resource)
                log.info("Return a list of %s resource metrics", resource)
            else:
                metric_list = self.response_list(metrics)
                log.info("Returning a complete list of metrics")

            return metric_list
        except Exception as exc:
            log.warning("Failed to generate any metric list. %s", exc)
            return None

    def get_metric_id(self, endpoint, auth_token, metric_name, resource_id):
        """Check if the desired metric already exists for the resource.

        :param metric_name: name used as key in the resource's ``metrics``
            mapping (callers pass the Gnocchi name).
        :returns: the value stored under ``metrics[metric_name]`` of the
            generic resource, or None if the request or the lookup fails
            for any reason.
        """
        url = "{}/v1/resource/generic/{}".format(endpoint, resource_id)
        try:
            # Try return the metric id if it exists
            result = Common.perform_request(
                url, auth_token, req_type="get")
            return json.loads(result.text)['metrics'][metric_name]
        except Exception:
            log.info("Metric doesn't exist. No metric_id available")
            return None

    def get_metric_name(self, values):
        """Check metric name configuration and normalize.

        :param values: request dict; reads ``metric_name``.
        :returns: ``(normalized_name, gnocchi_name)`` where the first item is
            the lower-cased OSM name and the second its ``METRIC_MAPPINGS``
            value.  ``gnocchi_name`` is None when the name is missing, not a
            string, or unsupported; ``normalized_name`` is None when it
            could not be lower-cased.
        """
        metric_name = None
        try:
            # Normalize metric name
            metric_name = values['metric_name'].lower()
            return metric_name, METRIC_MAPPINGS[metric_name]
        except (KeyError, AttributeError):
            # AttributeError: the supplied name has no lower() (e.g. None).
            log.info("Metric name %s is invalid.", metric_name)
            return metric_name, None

    def read_metric_data(self, endpoint, auth_token, values):
        """Collect metric measures over a specified time period.

        :param values: request dict; reads ``metric_name``,
            ``resource_uuid``, ``collection_unit`` (a ``PERIOD_MS`` key,
            case-insensitive) and ``collection_period`` (multiplier).
        :returns: ``(timestamps, data)`` - two parallel lists built from
            elements 0 and 2 of each returned measure.  Both are empty (or
            partially filled) if anything fails; errors are logged, not
            raised.
        """
        timestamps = []
        data = []
        try:
            # get metric_id
            metric_id = self.get_metric_id(
                endpoint, auth_token,
                METRIC_MAPPINGS[values['metric_name']],
                values['resource_uuid'])
            if metric_id is None:
                # Without an id the request below would target
                # ".../metric/None/measures".
                log.warning("Failed to gather specified measures: %s",
                            "metric not found for resource")
                return timestamps, data

            # Try and collect measures
            collection_unit = values['collection_unit'].upper()
            collection_period = values['collection_period']

            # Define the start and end time based on configurations.
            # Both bounds come from one clock reading so they cannot
            # disagree, and the format is explicit rather than the
            # locale-dependent "%X".
            # FIXME: Local timezone may differ from timezone set in Gnocchi,
            # causing discrepancies in measures
            now = time.time()
            stop_time = datetime.datetime.fromtimestamp(now).strftime(
                '%Y-%m-%dT%H:%M:%S')
            end_time = int(round(now * 1000))
            if collection_unit == 'YEAR':
                # NOTE(review): collection_period is ignored for YEAR, so
                # the window is always one year - confirm intended.
                diff = PERIOD_MS[collection_unit]
            else:
                diff = collection_period * PERIOD_MS[collection_unit]
            s_time = (end_time - diff) / 1000.0
            start_time = datetime.datetime.fromtimestamp(s_time).strftime(
                '%Y-%m-%dT%H:%M:%S.%f')
            url = "{}/v1/metric/{}/measures?start={}&stop={}".format(
                endpoint, metric_id, start_time, stop_time)

            # Perform metric data request
            metric_data = Common.perform_request(
                url, auth_token, req_type="get")

            # Generate a list of the requested timestamps and data
            for measure in json.loads(metric_data.text):
                timestamps.append(measure[0].replace("T", " "))
                data.append(measure[2])

            return timestamps, data
        except Exception as exc:
            log.warning("Failed to gather specified measures: %s", exc)
            return timestamps, data

    def response_list(self, metric_list, metric_name=None, resource=None):
        """Create the appropriate lists for a list response.

        Only rows whose ``name`` is a ``METRIC_MAPPINGS`` value are listed.

        :param metric_list: iterable of dicts with ``name``, ``id``,
            ``unit`` and ``resource_id`` entries.
        :param metric_name: optional OSM metric name; when given, only rows
            of the mapped Gnocchi metric are returned and they are reported
            under this requested name.
        :param resource: optional resource id; when given, only rows of that
            resource are returned.
        :returns: list of dicts with ``metric_name``, ``metric_uuid``,
            ``metric_unit`` and ``resource_uuid`` keys, in input order.
        """
        # Reverse lookup built once.  Where several OSM names share one
        # Gnocchi metric, the last one in iteration order wins.
        osm_names = {v: k for k, v in METRIC_MAPPINGS.items()}
        wanted = None
        if metric_name is not None:
            wanted = METRIC_MAPPINGS.get(metric_name)

        selected = []
        for row in metric_list:
            osm_name = osm_names.get(row['name'])
            if osm_name is None:
                # Only list OSM metrics
                continue
            if metric_name is not None and row['name'] != wanted:
                continue
            if resource is not None and row['resource_id'] != resource:
                continue
            # Report the requested name when one was given; the reverse
            # lookup is ambiguous for metrics shared by several OSM names.
            reported = osm_name if metric_name is None else metric_name
            selected.append({"metric_name": reported,
                             "metric_uuid": row['id'],
                             "metric_unit": row['unit'],
                             "resource_uuid": row['resource_id']})
        return selected