--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+##
+*.py[cod]
+
+# C extensions
+*.so
+
+# log files
+*.log
+
+# Packages
+*.egg
+*.egg-info
+dist
+build
+.eggs
+eggs
+parts
+bin
+var
+sdist
+develop-eggs
+.installed.cfg
+lib
+lib64
+
+# Installer logs
+pip-log.txt
+
+# Unit test / coverage reports
+.coverage
+.tox
+nosetests.xml
+.testrepository
+.venv
+.cache
+
+# Translations
+*.mo
+
+# Complexity
+output/*.html
+output/*/index.html
+
+# Sphinx
+doc/build
+
+# pbr generates these
+AUTHORS
+ChangeLog
+
+# Editors
+*~
+.*.swp
+.*sw?
+.settings/
+.__pycache__/
--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you
+# may not use this file except in compliance with the License. You may
+# obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+"""A common KafkaConsumer for all MON plugins."""
+
+import json
+import logging
+import sys
+
+sys.path.append("/root/MON")
+
+logging.basicConfig(filename='MON_plugins.log',
+ format='%(asctime)s %(message)s',
+ datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
+ level=logging.INFO)
+log = logging.getLogger(__name__)
+
+from kafka import KafkaConsumer
+from kafka.errors import KafkaError
+
+from plugins.OpenStack.Aodh import alarming
+from plugins.OpenStack.common import Common
+from plugins.OpenStack.Gnocchi import metrics
+
+
+# Initialize servers
+server = {'server': 'localhost:9092'}
+
+# Initialize a single common consumer for alarm and metric requests
+common_consumer = KafkaConsumer(group_id='osm_mon',
+ bootstrap_servers=server['server'])
+
+# Create OpenStack alarming and metric instances
+auth_token = None
+openstack_auth = Common()
+openstack_metrics = metrics.Metrics()
+openstack_alarms = alarming.Alarming()
+
+
+def get_vim_type(message):
+ """Get the vim type that is required by the message."""
+ try:
+ return json.loads(message.value)["vim_type"].lower()
+ except Exception as exc:
+ log.warn("vim_type is not configured correctly; %s", exc)
+ return None
+
+# Subscribe the consumer to the request topics used by the plugins
+topics = ['metric_request', 'alarm_request', 'access_credentials']
+common_consumer.subscribe(topics)
+
+try:
+ log.info("Listening for alarm_request and metric_request messages")
+ for message in common_consumer:
+ # Check the message topic
+ if message.topic == "metric_request":
+ # Check the vim desired by the message
+ vim_type = get_vim_type(message)
+ if vim_type == "openstack":
+ log.info("This message is for the OpenStack plugin.")
+ openstack_metrics.metric_calls(
+ message, openstack_auth, auth_token)
+
+ elif vim_type == "cloudwatch":
+ log.info("This message is for the CloudWatch plugin.")
+
+ elif vim_type == "vrops":
+ log.info("This message is for the vROPs plugin.")
+
+ else:
+ log.debug("vim_type is misconfigured or unsupported; %s",
+ vim_type)
+
+ elif message.topic == "alarm_request":
+ # Check the vim desired by the message
+ vim_type = get_vim_type(message)
+ if vim_type == "openstack":
+ log.info("This message is for the OpenStack plugin.")
+ openstack_alarms.alarming(message, openstack_auth, auth_token)
+
+ elif vim_type == "cloudwatch":
+ log.info("This message is for the CloudWatch plugin.")
+
+ elif vim_type == "vrops":
+ log.info("This message is for the vROPs plugin.")
+
+ else:
+ log.debug("vim_type is misconfigured or unsupported; %s",
+ vim_type)
+
+ elif message.topic == "access_credentials":
+ # Check the vim desired by the message
+ vim_type = get_vim_type(message)
+ if vim_type == "openstack":
+ log.info("This message is for the OpenStack plugin.")
+ auth_token = openstack_auth._authenticate(message=message)
+
+ elif vim_type == "cloudwatch":
+ log.info("This message is for the CloudWatch plugin.")
+
+ elif vim_type == "vrops":
+ log.info("This message is for the vROPs plugin.")
+
+ else:
+ log.debug("vim_type is misconfigured or unsupported; %s",
+ vim_type)
+
+ else:
+ log.info("This topic is not relevant to any of the MON plugins.")
+
+
+except KafkaError as exc:
+ log.warn("Exception: %s", exc)
'''
__author__ = "Prithiv Mohan"
-__date__ = "06/Sep/2017"
+__date__ = "06/Sep/2017"
from kafka import KafkaConsumer
import logging.config
import os
+
def logging_handler(filename, mode='a+', encoding=None):
if not os.path.exists(filename):
open(filename, 'a').close()
},
},
'handlers': {
- 'file':{
+ 'file': {
'()': logging_handler,
- 'level':'DEBUG',
+ 'level': 'DEBUG',
'formatter': 'default',
'filename': '/var/log/osm_mon.log',
'mode': 'a+',
else:
broker = "localhost:9092"
-alarm_consumer = KafkaConsumer('alarm_response', 'osm_mon', bootstrap_servers = broker)
-metric_consumer = KafkaConsumer('metric_response', 'osm_mon', bootstrap_servers = broker)
+alarm_consumer = KafkaConsumer(
+ 'alarm_response', 'osm_mon', bootstrap_servers=broker)
+metric_consumer = KafkaConsumer(
+ 'metric_response', 'osm_mon', bootstrap_servers=broker)
try:
for message in alarm_consumer:
logger.debug(message)
# For those usages not covered by the Apache License, Version 2.0 please
# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
##
-
'''
This is a kafka producer app that interacts with the SO and the plugins of the
datacenters like OpenStack, VMWare, AWS.
'''
-__author__ = "Prithiv Mohan"
-__date__ = "06/Sep/2017"
-
-
from kafka import KafkaProducer as kaf
from kafka.errors import KafkaError
import logging
from os import listdir
from jsmin import jsmin
+__author__ = "Prithiv Mohan"
+__date__ = "06/Sep/2017"
+
json_path = os.path.join(os.pardir+"/models/")
def __init__(self, topic):
- self._topic= topic
+ self._topic = topic
if "BROKER_URI" in os.environ:
broker = os.getenv("BROKER_URI")
is already running.
'''
- self.producer = kaf(key_serializer=str.encode,
- value_serializer=lambda v: json.dumps(v).encode('ascii'),
- bootstrap_servers=broker, api_version=(0,10))
-
+ self.producer = kaf(
+ key_serializer=str.encode,
+ value_serializer=lambda v: json.dumps(v).encode('ascii'),
+ bootstrap_servers=broker, api_version=(0, 10))
def publish(self, key, value, topic=None):
try:
def create_alarm_request(self, key, message, topic):
- #External to MON
+ # External to MON
- payload_create_alarm = jsmin(open(os.path.join(json_path,
- 'create_alarm.json')).read())
+ payload_create_alarm = jsmin(
+ open(os.path.join(json_path, 'create_alarm.json')).read())
self.publish(key,
- value=json.dumps(payload_create_alarm),
- topic='alarm_request')
+ value=json.dumps(payload_create_alarm),
+ topic='alarm_request')
def create_alarm_response(self, key, message, topic):
- #Internal to MON
+ # Internal to MON
- payload_create_alarm_resp = jsmin(open(os.path.join(json_path,
- 'create_alarm_resp.json')).read())
+ payload_create_alarm_resp = jsmin(
+ open(os.path.join(json_path, 'create_alarm_resp.json')).read())
self.publish(key,
- value = message,
- topic = 'alarm_response')
+ value=message,
+ topic='alarm_response')
def acknowledge_alarm(self, key, message, topic):
- #Internal to MON
+ # Internal to MON
- payload_acknowledge_alarm = jsmin(open(os.path.join(json_path,
- 'acknowledge_alarm.json')).read())
+ payload_acknowledge_alarm = jsmin(
+ open(os.path.join(json_path, 'acknowledge_alarm.json')).read())
self.publish(key,
- value = json.dumps(payload_acknowledge_alarm),
- topic = 'alarm_request')
+ value=json.dumps(payload_acknowledge_alarm),
+ topic='alarm_request')
def list_alarm_request(self, key, message, topic):
- #External to MON
+ # External to MON
- payload_alarm_list_req = jsmin(open(os.path.join(json_path,
- 'list_alarm_req.json')).read())
+ payload_alarm_list_req = jsmin(
+ open(os.path.join(json_path, 'list_alarm_req.json')).read())
self.publish(key,
- value=json.dumps(payload_alarm_list_req),
- topic='alarm_request')
+ value=json.dumps(payload_alarm_list_req),
+ topic='alarm_request')
def notify_alarm(self, key, message, topic):
- payload_notify_alarm = jsmin(open(os.path.join(json_path,
- 'notify_alarm.json')).read())
+ payload_notify_alarm = jsmin(
+ open(os.path.join(json_path, 'notify_alarm.json')).read())
self.publish(key,
- value=message,
- topic='alarm_response')
+ value=message,
+ topic='alarm_response')
def list_alarm_response(self, key, message, topic):
- payload_list_alarm_resp = jsmin(open(os.path.join(json_path,
- 'list_alarm_resp.json')).read())
+ payload_list_alarm_resp = jsmin(
+ open(os.path.join(json_path, 'list_alarm_resp.json')).read())
self.publish(key,
- value=message,
- topic='alarm_response')
-
+ value=message,
+ topic='alarm_response')
def update_alarm_request(self, key, message, topic):
- # External to Mon
+ # External to Mon
- payload_update_alarm_req = jsmin(open(os.path.join(json_path,
- 'update_alarm_req.json')).read())
+ payload_update_alarm_req = jsmin(
+ open(os.path.join(json_path, 'update_alarm_req.json')).read())
self.publish(key,
- value=json.dumps(payload_update_alarm_req),
- topic='alarm_request')
-
+ value=json.dumps(payload_update_alarm_req),
+ topic='alarm_request')
def update_alarm_response(self, key, message, topic):
- # Internal to Mon
+ # Internal to Mon
- payload_update_alarm_resp = jsmin(open(os.path.join(json_path,
- 'update_alarm_resp.json')).read())
+ payload_update_alarm_resp = jsmin(
+ open(os.path.join(json_path, 'update_alarm_resp.json')).read())
self.publish(key,
- value=message,
- topic='alarm_response')
-
+ value=message,
+ topic='alarm_response')
def delete_alarm_request(self, key, message, topic):
- # External to Mon
+ # External to Mon
- payload_delete_alarm_req = jsmin(open(os.path.join(json_path,
- 'delete_alarm_req.json')).read())
+ payload_delete_alarm_req = jsmin(
+ open(os.path.join(json_path, 'delete_alarm_req.json')).read())
self.publish(key,
- value=json.dumps(payload_delete_alarm_req),
- topic='alarm_request')
+ value=json.dumps(payload_delete_alarm_req),
+ topic='alarm_request')
def delete_alarm_response(self, key, message, topic):
- # Internal to Mon
+ # Internal to Mon
- payload_delete_alarm_resp = jsmin(open(os.path.join(json_path,
- 'delete_alarm_resp.json')).read())
+ payload_delete_alarm_resp = jsmin(
+ open(os.path.join(json_path, 'delete_alarm_resp.json')).read())
self.publish(key,
- value=message,
- topic='alarm_response')
-
-
+ value=message,
+ topic='alarm_response')
def create_metrics_request(self, key, message, topic):
# External to Mon
- payload_create_metrics_req = jsmin(open(os.path.join(json_path,
- 'create_metric_req.json')).read())
+ payload_create_metrics_req = jsmin(
+ open(os.path.join(json_path, 'create_metric_req.json')).read())
self.publish(key,
- value=json.dumps(payload_create_metrics_req),
- topic='metric_request')
-
+ value=json.dumps(payload_create_metrics_req),
+ topic='metric_request')
def create_metrics_resp(self, key, message, topic):
# Internal to Mon
- payload_create_metrics_resp = jsmin(open(os.path.join(json_path,
- 'create_metric_resp.json')).read())
+ payload_create_metrics_resp = jsmin(
+ open(os.path.join(json_path, 'create_metric_resp.json')).read())
self.publish(key,
- value=message,
- topic='metric_response')
-
+ value=message,
+ topic='metric_response')
def read_metric_data_request(self, key, message, topic):
# External to Mon
- payload_read_metric_data_request = jsmin(open(os.path.join(json_path,
- 'read_metric_data_req.json')).read())
+ payload_read_metric_data_request = jsmin(
+ open(os.path.join(json_path, 'read_metric_data_req.json')).read())
self.publish(key,
- value=json.dumps(payload_read_metric_data_request),
- topic='metric_request')
-
+ value=json.dumps(payload_read_metric_data_request),
+ topic='metric_request')
def read_metric_data_response(self, key, message, topic):
# Internal to Mon
- payload_metric_data_response = jsmin(open(os.path.join(json_path,
- 'read_metric_data_resp.json')).read())
+ payload_metric_data_response = jsmin(
+ open(os.path.join(json_path, 'read_metric_data_resp.json')).read())
self.publish(key,
- value=message,
- topic='metric_response')
-
+ value=message,
+ topic='metric_response')
def list_metric_request(self, key, message, topic):
- #External to MON
+ # External to MON
- payload_metric_list_req = jsmin(open(os.path.join(json_path,
- 'list_metric_req.json')).read())
+ payload_metric_list_req = jsmin(
+ open(os.path.join(json_path, 'list_metric_req.json')).read())
self.publish(key,
- value=json.dumps(payload_metric_list_req),
- topic='metric_request')
+ value=json.dumps(payload_metric_list_req),
+ topic='metric_request')
def list_metric_response(self, key, message, topic):
- #Internal to MON
+ # Internal to MON
- payload_metric_list_resp = jsmin(open(os.path.join(json_path,
- 'list_metrics_resp.json')).read())
+ payload_metric_list_resp = jsmin(
+ open(os.path.join(json_path, 'list_metrics_resp.json')).read())
self.publish(key,
- value=message,
- topic='metric_response')
-
+ value=message,
+ topic='metric_response')
def delete_metric_request(self, key, message, topic):
- # External to Mon
+ # External to Mon
- payload_delete_metric_req = jsmin(open(os.path.join(json_path,
- 'delete_metric_req.json')).read())
+ payload_delete_metric_req = jsmin(
+ open(os.path.join(json_path, 'delete_metric_req.json')).read())
self.publish(key,
- value=json.dumps(payload_delete_metric_req),
- topic='metric_request')
-
+ value=json.dumps(payload_delete_metric_req),
+ topic='metric_request')
def delete_metric_response(self, key, message, topic):
- # Internal to Mon
+ # Internal to Mon
- payload_delete_metric_resp = jsmin(open(os.path.join(json_path,
- 'delete_metric_resp.json')).read())
+ payload_delete_metric_resp = jsmin(
+ open(os.path.join(json_path, 'delete_metric_resp.json')).read())
self.publish(key,
- value=message,
- topic='metric_response')
-
+ value=message,
+ topic='metric_response')
def update_metric_request(self, key, message, topic):
# External to Mon
- payload_update_metric_req = jsmin(open(os.path.join(json_path,
- 'update_metric_req.json')).read())
+ payload_update_metric_req = jsmin(
+ open(os.path.join(json_path, 'update_metric_req.json')).read())
self.publish(key,
- value=json.dumps(payload_update_metric_req),
- topic='metric_request')
-
+ value=json.dumps(payload_update_metric_req),
+ topic='metric_request')
def update_metric_response(self, key, message, topic):
# Internal to Mon
- payload_update_metric_resp = jsmin(open(os.path.join(json_path,
- 'update_metric_resp.json')).read())
+ payload_update_metric_resp = jsmin(
+ open(os.path.join(json_path, 'update_metric_resp.json')).read())
self.publish(key,
- value=message,
- topic='metric_response')
+ value=message,
+ topic='metric_response')
def access_credentials(self, key, message, topic):
- payload_access_credentials = jsmin(open(os.path.join(json_path,
- 'access_credentials.json')).read())
+ payload_access_credentials = jsmin(
+ open(os.path.join(json_path, 'access_credentials.json')).read())
self.publish(key,
- value=json.dumps(payload_access_credentials),
- topic='access_credentials')
+ value=json.dumps(payload_access_credentials),
+ topic='access_credentials')
--- /dev/null
+/* Copyright© 2017 Intel Research and Development Ireland Limited
+ # This file is part of OSM Monitoring module
+ # All Rights Reserved to Intel Corporation
+
+ # Licensed under the Apache License, Version 2.0 (the "License"); you may
+ # not use this file except in compliance with the License. You may obtain
+ # a copy of the License at
+
+ # http://www.apache.org/licenses/LICENSE-2.0
+
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ # License for the specific language governing permissions and limitations
+ # under the License.
+
+ # For those usages not covered by the Apache License, Version 2.0 please
+ # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+
+ # This is the message bus schema for CloudWatch access credentials */
+
+
+{
+ "schema_version": { "type": "string" },
+ "schema_type": { "type": "string" },
+ "vim_type": { "type": "string" },
+ "access_config":
+ {
+ "aws_site": { "type": "string" },
+ "user": { "type": "string" },
+ "password": { "type": "string" },
+ "vim_tenant_name": { "type": "string" }
+ },
+ "required": [ "schema_version",
+ "schema_type",
+ "vim_type",
+ "aws_site",
+ "user",
+ "password",
+ "vim_tenant_name" ]
+}
--- /dev/null
+/* Copyright© 2017 Intel Research and Development Ireland Limited
+ # This file is part of OSM Monitoring module
+ # All Rights Reserved to Intel Corporation
+
+ # Licensed under the Apache License, Version 2.0 (the "License"); you may
+ # not use this file except in compliance with the License. You may obtain
+ # a copy of the License at
+
+ # http://www.apache.org/licenses/LICENSE-2.0
+
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ # License for the specific language governing permissions and limitations
+ # under the License.
+
+ # For those usages not covered by the Apache License, Version 2.0 please
+ # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+
+ # This is the message bus schema for OpenStack access credentials */
+
+
+{
+ "schema_version": { "type": "string" },
+ "schema_type": { "type": "string" },
+ "vim_type": { "type": "string" },
+ "access_config":
+ {
+ "openstack_site": { "type" : "string" },
+ "user": { "type": "string" },
+ "password": { "type": "string" },
+ "vim_tenant_name": { "type": "string" }
+ },
+ "required": [ "schema_version",
+ "schema_type",
+ "vim_type",
+ "openstack_site",
+ "user",
+ "password",
+ "vim_tenant_name" ]
+}
--- /dev/null
+/* Copyright© 2017 Intel Research and Development Ireland Limited
+ # This file is part of OSM Monitoring module
+ # All Rights Reserved to Intel Corporation
+
+ # Licensed under the Apache License, Version 2.0 (the "License"); you may
+ # not use this file except in compliance with the License. You may obtain
+ # a copy of the License at
+
+ # http://www.apache.org/licenses/LICENSE-2.0
+
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ # License for the specific language governing permissions and limitations
+ # under the License.
+
+ # For those usages not covered by the Apache License, Version 2.0 please
+ # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+
+ # This is the message bus schema for vROPs access credentials */
+
+
+{
+ "schema_version": { "type": "string" },
+ "schema_type": { "type": "string" },
+ "vim_type": { "type": "string" },
+ "access_config":
+ {
+ "vrops_site": { "type": "string" },
+ "vrops_user": { "type": "string" },
+ "vrops_password": { "type": "string" },
+ "vcloud_site": { "type": "string" },
+ "admin_username": { "type": "string" },
+ "admin_password": { "type": "string" },
+ "nsx_manager": { "type": "string" },
+ "nsx_user": { "type": "string" },
+ "nsx_password": { "type": "string" },
+ "vcenter_ip": { "type": "string" },
+ "vcenter_port": { "type": "string" },
+ "vcenter_user": { "type": "string" },
+ "vcenter_password": { "type": "string" },
+ "vim_tenant_name": { "type": "string" },
+ "orgname": { "type": "string" }
+ },
+ "required": [ "schema_version",
+ "schema_type",
+ "vim_type",
+ "vrops_site",
+ "vrops_user",
+ "vrops_password",
+ "vcloud_site",
+ "admin_username",
+ "admin_password",
+ "vcenter_ip",
+ "vcenter_port",
+ "vcenter_user",
+ "vcenter_password",
+ "vim_tenant_name",
+ "orgname" ]
+}
These documents will also describe what alarming and monitoring functionality
the plugins support.
-* To run the Gnocchi plugin run the following command:
+* The Gnocchi and Aodh plugins work from a common KafkaConsumer that checks for
+ the appropriate topics and keys. To run this consumer:
::
- lxc exec MON - python /root/MON/plugins/OpenStack/Gnocchi/plugin_instance.py
-
-* To run the Aodh plugin run the following command:
-
- ::
-
- lxc exec MON - python /root/MON/plugins/OpenStack/Aodh/plugin_instance.py
+ lxc exec MON - python /root/MON/core/message_bus/common_consumer.py
CloudWatch
~~~~~~~~~~
# For those usages not covered by the Apache License, Version 2.0 please
# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
##
+"""Aodh plugin for OSM MON."""
"""Carry out alarming requests via Aodh API."""
import json
+
import logging
-log = logging.getLogger(__name__)
from core.message_bus.producer import KafkaProducer
-from kafka import KafkaConsumer
-
-from plugins.OpenStack.common import Common
from plugins.OpenStack.response import OpenStack_Response
+from plugins.OpenStack.settings import Config
__author__ = "Helena McGough"
+log = logging.getLogger(__name__)
+
ALARM_NAMES = {
"average_memory_usage_above_threshold": "average_memory_utilization",
"disk_read_ops": "disk_read_ops",
def __init__(self):
"""Create the OpenStack alarming instance."""
- self._common = Common()
+ # Initialize configuration and notifications
+ config = Config.instance()
+ config.read_environ("aodh")
- # TODO(mcgoughh): Remove hardcoded kafkaconsumer
- # Initialize a generic consumer object to consume message from the SO
- server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
- self._consumer = KafkaConsumer(server['topic'],
- group_id='osm_mon',
- bootstrap_servers=server['server'])
+        # Initialize authentication variables for API requests
+ self.auth_token = None
+ self.endpoint = None
+ self.common = None
# Use the Response class to generate valid json response messages
self._response = OpenStack_Response()
# Initializer a producer to send responses back to SO
self._producer = KafkaProducer("alarm_response")
- def alarming(self):
+ def alarming(self, message, common, auth_token):
"""Consume info from the message bus to manage alarms."""
- # Check the alarming functionlity that needs to be performed
- for message in self._consumer:
-
- values = json.loads(message.value)
- vim_type = values['vim_type'].lower()
-
- if vim_type == "openstack":
- log.info("Alarm action required: %s" % (message.topic))
-
- # Generate and auth_token and endpoint for request
- auth_token, endpoint = self.authenticate()
-
- if message.key == "create_alarm_request":
- # Configure/Update an alarm
- alarm_details = values['alarm_create_request']
-
- alarm_id, alarm_status = self.configure_alarm(
- endpoint, auth_token, alarm_details)
-
- # Generate a valid response message, send via producer
- try:
- if alarm_status is True:
- log.info("Alarm successfully created")
-
- resp_message = self._response.generate_response(
- 'create_alarm_response', status=alarm_status,
- alarm_id=alarm_id,
- cor_id=alarm_details['correlation_id'])
- log.info("Response Message: %s", resp_message)
- self._producer.create_alarm_response(
- 'create_alarm_resonse', resp_message,
- 'alarm_response')
- except Exception as exc:
- log.warn("Response creation failed: %s", exc)
-
- elif message.key == "list_alarm_request":
- # Check for a specifed: alarm_name, resource_uuid, severity
- # and generate the appropriate list
- list_details = values['alarm_list_request']
-
- alarm_list = self.list_alarms(
- endpoint, auth_token, list_details)
-
- try:
- # Generate and send a list response back
- resp_message = self._response.generate_response(
- 'list_alarm_response', alarm_list=alarm_list,
- cor_id=list_details['correlation_id'])
- log.info("Response Message: %s", resp_message)
- self._producer.list_alarm_response(
- 'list_alarm_response', resp_message,
- 'alarm_response')
- except Exception as exc:
- log.warn("Failed to send a valid response back.")
-
- elif message.key == "delete_alarm_request":
- request_details = values['alarm_delete_request']
- alarm_id = request_details['alarm_uuid']
-
- resp_status = self.delete_alarm(
- endpoint, auth_token, alarm_id)
-
- # Generate and send a response message
- try:
- resp_message = self._response.generate_response(
- 'delete_alarm_response', alarm_id=alarm_id,
- status=resp_status,
- cor_id=request_details['correlation_id'])
- log.info("Response message: %s", resp_message)
- self._producer.delete_alarm_response(
- 'delete_alarm_response', resp_message,
- 'alarm_response')
- except Exception as exc:
- log.warn("Failed to create delete reponse:%s", exc)
-
- elif message.key == "acknowledge_alarm":
- # Acknowledge that an alarm has been dealt with by the SO
- alarm_id = values['ack_details']['alarm_uuid']
-
- response = self.update_alarm_state(
- endpoint, auth_token, alarm_id)
-
- # Log if an alarm was reset
- if response is True:
- log.info("Acknowledged the alarm and cleared it.")
- else:
- log.warn("Failed to acknowledge/clear the alarm.")
-
- elif message.key == "update_alarm_request":
- # Update alarm configurations
- alarm_details = values['alarm_update_request']
-
- alarm_id, status = self.update_alarm(
- endpoint, auth_token, alarm_details)
-
- # Generate a response for an update request
- try:
- resp_message = self._response.generate_response(
- 'update_alarm_response', alarm_id=alarm_id,
- cor_id=alarm_details['correlation_id'],
- status=status)
- log.info("Response message: %s", resp_message)
- self._producer.update_alarm_response(
- 'update_alarm_response', resp_message,
- 'alarm_response')
- except Exception as exc:
- log.warn("Failed to send an update response:%s", exc)
+ values = json.loads(message.value)
+ self.common = common
- else:
- log.debug("Unknown key, no action will be performed")
+ log.info("OpenStack alarm action required.")
+
+        # Generate an auth_token and endpoint for the request
+ if auth_token is not None:
+ if self.auth_token != auth_token:
+ log.info("Auth_token for alarming set by access_credentials.")
+ self.auth_token = auth_token
else:
- log.info("Message topic not relevant to this plugin: %s",
- message.topic)
+ log.info("Auth_token has not been updated.")
+ else:
+ log.info("Using environment variables to set auth_token for Aodh.")
+ self.auth_token = self.common._authenticate()
+
+ if self.endpoint is None:
+ log.info("Generating a new endpoint for Aodh.")
+ self.endpoint = self.common.get_endpoint("alarming")
+
+ if message.key == "create_alarm_request":
+ # Configure/Update an alarm
+ alarm_details = values['alarm_create_request']
+
+ alarm_id, alarm_status = self.configure_alarm(
+ self.endpoint, self.auth_token, alarm_details)
+
+ # Generate a valid response message, send via producer
+ try:
+ if alarm_status is True:
+ log.info("Alarm successfully created")
+
+ resp_message = self._response.generate_response(
+ 'create_alarm_response', status=alarm_status,
+ alarm_id=alarm_id,
+ cor_id=alarm_details['correlation_id'])
+ log.info("Response Message: %s", resp_message)
+ self._producer.create_alarm_response(
+ 'create_alarm_resonse', resp_message,
+ 'alarm_response')
+ except Exception as exc:
+ log.warn("Response creation failed: %s", exc)
+
+ elif message.key == "list_alarm_request":
+        # Check for a specified: alarm_name, resource_uuid, severity
+ # and generate the appropriate list
+ list_details = values['alarm_list_request']
+
+ alarm_list = self.list_alarms(
+ self.endpoint, self.auth_token, list_details)
+
+ try:
+ # Generate and send a list response back
+ resp_message = self._response.generate_response(
+ 'list_alarm_response', alarm_list=alarm_list,
+ cor_id=list_details['correlation_id'])
+ log.info("Response Message: %s", resp_message)
+ self._producer.list_alarm_response(
+ 'list_alarm_response', resp_message,
+ 'alarm_response')
+ except Exception as exc:
+ log.warn("Failed to send a valid response back.")
+
+ elif message.key == "delete_alarm_request":
+ request_details = values['alarm_delete_request']
+ alarm_id = request_details['alarm_uuid']
+
+ resp_status = self.delete_alarm(
+ self.endpoint, self.auth_token, alarm_id)
+
+ # Generate and send a response message
+ try:
+ resp_message = self._response.generate_response(
+ 'delete_alarm_response', alarm_id=alarm_id,
+ status=resp_status,
+ cor_id=request_details['correlation_id'])
+ log.info("Response message: %s", resp_message)
+ self._producer.delete_alarm_response(
+ 'delete_alarm_response', resp_message,
+ 'alarm_response')
+ except Exception as exc:
+ log.warn("Failed to create delete reponse:%s", exc)
+
+ elif message.key == "acknowledge_alarm":
+ # Acknowledge that an alarm has been dealt with by the SO
+ alarm_id = values['ack_details']['alarm_uuid']
+
+ response = self.update_alarm_state(
+ self.endpoint, self.auth_token, alarm_id)
+
+ # Log if an alarm was reset
+ if response is True:
+ log.info("Acknowledged the alarm and cleared it.")
+ else:
+ log.warn("Failed to acknowledge/clear the alarm.")
+
+ elif message.key == "update_alarm_request":
+ # Update alarm configurations
+ alarm_details = values['alarm_update_request']
+
+ alarm_id, status = self.update_alarm(
+ self.endpoint, self.auth_token, alarm_details)
+
+ # Generate a response for an update request
+ try:
+ resp_message = self._response.generate_response(
+ 'update_alarm_response', alarm_id=alarm_id,
+ cor_id=alarm_details['correlation_id'],
+ status=status)
+ log.info("Response message: %s", resp_message)
+ self._producer.update_alarm_response(
+ 'update_alarm_response', resp_message,
+ 'alarm_response')
+ except Exception as exc:
+ log.warn("Failed to send an update response:%s", exc)
+
+ else:
+ log.debug("Unknown key, no action will be performed")
return
# Create the alarm if metric is available
payload = self.check_payload(values, metric_name, resource_id,
alarm_name)
- new_alarm = self._common._perform_request(
+ new_alarm = self.common._perform_request(
url, auth_token, req_type="post", payload=payload)
return json.loads(new_alarm.text)['alarm_id'], True
else:
url = "{}/v2/alarms/%s".format(endpoint) % (alarm_id)
try:
- result = self._common._perform_request(
+ result = self.common._perform_request(
url, auth_token, req_type="delete")
if str(result.status_code) == "404":
log.info("Alarm doesn't exist: %s", result.status_code)
a_list, name_list, sev_list, res_list = [], [], [], []
# TODO(mcgoughh): for now resource_id is a mandatory field
- resource = list_details['resource_uuid']
+            # Check that a resource id was specified
+ try:
+ resource = list_details['resource_uuid']
+ except KeyError as exc:
+ log.warn("Resource id not specified for list request: %s", exc)
+ return None
# Checking what fields are specified for a list request
try:
# Perform the request to get the desired list
try:
- result = self._common._perform_request(
+ result = self.common._perform_request(
url, auth_token, req_type="get")
if result is not None:
payload = json.dumps("ok")
try:
- self._common._perform_request(
+ self.common._perform_request(
url, auth_token, req_type="put", payload=payload)
return True
except Exception as exc:
# Gets current configurations about the alarm
try:
- result = self._common._perform_request(
+ result = self.common._perform_request(
url, auth_token, req_type="get")
alarm_name = json.loads(result.text)['name']
rule = json.loads(result.text)['gnocchi_resources_threshold_rule']
# Updates the alarm configurations with the valid payload
if payload is not None:
try:
- update_alarm = self._common._perform_request(
+ update_alarm = self.common._perform_request(
url, auth_token, req_type="put", payload=payload)
return json.loads(update_alarm.text)['alarm_id'], True
log.warn("Alarm is not configured correctly: %s", exc)
return None
- def authenticate(self):
- """Generate an authentication token and endpoint for alarm request."""
- try:
- # Check for a tenant_id
- auth_token = self._common._authenticate()
- endpoint = self._common.get_endpoint("alarming")
- return auth_token, endpoint
- except Exception as exc:
- log.warn("Authentication to Keystone failed:%s", exc)
- return None, None
-
def get_alarm_state(self, endpoint, auth_token, alarm_id):
"""Get the state of the alarm."""
url = "{}/v2/alarms/%s/state".format(endpoint) % alarm_id
try:
- alarm_state = self._common._perform_request(
+ alarm_state = self.common._perform_request(
url, auth_token, req_type="get")
return json.loads(alarm_state.text)
except Exception as exc:
def check_for_metric(self, auth_token, m_name, r_id):
"""Check for the alarm metric."""
try:
- endpoint = self._common.get_endpoint("metric")
+ endpoint = self.common.get_endpoint("metric")
url = "{}/v1/metric/".format(endpoint)
- metric_list = self._common._perform_request(
+ metric_list = self.common._perform_request(
url, auth_token, req_type="get")
for metric in json.loads(metric_list.text):
+++ /dev/null
-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
-##
-"""Aodh plugin for the OSM monitoring module."""
-
-import logging
-import sys
-
-sys.path.append("MON/")
-
-logging.basicConfig(filename='aodh_MON.log', format='%(asctime)s %(message)s',
- datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
- level=logging.INFO)
-log = logging.getLogger(__name__)
-
-
-try:
- import aodhclient
-except ImportError:
- log.warn("Failed to import the aodhclient")
-
-from plugins.OpenStack.Aodh.alarming import Alarming
-from plugins.OpenStack.settings import Config
-
-__author__ = "Helena McGough"
-
-
-def register_plugin():
- """Register the plugin."""
- # Initialize configuration and notifications
- config = Config.instance()
-
- # Intialize plugin
- instance = Plugin(config=config)
- instance.config()
- instance.alarm()
-
-
-class Plugin(object):
- """Aodh plugin for OSM MON."""
-
- def __init__(self, config):
- """Plugin instance."""
- log.info("Initialze the plugin instance.")
- self._config = config
- self._alarming = Alarming()
-
- def config(self):
- """Configure plugin."""
- log.info("Configure the plugin instance.")
- self._config.read_environ("aodh")
-
- def alarm(self):
- """Allow alarm info to be received from Aodh."""
- log.info("Begin alarm functionality.")
- self._alarming.alarming()
-
-if aodhclient:
- register_plugin()
# For those usages not covered by the Apache License, Version 2.0 please
# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
##
+"""Gnocchi plugin for OSM MON."""
import datetime
import json
import logging
-log = logging.getLogger(__name__)
import time
from core.message_bus.producer import KafkaProducer
-from kafka import KafkaConsumer
-
-from plugins.OpenStack.common import Common
from plugins.OpenStack.response import OpenStack_Response
+from plugins.OpenStack.settings import Config
__author__ = "Helena McGough"
+log = logging.getLogger(__name__)
+
METRIC_MAPPINGS = {
"average_memory_utilization": "memory.percent",
"disk_read_ops": "disk.disk_ops",
def __init__(self):
"""Initialize the metric actions."""
- self._common = Common()
+ # Configure an instance of the OpenStack metric plugin
+ config = Config.instance()
+ config.read_environ("gnocchi")
- # TODO(mcgoughh): Initialize a generic consumer object to consume
- # message from the SO. This is hardcoded for now
- server = {'server': 'localhost:9092', 'topic': 'metric_request'}
- self._consumer = KafkaConsumer(server['topic'],
- group_id='osm_mon',
- bootstrap_servers=server['server'])
+ # Initialise authentication for API requests
+ self.auth_token = None
+ self.endpoint = None
+ self._common = None
# Use the Response class to generate valid json response messages
self._response = OpenStack_Response()
# Initializer a producer to send responses back to SO
self._producer = KafkaProducer("metric_response")
- def metric_calls(self):
+ def metric_calls(self, message, common, auth_token):
"""Consume info from the message bus to manage metric requests."""
- # Consumer check for metric messages
- for message in self._consumer:
- # Check if this plugin should carry out this request
- values = json.loads(message.value)
- vim_type = values['vim_type'].lower()
-
- if vim_type == "openstack":
- # Generate auth_token and endpoint
- auth_token, endpoint = self.authenticate()
-
- if message.key == "create_metric_request":
- # Configure metric
- metric_details = values['metric_create']
- metric_id, resource_id, status = self.configure_metric(
- endpoint, auth_token, metric_details)
-
- # Generate and send a create metric response
- try:
- resp_message = self._response.generate_response(
- 'create_metric_response', status=status,
- cor_id=values['correlation_id'],
- metric_id=metric_id, r_id=resource_id)
- log.info("Response messages: %s", resp_message)
- self._producer.create_metrics_resp(
- 'create_metric_response', resp_message,
- 'metric_response')
- except Exception as exc:
- log.warn("Failed to create response: %s", exc)
-
- elif message.key == "read_metric_data_request":
- # Read all metric data related to a specified metric
- timestamps, metric_data = self.read_metric_data(
- endpoint, auth_token, values)
-
- # Generate and send a response message
- try:
- resp_message = self._response.generate_response(
- 'read_metric_data_response',
- m_id=values['metric_uuid'],
- m_name=values['metric_name'],
- r_id=values['resource_uuid'],
- cor_id=values['correlation_id'],
- times=timestamps, metrics=metric_data)
- log.info("Response message: %s", resp_message)
- self._producer.read_metric_data_response(
- 'read_metric_data_response', resp_message,
- 'metric_response')
- except Exception as exc:
- log.warn("Failed to send read metric response:%s", exc)
-
- elif message.key == "delete_metric_request":
- # delete the specified metric in the request
- metric_id = values['metric_uuid']
- status = self.delete_metric(
- endpoint, auth_token, metric_id)
-
- # Generate and send a response message
- try:
- resp_message = self._response.generate_response(
- 'delete_metric_response', m_id=metric_id,
- m_name=values['metric_name'],
- status=status, r_id=values['resource_uuid'],
- cor_id=values['correlation_id'])
- log.info("Response message: %s", resp_message)
- self._producer.delete_metric_response(
- 'delete_metric_response', resp_message,
- 'metric_response')
- except Exception as exc:
- log.warn("Failed to send delete response:%s", exc)
-
- elif message.key == "update_metric_request":
- # Gnocchi doesn't support configuration updates
- # Log and send a response back to this effect
- log.warn("Gnocchi doesn't support metric configuration\
- updates.")
- req_details = values['metric_create']
- metric_name = req_details['metric_name']
- resource_id = req_details['resource_uuid']
- metric_id = self.get_metric_id(
- endpoint, auth_token, metric_name, resource_id)
-
- # Generate and send a response message
- try:
- resp_message = self._response.generate_response(
- 'update_metric_response', status=False,
- cor_id=values['correlation_id'],
- r_id=resource_id, m_id=metric_id)
- log.info("Response message: %s", resp_message)
- self._producer.update_metric_response(
- 'update_metric_response', resp_message,
- 'metric_response')
- except Exception as exc:
- log.warn("Failed to send an update response:%s", exc)
-
- elif message.key == "list_metric_request":
- list_details = values['metrics_list_request']
-
- metric_list = self.list_metrics(
- endpoint, auth_token, list_details)
-
- # Generate and send a response message
- try:
- resp_message = self._response.generate_response(
- 'list_metric_response', m_list=metric_list,
- cor_id=list_details['correlation_id'])
- log.info("Response message: %s", resp_message)
- self._producer.list_metric_response(
- 'list_metric_response', resp_message,
- 'metric_response')
- except Exception as exc:
- log.warn("Failed to send a list response:%s", exc)
-
- else:
- log.warn("Unknown key, no action will be performed.")
+ values = json.loads(message.value)
+ self._common = common
+ log.info("OpenStack metric action required.")
+
+ # Generate an auth_token and endpoint for the request
+ if auth_token is not None:
+ if self.auth_token != auth_token:
+ log.info("Auth_token for metrics set by access_credentials.")
+ self.auth_token = auth_token
else:
- log.debug("Message is not for this OpenStack.")
+ log.info("Auth_token has not been updated.")
+ else:
+ log.info("Using environment variables to set Gnocchi auth_token.")
+ self.auth_token = self._common._authenticate()
+
+ if self.endpoint is None:
+ log.info("Generating a new endpoint for Gnocchi.")
+ self.endpoint = self._common.get_endpoint("metric")
+
+ if message.key == "create_metric_request":
+ # Configure metric
+ metric_details = values['metric_create']
+ metric_id, resource_id, status = self.configure_metric(
+ self.endpoint, self.auth_token, metric_details)
+
+ # Generate and send a create metric response
+ try:
+ resp_message = self._response.generate_response(
+ 'create_metric_response', status=status,
+ cor_id=values['correlation_id'],
+ metric_id=metric_id, r_id=resource_id)
+ log.info("Response messages: %s", resp_message)
+ self._producer.create_metrics_resp(
+ 'create_metric_response', resp_message,
+ 'metric_response')
+ except Exception as exc:
+ log.warn("Failed to create response: %s", exc)
+
+ elif message.key == "read_metric_data_request":
+ # Read all metric data related to a specified metric
+ timestamps, metric_data = self.read_metric_data(
+ self.endpoint, self.auth_token, values)
+
+ # Generate and send a response message
+ try:
+ resp_message = self._response.generate_response(
+ 'read_metric_data_response',
+ m_id=values['metric_uuid'],
+ m_name=values['metric_name'],
+ r_id=values['resource_uuid'],
+ cor_id=values['correlation_id'],
+ times=timestamps, metrics=metric_data)
+ log.info("Response message: %s", resp_message)
+ self._producer.read_metric_data_response(
+ 'read_metric_data_response', resp_message,
+ 'metric_response')
+ except Exception as exc:
+ log.warn("Failed to send read metric response:%s", exc)
+
+ elif message.key == "delete_metric_request":
+ # delete the specified metric in the request
+ metric_id = values['metric_uuid']
+ status = self.delete_metric(
+ self.endpoint, self.auth_token, metric_id)
+
+ # Generate and send a response message
+ try:
+ resp_message = self._response.generate_response(
+ 'delete_metric_response', m_id=metric_id,
+ m_name=values['metric_name'],
+ status=status, r_id=values['resource_uuid'],
+ cor_id=values['correlation_id'])
+ log.info("Response message: %s", resp_message)
+ self._producer.delete_metric_response(
+ 'delete_metric_response', resp_message,
+ 'metric_response')
+ except Exception as exc:
+ log.warn("Failed to send delete response:%s", exc)
+
+ elif message.key == "update_metric_request":
+ # Gnocchi doesn't support configuration updates
+ # Log and send a response back to this effect
+ log.warn("Gnocchi doesn't support metric configuration\
+ updates.")
+ req_details = values['metric_create']
+ metric_name = req_details['metric_name']
+ resource_id = req_details['resource_uuid']
+ metric_id = self.get_metric_id(
+ self.endpoint, self.auth_token, metric_name, resource_id)
+
+ # Generate and send a response message
+ try:
+ resp_message = self._response.generate_response(
+ 'update_metric_response', status=False,
+ cor_id=values['correlation_id'],
+ r_id=resource_id, m_id=metric_id)
+ log.info("Response message: %s", resp_message)
+ self._producer.update_metric_response(
+ 'update_metric_response', resp_message,
+ 'metric_response')
+ except Exception as exc:
+ log.warn("Failed to send an update response:%s", exc)
+
+ elif message.key == "list_metric_request":
+ list_details = values['metrics_list_request']
+
+ metric_list = self.list_metrics(
+ self.endpoint, self.auth_token, list_details)
+
+ # Generate and send a response message
+ try:
+ resp_message = self._response.generate_response(
+ 'list_metric_response', m_list=metric_list,
+ cor_id=list_details['correlation_id'])
+ log.info("Response message: %s", resp_message)
+ self._producer.list_metric_response(
+ 'list_metric_response', resp_message,
+ 'metric_response')
+ except Exception as exc:
+ log.warn("Failed to send a list response:%s", exc)
+
+ else:
+ log.warn("Unknown key, no action will be performed.")
return
log.warn("Failed to gather specified measures: %s", exc)
return timestamps, data
- def authenticate(self):
- """Generate an authentication token and endpoint for metric request."""
- try:
- # Check for a tenant_id
- auth_token = self._common._authenticate()
- endpoint = self._common.get_endpoint("metric")
- return auth_token, endpoint
- except Exception as exc:
- log.warn("Authentication to Keystone failed: %s", exc)
-
- return None, None
-
def response_list(self, metric_list, metric_name=None, resource=None):
"""Create the appropriate lists for a list response."""
resp_list, name_list, res_list = [], [], []
+++ /dev/null
-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
-##
-"""Gnocchi plugin for the OSM monitoring module."""
-
-import logging
-import sys
-
-sys.path.append("MON/")
-
-logging.basicConfig(filename='gnocchi_MON.log', datefmt='%m/%d/%Y %I:%M:%S %p',
- format='%(asctime)s %(message)s', filemode='a',
- level=logging.INFO)
-log = logging.getLogger(__name__)
-
-try:
- import gnocchiclient
-except ImportError:
- log.warn("Gnocchiclient could not be imported")
-
-from plugins.OpenStack.Gnocchi.metrics import Metrics
-from plugins.OpenStack.settings import Config
-
-__author__ = "Helena McGough"
-
-
-def register_plugin():
- """Register the plugin."""
- config = Config.instance()
- instance = Plugin(config=config)
- instance.config()
- instance.metrics()
-
-
-class Plugin(object):
- """Gnocchi plugin for OSM MON."""
-
- def __init__(self, config):
- """Plugin instance."""
- log.info("Initialze the plugin instance.")
- self._config = config
- self._metrics = Metrics()
-
- def config(self):
- """Configure plugin."""
- log.info("Configure the plugin instance.")
- self._config.read_environ("gnocchi")
-
- def metrics(self):
- """Initialize metric functionality."""
- log.info("Initialize metric functionality.")
- self._metrics.metric_calls()
-
-if gnocchiclient:
- register_plugin()
# For those usages not covered by the Apache License, Version 2.0 please
# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
##
+"""OpenStack plugin for OSM MON."""
# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
##
"""Common methods for the OpenStack plugins."""
+import json
import logging
-log = logging.getLogger(__name__)
from keystoneclient.v3 import client
__author__ = "Helena McGough"
+log = logging.getLogger(__name__)
+
class Common(object):
"""Common calls for Gnocchi/Aodh plugins."""
def __init__(self):
"""Create the common instance."""
self._auth_token = None
- self._endpoint = None
self._ks = None
+ self.openstack_url = None
+ self.user = None
+ self.password = None
+ self.tenant = None
- def _authenticate(self):
+ def _authenticate(self, message=None):
"""Authenticate and/or renew the authentication token."""
if self._auth_token is not None:
return self._auth_token
+ if message is not None:
+ values = json.loads(message.value)['access_config']
+ self.openstack_url = values['openstack_site']
+ self.user = values['user']
+ self.password = values['password']
+ self.tenant = values['vim_tenant_name']
+
+ try:
+ # try to authenticate with supplied access_credentials
+ self._ks = client.Client(auth_url=self.openstack_url,
+ username=self.user,
+ password=self.password,
+ tenant_name=self.tenant)
+ self._auth_token = self._ks.auth_token
+ log.info("Authenticating with access_credentials from SO.")
+ return self._auth_token
+ except Exception as exc:
+ log.warn("Authentication failed with access_credentials: %s",
+ exc)
+
+ else:
+ log.info("Access_credentials were not sent from SO.")
+
+ # If there are no access_credentials or they fail use env variables
try:
cfg = Config.instance()
self._ks = client.Client(auth_url=cfg.OS_AUTH_URL,
username=cfg.OS_USERNAME,
password=cfg.OS_PASSWORD,
tenant_name=cfg.OS_TENANT_NAME)
+ log.info("Authenticating with environment varialbles.")
self._auth_token = self._ks.auth_token
except Exception as exc:
headers = {'X-Auth-Token': auth_token,
'Content-type': 'application/json'}
# perform request and return its result
+ if req_type == "put":
+ response = requests.put(
+ url, data=payload, headers=headers,
+ timeout=1)
+ elif req_type == "get":
+ response = requests.get(
+ url, params=params, headers=headers, timeout=1)
+ elif req_type == "delete":
+ response = requests.delete(
+ url, headers=headers, timeout=1)
+ else:
+ response = requests.post(
+ url, data=payload, headers=headers,
+ timeout=1)
+
+ # Raises exception if there was an error
try:
- if req_type == "put":
- response = requests.put(
- url, data=payload, headers=headers,
- timeout=1)
- elif req_type == "get":
- response = requests.get(
- url, params=params, headers=headers, timeout=1)
- elif req_type == "delete":
- response = requests.delete(
- url, headers=headers, timeout=1)
- else:
- response = requests.post(
- url, data=payload, headers=headers,
- timeout=1)
-
- except Exception as e:
- log.warn("Exception thrown on request", e)
- if response is not None:
- log.warn("Request resulted in %s code and %s response",
- response.status_code, response.text)
-
+ response.raise_for_status()
+ # pylint: disable=broad-except
+ except Exception:
+ # Log out the result of the request for debugging purpose
+ log.debug(
+ 'Result: %s, %d',
+ response.status_code, response.text)
return response
message = self.notify_alarm(**kwargs)
else:
log.warn("Failed to generate a valid response message.")
+ message = None
return message
"""Configurations for the OpenStack plugins."""
import logging
-log = logging.getLogger(__name__)
import os
from collections import namedtuple
__author__ = "Helena McGough"
+log = logging.getLogger(__name__)
+
class BadConfigError(Exception):
"""Configuration exception."""
+# -*- coding: utf-8 -*-
+
# Copyright 2017 Intel Research and Development Ireland Limited
# *************************************************************
# For those usages not covered by the Apache License, Version 2.0 please
# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
##
+"""Plugins for OSM MON."""
# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
##
-flake8
+hacking>=0.10.0,<0.11
+
+flake8<3.0
mock
+oslosphinx>=2.5.0 # Apache-2.0
+oslotest>=1.10.0 # Apache-2.0
os-testr
-testrepository
-testscenarios
-testtools
+testrepository>=0.0.18
+pylint
+python-subunit>=0.0.18
+pytest
+testscenarios>=0.4
+testtools>=1.4.0
kafka
# For those usages not covered by the Apache License, Version 2.0 please
# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
##
+"""OpenStack plugin tests."""
+import logging
+
+# Initialise a logger for tests
+logging.basicConfig(filename='OpenStack_tests.log',
+ format='%(asctime)s %(message)s',
+ datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
+ level=logging.INFO)
+log = logging.getLogger(__name__)
--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# **************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+##
+"""Tests for all alarm request message keys."""
+
+import json
+
+import logging
+
+import unittest
+
+import mock
+
+from plugins.OpenStack.Aodh import alarming as alarm_req
+from plugins.OpenStack.common import Common
+
+__author__ = "Helena McGough"
+
+log = logging.getLogger(__name__)
+
+
+class Message(object):
+ """A class to mock a message object value for alarm requests."""
+
+ def __init__(self):
+ """Initialize a mocked message instance."""
+ self.topic = "alarm_request"
+ self.key = None
+ self.value = json.dumps({"mock_value": "mock_details"})
+
+
+class TestAlarmKeys(unittest.TestCase):
+ """Integration test for alarm request keys."""
+
+ def setUp(self):
+ """Setup the tests for alarm request keys."""
+ super(TestAlarmKeys, self).setUp()
+ self.alarming = alarm_req.Alarming()
+ self.alarming.common = Common()
+
+ @mock.patch.object(Common, "_authenticate")
+ def test_alarming_env_authentication(self, auth):
+ """Test getting an auth_token and endpoint for alarm requests."""
+ # if auth_token is None environment variables are used to authenticate
+ message = Message()
+
+ self.alarming.alarming(message, self.alarming.common, None)
+
+ auth.assert_called_with()
+
+ @mock.patch.object(Common, "_authenticate")
+ def test_acccess_cred_auth(self, auth):
+ """Test receiving auth_token from access creds."""
+ message = Message()
+
+ self.alarming.alarming(message, self.alarming.common, "my_auth_token")
+
+ auth.assert_not_called
+ self.assertEqual(self.alarming.auth_token, "my_auth_token")
+
+ @mock.patch.object(alarm_req.Alarming, "delete_alarm")
+ def test_delete_alarm_key(self, del_alarm):
+ """Test the functionality for a create alarm request."""
+ # Mock a message value and key
+ message = Message()
+ message.key = "delete_alarm_request"
+ message.value = json.dumps({"alarm_delete_request":
+ {"alarm_uuid": "my_alarm_id"}})
+
+ # Call the alarming functionality and check delete request
+ self.alarming.alarming(message, self.alarming.common, "my_auth_token")
+
+ del_alarm.assert_called_with(mock.ANY, mock.ANY, "my_alarm_id")
+
+ @mock.patch.object(alarm_req.Alarming, "list_alarms")
+ def test_list_alarm_key(self, list_alarm):
+ """Test the functionality for a list alarm request."""
+ # Mock a message with list alarm key and value
+ message = Message()
+ message.key = "list_alarm_request"
+ message.value = json.dumps({"alarm_list_request": "my_alarm_details"})
+
+ # Call the alarming functionality and check list functionality
+ self.alarming.alarming(message, self.alarming.common, "my_auth_token")
+ list_alarm.assert_called_with(mock.ANY, mock.ANY, "my_alarm_details")
+
+ @mock.patch.object(alarm_req.Alarming, "update_alarm_state")
+ def test_ack_alarm_key(self, ack_alarm):
+ """Test the functionality for an acknowledge alarm request."""
+ # Mock a message with acknowledge alarm key and value
+ message = Message()
+ message.key = "acknowledge_alarm"
+ message.value = json.dumps({"ack_details":
+ {"alarm_uuid": "my_alarm_id"}})
+
+ # Call alarming functionality and check acknowledge functionality
+ self.alarming.alarming(message, self.alarming.common, "my_auth_token")
+ ack_alarm.assert_called_with(mock.ANY, mock.ANY, "my_alarm_id")
+
+ @mock.patch.object(alarm_req.Alarming, "configure_alarm")
+ def test_config_alarm_key(self, config_alarm):
+ """Test the functionality for a create alarm request."""
+ # Mock a message with config alarm key and value
+ message = Message()
+ message.key = "create_alarm_request"
+ message.value = json.dumps({"alarm_create_request": "alarm_details"})
+
+ # Call alarming functionality and check config alarm call
+ config_alarm.return_value = "my_alarm_id", True
+ self.alarming.alarming(message, self.alarming.common, "my_auth_token")
+ config_alarm.assert_called_with(mock.ANY, mock.ANY, "alarm_details")
--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# **************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+##
+"""Tests for all alarm request message keys."""
+
+import json
+
+import logging
+
+import unittest
+
+import mock
+
+from plugins.OpenStack.Aodh import alarming as alarm_req
+from plugins.OpenStack.common import Common
+
+__author__ = "Helena McGough"
+
+log = logging.getLogger(__name__)
+
+auth_token = mock.ANY
+endpoint = mock.ANY
+
+
+class Response(object):
+ """Mock a response message class."""
+
+ def __init__(self, result):
+ """Initialise the response text and status code."""
+ self.text = json.dumps(result)
+ self.status_code = "MOCK_STATUS_CODE"
+
+
+class TestAlarming(unittest.TestCase):
+ """Tests for alarming class functions."""
+
+ def setUp(self):
+ """Setup for tests."""
+ super(TestAlarming, self).setUp()
+ self.alarming = alarm_req.Alarming()
+ self.alarming.common = Common()
+
+ @mock.patch.object(alarm_req.Alarming, "check_payload")
+ @mock.patch.object(alarm_req.Alarming, "check_for_metric")
+ @mock.patch.object(Common, "_perform_request")
+ def test_config_invalid_alarm_req(self, perf_req, check_metric, check_pay):
+ """Test configure an invalid alarm request."""
+ # Configuring with invalid alarm name results in failure
+ values = {"alarm_name": "my_alarm",
+ "metric_name": "my_metric",
+ "resource_uuid": "my_r_id"}
+ self.alarming.configure_alarm(endpoint, auth_token, values)
+ perf_req.assert_not_called
+ perf_req.reset_mock()
+
+ # Correct alarm_name will check for metric in Gnocchi
+ # If there isn't one an alarm won't be created
+ values = {"alarm_name": "disk_write_ops",
+ "metric_name": "disk_write_ops",
+ "resource_uuid": "my_r_id"}
+
+ check_metric.return_value = None
+
+ self.alarming.configure_alarm(endpoint, auth_token, values)
+ perf_req.assert_not_called
+
+ @mock.patch.object(alarm_req.Alarming, "check_payload")
+ @mock.patch.object(alarm_req.Alarming, "check_for_metric")
+ @mock.patch.object(Common, "_perform_request")
+ def test_config_valid_alarm_req(self, perf_req, check_metric, check_pay):
+ """Test config a valid alarm."""
+ # Correct alarm_name will check for metric in Gnocchi
+ # And confirm that the payload is configured correctly
+ values = {"alarm_name": "disk_write_ops",
+ "metric_name": "disk_write_ops",
+ "resource_uuid": "my_r_id"}
+
+ check_metric.return_value = "my_metric_id"
+ check_pay.return_value = "my_payload"
+
+ self.alarming.configure_alarm(endpoint, auth_token, values)
+ perf_req.assert_called_with(
+ "<ANY>/v2/alarms/", auth_token,
+ req_type="post", payload="my_payload")
+
+ @mock.patch.object(Common, "_perform_request")
+ def test_delete_alarm_req(self, perf_req):
+ """Test delete alarm request."""
+ self.alarming.delete_alarm(endpoint, auth_token, "my_alarm_id")
+
+ perf_req.assert_called_with(
+ "<ANY>/v2/alarms/my_alarm_id", auth_token, req_type="delete")
+
+ @mock.patch.object(Common, "_perform_request")
+ def test_invalid_list_alarm_req(self, perf_req):
+ """Test invalid list alarm_req."""
+ # Request will not be performed without a resource_id
+ list_details = {"mock_details": "invalid_details"}
+ self.alarming.list_alarms(endpoint, auth_token, list_details)
+
+ perf_req.assert_not_called
+
+ @mock.patch.object(Common, "_perform_request")
+ def test_valid_list_alarm_req(self, perf_req):
+ """Test valid list alarm request."""
+ # Minimum requirement for an alarm list is resource_id
+ list_details = {"resource_uuid": "mock_r_id"}
+ self.alarming.list_alarms(endpoint, auth_token, list_details)
+
+ perf_req.assert_called_with(
+ "<ANY>/v2/alarms/", auth_token, req_type="get")
+ perf_req.reset_mock()
+
+ # Check list with alarm_name defined
+ list_details = {"resource_uuid": "mock_r_id",
+ "alarm_name": "my_alarm",
+ "severity": "critical"}
+ self.alarming.list_alarms(endpoint, auth_token, list_details)
+
+ perf_req.assert_called_with(
+ "<ANY>/v2/alarms/", auth_token, req_type="get")
+
+ @mock.patch.object(Common, "_perform_request")
+ def test_ack_alarm_req(self, perf_req):
+ """Test update alarm state for acknowledge alarm request."""
+ self.alarming.update_alarm_state(endpoint, auth_token, "my_alarm_id")
+
+ perf_req.assert_called_with(
+ "<ANY>/v2/alarms/my_alarm_id/state", auth_token, req_type="put",
+ payload=json.dumps("ok"))
+
+ @mock.patch.object(alarm_req.Alarming, "check_payload")
+ @mock.patch.object(Common, "_perform_request")
+ def test_update_alarm_invalid(self, perf_req, check_pay):
+ """Test update alarm with invalid get response."""
+ values = {"alarm_uuid": "my_alarm_id"}
+
+ self.alarming.update_alarm(endpoint, auth_token, values)
+
+ perf_req.assert_called_with(mock.ANY, auth_token, req_type="get")
+ check_pay.assert_not_called
+
+ @mock.patch.object(alarm_req.Alarming, "check_payload")
+ @mock.patch.object(Common, "_perform_request")
+ def test_update_alarm_invalid_payload(self, perf_req, check_pay):
+ """Test update alarm with invalid payload."""
+ resp = Response({"name": "my_alarm",
+ "state": "alarm",
+ "gnocchi_resources_threshold_rule":
+ {"resource_id": "my_resource_id",
+ "metric": "my_metric"}})
+ perf_req.return_value = resp
+ check_pay.return_value = None
+ values = {"alarm_uuid": "my_alarm_id"}
+
+ self.alarming.update_alarm(endpoint, auth_token, values)
+
+ perf_req.assert_called_with(mock.ANY, auth_token, req_type="get")
+ self.assertEqual(perf_req.call_count, 1)
+
+ @mock.patch.object(alarm_req.Alarming, "check_payload")
+ @mock.patch.object(Common, "_perform_request")
+ def test_update_alarm_valid(self, perf_req, check_pay):
+ """Test valid update alarm request."""
+ resp = Response({"name": "my_alarm",
+ "state": "alarm",
+ "gnocchi_resources_threshold_rule":
+ {"resource_id": "my_resource_id",
+ "metric": "my_metric"}})
+ perf_req.return_value = resp
+ values = {"alarm_uuid": "my_alarm_id"}
+
+ self.alarming.update_alarm(endpoint, auth_token, values)
+
+ check_pay.assert_called_with(values, "my_metric", "my_resource_id",
+ "my_alarm", alarm_state="alarm")
+
+ self.assertEqual(perf_req.call_count, 2)
+ # Second call is the update request
+ perf_req.assert_called_with(
+ '<ANY>/v2/alarms/my_alarm_id', auth_token,
+ req_type="put", payload=check_pay.return_value)
+
+ def test_check_valid_payload(self):
+ """Test the check payload function for a valid payload."""
+ values = {"severity": "warning",
+ "statistic": "COUNT",
+ "threshold_value": 12,
+ "operation": "GT"}
+ payload = self.alarming.check_payload(
+ values, "my_metric", "r_id", "alarm_name")
+
+ self.assertEqual(
+ json.loads(payload), {"name": "alarm_name",
+ "gnocchi_resources_threshold_rule":
+ {"resource_id": "r_id",
+ "metric": "my_metric",
+ "comparison_operator": "gt",
+ "aggregation_method": "count",
+ "threshold": 12,
+ "resource_type": "generic"},
+ "severity": "low",
+ "state": "ok",
+ "type": "gnocchi_resources_threshold"})
+
+ def test_check_valid_state_payload(self):
+ """Test the check payload function for a valid payload with state."""
+ values = {"severity": "warning",
+ "statistic": "COUNT",
+ "threshold_value": 12,
+ "operation": "GT"}
+ payload = self.alarming.check_payload(
+ values, "my_metric", "r_id", "alarm_name", alarm_state="alarm")
+
+ self.assertEqual(
+ json.loads(payload), {"name": "alarm_name",
+ "gnocchi_resources_threshold_rule":
+ {"resource_id": "r_id",
+ "metric": "my_metric",
+ "comparison_operator": "gt",
+ "aggregation_method": "count",
+ "threshold": 12,
+ "resource_type": "generic"},
+ "severity": "low",
+ "state": "alarm",
+ "type": "gnocchi_resources_threshold"})
+
+ def test_check_invalid_payload(self):
+ """Test the check payload function for an invalid payload."""
+ values = {"alarm_values": "mock_invalid_details"}
+ payload = self.alarming.check_payload(
+ values, "my_metric", "r_id", "alarm_name")
+
+ self.assertEqual(payload, None)
+
+ @mock.patch.object(Common, "_perform_request")
+ def test_get_alarm_state(self, perf_req):
+ """Test the get alarm state function."""
+ self.alarming.get_alarm_state(endpoint, auth_token, "alarm_id")
+
+ perf_req.assert_called_with(
+ "<ANY>/v2/alarms/alarm_id/state", auth_token, req_type="get")
+
+ @mock.patch.object(Common, "get_endpoint")
+ @mock.patch.object(Common, "_perform_request")
+ def test_check_for_metric(self, perf_req, get_endpoint):
+ """Test the check for metric function."""
+ get_endpoint.return_value = "gnocchi_endpoint"
+
+ self.alarming.check_for_metric(auth_token, "metric_name", "r_id")
+
+ perf_req.assert_called_with(
+ "gnocchi_endpoint/v1/metric/", auth_token, req_type="get")
##
"""Tests for all common OpenStack methods."""
+import json
+
+import logging
+
import unittest
+from keystoneclient.v3 import client
+
import mock
from plugins.OpenStack.common import Common
+from plugins.OpenStack.settings import Config
import requests
+__author__ = "Helena McGough"
+
+log = logging.getLogger(__name__)
+
+
+class Message(object):
+ """Mock a message for an access credentials request."""
+
+ def __init__(self):
+ """Initialise the topic and value of access_cred message."""
+ self.topic = "access_credentials"
+ self.value = json.dumps({"mock_value": "mock_details",
+ "vim_type": "OPENSTACK",
+ "access_config":
+ {"openstack_site": "my_site",
+ "user": "my_user",
+ "password": "my_password",
+ "vim_tenant_name": "my_tenant"}})
+
class TestCommon(unittest.TestCase):
"""Test the common class for OpenStack plugins."""
super(TestCommon, self).setUp()
self.common = Common()
+ @mock.patch.object(client, "Client")
+ def test_authenticate_exists(self, key_client):
+ """Testing if an authentication token already exists."""
+ # If the auth_token is already generated a new one will not be created
+ self.common._auth_token = "my_auth_token"
+ token = self.common._authenticate()
+
+ self.assertEqual(token, "my_auth_token")
+
+ @mock.patch.object(Config, "instance")
+ @mock.patch.object(client, "Client")
+ def test_authenticate_none(self, key_client, cfg):
+ """Test generating a new authentication token."""
+ # If auth_token doesn't exist one will try to be created with keystone
+ # With the configuration values from the environment
+ self.common._auth_token = None
+ config = cfg.return_value
+ url = config.OS_AUTH_URL
+ user = config.OS_USERNAME
+ pword = config.OS_PASSWORD
+ tenant = config.OS_TENANT_NAME
+
+ self.common._authenticate()
+
+ key_client.assert_called_with(auth_url=url,
+ username=user,
+ password=pword,
+ tenant_name=tenant)
+ key_client.reset_mock()
+
+ @mock.patch.object(client, "Client")
+ def test_authenticate_access_cred(self, key_client):
+ """Test generating an auth_token using access_credentials from SO."""
+ # Mock valid message from SO
+ self.common._auth_token = None
+ message = Message()
+
+ self.common._authenticate(message=message)
+
+ # The class variables are set for each configuration
+ self.assertEqual(self.common.openstack_url, "my_site")
+ self.assertEqual(self.common.user, "my_user")
+ self.assertEqual(self.common.password, "my_password")
+ self.assertEqual(self.common.tenant, "my_tenant")
+ key_client.assert_called
+
@mock.patch.object(requests, 'post')
def test_post_req(self, post):
"""Testing a post request."""
--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+##
+"""Tests for all metric request message keys."""
+
+import json
+
+import logging
+
+import unittest
+
+import mock
+
+from plugins.OpenStack.Gnocchi import metrics as metric_req
+
+from plugins.OpenStack.common import Common
+
+__author__ = "Helena McGough"
+
+log = logging.getLogger(__name__)
+
+# Mock auth_token and endpoint
+endpoint = mock.ANY
+auth_token = mock.ANY
+
+# Mock a valid metric list for some tests, and a resultant list
+metric_list = [{"name": "disk_write_ops",
+ "id": "metric_id",
+ "unit": "units",
+ "resource_id": "r_id"}]
+result_list = ["metric_id", "r_id", "units", "disk_write_ops"]
+
+
+class Response(object):
+ """Mock a response object for requests."""
+
+ def __init__(self):
+ """Initialise test and status code values."""
+ self.text = json.dumps("mock_response_text")
+ self.status_code = "STATUS_CODE"
+
+
+class TestMetricCalls(unittest.TestCase):
+ """Integration test for metric request keys."""
+
+ def setUp(self):
+ """Setup the tests for metric request keys."""
+ super(TestMetricCalls, self).setUp()
+ self.metrics = metric_req.Metrics()
+ self.metrics._common = Common()
+
+ @mock.patch.object(metric_req.Metrics, "get_metric_name")
+ @mock.patch.object(metric_req.Metrics, "get_metric_id")
+ @mock.patch.object(Common, "_perform_request")
+ def test_invalid_config_metric_req(
+ self, perf_req, get_metric, get_metric_name):
+ """Test the configure metric function, for an invalid metric."""
+ # Test invalid configuration for creating a metric
+ values = {"metric_details": "invalid_metric"}
+
+ m_id, r_id, status = self.metrics.configure_metric(
+ endpoint, auth_token, values)
+
+ perf_req.assert_not_called
+ self.assertEqual(m_id, None)
+ self.assertEqual(r_id, None)
+ self.assertEqual(status, False)
+
+ # Test with an invalid metric name, will not perform request
+ values = {"resource_uuid": "r_id"}
+ get_metric_name.return_value = "metric_name", None
+
+ m_id, r_id, status = self.metrics.configure_metric(
+ endpoint, auth_token, values)
+
+ perf_req.assert_not_called
+ self.assertEqual(m_id, None)
+ self.assertEqual(r_id, "r_id")
+ self.assertEqual(status, False)
+ get_metric_name.reset_mock()
+
+ # If metric exists, it won't be recreated
+ get_metric_name.return_value = "metric_name", "norm_name"
+ get_metric.return_value = "metric_id"
+
+ m_id, r_id, status = self.metrics.configure_metric(
+ endpoint, auth_token, values)
+
+ perf_req.assert_not_called
+ self.assertEqual(m_id, "metric_id")
+ self.assertEqual(r_id, "r_id")
+ self.assertEqual(status, False)
+
+ @mock.patch.object(metric_req.Metrics, "get_metric_name")
+ @mock.patch.object(metric_req.Metrics, "get_metric_id")
+ @mock.patch.object(Common, "_perform_request")
+ def test_valid_config_metric_req(
+ self, perf_req, get_metric, get_metric_name):
+ """Test the configure metric function, for a valid metric."""
+ # Test valid configuration and payload for creating a metric
+ values = {"resource_uuid": "r_id",
+ "metric_unit": "units"}
+ get_metric_name.return_value = "metric_name", "norm_name"
+ get_metric.return_value = None
+ payload = {"id": "r_id",
+ "metrics": {"metric_name":
+ {"archive_policy_name": "high",
+ "name": "metric_name",
+ "unit": "units"}}}
+
+ self.metrics.configure_metric(endpoint, auth_token, values)
+
+ perf_req.assert_called_with(
+ "<ANY>/v1/resource/generic", auth_token, req_type="post",
+ payload=json.dumps(payload))
+
+ @mock.patch.object(Common, "_perform_request")
+ def test_delete_metric_req(self, perf_req):
+ """Test the delete metric function."""
+ self.metrics.delete_metric(endpoint, auth_token, "metric_id")
+
+ perf_req.assert_called_with(
+ "<ANY>/v1/metric/metric_id", auth_token, req_type="delete")
+
+ @mock.patch.object(Common, "_perform_request")
+ def test_delete_metric_invalid_status(self, perf_req):
+ """Test invalid response for delete request."""
+ perf_req.return_value = "404"
+
+ status = self.metrics.delete_metric(endpoint, auth_token, "metric_id")
+
+ self.assertEqual(status, False)
+
+ @mock.patch.object(metric_req.Metrics, "response_list")
+ @mock.patch.object(Common, "_perform_request")
+ def test_complete_list_metric_req(self, perf_req, resp_list):
+ """Test the complete list metric function."""
+ # Test listing metrics without any configuration options
+ values = {}
+ resp = Response()
+ perf_req.return_value = resp
+ self.metrics.list_metrics(endpoint, auth_token, values)
+
+ perf_req.assert_called_with(
+ "<ANY>/v1/metric/", auth_token, req_type="get")
+ resp_list.assert_called_with("mock_response_text")
+
+ @mock.patch.object(metric_req.Metrics, "response_list")
+ @mock.patch.object(Common, "_perform_request")
+ def test_resource_list_metric_req(self, perf_req, resp_list):
+ """Test the resource list metric function."""
+ # Test listing metrics with a resource id specified
+ values = {"resource_uuid": "resource_id"}
+ resp = Response()
+ perf_req.return_value = resp
+ self.metrics.list_metrics(endpoint, auth_token, values)
+
+ perf_req.assert_called_with(
+ "<ANY>/v1/metric/", auth_token, req_type="get")
+ resp_list.assert_called_with(
+ "mock_response_text", resource="resource_id")
+
+ @mock.patch.object(metric_req.Metrics, "response_list")
+ @mock.patch.object(Common, "_perform_request")
+ def test_name_list_metric_req(self, perf_req, resp_list):
+ """Test the metric_name list metric function."""
+ # Test listing metrics with a metric_name specified
+ values = {"metric_name": "disk_write_bytes"}
+ resp = Response()
+ perf_req.return_value = resp
+ self.metrics.list_metrics(endpoint, auth_token, values)
+
+ perf_req.assert_called_with(
+ "<ANY>/v1/metric/", auth_token, req_type="get")
+ resp_list.assert_called_with(
+ "mock_response_text", metric_name="disk_write_bytes")
+
+ @mock.patch.object(metric_req.Metrics, "response_list")
+ @mock.patch.object(Common, "_perform_request")
+ def test_combined_list_metric_req(self, perf_req, resp_list):
+ """Test the combined resource and metric list metric function."""
+ # Test listing metrics with a resource id and metric name specified
+ values = {"resource_uuid": "resource_id",
+ "metric_name": "packets_sent"}
+ resp = Response()
+ perf_req.return_value = resp
+ self.metrics.list_metrics(endpoint, auth_token, values)
+
+ perf_req.assert_called_with(
+ "<ANY>/v1/metric/", auth_token, req_type="get")
+ resp_list.assert_called_with(
+ "mock_response_text", resource="resource_id",
+ metric_name="packets_sent")
+
+ @mock.patch.object(Common, "_perform_request")
+ def test_get_metric_id(self, perf_req):
+ """Test get_metric_id function."""
+ self.metrics.get_metric_id(endpoint, auth_token, "my_metric", "r_id")
+
+ perf_req.assert_called_with(
+ "<ANY>/v1/resource/generic/r_id", auth_token, req_type="get")
+
+ def test_get_metric_name(self):
+ """Test the result from the get_metric_name function."""
+ # test with a valid metric_name
+ values = {"metric_name": "disk_write_ops"}
+
+ metric_name, norm_name = self.metrics.get_metric_name(values)
+
+ self.assertEqual(metric_name, "disk_write_ops")
+ self.assertEqual(norm_name, "disk.disk_ops")
+
+ # test with an invalid metric name
+ values = {"metric_name": "my_invalid_metric"}
+
+ metric_name, norm_name = self.metrics.get_metric_name(values)
+
+ self.assertEqual(metric_name, "my_invalid_metric")
+ self.assertEqual(norm_name, None)
+
+ @mock.patch.object(Common, "_perform_request")
+ def test_valid_read_data_req(self, perf_req):
+ """Test the read metric data function, for a valid call."""
+ values = {"metric_uuid": "metric_id",
+ "collection_unit": "DAY",
+ "collection_period": 1}
+
+ self.metrics.read_metric_data(endpoint, auth_token, values)
+
+ perf_req.assert_called_once
+
+ @mock.patch.object(Common, "_perform_request")
+ def test_invalid_read_data_req(self, perf_req):
+ """Test the read metric data function, for an invalid call."""
+ # Two empty lists will be returned because the values are invalid
+ values = {}
+
+ times, data = self.metrics.read_metric_data(
+ endpoint, auth_token, values)
+
+ self.assertEqual(times, [])
+ self.assertEqual(data, [])
+
+ def test_complete_response_list(self):
+ """Test the response list function for formating metric lists."""
+ # Mock a list for testing purposes, with valid OSM metric
+ resp_list = self.metrics.response_list(metric_list)
+
+ # Check for the expected values in the resulting list
+ for l in result_list:
+ self.assertIn(l, resp_list[0])
+
+ def test_name_response_list(self):
+ """Test the response list with metric name configured."""
+ # Mock the metric name to test a metric name list
+ # Test with a name that is not in the list
+ invalid_name = "my_metric"
+ resp_list = self.metrics.response_list(
+ metric_list, metric_name=invalid_name)
+
+ self.assertEqual(resp_list, [])
+
+ # Test with a name on the list
+ valid_name = "disk_write_ops"
+ resp_list = self.metrics.response_list(
+ metric_list, metric_name=valid_name)
+
+ # Check for the expected values in the resulting list
+ for l in result_list:
+ self.assertIn(l, resp_list[0])
+
+ def test_resource_response_list(self):
+ """Test the response list with resource_id configured."""
+ # Mock a resource_id to test a resource list
+ # Test with resource not on the list
+ invalid_id = "mock_resource"
+ resp_list = self.metrics.response_list(metric_list, resource=invalid_id)
+
+ self.assertEqual(resp_list, [])
+
+ # Test with a resource on the list
+ valid_id = "r_id"
+ resp_list = self.metrics.response_list(metric_list, resource=valid_id)
+
+ # Check for the expected values in the resulting list
+ for l in result_list:
+ self.assertIn(l, resp_list[0])
+
+ def test_combined_response_list(self):
+ """Test the response list function with resource_id and metric_name."""
+ # Test for a combined resource and name list
+ # resource and name are on the list
+ valid_name = "disk_write_ops"
+ valid_id = "r_id"
+ resp_list = self.metrics.response_list(
+ metric_list, metric_name=valid_name, resource=valid_id)
+
+ # Check for the expected values in the resulting list
+ for l in result_list:
+ self.assertIn(l, resp_list[0])
+
+ # resource not on list
+ invalid_id = "mock_resource"
+ resp_list = self.metrics.response_list(
+ metric_list, metric_name=valid_name, resource=invalid_id)
+
+ self.assertEqual(resp_list, [])
+
+ # metric name not on list
+ invalid_name = "mock_metric"
+ resp_list = self.metrics.response_list(
+ metric_list, metric_name=invalid_name, resource=valid_id)
+
+ self.assertEqual(resp_list, [])
--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+##
+"""Tests for all metric request message keys."""
+
+import json
+
+import logging
+
+import unittest
+
+import mock
+
+from plugins.OpenStack.Gnocchi import metrics as metric_req
+
+from plugins.OpenStack.common import Common
+
+__author__ = "Helena McGough"
+
+log = logging.getLogger(__name__)
+
+
+class Message(object):
+ """A class to mock a message object value for metric requests."""
+
+ def __init__(self):
+ """Initialize a mocked message instance."""
+ self.topic = "metric_request"
+ self.key = None
+ self.value = json.dumps({"mock_message": "message_details"})
+
+
+class TestMetricReq(unittest.TestCase):
+ """Integration test for metric request keys."""
+
+ def setUp(self):
+ """Setup the tests for metric request keys."""
+ super(TestMetricReq, self).setUp()
+ self.common = Common()
+ self.metrics = metric_req.Metrics()
+
+ @mock.patch.object(Common, "_authenticate")
+ def test_access_cred_metric_auth(self, auth):
+ """Test authentication with access credentials."""
+ message = Message()
+
+ self.metrics.metric_calls(message, self.common, "my_auth_token")
+
+ auth.assert_not_called
+ self.assertEqual(self.metrics.auth_token, "my_auth_token")
+
+ @mock.patch.object(Common, "_authenticate")
+ def test_env_metric_auth(self, auth):
+ """Test authentication with environment variables."""
+ message = Message()
+
+ self.metrics.metric_calls(message, self.common, None)
+
+ auth.assert_called_with()
+
+ @mock.patch.object(metric_req.Metrics, "delete_metric")
+ def test_delete_metric_key(self, del_metric):
+ """Test the functionality for a delete metric request."""
+ # Mock a message value and key
+ message = Message()
+ message.key = "delete_metric_request"
+ message.value = json.dumps({"metric_uuid": "my_metric_id"})
+
+ # Call the metric functionality and check delete request
+ self.metrics.metric_calls(message, self.common, "my_auth_token")
+
+ del_metric.assert_called_with(mock.ANY, mock.ANY, "my_metric_id")
+
+ @mock.patch.object(metric_req.Metrics, "list_metrics")
+ def test_list_metric_key(self, list_metrics):
+ """Test the functionality for a list metric request."""
+ # Mock a message with list metric key and value
+ message = Message()
+ message.key = "list_metric_request"
+ message.value = json.dumps({"metrics_list_request": "metric_details"})
+
+ # Call the metric functionality and check list functionality
+ self.metrics.metric_calls(message, self.common, "my_auth_token")
+ list_metrics.assert_called_with(mock.ANY, mock.ANY, "metric_details")
+
+ @mock.patch.object(metric_req.Metrics, "read_metric_data")
+ @mock.patch.object(metric_req.Metrics, "list_metrics")
+ @mock.patch.object(metric_req.Metrics, "delete_metric")
+ @mock.patch.object(metric_req.Metrics, "configure_metric")
+ def test_update_metric_key(self, config_metric, delete_metric, list_metrics,
+ read_data):
+ """Test the functionality for an update metric request."""
+ # Mock a message with update metric key and value
+ message = Message()
+ message.key = "update_metric_request"
+ message.value = json.dumps({"metric_create":
+ {"metric_name": "my_metric",
+ "resource_uuid": "my_r_id"}})
+
+ # Call metric functionality and confirm no function is called
+ # Gnocchi does not support updating a metric configuration
+ self.metrics.metric_calls(message, self.common, "my_auth_token")
+ config_metric.assert_not_called
+ list_metrics.assert_not_called
+ delete_metric.assert_not_called
+ read_data.assert_not_called
+
+ @mock.patch.object(metric_req.Metrics, "configure_metric")
+ def test_config_metric_key(self, config_metric):
+ """Test the functionality for a create metric request."""
+ # Mock a message with create metric key and value
+ message = Message()
+ message.key = "create_metric_request"
+ message.value = json.dumps({"metric_create": "metric_details"})
+
+ # Call metric functionality and check config metric
+ config_metric.return_value = "metric_id", "resource_id", True
+ self.metrics.metric_calls(message, self.common, "my_auth_token")
+ config_metric.assert_called_with(mock.ANY, mock.ANY, "metric_details")
+
+ @mock.patch.object(metric_req.Metrics, "read_metric_data")
+ def test_read_data_key(self, read_data):
+ """Test the functionality for a read metric data request."""
+ # Mock a message with a read data key and value
+ message = Message()
+ message.key = "read_metric_data_request"
+ message.value = json.dumps({"alarm_uuid": "alarm_id"})
+
+ # Call metric functionality and check read data metrics
+ read_data.return_value = "time_stamps", "data_values"
+ self.metrics.metric_calls(message, self.common, "my_auth_token")
+ read_data.assert_called_with(
+ mock.ANY, mock.ANY, json.loads(message.value))
--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+##
+"""Test that the correct responses are generated for each message."""
+
+import logging
+
+import unittest
+
+import mock
+
+from plugins.OpenStack import response as resp
+
+__author__ = "Helena McGough"
+
+log = logging.getLogger(__name__)
+
+
+class TestOpenStackResponse(unittest.TestCase):
+ """Tests for responses generated by the OpenStack plugins."""
+
+ def setUp(self):
+ """Setup for testing OpenStack plugin responses."""
+ super(TestOpenStackResponse, self).setUp()
+ self.plugin_resp = resp.OpenStack_Response()
+
+ def test_invalid_key(self):
+ """Test if an invalid key is entered for a response."""
+ message = self.plugin_resp.generate_response("mock_invalid_key")
+ self.assertEqual(message, None)
+
+ @mock.patch.object(
+ resp.OpenStack_Response, "alarm_list_response")
+ def test_list_alarm_resp(self, alarm_list_resp):
+ """Test out a function call for a list alarm response."""
+ message = self.plugin_resp.generate_response("list_alarm_response")
+ self.assertEqual(alarm_list_resp.return_value, message)
+
+ @mock.patch.object(
+ resp.OpenStack_Response, "list_metric_response")
+ def test_list_metric_resp(self, metric_list_resp):
+ """Test list metric response function call."""
+ message = self.plugin_resp.generate_response("list_metric_response")
+ self.assertEqual(message, metric_list_resp.return_value)
+
+ @mock.patch.object(
+ resp.OpenStack_Response, "delete_alarm_response")
+ def test_delete_alarm_resp(self, del_alarm_resp):
+ """Test delete alarm response function call."""
+ message = self.plugin_resp.generate_response("delete_alarm_response")
+ self.assertEqual(message, del_alarm_resp.return_value)
+
+ @mock.patch.object(
+ resp.OpenStack_Response, "delete_metric_response")
+ def test_delete_metric_resp(self, del_metric_resp):
+ """Test the response functionality of delete metric response."""
+ message = self.plugin_resp.generate_response("delete_metric_response")
+ self.assertEqual(message, del_metric_resp.return_value)
+
+ @mock.patch.object(
+ resp.OpenStack_Response, "create_alarm_response")
+ def test_create_alarm_resp(self, config_alarm_resp):
+ """Test create alarm response function call."""
+ message = self.plugin_resp.generate_response("create_alarm_response")
+ self.assertEqual(message, config_alarm_resp.return_value)
+
+ @mock.patch.object(
+ resp.OpenStack_Response, "metric_create_response")
+ def test_create_metric_resp(self, config_metric_resp):
+ """Test create metric response function call."""
+ message = self.plugin_resp.generate_response("create_metric_response")
+ self.assertEqual(message, config_metric_resp.return_value)
+
+ @mock.patch.object(
+ resp.OpenStack_Response, "update_alarm_response")
+ def test_update_alarm_resp(self, up_alarm_resp):
+ """Test update alarm response function call."""
+ message = self.plugin_resp.generate_response("update_alarm_response")
+ self.assertEqual(message, up_alarm_resp.return_value)
+
+ @mock.patch.object(
+ resp.OpenStack_Response, "update_metric_response")
+ def test_update_metric_resp(self, up_metric_resp):
+ """Test update metric response function call."""
+ message = self.plugin_resp.generate_response("update_metric_response")
+ self.assertEqual(message, up_metric_resp.return_value)
+
+ @mock.patch.object(
+ resp.OpenStack_Response, "notify_alarm")
+ def test_notify_alarm(self, notify_alarm):
+ """Test notify alarm response function call."""
+ message = self.plugin_resp.generate_response("notify_alarm")
+ self.assertEqual(message, notify_alarm.return_value)
+
+ @mock.patch.object(
+ resp.OpenStack_Response, "read_metric_data_response")
+ def test_read_metric_data_resp(self, read_data_resp):
+ """Test read metric data response function call."""
+ message = self.plugin_resp.generate_response(
+ "read_metric_data_response")
+ self.assertEqual(message, read_data_resp.return_value)
--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
+##
+"""Tests for settings for OpenStack plugins configurations."""
+
+import logging
+
+import os
+
+import unittest
+
+import mock
+
+from plugins.OpenStack.settings import Config
+
+__author__ = "Helena McGough"
+
+log = logging.getLogger(__name__)
+
+
+class TestSettings(unittest.TestCase):
+ """Test the settings class for OpenStack plugin configuration."""
+
+ def setUp(self):
+ """Test Setup."""
+ super(TestSettings, self).setUp()
+ self.cfg = Config.instance()
+
+ def test_set_os_username(self):
+ """Test reading the environment for OpenStack plugin configuration."""
+ self.cfg.read_environ("my_service")
+
+ self.assertEqual(self.cfg.OS_USERNAME, "my_service")
+
+ @mock.patch.object(os, "environ")
+ def test_read_environ(self, environ):
+ """Test reading environment variables for configuration."""
+ self.cfg.read_environ("my_service")
+
+ # Called for each key in the configuration dictionary
+ environ.assert_called_once
# For those usages not covered by the Apache License, Version 2.0 please
# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
##
+"""This is a KafkaProducer with a request function to test the plugins."""
-'''
-This is a kafka producer with a common request function to test the plugins.
-'''
+import json
+import logging as log
+import os
-from kafka import KafkaProducer as kaf
-from kafka.errors import KafkaError
-import logging as log
-import json
import jsmin
-import os
-import sys
-from os import listdir
-from jsmin import jsmin
-import os.path as path
+from kafka import KafkaProducer as kaf
+
+from kafka.errors import KafkaError
class KafkaProducer(object):
+ """A KafkaProducer for testing purposes."""
def __init__(self, topic):
-
- self._topic= topic
+ """Initialize a KafkaProducer and it's topic."""
+ self._topic = topic
if "ZOOKEEPER_URI" in os.environ:
broker = os.getenv("ZOOKEEPER_URI")
is already running.
'''
- self.producer = kaf(key_serializer=str.encode,
- value_serializer=lambda v: json.dumps(v).encode('ascii'),
- bootstrap_servers=broker, api_version=(0,10))
-
-
+ self.producer = kaf(
+ key_serializer=str.encode,
+ value_serializer=lambda v: json.dumps(v).encode('ascii'),
+ bootstrap_servers=broker, api_version=(0, 10))
def publish(self, key, value, topic):
+ """Send messages to the message bus with a defing key and topic."""
try:
- future = self.producer.send(key=key, value=value,topic=topic)
+ future = self.producer.send(topic=topic, key=key, value=value)
self.producer.flush()
except Exception:
log.exception("Error publishing to {} topic." .format(topic))
raise
try:
record_metadata = future.get(timeout=10)
- #self._log.debug("TOPIC:", record_metadata.topic)
- #self._log.debug("PARTITION:", record_metadata.partition)
- #self._log.debug("OFFSET:", record_metadata.offset)
+ log.debug("TOPIC:", record_metadata.topic)
+ log.debug("PARTITION:", record_metadata.partition)
+ log.debug("OFFSET:", record_metadata.offset)
except KafkaError:
pass
- json_path = path.abspath(path.join(os.getcwd(),"../.."))
-
- def request(self, path, key, message, topic):
- #External to MON
+ def request(self, path, key, message, topic):
+ """Test json files are loaded and sent on the message bus."""
+ # External to MON
payload_create_alarm = jsmin(open(os.path.join(path)).read())
self.publish(key=key,
- value = json.loads(payload_create_alarm),
- topic=topic)
-
-
+ value=json.loads(payload_create_alarm),
+ topic=topic)
VIRTUAL_ENV={envdir}
[testenv:pep8]
-commands = flake8 test
-
-[testenv:venv]
-commands = {posargs}
-
-[testenv:cover]
-commands = python setup.py test --coverage
+commands = flake8 plugins
[pep8]
max-line-length = 80