Adds yaml format in OpenStack plugin
[osm/MON.git] / osm_mon / plugins / OpenStack / Gnocchi / metrics.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
21 ##
22 """Carry out OpenStack metric requests via Gnocchi API."""
23
24 import datetime
25 import json
26 import logging
27
28 import time
29
30 import six
31 import yaml
32
33 from osm_mon.core.message_bus.producer import KafkaProducer
34 from osm_mon.plugins.OpenStack.common import Common
35
36 from osm_mon.plugins.OpenStack.response import OpenStack_Response
37 from osm_mon.plugins.OpenStack.settings import Config
38
# Module-level logger shared by every method of the plugin.
log = logging.getLogger(__name__)

# OSM metric name -> Gnocchi metric name.
# Note that "packets_received" and "packets_sent" both map to the same
# Gnocchi metric ("interface.if_packets"), so a reverse lookup from a Gnocchi
# name cannot tell the two apart.
METRIC_MAPPINGS = {
    "average_memory_utilization": "memory.usage",
    "disk_read_ops": "disk.read.requests",
    "disk_write_ops": "disk.write.requests",
    "disk_read_bytes": "disk.read.bytes",
    "disk_write_bytes": "disk.write.bytes",
    "packets_dropped": "interface.if_dropped",
    "packets_received": "interface.if_packets",
    "packets_sent": "interface.if_packets",
    "cpu_utilization": "cpu_util",
}

# Length of each supported collection unit, in milliseconds.
# MONTH and YEAR are averages: YEAR is 365.2425 days and MONTH is YEAR / 12.
PERIOD_MS = {
    "HR": 3600000,
    "DAY": 86400000,
    "WEEK": 604800000,
    "MONTH": 2629746000,
    "YEAR": 31556952000,
}
60
61
class Metrics(object):
    """OpenStack metric requests performed via the Gnocchi API."""

    def __init__(self):
        """Initialize the metric actions."""
        # Configure an instance of the OpenStack metric plugin
        config = Config.instance()
        config.read_environ()

        # Initialise authentication for API requests
        self._common = Common()

        # Use the Response class to generate valid json response messages
        self._response = OpenStack_Response()

        # Initialize a producer to send responses back to SO
        self._producer = KafkaProducer("metric_response")

    def metric_calls(self, message):
        """Consume info from the message bus to manage metric requests.

        ``message.value`` is parsed as JSON, falling back to YAML when it is
        not valid JSON. ``message.key`` selects the action; an unknown key is
        logged and ignored.
        """
        try:
            values = json.loads(message.value)
        except ValueError:
            values = yaml.safe_load(message.value)
        log.info("OpenStack metric action required.")

        # Both lookups happen before the key is inspected, so a payload
        # without 'vim_uuid' raises KeyError whatever the request type is.
        auth_token = Common.get_auth_token(values['vim_uuid'])
        endpoint = Common.get_endpoint("metric", values['vim_uuid'])

        handlers = {
            "create_metric_request": self._handle_create,
            "read_metric_data_request": self._handle_read,
            "delete_metric_request": self._handle_delete,
            "update_metric_request": self._handle_update,
            "list_metric_request": self._handle_list,
        }
        handler = handlers.get(message.key)
        if handler is None:
            log.warning("Unknown key, no action will be performed.")
            return

        handler(endpoint, auth_token, values)

    def _handle_create(self, endpoint, auth_token, values):
        """Create the requested metric and publish a create response."""
        metric_id, resource_id, status = self.configure_metric(
            endpoint, auth_token, values['metric_create'])

        # Response generation is best effort: failures are only logged.
        try:
            resp_message = self._response.generate_response(
                'create_metric_response', status=status,
                cor_id=values['correlation_id'],
                metric_id=metric_id, r_id=resource_id)
            log.info("Response messages: %s", resp_message)
            self._producer.create_metrics_resp(
                'create_metric_response', resp_message,
                'metric_response')
        except Exception as exc:
            log.warning("Failed to create response: %s", exc)

    def _handle_read(self, endpoint, auth_token, values):
        """Read the measures of a metric and publish a read response."""
        timestamps, metric_data = self.read_metric_data(
            endpoint, auth_token, values)

        try:
            resp_message = self._response.generate_response(
                'read_metric_data_response',
                m_id=values['metric_uuid'],
                m_name=values['metric_name'],
                r_id=values['resource_uuid'],
                cor_id=values['correlation_id'],
                times=timestamps, metrics=metric_data)
            log.info("Response message: %s", resp_message)
            self._producer.read_metric_data_response(
                'read_metric_data_response', resp_message,
                'metric_response')
        except Exception as exc:
            log.warning("Failed to send read metric response:%s", exc)

    def _handle_delete(self, endpoint, auth_token, values):
        """Delete the requested metric and publish a delete response."""
        metric_id = values['metric_uuid']
        status = self.delete_metric(endpoint, auth_token, metric_id)

        try:
            resp_message = self._response.generate_response(
                'delete_metric_response', m_id=metric_id,
                m_name=values['metric_name'],
                status=status, r_id=values['resource_uuid'],
                cor_id=values['correlation_id'])
            log.info("Response message: %s", resp_message)
            self._producer.delete_metric_response(
                'delete_metric_response', resp_message,
                'metric_response')
        except Exception as exc:
            log.warning("Failed to send delete response:%s", exc)

    def _handle_update(self, endpoint, auth_token, values):
        """Reject a metric update and publish a failed update response."""
        # Gnocchi doesn't support configuration updates
        # Log and send a response back to this effect
        log.warning(
            "Gnocchi doesn't support metric configuration updates.")
        req_details = values['metric_create']
        metric_name = req_details['metric_name']
        resource_id = req_details['resource_uuid']
        metric_id = self.get_metric_id(
            endpoint, auth_token, metric_name, resource_id)

        try:
            resp_message = self._response.generate_response(
                'update_metric_response', status=False,
                cor_id=values['correlation_id'],
                r_id=resource_id, m_id=metric_id)
            log.info("Response message: %s", resp_message)
            self._producer.update_metric_response(
                'update_metric_response', resp_message,
                'metric_response')
        except Exception as exc:
            log.warning("Failed to send an update response:%s", exc)

    def _handle_list(self, endpoint, auth_token, values):
        """List the matching metrics and publish a list response."""
        list_details = values['metrics_list_request']
        metric_list = self.list_metrics(endpoint, auth_token, list_details)

        try:
            # The correlation id of a list request is read from the nested
            # request details, not from the top-level payload.
            resp_message = self._response.generate_response(
                'list_metric_response', m_list=metric_list,
                cor_id=list_details['correlation_id'])
            log.info("Response message: %s", resp_message)
            self._producer.list_metric_response(
                'list_metric_response', resp_message,
                'metric_response')
        except Exception as exc:
            log.warning("Failed to send a list response:%s", exc)

    def configure_metric(self, endpoint, auth_token, values):
        """Create the new metric in Gnocchi.

        Returns a ``(metric_id, resource_id, status)`` tuple. ``status`` is
        True only when a metric was newly created; it is False when the
        request is invalid, the metric already exists, or creation failed.
        """
        try:
            resource_id = values['resource_uuid']
        except KeyError:
            log.warning("Resource is not defined correctly.")
            return None, None, False

        # Check/Normalize metric name
        _, metric_name = self.get_metric_name(values)
        if metric_name is None:
            log.warning("This metric is not supported by this plugin.")
            return None, resource_id, False

        # Check for an existing metric for this resource
        metric_id = self.get_metric_id(
            endpoint, auth_token, metric_name, resource_id)
        if metric_id is not None:
            log.info("This metric already exists for this resource.")
            return metric_id, resource_id, False

        # First attempt: append the metric to an existing resource.
        try:
            res_url = "{}/v1/resource/generic/{}/metric".format(
                endpoint, resource_id)
            payload = {metric_name: {'archive_policy_name': 'high',
                                     'unit': values['metric_unit']}}
            result = Common.perform_request(
                res_url, auth_token, req_type="post",
                payload=json.dumps(payload))
            # Get id of newly created metric
            for row in json.loads(result.text):
                if row['name'] == metric_name:
                    metric_id = row['id']
            log.info("Appended metric to existing resource.")
            return metric_id, resource_id, True
        except Exception as exc:
            # Any failure here (including an unexpected response body) is
            # taken to mean the resource is unknown to Gnocchi.
            log.info("Failed to append metric to existing resource:%s", exc)

        # Second attempt: create the resource together with the metric.
        try:
            url = "{}/v1/resource/generic".format(endpoint)
            metric = {'name': metric_name,
                      'archive_policy_name': 'high',
                      'unit': values['metric_unit']}
            resource_payload = json.dumps(
                {'id': resource_id, 'metrics': {metric_name: metric}})

            resource = Common.perform_request(
                url, auth_token, req_type="post",
                payload=resource_payload)

            # Return the newly created resource_id for creating alarms
            new_resource_id = json.loads(resource.text)['id']
            log.info("Created new resource for metric: %s", new_resource_id)

            metric_id = self.get_metric_id(
                endpoint, auth_token, metric_name, new_resource_id)
            return metric_id, new_resource_id, True
        except Exception as exc:
            log.warning("Failed to create a new resource:%s", exc)
            return None, None, False

    def delete_metric(self, endpoint, auth_token, metric_id):
        """Delete metric.

        Returns False when the request raises or answers 404, True otherwise.
        """
        url = "{}/v1/metric/{}".format(endpoint, metric_id)

        try:
            result = Common.perform_request(
                url, auth_token, req_type="delete")
        except Exception as exc:
            log.warning("Failed to carry out delete metric request:%s", exc)
            return False

        # NOTE(review): only a 404 is reported as a failure; every other
        # status code counts as success - confirm this is intended.
        if str(result.status_code) == "404":
            log.warning("Failed to delete the metric.")
            return False
        return True

    def list_metrics(self, endpoint, auth_token, values):
        """List all metrics.

        ``values`` may carry an optional 'metric_name' and/or 'resource_uuid'
        to filter on. Returns a list of metric descriptions (possibly
        empty), or None when the listing fails.
        """
        # Check if the metric_name was specified for the list
        try:
            metric_name = values['metric_name'].lower()
        except (KeyError, AttributeError) as exc:
            # Missing key, or a value that is not a string (e.g. None).
            log.info("Metric name is not specified: %s", exc)
            metric_name = None
        else:
            if metric_name not in METRIC_MAPPINGS:
                log.warning("This metric is not supported, won't be listed.")
                metric_name = None

        try:
            resource = values['resource_uuid']
        except KeyError as exc:
            log.info("Resource is not specified:%s", exc)
            resource = None

        try:
            # Page through the listing, using the id of the last metric of
            # each page as the marker, until an empty page is returned.
            url = "{}/v1/metric?sort=name:asc".format(endpoint)
            result = Common.perform_request(url, auth_token, req_type="get")
            page = json.loads(result.text)
            metrics = []
            while page:
                metrics.extend(page)
                url = "{}/v1/metric?sort=name:asc&marker={}".format(
                    endpoint, page[-1]['id'])
                result = Common.perform_request(
                    url, auth_token, req_type="get")
                page = json.loads(result.text)

            if not metrics:
                log.info("There are no metrics available")
                return []

            # Only pass the filters that were actually requested.
            filters = {}
            if metric_name is not None:
                filters['metric_name'] = metric_name
            if resource is not None:
                filters['resource'] = resource
            metric_list = self.response_list(metrics, **filters)

            if metric_name is not None and resource is not None:
                log.info("Returning a list of %s metrics for resource %s",
                         metric_name, resource)
            elif metric_name is not None:
                log.info("Returning a list of %s metrics", metric_name)
            elif resource is not None:
                log.info("Return a list of %s resource metrics", resource)
            else:
                log.info("Returning a complete list of metrics")

            return metric_list
        except Exception as exc:
            log.warning("Failed to generate any metric list. %s", exc)
            return None

    def get_metric_id(self, endpoint, auth_token, metric_name, resource_id):
        """Check if the desired metric already exists for the resource.

        Returns the metric id, or None when the lookup fails for any reason.
        """
        url = "{}/v1/resource/generic/{}".format(endpoint, resource_id)

        try:
            # Try return the metric id if it exists
            result = Common.perform_request(
                url, auth_token, req_type="get")
            return json.loads(result.text)['metrics'][metric_name]
        except Exception:
            log.info("Metric doesn't exist. No metric_id available")
            return None

    def get_metric_name(self, values):
        """Check metric name configuration and normalize.

        Returns ``(normalized_name, gnocchi_name)``. ``gnocchi_name`` is None
        when the name is missing, not a string, or not in METRIC_MAPPINGS.
        """
        metric_name = None
        try:
            # Normalize metric name
            metric_name = values['metric_name'].lower()
            return metric_name, METRIC_MAPPINGS[metric_name]
        except (KeyError, AttributeError):
            log.info("Metric name %s is invalid.", metric_name)
            return metric_name, None

    def read_metric_data(self, endpoint, auth_token, values):
        """Collect metric measures over a specified time period.

        Returns ``(timestamps, data)`` as two parallel lists. On failure the
        entries gathered so far (possibly none) are returned.
        """
        timestamps = []
        data = []
        try:
            # Try and collect measures
            metric_id = values['metric_uuid']
            collection_unit = values['collection_unit'].upper()
            collection_period = values['collection_period']

            # NOTE(review): for YEAR the collection_period multiplier is
            # ignored, so the window is always one year - confirm intended.
            if collection_unit == 'YEAR':
                diff = PERIOD_MS[collection_unit]
            else:
                diff = collection_period * PERIOD_MS[collection_unit]

            # Define the start and end time based on configurations.
            # Both bounds come from one clock reading so they cannot straddle
            # a date change, and the format is explicit, not locale-based.
            # FIXME: Local timezone may differ from timezone set in Gnocchi,
            # causing discrepancies in measures
            now = time.time()
            end_time = int(round(now * 1000))
            stop_time = datetime.datetime.fromtimestamp(now).strftime(
                '%Y-%m-%dT%H:%M:%S')
            s_time = (end_time - diff) / 1000.0
            start_time = datetime.datetime.fromtimestamp(s_time).strftime(
                '%Y-%m-%dT%H:%M:%S.%f')
            url = "{}/v1/metric/{}/measures?start={}&stop={}".format(
                endpoint, metric_id, start_time, stop_time)

            # Perform metric data request
            metric_data = Common.perform_request(
                url, auth_token, req_type="get")

            # Generate a list of the requested timestamps and data.
            # Each row is indexed as [0] = timestamp and [2] = value.
            for row in json.loads(metric_data.text):
                timestamp, value = row[0].replace("T", " "), row[2]
                timestamps.append(timestamp)
                data.append(value)

            return timestamps, data
        except Exception as exc:
            log.warning("Failed to gather specified measures: %s", exc)
            return timestamps, data

    def response_list(self, metric_list, metric_name=None, resource=None):
        """Create the appropriate lists for a list response.

        Only metrics whose Gnocchi name appears in METRIC_MAPPINGS are
        listed. ``metric_name`` (an OSM name) and ``resource`` (a resource
        id) each narrow the result when given; with both, a row must match
        both. When ``metric_name`` is given, the entries report that
        requested name.
        """
        # Gnocchi name -> OSM name. When several OSM names share a Gnocchi
        # name, the one iterated last wins.
        osm_names = {v: k for k, v in METRIC_MAPPINGS.items()}

        wanted = None
        if metric_name is not None:
            wanted = METRIC_MAPPINGS.get(metric_name)
            if wanted is None:
                # An unsupported name can never match any row.
                return []

        resp_list = []
        for row in metric_list:
            # Only list OSM metrics
            name = osm_names.get(row['name'])
            if name is None:
                continue
            if wanted is not None:
                if row['name'] != wanted:
                    continue
                name = metric_name
            if resource is not None and row['resource_id'] != resource:
                continue
            resp_list.append({"metric_name": name,
                              "metric_uuid": row['id'],
                              "metric_unit": row['unit'],
                              "resource_uuid": row['resource_id']})
        return resp_list