X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=plugins%2FOpenStack%2FGnocchi%2Fmetrics.py;h=ca6f47a5469da919821f279a1f1b2d65fa1dff2e;hb=f0434f4d3dab5f0a4fb417560de4b6bd6d6d45bb;hp=bc1a729384f408e95c5844013d96fc217b95db40;hpb=f6064437ba352d7fee6b4a7a4e7cb2582ef5cd32;p=osm%2FMON.git diff --git a/plugins/OpenStack/Gnocchi/metrics.py b/plugins/OpenStack/Gnocchi/metrics.py index bc1a729..ca6f47a 100644 --- a/plugins/OpenStack/Gnocchi/metrics.py +++ b/plugins/OpenStack/Gnocchi/metrics.py @@ -1,15 +1,63 @@ -"""Gnocchi acts on a metric message received from the SO via MON.""" +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Carry out OpenStack metric requests via Gnocchi API.""" + +import datetime import json import logging as log +import time + +from core.message_bus.producer import KafkaProducer from kafka import KafkaConsumer from plugins.OpenStack.common import Common +from plugins.OpenStack.response import OpenStack_Response + +__author__ = "Helena McGough" + +METRIC_MAPPINGS = { + "AVERAGE_MEMORY_UTILIZATION": "memory.percent", + "DISK_READ_OPS": "disk.disk_ops", + "DISK_WRITE_OPS": "disk.disk_ops", + "DISK_READ_BYTES": "disk.disk_octets", + "DISK_WRITE_BYTES": "disk.disk_octets", + "PACKETS_DROPPED": "interface.if_dropped", + "PACKETS_RECEIVED": "interface.if_packets", + "PACKETS_SENT": "interface.if_packets", + "CPU_UTILIZATION": "cpu.percent", +} + +PERIOD_MS = { + "HR": 3600000, + "DAY": 86400000, + "WEEK": 604800000, + "MONTH": 2629746000, + "YEAR": 31556952000 +} class Metrics(object): - """Gnocchi based metric actions performed on info from MON.""" + """OpenStack metric requests performed via the Gnocchi API.""" def __init__(self): """Initialize the metric actions.""" @@ -17,100 +65,349 @@ class Metrics(object): # TODO(mcgoughh): Initialize a generic consumer object to consume # message from the SO. This is hardcoded for now - server = {'server': 'localhost:9092', 'topic': 'metrics'} + server = {'server': 'localhost:9092', 'topic': 'metric_request'} self._consumer = KafkaConsumer(server['topic'], - group_id='my-group', + group_id='osm_mon', bootstrap_servers=server['server']) - # TODO(mcgoughh): Initialize a producer to send messages bask to the SO + # Use the Response class to generate valid json response messages + self._response = OpenStack_Response() + + # Initializer a producer to send responses back to SO + self._producer = KafkaProducer("metric_response") def metric_calls(self): - """Consume info from the message bus to manage metrics.""" - # Concumer check for metric messages + """Consume info from the message bus to manage metric requests.""" + # Consumer check for metric messages for message in self._consumer: + # Check if this plugin should carry out this request + values = json.loads(message.value) + vim_type = values['vim_type'].lower() + + if vim_type == "openstack": + # Generate auth_token and endpoint + auth_token, endpoint = self.authenticate(values) - if message.topic == "metrics": - log.info("Metric action required on this topic: %s", - (message.topic)) + if message.key == "create_metric_request": + # Configure metric + metric_details = values['metric_create'] + metric_id, resource_id, status = self.configure_metric( + endpoint, auth_token, metric_details) - if message.key == "configure_metric": - # Configure/Update a resource and its metric - values = json.loads(message.value) - schema = values['configure_metrics'] - metric_details = schema['metrics_configuration'] + # Generate and send a create metric response + try: + resp_message = self._response.generate_response( + 'create_metric_response', status=status, + cor_id=values['correlation_id'], + metric_id=metric_id, r_id=resource_id) + self._producer.create_metrics_resp( + 'create_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to create response: %s", exc) - # Generate authentication credentials via keystone: - # auth_token, endpoint - auth_token = self._common._authenticate( - schema['tenant_uuid']) - endpoint = self._common.get_endpoint("metric") + elif message.key == "read_metric_data_request": + # Read all metric data related to a specified metric + timestamps, metric_data = self.read_metric_data( + endpoint, auth_token, values) - metric_id = self.configure_metric( - endpoint, auth_token, metric_details) - log.info("New metric created with metricID: %s", metric_id) + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'read_metric_data_response', + m_id=values['metric_uuid'], + m_name=values['metric_name'], + r_id=values['resource_uuid'], + cor_id=values['correlation_id'], + times=timestamps, metrics=metric_data) + self._producer.read_metric_data_response( + 'read_metric_data_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send read metric response:%s", exc) - # TODO(mcgoughh): will send an acknowledge message back on - # the bus via the producer + elif message.key == "delete_metric_request": + # delete the specified metric in the request + metric_id = values['metric_uuid'] + status = self.delete_metric( + endpoint, auth_token, metric_id) - # TODO(mcoughh): Key alternatives are "metric_data_request" and - # "metric_data_response" will be accomodated later - # Will also need a producer for this functionality - elif message.key == "metric_data_request": - log.debug("Key used to request a metrics data") + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'delete_metric_response', m_id=metric_id, + m_name=values['metric_name'], + status=status, r_id=values['resource_uuid'], + cor_id=values['correlation_id']) + self._producer.delete_metric_response( + 'delete_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send delete response:%s", exc) - elif message.key == "metric_data_response": - log.debug("Key used for a metrics data response") + elif message.key == "update_metric_request": + # Gnocchi doesn't support configuration updates + # Log and send a response back to this effect + log.warn("Gnocchi doesn't support metric configuration\ + updates.") + req_details = values['metric_create'] + metric_name = req_details['metric_name'] + resource_id = req_details['resource_uuid'] + metric_id = self.get_metric_id( + endpoint, auth_token, metric_name, resource_id) - else: - log.debug("Unknown key, no action will be performed") + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'update_metric_response', status=False, + cor_id=values['correlation_id'], + r_id=resource_id, m_id=metric_id) + self._producer.update_metric_response( + 'update_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send an update response:%s", exc) + + elif message.key == "list_metric_request": + list_details = values['metrics_list_request'] + + metric_list = self.list_metrics( + endpoint, auth_token, list_details) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'list_metric_response', m_list=metric_list, + cor_id=list_details['correlation_id']) + self._producer.list_metric_response( + 'list_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send a list response:%s", exc) + else: + log.warn("Unknown key, no action will be performed.") else: - log.info("Message topic not relevant to this plugin: %s", - message.topic) + log.debug("Message is not for this OpenStack.") return def configure_metric(self, endpoint, auth_token, values): - """Create the new SO desired metric in Gnocchi.""" - metric_id = None + """Create the new metric in Gnocchi.""" + try: + resource_id = values['resource_uuid'] + except KeyError: + log.warn("Resource is not defined correctly.") + return None, None, False - # TODO(mcgoughh): error check the values sent in the message - # will query the database for the request resource and then - # check that resource for the desired metric - metric_name = values['metric_name'] + # Check/Normalize metric name + metric_name, norm_name = self.get_metric_name(values) + if norm_name is None: + log.warn("This metric is not supported by this plugin.") + return None, resource_id, False - if metric_id is None: + # Check for an existing metric for this resource + metric_id = self.get_metric_id( + endpoint, auth_token, metric_name, resource_id) + if metric_id is None: # Need to create a new version of the resource for gnocchi to - # the new metric - resource_url = "{}/v1/resource/generic".format(endpoint) + # create the new metric based on that resource + url = "{}/v1/resource/generic".format(endpoint) + try: + # Try to create a new resource for the new metric + metric = {'name': metric_name, + 'archive_policy_name': 'high', + 'unit': values['metric_unit'], } - metric = {'name': metric_name, - 'unit': values['metric_unit'], } + resource_payload = json.dumps({'id': resource_id, + 'metrics': { + metric_name: metric}}) - resource_payload = json.dumps({'id': values['resource_uuid'], - 'metrics': {metric_name: metric}}) + new_resource = self._common._perform_request( + url, auth_token, req_type="post", payload=resource_payload) - new_resource = self._common._perform_request( - resource_url, auth_token, - req_type="post", payload=resource_payload) - new_metric = json.loads(new_resource.text)['metrics'] + resource_id = json.loads(new_resource.text)['id'] + except Exception as exc: + # Append new metric to existing resource + log.debug("This resource already exists:%s, appending metric.", + exc) + base_url = "{}/v1/resource/generic/%s/metric" + res_url = base_url.format(endpoint) % resource_id + payload = {metric_name: {'archive_policy_name': 'high', + 'unit': values['metric_unit']}} + self._common._perform_request( + res_url, auth_token, req_type="post", + payload=json.dumps(payload)) + + metric_id = self.get_metric_id( + endpoint, auth_token, metric_name, resource_id) + return metric_id, resource_id, True - return new_metric[metric_name] else: - return metric_id + log.debug("This metric already exists for this resource.") + + return metric_id, resource_id, False def delete_metric(self, endpoint, auth_token, metric_id): """Delete metric.""" url = "{}/v1/metric/%s".format(endpoint) % (metric_id) - self._common._perform_request(url, auth_token, req_type="delete") - return None + try: + result = self._common._perform_request( + url, auth_token, req_type="delete") + if str(result.status_code) == "404": + log.warn("Failed to delete the metric.") + return False + else: + return True + except Exception as exc: + log.warn("Failed to carry out delete metric request:%s", exc) + return False - def list_metrics(self, endpoint, auth_token): + def list_metrics(self, endpoint, auth_token, values): """List all metrics.""" url = "{}/v1/metric/".format(endpoint) - metric_list = self._common._perform_request( - url, auth_token, req_type="get") - return json.loads(metric_list.text) + try: + # Check if the metric_name was specified for the list + metric_name = values['metric_name'] + result = self._common._perform_request( + url, auth_token, req_type="get") + metric_list = json.loads(result.text) + + # Format the list response + metrics = self.response_list( + metric_list, metric_name=metric_name) + return metrics + except KeyError: + log.debug("Metric name is not specified for this list.") + + try: + # Check if a resource_id was specified + resource_id = values['resource_uuid'] + result = self._common._perform_request( + url, auth_token, req_type="get") + metric_list = json.loads(result.text) + # Format the list response + metrics = self.response_list( + metric_list, resource=resource_id) + return metrics + except KeyError: + log.debug("Resource id not specificed either, will return a\ + complete list.") + try: + result = self._common._perform_request( + url, auth_token, req_type="get") + metric_list = json.loads(result.text) + # Format the list response + metrics = self.response_list(metric_list) + return metrics + + except Exception as exc: + log.warn("Failed to generate any metric list. %s", exc) + return None + + def get_metric_id(self, endpoint, auth_token, metric_name, resource_id): + """Check if the desired metric already exists for the resource.""" + url = "{}/v1/resource/generic/%s".format(endpoint) % resource_id + + try: + # Try return the metric id if it exists + result = self._common._perform_request( + url, auth_token, req_type="get") + return json.loads(result.text)['metrics'][metric_name] + except Exception: + log.debug("Metric doesn't exist. No metric_id available") + return None + + def get_metric_name(self, values): + """Check metric name configuration and normalize.""" + try: + # Normalize metric name + metric_name = values['metric_name'] + return metric_name, METRIC_MAPPINGS[metric_name] + except KeyError: + log.warn("Metric name %s is invalid.", metric_name) + return metric_name, None + + def read_metric_data(self, endpoint, auth_token, values): + """Collectd metric measures over a specified time period.""" + timestamps = [] + data = [] + try: + # Try and collect measures + metric_id = values['metric_uuid'] + collection_unit = values['collection_unit'].upper() + collection_period = values['collection_period'] + + # Define the start and end time based on configurations + stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X") + end_time = int(round(time.time() * 1000)) + if collection_unit == 'YEAR': + diff = PERIOD_MS[collection_unit] + else: + diff = collection_period * PERIOD_MS[collection_unit] + s_time = (end_time - diff)/1000.0 + start_time = datetime.datetime.fromtimestamp(s_time).strftime( + '%Y-%m-%dT%H:%M:%S.%f') + base_url = "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s" + url = base_url.format(endpoint) % { + "0": metric_id, "1": start_time, "2": stop_time} + + # Perform metric data request + metric_data = self._common._perform_request( + url, auth_token, req_type="get") + + # Generate a list of the requested timestamps and data + for r in json.loads(metric_data.text): + timestamp = r[0].replace("T", " ") + timestamps.append(timestamp) + data.append(r[2]) + + return timestamps, data + except Exception as exc: + log.warn("Failed to gather specified measures: %s", exc) + return timestamps, data + + def authenticate(self, values): + """Generate an authentication token and endpoint for metric request.""" + try: + # Check for a tenant_id + auth_token = self._common._authenticate( + tenant_id=values['tenant_uuid']) + endpoint = self._common.get_endpoint("metric") + except KeyError: + log.warn("Tenant ID is not specified. Will use a generic\ + authentication.") + auth_token = self._common._authenticate() + endpoint = self._common.get_endpoint("metric") + + return auth_token, endpoint + + def response_list(self, metric_list, metric_name=None, resource=None): + """Create the appropriate lists for a list response.""" + resp_list = [] + + for row in metric_list: + if metric_name is not None: + if row['name'] == metric_name: + metric = {"metric_name": row['name'], + "metric_uuid": row['id'], + "metric_unit": row['unit'], + "resource_uuid": row['resource_id']} + resp_list.append(metric) + elif resource is not None: + if row['resource_id'] == resource: + metric = {"metric_name": row['name'], + "metric_uuid": row['id'], + "metric_unit": row['unit'], + "resource_uuid": row['resource_id']} + resp_list.append(metric) + else: + metric = {"metric_name": row['name'], + "metric_uuid": row['id'], + "metric_unit": row['unit'], + "resource_uuid": row['resource_id']} + resp_list.append(metric) + return resp_list