From: Helena McGough Date: Fri, 17 Nov 2017 14:57:08 +0000 (+0000) Subject: Added a Common KafkaConsumer for all of the plugins X-Git-Tag: v4.0.0~59 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F01%2F5701%2F1;p=osm%2FMON.git Added a Common KafkaConsumer for all of the plugins - Added the common consumer for all request messages - Updated the OpenStack plugins to use the common consumer - Included the access_credentials topic into this consumer - Added the json schemas for the openstack, vrops and cloudwatch access credentials - Updated the Openstack plugins to use access_credentials messages - Added and updated all necessary tests - Updated installation documentation Change-Id: I1486cf7230e351e5dbf18464110e5780d392eeeb Signed-off-by: Helena McGough --- diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2ed04bf --- /dev/null +++ b/.gitignore @@ -0,0 +1,76 @@ +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +*.py[cod] + +# C extensions +*.so + +# log files +*.log + +# Packages +*.egg +*.egg-info +dist +build +.eggs +eggs +parts +bin +var +sdist +develop-eggs +.installed.cfg +lib +lib64 + +# Installer logs +pip-log.txt + +# Unit test / coverage reports +.coverage +.tox +nosetests.xml +.testrepository +.venv +.cache + +# Translations +*.mo + +# Complexity +output/*.html +output/*/index.html + +# Sphinx +doc/build + +# pbr generates these +AUTHORS +ChangeLog + +# Editors +*~ +.*.swp +.*sw? +.settings/ +.__pycache__/ diff --git a/core/message_bus/common_consumer.py b/core/message_bus/common_consumer.py new file mode 100644 index 0000000..9f261b7 --- /dev/null +++ b/core/message_bus/common_consumer.py @@ -0,0 +1,129 @@ +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +"""A common KafkaConsumer for all MON plugins.""" + +import json +import logging +import sys + +sys.path.append("/root/MON") + +logging.basicConfig(filename='MON_plugins.log', + format='%(asctime)s %(message)s', + datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a', + level=logging.INFO) +log = logging.getLogger(__name__) + +from kafka import KafkaConsumer +from kafka.errors import KafkaError + +from plugins.OpenStack.Aodh import alarming +from plugins.OpenStack.common import Common +from plugins.OpenStack.Gnocchi import metrics + + +# Initialize servers +server = {'server': 'localhost:9092'} + +# Initialize consumers for alarms and metrics +common_consumer = KafkaConsumer(group_id='osm_mon', + bootstrap_servers=server['server']) + +# Create OpenStack alarming and metric instances +auth_token = None +openstack_auth = Common() +openstack_metrics = metrics.Metrics() +openstack_alarms = alarming.Alarming() + + +def get_vim_type(message): + """Get the vim type that is required by the message.""" + try: + return json.loads(message.value)["vim_type"].lower() + except Exception as exc: + log.warn("vim_type is not configured correctly; %s", exc) + return None + +# Define subscribe the consumer for the plugins +topics = ['metric_request', 'alarm_request', 'access_credentials'] +common_consumer.subscribe(topics) + +try: + log.info("Listening for alarm_request and metric_request messages") + for message in common_consumer: + # Check the message topic + if message.topic == "metric_request": + # Check the vim desired by the message + vim_type = get_vim_type(message) + if vim_type == "openstack": + log.info("This message is for the OpenStack plugin.") + openstack_metrics.metric_calls( + message, openstack_auth, auth_token) + + elif vim_type == "cloudwatch": + log.info("This message is for the CloudWatch plugin.") + + elif vim_type == "vrops": + log.info("This message is for the vROPs plugin.") + + else: + log.debug("vim_type is misconfigured or unsupported; %s", + vim_type) + + elif message.topic == "alarm_request": + # Check the vim desired by the message + vim_type = get_vim_type(message) + if vim_type == "openstack": + log.info("This message is for the OpenStack plugin.") + openstack_alarms.alarming(message, openstack_auth, auth_token) + + elif vim_type == "cloudwatch": + log.info("This message is for the CloudWatch plugin.") + + elif vim_type == "vrops": + log.info("This message is for the vROPs plugin.") + + else: + log.debug("vim_type is misconfigured or unsupported; %s", + vim_type) + + elif message.topic == "access_credentials": + # Check the vim desired by the message + vim_type = get_vim_type(message) + if vim_type == "openstack": + log.info("This message is for the OpenStack plugin.") + auth_token = openstack_auth._authenticate(message=message) + + elif vim_type == "cloudwatch": + log.info("This message is for the CloudWatch plugin.") + + elif vim_type == "vrops": + log.info("This message is for the vROPs plugin.") + + else: + log.debug("vim_type is misconfigured or unsupported; %s", + vim_type) + + else: + log.info("This topic is not relevant to any of the MON plugins.") + + +except KafkaError as exc: + log.warn("Exception: %s", exc) diff --git a/core/message_bus/consumer.py b/core/message_bus/consumer.py index 3995084..c9021d2 100644 --- a/core/message_bus/consumer.py +++ b/core/message_bus/consumer.py @@ -27,7 +27,7 @@ alarms and metrics responses. ''' __author__ = "Prithiv Mohan" -__date__ = "06/Sep/2017" +__date__ = "06/Sep/2017" from kafka import KafkaConsumer @@ -37,6 +37,7 @@ import logging import logging.config import os + def logging_handler(filename, mode='a+', encoding=None): if not os.path.exists(filename): open(filename, 'a').close() @@ -50,9 +51,9 @@ log_config = { }, }, 'handlers': { - 'file':{ + 'file': { '()': logging_handler, - 'level':'DEBUG', + 'level': 'DEBUG', 'formatter': 'default', 'filename': '/var/log/osm_mon.log', 'mode': 'a+', @@ -78,8 +79,10 @@ if "BROKER_URI" in os.environ: else: broker = "localhost:9092" -alarm_consumer = KafkaConsumer('alarm_response', 'osm_mon', bootstrap_servers = broker) -metric_consumer = KafkaConsumer('metric_response', 'osm_mon', bootstrap_servers = broker) +alarm_consumer = KafkaConsumer( + 'alarm_response', 'osm_mon', bootstrap_servers=broker) +metric_consumer = KafkaConsumer( + 'metric_response', 'osm_mon', bootstrap_servers=broker) try: for message in alarm_consumer: logger.debug(message) diff --git a/core/message_bus/producer.py b/core/message_bus/producer.py index 1b0de7a..4aee831 100644 --- a/core/message_bus/producer.py +++ b/core/message_bus/producer.py @@ -19,16 +19,11 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com ## - ''' This is a kafka producer app that interacts with the SO and the plugins of the datacenters like OpenStack, VMWare, AWS. ''' -__author__ = "Prithiv Mohan" -__date__ = "06/Sep/2017" - - from kafka import KafkaProducer as kaf from kafka.errors import KafkaError import logging @@ -38,6 +33,9 @@ import os from os import listdir from jsmin import jsmin +__author__ = "Prithiv Mohan" +__date__ = "06/Sep/2017" + json_path = os.path.join(os.pardir+"/models/") @@ -45,7 +43,7 @@ class KafkaProducer(object): def __init__(self, topic): - self._topic= topic + self._topic = topic if "BROKER_URI" in os.environ: broker = os.getenv("BROKER_URI") @@ -58,10 +56,10 @@ class KafkaProducer(object): is already running. ''' - self.producer = kaf(key_serializer=str.encode, - value_serializer=lambda v: json.dumps(v).encode('ascii'), - bootstrap_servers=broker, api_version=(0,10)) - + self.producer = kaf( + key_serializer=str.encode, + value_serializer=lambda v: json.dumps(v).encode('ascii'), + bootstrap_servers=broker, api_version=(0, 10)) def publish(self, key, value, topic=None): try: @@ -80,237 +78,224 @@ class KafkaProducer(object): def create_alarm_request(self, key, message, topic): - #External to MON + # External to MON - payload_create_alarm = jsmin(open(os.path.join(json_path, - 'create_alarm.json')).read()) + payload_create_alarm = jsmin( + open(os.path.join(json_path, 'create_alarm.json')).read()) self.publish(key, - value=json.dumps(payload_create_alarm), - topic='alarm_request') + value=json.dumps(payload_create_alarm), + topic='alarm_request') def create_alarm_response(self, key, message, topic): - #Internal to MON + # Internal to MON - payload_create_alarm_resp = jsmin(open(os.path.join(json_path, - 'create_alarm_resp.json')).read()) + payload_create_alarm_resp = jsmin( + open(os.path.join(json_path, 'create_alarm_resp.json')).read()) self.publish(key, - value = message, - topic = 'alarm_response') + value=message, + topic='alarm_response') def acknowledge_alarm(self, key, message, topic): - #Internal to MON + # Internal to MON - payload_acknowledge_alarm = jsmin(open(os.path.join(json_path, - 'acknowledge_alarm.json')).read()) + payload_acknowledge_alarm = jsmin( + open(os.path.join(json_path, 'acknowledge_alarm.json')).read()) self.publish(key, - value = json.dumps(payload_acknowledge_alarm), - topic = 'alarm_request') + value=json.dumps(payload_acknowledge_alarm), + topic='alarm_request') def list_alarm_request(self, key, message, topic): - #External to MON + # External to MON - payload_alarm_list_req = jsmin(open(os.path.join(json_path, - 'list_alarm_req.json')).read()) + payload_alarm_list_req = jsmin( + open(os.path.join(json_path, 'list_alarm_req.json')).read()) self.publish(key, - value=json.dumps(payload_alarm_list_req), - topic='alarm_request') + value=json.dumps(payload_alarm_list_req), + topic='alarm_request') def notify_alarm(self, key, message, topic): - payload_notify_alarm = jsmin(open(os.path.join(json_path, - 'notify_alarm.json')).read()) + payload_notify_alarm = jsmin( + open(os.path.join(json_path, 'notify_alarm.json')).read()) self.publish(key, - value=message, - topic='alarm_response') + value=message, + topic='alarm_response') def list_alarm_response(self, key, message, topic): - payload_list_alarm_resp = jsmin(open(os.path.join(json_path, - 'list_alarm_resp.json')).read()) + payload_list_alarm_resp = jsmin( + open(os.path.join(json_path, 'list_alarm_resp.json')).read()) self.publish(key, - value=message, - topic='alarm_response') - + value=message, + topic='alarm_response') def update_alarm_request(self, key, message, topic): - # External to Mon + # External to Mon - payload_update_alarm_req = jsmin(open(os.path.join(json_path, - 'update_alarm_req.json')).read()) + payload_update_alarm_req = jsmin( + open(os.path.join(json_path, 'update_alarm_req.json')).read()) self.publish(key, - value=json.dumps(payload_update_alarm_req), - topic='alarm_request') - + value=json.dumps(payload_update_alarm_req), + topic='alarm_request') def update_alarm_response(self, key, message, topic): - # Internal to Mon + # Internal to Mon - payload_update_alarm_resp = jsmin(open(os.path.join(json_path, - 'update_alarm_resp.json')).read()) + payload_update_alarm_resp = jsmin( + open(os.path.join(json_path, 'update_alarm_resp.json')).read()) self.publish(key, - value=message, - topic='alarm_response') - + value=message, + topic='alarm_response') def delete_alarm_request(self, key, message, topic): - # External to Mon + # External to Mon - payload_delete_alarm_req = jsmin(open(os.path.join(json_path, - 'delete_alarm_req.json')).read()) + payload_delete_alarm_req = jsmin( + open(os.path.join(json_path, 'delete_alarm_req.json')).read()) self.publish(key, - value=json.dumps(payload_delete_alarm_req), - topic='alarm_request') + value=json.dumps(payload_delete_alarm_req), + topic='alarm_request') def delete_alarm_response(self, key, message, topic): - # Internal to Mon + # Internal to Mon - payload_delete_alarm_resp = jsmin(open(os.path.join(json_path, - 'delete_alarm_resp.json')).read()) + payload_delete_alarm_resp = jsmin( + open(os.path.join(json_path, 'delete_alarm_resp.json')).read()) self.publish(key, - value=message, - topic='alarm_response') - - + value=message, + topic='alarm_response') def create_metrics_request(self, key, message, topic): # External to Mon - payload_create_metrics_req = jsmin(open(os.path.join(json_path, - 'create_metric_req.json')).read()) + payload_create_metrics_req = jsmin( + open(os.path.join(json_path, 'create_metric_req.json')).read()) self.publish(key, - value=json.dumps(payload_create_metrics_req), - topic='metric_request') - + value=json.dumps(payload_create_metrics_req), + topic='metric_request') def create_metrics_resp(self, key, message, topic): # Internal to Mon - payload_create_metrics_resp = jsmin(open(os.path.join(json_path, - 'create_metric_resp.json')).read()) + payload_create_metrics_resp = jsmin( + open(os.path.join(json_path, 'create_metric_resp.json')).read()) self.publish(key, - value=message, - topic='metric_response') - + value=message, + topic='metric_response') def read_metric_data_request(self, key, message, topic): # External to Mon - payload_read_metric_data_request = jsmin(open(os.path.join(json_path, - 'read_metric_data_req.json')).read()) + payload_read_metric_data_request = jsmin( + open(os.path.join(json_path, 'read_metric_data_req.json')).read()) self.publish(key, - value=json.dumps(payload_read_metric_data_request), - topic='metric_request') - + value=json.dumps(payload_read_metric_data_request), + topic='metric_request') def read_metric_data_response(self, key, message, topic): # Internal to Mon - payload_metric_data_response = jsmin(open(os.path.join(json_path, - 'read_metric_data_resp.json')).read()) + payload_metric_data_response = jsmin( + open(os.path.join(json_path, 'read_metric_data_resp.json')).read()) self.publish(key, - value=message, - topic='metric_response') - + value=message, + topic='metric_response') def list_metric_request(self, key, message, topic): - #External to MON + # External to MON - payload_metric_list_req = jsmin(open(os.path.join(json_path, - 'list_metric_req.json')).read()) + payload_metric_list_req = jsmin( + open(os.path.join(json_path, 'list_metric_req.json')).read()) self.publish(key, - value=json.dumps(payload_metric_list_req), - topic='metric_request') + value=json.dumps(payload_metric_list_req), + topic='metric_request') def list_metric_response(self, key, message, topic): - #Internal to MON + # Internal to MON - payload_metric_list_resp = jsmin(open(os.path.join(json_path, - 'list_metrics_resp.json')).read()) + payload_metric_list_resp = jsmin( + open(os.path.join(json_path, 'list_metrics_resp.json')).read()) self.publish(key, - value=message, - topic='metric_response') - + value=message, + topic='metric_response') def delete_metric_request(self, key, message, topic): - # External to Mon + # External to Mon - payload_delete_metric_req = jsmin(open(os.path.join(json_path, - 'delete_metric_req.json')).read()) + payload_delete_metric_req = jsmin( + open(os.path.join(json_path, 'delete_metric_req.json')).read()) self.publish(key, - value=json.dumps(payload_delete_metric_req), - topic='metric_request') - + value=json.dumps(payload_delete_metric_req), + topic='metric_request') def delete_metric_response(self, key, message, topic): - # Internal to Mon + # Internal to Mon - payload_delete_metric_resp = jsmin(open(os.path.join(json_path, - 'delete_metric_resp.json')).read()) + payload_delete_metric_resp = jsmin( + open(os.path.join(json_path, 'delete_metric_resp.json')).read()) self.publish(key, - value=message, - topic='metric_response') - + value=message, + topic='metric_response') def update_metric_request(self, key, message, topic): # External to Mon - payload_update_metric_req = jsmin(open(os.path.join(json_path, - 'update_metric_req.json')).read()) + payload_update_metric_req = jsmin( + open(os.path.join(json_path, 'update_metric_req.json')).read()) self.publish(key, - value=json.dumps(payload_update_metric_req), - topic='metric_request') - + value=json.dumps(payload_update_metric_req), + topic='metric_request') def update_metric_response(self, key, message, topic): # Internal to Mon - payload_update_metric_resp = jsmin(open(os.path.join(json_path, - 'update_metric_resp.json')).read()) + payload_update_metric_resp = jsmin( + open(os.path.join(json_path, 'update_metric_resp.json')).read()) self.publish(key, - value=message, - topic='metric_response') + value=message, + topic='metric_response') def access_credentials(self, key, message, topic): - payload_access_credentials = jsmin(open(os.path.join(json_path, - 'access_credentials.json')).read()) + payload_access_credentials = jsmin( + open(os.path.join(json_path, 'access_credentials.json')).read()) self.publish(key, - value=json.dumps(payload_access_credentials), - topic='access_credentials') + value=json.dumps(payload_access_credentials), + topic='access_credentials') diff --git a/core/models/access_cred_CloudWatch.json b/core/models/access_cred_CloudWatch.json new file mode 100644 index 0000000..b5d4a81 --- /dev/null +++ b/core/models/access_cred_CloudWatch.json @@ -0,0 +1,41 @@ +/* Copyright© 2017 Intel Research and Development Ireland Limited + # This file is part of OSM Monitoring module + # All Rights Reserved to Intel Corporation + + # Licensed under the Apache License, Version 2.0 (the "License"); you may + # not use this file except in compliance with the License. You may obtain + # a copy of the License at + + # http://www.apache.org/licenses/LICENSE-2.0 + + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + # License for the specific language governing permissions and limitations + # under the License. + + # For those usages not covered by the Apache License, Version 2.0 please + # contact: helena.mcgough@intel.com or adrian.hoban@intel.com + + # This is the message bus schema for CloudWatch access credentials */ + + +{ + "schema_version": { "type": "string" }, + "schema_type": { "type": "string" }, + "vim_type": { "type": "string" }, + "access_config": + { + "aws_site": { "type": "string" }, + "user": { "type": "string" }, + "password": { "type": "string" }, + "vim_tenant_name": { "type": "string" } + }, + "required": [ "schema_version", + "schema_type", + "vim_type", + "aws_site", + "user", + "password", + "vim_tenant_name" ] +} diff --git a/core/models/access_cred_OpenStack.json b/core/models/access_cred_OpenStack.json new file mode 100644 index 0000000..7d1fbbe --- /dev/null +++ b/core/models/access_cred_OpenStack.json @@ -0,0 +1,41 @@ +/* Copyright© 2017 Intel Research and Development Ireland Limited + # This file is part of OSM Monitoring module + # All Rights Reserved to Intel Corporation + + # Licensed under the Apache License, Version 2.0 (the "License"); you may + # not use this file except in compliance with the License. You may obtain + # a copy of the License at + + # http://www.apache.org/licenses/LICENSE-2.0 + + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + # License for the specific language governing permissions and limitations + # under the License. + + # For those usages not covered by the Apache License, Version 2.0 please + # contact: helena.mcgough@intel.com or adrian.hoban@intel.com + + # This is the message bus schema for OpenStack access credentials */ + + +{ + "schema_version": { "type": "string" }, + "schema_type": { "type": "string" }, + "vim_type": { "type": "string" }, + "access_config": + { + "openstack_site": { "type" : "string" }, + "user": { "type": "string" }, + "password": { "type": "string" }, + "vim_tenant_name": { "type": "string" } + }, + "required": [ "schema_version", + "schema_type", + "vim_type", + "openstack_site", + "user", + "password", + "vim_tenant_name" ] +} diff --git a/core/models/access_cred_vROPs.json b/core/models/access_cred_vROPs.json new file mode 100644 index 0000000..fc9c8a9 --- /dev/null +++ b/core/models/access_cred_vROPs.json @@ -0,0 +1,60 @@ +/* Copyright© 2017 Intel Research and Development Ireland Limited + # This file is part of OSM Monitoring module + # All Rights Reserved to Intel Corporation + + # Licensed under the Apache License, Version 2.0 (the "License"); you may + # not use this file except in compliance with the License. You may obtain + # a copy of the License at + + # http://www.apache.org/licenses/LICENSE-2.0 + + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + # License for the specific language governing permissions and limitations + # under the License. + + # For those usages not covered by the Apache License, Version 2.0 please + # contact: helena.mcgough@intel.com or adrian.hoban@intel.com + + # This is the message bus schema for vROPs access credentials */ + + +{ + "schema_version": { "type": "string" }, + "schema_type": { "type": "string" }, + "vim_type": { "type": "string" }, + "access_config": + { + "vrops_site": { "type": "string" }, + "vrops_user": { "type": "string" }, + "vrops_password": { "type": "string" }, + "vcloud_site": { "type": "string" }, + "admin_username": { "type": "string" }, + "admin_password": { "type": "string" }, + "nsx_manager": { "type": "string" }, + "nsx_user": { "type": "string" }, + "nsx_password": { "type": "string" }, + "vcenter_ip": { "type": "string" }, + "vcenter_port": { "type": "string" }, + "vcenter_user": { "type": "string" }, + "vcenter_password": { "type": "string" }, + "vim_tenant_name": { "type": "string" }, + "orgname": { "type": "string" } + }, + "required": [ "schema_version", + "schema_type", + "vim_type", + "vrops_site", + "vrops_user", + "vrops_password", + "vcloud_site", + "admin_username", + "admin_password", + "vcenter_ip", + "vcenter_port", + "vcenter_user", + "vcenter_password", + "vim_tenant_name", + "orgname" ] +} diff --git a/doc/MON_install_guide.rst b/doc/MON_install_guide.rst index 586fe2a..57b79d3 100644 --- a/doc/MON_install_guide.rst +++ b/doc/MON_install_guide.rst @@ -84,17 +84,12 @@ please refer to the following documentation: These documents will also describe what alarming and monitoring functionality the plugins support. -* To run the Gnocchi plugin run the following command: +* The Gnocchi and Aodh plugins work from a common KafkaConsumer that checks for + the appropriate topics and keys. To run this consumer: :: - lxc exec MON - python /root/MON/plugins/OpenStack/Gnocchi/plugin_instance.py - -* To run the Aodh plugin run the following command: - - :: - - lxc exec MON - python /root/MON/plugins/OpenStack/Aodh/plugin_instance.py + lxc exec MON - python /root/MON/core/message_bus/common_consumer.py CloudWatch ~~~~~~~~~~ diff --git a/plugins/OpenStack/Aodh/__init__.py b/plugins/OpenStack/Aodh/__init__.py index 32eb94e..d854d63 100644 --- a/plugins/OpenStack/Aodh/__init__.py +++ b/plugins/OpenStack/Aodh/__init__.py @@ -19,3 +19,4 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: helena.mcgough@intel.com or adrian.hoban@intel.com ## +"""Aodh plugin for OSM MON.""" diff --git a/plugins/OpenStack/Aodh/alarming.py b/plugins/OpenStack/Aodh/alarming.py index d409d71..2343372 100644 --- a/plugins/OpenStack/Aodh/alarming.py +++ b/plugins/OpenStack/Aodh/alarming.py @@ -22,18 +22,18 @@ """Carry out alarming requests via Aodh API.""" import json + import logging -log = logging.getLogger(__name__) from core.message_bus.producer import KafkaProducer -from kafka import KafkaConsumer - -from plugins.OpenStack.common import Common from plugins.OpenStack.response import OpenStack_Response +from plugins.OpenStack.settings import Config __author__ = "Helena McGough" +log = logging.getLogger(__name__) + ALARM_NAMES = { "average_memory_usage_above_threshold": "average_memory_utilization", "disk_read_ops": "disk_read_ops", @@ -65,14 +65,14 @@ class Alarming(object): def __init__(self): """Create the OpenStack alarming instance.""" - self._common = Common() + # Initialize configuration and notifications + config = Config.instance() + config.read_environ("aodh") - # TODO(mcgoughh): Remove hardcoded kafkaconsumer - # Initialize a generic consumer object to consume message from the SO - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - self._consumer = KafkaConsumer(server['topic'], - group_id='osm_mon', - bootstrap_servers=server['server']) + # Initialise authentication for API requests + self.auth_token = None + self.endpoint = None + self.common = None # Use the Response class to generate valid json response messages self._response = OpenStack_Response() @@ -80,121 +80,126 @@ class Alarming(object): # Initializer a producer to send responses back to SO self._producer = KafkaProducer("alarm_response") - def alarming(self): + def alarming(self, message, common, auth_token): """Consume info from the message bus to manage alarms.""" - # Check the alarming functionlity that needs to be performed - for message in self._consumer: - - values = json.loads(message.value) - vim_type = values['vim_type'].lower() - - if vim_type == "openstack": - log.info("Alarm action required: %s" % (message.topic)) - - # Generate and auth_token and endpoint for request - auth_token, endpoint = self.authenticate() - - if message.key == "create_alarm_request": - # Configure/Update an alarm - alarm_details = values['alarm_create_request'] - - alarm_id, alarm_status = self.configure_alarm( - endpoint, auth_token, alarm_details) - - # Generate a valid response message, send via producer - try: - if alarm_status is True: - log.info("Alarm successfully created") - - resp_message = self._response.generate_response( - 'create_alarm_response', status=alarm_status, - alarm_id=alarm_id, - cor_id=alarm_details['correlation_id']) - log.info("Response Message: %s", resp_message) - self._producer.create_alarm_response( - 'create_alarm_resonse', resp_message, - 'alarm_response') - except Exception as exc: - log.warn("Response creation failed: %s", exc) - - elif message.key == "list_alarm_request": - # Check for a specifed: alarm_name, resource_uuid, severity - # and generate the appropriate list - list_details = values['alarm_list_request'] - - alarm_list = self.list_alarms( - endpoint, auth_token, list_details) - - try: - # Generate and send a list response back - resp_message = self._response.generate_response( - 'list_alarm_response', alarm_list=alarm_list, - cor_id=list_details['correlation_id']) - log.info("Response Message: %s", resp_message) - self._producer.list_alarm_response( - 'list_alarm_response', resp_message, - 'alarm_response') - except Exception as exc: - log.warn("Failed to send a valid response back.") - - elif message.key == "delete_alarm_request": - request_details = values['alarm_delete_request'] - alarm_id = request_details['alarm_uuid'] - - resp_status = self.delete_alarm( - endpoint, auth_token, alarm_id) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'delete_alarm_response', alarm_id=alarm_id, - status=resp_status, - cor_id=request_details['correlation_id']) - log.info("Response message: %s", resp_message) - self._producer.delete_alarm_response( - 'delete_alarm_response', resp_message, - 'alarm_response') - except Exception as exc: - log.warn("Failed to create delete reponse:%s", exc) - - elif message.key == "acknowledge_alarm": - # Acknowledge that an alarm has been dealt with by the SO - alarm_id = values['ack_details']['alarm_uuid'] - - response = self.update_alarm_state( - endpoint, auth_token, alarm_id) - - # Log if an alarm was reset - if response is True: - log.info("Acknowledged the alarm and cleared it.") - else: - log.warn("Failed to acknowledge/clear the alarm.") - - elif message.key == "update_alarm_request": - # Update alarm configurations - alarm_details = values['alarm_update_request'] - - alarm_id, status = self.update_alarm( - endpoint, auth_token, alarm_details) - - # Generate a response for an update request - try: - resp_message = self._response.generate_response( - 'update_alarm_response', alarm_id=alarm_id, - cor_id=alarm_details['correlation_id'], - status=status) - log.info("Response message: %s", resp_message) - self._producer.update_alarm_response( - 'update_alarm_response', resp_message, - 'alarm_response') - except Exception as exc: - log.warn("Failed to send an update response:%s", exc) + values = json.loads(message.value) + self.common = common - else: - log.debug("Unknown key, no action will be performed") + log.info("OpenStack alarm action required.") + + # Generate and auth_token and endpoint for request + if auth_token is not None: + if self.auth_token != auth_token: + log.info("Auth_token for alarming set by access_credentials.") + self.auth_token = auth_token else: - log.info("Message topic not relevant to this plugin: %s", - message.topic) + log.info("Auth_token has not been updated.") + else: + log.info("Using environment variables to set auth_token for Aodh.") + self.auth_token = self.common._authenticate() + + if self.endpoint is None: + log.info("Generating a new endpoint for Aodh.") + self.endpoint = self.common.get_endpoint("alarming") + + if message.key == "create_alarm_request": + # Configure/Update an alarm + alarm_details = values['alarm_create_request'] + + alarm_id, alarm_status = self.configure_alarm( + self.endpoint, self.auth_token, alarm_details) + + # Generate a valid response message, send via producer + try: + if alarm_status is True: + log.info("Alarm successfully created") + + resp_message = self._response.generate_response( + 'create_alarm_response', status=alarm_status, + alarm_id=alarm_id, + cor_id=alarm_details['correlation_id']) + log.info("Response Message: %s", resp_message) + self._producer.create_alarm_response( + 'create_alarm_resonse', resp_message, + 'alarm_response') + except Exception as exc: + log.warn("Response creation failed: %s", exc) + + elif message.key == "list_alarm_request": + # Check for a specifed: alarm_name, resource_uuid, severity + # and generate the appropriate list + list_details = values['alarm_list_request'] + + alarm_list = self.list_alarms( + self.endpoint, self.auth_token, list_details) + + try: + # Generate and send a list response back + resp_message = self._response.generate_response( + 'list_alarm_response', alarm_list=alarm_list, + cor_id=list_details['correlation_id']) + log.info("Response Message: %s", resp_message) + self._producer.list_alarm_response( + 'list_alarm_response', resp_message, + 'alarm_response') + except Exception as exc: + log.warn("Failed to send a valid response back.") + + elif message.key == "delete_alarm_request": + request_details = values['alarm_delete_request'] + alarm_id = request_details['alarm_uuid'] + + resp_status = self.delete_alarm( + self.endpoint, self.auth_token, alarm_id) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'delete_alarm_response', alarm_id=alarm_id, + status=resp_status, + cor_id=request_details['correlation_id']) + log.info("Response message: %s", resp_message) + self._producer.delete_alarm_response( + 'delete_alarm_response', resp_message, + 'alarm_response') + except Exception as exc: + log.warn("Failed to create delete reponse:%s", exc) + + elif message.key == "acknowledge_alarm": + # Acknowledge that an alarm has been dealt with by the SO + alarm_id = values['ack_details']['alarm_uuid'] + + response = self.update_alarm_state( + self.endpoint, self.auth_token, alarm_id) + + # Log if an alarm was reset + if response is True: + log.info("Acknowledged the alarm and cleared it.") + else: + log.warn("Failed to acknowledge/clear the alarm.") + + elif message.key == "update_alarm_request": + # Update alarm configurations + alarm_details = values['alarm_update_request'] + + alarm_id, status = self.update_alarm( + self.endpoint, self.auth_token, alarm_details) + + # Generate a response for an update request + try: + resp_message = self._response.generate_response( + 'update_alarm_response', alarm_id=alarm_id, + cor_id=alarm_details['correlation_id'], + status=status) + log.info("Response message: %s", resp_message) + self._producer.update_alarm_response( + 'update_alarm_response', resp_message, + 'alarm_response') + except Exception as exc: + log.warn("Failed to send an update response:%s", exc) + + else: + log.debug("Unknown key, no action will be performed") return @@ -222,7 +227,7 @@ class Alarming(object): # Create the alarm if metric is available payload = self.check_payload(values, metric_name, resource_id, alarm_name) - new_alarm = self._common._perform_request( + new_alarm = self.common._perform_request( url, auth_token, req_type="post", payload=payload) return json.loads(new_alarm.text)['alarm_id'], True else: @@ -238,7 +243,7 @@ class Alarming(object): url = "{}/v2/alarms/%s".format(endpoint) % (alarm_id) try: - result = self._common._perform_request( + result = self.common._perform_request( url, auth_token, req_type="delete") if str(result.status_code) == "404": log.info("Alarm doesn't exist: %s", result.status_code) @@ -257,7 +262,12 @@ class Alarming(object): a_list, name_list, sev_list, res_list = [], [], [], [] # TODO(mcgoughh): for now resource_id is a mandatory field - resource = list_details['resource_uuid'] + # Check for a reqource is + try: + resource = list_details['resource_uuid'] + except KeyError as exc: + log.warn("Resource id not specified for list request: %s", exc) + return None # Checking what fields are specified for a list request try: @@ -278,7 +288,7 @@ class Alarming(object): # Perform the request to get the desired list try: - result = self._common._perform_request( + result = self.common._perform_request( url, auth_token, req_type="get") if result is not None: @@ -333,7 +343,7 @@ class Alarming(object): payload = json.dumps("ok") try: - self._common._perform_request( + self.common._perform_request( url, auth_token, req_type="put", payload=payload) return True except Exception as exc: @@ -347,7 +357,7 @@ class Alarming(object): # Gets current configurations about the alarm try: - result = self._common._perform_request( + result = self.common._perform_request( url, auth_token, req_type="get") alarm_name = json.loads(result.text)['name'] rule = json.loads(result.text)['gnocchi_resources_threshold_rule'] @@ -366,7 +376,7 @@ class Alarming(object): # Updates the alarm configurations with the valid payload if payload is not None: try: - update_alarm = self._common._perform_request( + update_alarm = self.common._perform_request( url, auth_token, req_type="put", payload=payload) return json.loads(update_alarm.text)['alarm_id'], True @@ -406,23 +416,12 @@ class Alarming(object): log.warn("Alarm is not configured correctly: %s", exc) return None - def authenticate(self): - """Generate an authentication token and endpoint for alarm request.""" - try: - # Check for a tenant_id - auth_token = self._common._authenticate() - endpoint = self._common.get_endpoint("alarming") - return auth_token, endpoint - except Exception as exc: - log.warn("Authentication to Keystone failed:%s", exc) - return None, None - def get_alarm_state(self, endpoint, auth_token, alarm_id): """Get the state of the alarm.""" url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id try: - alarm_state = self._common._perform_request( + alarm_state = self.common._perform_request( url, auth_token, req_type="get") return json.loads(alarm_state.text) except Exception as exc: @@ -432,10 +431,10 @@ class Alarming(object): def check_for_metric(self, auth_token, m_name, r_id): """Check for the alarm metric.""" try: - endpoint = self._common.get_endpoint("metric") + endpoint = self.common.get_endpoint("metric") url = "{}/v1/metric/".format(endpoint) - metric_list = self._common._perform_request( + metric_list = self.common._perform_request( url, auth_token, req_type="get") for metric in json.loads(metric_list.text): diff --git a/plugins/OpenStack/Aodh/plugin_instance.py b/plugins/OpenStack/Aodh/plugin_instance.py deleted file mode 100644 index b3a02ac..0000000 --- a/plugins/OpenStack/Aodh/plugin_instance.py +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright 2017 Intel Research and Development Ireland Limited -# ************************************************************* - -# This file is part of OSM Monitoring module -# All Rights Reserved to Intel Corporation - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# For those usages not covered by the Apache License, Version 2.0 please -# contact: helena.mcgough@intel.com or adrian.hoban@intel.com -## -"""Aodh plugin for the OSM monitoring module.""" - -import logging -import sys - -sys.path.append("MON/") - -logging.basicConfig(filename='aodh_MON.log', format='%(asctime)s %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a', - level=logging.INFO) -log = logging.getLogger(__name__) - - -try: - import aodhclient -except ImportError: - log.warn("Failed to import the aodhclient") - -from plugins.OpenStack.Aodh.alarming import Alarming -from plugins.OpenStack.settings import Config - -__author__ = "Helena McGough" - - -def register_plugin(): - """Register the plugin.""" - # Initialize configuration and notifications - config = Config.instance() - - # Intialize plugin - instance = Plugin(config=config) - instance.config() - instance.alarm() - - -class Plugin(object): - """Aodh plugin for OSM MON.""" - - def __init__(self, config): - """Plugin instance.""" - log.info("Initialze the plugin instance.") - self._config = config - self._alarming = Alarming() - - def config(self): - """Configure plugin.""" - log.info("Configure the plugin instance.") - self._config.read_environ("aodh") - - def alarm(self): - """Allow alarm info to be received from Aodh.""" - log.info("Begin alarm functionality.") - self._alarming.alarming() - -if aodhclient: - register_plugin() diff --git a/plugins/OpenStack/Gnocchi/__init__.py b/plugins/OpenStack/Gnocchi/__init__.py index 32eb94e..cdbd056 100644 --- a/plugins/OpenStack/Gnocchi/__init__.py +++ b/plugins/OpenStack/Gnocchi/__init__.py @@ -19,3 +19,4 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: helena.mcgough@intel.com or adrian.hoban@intel.com ## +"""Gnocchi plugin for OSM MON.""" diff --git a/plugins/OpenStack/Gnocchi/metrics.py b/plugins/OpenStack/Gnocchi/metrics.py index a1d58fd..94641e6 100644 --- a/plugins/OpenStack/Gnocchi/metrics.py +++ b/plugins/OpenStack/Gnocchi/metrics.py @@ -24,19 +24,18 @@ import datetime import json import logging -log = logging.getLogger(__name__) import time from core.message_bus.producer import KafkaProducer -from kafka import KafkaConsumer - -from plugins.OpenStack.common import Common from plugins.OpenStack.response import OpenStack_Response +from plugins.OpenStack.settings import Config __author__ = "Helena McGough" +log = logging.getLogger(__name__) + METRIC_MAPPINGS = { "average_memory_utilization": "memory.percent", "disk_read_ops": "disk.disk_ops", @@ -63,14 +62,14 @@ class Metrics(object): def __init__(self): """Initialize the metric actions.""" - self._common = Common() + # Configure an instance of the OpenStack metric plugin + config = Config.instance() + config.read_environ("gnocchi") - # TODO(mcgoughh): Initialize a generic consumer object to consume - # message from the SO. This is hardcoded for now - server = {'server': 'localhost:9092', 'topic': 'metric_request'} - self._consumer = KafkaConsumer(server['topic'], - group_id='osm_mon', - bootstrap_servers=server['server']) + # Initialise authentication for API requests + self.auth_token = None + self.endpoint = None + self._common = None # Use the Response class to generate valid json response messages self._response = OpenStack_Response() @@ -78,124 +77,131 @@ class Metrics(object): # Initializer a producer to send responses back to SO self._producer = KafkaProducer("metric_response") - def metric_calls(self): + def metric_calls(self, message, common, auth_token): """Consume info from the message bus to manage metric requests.""" - # Consumer check for metric messages - for message in self._consumer: - # Check if this plugin should carry out this request - values = json.loads(message.value) - vim_type = values['vim_type'].lower() - - if vim_type == "openstack": - # Generate auth_token and endpoint - auth_token, endpoint = self.authenticate() - - if message.key == "create_metric_request": - # Configure metric - metric_details = values['metric_create'] - metric_id, resource_id, status = self.configure_metric( - endpoint, auth_token, metric_details) - - # Generate and send a create metric response - try: - resp_message = self._response.generate_response( - 'create_metric_response', status=status, - cor_id=values['correlation_id'], - metric_id=metric_id, r_id=resource_id) - log.info("Response messages: %s", resp_message) - self._producer.create_metrics_resp( - 'create_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to create response: %s", exc) - - elif message.key == "read_metric_data_request": - # Read all metric data related to a specified metric - timestamps, metric_data = self.read_metric_data( - endpoint, auth_token, values) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'read_metric_data_response', - m_id=values['metric_uuid'], - m_name=values['metric_name'], - r_id=values['resource_uuid'], - cor_id=values['correlation_id'], - times=timestamps, metrics=metric_data) - log.info("Response message: %s", resp_message) - self._producer.read_metric_data_response( - 'read_metric_data_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send read metric response:%s", exc) - - elif message.key == "delete_metric_request": - # delete the specified metric in the request - metric_id = values['metric_uuid'] - status = self.delete_metric( - endpoint, auth_token, metric_id) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'delete_metric_response', m_id=metric_id, - m_name=values['metric_name'], - status=status, r_id=values['resource_uuid'], - cor_id=values['correlation_id']) - log.info("Response message: %s", resp_message) - self._producer.delete_metric_response( - 'delete_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send delete response:%s", exc) - - elif message.key == "update_metric_request": - # Gnocchi doesn't support configuration updates - # Log and send a response back to this effect - log.warn("Gnocchi doesn't support metric configuration\ - updates.") - req_details = values['metric_create'] - metric_name = req_details['metric_name'] - resource_id = req_details['resource_uuid'] - metric_id = self.get_metric_id( - endpoint, auth_token, metric_name, resource_id) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'update_metric_response', status=False, - cor_id=values['correlation_id'], - r_id=resource_id, m_id=metric_id) - log.info("Response message: %s", resp_message) - self._producer.update_metric_response( - 'update_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send an update response:%s", exc) - - elif message.key == "list_metric_request": - list_details = values['metrics_list_request'] - - metric_list = self.list_metrics( - endpoint, auth_token, list_details) - - # Generate and send a response message - try: - resp_message = self._response.generate_response( - 'list_metric_response', m_list=metric_list, - cor_id=list_details['correlation_id']) - log.info("Response message: %s", resp_message) - self._producer.list_metric_response( - 'list_metric_response', resp_message, - 'metric_response') - except Exception as exc: - log.warn("Failed to send a list response:%s", exc) - - else: - log.warn("Unknown key, no action will be performed.") + values = json.loads(message.value) + self._common = common + log.info("OpenStack metric action required.") + + # Generate and auth_token and endpoint for request + if auth_token is not None: + if self.auth_token != auth_token: + log.info("Auth_token for metrics set by access_credentials.") + self.auth_token = auth_token else: - log.debug("Message is not for this OpenStack.") + log.info("Auth_token has not been updated.") + else: + log.info("Using environment variables to set Gnocchi auth_token.") + self.auth_token = self._common._authenticate() + + if self.endpoint is None: + log.info("Generating a new endpoint for Gnocchi.") + self.endpoint = self._common.get_endpoint("metric") + + if message.key == "create_metric_request": + # Configure metric + metric_details = values['metric_create'] + metric_id, resource_id, status = self.configure_metric( + self.endpoint, self.auth_token, metric_details) + + # Generate and send a create metric response + try: + resp_message = self._response.generate_response( + 'create_metric_response', status=status, + cor_id=values['correlation_id'], + metric_id=metric_id, r_id=resource_id) + log.info("Response messages: %s", resp_message) + self._producer.create_metrics_resp( + 'create_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to create response: %s", exc) + + elif message.key == "read_metric_data_request": + # Read all metric data related to a specified metric + timestamps, metric_data = self.read_metric_data( + self.endpoint, self.auth_token, values) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'read_metric_data_response', + m_id=values['metric_uuid'], + m_name=values['metric_name'], + r_id=values['resource_uuid'], + cor_id=values['correlation_id'], + times=timestamps, metrics=metric_data) + log.info("Response message: %s", resp_message) + self._producer.read_metric_data_response( + 'read_metric_data_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send read metric response:%s", exc) + + elif message.key == "delete_metric_request": + # delete the specified metric in the request + metric_id = values['metric_uuid'] + status = self.delete_metric( + self.endpoint, self.auth_token, metric_id) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'delete_metric_response', m_id=metric_id, + m_name=values['metric_name'], + status=status, r_id=values['resource_uuid'], + cor_id=values['correlation_id']) + log.info("Response message: %s", resp_message) + self._producer.delete_metric_response( + 'delete_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send delete response:%s", exc) + + elif message.key == "update_metric_request": + # Gnocchi doesn't support configuration updates + # Log and send a response back to this effect + log.warn("Gnocchi doesn't support metric configuration\ + updates.") + req_details = values['metric_create'] + metric_name = req_details['metric_name'] + resource_id = req_details['resource_uuid'] + metric_id = self.get_metric_id( + self.endpoint, self.auth_token, metric_name, resource_id) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'update_metric_response', status=False, + cor_id=values['correlation_id'], + r_id=resource_id, m_id=metric_id) + log.info("Response message: %s", resp_message) + self._producer.update_metric_response( + 'update_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send an update response:%s", exc) + + elif message.key == "list_metric_request": + list_details = values['metrics_list_request'] + + metric_list = self.list_metrics( + self.endpoint, self.auth_token, list_details) + + # Generate and send a response message + try: + resp_message = self._response.generate_response( + 'list_metric_response', m_list=metric_list, + cor_id=list_details['correlation_id']) + log.info("Response message: %s", resp_message) + self._producer.list_metric_response( + 'list_metric_response', resp_message, + 'metric_response') + except Exception as exc: + log.warn("Failed to send a list response:%s", exc) + + else: + log.warn("Unknown key, no action will be performed.") return @@ -401,18 +407,6 @@ class Metrics(object): log.warn("Failed to gather specified measures: %s", exc) return timestamps, data - def authenticate(self): - """Generate an authentication token and endpoint for metric request.""" - try: - # Check for a tenant_id - auth_token = self._common._authenticate() - endpoint = self._common.get_endpoint("metric") - return auth_token, endpoint - except Exception as exc: - log.warn("Authentication to Keystone failed: %s", exc) - - return None, None - def response_list(self, metric_list, metric_name=None, resource=None): """Create the appropriate lists for a list response.""" resp_list, name_list, res_list = [], [], [] diff --git a/plugins/OpenStack/Gnocchi/plugin_instance.py b/plugins/OpenStack/Gnocchi/plugin_instance.py deleted file mode 100644 index eb8b0e3..0000000 --- a/plugins/OpenStack/Gnocchi/plugin_instance.py +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright 2017 Intel Research and Development Ireland Limited -# ************************************************************* - -# This file is part of OSM Monitoring module -# All Rights Reserved to Intel Corporation - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# For those usages not covered by the Apache License, Version 2.0 please -# contact: helena.mcgough@intel.com or adrian.hoban@intel.com -## -"""Gnocchi plugin for the OSM monitoring module.""" - -import logging -import sys - -sys.path.append("MON/") - -logging.basicConfig(filename='gnocchi_MON.log', datefmt='%m/%d/%Y %I:%M:%S %p', - format='%(asctime)s %(message)s', filemode='a', - level=logging.INFO) -log = logging.getLogger(__name__) - -try: - import gnocchiclient -except ImportError: - log.warn("Gnocchiclient could not be imported") - -from plugins.OpenStack.Gnocchi.metrics import Metrics -from plugins.OpenStack.settings import Config - -__author__ = "Helena McGough" - - -def register_plugin(): - """Register the plugin.""" - config = Config.instance() - instance = Plugin(config=config) - instance.config() - instance.metrics() - - -class Plugin(object): - """Gnocchi plugin for OSM MON.""" - - def __init__(self, config): - """Plugin instance.""" - log.info("Initialze the plugin instance.") - self._config = config - self._metrics = Metrics() - - def config(self): - """Configure plugin.""" - log.info("Configure the plugin instance.") - self._config.read_environ("gnocchi") - - def metrics(self): - """Initialize metric functionality.""" - log.info("Initialize metric functionality.") - self._metrics.metric_calls() - -if gnocchiclient: - register_plugin() diff --git a/plugins/OpenStack/__init__.py b/plugins/OpenStack/__init__.py index 32eb94e..18eed2f 100644 --- a/plugins/OpenStack/__init__.py +++ b/plugins/OpenStack/__init__.py @@ -19,3 +19,4 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: helena.mcgough@intel.com or adrian.hoban@intel.com ## +"""OpenStack plugin for OSM MON.""" diff --git a/plugins/OpenStack/common.py b/plugins/OpenStack/common.py index c892a30..8769312 100644 --- a/plugins/OpenStack/common.py +++ b/plugins/OpenStack/common.py @@ -20,9 +20,9 @@ # contact: helena.mcgough@intel.com or adrian.hoban@intel.com ## """Common methods for the OpenStack plugins.""" +import json import logging -log = logging.getLogger(__name__) from keystoneclient.v3 import client @@ -32,6 +32,8 @@ import requests __author__ = "Helena McGough" +log = logging.getLogger(__name__) + class Common(object): """Common calls for Gnocchi/Aodh plugins.""" @@ -39,20 +41,48 @@ class Common(object): def __init__(self): """Create the common instance.""" self._auth_token = None - self._endpoint = None self._ks = None + self.openstack_url = None + self.user = None + self.password = None + self.tenant = None - def _authenticate(self): + def _authenticate(self, message=None): """Authenticate and/or renew the authentication token.""" if self._auth_token is not None: return self._auth_token + if message is not None: + values = json.loads(message.value)['access_config'] + self.openstack_url = values['openstack_site'] + self.user = values['user'] + self.password = values['password'] + self.tenant = values['vim_tenant_name'] + + try: + # try to authenticate with supplied access_credentials + self._ks = client.Client(auth_url=self.openstack_url, + username=self.user, + password=self.password, + tenant_name=self.tenant) + self._auth_token = self._ks.auth_token + log.info("Authenticating with access_credentials from SO.") + return self._auth_token + except Exception as exc: + log.warn("Authentication failed with access_credentials: %s", + exc) + + else: + log.info("Access_credentials were not sent from SO.") + + # If there are no access_credentials or they fail use env variables try: cfg = Config.instance() self._ks = client.Client(auth_url=cfg.OS_AUTH_URL, username=cfg.OS_USERNAME, password=cfg.OS_PASSWORD, tenant_name=cfg.OS_TENANT_NAME) + log.info("Authenticating with environment varialbles.") self._auth_token = self._ks.auth_token except Exception as exc: @@ -82,26 +112,28 @@ class Common(object): headers = {'X-Auth-Token': auth_token, 'Content-type': 'application/json'} # perform request and return its result + if req_type == "put": + response = requests.put( + url, data=payload, headers=headers, + timeout=1) + elif req_type == "get": + response = requests.get( + url, params=params, headers=headers, timeout=1) + elif req_type == "delete": + response = requests.delete( + url, headers=headers, timeout=1) + else: + response = requests.post( + url, data=payload, headers=headers, + timeout=1) + + # Raises exception if there was an error try: - if req_type == "put": - response = requests.put( - url, data=payload, headers=headers, - timeout=1) - elif req_type == "get": - response = requests.get( - url, params=params, headers=headers, timeout=1) - elif req_type == "delete": - response = requests.delete( - url, headers=headers, timeout=1) - else: - response = requests.post( - url, data=payload, headers=headers, - timeout=1) - - except Exception as e: - log.warn("Exception thrown on request", e) - if response is not None: - log.warn("Request resulted in %s code and %s response", - response.status_code, response.text) - + response.raise_for_status() + # pylint: disable=broad-except + except Exception: + # Log out the result of the request for debugging purpose + log.debug( + 'Result: %s, %d', + response.status_code, response.text) return response diff --git a/plugins/OpenStack/response.py b/plugins/OpenStack/response.py index 5bb620b..691374a 100644 --- a/plugins/OpenStack/response.py +++ b/plugins/OpenStack/response.py @@ -60,6 +60,7 @@ class OpenStack_Response(object): message = self.notify_alarm(**kwargs) else: log.warn("Failed to generate a valid response message.") + message = None return message diff --git a/plugins/OpenStack/settings.py b/plugins/OpenStack/settings.py index d177dcf..fe80641 100644 --- a/plugins/OpenStack/settings.py +++ b/plugins/OpenStack/settings.py @@ -22,7 +22,6 @@ """Configurations for the OpenStack plugins.""" import logging -log = logging.getLogger(__name__) import os from collections import namedtuple @@ -33,6 +32,8 @@ import six __author__ = "Helena McGough" +log = logging.getLogger(__name__) + class BadConfigError(Exception): """Configuration exception.""" diff --git a/plugins/__init__.py b/plugins/__init__.py index 32eb94e..316993c 100644 --- a/plugins/__init__.py +++ b/plugins/__init__.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + # Copyright 2017 Intel Research and Development Ireland Limited # ************************************************************* @@ -19,3 +21,4 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: helena.mcgough@intel.com or adrian.hoban@intel.com ## +"""Plugins for OSM MON.""" diff --git a/test-requirements.txt b/test-requirements.txt index 0873ca4..b8a1cb5 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -20,10 +20,17 @@ # contact: helena.mcgough@intel.com or adrian.hoban@intel.com ## -flake8 +hacking>=0.10.0,<0.11 + +flake8<3.0 mock +oslosphinx>=2.5.0 # Apache-2.0 +oslotest>=1.10.0 # Apache-2.0 os-testr -testrepository -testscenarios -testtools +testrepository>=0.0.18 +pylint +python-subunit>=0.0.18 +pytest +testscenarios>=0.4 +testtools>=1.4.0 kafka diff --git a/test/OpenStack/__init__.py b/test/OpenStack/__init__.py index d25e458..861627f 100644 --- a/test/OpenStack/__init__.py +++ b/test/OpenStack/__init__.py @@ -19,4 +19,13 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: helena.mcgough@intel.com or adrian.hoban@intel.com ## +"""OpenStack plugin tests.""" +import logging + +# Initialise a logger for tests +logging.basicConfig(filename='OpenStack_tests.log', + format='%(asctime)s %(message)s', + datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a', + level=logging.INFO) +log = logging.getLogger(__name__) diff --git a/test/OpenStack/test_alarm_req.py b/test/OpenStack/test_alarm_req.py new file mode 100644 index 0000000..721fae9 --- /dev/null +++ b/test/OpenStack/test_alarm_req.py @@ -0,0 +1,129 @@ +# Copyright 2017 iIntel Research and Development Ireland Limited +# ************************************************************** + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Tests for all alarm request message keys.""" + +import json + +import logging + +import unittest + +import mock + +from plugins.OpenStack.Aodh import alarming as alarm_req +from plugins.OpenStack.common import Common + +__author__ = "Helena McGough" + +log = logging.getLogger(__name__) + + +class Message(object): + """A class to mock a message object value for alarm requests.""" + + def __init__(self): + """Initialize a mocked message instance.""" + self.topic = "alarm_request" + self.key = None + self.value = json.dumps({"mock_value": "mock_details"}) + + +class TestAlarmKeys(unittest.TestCase): + """Integration test for alarm request keys.""" + + def setUp(self): + """Setup the tests for alarm request keys.""" + super(TestAlarmKeys, self).setUp() + self.alarming = alarm_req.Alarming() + self.alarming.common = Common() + + @mock.patch.object(Common, "_authenticate") + def test_alarming_env_authentication(self, auth): + """Test getting an auth_token and endpoint for alarm requests.""" + # if auth_token is None environment variables are used to authenticare + message = Message() + + self.alarming.alarming(message, self.alarming.common, None) + + auth.assert_called_with() + + @mock.patch.object(Common, "_authenticate") + def test_acccess_cred_auth(self, auth): + """Test receiving auth_token from access creds.""" + message = Message() + + self.alarming.alarming(message, self.alarming.common, "my_auth_token") + + auth.assert_not_called + self.assertEqual(self.alarming.auth_token, "my_auth_token") + + @mock.patch.object(alarm_req.Alarming, "delete_alarm") + def test_delete_alarm_key(self, del_alarm): + """Test the functionality for a create alarm request.""" + # Mock a message value and key + message = Message() + message.key = "delete_alarm_request" + message.value = json.dumps({"alarm_delete_request": + {"alarm_uuid": "my_alarm_id"}}) + + # Call the alarming functionality and check delete request + self.alarming.alarming(message, self.alarming.common, "my_auth_token") + + del_alarm.assert_called_with(mock.ANY, mock.ANY, "my_alarm_id") + + @mock.patch.object(alarm_req.Alarming, "list_alarms") + def test_list_alarm_key(self, list_alarm): + """Test the functionality for a list alarm request.""" + # Mock a message with list alarm key and value + message = Message() + message.key = "list_alarm_request" + message.value = json.dumps({"alarm_list_request": "my_alarm_details"}) + + # Call the alarming functionality and check list functionality + self.alarming.alarming(message, self.alarming.common, "my_auth_token") + list_alarm.assert_called_with(mock.ANY, mock.ANY, "my_alarm_details") + + @mock.patch.object(alarm_req.Alarming, "update_alarm_state") + def test_ack_alarm_key(self, ack_alarm): + """Test the functionality for an acknowledge alarm request.""" + # Mock a message with acknowledge alarm key and value + message = Message() + message.key = "acknowledge_alarm" + message.value = json.dumps({"ack_details": + {"alarm_uuid": "my_alarm_id"}}) + + # Call alarming functionality and check acknowledge functionality + self.alarming.alarming(message, self.alarming.common, "my_auth_token") + ack_alarm.assert_called_with(mock.ANY, mock.ANY, "my_alarm_id") + + @mock.patch.object(alarm_req.Alarming, "configure_alarm") + def test_config_alarm_key(self, config_alarm): + """Test the functionality for a create alarm request.""" + # Mock a message with config alarm key and value + message = Message() + message.key = "create_alarm_request" + message.value = json.dumps({"alarm_create_request": "alarm_details"}) + + # Call alarming functionality and check config alarm call + config_alarm.return_value = "my_alarm_id", True + self.alarming.alarming(message, self.alarming.common, "my_auth_token") + config_alarm.assert_called_with(mock.ANY, mock.ANY, "alarm_details") diff --git a/test/OpenStack/test_alarming.py b/test/OpenStack/test_alarming.py new file mode 100644 index 0000000..557a93d --- /dev/null +++ b/test/OpenStack/test_alarming.py @@ -0,0 +1,271 @@ +# Copyright 2017 iIntel Research and Development Ireland Limited +# ************************************************************** + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Tests for all alarm request message keys.""" + +import json + +import logging + +import unittest + +import mock + +from plugins.OpenStack.Aodh import alarming as alarm_req +from plugins.OpenStack.common import Common + +__author__ = "Helena McGough" + +log = logging.getLogger(__name__) + +auth_token = mock.ANY +endpoint = mock.ANY + + +class Response(object): + """Mock a response message class.""" + + def __init__(self, result): + """Initialise the response text and status code.""" + self.text = json.dumps(result) + self.status_code = "MOCK_STATUS_CODE" + + +class TestAlarming(unittest.TestCase): + """Tests for alarming class functions.""" + + def setUp(self): + """Setup for tests.""" + super(TestAlarming, self).setUp() + self.alarming = alarm_req.Alarming() + self.alarming.common = Common() + + @mock.patch.object(alarm_req.Alarming, "check_payload") + @mock.patch.object(alarm_req.Alarming, "check_for_metric") + @mock.patch.object(Common, "_perform_request") + def test_config_invalid_alarm_req(self, perf_req, check_metric, check_pay): + """Test configure an invalid alarm request.""" + # Configuring with invalid alarm name results in failure + values = {"alarm_name": "my_alarm", + "metric_name": "my_metric", + "resource_uuid": "my_r_id"} + self.alarming.configure_alarm(endpoint, auth_token, values) + perf_req.assert_not_called + perf_req.reset_mock() + + # Correct alarm_name will check for metric in Gnocchi + # If there isn't one an alarm won;t be created + values = {"alarm_name": "disk_write_ops", + "metric_name": "disk_write_ops", + "resource_uuid": "my_r_id"} + + check_metric.return_value = None + + self.alarming.configure_alarm(endpoint, auth_token, values) + perf_req.assert_not_called + + @mock.patch.object(alarm_req.Alarming, "check_payload") + @mock.patch.object(alarm_req.Alarming, "check_for_metric") + @mock.patch.object(Common, "_perform_request") + def test_config_valid_alarm_req(self, perf_req, check_metric, check_pay): + """Test config a valid alarm.""" + # Correct alarm_name will check for metric in Gnocchi + # And conform that the payload is configured correctly + values = {"alarm_name": "disk_write_ops", + "metric_name": "disk_write_ops", + "resource_uuid": "my_r_id"} + + check_metric.return_value = "my_metric_id" + check_pay.return_value = "my_payload" + + self.alarming.configure_alarm(endpoint, auth_token, values) + perf_req.assert_called_with( + "/v2/alarms/", auth_token, + req_type="post", payload="my_payload") + + @mock.patch.object(Common, "_perform_request") + def test_delete_alarm_req(self, perf_req): + """Test delete alarm request.""" + self.alarming.delete_alarm(endpoint, auth_token, "my_alarm_id") + + perf_req.assert_called_with( + "/v2/alarms/my_alarm_id", auth_token, req_type="delete") + + @mock.patch.object(Common, "_perform_request") + def test_invalid_list_alarm_req(self, perf_req): + """Test invalid list alarm_req.""" + # Request will not be performed with out a resoure_id + list_details = {"mock_details": "invalid_details"} + self.alarming.list_alarms(endpoint, auth_token, list_details) + + perf_req.assert_not_called + + @mock.patch.object(Common, "_perform_request") + def test_valid_list_alarm_req(self, perf_req): + """Test valid list alarm request.""" + # Minimum requirement for an alarm list is resource_id + list_details = {"resource_uuid": "mock_r_id"} + self.alarming.list_alarms(endpoint, auth_token, list_details) + + perf_req.assert_called_with( + "/v2/alarms/", auth_token, req_type="get") + perf_req.reset_mock() + + # Check list with alarm_name defined + list_details = {"resource_uuid": "mock_r_id", + "alarm_name": "my_alarm", + "severity": "critical"} + self.alarming.list_alarms(endpoint, auth_token, list_details) + + perf_req.assert_called_with( + "/v2/alarms/", auth_token, req_type="get") + + @mock.patch.object(Common, "_perform_request") + def test_ack_alarm_req(self, perf_req): + """Test update alarm state for acknowledge alarm request.""" + self.alarming.update_alarm_state(endpoint, auth_token, "my_alarm_id") + + perf_req.assert_called_with( + "/v2/alarms/my_alarm_id/state", auth_token, req_type="put", + payload=json.dumps("ok")) + + @mock.patch.object(alarm_req.Alarming, "check_payload") + @mock.patch.object(Common, "_perform_request") + def test_update_alarm_invalid(self, perf_req, check_pay): + """Test update alarm with invalid get response.""" + values = {"alarm_uuid": "my_alarm_id"} + + self.alarming.update_alarm(endpoint, auth_token, values) + + perf_req.assert_called_with(mock.ANY, auth_token, req_type="get") + check_pay.assert_not_called + + @mock.patch.object(alarm_req.Alarming, "check_payload") + @mock.patch.object(Common, "_perform_request") + def test_update_alarm_invalid_payload(self, perf_req, check_pay): + """Test update alarm with invalid payload.""" + resp = Response({"name": "my_alarm", + "state": "alarm", + "gnocchi_resources_threshold_rule": + {"resource_id": "my_resource_id", + "metric": "my_metric"}}) + perf_req.return_value = resp + check_pay.return_value = None + values = {"alarm_uuid": "my_alarm_id"} + + self.alarming.update_alarm(endpoint, auth_token, values) + + perf_req.assert_called_with(mock.ANY, auth_token, req_type="get") + self.assertEqual(perf_req.call_count, 1) + + @mock.patch.object(alarm_req.Alarming, "check_payload") + @mock.patch.object(Common, "_perform_request") + def test_update_alarm_valid(self, perf_req, check_pay): + """Test valid update alarm request.""" + resp = Response({"name": "my_alarm", + "state": "alarm", + "gnocchi_resources_threshold_rule": + {"resource_id": "my_resource_id", + "metric": "my_metric"}}) + perf_req.return_value = resp + values = {"alarm_uuid": "my_alarm_id"} + + self.alarming.update_alarm(endpoint, auth_token, values) + + check_pay.assert_called_with(values, "my_metric", "my_resource_id", + "my_alarm", alarm_state="alarm") + + self.assertEqual(perf_req.call_count, 2) + # Second call is the update request + perf_req.assert_called_with( + '/v2/alarms/my_alarm_id', auth_token, + req_type="put", payload=check_pay.return_value) + + def test_check_valid_payload(self): + """Test the check payload function for a valid payload.""" + values = {"severity": "warning", + "statistic": "COUNT", + "threshold_value": 12, + "operation": "GT"} + payload = self.alarming.check_payload( + values, "my_metric", "r_id", "alarm_name") + + self.assertEqual( + json.loads(payload), {"name": "alarm_name", + "gnocchi_resources_threshold_rule": + {"resource_id": "r_id", + "metric": "my_metric", + "comparison_operator": "gt", + "aggregation_method": "count", + "threshold": 12, + "resource_type": "generic"}, + "severity": "low", + "state": "ok", + "type": "gnocchi_resources_threshold"}) + + def test_check_valid_state_payload(self): + """Test the check payload function for a valid payload with state.""" + values = {"severity": "warning", + "statistic": "COUNT", + "threshold_value": 12, + "operation": "GT"} + payload = self.alarming.check_payload( + values, "my_metric", "r_id", "alarm_name", alarm_state="alarm") + + self.assertEqual( + json.loads(payload), {"name": "alarm_name", + "gnocchi_resources_threshold_rule": + {"resource_id": "r_id", + "metric": "my_metric", + "comparison_operator": "gt", + "aggregation_method": "count", + "threshold": 12, + "resource_type": "generic"}, + "severity": "low", + "state": "alarm", + "type": "gnocchi_resources_threshold"}) + + def test_check_invalid_payload(self): + """Test the check payload function for an invalid payload.""" + values = {"alarm_values": "mock_invalid_details"} + payload = self.alarming.check_payload( + values, "my_metric", "r_id", "alarm_name") + + self.assertEqual(payload, None) + + @mock.patch.object(Common, "_perform_request") + def test_get_alarm_state(self, perf_req): + """Test the get alarm state function.""" + self.alarming.get_alarm_state(endpoint, auth_token, "alarm_id") + + perf_req.assert_called_with( + "/v2/alarms/alarm_id/state", auth_token, req_type="get") + + @mock.patch.object(Common, "get_endpoint") + @mock.patch.object(Common, "_perform_request") + def test_check_for_metric(self, perf_req, get_endpoint): + """Test the check for metric function.""" + get_endpoint.return_value = "gnocchi_endpoint" + + self.alarming.check_for_metric(auth_token, "metric_name", "r_id") + + perf_req.assert_called_with( + "gnocchi_endpoint/v1/metric/", auth_token, req_type="get") diff --git a/test/OpenStack/test_common.py b/test/OpenStack/test_common.py index 29e9558..e725670 100644 --- a/test/OpenStack/test_common.py +++ b/test/OpenStack/test_common.py @@ -21,14 +21,40 @@ ## """Tests for all common OpenStack methods.""" +import json + +import logging + import unittest +from keystoneclient.v3 import client + import mock from plugins.OpenStack.common import Common +from plugins.OpenStack.settings import Config import requests +__author__ = "Helena McGough" + +log = logging.getLogger(__name__) + + +class Message(object): + """Mock a message for an access credentials request.""" + + def __init__(self): + """Initialise the topic and value of access_cred message.""" + self.topic = "access_credentials" + self.value = json.dumps({"mock_value": "mock_details", + "vim_type": "OPENSTACK", + "access_config": + {"openstack_site": "my_site", + "user": "my_user", + "password": "my_password", + "vim_tenant_name": "my_tenant"}}) + class TestCommon(unittest.TestCase): """Test the common class for OpenStack plugins.""" @@ -38,6 +64,52 @@ class TestCommon(unittest.TestCase): super(TestCommon, self).setUp() self.common = Common() + @mock.patch.object(client, "Client") + def test_authenticate_exists(self, key_client): + """Testing if an authentication token already exists.""" + # If the auth_token is already generated a new one will not be creates + self.common._auth_token = "my_auth_token" + token = self.common._authenticate() + + self.assertEqual(token, "my_auth_token") + + @mock.patch.object(Config, "instance") + @mock.patch.object(client, "Client") + def test_authenticate_none(self, key_client, cfg): + """Test generating a new authentication token.""" + # If auth_token doesn't exist one will try to be created with keystone + # With the configuration values from the environment + self.common._auth_token = None + config = cfg.return_value + url = config.OS_AUTH_URL + user = config.OS_USERNAME + pword = config.OS_PASSWORD + tenant = config.OS_TENANT_NAME + + self.common._authenticate() + + key_client.assert_called_with(auth_url=url, + username=user, + password=pword, + tenant_name=tenant) + key_client.reset_mock() + + @mock.patch.object(client, "Client") + def test_authenticate_access_cred(self, key_client): + """Test generating an auth_token using access_credentials from SO.""" + # Mock valid message from SO + self.common._auth_token = None + message = Message() + + self.common._authenticate(message=message) + + # The class variables are set for each consifugration + self.assertEqual(self.common.openstack_url, "my_site") + self.assertEqual(self.common.user, "my_user") + self.assertEqual(self.common.password, "my_password") + self.assertEqual(self.common.tenant, "my_tenant") + key_client.assert_called + @mock.patch.object(requests, 'post') def test_post_req(self, post): """Testing a post request.""" diff --git a/test/OpenStack/test_metric_calls.py b/test/OpenStack/test_metric_calls.py new file mode 100644 index 0000000..f4c7e97 --- /dev/null +++ b/test/OpenStack/test_metric_calls.py @@ -0,0 +1,332 @@ +# Copyright 2017 iIntel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Tests for all metric request message keys.""" + +import json + +import logging + +import unittest + +import mock + +from plugins.OpenStack.Gnocchi import metrics as metric_req + +from plugins.OpenStack.common import Common + +__author__ = "Helena McGough" + +log = logging.getLogger(__name__) + +# Mock auth_token and endpoint +endpoint = mock.ANY +auth_token = mock.ANY + +# Mock a valid metric list for some tests, and a resultant list +metric_list = [{"name": "disk_write_ops", + "id": "metric_id", + "unit": "units", + "resource_id": "r_id"}] +result_list = ["metric_id", "r_id", "units", "disk_write_ops"] + + +class Response(object): + """Mock a response object for requests.""" + + def __init__(self): + """Initialise test and status code values.""" + self.text = json.dumps("mock_response_text") + self.status_code = "STATUS_CODE" + + +class TestMetricCalls(unittest.TestCase): + """Integration test for metric request keys.""" + + def setUp(self): + """Setup the tests for metric request keys.""" + super(TestMetricCalls, self).setUp() + self.metrics = metric_req.Metrics() + self.metrics._common = Common() + + @mock.patch.object(metric_req.Metrics, "get_metric_name") + @mock.patch.object(metric_req.Metrics, "get_metric_id") + @mock.patch.object(Common, "_perform_request") + def test_invalid_config_metric_req( + self, perf_req, get_metric, get_metric_name): + """Test the configure metric function, for an invalid metric.""" + # Test invalid configuration for creating a metric + values = {"metric_details": "invalid_metric"} + + m_id, r_id, status = self.metrics.configure_metric( + endpoint, auth_token, values) + + perf_req.assert_not_called + self.assertEqual(m_id, None) + self.assertEqual(r_id, None) + self.assertEqual(status, False) + + # Test with an invalid metric name, will not perform request + values = {"resource_uuid": "r_id"} + get_metric_name.return_value = "metric_name", None + + m_id, r_id, status = self.metrics.configure_metric( + endpoint, auth_token, values) + + perf_req.assert_not_called + self.assertEqual(m_id, None) + self.assertEqual(r_id, "r_id") + self.assertEqual(status, False) + get_metric_name.reset_mock() + + # If metric exists, it won't be recreated + get_metric_name.return_value = "metric_name", "norm_name" + get_metric.return_value = "metric_id" + + m_id, r_id, status = self.metrics.configure_metric( + endpoint, auth_token, values) + + perf_req.assert_not_called + self.assertEqual(m_id, "metric_id") + self.assertEqual(r_id, "r_id") + self.assertEqual(status, False) + + @mock.patch.object(metric_req.Metrics, "get_metric_name") + @mock.patch.object(metric_req.Metrics, "get_metric_id") + @mock.patch.object(Common, "_perform_request") + def test_valid_config_metric_req( + self, perf_req, get_metric, get_metric_name): + """Test the configure metric function, for a valid metric.""" + # Test valid configuration and payload for creating a metric + values = {"resource_uuid": "r_id", + "metric_unit": "units"} + get_metric_name.return_value = "metric_name", "norm_name" + get_metric.return_value = None + payload = {"id": "r_id", + "metrics": {"metric_name": + {"archive_policy_name": "high", + "name": "metric_name", + "unit": "units"}}} + + self.metrics.configure_metric(endpoint, auth_token, values) + + perf_req.assert_called_with( + "/v1/resource/generic", auth_token, req_type="post", + payload=json.dumps(payload)) + + @mock.patch.object(Common, "_perform_request") + def test_delete_metric_req(self, perf_req): + """Test the delete metric function.""" + self.metrics.delete_metric(endpoint, auth_token, "metric_id") + + perf_req.assert_called_with( + "/v1/metric/metric_id", auth_token, req_type="delete") + + @mock.patch.object(Common, "_perform_request") + def test_delete_metric_invalid_status(self, perf_req): + """Test invalid response for delete request.""" + perf_req.return_value = "404" + + status = self.metrics.delete_metric(endpoint, auth_token, "metric_id") + + self.assertEqual(status, False) + + @mock.patch.object(metric_req.Metrics, "response_list") + @mock.patch.object(Common, "_perform_request") + def test_complete_list_metric_req(self, perf_req, resp_list): + """Test the complete list metric function.""" + # Test listing metrics without any configuration options + values = {} + resp = Response() + perf_req.return_value = resp + self.metrics.list_metrics(endpoint, auth_token, values) + + perf_req.assert_called_with( + "/v1/metric/", auth_token, req_type="get") + resp_list.assert_called_with("mock_response_text") + + @mock.patch.object(metric_req.Metrics, "response_list") + @mock.patch.object(Common, "_perform_request") + def test_resource_list_metric_req(self, perf_req, resp_list): + """Test the resource list metric function.""" + # Test listing metrics with a resource id specified + values = {"resource_uuid": "resource_id"} + resp = Response() + perf_req.return_value = resp + self.metrics.list_metrics(endpoint, auth_token, values) + + perf_req.assert_called_with( + "/v1/metric/", auth_token, req_type="get") + resp_list.assert_called_with( + "mock_response_text", resource="resource_id") + + @mock.patch.object(metric_req.Metrics, "response_list") + @mock.patch.object(Common, "_perform_request") + def test_name_list_metric_req(self, perf_req, resp_list): + """Test the metric_name list metric function.""" + # Test listing metrics with a metric_name specified + values = {"metric_name": "disk_write_bytes"} + resp = Response() + perf_req.return_value = resp + self.metrics.list_metrics(endpoint, auth_token, values) + + perf_req.assert_called_with( + "/v1/metric/", auth_token, req_type="get") + resp_list.assert_called_with( + "mock_response_text", metric_name="disk_write_bytes") + + @mock.patch.object(metric_req.Metrics, "response_list") + @mock.patch.object(Common, "_perform_request") + def test_combined_list_metric_req(self, perf_req, resp_list): + """Test the combined resource and metric list metric function.""" + # Test listing metrics with a resource id and metric name specified + values = {"resource_uuid": "resource_id", + "metric_name": "packets_sent"} + resp = Response() + perf_req.return_value = resp + self.metrics.list_metrics(endpoint, auth_token, values) + + perf_req.assert_called_with( + "/v1/metric/", auth_token, req_type="get") + resp_list.assert_called_with( + "mock_response_text", resource="resource_id", + metric_name="packets_sent") + + @mock.patch.object(Common, "_perform_request") + def test_get_metric_id(self, perf_req): + """Test get_metric_id function.""" + self.metrics.get_metric_id(endpoint, auth_token, "my_metric", "r_id") + + perf_req.assert_called_with( + "/v1/resource/generic/r_id", auth_token, req_type="get") + + def test_get_metric_name(self): + """Test the result from the get_metric_name function.""" + # test with a valid metric_name + values = {"metric_name": "disk_write_ops"} + + metric_name, norm_name = self.metrics.get_metric_name(values) + + self.assertEqual(metric_name, "disk_write_ops") + self.assertEqual(norm_name, "disk.disk_ops") + + # test with an invalid metric name + values = {"metric_name": "my_invalid_metric"} + + metric_name, norm_name = self.metrics.get_metric_name(values) + + self.assertEqual(metric_name, "my_invalid_metric") + self.assertEqual(norm_name, None) + + @mock.patch.object(Common, "_perform_request") + def test_valid_read_data_req(self, perf_req): + """Test the read metric data function, for a valid call.""" + values = {"metric_uuid": "metric_id", + "collection_unit": "DAY", + "collection_period": 1} + + self.metrics.read_metric_data(endpoint, auth_token, values) + + perf_req.assert_called_once + + @mock.patch.object(Common, "_perform_request") + def test_invalid_read_data_req(self, perf_req): + """Test the read metric data function, for an invalid call.""" + # Teo empty lists wil be returned because the values are invalid + values = {} + + times, data = self.metrics.read_metric_data( + endpoint, auth_token, values) + + self.assertEqual(times, []) + self.assertEqual(data, []) + + def test_complete_response_list(self): + """Test the response list function for formating metric lists.""" + # Mock a list for testing purposes, with valid OSM metric + resp_list = self.metrics.response_list(metric_list) + + # Check for the expected values in the resulting list + for l in result_list: + self.assertIn(l, resp_list[0]) + + def test_name_response_list(self): + """Test the response list with metric name configured.""" + # Mock the metric name to test a metric name list + # Test with a name that is not in the list + invalid_name = "my_metric" + resp_list = self.metrics.response_list( + metric_list, metric_name=invalid_name) + + self.assertEqual(resp_list, []) + + # Test with a name on the list + valid_name = "disk_write_ops" + resp_list = self.metrics.response_list( + metric_list, metric_name=valid_name) + + # Check for the expected values in the resulting list + for l in result_list: + self.assertIn(l, resp_list[0]) + + def test_resource_response_list(self): + """Test the response list with resource_id configured.""" + # Mock a resource_id to test a resource list + # Test with resource not on the list + invalid_id = "mock_resource" + resp_list = self.metrics.response_list(metric_list, resource=invalid_id) + + self.assertEqual(resp_list, []) + + # Test with a resource on the list + valid_id = "r_id" + resp_list = self.metrics.response_list(metric_list, resource=valid_id) + + # Check for the expected values in the resulting list + for l in result_list: + self.assertIn(l, resp_list[0]) + + def test_combined_response_list(self): + """Test the response list function with resource_id and metric_name.""" + # Test for a combined resource and name list + # resource and name are on the lisat + valid_name = "disk_write_ops" + valid_id = "r_id" + resp_list = self.metrics.response_list( + metric_list, metric_name=valid_name, resource=valid_id) + + # Check for the expected values in the resulting list + for l in result_list: + self.assertIn(l, resp_list[0]) + + # resource not on list + invalid_id = "mock_resource" + resp_list = self.metrics.response_list( + metric_list, metric_name=valid_name, resource=invalid_id) + + self.assertEqual(resp_list, []) + + # metric name not on list + invalid_name = "mock_metric" + resp_list = self.metrics.response_list( + metric_list, metric_name=invalid_name, resource=valid_id) + + self.assertEqual(resp_list, []) diff --git a/test/OpenStack/test_metric_req.py b/test/OpenStack/test_metric_req.py new file mode 100644 index 0000000..a3ab3dd --- /dev/null +++ b/test/OpenStack/test_metric_req.py @@ -0,0 +1,151 @@ +# Copyright 2017 iIntel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Tests for all metric request message keys.""" + +import json + +import logging + +import unittest + +import mock + +from plugins.OpenStack.Gnocchi import metrics as metric_req + +from plugins.OpenStack.common import Common + +__author__ = "Helena McGough" + +log = logging.getLogger(__name__) + + +class Message(object): + """A class to mock a message object value for metric requests.""" + + def __init__(self): + """Initialize a mocked message instance.""" + self.topic = "metric_request" + self.key = None + self.value = json.dumps({"mock_message": "message_details"}) + + +class TestMetricReq(unittest.TestCase): + """Integration test for metric request keys.""" + + def setUp(self): + """Setup the tests for metric request keys.""" + super(TestMetricReq, self).setUp() + self.common = Common() + self.metrics = metric_req.Metrics() + + @mock.patch.object(Common, "_authenticate") + def test_access_cred_metric_auth(self, auth): + """Test authentication with access credentials.""" + message = Message() + + self.metrics.metric_calls(message, self.common, "my_auth_token") + + auth.assert_not_called + self.assertEqual(self.metrics.auth_token, "my_auth_token") + + @mock.patch.object(Common, "_authenticate") + def test_env_metric_auth(self, auth): + """Test authentication with environment variables.""" + message = Message() + + self.metrics.metric_calls(message, self.common, None) + + auth.assert_called_with() + + @mock.patch.object(metric_req.Metrics, "delete_metric") + def test_delete_metric_key(self, del_metric): + """Test the functionality for a delete metric request.""" + # Mock a message value and key + message = Message() + message.key = "delete_metric_request" + message.value = json.dumps({"metric_uuid": "my_metric_id"}) + + # Call the metric functionality and check delete request + self.metrics.metric_calls(message, self.common, "my_auth_token") + + del_metric.assert_called_with(mock.ANY, mock.ANY, "my_metric_id") + + @mock.patch.object(metric_req.Metrics, "list_metrics") + def test_list_metric_key(self, list_metrics): + """Test the functionality for a list metric request.""" + # Mock a message with list metric key and value + message = Message() + message.key = "list_metric_request" + message.value = json.dumps({"metrics_list_request": "metric_details"}) + + # Call the metric functionality and check list functionality + self.metrics.metric_calls(message, self.common, "my_auth_token") + list_metrics.assert_called_with(mock.ANY, mock.ANY, "metric_details") + + @mock.patch.object(metric_req.Metrics, "read_metric_data") + @mock.patch.object(metric_req.Metrics, "list_metrics") + @mock.patch.object(metric_req.Metrics, "delete_metric") + @mock.patch.object(metric_req.Metrics, "configure_metric") + def test_update_metric_key(self, config_metric, delete_metric, list_metrics, + read_data): + """Test the functionality for an update metric request.""" + # Mock a message with update metric key and value + message = Message() + message.key = "update_metric_request" + message.value = json.dumps({"metric_create": + {"metric_name": "my_metric", + "resource_uuid": "my_r_id"}}) + + # Call metric functionality and confirm no function is called + # Gnocchi does not support updating a metric configuration + self.metrics.metric_calls(message, self.common, "my_auth_token") + config_metric.assert_not_called + list_metrics.assert_not_called + delete_metric.assert_not_called + read_data.assert_not_called + + @mock.patch.object(metric_req.Metrics, "configure_metric") + def test_config_metric_key(self, config_metric): + """Test the functionality for a create metric request.""" + # Mock a message with create metric key and value + message = Message() + message.key = "create_metric_request" + message.value = json.dumps({"metric_create": "metric_details"}) + + # Call metric functionality and check config metric + config_metric.return_value = "metric_id", "resource_id", True + self.metrics.metric_calls(message, self.common, "my_auth_token") + config_metric.assert_called_with(mock.ANY, mock.ANY, "metric_details") + + @mock.patch.object(metric_req.Metrics, "read_metric_data") + def test_read_data_key(self, read_data): + """Test the functionality for a read metric data request.""" + # Mock a message with a read data key and value + message = Message() + message.key = "read_metric_data_request" + message.value = json.dumps({"alarm_uuid": "alarm_id"}) + + # Call metric functionality and check read data metrics + read_data.return_value = "time_stamps", "data_values" + self.metrics.metric_calls(message, self.common, "my_auth_token") + read_data.assert_called_with( + mock.ANY, mock.ANY, json.loads(message.value)) diff --git a/test/OpenStack/test_responses.py b/test/OpenStack/test_responses.py new file mode 100644 index 0000000..900d8fe --- /dev/null +++ b/test/OpenStack/test_responses.py @@ -0,0 +1,119 @@ +# Copyright 2017 iIntel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Test that the correct responses are generated for each message.""" + +import logging + +import unittest + +import mock + +from plugins.OpenStack import response as resp + +__author__ = "Helena McGough" + +log = logging.getLogger(__name__) + + +class TestOpenStackResponse(unittest.TestCase): + """Tests for responses generated by the OpenStack plugins.""" + + def setUp(self): + """Setup for testing OpenStack plugin responses.""" + super(TestOpenStackResponse, self).setUp() + self.plugin_resp = resp.OpenStack_Response() + + def test_invalid_key(self): + """Test if an invalid key is entered for a response.""" + message = self.plugin_resp.generate_response("mock_invalid_key") + self.assertEqual(message, None) + + @mock.patch.object( + resp.OpenStack_Response, "alarm_list_response") + def test_list_alarm_resp(self, alarm_list_resp): + """Test out a function call for a list alarm response.""" + message = self.plugin_resp.generate_response("list_alarm_response") + self.assertEqual(alarm_list_resp.return_value, message) + + @mock.patch.object( + resp.OpenStack_Response, "list_metric_response") + def test_list_metric_resp(self, metric_list_resp): + """Test list metric response function call.""" + message = self.plugin_resp.generate_response("list_metric_response") + self.assertEqual(message, metric_list_resp.return_value) + + @mock.patch.object( + resp.OpenStack_Response, "delete_alarm_response") + def test_delete_alarm_resp(self, del_alarm_resp): + """Test delete alarm response function call.""" + message = self.plugin_resp.generate_response("delete_alarm_response") + self.assertEqual(message, del_alarm_resp.return_value) + + @mock.patch.object( + resp.OpenStack_Response, "delete_metric_response") + def test_delete_metric_resp(self, del_metric_resp): + """Test the response functionality of delete metric response.""" + message = self.plugin_resp.generate_response("delete_metric_response") + self.assertEqual(message, del_metric_resp.return_value) + + @mock.patch.object( + resp.OpenStack_Response, "create_alarm_response") + def test_create_alarm_resp(self, config_alarm_resp): + """Test create alarm response function call.""" + message = self.plugin_resp.generate_response("create_alarm_response") + self.assertEqual(message, config_alarm_resp.return_value) + + @mock.patch.object( + resp.OpenStack_Response, "metric_create_response") + def test_create_metric_resp(self, config_metric_resp): + """Test create metric response function call.""" + message = self.plugin_resp.generate_response("create_metric_response") + self.assertEqual(message, config_metric_resp.return_value) + + @mock.patch.object( + resp.OpenStack_Response, "update_alarm_response") + def test_update_alarm_resp(self, up_alarm_resp): + """Test update alarm response function call.""" + message = self.plugin_resp.generate_response("update_alarm_response") + self.assertEqual(message, up_alarm_resp.return_value) + + @mock.patch.object( + resp.OpenStack_Response, "update_metric_response") + def test_update_metric_resp(self, up_metric_resp): + """Test update metric response function call.""" + message = self.plugin_resp.generate_response("update_metric_response") + self.assertEqual(message, up_metric_resp.return_value) + + @mock.patch.object( + resp.OpenStack_Response, "notify_alarm") + def test_notify_alarm(self, notify_alarm): + """Test notify alarm response function call.""" + message = self.plugin_resp.generate_response("notify_alarm") + self.assertEqual(message, notify_alarm.return_value) + + @mock.patch.object( + resp.OpenStack_Response, "read_metric_data_response") + def test_read_metric_data_resp(self, read_data_resp): + """Test read metric data response function call.""" + message = self.plugin_resp.generate_response( + "read_metric_data_response") + self.assertEqual(message, read_data_resp.return_value) diff --git a/test/OpenStack/test_settings.py b/test/OpenStack/test_settings.py new file mode 100644 index 0000000..dcc1053 --- /dev/null +++ b/test/OpenStack/test_settings.py @@ -0,0 +1,59 @@ +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: helena.mcgough@intel.com or adrian.hoban@intel.com +## +"""Tests for settings for OpenStack plugins configurations.""" + +import logging + +import os + +import unittest + +import mock + +from plugins.OpenStack.settings import Config + +__author__ = "Helena McGough" + +log = logging.getLogger(__name__) + + +class TestSettings(unittest.TestCase): + """Test the settings class for OpenStack plugin configuration.""" + + def setUp(self): + """Test Setup.""" + super(TestSettings, self).setUp() + self.cfg = Config.instance() + + def test_set_os_username(self): + """Test reading the environment for OpenStack plugin configuration.""" + self.cfg.read_environ("my_service") + + self.assertEqual(self.cfg.OS_USERNAME, "my_service") + + @mock.patch.object(os, "environ") + def test_read_environ(self, environ): + """Test reading environment variables for configuration.""" + self.cfg.read_environ("my_service") + + # Called for each key in the configuration dictionary + environ.assert_called_once diff --git a/test/core/test_producer.py b/test/core/test_producer.py index fa2881b..5dc3caf 100644 --- a/test/core/test_producer.py +++ b/test/core/test_producer.py @@ -19,31 +19,27 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com ## +"""This is a KafkaProducer with a request function to test the plugins.""" -''' -This is a kafka producer with a common request function to test the plugins. -''' +import json +import logging as log +import os -from kafka import KafkaProducer as kaf -from kafka.errors import KafkaError -import logging as log -import json import jsmin -import os -import sys -from os import listdir -from jsmin import jsmin -import os.path as path +from kafka import KafkaProducer as kaf + +from kafka.errors import KafkaError class KafkaProducer(object): + """A KafkaProducer for testing purposes.""" def __init__(self, topic): - - self._topic= topic + """Initialize a KafkaProducer and it's topic.""" + self._topic = topic if "ZOOKEEPER_URI" in os.environ: broker = os.getenv("ZOOKEEPER_URI") @@ -56,34 +52,31 @@ class KafkaProducer(object): is already running. ''' - self.producer = kaf(key_serializer=str.encode, - value_serializer=lambda v: json.dumps(v).encode('ascii'), - bootstrap_servers=broker, api_version=(0,10)) - - + self.producer = kaf( + key_serializer=str.encode, + value_serializer=lambda v: json.dumps(v).encode('ascii'), + bootstrap_servers=broker, api_version=(0, 10)) def publish(self, key, value, topic): + """Send messages to the message bus with a defing key and topic.""" try: - future = self.producer.send(key=key, value=value,topic=topic) + future = self.producer.send(topic=topic, key=key, value=value) self.producer.flush() except Exception: log.exception("Error publishing to {} topic." .format(topic)) raise try: record_metadata = future.get(timeout=10) - #self._log.debug("TOPIC:", record_metadata.topic) - #self._log.debug("PARTITION:", record_metadata.partition) - #self._log.debug("OFFSET:", record_metadata.offset) + log.debug("TOPIC:", record_metadata.topic) + log.debug("PARTITION:", record_metadata.partition) + log.debug("OFFSET:", record_metadata.offset) except KafkaError: pass - json_path = path.abspath(path.join(os.getcwd(),"../..")) - - def request(self, path, key, message, topic): - #External to MON + def request(self, path, key, message, topic): + """Test json files are loaded and sent on the message bus.""" + # External to MON payload_create_alarm = jsmin(open(os.path.join(path)).read()) self.publish(key=key, - value = json.loads(payload_create_alarm), - topic=topic) - - + value=json.loads(payload_create_alarm), + topic=topic) diff --git a/tox.ini b/tox.ini index 17e4fab..37c4113 100644 --- a/tox.ini +++ b/tox.ini @@ -40,13 +40,7 @@ setenv = VIRTUAL_ENV={envdir} [testenv:pep8] -commands = flake8 test - -[testenv:venv] -commands = {posargs} - -[testenv:cover] -commands = python setup.py test --coverage +commands = flake8 plugins [pep8] max-line-length = 80