X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fplugins%2FOpenStack%2FGnocchi%2Fmetrics.py;h=7bfbc47dc79fcc54868e2a91cdbd04a1fc1dd618;hb=e80db311a29dc8562dc84ae3336af167bac2ec5b;hp=825671ea348c54463d11e018a4134660b2c744d9;hpb=3729638a9d26e07d551cf4ad7ac92c553e64cc6d;p=osm%2FMON.git diff --git a/osm_mon/plugins/OpenStack/Gnocchi/metrics.py b/osm_mon/plugins/OpenStack/Gnocchi/metrics.py index 825671e..7bfbc47 100644 --- a/osm_mon/plugins/OpenStack/Gnocchi/metrics.py +++ b/osm_mon/plugins/OpenStack/Gnocchi/metrics.py @@ -25,6 +25,7 @@ import datetime import json import logging import time +from json import JSONDecodeError import six import yaml @@ -76,9 +77,11 @@ class Metrics(object): log.info("OpenStack metric action required.") try: values = json.loads(message.value) - except ValueError: + except JSONDecodeError: values = yaml.safe_load(message.value) + log.info("OpenStack metric action required.") + if 'metric_name' in values and values['metric_name'] not in METRIC_MAPPINGS.keys(): raise ValueError('Metric ' + values['metric_name'] + ' is not supported.') @@ -89,129 +92,120 @@ class Metrics(object): auth_token = Common.get_auth_token(vim_uuid, verify_ssl=verify_ssl) if message.key == "create_metric_request": - # Configure metric metric_details = values['metric_create_request'] - metric_id, resource_id, status = self.configure_metric( - endpoint, auth_token, metric_details, verify_ssl) - - # Generate and send a create metric response + status = False + metric_id = None + resource_id = None try: - resp_message = self._response.generate_response( - 'create_metric_response', - status=status, - cor_id=metric_details['correlation_id'], - metric_id=metric_id, - r_id=resource_id) - log.info("Response messages: %s", resp_message) - self._producer.publish_metrics_response( - 'create_metric_response', resp_message) - except Exception as exc: - log.warning("Failed to create response: %s", exc) + # Configure metric + metric_id, resource_id = self.configure_metric(endpoint, auth_token, metric_details, verify_ssl) + log.info("Metric successfully created") + status = True + except Exception as e: + log.exception("Error creating metric") + raise e + finally: + self._generate_and_send_response('create_metric_response', + metric_details['correlation_id'], + status=status, + metric_id=metric_id, + resource_id=resource_id) elif message.key == "read_metric_data_request": - # Read all metric data related to a specified metric - timestamps, metric_data = self.read_metric_data(endpoint, auth_token, values, verify_ssl) - - # Generate and send a response message + metric_id = None + timestamps = [] + metric_data = [] + status = False try: metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']], values['resource_uuid'], verify_ssl) - resp_message = self._response.generate_response( - 'read_metric_data_response', - m_id=metric_id, - m_name=values['metric_name'], - r_id=values['resource_uuid'], - cor_id=values['correlation_id'], - times=timestamps, - metrics=metric_data) - log.info("Response message: %s", resp_message) - self._producer.read_metric_data_response( - 'read_metric_data_response', resp_message) - except Exception as exc: - log.warning("Failed to create response: %s", exc) + # Read all metric data related to a specified metric + timestamps, metric_data = self.read_metric_data(endpoint, auth_token, values, verify_ssl) + log.info("Metric data collected successfully") + status = True + except Exception as e: + log.exception("Error reading metric data") + raise e + finally: + self._generate_and_send_response('read_metric_data_response', + values['correlation_id'], + status=status, + metric_id=metric_id, + metric_name=values['metric_name'], + resource_id=values['resource_uuid'], + times=timestamps, + metrics=metric_data) elif message.key == "delete_metric_request": - # delete the specified metric in the request - metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']], - values['resource_uuid'], verify_ssl) - status = self.delete_metric( - endpoint, auth_token, metric_id, verify_ssl) - - # Generate and send a response message + metric_id = None + status = False try: - resp_message = self._response.generate_response( - 'delete_metric_response', - m_id=metric_id, - m_name=values['metric_name'], - status=status, - r_id=values['resource_uuid'], - cor_id=values['correlation_id']) - log.info("Response message: %s", resp_message) - self._producer.delete_metric_response( - 'delete_metric_response', resp_message) - except Exception as exc: - log.warning("Failed to create response: %s", exc) + # delete the specified metric in the request + metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']], + values['resource_uuid'], verify_ssl) + self.delete_metric( + endpoint, auth_token, metric_id, verify_ssl) + log.info("Metric deleted successfully") + status = True + + except Exception as e: + log.exception("Error deleting metric") + raise e + finally: + self._generate_and_send_response('delete_metric_response', + values['correlation_id'], + metric_id=metric_id, + metric_name=values['metric_name'], + status=status, + resource_id=values['resource_uuid']) elif message.key == "update_metric_request": # Gnocchi doesn't support configuration updates # Log and send a response back to this effect log.warning("Gnocchi doesn't support metric configuration updates.") - req_details = values['metric_create_request'] + req_details = values['metric_update_request'] metric_name = req_details['metric_name'] resource_id = req_details['resource_uuid'] metric_id = self.get_metric_id(endpoint, auth_token, metric_name, resource_id, verify_ssl) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'update_metric_response', - status=False, - cor_id=req_details['correlation_id'], - r_id=resource_id, - m_id=metric_id) - log.info("Response message: %s", resp_message) - self._producer.update_metric_response( - 'update_metric_response', resp_message) - except Exception as exc: - log.warning("Failed to create response: %s", exc) + self._generate_and_send_response('update_metric_response', + req_details['correlation_id'], + status=False, + resource_id=resource_id, + metric_id=metric_id) elif message.key == "list_metric_request": list_details = values['metrics_list_request'] - - metric_list = self.list_metrics( - endpoint, auth_token, list_details, verify_ssl) - - # Generate and send a response message + metric_list = [] + status = False try: - resp_message = self._response.generate_response( - 'list_metric_response', - m_list=metric_list, - cor_id=list_details['correlation_id']) - log.info("Response message: %s", resp_message) - self._producer.list_metric_response( - 'list_metric_response', resp_message) - except Exception as exc: - log.warning("Failed to create response: %s", exc) + metric_list = self.list_metrics( + endpoint, auth_token, list_details, verify_ssl) + log.info("Metrics listed successfully") + status = True + except Exception as e: + log.exception("Error listing metrics") + raise e + finally: + self._generate_and_send_response('list_metric_response', + list_details['correlation_id'], + status=status, + metric_list=metric_list) else: log.warning("Unknown key %s, no action will be performed.", message.key) def configure_metric(self, endpoint, auth_token, values, verify_ssl): """Create the new metric in Gnocchi.""" - try: - resource_id = values['resource_uuid'] - except KeyError: - log.warning("resource_uuid field is missing.") - return None, None, False + required_fields = ['resource_uuid', 'metric_name'] + for field in required_fields: + if field not in values: + raise ValueError("Missing field: " + field) - try: - metric_name = values['metric_name'].lower() - except KeyError: - log.warning("metric_name field is missing.") - return None, None, False + resource_id = values['resource_uuid'] + metric_name = values['metric_name'].lower() # Check for an existing metric for this resource metric_id = self.get_metric_id( @@ -236,64 +230,52 @@ class Metrics(object): metric_id = row['id'] log.info("Appended metric to existing resource.") - return metric_id, resource_id, True + return metric_id, resource_id except Exception as exc: # Gnocchi version of resource does not exist creating a new one log.info("Failed to append metric to existing resource:%s", exc) - try: - url = "{}/v1/resource/generic".format(endpoint) - metric = {'name': metric_name, - 'archive_policy_name': 'high', - 'unit': values['metric_unit'], } - - resource_payload = json.dumps({'id': resource_id, - 'metrics': { - metric_name: metric}}, sort_keys=True) - - resource = Common.perform_request( - url, - auth_token, - req_type="post", - payload=resource_payload, - verify_ssl=verify_ssl) - - # Return the newly created resource_id for creating alarms - new_resource_id = json.loads(resource.text)['id'] - log.info("Created new resource for metric: %s", - new_resource_id) - - metric_id = self.get_metric_id( - endpoint, auth_token, metric_name, new_resource_id, verify_ssl) - - return metric_id, new_resource_id, True - except Exception as exc: - log.warning("Failed to create a new resource: %s", exc) - return None, None, False + url = "{}/v1/resource/generic".format(endpoint) + metric = {'name': metric_name, + 'archive_policy_name': 'high', + 'unit': values['metric_unit'], } - else: - log.info("This metric already exists for this resource.") + resource_payload = json.dumps({'id': resource_id, + 'metrics': { + metric_name: metric}}, sort_keys=True) + + resource = Common.perform_request( + url, + auth_token, + req_type="post", + payload=resource_payload, + verify_ssl=verify_ssl) + + # Return the newly created resource_id for creating alarms + new_resource_id = json.loads(resource.text)['id'] + log.info("Created new resource for metric: %s", + new_resource_id) - return metric_id, resource_id, False + metric_id = self.get_metric_id( + endpoint, auth_token, metric_name, new_resource_id, verify_ssl) + + return metric_id, new_resource_id + + else: + raise ValueError("Metric already exists for this resource") def delete_metric(self, endpoint, auth_token, metric_id, verify_ssl): """Delete metric.""" url = "{}/v1/metric/%s".format(endpoint) % metric_id - try: - result = Common.perform_request( - url, - auth_token, - req_type="delete", - verify_ssl=verify_ssl) - if str(result.status_code) == "404": - log.warning("Failed to delete the metric.") - return False - else: - return True - except Exception as exc: - log.warning("Failed to delete metric: %s", exc) - return False + result = Common.perform_request( + url, + auth_token, + req_type="delete", + verify_ssl=verify_ssl) + if not str(result.status_code).startswith("2"): + log.warning("Failed to delete the metric.") + raise ValueError("Error deleting metric. Aodh API responded with code " + str(result.status_code)) def list_metrics(self, endpoint, auth_token, values, verify_ssl): """List all metrics.""" @@ -307,77 +289,73 @@ class Metrics(object): if 'resource_uuid' in values: resource = values['resource_uuid'] - try: - if resource: - url = "{}/v1/resource/generic/{}".format(endpoint, resource) - result = Common.perform_request( - url, auth_token, req_type="get", verify_ssl=verify_ssl) - resource_data = json.loads(result.text) - metrics = resource_data['metrics'] - - if metric_name: - if metrics.get(METRIC_MAPPINGS[metric_name]): - metric_id = metrics[METRIC_MAPPINGS[metric_name]] - url = "{}/v1/metric/{}".format(endpoint, metric_id) - result = Common.perform_request( - url, auth_token, req_type="get", verify_ssl=verify_ssl) - metric_list = json.loads(result.text) - log.info("Returning an %s resource list for %s metrics", - metric_name, resource) - return metric_list - else: - log.info("Metric {} not found for {} resource".format(metric_name, resource)) - return None + if resource: + url = "{}/v1/resource/generic/{}".format(endpoint, resource) + result = Common.perform_request( + url, auth_token, req_type="get", verify_ssl=verify_ssl) + resource_data = json.loads(result.text) + metrics = resource_data['metrics'] + + if metric_name: + if metrics.get(METRIC_MAPPINGS[metric_name]): + metric_id = metrics[METRIC_MAPPINGS[metric_name]] + url = "{}/v1/metric/{}".format(endpoint, metric_id) + result = Common.perform_request( + url, auth_token, req_type="get", verify_ssl=verify_ssl) + metric_list = json.loads(result.text) + log.info("Returning an %s resource list for %s metrics", + metric_name, resource) + return metric_list else: - metric_list = [] - for k, v in metrics.items(): - url = "{}/v1/metric/{}".format(endpoint, v) - result = Common.perform_request( - url, auth_token, req_type="get", verify_ssl=verify_ssl) - metric = json.loads(result.text) - metric_list.append(metric) - if metric_list: - log.info("Return a list of %s resource metrics", resource) - return metric_list - - else: - log.info("There are no metrics available") - return [] + log.info("Metric {} not found for {} resource".format(metric_name, resource)) + return [] else: - url = "{}/v1/metric?sort=name:asc".format(endpoint) - result = Common.perform_request( - url, auth_token, req_type="get", verify_ssl=verify_ssl) - metrics = [] - metrics_partial = json.loads(result.text) - for metric in metrics_partial: - metrics.append(metric) - - while len(json.loads(result.text)) > 0: - last_metric_id = metrics_partial[-1]['id'] - url = "{}/v1/metric?sort=name:asc&marker={}".format(endpoint, last_metric_id) + metric_list = [] + for k, v in metrics.items(): + url = "{}/v1/metric/{}".format(endpoint, v) result = Common.perform_request( url, auth_token, req_type="get", verify_ssl=verify_ssl) - if len(json.loads(result.text)) > 0: - metrics_partial = json.loads(result.text) - for metric in metrics_partial: - metrics.append(metric) - - if metrics is not None: - # Format the list response - if metric_name is not None: - metric_list = self.response_list( - metrics, metric_name=metric_name) - log.info("Returning a list of %s metrics", metric_name) - else: - metric_list = self.response_list(metrics) - log.info("Returning a complete list of metrics") + metric = json.loads(result.text) + metric_list.append(metric) + if metric_list: + log.info("Return a list of %s resource metrics", resource) return metric_list + else: log.info("There are no metrics available") return [] - except Exception as exc: - log.exception("Failed to list metrics. %s", exc) - return None + else: + url = "{}/v1/metric?sort=name:asc".format(endpoint) + result = Common.perform_request( + url, auth_token, req_type="get", verify_ssl=verify_ssl) + metrics = [] + metrics_partial = json.loads(result.text) + for metric in metrics_partial: + metrics.append(metric) + + while len(json.loads(result.text)) > 0: + last_metric_id = metrics_partial[-1]['id'] + url = "{}/v1/metric?sort=name:asc&marker={}".format(endpoint, last_metric_id) + result = Common.perform_request( + url, auth_token, req_type="get", verify_ssl=verify_ssl) + if len(json.loads(result.text)) > 0: + metrics_partial = json.loads(result.text) + for metric in metrics_partial: + metrics.append(metric) + + if metrics is not None: + # Format the list response + if metric_name is not None: + metric_list = self.response_list( + metrics, metric_name=metric_name) + log.info("Returning a list of %s metrics", metric_name) + else: + metric_list = self.response_list(metrics) + log.info("Returning a complete list of metrics") + return metric_list + else: + log.info("There are no metrics available") + return [] def get_metric_id(self, endpoint, auth_token, metric_name, resource_id, verify_ssl): """Check if the desired metric already exists for the resource.""" @@ -390,53 +368,49 @@ class Metrics(object): req_type="get", verify_ssl=verify_ssl) return json.loads(result.text)['metrics'][metric_name] - except KeyError: - log.warning("Metric doesn't exist. No metric_id available") - return None + except KeyError as e: + log.error("Metric doesn't exist. No metric_id available") + raise e def read_metric_data(self, endpoint, auth_token, values, verify_ssl): """Collect metric measures over a specified time period.""" timestamps = [] data = [] - try: - # get metric_id - metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']], - values['resource_uuid'], verify_ssl) - # Try and collect measures - collection_unit = values['collection_unit'].upper() - collection_period = values['collection_period'] - - # Define the start and end time based on configurations - # FIXME: Local timezone may differ from timezone set in Gnocchi, causing discrepancies in measures - stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X") - end_time = int(round(time.time() * 1000)) - if collection_unit == 'YEAR': - diff = PERIOD_MS[collection_unit] - else: - diff = collection_period * PERIOD_MS[collection_unit] - s_time = (end_time - diff) / 1000.0 - start_time = datetime.datetime.fromtimestamp(s_time).strftime( - '%Y-%m-%dT%H:%M:%S.%f') - base_url = "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s" - url = base_url.format(endpoint) % { - "0": metric_id, "1": start_time, "2": stop_time} - - # Perform metric data request - metric_data = Common.perform_request( - url, - auth_token, - req_type="get", - verify_ssl=verify_ssl) - - # Generate a list of the requested timestamps and data - for r in json.loads(metric_data.text): - timestamp = r[0].replace("T", " ") - timestamps.append(timestamp) - data.append(r[2]) + # get metric_id + metric_id = self.get_metric_id(endpoint, auth_token, METRIC_MAPPINGS[values['metric_name']], + values['resource_uuid'], verify_ssl) + # Try and collect measures + collection_unit = values['collection_unit'].upper() + collection_period = values['collection_period'] + + # Define the start and end time based on configurations + # FIXME: Local timezone may differ from timezone set in Gnocchi, causing discrepancies in measures + stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X") + end_time = int(round(time.time() * 1000)) + if collection_unit == 'YEAR': + diff = PERIOD_MS[collection_unit] + else: + diff = collection_period * PERIOD_MS[collection_unit] + s_time = (end_time - diff) / 1000.0 + start_time = datetime.datetime.fromtimestamp(s_time).strftime( + '%Y-%m-%dT%H:%M:%S.%f') + base_url = "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s" + url = base_url.format(endpoint) % { + "0": metric_id, "1": start_time, "2": stop_time} + + # Perform metric data request + metric_data = Common.perform_request( + url, + auth_token, + req_type="get", + verify_ssl=verify_ssl) + + # Generate a list of the requested timestamps and data + for r in json.loads(metric_data.text): + timestamp = r[0].replace("T", " ") + timestamps.append(timestamp) + data.append(r[2]) - return timestamps, data - except Exception as exc: - log.warning("Failed to gather specified measures: %s", exc) return timestamps, data def response_list(self, metric_list, metric_name=None, resource=None): @@ -483,3 +457,14 @@ class Metrics(object): return res_list else: return resp_list + + def _generate_and_send_response(self, key, correlation_id, **kwargs): + try: + resp_message = self._response.generate_response( + key, cor_id=correlation_id, **kwargs) + log.info("Response Message: %s", resp_message) + self._producer.publish_metrics_response( + key, resp_message) + except Exception as e: + log.exception("Response creation failed:") + raise e