First instance of the gnocchi plugin 14/2114/1
authorHelena McGough <helena.mcgough@intel.com>
Thu, 31 Aug 2017 07:44:48 +0000 (07:44 +0000)
committerHelena McGough <helena.mcgough@intel.com>
Thu, 31 Aug 2017 14:35:15 +0000 (14:35 +0000)
 - Provides basic functionality for configuring metrics
 based on the resource_id
 - Reads in values from the kafka_producer app

Change-Id: I0a1fb198728d567c6e1b96724b1554383dec45b8
Signed-off-by: Helena McGough <helena.mcgough@intel.com>
plugins/OpenStack/Ceilometer/.gitkeep [deleted file]
plugins/OpenStack/Gnocchi/.gitkeep [new file with mode: 0644]
plugins/OpenStack/Gnocchi/__init__.py [new file with mode: 0644]
plugins/OpenStack/Gnocchi/metrics.py [new file with mode: 0644]
plugins/OpenStack/Gnocchi/plugin_instance.py [new file with mode: 0644]
plugins/OpenStack/__init__.py
plugins/OpenStack/common.py [new file with mode: 0644]

diff --git a/plugins/OpenStack/Ceilometer/.gitkeep b/plugins/OpenStack/Ceilometer/.gitkeep
deleted file mode 100644 (file)
index 2272ebb..0000000
+++ /dev/null
@@ -1 +0,0 @@
-#gitkeep file to keep the initial empty directory structure.
diff --git a/plugins/OpenStack/Gnocchi/.gitkeep b/plugins/OpenStack/Gnocchi/.gitkeep
new file mode 100644 (file)
index 0000000..2272ebb
--- /dev/null
@@ -0,0 +1 @@
+#gitkeep file to keep the initial empty directory structure.
diff --git a/plugins/OpenStack/Gnocchi/__init__.py b/plugins/OpenStack/Gnocchi/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/plugins/OpenStack/Gnocchi/metrics.py b/plugins/OpenStack/Gnocchi/metrics.py
new file mode 100644 (file)
index 0000000..bc1a729
--- /dev/null
@@ -0,0 +1,116 @@
+"""Gnocchi acts on a metric message received from the SO via MON."""
+
+import json
+import logging as log
+
+from kafka import KafkaConsumer
+
+from plugins.OpenStack.common import Common
+
+
+class Metrics(object):
+    """Gnocchi based metric actions performed on info from MON."""
+
+    def __init__(self):
+        """Initialize the metric actions."""
+        self._common = Common()
+
+        # TODO(mcgoughh): Initialize a generic consumer object to consume
+        # message from the SO. This is hardcoded for now
+        server = {'server': 'localhost:9092', 'topic': 'metrics'}
+        self._consumer = KafkaConsumer(server['topic'],
+                                       group_id='my-group',
+                                       bootstrap_servers=server['server'])
+
+        # TODO(mcgoughh): Initialize a producer to send messages bask to the SO
+
+    def metric_calls(self):
+        """Consume info from the message bus to manage metrics."""
+        # Concumer check for metric messages
+        for message in self._consumer:
+
+            if message.topic == "metrics":
+                log.info("Metric action required on this topic: %s",
+                         (message.topic))
+
+                if message.key == "configure_metric":
+                    # Configure/Update a resource and its metric
+                    values = json.loads(message.value)
+                    schema = values['configure_metrics']
+                    metric_details = schema['metrics_configuration']
+
+                    # Generate authentication credentials via keystone:
+                    # auth_token, endpoint
+                    auth_token = self._common._authenticate(
+                        schema['tenant_uuid'])
+                    endpoint = self._common.get_endpoint("metric")
+
+                    metric_id = self.configure_metric(
+                        endpoint, auth_token, metric_details)
+                    log.info("New metric created with metricID: %s", metric_id)
+
+                    # TODO(mcgoughh): will send an acknowledge message back on
+                    # the bus via the producer
+
+                # TODO(mcoughh): Key alternatives are "metric_data_request" and
+                # "metric_data_response" will be accomodated later
+                # Will also need a producer for this functionality
+                elif message.key == "metric_data_request":
+                    log.debug("Key used to request a metrics data")
+
+                elif message.key == "metric_data_response":
+                    log.debug("Key used for a metrics data response")
+
+                else:
+                    log.debug("Unknown key, no action will be performed")
+
+            else:
+                log.info("Message topic not relevant to this plugin: %s",
+                         message.topic)
+
+        return
+
+    def configure_metric(self, endpoint, auth_token, values):
+        """Create the new SO desired metric in Gnocchi."""
+        metric_id = None
+
+        # TODO(mcgoughh): error check the values sent in the message
+        # will query the database for the request resource and then
+        # check that resource for the desired metric
+        metric_name = values['metric_name']
+
+        if metric_id is None:
+
+            # Need to create a new version of the resource for gnocchi to
+            # the new metric
+            resource_url = "{}/v1/resource/generic".format(endpoint)
+
+            metric = {'name': metric_name,
+                      'unit': values['metric_unit'], }
+
+            resource_payload = json.dumps({'id': values['resource_uuid'],
+                                           'metrics': {metric_name: metric}})
+
+            new_resource = self._common._perform_request(
+                resource_url, auth_token,
+                req_type="post", payload=resource_payload)
+            new_metric = json.loads(new_resource.text)['metrics']
+
+            return new_metric[metric_name]
+        else:
+            return metric_id
+
+    def delete_metric(self, endpoint, auth_token, metric_id):
+        """Delete metric."""
+        url = "{}/v1/metric/%s".format(endpoint) % (metric_id)
+
+        self._common._perform_request(url, auth_token, req_type="delete")
+        return None
+
+    def list_metrics(self, endpoint, auth_token):
+        """List all metrics."""
+        url = "{}/v1/metric/".format(endpoint)
+
+        metric_list = self._common._perform_request(
+            url, auth_token, req_type="get")
+        return json.loads(metric_list.text)
diff --git a/plugins/OpenStack/Gnocchi/plugin_instance.py b/plugins/OpenStack/Gnocchi/plugin_instance.py
new file mode 100644 (file)
index 0000000..6f9e306
--- /dev/null
@@ -0,0 +1,36 @@
+"""Gnocchi plugin for the OSM monitoring module."""
+
+import logging as log
+
+from plugins.OpenStack.Gnocchi.metrics import Metrics
+from plugins.OpenStack.settings import Config
+
+
+def register_plugin():
+    """Register the plugin."""
+    config = Config.instance()
+    instance = Plugin(config=config)
+    instance.config()
+    instance.metrics()
+
+
+class Plugin(object):
+    """Gnocchi plugin for OSM MON."""
+
+    def __init__(self, config):
+        """Plugin instance."""
+        log.info("Initialze the plugin instance.")
+        self._config = config
+        self._metrics = Metrics()
+
+    def config(self):
+        """Configure plugin."""
+        log.info("Configure the plugin instance.")
+        self._config.read_environ("gnocchi")
+
+    def metrics(self):
+        """Initialize metric functionality."""
+        log.info("Initialize metric functionality.")
+        self._metrics.metric_calls()
+
+register_plugin()
index 805dce3..e69de29 100644 (file)
Binary files a/plugins/OpenStack/__init__.py and b/plugins/OpenStack/__init__.py differ
diff --git a/plugins/OpenStack/common.py b/plugins/OpenStack/common.py
new file mode 100644 (file)
index 0000000..d706456
--- /dev/null
@@ -0,0 +1,86 @@
+"""Common methods for the Aodh Sender/Receiver."""
+
+import logging as log
+
+from keystoneclient.v3 import client
+
+from plugins.OpenStack.settings import Config
+
+import requests
+
+# from keystoneauth1.identity.v3 import AuthMethod
+# from keystoneclient.service_catalog import ServiceCatalog
+
+
+class Common(object):
+    """Common calls for Gnocchi/Aodh plugins."""
+
+    def __init__(self):
+        """Create the common instance."""
+        self._auth_token = None
+        self._endpoint = None
+        self._ks = None
+
+    def _authenticate(self, tenant_id):
+        """Authenticate and/or renew the authentication token."""
+        if self._auth_token is not None:
+            return self._auth_token
+
+        try:
+            cfg = Config.instance()
+            self._ks = client.Client(auth_url=cfg.OS_AUTH_URL,
+                                     username=cfg.OS_USERNAME,
+                                     password=cfg.OS_PASSWORD,
+                                     tenant_name=cfg.OS_TENANT_NAME)
+            self._auth_token = self._ks.auth_token
+        except Exception as exc:
+
+            log.warn("Authentication failed with the following exception: %s",
+                     exc)
+            self._auth_token = None
+
+        return self._auth_token
+
+    def get_endpoint(self, service_type):
+        """Get the endpoint for Gnocchi/Aodh."""
+        try:
+            return self._ks.service_catalog.url_for(
+                service_type=service_type,
+                endpoint_type='internalURL',
+                region_name='RegionOne')
+        except Exception as exc:
+            log.warning("Failed to retreive endpoint for Aodh due to: %s",
+                        exc)
+        return None
+
+    @classmethod
+    def _perform_request(cls, url, auth_token,
+                         req_type=None, payload=None, params=None):
+        """Perform the POST/PUT/GET/DELETE request."""
+        # request headers
+        headers = {'X-Auth-Token': auth_token,
+                   'Content-type': 'application/json'}
+        # perform request and return its result
+        response = None
+        try:
+            if req_type == "put":
+                response = requests.put(
+                    url, data=payload, headers=headers,
+                    timeout=1)
+            elif req_type == "post":
+                response = requests.post(
+                    url, data=payload, headers=headers,
+                    timeout=1)
+            elif req_type == "get":
+                response = requests.get(
+                    url, params=params, headers=headers, timeout=1)
+            elif req_type == "delete":
+                response = requests.delete(
+                    url, headers=headers, timeout=1)
+            else:
+                log.warn("Invalid request type")
+
+        except Exception as e:
+            log.warn("Exception thrown on request", e)
+
+        return response