X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=policy_module%2Fosm_policy_module%2Ftests%2Fintegration%2Ftest_scaling_config_kafka_msg.py;h=aea3f4aab3d6572fc9ece5d05e72830d08efc61a;hb=24b8309395b534ffe4bff9b07f665951555ac955;hp=a44426586526362974bf6a7d8ece699c8a3dec27;hpb=62781ff00b30790610cf4cc2ef5ed5422c571e10;p=osm%2FMON.git diff --git a/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py b/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py index a444265..aea3f4a 100644 --- a/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py +++ b/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py @@ -1,34 +1,62 @@ +# -*- coding: utf-8 -*- + +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## import json import logging import os import unittest -from kafka import KafkaProducer +from kafka import KafkaProducer, KafkaConsumer +from kafka.errors import KafkaError log = logging.getLogger(__name__) -# logging.basicConfig(stream=sys.stdout, -# format='%(asctime)s %(message)s', -# datefmt='%m/%d/%Y %I:%M:%S %p', -# level=logging.DEBUG) - class ScalingConfigTest(unittest.TestCase): + def setUp(self): + try: + kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"), + os.getenv("KAFKA_SERVER_PORT", "9092")) + self.producer = KafkaProducer(bootstrap_servers=kafka_server, + key_serializer=str.encode, + value_serializer=str.encode) + self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092', + group_id='osm_mon') + self.consumer.subscribe(['lcm_pm']) + except KafkaError: + self.skipTest('Kafka server not present.') + def test_send_scaling_config_msg(self): try: with open( os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file: payload = json.load(file) - kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"), - os.getenv("KAFKA_SERVER_PORT", "9092")) - producer = KafkaProducer(bootstrap_servers=kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) - future = producer.send('lcm_pm', json.dumps(payload), key="configure_scaling") + future = self.producer.send('lcm_pm', json.dumps(payload), key="configure_scaling") result = future.get(timeout=60) log.info('Result: %s', result) - producer.flush() + self.producer.flush() + # TODO: Improve assertions self.assertIsNotNone(result) except Exception as e: self.fail(e)