X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Ftest%2Fcore%2Fkafka_test.py;fp=osm_mon%2Ftest%2Fcore%2Fkafka_test.py;h=646e00bb04133ded90c638fabff3b137dee7e46e;hb=c7397b95dbaeebd7d872779eec809daed9e487cc;hp=0000000000000000000000000000000000000000;hpb=71ce7eca516321aff84332df56702e718968735b;p=osm%2FMON.git diff --git a/osm_mon/test/core/kafka_test.py b/osm_mon/test/core/kafka_test.py new file mode 100644 index 0000000..646e00b --- /dev/null +++ b/osm_mon/test/core/kafka_test.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2017 Intel Research and Development Ireland Limited +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Intel Corporation + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com + +#__author__ = "Prithiv Mohan" +#__date__ = "25/Sep/2017" + +import sys +import threading +import pytest +from kafka import KafkaConsumer, KafkaProducer + +def test_end_to_end(kafka_broker): + connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) + producer = KafkaProducer(bootstrap_servers=connect_str, + retries=5, + max_block_ms=10000, + value_serializer=str.encode) + consumer = KafkaConsumer(bootstrap_servers=connect_str, + group_id=None, + consumer_timeout_ms=10000, + auto_offset_reset='earliest', + value_deserializer=bytes.decode) + + topic = 'TutorialTopic' + + messages = 100 + futures = [] + for i in range(messages): + futures.append(producer.send(topic, 'msg %d' % i)) + ret = [f.get(timeout=30) for f in futures] + assert len(ret) == messages + + producer.close() + + consumer.subscribe([topic]) + msgs = set() + for i in range(messages): + try: + msgs.add(next(consumer).value) + except StopIteration: + break + + assert msgs == set(['msg %d' % i for i in range(messages)])