From: Helena McGough Date: Tue, 12 Sep 2017 07:30:02 +0000 (+0100) Subject: Updated the OpenStack plugins X-Git-Tag: v4.0.0~88 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F73%2F2173%2F1;p=osm%2FMON.git Updated the OpenStack plugins - Included all of the licenses - Included read_metrics/list_metrics and notify alarm - Updated create metrics - Included response messages which are published via the producer Signed-off-by: Helena McGough --- diff --git a/plugins/OpenStack/Aodh/alarming.py b/plugins/OpenStack/Aodh/alarming.py index 0f4a2da..f530a35 100644 --- a/plugins/OpenStack/Aodh/alarming.py +++ b/plugins/OpenStack/Aodh/alarming.py @@ -1,14 +1,50 @@ -"""Send alarm info from Aodh to SO via MON.""" +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Carry out alarming requests via Aodh API.""" import json import logging as log -from collections import OrderedDict +from core.message_bus.producer import KafkaProducer from kafka import KafkaConsumer from plugins.OpenStack.common import Common - +from plugins.OpenStack.response import OpenStack_Response + +__author__ = "Helena McGough" + +ALARM_NAMES = [ + "Average_Memory_Usage_Above_Threshold", + "Read_Latency_Above_Threshold", + "Write_Latency_Above_Threshold", + "DISK_READ_OPS", + "DISK_WRITE_OPS", + "DISK_READ_BYTES", + "DISK_WRITE_BYTES", + "Net_Packets_Dropped", + "Packets_in_Above_Threshold", + "Packets_out_Above_Threshold", + "CPU_Utilization_Above_Threshold"] SEVERITIES = { "WARNING": "low", @@ -19,14 +55,11 @@ SEVERITIES = { class Alarming(object): - """Receives alarm info from Aodh.""" + """Carries out alarming requests and responses via Aodh API.""" def __init__(self): - """Create the aodh_receiver instance.""" + """Create the OpenStack alarming instance.""" self._common = Common() - self.auth_token = None - self.endpoint = None - self.resp_status = None # TODO(mcgoughh): Remove hardcoded kafkaconsumer # Initialize a generic consumer object to consume message from the SO @@ -35,7 +68,11 @@ class Alarming(object): group_id='osm_mon', bootstrap_servers=server['server']) - # TODO(mcgoughh): Initialize a producer to send messages bask to the SO + # Use the Response class to generate valid json response messages + self._response = OpenStack_Response() + + # Initializer a producer to send responses back to SO + self._producer = KafkaProducer("alarm_response") def alarming(self): """Consume info from the message bus to manage alarms.""" @@ -48,94 +85,116 @@ class Alarming(object): if vim_type == "openstack": log.info("Alarm action required: %s" % (message.topic)) + # Generate and auth_token and endpoint for request + auth_token, endpoint = self.authenticate(values) + if message.key == "create_alarm_request": # Configure/Update an alarm alarm_details = values['alarm_create_request'] - # Generate an auth_token and endpoint - auth_token = self._common._authenticate( - tenant_id=alarm_details['tenant_uuid']) - endpoint = self._common.get_endpoint("alarming") - - alarm_id = self.configure_alarm( + alarm_id, alarm_status = self.configure_alarm( endpoint, auth_token, alarm_details) - # TODO(mcgoughh): will send an acknowledge message back on - # the bus via the producer - if alarm_id is not None: - self.resp_status = True - log.debug("A valid alarm was found/created: %s", - self.resp_status) - else: - self.resp_status = False - log.debug("Failed to create desired alarm: %s", - self.resp_status) + # Generate a valid response message, send via producer + try: + resp_message = self._response.generate_response( + 'create_alarm_response', status=alarm_status, + alarm_id=alarm_id, + cor_id=alarm_details['correlation_id']) + self._producer.create_alarm_response( + 'create_alarm_resonse', resp_message, + 'alarm_response') + except Exception as exc: + log.warn("Response creation failed: %s", exc) elif message.key == "list_alarm_request": - auth_token = self._common._authenticate() - endpoint = self._common.get_endpoint("alarming") - - # List all of the alarms - alarm_list = self.list_alarms(endpoint, auth_token) - - # TODO(mcgoughh): send a repsonse back to SO - if alarm_list is not None: - self.resp_status = True - log.info("A list of alarms was generated: %s", - alarm_list) - else: - self.resp_status = False - log.warn("Failed to generae an alarm list") + # Check for a specifed: alarm_name, resource_uuid, severity + # and generate the appropriate list + list_details = values['alarm_list_request'] + try: + name = list_details['alarm_name'] + alarm_list = self.list_alarms( + endpoint, auth_token, alarm_name=name) + except Exception as a_name: + log.debug("No name specified for list:%s", a_name) + try: + resource = list_details['resource_uuid'] + alarm_list = self.list_alarms( + endpoint, auth_token, resource_id=resource) + except Exception as r_id: + log.debug("No resource id specified for this list:\ + %s", r_id) + try: + severe = list_details['severity'] + alarm_list = self.list_alarms( + endpoint, auth_token, severity=severe) + except Exception as exc: + log.warn("No severity specified for list: %s.\ + will return full list.", exc) + alarm_list = self.list_alarms( + endpoint, auth_token) + + try: + # Generate and send a list response back + resp_message = self._response.generate_response( + 'list_alarm_response', alarm_list=alarm_list, + cor_id=list_details['correlation_id']) + self._producer.list_alarm_response( + 'list_alarm_response', resp_message, + 'alarm_response') + except Exception as exc: + log.warn("Failed to send a valid response back.") elif message.key == "delete_alarm_request": - # Delete the specified alarm - auth_token = self._common._authenticate() - endpoint = self._common.get_endpoint("alarming") - - alarm_id = values['alarm_delete_request']['alarm_uuid'] + request_details = values['alarm_delete_request'] + alarm_id = request_details['alarm_uuid'] - response = self.delete_alarm( + resp_status = self.delete_alarm( endpoint, auth_token, alarm_id) - # TODO(mcgoughh): send a response back on the bus - if response is True: - log.info("Requested alarm has been deleted: %s", - alarm_id) - else: - log.warn("Failed to delete requested alarm.") + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'delete_alarm_response', alarm_id=alarm_id, + status=resp_status, + cor_id=request_details['correlation_id']) + self._producer.delete_alarm_response( + 'delete_alarm_response', resp_message, + 'alarm_response') + except Exception as exc: + log.warn("Failed to create delete reponse:%s", exc) elif message.key == "acknowledge_alarm": # Acknowledge that an alarm has been dealt with by the SO - # Set its state to ok - auth_token = self._common._authenticate() - endpoint = self._common.get_endpoint("alarming") - alarm_id = values['ack_details']['alarm_uuid'] response = self.update_alarm_state( endpoint, auth_token, alarm_id) + # Log if an alarm was reset if response is True: - log.info("Status has been updated for alarm, %s.", - alarm_id) + log.info("Acknowledged the alarm and cleared it.") else: - log.warn("Failed update the state of requested alarm.") + log.warn("Failed to acknowledge/clear the alarm.") elif message.key == "update_alarm_request": # Update alarm configurations - auth_token = self._common._authenticate() - endpoint = self._common.get_endpoint("alarming") - alarm_details = values['alarm_update_request'] - alarm_id = self.update_alarm( + alarm_id, status = self.update_alarm( endpoint, auth_token, alarm_details) - # TODO(mcgoughh): send a response message to the SO - if alarm_id is not None: - log.info("Alarm configuration was update correctly.") - else: - log.warn("Unable to update the specified alarm") + # Generate a response for an update request + try: + resp_message = self._response.generate_response( + 'update_alarm_response', alarm_id=alarm_id, + cor_id=alarm_details['correlation_id'], + status=status) + self._producer.update_alarm_response( + 'update_alarm_response', resp_message, + 'alarm_response') + except Exception as exc: + log.warn("Failed to send an update response:%s", exc) else: log.debug("Unknown key, no action will be performed") @@ -145,108 +204,95 @@ class Alarming(object): return - def get_alarm_id(self, endpoint, auth_token, alarm_name): - """Get a list of alarms that exist in Aodh.""" - alarm_id = None - url = "{}/v2/alarms/".format(endpoint) - - # TODO(mcgoughh): will query on resource_id once it has been - # implemented need to create the query field when creating - # the alarm - query = OrderedDict([("q.field", 'name'), ("q.op", "eq"), - ("q.value", alarm_name)]) - - result = self._common._perform_request( - url, auth_token, req_type="get", params=query) - - try: - alarm_id = json.loads(result.text)[0]['alarm_id'] - log.info("An existing alarm was found: %s", alarm_id) - return alarm_id - except Exception: - log.debug("Alarm doesn't exist, needs to be created.") - return alarm_id - def configure_alarm(self, endpoint, auth_token, values): """Create requested alarm in Aodh.""" url = "{}/v2/alarms/".format(endpoint) + # Check if the desired alarm is supported alarm_name = values['alarm_name'] + if alarm_name not in ALARM_NAMES: + log.warn("This alarm is not supported, by a valid metric.") + return None, False - # Confirm alarm doesn't exist - alarm_id = self.get_alarm_id(endpoint, auth_token, alarm_name) - if alarm_id is None: - # Try to create the alarm - try: - metric_name = values['metric_name'] - resource_id = values['resource_uuid'] - payload = self.check_payload(values, metric_name, resource_id, - alarm_name) - new_alarm = self._common._perform_request( - url, auth_token, req_type="post", payload=payload) - - return json.loads(new_alarm.text)['alarm_id'] - except Exception as exc: - log.warn("Alarm creation could not be performed: %s", exc) - return alarm_id - else: - log.warn("This alarm already exists. Try an update instead.") - return None + try: + metric_name = values['metric_name'] + resource_id = values['resource_uuid'] + # Check the payload for the desired alarm + payload = self.check_payload(values, metric_name, resource_id, + alarm_name) + new_alarm = self._common._perform_request( + url, auth_token, req_type="post", payload=payload) + + return json.loads(new_alarm.text)['alarm_id'], True + except Exception as exc: + log.warn("Alarm creation could not be performed: %s", exc) + return None, False def delete_alarm(self, endpoint, auth_token, alarm_id): """Delete alarm function.""" url = "{}/v2/alarms/%s".format(endpoint) % (alarm_id) - result = False try: - self._common._perform_request(url, auth_token, req_type="delete") - return True + result = self._common._perform_request( + url, auth_token, req_type="delete") + if str(result.status_code) == "404": + # If status code is 404 alarm did not exist + return False + else: + return True + except Exception as exc: log.warn("Failed to delete alarm: %s because %s.", alarm_id, exc) - return result + return False def list_alarms(self, endpoint, auth_token, alarm_name=None, resource_id=None, severity=None): """Generate the requested list of alarms.""" - result = None - if (alarm_name and resource_id and severity) is None: - # List all alarms - url = "{}/v2/alarms/".format(endpoint) - - try: - result = self._common._perform_request( - url, auth_token, req_type="get") - return json.loads(result.text) - except Exception as exc: - log.warn("Unable to generate alarm list: %s", exc) + url = "{}/v2/alarms/".format(endpoint) + alarm_list = [] - return result + result = self._common._perform_request( + url, auth_token, req_type="get") + if result is not None: + # Check for a specified list based on: + # alarm_name, severity, resource_id + if alarm_name is not None: + for alarm in json.loads(result.text): + if alarm_name in str(alarm): + alarm_list.append(str(alarm)) + elif resource_id is not None: + for alarm in json.loads(result.text): + if resource_id in str(alarm): + alarm_list.append(str(alarm)) + elif severity is not None: + for alarm in json.loads(result.text): + if severity in str(alarm): + alarm_list.append(str(alarm)) + else: + alarm_list = result.text else: - # TODO(mcgoughh): support more specific lists - log.debug("Requested list is unavailable") - - return result + return None + return alarm_list def update_alarm_state(self, endpoint, auth_token, alarm_id): """Set the state of an alarm to ok when ack message is received.""" - result = False - url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id payload = json.dumps("ok") try: - result = self._common._perform_request( + self._common._perform_request( url, auth_token, req_type="put", payload=payload) return True except Exception as exc: log.warn("Unable to update alarm state: %s", exc) - return result + return False def update_alarm(self, endpoint, auth_token, values): """Get alarm name for an alarm configuration update.""" # Get already existing alarm details url = "{}/v2/alarms/%s".format(endpoint) % values['alarm_uuid'] + # Gets current configurations about the alarm try: result = self._common._perform_request( url, auth_token, req_type="get") @@ -258,22 +304,23 @@ class Alarming(object): except Exception as exc: log.warn("Failed to retreive existing alarm info: %s.\ Can only update OSM created alarms.", exc) - return None + return None, False - # Genate and check payload configuration for alarm update + # Generates and check payload configuration for alarm update payload = self.check_payload(values, metric_name, resource_id, alarm_name, alarm_state=alarm_state) + # Updates the alarm configurations with the valid payload if payload is not None: try: update_alarm = self._common._perform_request( url, auth_token, req_type="put", payload=payload) - return json.loads(update_alarm.text)['alarm_id'] + return json.loads(update_alarm.text)['alarm_id'], True except Exception as exc: log.warn("Alarm update could not be performed: %s", exc) - return None - return None + return None, False + return None, False def check_payload(self, values, metric_name, resource_id, alarm_name, alarm_state=None): @@ -283,11 +330,12 @@ class Alarming(object): severity = values['severity'] if severity == "INDETERMINATE": alarm_state = "insufficient data" - if alarm_state is None: alarm_state = "ok" # Try to configure the payload for the update/create request + # Can only update: threshold, operation, statistic and + # the severity of the alarm rule = {'threshold': values['threshold_value'], 'comparison_operator': values['operation'].lower(), 'metric': metric_name, @@ -303,3 +351,30 @@ class Alarming(object): except KeyError as exc: log.warn("Alarm is not configured correctly: %s", exc) return None + + def authenticate(self, values): + """Generate an authentication token and endpoint for alarm request.""" + try: + # Check for a tenant_id + auth_token = self._common._authenticate( + tenant_id=values['tenant_uuid']) + endpoint = self._common.get_endpoint("alarming") + except Exception as exc: + log.warn("Tenant ID is not specified. Will use a generic\ + authentication: %s", exc) + auth_token = self._common._authenticate() + endpoint = self._common.get_endpoint("alarming") + + return auth_token, endpoint + + def get_alarm_state(self, endpoint, auth_token, alarm_id): + """Get the state of the alarm.""" + url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id + + try: + alarm_state = self._common._perform_request( + url, auth_token, req_type="get") + return json.loads(alarm_state.text) + except Exception as exc: + log.warn("Failed to get the state of the alarm:%s", exc) + return None diff --git a/plugins/OpenStack/Aodh/notifier.py b/plugins/OpenStack/Aodh/notifier.py new file mode 100644 index 0000000..bd36f18 --- /dev/null +++ b/plugins/OpenStack/Aodh/notifier.py @@ -0,0 +1,84 @@ +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Notifier class for alarm notification response.""" + +import json +import logging as log + +from core.message_bus.producer import KafkaProducer + +from plugins.OpenStack.response import OpenStack_Response +from plugins.OpenStack.singleton import Singleton + +__author__ = "Helena McGough" + +ALARM_NAMES = [ + "Average_Memory_Usage_Above_Threshold", + "Read_Latency_Above_Threshold", + "Write_Latency_Above_Threshold", + "DISK_READ_OPS", + "DISK_WRITE_OPS", + "DISK_READ_BYTES", + "DISK_WRITE_BYTES", + "Net_Packets_Dropped", + "Packets_in_Above_Threshold", + "Packets_out_Above_Threshold", + "CPU_Utilization_Above_Threshold"] + + +@Singleton +class Notifier(object): + """Alarm Notification class.""" + + def __init__(self): + """Initialize alarm notifier.""" + self._response = OpenStack_Response() + + self._producer = KafkaProducer("alarm_response", None) + + def notify(self, alarming): + """Send alarm notifications responses to the SO.""" + auth_token, endpoint = alarming.authenticate(None) + + while(1): + alarm_list = json.loads(alarming.list_alarms(endpoint, auth_token)) + for alarm in alarm_list: + alarm_id = alarm['alarm_id'] + alarm_name = alarm['name'] + # Send a notification response to the SO on alarm trigger + if alarm_name in ALARM_NAMES: + alarm_state = alarming.get_alarm_state( + endpoint, auth_token, alarm_id) + if alarm_state == "alarm": + # Generate and send an alarm notification response + try: + a_date = alarm['state_timestamp'].replace("T", " ") + rule = alarm['gnocchi_resources_threshold_rule'] + resp_message = self._response.generate_response( + 'notify_alarm', a_id=alarm_id, + r_id=rule['resource_id'], + sev=alarm['severity'], date=a_date, + state=alarm_state, vim_type="OpenStack") + self._producer.notify_alarm( + 'notify_alarm', resp_message, 'alarm_response') + except Exception as exc: + log.warn("Failed to send notify response:%s", exc) diff --git a/plugins/OpenStack/Aodh/plugin_instance.py b/plugins/OpenStack/Aodh/plugin_instance.py index 364a12e..5b8bbd0 100644 --- a/plugins/OpenStack/Aodh/plugin_instance.py +++ b/plugins/OpenStack/Aodh/plugin_instance.py @@ -1,32 +1,62 @@ +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## """Aodh plugin for the OSM monitoring module.""" import logging as log -#import sys +# import sys -#path = "/home/stack/MON" -#if path not in sys.path: +# path = "/opt/stack/MON" +# if path not in sys.path: # sys.path.append(path) from plugins.OpenStack.Aodh.alarming import Alarming +from plugins.OpenStack.Aodh.notifier import Notifier from plugins.OpenStack.settings import Config +__author__ = "Helena McGough" + def register_plugin(): """Register the plugin.""" + # Initialize configuration and notifications config = Config.instance() - instance = Plugin(config=config) + notifier = Notifier.instance() + + # Intialize plugin + instance = Plugin(config=config, notifier=notifier) instance.config() instance.alarm() + instance.notify() class Plugin(object): """Aodh plugin for OSM MON.""" - def __init__(self, config): + def __init__(self, config, notifier): """Plugin instance.""" log.info("Initialze the plugin instance.") self._config = config - self._alarm = Alarming() + self._alarming = Alarming() + self._notifier = notifier def config(self): """Configure plugin.""" @@ -36,6 +66,13 @@ class Plugin(object): def alarm(self): """Allow alarm info to be received from Aodh.""" log.info("Begin alarm functionality.") - self._alarm.alarming() + self._alarming.alarming() + + def notify(self): + """Send notifications to the SO.""" + # TODO(mcgoughh): Run simultaneously so that notifications + # can be sent while messages are being consumed + log.info("Sending Openstack notifications to the SO.") + self._notifier.notify(self._alarming) register_plugin() diff --git a/plugins/OpenStack/Gnocchi/metrics.py b/plugins/OpenStack/Gnocchi/metrics.py index bc1a729..ca6f47a 100644 --- a/plugins/OpenStack/Gnocchi/metrics.py +++ b/plugins/OpenStack/Gnocchi/metrics.py @@ -1,15 +1,63 @@ -"""Gnocchi acts on a metric message received from the SO via MON.""" +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Carry out OpenStack metric requests via Gnocchi API.""" + +import datetime import json import logging as log +import time + +from core.message_bus.producer import KafkaProducer from kafka import KafkaConsumer from plugins.OpenStack.common import Common +from plugins.OpenStack.response import OpenStack_Response + +__author__ = "Helena McGough" + +METRIC_MAPPINGS = { + "AVERAGE_MEMORY_UTILIZATION": "memory.percent", + "DISK_READ_OPS": "disk.disk_ops", + "DISK_WRITE_OPS": "disk.disk_ops", + "DISK_READ_BYTES": "disk.disk_octets", + "DISK_WRITE_BYTES": "disk.disk_octets", + "PACKETS_DROPPED": "interface.if_dropped", + "PACKETS_RECEIVED": "interface.if_packets", + "PACKETS_SENT": "interface.if_packets", + "CPU_UTILIZATION": "cpu.percent", +} + +PERIOD_MS = { + "HR": 3600000, + "DAY": 86400000, + "WEEK": 604800000, + "MONTH": 2629746000, + "YEAR": 31556952000 +} class Metrics(object): - """Gnocchi based metric actions performed on info from MON.""" + """OpenStack metric requests performed via the Gnocchi API.""" def __init__(self): """Initialize the metric actions.""" @@ -17,100 +65,349 @@ class Metrics(object): # TODO(mcgoughh): Initialize a generic consumer object to consume # message from the SO. This is hardcoded for now - server = {'server': 'localhost:9092', 'topic': 'metrics'} + server = {'server': 'localhost:9092', 'topic': 'metric_request'} self._consumer = KafkaConsumer(server['topic'], - group_id='my-group', + group_id='osm_mon', bootstrap_servers=server['server']) - # TODO(mcgoughh): Initialize a producer to send messages bask to the SO + # Use the Response class to generate valid json response messages + self._response = OpenStack_Response() + + # Initializer a producer to send responses back to SO + self._producer = KafkaProducer("metric_response") def metric_calls(self): - """Consume info from the message bus to manage metrics.""" - # Concumer check for metric messages + """Consume info from the message bus to manage metric requests.""" + # Consumer check for metric messages for message in self._consumer: + # Check if this plugin should carry out this request + values = json.loads(message.value) + vim_type = values['vim_type'].lower() + + if vim_type == "openstack": + # Generate auth_token and endpoint + auth_token, endpoint = self.authenticate(values) - if message.topic == "metrics": - log.info("Metric action required on this topic: %s", - (message.topic)) + if message.key == "create_metric_request": + # Configure metric + metric_details = values['metric_create'] + metric_id, resource_id, status = self.configure_metric( + endpoint, auth_token, metric_details) - if message.key == "configure_metric": - # Configure/Update a resource and its metric - values = json.loads(message.value) - schema = values['configure_metrics'] - metric_details = schema['metrics_configuration'] + # Generate and send a create metric response + try: + resp_message = self._response.generate_response( + 'create_metric_response', status=status, + cor_id=values['correlation_id'], + metric_id=metric_id, r_id=resource_id) + self._producer.create_metrics_resp( + 'create_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to create response: %s", exc) - # Generate authentication credentials via keystone: - # auth_token, endpoint - auth_token = self._common._authenticate( - schema['tenant_uuid']) - endpoint = self._common.get_endpoint("metric") + elif message.key == "read_metric_data_request": + # Read all metric data related to a specified metric + timestamps, metric_data = self.read_metric_data( + endpoint, auth_token, values) - metric_id = self.configure_metric( - endpoint, auth_token, metric_details) - log.info("New metric created with metricID: %s", metric_id) + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'read_metric_data_response', + m_id=values['metric_uuid'], + m_name=values['metric_name'], + r_id=values['resource_uuid'], + cor_id=values['correlation_id'], + times=timestamps, metrics=metric_data) + self._producer.read_metric_data_response( + 'read_metric_data_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send read metric response:%s", exc) - # TODO(mcgoughh): will send an acknowledge message back on - # the bus via the producer + elif message.key == "delete_metric_request": + # delete the specified metric in the request + metric_id = values['metric_uuid'] + status = self.delete_metric( + endpoint, auth_token, metric_id) - # TODO(mcoughh): Key alternatives are "metric_data_request" and - # "metric_data_response" will be accomodated later - # Will also need a producer for this functionality - elif message.key == "metric_data_request": - log.debug("Key used to request a metrics data") + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'delete_metric_response', m_id=metric_id, + m_name=values['metric_name'], + status=status, r_id=values['resource_uuid'], + cor_id=values['correlation_id']) + self._producer.delete_metric_response( + 'delete_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send delete response:%s", exc) - elif message.key == "metric_data_response": - log.debug("Key used for a metrics data response") + elif message.key == "update_metric_request": + # Gnocchi doesn't support configuration updates + # Log and send a response back to this effect + log.warn("Gnocchi doesn't support metric configuration\ + updates.") + req_details = values['metric_create'] + metric_name = req_details['metric_name'] + resource_id = req_details['resource_uuid'] + metric_id = self.get_metric_id( + endpoint, auth_token, metric_name, resource_id) - else: - log.debug("Unknown key, no action will be performed") + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'update_metric_response', status=False, + cor_id=values['correlation_id'], + r_id=resource_id, m_id=metric_id) + self._producer.update_metric_response( + 'update_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send an update response:%s", exc) + + elif message.key == "list_metric_request": + list_details = values['metrics_list_request'] + + metric_list = self.list_metrics( + endpoint, auth_token, list_details) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'list_metric_response', m_list=metric_list, + cor_id=list_details['correlation_id']) + self._producer.list_metric_response( + 'list_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send a list response:%s", exc) + else: + log.warn("Unknown key, no action will be performed.") else: - log.info("Message topic not relevant to this plugin: %s", - message.topic) + log.debug("Message is not for this OpenStack.") return def configure_metric(self, endpoint, auth_token, values): - """Create the new SO desired metric in Gnocchi.""" - metric_id = None + """Create the new metric in Gnocchi.""" + try: + resource_id = values['resource_uuid'] + except KeyError: + log.warn("Resource is not defined correctly.") + return None, None, False - # TODO(mcgoughh): error check the values sent in the message - # will query the database for the request resource and then - # check that resource for the desired metric - metric_name = values['metric_name'] + # Check/Normalize metric name + metric_name, norm_name = self.get_metric_name(values) + if norm_name is None: + log.warn("This metric is not supported by this plugin.") + return None, resource_id, False - if metric_id is None: + # Check for an existing metric for this resource + metric_id = self.get_metric_id( + endpoint, auth_token, metric_name, resource_id) + if metric_id is None: # Need to create a new version of the resource for gnocchi to - # the new metric - resource_url = "{}/v1/resource/generic".format(endpoint) + # create the new metric based on that resource + url = "{}/v1/resource/generic".format(endpoint) + try: + # Try to create a new resource for the new metric + metric = {'name': metric_name, + 'archive_policy_name': 'high', + 'unit': values['metric_unit'], } - metric = {'name': metric_name, - 'unit': values['metric_unit'], } + resource_payload = json.dumps({'id': resource_id, + 'metrics': { + metric_name: metric}}) - resource_payload = json.dumps({'id': values['resource_uuid'], - 'metrics': {metric_name: metric}}) + new_resource = self._common._perform_request( + url, auth_token, req_type="post", payload=resource_payload) - new_resource = self._common._perform_request( - resource_url, auth_token, - req_type="post", payload=resource_payload) - new_metric = json.loads(new_resource.text)['metrics'] + resource_id = json.loads(new_resource.text)['id'] + except Exception as exc: + # Append new metric to existing resource + log.debug("This resource already exists:%s, appending metric.", + exc) + base_url = "{}/v1/resource/generic/%s/metric" + res_url = base_url.format(endpoint) % resource_id + payload = {metric_name: {'archive_policy_name': 'high', + 'unit': values['metric_unit']}} + self._common._perform_request( + res_url, auth_token, req_type="post", + payload=json.dumps(payload)) + + metric_id = self.get_metric_id( + endpoint, auth_token, metric_name, resource_id) + return metric_id, resource_id, True - return new_metric[metric_name] else: - return metric_id + log.debug("This metric already exists for this resource.") + + return metric_id, resource_id, False def delete_metric(self, endpoint, auth_token, metric_id): """Delete metric.""" url = "{}/v1/metric/%s".format(endpoint) % (metric_id) - self._common._perform_request(url, auth_token, req_type="delete") - return None + try: + result = self._common._perform_request( + url, auth_token, req_type="delete") + if str(result.status_code) == "404": + log.warn("Failed to delete the metric.") + return False + else: + return True + except Exception as exc: + log.warn("Failed to carry out delete metric request:%s", exc) + return False - def list_metrics(self, endpoint, auth_token): + def list_metrics(self, endpoint, auth_token, values): """List all metrics.""" url = "{}/v1/metric/".format(endpoint) - metric_list = self._common._perform_request( - url, auth_token, req_type="get") - return json.loads(metric_list.text) + try: + # Check if the metric_name was specified for the list + metric_name = values['metric_name'] + result = self._common._perform_request( + url, auth_token, req_type="get") + metric_list = json.loads(result.text) + + # Format the list response + metrics = self.response_list( + metric_list, metric_name=metric_name) + return metrics + except KeyError: + log.debug("Metric name is not specified for this list.") + + try: + # Check if a resource_id was specified + resource_id = values['resource_uuid'] + result = self._common._perform_request( + url, auth_token, req_type="get") + metric_list = json.loads(result.text) + # Format the list response + metrics = self.response_list( + metric_list, resource=resource_id) + return metrics + except KeyError: + log.debug("Resource id not specificed either, will return a\ + complete list.") + try: + result = self._common._perform_request( + url, auth_token, req_type="get") + metric_list = json.loads(result.text) + # Format the list response + metrics = self.response_list(metric_list) + return metrics + + except Exception as exc: + log.warn("Failed to generate any metric list. %s", exc) + return None + + def get_metric_id(self, endpoint, auth_token, metric_name, resource_id): + """Check if the desired metric already exists for the resource.""" + url = "{}/v1/resource/generic/%s".format(endpoint) % resource_id + + try: + # Try return the metric id if it exists + result = self._common._perform_request( + url, auth_token, req_type="get") + return json.loads(result.text)['metrics'][metric_name] + except Exception: + log.debug("Metric doesn't exist. No metric_id available") + return None + + def get_metric_name(self, values): + """Check metric name configuration and normalize.""" + try: + # Normalize metric name + metric_name = values['metric_name'] + return metric_name, METRIC_MAPPINGS[metric_name] + except KeyError: + log.warn("Metric name %s is invalid.", metric_name) + return metric_name, None + + def read_metric_data(self, endpoint, auth_token, values): + """Collectd metric measures over a specified time period.""" + timestamps = [] + data = [] + try: + # Try and collect measures + metric_id = values['metric_uuid'] + collection_unit = values['collection_unit'].upper() + collection_period = values['collection_period'] + + # Define the start and end time based on configurations + stop_time = time.strftime("%Y-%m-%d") + "T" + time.strftime("%X") + end_time = int(round(time.time() * 1000)) + if collection_unit == 'YEAR': + diff = PERIOD_MS[collection_unit] + else: + diff = collection_period * PERIOD_MS[collection_unit] + s_time = (end_time - diff)/1000.0 + start_time = datetime.datetime.fromtimestamp(s_time).strftime( + '%Y-%m-%dT%H:%M:%S.%f') + base_url = "{}/v1/metric/%(0)s/measures?start=%(1)s&stop=%(2)s" + url = base_url.format(endpoint) % { + "0": metric_id, "1": start_time, "2": stop_time} + + # Perform metric data request + metric_data = self._common._perform_request( + url, auth_token, req_type="get") + + # Generate a list of the requested timestamps and data + for r in json.loads(metric_data.text): + timestamp = r[0].replace("T", " ") + timestamps.append(timestamp) + data.append(r[2]) + + return timestamps, data + except Exception as exc: + log.warn("Failed to gather specified measures: %s", exc) + return timestamps, data + + def authenticate(self, values): + """Generate an authentication token and endpoint for metric request.""" + try: + # Check for a tenant_id + auth_token = self._common._authenticate( + tenant_id=values['tenant_uuid']) + endpoint = self._common.get_endpoint("metric") + except KeyError: + log.warn("Tenant ID is not specified. Will use a generic\ + authentication.") + auth_token = self._common._authenticate() + endpoint = self._common.get_endpoint("metric") + + return auth_token, endpoint + + def response_list(self, metric_list, metric_name=None, resource=None): + """Create the appropriate lists for a list response.""" + resp_list = [] + + for row in metric_list: + if metric_name is not None: + if row['name'] == metric_name: + metric = {"metric_name": row['name'], + "metric_uuid": row['id'], + "metric_unit": row['unit'], + "resource_uuid": row['resource_id']} + resp_list.append(metric) + elif resource is not None: + if row['resource_id'] == resource: + metric = {"metric_name": row['name'], + "metric_uuid": row['id'], + "metric_unit": row['unit'], + "resource_uuid": row['resource_id']} + resp_list.append(metric) + else: + metric = {"metric_name": row['name'], + "metric_uuid": row['id'], + "metric_unit": row['unit'], + "resource_uuid": row['resource_id']} + resp_list.append(metric) + return resp_list diff --git a/plugins/OpenStack/Gnocchi/plugin_instance.py b/plugins/OpenStack/Gnocchi/plugin_instance.py index 6f9e306..40dc251 100644 --- a/plugins/OpenStack/Gnocchi/plugin_instance.py +++ b/plugins/OpenStack/Gnocchi/plugin_instance.py @@ -1,10 +1,38 @@ +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## """Gnocchi plugin for the OSM monitoring module.""" import logging as log +import sys + +path = "/root/MON" +if path not in sys.path: + sys.path.append(path) from plugins.OpenStack.Gnocchi.metrics import Metrics from plugins.OpenStack.settings import Config +__author__ = "Helena McGough" + def register_plugin(): """Register the plugin.""" diff --git a/plugins/OpenStack/common.py b/plugins/OpenStack/common.py index 68ce4e6..25cca45 100644 --- a/plugins/OpenStack/common.py +++ b/plugins/OpenStack/common.py @@ -1,4 +1,25 @@ -"""Common methods for the Aodh Sender/Receiver.""" +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Common methods for the OpenStack plugins.""" import logging as log @@ -8,8 +29,7 @@ from plugins.OpenStack.settings import Config import requests -# from keystoneauth1.identity.v3 import AuthMethod -# from keystoneclient.service_catalog import ServiceCatalog +__author__ = "Helena McGough" class Common(object): @@ -35,8 +55,8 @@ class Common(object): self._auth_token = self._ks.auth_token except Exception as exc: - log.warn("Authentication failed with the following exception: %s", - exc) + log.warn("Authentication failed: %s", exc) + self._auth_token = None return self._auth_token @@ -49,7 +69,7 @@ class Common(object): endpoint_type='internalURL', region_name='RegionOne') except Exception as exc: - log.warning("Failed to retreive endpoint for Aodh due to: %s", + log.warning("Failed to retreive endpoint for service due to: %s", exc) return None diff --git a/plugins/OpenStack/response.py b/plugins/OpenStack/response.py new file mode 100644 index 0000000..e59c7ca --- /dev/null +++ b/plugins/OpenStack/response.py @@ -0,0 +1,168 @@ +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Generate valid responses to send back to the SO.""" + +import json +import logging as log + +__author__ = "Helena McGough" + +schema_version = "1.0" + + +class OpenStack_Response(object): + """Generates responses for SO from OpenStaack plugins.""" + + def __init__(self): + """Initialize OpenStack Response instance.""" + + def generate_response(self, key, **kwargs): + """Make call to appropriate response function.""" + if key == "list_alarm_response": + message = self.alarm_list_response(**kwargs) + elif key == "create_alarm_response": + message = self.create_alarm_response(**kwargs) + elif key == "delete_alarm_response": + message = self.delete_alarm_response(**kwargs) + elif key == "update_alarm_response": + message = self.update_alarm_response(**kwargs) + elif key == "create_metric_response": + message = self.metric_create_response(**kwargs) + elif key == "read_metric_data_response": + message = self.read_metric_data_response(**kwargs) + elif key == "delete_metric_response": + message = self.delete_metric_response(**kwargs) + elif key == "update_metric_response": + message = self.update_metric_response(**kwargs) + elif key == "list_metric_response": + message = self.list_metric_response(**kwargs) + elif key == "notify_alarm": + message = self.notify_alarm(**kwargs) + else: + log.warn("Failed to generate a valid response message.") + + return message + + def alarm_list_response(self, **kwargs): + """Generate the response for an alarm list request.""" + alarm_list_resp = {"schema_version": schema_version, + "schema_type": "list_alarm_response", + "correlation_id": kwargs['cor_id'], + "list_alarm_resp": kwargs['alarm_list']} + return json.dumps(alarm_list_resp) + + def create_alarm_response(self, **kwargs): + """Generate a response for a create alarm request.""" + create_alarm_resp = {"schema_version": schema_version, + "schema_type": "create_alarm_response", + "alarm_create_response": { + "correlation_id": kwargs['cor_id'], + "alarm_uuid": kwargs['alarm_id'], + "status": kwargs['status']}} + return json.dumps(create_alarm_resp) + + def delete_alarm_response(self, **kwargs): + """Generate a response for a delete alarm request.""" + delete_alarm_resp = {"schema_version": schema_version, + "schema_type": "alarm_deletion_response", + "alarm_deletion_response": { + "correlation_id": kwargs['cor_id'], + "alarm_uuid": kwargs['alarm_id'], + "status": kwargs['status']}} + return json.dumps(delete_alarm_resp) + + def update_alarm_response(self, **kwargs): + """Generate a response for an update alarm request.""" + update_alarm_resp = {"schema_version": schema_version, + "schema_type": "update_alarm_response", + "alarm_update_response": { + "correlation_id": kwargs['cor_id'], + "alarm_uuid": kwargs['alarm_id'], + "status": kwargs['status']}} + return json.dumps(update_alarm_resp) + + def metric_create_response(self, **kwargs): + """Generate a response for a create metric request.""" + create_metric_resp = {"schema_version": schema_version, + "schema_type": "create_metric_response", + "correlation_id": kwargs['cor_id'], + "metric_create_response": { + "metric_uuid": kwargs['metric_id'], + "resource_uuid": kwargs['r_id'], + "status": kwargs['status']}} + return json.dumps(create_metric_resp) + + def read_metric_data_response(self, **kwargs): + """Generate a response for a read metric data request.""" + read_metric_data_resp = {"schema_version": schema_version, + "schema_type": "read_metric_data_response", + "metric_name": kwargs['m_name'], + "metric_uuid": kwargs['m_id'], + "resource_uuid": kwargs['r_id'], + "correlation_id": kwargs['cor_id'], + "metrics_data": { + "time_series": kwargs['times'], + "metrics_series": kwargs['metrics']}} + return json.dumps(read_metric_data_resp) + + def delete_metric_response(self, **kwargs): + """Generate a response for a delete metric request.""" + delete_metric_resp = {"schema_version": schema_version, + "schema_type": "delete_metric_response", + "metric_name": kwargs['m_name'], + "metric_uuid": kwargs['m_id'], + "resource_uuid": kwargs['r_id'], + "correlation_id": kwargs['cor_id'], + "status": kwargs['status']} + return json.dumps(delete_metric_resp) + + def update_metric_response(self, **kwargs): + """Generate a repsonse for an update metric request.""" + update_metric_resp = {"schema_version": schema_version, + "schema_type": "update_metric_response", + "correlation_id": kwargs['cor_id'], + "metric_update_response": { + "metric_uuid": kwargs['m_id'], + "status": kwargs['status'], + "resource_uuid": kwargs['r_id']}} + return json.dumps(update_metric_resp) + + def list_metric_response(self, **kwargs): + """Generate a response for a list metric request.""" + list_metric_resp = {"schema_version": schema_version, + "schema_type": "list_metric_response", + "correlation_id": kwargs['cor_id'], + "metrics_list": kwargs['m_list']} + return json.dumps(list_metric_resp) + + def notify_alarm(self, **kwargs): + """Generate a response to send alarm notifications.""" + notify_alarm_resp = {"schema_version": schema_version, + "schema_type": "notify_alarm", + "notify_details": { + "alarm_uuid": kwargs['a_id'], + "resource_uuid": kwargs['r_id'], + "vim_type": kwargs['vim_type'], + "severity": kwargs['sev'], + "status": kwargs['state'], + "start_date": kwargs['date']}} + return json.dumps(notify_alarm_resp) diff --git a/plugins/OpenStack/settings.py b/plugins/OpenStack/settings.py index 45620d9..e7b06e2 100644 --- a/plugins/OpenStack/settings.py +++ b/plugins/OpenStack/settings.py @@ -1,4 +1,25 @@ -"""Configurations for the Aodh plugin.""" +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Configurations for the OpenStack plugins.""" from __future__ import unicode_literals @@ -11,6 +32,8 @@ from plugins.OpenStack.singleton import Singleton import six +__author__ = "Helena McGough" + class BadConfigError(Exception): """Configuration exception.""" @@ -54,8 +77,6 @@ class Config(object): def read_environ(self, service): """Check the appropriate environment variables and update defaults.""" for key in self._config_keys: - # Default username for a service is it's name - setattr(self, 'OS_USERNAME', service) if (key == "OS_IDENTITY_API_VERSION" or key == "OS_PASSWORD"): val = str(os.environ[key]) setattr(self, key, val) @@ -63,6 +84,7 @@ class Config(object): val = str(os.environ[key]) + "/v3" setattr(self, key, val) else: - # TODO(mcgoughh): Log errors and no config updates required - log.warn("Configuration doesn't require updating") + # Default username for a service is it's name + setattr(self, 'OS_USERNAME', service) + log.info("Configuration complete!") return diff --git a/plugins/OpenStack/singleton.py b/plugins/OpenStack/singleton.py index 12cd5a9..abfe4e2 100644 --- a/plugins/OpenStack/singleton.py +++ b/plugins/OpenStack/singleton.py @@ -1,7 +1,30 @@ +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## """Simple singleton class.""" from __future__ import unicode_literals +__author__ = "Helena McGough" + class Singleton(object): """Simple singleton class.""" diff --git a/plugins/__init__.py b/plugins/__init__.py new file mode 100644 index 0000000..e69de29