2 # -*- coding: utf-8 -*-
4 # Copyright 2017 Intel Research and Development Ireland Limited
5 # *************************************************************
7 # This file is part of OSM Monitoring module
8 # All Rights Reserved to Intel Corporation
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
25 #__author__ = "Prithiv Mohan"
26 #__date__ = "25/Sep/2017"
31 from kafka
import KafkaConsumer
, KafkaProducer
33 def test_end_to_end(kafka_broker
):
34 connect_str
= ':'.join([kafka_broker
.host
, str(kafka_broker
.port
)])
35 producer
= KafkaProducer(bootstrap_servers
=connect_str
,
38 value_serializer
=str.encode
)
39 consumer
= KafkaConsumer(bootstrap_servers
=connect_str
,
41 consumer_timeout_ms
=10000,
42 auto_offset_reset
='earliest',
43 value_deserializer
=bytes
.decode
)
45 topic
= 'TutorialTopic'
49 for i
in range(messages
):
50 futures
.append(producer
.send(topic
, 'msg %d' % i
))
51 ret
= [f
.get(timeout
=30) for f
in futures
]
52 assert len(ret
) == messages
56 consumer
.subscribe([topic
])
58 for i
in range(messages
):
60 msgs
.add(next(consumer
).value
)
64 assert msgs
== set(['msg %d' % i
for i
in range(messages
)])