Refactors code in OpenStack plugin
[osm/MON.git] / osm_mon / plugins / OpenStack / Gnocchi / metrics.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
21 ##
22 """Carry out OpenStack metric requests via Gnocchi API."""
23
24 import datetime
25 import json
26 import logging
27 import time
28 from json import JSONDecodeError
29
30 import six
31 import yaml
32
33 from osm_mon.core.auth import AuthManager
34 from osm_mon.core.message_bus.producer import KafkaProducer
35 from osm_mon.plugins.OpenStack.common import Common
36 from osm_mon.plugins.OpenStack.response import OpenStack_Response
37
# Module-level logger for this plugin.
log = logging.getLogger(__name__)

# Maps OSM metric names (as received on the message bus) to the
# measurement names used by the Gnocchi/Ceilometer backend.
METRIC_MAPPINGS = {
    "average_memory_utilization": "memory.usage",
    "disk_read_ops": "disk.read.requests",
    "disk_write_ops": "disk.write.requests",
    "disk_read_bytes": "disk.read.bytes",
    "disk_write_bytes": "disk.write.bytes",
    "packets_dropped": "interface.if_dropped",
    # NOTE: both packet counters map to the same backend metric, so a
    # reverse lookup of "interface.if_packets" is ambiguous.
    "packets_received": "interface.if_packets",
    "packets_sent": "interface.if_packets",
    "cpu_utilization": "cpu_util",
}

# Collection-period units expressed in milliseconds.
# MONTH is the average Gregorian month (30.436875 days) and YEAR the
# average Gregorian year (365.2425 days).
PERIOD_MS = {
    "HR": 3600000,
    "DAY": 86400000,
    "WEEK": 604800000,
    "MONTH": 2629746000,
    "YEAR": 31556952000
}
59
60
class Metrics(object):
    """OpenStack metric requests performed via the Gnocchi API."""

    def __init__(self):
        """Initialize the metric actions."""
        # Use the Response class to generate valid json response messages
        self._response = OpenStack_Response()

        # Initialize a producer to send responses back to SO
        self._producer = KafkaProducer("metric_response")

        # Resolves per-VIM credentials and SSL verification settings
        self._auth_manager = AuthManager()

    def metric_calls(self, message, vim_uuid):
        """Consume info from the message bus to manage metric requests.

        Dispatches on ``message.key`` to the create/read/delete/update/list
        handler. Each handler publishes a response message even when the
        operation failed (via the ``finally`` clause) and re-raises the
        original exception so the caller also sees the failure.

        :param message: bus message; ``value`` is a JSON or YAML payload
        :param vim_uuid: identifier of the VIM the request targets
        :raises ValueError: if the payload names an unsupported metric
        """
        log.info("OpenStack metric action required.")
        try:
            values = json.loads(message.value)
        except JSONDecodeError:
            # Payloads may also arrive as YAML; fall back to a safe load.
            values = yaml.safe_load(message.value)

        # Early validation; only applies to requests that carry metric_name
        # at the top level of the payload (read/delete). Create and update
        # requests nest it one level deeper and are not checked here.
        if 'metric_name' in values and values['metric_name'] not in METRIC_MAPPINGS.keys():
            raise ValueError('Metric ' + values['metric_name'] + ' is not supported.')

        verify_ssl = self._auth_manager.is_verify_ssl(vim_uuid)

        endpoint = Common.get_endpoint("metric", vim_uuid, verify_ssl=verify_ssl)

        auth_token = Common.get_auth_token(vim_uuid, verify_ssl=verify_ssl)

        if message.key == "create_metric_request":
            metric_details = values['metric_create_request']
            status = False
            metric_id = None
            resource_id = None
            try:
                # Configure metric
                metric_id, resource_id = self.configure_metric(endpoint, auth_token, metric_details, verify_ssl)
                log.info("Metric successfully created")
                status = True
            except Exception as e:
                log.exception("Error creating metric")
                raise e
            finally:
                self._generate_and_send_response('create_metric_response',
                                                 metric_details['correlation_id'],
                                                 status=status,
                                                 metric_id=metric_id,
                                                 resource_id=resource_id)

        elif message.key == "read_metric_data_request":
            metric_id = None
            timestamps = []
            metric_data = []
            status = False
            try:
                # Resolve the Gnocchi metric uuid for the mapped metric name
                metric_id = self.get_metric_id(endpoint,
                                               auth_token,
                                               METRIC_MAPPINGS[values['metric_name']],
                                               values['resource_uuid'],
                                               verify_ssl)
                # Read all metric data related to a specified metric
                timestamps, metric_data = self.read_metric_data(endpoint, auth_token, values, verify_ssl)
                log.info("Metric data collected successfully")
                status = True
            except Exception as e:
                log.exception("Error reading metric data")
                raise e
            finally:
                self._generate_and_send_response('read_metric_data_response',
                                                 values['correlation_id'],
                                                 status=status,
                                                 metric_id=metric_id,
                                                 metric_name=values['metric_name'],
                                                 resource_id=values['resource_uuid'],
                                                 times=timestamps,
                                                 metrics=metric_data)

        elif message.key == "delete_metric_request":
            metric_id = None
            status = False
            try:
                # Delete the specified metric in the request
                metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']],
                                               values['resource_uuid'], verify_ssl)
                self.delete_metric(
                    endpoint, auth_token, metric_id, verify_ssl)
                log.info("Metric deleted successfully")
                status = True

            except Exception as e:
                log.exception("Error deleting metric")
                raise e
            finally:
                self._generate_and_send_response('delete_metric_response',
                                                 values['correlation_id'],
                                                 metric_id=metric_id,
                                                 metric_name=values['metric_name'],
                                                 status=status,
                                                 resource_id=values['resource_uuid'])

        elif message.key == "update_metric_request":
            # Gnocchi doesn't support configuration updates
            # Log and send a response back to this effect
            log.warning("Gnocchi doesn't support metric configuration updates.")
            req_details = values['metric_update_request']
            metric_name = req_details['metric_name']
            resource_id = req_details['resource_uuid']
            # NOTE(review): unlike the read/delete branches, metric_name is
            # NOT mapped through METRIC_MAPPINGS here, so get_metric_id is
            # queried with the raw OSM name — confirm this is intentional.
            metric_id = self.get_metric_id(endpoint, auth_token, metric_name, resource_id, verify_ssl)
            self._generate_and_send_response('update_metric_response',
                                             req_details['correlation_id'],
                                             status=False,
                                             resource_id=resource_id,
                                             metric_id=metric_id)

        elif message.key == "list_metric_request":
            list_details = values['metrics_list_request']
            metric_list = []
            status = False
            try:
                metric_list = self.list_metrics(
                    endpoint, auth_token, list_details, verify_ssl)
                log.info("Metrics listed successfully")
                status = True
            except Exception as e:
                log.exception("Error listing metrics")
                raise e
            finally:
                self._generate_and_send_response('list_metric_response',
                                                 list_details['correlation_id'],
                                                 status=status,
                                                 metric_list=metric_list)

        else:
            log.warning("Unknown key %s, no action will be performed.", message.key)

    def configure_metric(self, endpoint, auth_token, values, verify_ssl):
        """Create the new metric in Gnocchi.

        First tries to append the metric to an existing generic resource;
        if that fails, creates a new generic resource carrying the metric.

        :return: tuple (metric_id, resource_id)
        :raises ValueError: if a required field is missing or the metric
            already exists for the resource
        """
        required_fields = ['resource_uuid', 'metric_name']
        for field in required_fields:
            if field not in values:
                raise ValueError("Missing field: " + field)

        resource_id = values['resource_uuid']
        metric_name = values['metric_name'].lower()

        # Check for an existing metric for this resource
        metric_id = self.get_metric_id(
            endpoint, auth_token, metric_name, resource_id, verify_ssl)

        if metric_id is None:
            # Try appending metric to existing resource
            try:
                base_url = "{}/v1/resource/generic/%s/metric"
                res_url = base_url.format(endpoint) % resource_id
                # NOTE(review): 'metric_unit' is also required here but is
                # not in required_fields; a missing unit raises KeyError.
                payload = {metric_name: {'archive_policy_name': 'high',
                                         'unit': values['metric_unit']}}
                result = Common.perform_request(
                    res_url,
                    auth_token,
                    req_type="post",
                    verify_ssl=verify_ssl,
                    payload=json.dumps(payload, sort_keys=True))
                # Get id of newly created metric
                for row in json.loads(result.text):
                    if row['name'] == metric_name:
                        metric_id = row['id']
                log.info("Appended metric to existing resource.")

                return metric_id, resource_id
            except Exception as exc:
                # Gnocchi version of resource does not exist creating a new one
                log.info("Failed to append metric to existing resource:%s",
                         exc)
                url = "{}/v1/resource/generic".format(endpoint)
                metric = {'name': metric_name,
                          'archive_policy_name': 'high',
                          'unit': values['metric_unit'], }

                resource_payload = json.dumps({'id': resource_id,
                                               'metrics': {
                                                   metric_name: metric}}, sort_keys=True)

                resource = Common.perform_request(
                    url,
                    auth_token,
                    req_type="post",
                    payload=resource_payload,
                    verify_ssl=verify_ssl)

                # Return the newly created resource_id for creating alarms
                new_resource_id = json.loads(resource.text)['id']
                log.info("Created new resource for metric: %s",
                         new_resource_id)

                metric_id = self.get_metric_id(
                    endpoint, auth_token, metric_name, new_resource_id, verify_ssl)

                return metric_id, new_resource_id

        else:
            raise ValueError("Metric already exists for this resource")

    def delete_metric(self, endpoint, auth_token, metric_id, verify_ssl):
        """Delete the metric identified by metric_id from Gnocchi.

        :raises ValueError: if the API does not answer with a 2xx code
        """
        url = "{}/v1/metric/%s".format(endpoint) % metric_id

        result = Common.perform_request(
            url,
            auth_token,
            req_type="delete",
            verify_ssl=verify_ssl)
        if not str(result.status_code).startswith("2"):
            log.warning("Failed to delete the metric.")
            # Fixed: the request targets the Gnocchi metric endpoint, not Aodh
            raise ValueError("Error deleting metric. Gnocchi API responded with code " + str(result.status_code))

    def list_metrics(self, endpoint, auth_token, values, verify_ssl):
        """List all metrics, optionally filtered by resource and/or name.

        With a resource_uuid the resource's own metric map is queried;
        otherwise every metric is fetched with marker-based pagination.
        """

        # Check for a specified list
        metric_name = None
        if 'metric_name' in values:
            metric_name = values['metric_name'].lower()

        resource = None
        if 'resource_uuid' in values:
            resource = values['resource_uuid']

        if resource:
            url = "{}/v1/resource/generic/{}".format(endpoint, resource)
            result = Common.perform_request(
                url, auth_token, req_type="get", verify_ssl=verify_ssl)
            resource_data = json.loads(result.text)
            metrics = resource_data['metrics']

            if metric_name:
                if metrics.get(METRIC_MAPPINGS[metric_name]):
                    metric_id = metrics[METRIC_MAPPINGS[metric_name]]
                    url = "{}/v1/metric/{}".format(endpoint, metric_id)
                    result = Common.perform_request(
                        url, auth_token, req_type="get", verify_ssl=verify_ssl)
                    metric_list = json.loads(result.text)
                    log.info("Returning an %s resource list for %s metrics",
                             metric_name, resource)
                    return metric_list
                else:
                    log.info("Metric {} not found for {} resource".format(metric_name, resource))
                    return []
            else:
                # No name filter: fetch every metric attached to the resource
                metric_list = []
                for k, v in metrics.items():
                    url = "{}/v1/metric/{}".format(endpoint, v)
                    result = Common.perform_request(
                        url, auth_token, req_type="get", verify_ssl=verify_ssl)
                    metric = json.loads(result.text)
                    metric_list.append(metric)
                if metric_list:
                    log.info("Return a list of %s resource metrics", resource)
                    return metric_list

                else:
                    log.info("There are no metrics available")
                    return []
        else:
            # Page through all metrics using Gnocchi marker-based pagination:
            # keep requesting with the last seen id as marker until a page
            # comes back empty. Each page is parsed exactly once.
            url = "{}/v1/metric?sort=name:asc".format(endpoint)
            result = Common.perform_request(
                url, auth_token, req_type="get", verify_ssl=verify_ssl)
            metrics = []
            metrics_partial = json.loads(result.text)
            metrics.extend(metrics_partial)

            while metrics_partial:
                last_metric_id = metrics_partial[-1]['id']
                url = "{}/v1/metric?sort=name:asc&marker={}".format(endpoint, last_metric_id)
                result = Common.perform_request(
                    url, auth_token, req_type="get", verify_ssl=verify_ssl)
                metrics_partial = json.loads(result.text)
                metrics.extend(metrics_partial)

            # Format the list response (metrics is always a list here, so the
            # old "no metrics available" branch was unreachable and is gone)
            if metric_name is not None:
                metric_list = self.response_list(
                    metrics, metric_name=metric_name)
                log.info("Returning a list of %s metrics", metric_name)
            else:
                metric_list = self.response_list(metrics)
                log.info("Returning a complete list of metrics")
            return metric_list

    def get_metric_id(self, endpoint, auth_token, metric_name, resource_id, verify_ssl):
        """Return the id of the named metric on the resource, if it exists.

        :raises KeyError: if the resource has no metric with that name
        """
        url = "{}/v1/resource/generic/%s".format(endpoint) % resource_id
        try:
            # Try return the metric id if it exists
            result = Common.perform_request(
                url,
                auth_token,
                req_type="get",
                verify_ssl=verify_ssl)
            return json.loads(result.text)['metrics'][metric_name]
        except KeyError as e:
            log.error("Metric doesn't exist. No metric_id available")
            raise e

    def read_metric_data(self, endpoint, auth_token, values, verify_ssl):
        """Collect metric measures over a specified time period.

        :return: tuple (timestamps, data) of equal length
        """
        timestamps = []
        data = []
        # get metric_id
        metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']],
                                       values['resource_uuid'], verify_ssl)
        # Try and collect measures
        collection_unit = values['collection_unit'].upper()
        collection_period = values['collection_period']

        # Define the start and end time based on configurations
        # FIXME: Local timezone may differ from timezone set in Gnocchi, causing discrepancies in measures
        stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X")
        end_time = int(round(time.time() * 1000))
        # The window is capped at a single year regardless of collection_period
        if collection_unit == 'YEAR':
            diff = PERIOD_MS[collection_unit]
        else:
            diff = collection_period * PERIOD_MS[collection_unit]
        s_time = (end_time - diff) / 1000.0
        start_time = datetime.datetime.fromtimestamp(s_time).strftime(
            '%Y-%m-%dT%H:%M:%S.%f')
        base_url = "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s"
        url = base_url.format(endpoint) % {
            "0": metric_id, "1": start_time, "2": stop_time}

        # Perform metric data request
        metric_data = Common.perform_request(
            url,
            auth_token,
            req_type="get",
            verify_ssl=verify_ssl)

        # Generate a list of the requested timestamps and data
        # Each measure row is [timestamp, granularity, value]
        for r in json.loads(metric_data.text):
            timestamp = r[0].replace("T", " ")
            timestamps.append(timestamp)
            data.append(r[2])

        return timestamps, data

    def response_list(self, metric_list, metric_name=None, resource=None):
        """Create the appropriate lists for a list response.

        Filters raw Gnocchi metric rows down to OSM-known metrics, then
        narrows by metric_name and/or resource when given. When both
        filters are given the intersection (by metric uuid) is returned.
        """
        resp_list, name_list, res_list = [], [], []

        # Create required lists
        for row in metric_list:
            # Only list OSM metrics
            name = None
            if row['name'] in METRIC_MAPPINGS.values():
                # Reverse-map the backend name to the OSM name
                # (six.iteritems dropped: the file is Python-3-only)
                for k, v in METRIC_MAPPINGS.items():
                    if row['name'] == v:
                        name = k
                metric = {"metric_name": name,
                          "metric_uuid": row['id'],
                          "metric_unit": row['unit'],
                          "resource_uuid": row['resource_id']}
                resp_list.append(metric)
            # Generate metric_name specific list
            if metric_name is not None and name is not None:
                if metric_name in METRIC_MAPPINGS.keys() and row['name'] == METRIC_MAPPINGS[metric_name]:
                    metric = {"metric_name": metric_name,
                              "metric_uuid": row['id'],
                              "metric_unit": row['unit'],
                              "resource_uuid": row['resource_id']}
                    name_list.append(metric)
            # Generate resource specific list
            if resource is not None and name is not None:
                if row['resource_id'] == resource:
                    metric = {"metric_name": name,
                              "metric_uuid": row['id'],
                              "metric_unit": row['unit'],
                              "resource_uuid": row['resource_id']}
                    res_list.append(metric)

        # Join required lists
        if metric_name is not None and resource is not None:
            # Return intersection of res_list and name_list
            return [i for i in res_list for j in name_list if i['metric_uuid'] == j['metric_uuid']]
        elif metric_name is not None:
            return name_list
        elif resource is not None:
            return res_list
        else:
            return resp_list

    def _generate_and_send_response(self, key, correlation_id, **kwargs):
        """Build a response message for `key` and publish it on the bus.

        :raises Exception: re-raises any failure after logging it
        """
        try:
            resp_message = self._response.generate_response(
                key, cor_id=correlation_id, **kwargs)
            log.info("Response Message: %s", resp_message)
            self._producer.publish_metrics_response(
                key, resp_message)
        except Exception as e:
            log.exception("Response creation failed:")
            raise e