From: Helena McGough Date: Thu, 31 Aug 2017 07:44:48 +0000 (+0000) Subject: First instance of the gnocchi plugin X-Git-Tag: v4.0.0~96 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F14%2F2114%2F1;p=osm%2FMON.git First instance of the gnocchi plugin - Provides basic functionality for configuring metrics based on the resource_id - Reads in values from the kafka_producer app Change-Id: I0a1fb198728d567c6e1b96724b1554383dec45b8 Signed-off-by: Helena McGough --- diff --git a/plugins/OpenStack/Ceilometer/.gitkeep b/plugins/OpenStack/Ceilometer/.gitkeep deleted file mode 100644 index 2272ebb..0000000 --- a/plugins/OpenStack/Ceilometer/.gitkeep +++ /dev/null @@ -1 +0,0 @@ -#gitkeep file to keep the initial empty directory structure. diff --git a/plugins/OpenStack/Gnocchi/.gitkeep b/plugins/OpenStack/Gnocchi/.gitkeep new file mode 100644 index 0000000..2272ebb --- /dev/null +++ b/plugins/OpenStack/Gnocchi/.gitkeep @@ -0,0 +1 @@ +#gitkeep file to keep the initial empty directory structure. diff --git a/plugins/OpenStack/Gnocchi/__init__.py b/plugins/OpenStack/Gnocchi/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/plugins/OpenStack/Gnocchi/metrics.py b/plugins/OpenStack/Gnocchi/metrics.py new file mode 100644 index 0000000..bc1a729 --- /dev/null +++ b/plugins/OpenStack/Gnocchi/metrics.py @@ -0,0 +1,116 @@ +"""Gnocchi acts on a metric message received from the SO via MON.""" + +import json +import logging as log + +from kafka import KafkaConsumer + +from plugins.OpenStack.common import Common + + +class Metrics(object): + """Gnocchi based metric actions performed on info from MON.""" + + def __init__(self): + """Initialize the metric actions.""" + self._common = Common() + + # TODO(mcgoughh): Initialize a generic consumer object to consume + # message from the SO. This is hardcoded for now + server = {'server': 'localhost:9092', 'topic': 'metrics'} + self._consumer = KafkaConsumer(server['topic'], + group_id='my-group', + bootstrap_servers=server['server']) + + # TODO(mcgoughh): Initialize a producer to send messages bask to the SO + + def metric_calls(self): + """Consume info from the message bus to manage metrics.""" + # Concumer check for metric messages + for message in self._consumer: + + if message.topic == "metrics": + log.info("Metric action required on this topic: %s", + (message.topic)) + + if message.key == "configure_metric": + # Configure/Update a resource and its metric + values = json.loads(message.value) + schema = values['configure_metrics'] + metric_details = schema['metrics_configuration'] + + # Generate authentication credentials via keystone: + # auth_token, endpoint + auth_token = self._common._authenticate( + schema['tenant_uuid']) + endpoint = self._common.get_endpoint("metric") + + metric_id = self.configure_metric( + endpoint, auth_token, metric_details) + log.info("New metric created with metricID: %s", metric_id) + + # TODO(mcgoughh): will send an acknowledge message back on + # the bus via the producer + + # TODO(mcoughh): Key alternatives are "metric_data_request" and + # "metric_data_response" will be accomodated later + # Will also need a producer for this functionality + elif message.key == "metric_data_request": + log.debug("Key used to request a metrics data") + + elif message.key == "metric_data_response": + log.debug("Key used for a metrics data response") + + else: + log.debug("Unknown key, no action will be performed") + + else: + log.info("Message topic not relevant to this plugin: %s", + message.topic) + + return + + def configure_metric(self, endpoint, auth_token, values): + """Create the new SO desired metric in Gnocchi.""" + metric_id = None + + # TODO(mcgoughh): error check the values sent in the message + # will query the database for the request resource and then + # check that resource for the desired metric + metric_name = values['metric_name'] + + if metric_id is None: + + # Need to create a new version of the resource for gnocchi to + # the new metric + resource_url = "{}/v1/resource/generic".format(endpoint) + + metric = {'name': metric_name, + 'unit': values['metric_unit'], } + + resource_payload = json.dumps({'id': values['resource_uuid'], + 'metrics': {metric_name: metric}}) + + new_resource = self._common._perform_request( + resource_url, auth_token, + req_type="post", payload=resource_payload) + new_metric = json.loads(new_resource.text)['metrics'] + + return new_metric[metric_name] + else: + return metric_id + + def delete_metric(self, endpoint, auth_token, metric_id): + """Delete metric.""" + url = "{}/v1/metric/%s".format(endpoint) % (metric_id) + + self._common._perform_request(url, auth_token, req_type="delete") + return None + + def list_metrics(self, endpoint, auth_token): + """List all metrics.""" + url = "{}/v1/metric/".format(endpoint) + + metric_list = self._common._perform_request( + url, auth_token, req_type="get") + return json.loads(metric_list.text) diff --git a/plugins/OpenStack/Gnocchi/plugin_instance.py b/plugins/OpenStack/Gnocchi/plugin_instance.py new file mode 100644 index 0000000..6f9e306 --- /dev/null +++ b/plugins/OpenStack/Gnocchi/plugin_instance.py @@ -0,0 +1,36 @@ +"""Gnocchi plugin for the OSM monitoring module.""" + +import logging as log + +from plugins.OpenStack.Gnocchi.metrics import Metrics +from plugins.OpenStack.settings import Config + + +def register_plugin(): + """Register the plugin.""" + config = Config.instance() + instance = Plugin(config=config) + instance.config() + instance.metrics() + + +class Plugin(object): + """Gnocchi plugin for OSM MON.""" + + def __init__(self, config): + """Plugin instance.""" + log.info("Initialze the plugin instance.") + self._config = config + self._metrics = Metrics() + + def config(self): + """Configure plugin.""" + log.info("Configure the plugin instance.") + self._config.read_environ("gnocchi") + + def metrics(self): + """Initialize metric functionality.""" + log.info("Initialize metric functionality.") + self._metrics.metric_calls() + +register_plugin() diff --git a/plugins/OpenStack/__init__.py b/plugins/OpenStack/__init__.py index 805dce3..e69de29 100644 Binary files a/plugins/OpenStack/__init__.py and b/plugins/OpenStack/__init__.py differ diff --git a/plugins/OpenStack/common.py b/plugins/OpenStack/common.py new file mode 100644 index 0000000..d706456 --- /dev/null +++ b/plugins/OpenStack/common.py @@ -0,0 +1,86 @@ +"""Common methods for the Aodh Sender/Receiver.""" + +import logging as log + +from keystoneclient.v3 import client + +from plugins.OpenStack.settings import Config + +import requests + +# from keystoneauth1.identity.v3 import AuthMethod +# from keystoneclient.service_catalog import ServiceCatalog + + +class Common(object): + """Common calls for Gnocchi/Aodh plugins.""" + + def __init__(self): + """Create the common instance.""" + self._auth_token = None + self._endpoint = None + self._ks = None + + def _authenticate(self, tenant_id): + """Authenticate and/or renew the authentication token.""" + if self._auth_token is not None: + return self._auth_token + + try: + cfg = Config.instance() + self._ks = client.Client(auth_url=cfg.OS_AUTH_URL, + username=cfg.OS_USERNAME, + password=cfg.OS_PASSWORD, + tenant_name=cfg.OS_TENANT_NAME) + self._auth_token = self._ks.auth_token + except Exception as exc: + + log.warn("Authentication failed with the following exception: %s", + exc) + self._auth_token = None + + return self._auth_token + + def get_endpoint(self, service_type): + """Get the endpoint for Gnocchi/Aodh.""" + try: + return self._ks.service_catalog.url_for( + service_type=service_type, + endpoint_type='internalURL', + region_name='RegionOne') + except Exception as exc: + log.warning("Failed to retreive endpoint for Aodh due to: %s", + exc) + return None + + @classmethod + def _perform_request(cls, url, auth_token, + req_type=None, payload=None, params=None): + """Perform the POST/PUT/GET/DELETE request.""" + # request headers + headers = {'X-Auth-Token': auth_token, + 'Content-type': 'application/json'} + # perform request and return its result + response = None + try: + if req_type == "put": + response = requests.put( + url, data=payload, headers=headers, + timeout=1) + elif req_type == "post": + response = requests.post( + url, data=payload, headers=headers, + timeout=1) + elif req_type == "get": + response = requests.get( + url, params=params, headers=headers, timeout=1) + elif req_type == "delete": + response = requests.delete( + url, headers=headers, timeout=1) + else: + log.warn("Invalid request type") + + except Exception as e: + log.warn("Exception thrown on request", e) + + return response