From 6a76e7045a98043170ac6517b3bf57481d5275e5 Mon Sep 17 00:00:00 2001 From: prithiv Date: Mon, 2 Oct 2017 00:38:02 +0100 Subject: [PATCH] Tests for Kafka Producer and Consumer Signed-off-by: prithiv --- test/core/__init__.py | 26 +++++++++++++++++ test/core/kafka_test.py | 64 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 test/core/__init__.py create mode 100644 test/core/kafka_test.py diff --git a/test/core/__init__.py b/test/core/__init__.py new file mode 100644 index 0000000..1dad343 --- /dev/null +++ b/test/core/__init__.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com + +#__author__ = "Prithiv Mohan" +#__date__ = "25/Sep/2017" diff --git a/test/core/kafka_test.py b/test/core/kafka_test.py new file mode 100644 index 0000000..646e00b --- /dev/null +++ b/test/core/kafka_test.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com + +#__author__ = "Prithiv Mohan" +#__date__ = "25/Sep/2017" + +import sys +import threading +import pytest +from kafka import KafkaConsumer, KafkaProducer + +def test_end_to_end(kafka_broker): + connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) + producer = KafkaProducer(bootstrap_servers=connect_str, + retries=5, + max_block_ms=10000, + value_serializer=str.encode) + consumer = KafkaConsumer(bootstrap_servers=connect_str, + group_id=None, + consumer_timeout_ms=10000, + auto_offset_reset='earliest', + value_deserializer=bytes.decode) + + topic = 'TutorialTopic' + + messages = 100 + futures = [] + for i in range(messages): + futures.append(producer.send(topic, 'msg %d' % i)) + ret = [f.get(timeout=30) for f in futures] + assert len(ret) == messages + + producer.close() + + consumer.subscribe([topic]) + msgs = set() + for i in range(messages): + try: + msgs.add(next(consumer).value) + except StopIteration: + break + + assert msgs == set(['msg %d' % i for i in range(messages)]) -- 2.25.1