Adds granularity support in OpenStack vim config
[osm/MON.git] / osm_mon / plugins / OpenStack / Gnocchi / metrics.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
21 ##
22 """Carry out OpenStack metric requests via Gnocchi API."""
23
24 import datetime
25 import json
26 import logging
27 import time
28
29 import six
30 import yaml
31
32 from osm_mon.core.message_bus.producer import KafkaProducer
33 from osm_mon.core.settings import Config
34 from osm_mon.plugins.OpenStack.common import Common
35 from osm_mon.plugins.OpenStack.response import OpenStack_Response
36
37 log = logging.getLogger(__name__)
38
39 METRIC_MAPPINGS = {
40 "average_memory_utilization": "memory.usage",
41 "disk_read_ops": "disk.read.requests",
42 "disk_write_ops": "disk.write.requests",
43 "disk_read_bytes": "disk.read.bytes",
44 "disk_write_bytes": "disk.write.bytes",
45 "packets_dropped": "interface.if_dropped",
46 "packets_received": "interface.if_packets",
47 "packets_sent": "interface.if_packets",
48 "cpu_utilization": "cpu_util",
49 }
50
51 PERIOD_MS = {
52 "HR": 3600000,
53 "DAY": 86400000,
54 "WEEK": 604800000,
55 "MONTH": 2629746000,
56 "YEAR": 31556952000
57 }
58
59
60 class Metrics(object):
61 """OpenStack metric requests performed via the Gnocchi API."""
62
63 def __init__(self):
64 """Initialize the metric actions."""
65 # Configure an instance of the OpenStack metric plugin
66 config = Config.instance()
67 config.read_environ()
68
69 # Initialise authentication for API requests
70 self._common = Common()
71
72 # Use the Response class to generate valid json response messages
73 self._response = OpenStack_Response()
74
75 # Initializer a producer to send responses back to SO
76 self._producer = KafkaProducer("metric_response")
77
78 def metric_calls(self, message):
79 """Consume info from the message bus to manage metric requests."""
80 try:
81 values = json.loads(message.value)
82 except ValueError:
83 values = yaml.safe_load(message.value)
84 log.info("OpenStack metric action required.")
85
86 auth_token = Common.get_auth_token(values['vim_uuid'])
87
88 endpoint = Common.get_endpoint("metric", values['vim_uuid'])
89
90 if message.key == "create_metric_request":
91 # Configure metric
92 metric_details = values['metric_create']
93 metric_id, resource_id, status = self.configure_metric(
94 endpoint, auth_token, metric_details)
95
96 # Generate and send a create metric response
97 try:
98 resp_message = self._response.generate_response(
99 'create_metric_response', status=status,
100 cor_id=values['correlation_id'],
101 metric_id=metric_id, r_id=resource_id)
102 log.info("Response messages: %s", resp_message)
103 self._producer.create_metrics_resp(
104 'create_metric_response', resp_message,
105 'metric_response')
106 except Exception as exc:
107 log.warn("Failed to create response: %s", exc)
108
109 elif message.key == "read_metric_data_request":
110 # Read all metric data related to a specified metric
111 timestamps, metric_data = self.read_metric_data(
112 endpoint, auth_token, values)
113
114 # Generate and send a response message
115 try:
116
117 metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']],
118 values['resource_uuid'])
119 resp_message = self._response.generate_response(
120 'read_metric_data_response',
121 m_id=metric_id,
122 m_name=values['metric_name'],
123 r_id=values['resource_uuid'],
124 cor_id=values['correlation_id'],
125 times=timestamps, metrics=metric_data)
126 log.info("Response message: %s", resp_message)
127 self._producer.read_metric_data_response(
128 'read_metric_data_response', resp_message,
129 'metric_response')
130 except Exception as exc:
131 log.warn("Failed to send read metric response:%s", exc)
132
133 elif message.key == "delete_metric_request":
134 # delete the specified metric in the request
135 metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']],
136 values['resource_uuid'])
137 status = self.delete_metric(
138 endpoint, auth_token, metric_id)
139
140 # Generate and send a response message
141 try:
142 resp_message = self._response.generate_response(
143 'delete_metric_response', m_id=metric_id,
144 m_name=values['metric_name'],
145 status=status, r_id=values['resource_uuid'],
146 cor_id=values['correlation_id'])
147 log.info("Response message: %s", resp_message)
148 self._producer.delete_metric_response(
149 'delete_metric_response', resp_message,
150 'metric_response')
151 except Exception as exc:
152 log.warn("Failed to send delete response:%s", exc)
153
154 elif message.key == "update_metric_request":
155 # Gnocchi doesn't support configuration updates
156 # Log and send a response back to this effect
157 log.warn("Gnocchi doesn't support metric configuration\
158 updates.")
159 req_details = values['metric_create']
160 metric_name = req_details['metric_name']
161 resource_id = req_details['resource_uuid']
162 metric_id = self.get_metric_id(
163 endpoint, auth_token, metric_name, resource_id)
164
165 # Generate and send a response message
166 try:
167 resp_message = self._response.generate_response(
168 'update_metric_response', status=False,
169 cor_id=values['correlation_id'],
170 r_id=resource_id, m_id=metric_id)
171 log.info("Response message: %s", resp_message)
172 self._producer.update_metric_response(
173 'update_metric_response', resp_message,
174 'metric_response')
175 except Exception as exc:
176 log.warn("Failed to send an update response:%s", exc)
177
178 elif message.key == "list_metric_request":
179 list_details = values['metrics_list_request']
180
181 metric_list = self.list_metrics(
182 endpoint, auth_token, list_details)
183
184 # Generate and send a response message
185 try:
186 resp_message = self._response.generate_response(
187 'list_metric_response', m_list=metric_list,
188 cor_id=list_details['correlation_id'])
189 log.info("Response message: %s", resp_message)
190 self._producer.list_metric_response(
191 'list_metric_response', resp_message,
192 'metric_response')
193 except Exception as exc:
194 log.warn("Failed to send a list response:%s", exc)
195
196 else:
197 log.warn("Unknown key, no action will be performed.")
198
199 return
200
201 def configure_metric(self, endpoint, auth_token, values):
202 """Create the new metric in Gnocchi."""
203 try:
204 resource_id = values['resource_uuid']
205 except KeyError:
206 log.warn("Resource is not defined correctly.")
207 return None, None, False
208
209 # Check/Normalize metric name
210 norm_name, metric_name = self.get_metric_name(values)
211 if metric_name is None:
212 log.warn("This metric is not supported by this plugin.")
213 return None, resource_id, False
214
215 # Check for an existing metric for this resource
216 metric_id = self.get_metric_id(
217 endpoint, auth_token, metric_name, resource_id)
218
219 if metric_id is None:
220 # Try appending metric to existing resource
221 try:
222 base_url = "{}/v1/resource/generic/%s/metric"
223 res_url = base_url.format(endpoint) % resource_id
224 payload = {metric_name: {'archive_policy_name': 'high',
225 'unit': values['metric_unit']}}
226 result = Common.perform_request(
227 res_url, auth_token, req_type="post",
228 payload=json.dumps(payload))
229 # Get id of newly created metric
230 for row in json.loads(result.text):
231 if row['name'] == metric_name:
232 metric_id = row['id']
233 log.info("Appended metric to existing resource.")
234
235 return metric_id, resource_id, True
236 except Exception as exc:
237 # Gnocchi version of resource does not exist creating a new one
238 log.info("Failed to append metric to existing resource:%s",
239 exc)
240 try:
241 url = "{}/v1/resource/generic".format(endpoint)
242 metric = {'name': metric_name,
243 'archive_policy_name': 'high',
244 'unit': values['metric_unit'], }
245
246 resource_payload = json.dumps({'id': resource_id,
247 'metrics': {
248 metric_name: metric}})
249
250 resource = Common.perform_request(
251 url, auth_token, req_type="post",
252 payload=resource_payload)
253
254 # Return the newly created resource_id for creating alarms
255 new_resource_id = json.loads(resource.text)['id']
256 log.info("Created new resource for metric: %s",
257 new_resource_id)
258
259 metric_id = self.get_metric_id(
260 endpoint, auth_token, metric_name, new_resource_id)
261
262 return metric_id, new_resource_id, True
263 except Exception as exc:
264 log.warn("Failed to create a new resource:%s", exc)
265 return None, None, False
266
267 else:
268 log.info("This metric already exists for this resource.")
269
270 return metric_id, resource_id, False
271
272 def delete_metric(self, endpoint, auth_token, metric_id):
273 """Delete metric."""
274 url = "{}/v1/metric/%s".format(endpoint) % metric_id
275
276 try:
277 result = Common.perform_request(
278 url, auth_token, req_type="delete")
279 if str(result.status_code) == "404":
280 log.warn("Failed to delete the metric.")
281 return False
282 else:
283 return True
284 except Exception as exc:
285 log.warn("Failed to carry out delete metric request:%s", exc)
286 return False
287
288 def list_metrics(self, endpoint, auth_token, values):
289 """List all metrics."""
290
291 # Check for a specified list
292 try:
293 # Check if the metric_name was specified for the list
294 metric_name = values['metric_name'].lower()
295 if metric_name not in METRIC_MAPPINGS.keys():
296 log.warn("This metric is not supported, won't be listed.")
297 metric_name = None
298 except KeyError as exc:
299 log.info("Metric name is not specified: %s", exc)
300 metric_name = None
301
302 try:
303 resource = values['resource_uuid']
304 except KeyError as exc:
305 log.info("Resource is not specified:%s", exc)
306 resource = None
307
308 try:
309 url = "{}/v1/metric?sort=name:asc".format(endpoint)
310 result = Common.perform_request(
311 url, auth_token, req_type="get")
312 metrics = []
313 metrics_partial = json.loads(result.text)
314 for metric in metrics_partial:
315 metrics.append(metric)
316
317 while len(json.loads(result.text)) > 0:
318 last_metric_id = metrics_partial[-1]['id']
319 url = "{}/v1/metric?sort=name:asc&marker={}".format(endpoint, last_metric_id)
320 result = Common.perform_request(
321 url, auth_token, req_type="get")
322 if len(json.loads(result.text)) > 0:
323 metrics_partial = json.loads(result.text)
324 for metric in metrics_partial:
325 metrics.append(metric)
326
327 if metrics is not None:
328 # Format the list response
329 if metric_name is not None and resource is not None:
330 metric_list = self.response_list(
331 metrics, metric_name=metric_name, resource=resource)
332 log.info("Returning an %s resource list for %s metrics",
333 metric_name, resource)
334 elif metric_name is not None:
335 metric_list = self.response_list(
336 metrics, metric_name=metric_name)
337 log.info("Returning a list of %s metrics", metric_name)
338 elif resource is not None:
339 metric_list = self.response_list(
340 metrics, resource=resource)
341 log.info("Return a list of %s resource metrics", resource)
342 else:
343 metric_list = self.response_list(metrics)
344 log.info("Returning a complete list of metrics")
345
346 return metric_list
347 else:
348 log.info("There are no metrics available")
349 return []
350 except Exception as exc:
351 log.warn("Failed to generate any metric list. %s", exc)
352 return None
353
354 def get_metric_id(self, endpoint, auth_token, metric_name, resource_id):
355 """Check if the desired metric already exists for the resource."""
356 url = "{}/v1/resource/generic/%s".format(endpoint) % resource_id
357 try:
358 # Try return the metric id if it exists
359 result = Common.perform_request(
360 url, auth_token, req_type="get")
361 return json.loads(result.text)['metrics'][metric_name]
362 except Exception:
363 log.info("Metric doesn't exist. No metric_id available")
364 return None
365
366 def get_metric_name(self, values):
367 """Check metric name configuration and normalize."""
368 metric_name = None
369 try:
370 # Normalize metric name
371 metric_name = values['metric_name'].lower()
372 return metric_name, METRIC_MAPPINGS[metric_name]
373 except KeyError:
374 log.info("Metric name %s is invalid.", metric_name)
375 return metric_name, None
376
377 def read_metric_data(self, endpoint, auth_token, values):
378 """Collect metric measures over a specified time period."""
379 timestamps = []
380 data = []
381 try:
382 #get metric_id
383 metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']], values['resource_uuid'])
384 # Try and collect measures
385 collection_unit = values['collection_unit'].upper()
386 collection_period = values['collection_period']
387
388 # Define the start and end time based on configurations
389 # FIXME: Local timezone may differ from timezone set in Gnocchi, causing discrepancies in measures
390 stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X")
391 end_time = int(round(time.time() * 1000))
392 if collection_unit == 'YEAR':
393 diff = PERIOD_MS[collection_unit]
394 else:
395 diff = collection_period * PERIOD_MS[collection_unit]
396 s_time = (end_time - diff) / 1000.0
397 start_time = datetime.datetime.fromtimestamp(s_time).strftime(
398 '%Y-%m-%dT%H:%M:%S.%f')
399 base_url = "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s"
400 url = base_url.format(endpoint) % {
401 "0": metric_id, "1": start_time, "2": stop_time}
402
403 # Perform metric data request
404 metric_data = Common.perform_request(
405 url, auth_token, req_type="get")
406
407 # Generate a list of the requested timestamps and data
408 for r in json.loads(metric_data.text):
409 timestamp = r[0].replace("T", " ")
410 timestamps.append(timestamp)
411 data.append(r[2])
412
413 return timestamps, data
414 except Exception as exc:
415 log.warn("Failed to gather specified measures: %s", exc)
416 return timestamps, data
417
418 def response_list(self, metric_list, metric_name=None, resource=None):
419 """Create the appropriate lists for a list response."""
420 resp_list, name_list, res_list = [], [], []
421
422 # Create required lists
423 for row in metric_list:
424 # Only list OSM metrics
425 name = None
426 if row['name'] in METRIC_MAPPINGS.values():
427 for k,v in six.iteritems(METRIC_MAPPINGS):
428 if row['name'] == v:
429 name = k
430 metric = {"metric_name": name,
431 "metric_uuid": row['id'],
432 "metric_unit": row['unit'],
433 "resource_uuid": row['resource_id']}
434 resp_list.append(metric)
435 # Generate metric_name specific list
436 if metric_name is not None and name is not None:
437 if metric_name in METRIC_MAPPINGS.keys() and row['name'] == METRIC_MAPPINGS[metric_name]:
438 metric = {"metric_name": metric_name,
439 "metric_uuid": row['id'],
440 "metric_unit": row['unit'],
441 "resource_uuid": row['resource_id']}
442 name_list.append(metric)
443 # Generate resource specific list
444 if resource is not None and name is not None:
445 if row['resource_id'] == resource:
446 metric = {"metric_name": name,
447 "metric_uuid": row['id'],
448 "metric_unit": row['unit'],
449 "resource_uuid": row['resource_id']}
450 res_list.append(metric)
451
452 # Join required lists
453 if metric_name is not None and resource is not None:
454 # Return intersection of res_list and name_list
455 return [i for i in res_list for j in name_list if i['metric_uuid'] == j['metric_uuid']]
456 elif metric_name is not None:
457 return name_list
458 elif resource is not None:
459 return res_list
460 else:
461 return resp_list