X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=plugins%2FOpenStack%2FGnocchi%2Fmetrics.py;h=94641e69f21dcf88a0469a319e0a83bf9bc6d969;hb=7d5e61f406eab8d151125001a09f742dbadc3d70;hp=7b135477e7bc130709703be50a4224f0cbd1bcf5;hpb=f152768e6401ffc4d58b2478bb5ff5e6343b4e09;p=osm%2FMON.git diff --git a/plugins/OpenStack/Gnocchi/metrics.py b/plugins/OpenStack/Gnocchi/metrics.py index 7b13547..94641e6 100644 --- a/plugins/OpenStack/Gnocchi/metrics.py +++ b/plugins/OpenStack/Gnocchi/metrics.py @@ -23,18 +23,19 @@ import datetime import json -import logging as log +import logging + import time from core.message_bus.producer import KafkaProducer -from kafka import KafkaConsumer - -from plugins.OpenStack.common import Common from plugins.OpenStack.response import OpenStack_Response +from plugins.OpenStack.settings import Config __author__ = "Helena McGough" +log = logging.getLogger(__name__) + METRIC_MAPPINGS = { "average_memory_utilization": "memory.percent", "disk_read_ops": "disk.disk_ops", @@ -61,14 +62,14 @@ class Metrics(object): def __init__(self): """Initialize the metric actions.""" - self._common = Common() + # Configure an instance of the OpenStack metric plugin + config = Config.instance() + config.read_environ("gnocchi") - # TODO(mcgoughh): Initialize a generic consumer object to consume - # message from the SO. This is hardcoded for now - server = {'server': 'localhost:9092', 'topic': 'metric_request'} - self._consumer = KafkaConsumer(server['topic'], - group_id='osm_mon', - bootstrap_servers=server['server']) + # Initialise authentication for API requests + self.auth_token = None + self.endpoint = None + self._common = None # Use the Response class to generate valid json response messages self._response = OpenStack_Response() @@ -76,119 +77,131 @@ class Metrics(object): # Initializer a producer to send responses back to SO self._producer = KafkaProducer("metric_response") - def metric_calls(self): + def metric_calls(self, message, common, auth_token): """Consume info from the message bus to manage metric requests.""" - # Consumer check for metric messages - for message in self._consumer: - # Check if this plugin should carry out this request - values = json.loads(message.value) - vim_type = values['vim_type'].lower() - - if vim_type == "openstack": - # Generate auth_token and endpoint - auth_token, endpoint = self.authenticate() - - if message.key == "create_metric_request": - # Configure metric - metric_details = values['metric_create'] - metric_id, resource_id, status = self.configure_metric( - endpoint, auth_token, metric_details) - - # Generate and send a create metric response - try: - resp_message = self._response.generate_response( - 'create_metric_response', status=status, - cor_id=values['correlation_id'], - metric_id=metric_id, r_id=resource_id) - self._producer.create_metrics_resp( - 'create_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to create response: %s", exc) - - elif message.key == "read_metric_data_request": - # Read all metric data related to a specified metric - timestamps, metric_data = self.read_metric_data( - endpoint, auth_token, values) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'read_metric_data_response', - m_id=values['metric_uuid'], - m_name=values['metric_name'], - r_id=values['resource_uuid'], - cor_id=values['correlation_id'], - times=timestamps, metrics=metric_data) - self._producer.read_metric_data_response( - 'read_metric_data_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send read metric response:%s", exc) - - elif message.key == "delete_metric_request": - # delete the specified metric in the request - metric_id = values['metric_uuid'] - status = self.delete_metric( - endpoint, auth_token, metric_id) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'delete_metric_response', m_id=metric_id, - m_name=values['metric_name'], - status=status, r_id=values['resource_uuid'], - cor_id=values['correlation_id']) - self._producer.delete_metric_response( - 'delete_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send delete response:%s", exc) - - elif message.key == "update_metric_request": - # Gnocchi doesn't support configuration updates - # Log and send a response back to this effect - log.warn("Gnocchi doesn't support metric configuration\ - updates.") - req_details = values['metric_create'] - metric_name = req_details['metric_name'] - resource_id = req_details['resource_uuid'] - metric_id = self.get_metric_id( - endpoint, auth_token, metric_name, resource_id) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'update_metric_response', status=False, - cor_id=values['correlation_id'], - r_id=resource_id, m_id=metric_id) - self._producer.update_metric_response( - 'update_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send an update response:%s", exc) - - elif message.key == "list_metric_request": - list_details = values['metrics_list_request'] - - metric_list = self.list_metrics( - endpoint, auth_token, list_details) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'list_metric_response', m_list=metric_list, - cor_id=list_details['correlation_id']) - self._producer.list_metric_response( - 'list_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send a list response:%s", exc) - - else: - log.warn("Unknown key, no action will be performed.") + values = json.loads(message.value) + self._common = common + log.info("OpenStack metric action required.") + + # Generate and auth_token and endpoint for request + if auth_token is not None: + if self.auth_token != auth_token: + log.info("Auth_token for metrics set by access_credentials.") + self.auth_token = auth_token else: - log.debug("Message is not for this OpenStack.") + log.info("Auth_token has not been updated.") + else: + log.info("Using environment variables to set Gnocchi auth_token.") + self.auth_token = self._common._authenticate() + + if self.endpoint is None: + log.info("Generating a new endpoint for Gnocchi.") + self.endpoint = self._common.get_endpoint("metric") + + if message.key == "create_metric_request": + # Configure metric + metric_details = values['metric_create'] + metric_id, resource_id, status = self.configure_metric( + self.endpoint, self.auth_token, metric_details) + + # Generate and send a create metric response + try: + resp_message = self._response.generate_response( + 'create_metric_response', status=status, + cor_id=values['correlation_id'], + metric_id=metric_id, r_id=resource_id) + log.info("Response messages: %s", resp_message) + self._producer.create_metrics_resp( + 'create_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to create response: %s", exc) + + elif message.key == "read_metric_data_request": + # Read all metric data related to a specified metric + timestamps, metric_data = self.read_metric_data( + self.endpoint, self.auth_token, values) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'read_metric_data_response', + m_id=values['metric_uuid'], + m_name=values['metric_name'], + r_id=values['resource_uuid'], + cor_id=values['correlation_id'], + times=timestamps, metrics=metric_data) + log.info("Response message: %s", resp_message) + self._producer.read_metric_data_response( + 'read_metric_data_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send read metric response:%s", exc) + + elif message.key == "delete_metric_request": + # delete the specified metric in the request + metric_id = values['metric_uuid'] + status = self.delete_metric( + self.endpoint, self.auth_token, metric_id) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'delete_metric_response', m_id=metric_id, + m_name=values['metric_name'], + status=status, r_id=values['resource_uuid'], + cor_id=values['correlation_id']) + log.info("Response message: %s", resp_message) + self._producer.delete_metric_response( + 'delete_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send delete response:%s", exc) + + elif message.key == "update_metric_request": + # Gnocchi doesn't support configuration updates + # Log and send a response back to this effect + log.warn("Gnocchi doesn't support metric configuration\ + updates.") + req_details = values['metric_create'] + metric_name = req_details['metric_name'] + resource_id = req_details['resource_uuid'] + metric_id = self.get_metric_id( + self.endpoint, self.auth_token, metric_name, resource_id) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'update_metric_response', status=False, + cor_id=values['correlation_id'], + r_id=resource_id, m_id=metric_id) + log.info("Response message: %s", resp_message) + self._producer.update_metric_response( + 'update_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send an update response:%s", exc) + + elif message.key == "list_metric_request": + list_details = values['metrics_list_request'] + + metric_list = self.list_metrics( + self.endpoint, self.auth_token, list_details) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'list_metric_response', m_list=metric_list, + cor_id=list_details['correlation_id']) + log.info("Response message: %s", resp_message) + self._producer.list_metric_response( + 'list_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send a list response:%s", exc) + + else: + log.warn("Unknown key, no action will be performed.") return @@ -372,7 +385,7 @@ class Metrics(object): diff = PERIOD_MS[collection_unit] else: diff = collection_period * PERIOD_MS[collection_unit] - s_time = (end_time - diff)/1000.0 + s_time = (end_time - diff) / 1000.0 start_time = datetime.datetime.fromtimestamp(s_time).strftime( '%Y-%m-%dT%H:%M:%S.%f') base_url = "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s" @@ -394,18 +407,6 @@ class Metrics(object): log.warn("Failed to gather specified measures: %s", exc) return timestamps, data - def authenticate(self): - """Generate an authentication token and endpoint for metric request.""" - try: - # Check for a tenant_id - auth_token = self._common._authenticate() - endpoint = self._common.get_endpoint("metric") - return auth_token, endpoint - except Exception as exc: - log.warn("Authentication to Keystone failed: %s", exc) - - return None, None - def response_list(self, metric_list, metric_name=None, resource=None): """Create the appropriate lists for a list response.""" resp_list, name_list, res_list = [], [], []