Merge "Get Metric_UUID directly from VIM Change-Id: I9b8c47b2fd7987f8834dc7c90e826d9d...
[osm/MON.git] / osm_mon / plugins / OpenStack / Gnocchi / metrics.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
21 ##
22 """Carry out OpenStack metric requests via Gnocchi API."""
23
24 import datetime
25 import json
26 import logging
27
28 import time
29
30 import six
31 import yaml
32
33 from osm_mon.core.message_bus.producer import KafkaProducer
34 from osm_mon.plugins.OpenStack.common import Common
35
36 from osm_mon.plugins.OpenStack.response import OpenStack_Response
37 from osm_mon.plugins.OpenStack.settings import Config
38
# Module-wide logger used by the Gnocchi metrics plugin.
log = logging.getLogger(__name__)

# Translation table: OSM metric name -> Gnocchi metric name.
# NOTE(review): "packets_received" and "packets_sent" translate to the same
# Gnocchi metric, so translating back from Gnocchi to OSM is ambiguous for
# that entry -- confirm this is intended.
METRIC_MAPPINGS = dict(
    average_memory_utilization="memory.usage",
    disk_read_ops="disk.read.requests",
    disk_write_ops="disk.write.requests",
    disk_read_bytes="disk.read.bytes",
    disk_write_bytes="disk.write.bytes",
    packets_dropped="interface.if_dropped",
    packets_received="interface.if_packets",
    packets_sent="interface.if_packets",
    cpu_utilization="cpu_util",
)

# Length of each supported collection unit, in milliseconds.
# MONTH and YEAR are 2629746 s and 31556952 s respectively, i.e. 30.436875
# and 365.2425 days.
PERIOD_MS = dict(
    HR=60 * 60 * 1000,
    DAY=24 * 60 * 60 * 1000,
    WEEK=7 * 24 * 60 * 60 * 1000,
    MONTH=2629746 * 1000,
    YEAR=31556952 * 1000,
)
60
61
class Metrics(object):
    """OpenStack metric requests performed via the Gnocchi API.

    ``metric_calls`` is the entry point: it decodes a message taken from the
    message bus, dispatches on the message key (create / read / delete /
    update / list) and publishes a response through the Kafka producer.
    The remaining public methods implement the individual Gnocchi requests
    and can also be called directly.
    """

    def __init__(self):
        """Initialize the metric actions."""
        # Configure an instance of the OpenStack metric plugin
        config = Config.instance()
        config.read_environ()

        # Initialise authentication for API requests
        self._common = Common()

        # Use the Response class to generate valid json response messages
        self._response = OpenStack_Response()

        # Initialize a producer to send responses back to SO
        self._producer = KafkaProducer("metric_response")

    def metric_calls(self, message):
        """Consume info from the message bus to manage metric requests.

        :param message: bus message exposing ``key`` (the request type) and
            ``value`` (the request body, encoded as JSON or YAML).
        :returns: None. Results are published through the producer; failures
            while building or sending a response are logged, not raised.
        """
        # The payload is normally JSON; fall back to YAML when it is not.
        try:
            values = json.loads(message.value)
        except ValueError:
            values = yaml.safe_load(message.value)
        log.info("OpenStack metric action required.")

        auth_token = Common.get_auth_token(values['vim_uuid'])

        endpoint = Common.get_endpoint("metric", values['vim_uuid'])

        if message.key == "create_metric_request":
            # Configure metric
            metric_details = values['metric_create']
            metric_id, resource_id, status = self.configure_metric(
                endpoint, auth_token, metric_details)

            # Generate and send a create metric response
            try:
                resp_message = self._response.generate_response(
                    'create_metric_response', status=status,
                    cor_id=values['correlation_id'],
                    metric_id=metric_id, r_id=resource_id)
                log.info("Response messages: %s", resp_message)
                self._producer.create_metrics_resp(
                    'create_metric_response', resp_message,
                    'metric_response')
            except Exception as exc:
                log.warning("Failed to create response: %s", exc)

        elif message.key == "read_metric_data_request":
            # Read all metric data related to a specified metric
            timestamps, metric_data = self.read_metric_data(
                endpoint, auth_token, values)

            # Generate and send a response message
            try:
                # An unsupported metric name raises KeyError here, which is
                # reported through the handler below.
                metric_id = self.get_metric_id(
                    endpoint, auth_token,
                    METRIC_MAPPINGS[values['metric_name']],
                    values['resource_uuid'])
                resp_message = self._response.generate_response(
                    'read_metric_data_response',
                    m_id=metric_id,
                    m_name=values['metric_name'],
                    r_id=values['resource_uuid'],
                    cor_id=values['correlation_id'],
                    times=timestamps, metrics=metric_data)
                log.info("Response message: %s", resp_message)
                self._producer.read_metric_data_response(
                    'read_metric_data_response', resp_message,
                    'metric_response')
            except Exception as exc:
                log.warning("Failed to send read metric response:%s", exc)

        elif message.key == "delete_metric_request":
            # delete the specified metric in the request
            gnocchi_name = METRIC_MAPPINGS.get(values.get('metric_name'))
            if gnocchi_name is None:
                # Unknown or missing metric name: nothing can be looked up,
                # so report a failed delete instead of raising KeyError.
                log.warning("This metric is not supported by this plugin.")
                metric_id = None
            else:
                metric_id = self.get_metric_id(
                    endpoint, auth_token, gnocchi_name,
                    values.get('resource_uuid'))
            status = self.delete_metric(
                endpoint, auth_token, metric_id)

            # Generate and send a response message
            try:
                resp_message = self._response.generate_response(
                    'delete_metric_response', m_id=metric_id,
                    m_name=values['metric_name'],
                    status=status, r_id=values['resource_uuid'],
                    cor_id=values['correlation_id'])
                log.info("Response message: %s", resp_message)
                self._producer.delete_metric_response(
                    'delete_metric_response', resp_message,
                    'metric_response')
            except Exception as exc:
                log.warning("Failed to send delete response:%s", exc)

        elif message.key == "update_metric_request":
            # Gnocchi doesn't support configuration updates
            # Log and send a response back to this effect
            log.warning(
                "Gnocchi doesn't support metric configuration updates.")
            # NOTE(review): the update request is read from the
            # 'metric_create' key -- confirm against the request schema.
            req_details = values['metric_create']
            metric_name = req_details['metric_name']
            resource_id = req_details['resource_uuid']
            # Translate the OSM name to the Gnocchi name like the read and
            # delete branches do; a name without a mapping is passed through
            # unchanged, which keeps the previous lookup behaviour for it.
            metric_id = self.get_metric_id(
                endpoint, auth_token,
                METRIC_MAPPINGS.get(metric_name, metric_name), resource_id)

            # Generate and send a response message
            try:
                resp_message = self._response.generate_response(
                    'update_metric_response', status=False,
                    cor_id=values['correlation_id'],
                    r_id=resource_id, m_id=metric_id)
                log.info("Response message: %s", resp_message)
                self._producer.update_metric_response(
                    'update_metric_response', resp_message,
                    'metric_response')
            except Exception as exc:
                log.warning("Failed to send an update response:%s", exc)

        elif message.key == "list_metric_request":
            list_details = values['metrics_list_request']

            metric_list = self.list_metrics(
                endpoint, auth_token, list_details)

            # Generate and send a response message
            try:
                resp_message = self._response.generate_response(
                    'list_metric_response', m_list=metric_list,
                    cor_id=list_details['correlation_id'])
                log.info("Response message: %s", resp_message)
                self._producer.list_metric_response(
                    'list_metric_response', resp_message,
                    'metric_response')
            except Exception as exc:
                log.warning("Failed to send a list response:%s", exc)

        else:
            log.warning("Unknown key, no action will be performed.")

        return

    def configure_metric(self, endpoint, auth_token, values):
        """Create the new metric in Gnocchi.

        The metric is first appended to the existing generic resource; when
        that fails, a new generic resource carrying the metric is created.

        :param endpoint: base URL of the metric service.
        :param auth_token: token passed to ``Common.perform_request``.
        :param values: dict with ``resource_uuid``, ``metric_name`` and
            ``metric_unit``.
        :returns: tuple ``(metric_id, resource_id, status)``. ``status`` is
            True only when a metric was created by this call; it is False
            when the input is invalid, the metric already exists, or both
            creation attempts failed.
        """
        try:
            resource_id = values['resource_uuid']
        except KeyError:
            log.warning("Resource is not defined correctly.")
            return None, None, False

        # Check/Normalize metric name
        norm_name, metric_name = self.get_metric_name(values)
        if metric_name is None:
            log.warning("This metric is not supported by this plugin.")
            return None, resource_id, False

        # Check for an existing metric for this resource
        metric_id = self.get_metric_id(
            endpoint, auth_token, metric_name, resource_id)

        if metric_id is not None:
            log.info("This metric already exists for this resource.")
            return metric_id, resource_id, False

        # Try appending metric to existing resource
        try:
            res_url = "{}/v1/resource/generic/{}/metric".format(
                endpoint, resource_id)
            payload = {metric_name: {'archive_policy_name': 'high',
                                     'unit': values['metric_unit']}}
            result = Common.perform_request(
                res_url, auth_token, req_type="post",
                payload=json.dumps(payload))
            # Get id of newly created metric; metric_id stays None when the
            # response does not list a metric with the requested name.
            for row in json.loads(result.text):
                if row['name'] == metric_name:
                    metric_id = row['id']
            log.info("Appended metric to existing resource.")

            return metric_id, resource_id, True
        except Exception as exc:
            # Gnocchi version of resource does not exist creating a new one
            log.info("Failed to append metric to existing resource:%s",
                     exc)

        try:
            url = "{}/v1/resource/generic".format(endpoint)
            metric = {'name': metric_name,
                      'archive_policy_name': 'high',
                      'unit': values['metric_unit'], }

            resource_payload = json.dumps({'id': resource_id,
                                           'metrics': {
                                               metric_name: metric}})

            resource = Common.perform_request(
                url, auth_token, req_type="post",
                payload=resource_payload)

            # Return the newly created resource_id for creating alarms
            new_resource_id = json.loads(resource.text)['id']
            log.info("Created new resource for metric: %s",
                     new_resource_id)

            metric_id = self.get_metric_id(
                endpoint, auth_token, metric_name, new_resource_id)

            return metric_id, new_resource_id, True
        except Exception as exc:
            log.warning("Failed to create a new resource:%s", exc)
        return None, None, False

    def delete_metric(self, endpoint, auth_token, metric_id):
        """Delete metric.

        :param endpoint: base URL of the metric service.
        :param auth_token: token passed to ``Common.perform_request``.
        :param metric_id: id of the metric to delete; ``None`` is rejected
            without issuing a request.
        :returns: False when ``metric_id`` is None, the service answers 404
            or the request raises; True otherwise.
        """
        if metric_id is None:
            # There is no such metric to address; do not request ".../None".
            log.warning("Failed to delete the metric.")
            return False

        url = "{}/v1/metric/{}".format(endpoint, metric_id)

        try:
            result = Common.perform_request(
                url, auth_token, req_type="delete")
        except Exception as exc:
            log.warning("Failed to carry out delete metric request:%s", exc)
            return False

        if str(result.status_code) == "404":
            log.warning("Failed to delete the metric.")
            return False
        return True

    def list_metrics(self, endpoint, auth_token, values):
        """List all metrics.

        :param endpoint: base URL of the metric service.
        :param auth_token: token passed to ``Common.perform_request``.
        :param values: dict that may carry ``metric_name`` and/or
            ``resource_uuid`` to restrict the listing. An unsupported
            ``metric_name`` is logged and then ignored as a filter.
        :returns: list of metric dicts as built by ``response_list``, or
            None when the listing could not be retrieved.
        """
        # Check for a specified list
        try:
            # Check if the metric_name was specified for the list
            metric_name = values['metric_name'].lower()
            if metric_name not in METRIC_MAPPINGS:
                log.warning("This metric is not supported, won't be listed.")
                metric_name = None
        except KeyError as exc:
            log.info("Metric name is not specified: %s", exc)
            metric_name = None

        try:
            resource = values['resource_uuid']
        except KeyError as exc:
            log.info("Resource is not specified:%s", exc)
            resource = None

        try:
            # Page through the listing: each follow-up request passes the id
            # of the last metric received as the marker, until an empty page
            # comes back.
            metrics = []
            url = "{}/v1/metric?sort=name:asc".format(endpoint)
            while True:
                result = Common.perform_request(
                    url, auth_token, req_type="get")
                page = json.loads(result.text)
                if not page:
                    break
                metrics.extend(page)
                url = "{}/v1/metric?sort=name:asc&marker={}".format(
                    endpoint, page[-1]['id'])

            if not metrics:
                log.info("There are no metrics available")
                return []

            # Format the list response
            metric_list = self.response_list(
                metrics, metric_name=metric_name, resource=resource)
            if metric_name is not None and resource is not None:
                log.info("Returning a list of %s metrics for resource %s",
                         metric_name, resource)
            elif metric_name is not None:
                log.info("Returning a list of %s metrics", metric_name)
            elif resource is not None:
                log.info("Return a list of %s resource metrics", resource)
            else:
                log.info("Returning a complete list of metrics")

            return metric_list
        except Exception as exc:
            log.warning("Failed to generate any metric list. %s", exc)
            return None

    def get_metric_id(self, endpoint, auth_token, metric_name, resource_id):
        """Check if the desired metric already exists for the resource.

        :param endpoint: base URL of the metric service.
        :param auth_token: token passed to ``Common.perform_request``.
        :param metric_name: Gnocchi metric name, used as the key into the
            resource's ``metrics`` mapping.
        :param resource_id: id of the generic resource to inspect.
        :returns: the value stored under ``metrics[metric_name]`` for the
            resource, or None when the request or the lookup fails.
        """
        url = "{}/v1/resource/generic/{}".format(endpoint, resource_id)
        try:
            # Try return the metric id if it exists
            result = Common.perform_request(
                url, auth_token, req_type="get")
            return json.loads(result.text)['metrics'][metric_name]
        except Exception as exc:
            log.info("Metric doesn't exist. No metric_id available")
            # Keep the cause available for troubleshooting.
            log.debug("Metric id lookup failed: %s", exc)
            return None

    def get_metric_name(self, values):
        """Check metric name configuration and normalize.

        :param values: dict expected to carry ``metric_name``.
        :returns: tuple ``(normalized_name, gnocchi_name)``. The normalized
            name is the lower-cased ``metric_name`` (None when the key is
            missing); the Gnocchi name is None when there is no mapping.
        """
        metric_name = None
        try:
            # Normalize metric name
            metric_name = values['metric_name'].lower()
            return metric_name, METRIC_MAPPINGS[metric_name]
        except KeyError:
            log.info("Metric name %s is invalid.", metric_name)
            return metric_name, None

    def read_metric_data(self, endpoint, auth_token, values):
        """Collect metric measures over a specified time period.

        :param endpoint: base URL of the metric service.
        :param auth_token: token passed to ``Common.perform_request``.
        :param values: dict with ``metric_name``, ``resource_uuid``,
            ``collection_unit`` (a key of ``PERIOD_MS``, case-insensitive)
            and ``collection_period`` (number of units).
        :returns: tuple ``(timestamps, data)`` of equal-length lists; both
            are empty when the metric is unknown or the request fails.
        """
        timestamps = []
        data = []
        try:
            # get metric_id
            metric_id = self.get_metric_id(
                endpoint, auth_token,
                METRIC_MAPPINGS[values['metric_name']],
                values['resource_uuid'])
            if metric_id is None:
                # Without an id there is nothing to query.
                log.warning("Failed to gather specified measures: %s",
                            "metric not found for resource")
                return timestamps, data

            # Try and collect measures
            collection_unit = values['collection_unit'].upper()
            collection_period = values['collection_period']

            # Define the start and end time based on configurations
            # FIXME: Local timezone may differ from timezone set in Gnocchi,
            # causing discrepancies in measures
            # Read the clock once so start and stop describe the same
            # instant, and format with explicit fields: "%X" is locale
            # dependent and is not guaranteed to produce HH:MM:SS.
            now = time.time()
            end_time = int(round(now * 1000))
            if collection_unit == 'YEAR':
                # For YEAR the period multiplier is not applied.
                diff = PERIOD_MS[collection_unit]
            else:
                diff = collection_period * PERIOD_MS[collection_unit]
            s_time = (end_time - diff) / 1000.0
            start_time = datetime.datetime.fromtimestamp(s_time).strftime(
                '%Y-%m-%dT%H:%M:%S.%f')
            stop_time = datetime.datetime.fromtimestamp(now).strftime(
                '%Y-%m-%dT%H:%M:%S')
            url = "{}/v1/metric/{}/measures?start={}&stop={}".format(
                endpoint, metric_id, start_time, stop_time)

            # Perform metric data request
            metric_data = Common.perform_request(
                url, auth_token, req_type="get")

            measures = json.loads(metric_data.text)
            if not isinstance(measures, list):
                # A non-list body is not a set of measures; iterating it
                # would yield meaningless entries.
                log.warning("Failed to gather specified measures: %s",
                            measures)
                return timestamps, data

            # Generate a list of the requested timestamps and data;
            # element 0 of each measure is the timestamp, element 2 the value
            for r in measures:
                timestamp = r[0].replace("T", " ")
                timestamps.append(timestamp)
                data.append(r[2])

            return timestamps, data
        except Exception as exc:
            log.warning("Failed to gather specified measures: %s", exc)
            return timestamps, data

    def response_list(self, metric_list, metric_name=None, resource=None):
        """Create the appropriate lists for a list response.

        Only metrics whose Gnocchi name appears in ``METRIC_MAPPINGS`` are
        reported.

        :param metric_list: iterable of dicts with ``name``, ``id``,
            ``unit`` and ``resource_id`` keys.
        :param metric_name: optional OSM metric name; when given, only
            metrics mapped from that name are returned and they are
            reported under that name.
        :param resource: optional resource id; when given, only metrics of
            that resource are returned.
        :returns: list of dicts with ``metric_name``, ``metric_uuid``,
            ``metric_unit`` and ``resource_uuid`` keys, in input order.
        """
        wanted_name = None
        if metric_name is not None:
            wanted_name = METRIC_MAPPINGS.get(metric_name)

        resp_list = []
        for row in metric_list:
            # Only list OSM metrics: translate the Gnocchi name back to an
            # OSM name. Several OSM names may share one Gnocchi name; the
            # last one in the mapping is used.
            osm_name = None
            for key, gnocchi_name in METRIC_MAPPINGS.items():
                if row['name'] == gnocchi_name:
                    osm_name = key
            if osm_name is None:
                continue

            if metric_name is not None:
                # Metric-name filter: an unsupported name matches nothing.
                if wanted_name is None or row['name'] != wanted_name:
                    continue
                # Report the name that was asked for, which matters when
                # the reverse translation is ambiguous.
                osm_name = metric_name

            # Resource filter
            if resource is not None and row['resource_id'] != resource:
                continue

            resp_list.append({"metric_name": osm_name,
                              "metric_uuid": row['id'],
                              "metric_unit": row['unit'],
                              "resource_uuid": row['resource_id']})

        return resp_list