00d6ada3f63070563743e4d22c5149c2166bcc59
1 # -*- coding: utf-8 -*-
3 vROPs Kafka Consumer that consumes the request messages
7 from kafka
import KafkaConsumer
8 from kafka
.errors
import KafkaError
11 class vROP_KafkaConsumer(object):
13 Kafka Consumer for vROPs
16 def __init__(self
, topics
=[], broker_uri
=None):
18 Method to initize KafkaConsumer
20 broker_uri - hostname:port uri of Kafka broker
21 topics - list of topics to subscribe
26 if broker_uri
is None:
27 self
.broker
= '0.0.0.0:9092'
29 self
.broker
= broker_uri
32 print ("vROPs Consumer started, Broker URI: {}".format(self
.broker
))
33 print ("Subscribed Topics {}".format(self
.topic
))
35 self
.vrops_consumer
= KafkaConsumer(bootstrap_servers
=self
.broker
)
36 self
.vrops_consumer
.subscribe(self
.topic
)
37 except Exception as exp
:
38 msg
= "fail to create consumer for topic {} with broker {} Error : {}"\
39 .format(self
.topic
, self
.broker
, exp
)