--- /dev/null
+"""Gnocchi acts on a metric message received from the SO via MON."""
+
+import json
+import logging as log
+
+from kafka import KafkaConsumer
+
+from plugins.OpenStack.common import Common
+
+
+class Metrics(object):
+ """Gnocchi based metric actions performed on info from MON."""
+
+ def __init__(self):
+ """Initialize the metric actions."""
+ self._common = Common()
+
+ # TODO(mcgoughh): Initialize a generic consumer object to consume
+ # message from the SO. This is hardcoded for now
+ server = {'server': 'localhost:9092', 'topic': 'metrics'}
+ self._consumer = KafkaConsumer(server['topic'],
+ group_id='my-group',
+ bootstrap_servers=server['server'])
+
+ # TODO(mcgoughh): Initialize a producer to send messages bask to the SO
+
+ def metric_calls(self):
+ """Consume info from the message bus to manage metrics."""
+ # Concumer check for metric messages
+ for message in self._consumer:
+
+ if message.topic == "metrics":
+ log.info("Metric action required on this topic: %s",
+ (message.topic))
+
+ if message.key == "configure_metric":
+ # Configure/Update a resource and its metric
+ values = json.loads(message.value)
+ schema = values['configure_metrics']
+ metric_details = schema['metrics_configuration']
+
+ # Generate authentication credentials via keystone:
+ # auth_token, endpoint
+ auth_token = self._common._authenticate(
+ schema['tenant_uuid'])
+ endpoint = self._common.get_endpoint("metric")
+
+ metric_id = self.configure_metric(
+ endpoint, auth_token, metric_details)
+ log.info("New metric created with metricID: %s", metric_id)
+
+ # TODO(mcgoughh): will send an acknowledge message back on
+ # the bus via the producer
+
+ # TODO(mcoughh): Key alternatives are "metric_data_request" and
+ # "metric_data_response" will be accomodated later
+ # Will also need a producer for this functionality
+ elif message.key == "metric_data_request":
+ log.debug("Key used to request a metrics data")
+
+ elif message.key == "metric_data_response":
+ log.debug("Key used for a metrics data response")
+
+ else:
+ log.debug("Unknown key, no action will be performed")
+
+ else:
+ log.info("Message topic not relevant to this plugin: %s",
+ message.topic)
+
+ return
+
+ def configure_metric(self, endpoint, auth_token, values):
+ """Create the new SO desired metric in Gnocchi."""
+ metric_id = None
+
+ # TODO(mcgoughh): error check the values sent in the message
+ # will query the database for the request resource and then
+ # check that resource for the desired metric
+ metric_name = values['metric_name']
+
+ if metric_id is None:
+
+ # Need to create a new version of the resource for gnocchi to
+ # the new metric
+ resource_url = "{}/v1/resource/generic".format(endpoint)
+
+ metric = {'name': metric_name,
+ 'unit': values['metric_unit'], }
+
+ resource_payload = json.dumps({'id': values['resource_uuid'],
+ 'metrics': {metric_name: metric}})
+
+ new_resource = self._common._perform_request(
+ resource_url, auth_token,
+ req_type="post", payload=resource_payload)
+ new_metric = json.loads(new_resource.text)['metrics']
+
+ return new_metric[metric_name]
+ else:
+ return metric_id
+
+ def delete_metric(self, endpoint, auth_token, metric_id):
+ """Delete metric."""
+ url = "{}/v1/metric/%s".format(endpoint) % (metric_id)
+
+ self._common._perform_request(url, auth_token, req_type="delete")
+ return None
+
+ def list_metrics(self, endpoint, auth_token):
+ """List all metrics."""
+ url = "{}/v1/metric/".format(endpoint)
+
+ metric_list = self._common._perform_request(
+ url, auth_token, req_type="get")
+ return json.loads(metric_list.text)
--- /dev/null
+"""Gnocchi plugin for the OSM monitoring module."""
+
+import logging as log
+
+from plugins.OpenStack.Gnocchi.metrics import Metrics
+from plugins.OpenStack.settings import Config
+
+
+def register_plugin():
+ """Register the plugin."""
+ config = Config.instance()
+ instance = Plugin(config=config)
+ instance.config()
+ instance.metrics()
+
+
+class Plugin(object):
+ """Gnocchi plugin for OSM MON."""
+
+ def __init__(self, config):
+ """Plugin instance."""
+ log.info("Initialze the plugin instance.")
+ self._config = config
+ self._metrics = Metrics()
+
+ def config(self):
+ """Configure plugin."""
+ log.info("Configure the plugin instance.")
+ self._config.read_environ("gnocchi")
+
+ def metrics(self):
+ """Initialize metric functionality."""
+ log.info("Initialize metric functionality.")
+ self._metrics.metric_calls()
+
+register_plugin()
--- /dev/null
+"""Common methods for the Aodh Sender/Receiver."""
+
+import logging as log
+
+from keystoneclient.v3 import client
+
+from plugins.OpenStack.settings import Config
+
+import requests
+
+# from keystoneauth1.identity.v3 import AuthMethod
+# from keystoneclient.service_catalog import ServiceCatalog
+
+
+class Common(object):
+ """Common calls for Gnocchi/Aodh plugins."""
+
+ def __init__(self):
+ """Create the common instance."""
+ self._auth_token = None
+ self._endpoint = None
+ self._ks = None
+
+ def _authenticate(self, tenant_id):
+ """Authenticate and/or renew the authentication token."""
+ if self._auth_token is not None:
+ return self._auth_token
+
+ try:
+ cfg = Config.instance()
+ self._ks = client.Client(auth_url=cfg.OS_AUTH_URL,
+ username=cfg.OS_USERNAME,
+ password=cfg.OS_PASSWORD,
+ tenant_name=cfg.OS_TENANT_NAME)
+ self._auth_token = self._ks.auth_token
+ except Exception as exc:
+
+ log.warn("Authentication failed with the following exception: %s",
+ exc)
+ self._auth_token = None
+
+ return self._auth_token
+
+ def get_endpoint(self, service_type):
+ """Get the endpoint for Gnocchi/Aodh."""
+ try:
+ return self._ks.service_catalog.url_for(
+ service_type=service_type,
+ endpoint_type='internalURL',
+ region_name='RegionOne')
+ except Exception as exc:
+ log.warning("Failed to retreive endpoint for Aodh due to: %s",
+ exc)
+ return None
+
+ @classmethod
+ def _perform_request(cls, url, auth_token,
+ req_type=None, payload=None, params=None):
+ """Perform the POST/PUT/GET/DELETE request."""
+ # request headers
+ headers = {'X-Auth-Token': auth_token,
+ 'Content-type': 'application/json'}
+ # perform request and return its result
+ response = None
+ try:
+ if req_type == "put":
+ response = requests.put(
+ url, data=payload, headers=headers,
+ timeout=1)
+ elif req_type == "post":
+ response = requests.post(
+ url, data=payload, headers=headers,
+ timeout=1)
+ elif req_type == "get":
+ response = requests.get(
+ url, params=params, headers=headers, timeout=1)
+ elif req_type == "delete":
+ response = requests.delete(
+ url, headers=headers, timeout=1)
+ else:
+ log.warn("Invalid request type")
+
+ except Exception as e:
+ log.warn("Exception thrown on request", e)
+
+ return response