X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=plugins%2FOpenStack%2FGnocchi%2Fmetrics.py;h=94641e69f21dcf88a0469a319e0a83bf9bc6d969;hb=7d5e61f406eab8d151125001a09f742dbadc3d70;hp=2004c38dee99f40aa39db1732d3fe77480265ffb;hpb=d00ff82f2b06b5ae2a353e0f6a9b30d813ffa21f;p=osm%2FMON.git diff --git a/plugins/OpenStack/Gnocchi/metrics.py b/plugins/OpenStack/Gnocchi/metrics.py index 2004c38..94641e6 100644 --- a/plugins/OpenStack/Gnocchi/metrics.py +++ b/plugins/OpenStack/Gnocchi/metrics.py @@ -23,18 +23,19 @@ import datetime import json -import logging as log +import logging + import time from core.message_bus.producer import KafkaProducer -from kafka import KafkaConsumer - -from plugins.OpenStack.common import Common from plugins.OpenStack.response import OpenStack_Response +from plugins.OpenStack.settings import Config __author__ = "Helena McGough" +log = logging.getLogger(__name__) + METRIC_MAPPINGS = { "average_memory_utilization": "memory.percent", "disk_read_ops": "disk.disk_ops", @@ -61,14 +62,14 @@ class Metrics(object): def __init__(self): """Initialize the metric actions.""" - self._common = Common() + # Configure an instance of the OpenStack metric plugin + config = Config.instance() + config.read_environ("gnocchi") - # TODO(mcgoughh): Initialize a generic consumer object to consume - # message from the SO. This is hardcoded for now - server = {'server': 'localhost:9092', 'topic': 'metric_request'} - self._consumer = KafkaConsumer(server['topic'], - group_id='osm_mon', - bootstrap_servers=server['server']) + # Initialise authentication for API requests + self.auth_token = None + self.endpoint = None + self._common = None # Use the Response class to generate valid json response messages self._response = OpenStack_Response() @@ -76,119 +77,131 @@ class Metrics(object): # Initializer a producer to send responses back to SO self._producer = KafkaProducer("metric_response") - def metric_calls(self): + def metric_calls(self, message, common, auth_token): """Consume info from the message bus to manage metric requests.""" - # Consumer check for metric messages - for message in self._consumer: - # Check if this plugin should carry out this request - values = json.loads(message.value) - vim_type = values['vim_type'].lower() - - if vim_type == "openstack": - # Generate auth_token and endpoint - auth_token, endpoint = self.authenticate() - - if message.key == "create_metric_request": - # Configure metric - metric_details = values['metric_create'] - metric_id, resource_id, status = self.configure_metric( - endpoint, auth_token, metric_details) - - # Generate and send a create metric response - try: - resp_message = self._response.generate_response( - 'create_metric_response', status=status, - cor_id=values['correlation_id'], - metric_id=metric_id, r_id=resource_id) - self._producer.create_metrics_resp( - 'create_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to create response: %s", exc) - - elif message.key == "read_metric_data_request": - # Read all metric data related to a specified metric - timestamps, metric_data = self.read_metric_data( - endpoint, auth_token, values) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'read_metric_data_response', - m_id=values['metric_uuid'], - m_name=values['metric_name'], - r_id=values['resource_uuid'], - cor_id=values['correlation_id'], - times=timestamps, metrics=metric_data) - self._producer.read_metric_data_response( - 'read_metric_data_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send read metric response:%s", exc) - - elif message.key == "delete_metric_request": - # delete the specified metric in the request - metric_id = values['metric_uuid'] - status = self.delete_metric( - endpoint, auth_token, metric_id) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'delete_metric_response', m_id=metric_id, - m_name=values['metric_name'], - status=status, r_id=values['resource_uuid'], - cor_id=values['correlation_id']) - self._producer.delete_metric_response( - 'delete_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send delete response:%s", exc) - - elif message.key == "update_metric_request": - # Gnocchi doesn't support configuration updates - # Log and send a response back to this effect - log.warn("Gnocchi doesn't support metric configuration\ - updates.") - req_details = values['metric_create'] - metric_name = req_details['metric_name'] - resource_id = req_details['resource_uuid'] - metric_id = self.get_metric_id( - endpoint, auth_token, metric_name, resource_id) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'update_metric_response', status=False, - cor_id=values['correlation_id'], - r_id=resource_id, m_id=metric_id) - self._producer.update_metric_response( - 'update_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send an update response:%s", exc) - - elif message.key == "list_metric_request": - list_details = values['metrics_list_request'] - - metric_list = self.list_metrics( - endpoint, auth_token, list_details) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'list_metric_response', m_list=metric_list, - cor_id=list_details['correlation_id']) - self._producer.list_metric_response( - 'list_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send a list response:%s", exc) - - else: - log.warn("Unknown key, no action will be performed.") + values = json.loads(message.value) + self._common = common + log.info("OpenStack metric action required.") + + # Generate and auth_token and endpoint for request + if auth_token is not None: + if self.auth_token != auth_token: + log.info("Auth_token for metrics set by access_credentials.") + self.auth_token = auth_token else: - log.debug("Message is not for this OpenStack.") + log.info("Auth_token has not been updated.") + else: + log.info("Using environment variables to set Gnocchi auth_token.") + self.auth_token = self._common._authenticate() + + if self.endpoint is None: + log.info("Generating a new endpoint for Gnocchi.") + self.endpoint = self._common.get_endpoint("metric") + + if message.key == "create_metric_request": + # Configure metric + metric_details = values['metric_create'] + metric_id, resource_id, status = self.configure_metric( + self.endpoint, self.auth_token, metric_details) + + # Generate and send a create metric response + try: + resp_message = self._response.generate_response( + 'create_metric_response', status=status, + cor_id=values['correlation_id'], + metric_id=metric_id, r_id=resource_id) + log.info("Response messages: %s", resp_message) + self._producer.create_metrics_resp( + 'create_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to create response: %s", exc) + + elif message.key == "read_metric_data_request": + # Read all metric data related to a specified metric + timestamps, metric_data = self.read_metric_data( + self.endpoint, self.auth_token, values) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'read_metric_data_response', + m_id=values['metric_uuid'], + m_name=values['metric_name'], + r_id=values['resource_uuid'], + cor_id=values['correlation_id'], + times=timestamps, metrics=metric_data) + log.info("Response message: %s", resp_message) + self._producer.read_metric_data_response( + 'read_metric_data_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send read metric response:%s", exc) + + elif message.key == "delete_metric_request": + # delete the specified metric in the request + metric_id = values['metric_uuid'] + status = self.delete_metric( + self.endpoint, self.auth_token, metric_id) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'delete_metric_response', m_id=metric_id, + m_name=values['metric_name'], + status=status, r_id=values['resource_uuid'], + cor_id=values['correlation_id']) + log.info("Response message: %s", resp_message) + self._producer.delete_metric_response( + 'delete_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send delete response:%s", exc) + + elif message.key == "update_metric_request": + # Gnocchi doesn't support configuration updates + # Log and send a response back to this effect + log.warn("Gnocchi doesn't support metric configuration\ + updates.") + req_details = values['metric_create'] + metric_name = req_details['metric_name'] + resource_id = req_details['resource_uuid'] + metric_id = self.get_metric_id( + self.endpoint, self.auth_token, metric_name, resource_id) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'update_metric_response', status=False, + cor_id=values['correlation_id'], + r_id=resource_id, m_id=metric_id) + log.info("Response message: %s", resp_message) + self._producer.update_metric_response( + 'update_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send an update response:%s", exc) + + elif message.key == "list_metric_request": + list_details = values['metrics_list_request'] + + metric_list = self.list_metrics( + self.endpoint, self.auth_token, list_details) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'list_metric_response', m_list=metric_list, + cor_id=list_details['correlation_id']) + log.info("Response message: %s", resp_message) + self._producer.list_metric_response( + 'list_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send a list response:%s", exc) + + else: + log.warn("Unknown key, no action will be performed.") return @@ -283,43 +296,53 @@ class Metrics(object): """List all metrics.""" url = "{}/v1/metric/".format(endpoint) + # Check for a specified list try: # Check if the metric_name was specified for the list - metric_name = values['metric_name'] - result = self._common._perform_request( - url, auth_token, req_type="get") - metric_list = json.loads(result.text) + metric_name = values['metric_name'].lower() + if metric_name not in METRIC_MAPPINGS.keys(): + log.warn("This metric is not supported, won't be listed.") + metric_name = None + except KeyError as exc: + log.info("Metric name is not specified: %s", exc) + metric_name = None - # Format the list response - metrics = self.response_list( - metric_list, metric_name=metric_name) - return metrics - except KeyError: - log.debug("Metric name is not specified for this list.") + try: + resource = values['resource_uuid'] + except KeyError as exc: + log.info("Resource is not specified:%s", exc) + resource = None try: - # Check if a resource_id was specified - resource_id = values['resource_uuid'] result = self._common._perform_request( url, auth_token, req_type="get") - metric_list = json.loads(result.text) - # Format the list response - metrics = self.response_list( - metric_list, resource=resource_id) - return metrics - except KeyError: - log.debug("Resource id not specificed either, will return a\ - complete list.") - try: - result = self._common._perform_request( - url, auth_token, req_type="get") - metric_list = json.loads(result.text) + metrics = json.loads(result.text) + + if metrics is not None: # Format the list response - metrics = self.response_list(metric_list) - return metrics + if metric_name is not None and resource is not None: + metric_list = self.response_list( + metrics, metric_name=metric_name, resource=resource) + log.info("Returning an %s resource list for %s metrics", + metric_name, resource) + elif metric_name is not None: + metric_list = self.response_list( + metrics, metric_name=metric_name) + log.info("Returning a list of %s metrics", metric_name) + elif resource is not None: + metric_list = self.response_list( + metrics, resource=resource) + log.info("Return a list of %s resource metrics", resource) + else: + metric_list = self.response_list(metrics) + log.info("Returning a complete list of metrics") - except Exception as exc: - log.warn("Failed to generate any metric list. %s", exc) + return metric_list + else: + log.info("There are no metrics available") + return [] + except Exception as exc: + log.warn("Failed to generate any metric list. %s", exc) return None def get_metric_id(self, endpoint, auth_token, metric_name, resource_id): @@ -362,7 +385,7 @@ class Metrics(object): diff = PERIOD_MS[collection_unit] else: diff = collection_period * PERIOD_MS[collection_unit] - s_time = (end_time - diff)/1000.0 + s_time = (end_time - diff) / 1000.0 start_time = datetime.datetime.fromtimestamp(s_time).strftime( '%Y-%m-%dT%H:%M:%S.%f') base_url = "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s" @@ -384,41 +407,42 @@ class Metrics(object): log.warn("Failed to gather specified measures: %s", exc) return timestamps, data - def authenticate(self): - """Generate an authentication token and endpoint for metric request.""" - try: - # Check for a tenant_id - auth_token = self._common._authenticate() - endpoint = self._common.get_endpoint("metric") - return auth_token, endpoint - except Exception as exc: - log.warn("Authentication to Keystone failed: %s", exc) - - return None, None - def response_list(self, metric_list, metric_name=None, resource=None): """Create the appropriate lists for a list response.""" - resp_list = [] + resp_list, name_list, res_list = [], [], [] + # Create required lists for row in metric_list: + # Only list OSM metrics + if row['name'] in METRIC_MAPPINGS.keys(): + metric = {"metric_name": row['name'], + "metric_uuid": row['id'], + "metric_unit": row['unit'], + "resource_uuid": row['resource_id']} + resp_list.append(str(metric)) + # Generate metric_name specific list if metric_name is not None: if row['name'] == metric_name: metric = {"metric_name": row['name'], "metric_uuid": row['id'], "metric_unit": row['unit'], "resource_uuid": row['resource_id']} - resp_list.append(metric) - elif resource is not None: + name_list.append(str(metric)) + # Generate resource specific list + if resource is not None: if row['resource_id'] == resource: metric = {"metric_name": row['name'], "metric_uuid": row['id'], "metric_unit": row['unit'], "resource_uuid": row['resource_id']} - resp_list.append(metric) - else: - metric = {"metric_name": row['name'], - "metric_uuid": row['id'], - "metric_unit": row['unit'], - "resource_uuid": row['resource_id']} - resp_list.append(metric) - return resp_list + res_list.append(str(metric)) + + # Join required lists + if metric_name is not None and resource is not None: + return list(set(res_list).intersection(name_list)) + elif metric_name is not None: + return name_list + elif resource is not None: + return list(set(res_list).intersection(resp_list)) + else: + return resp_list