Change-Id: I451d271bfca48047d4441347a9aa1969f42c3330
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
-version = '0.1.5'
-date_version = '2018-05-14'
+version = '0.1.6'
+date_version = '2018-08-31'
self.producer = None
self.loop = None
self.broker = None
self.producer = None
self.loop = None
self.broker = None
def connect(self, config):
try:
def connect(self, config):
try:
self.port = config["port"]
self.loop = asyncio.get_event_loop()
self.broker = str(self.host) + ":" + str(self.port)
self.port = config["port"]
self.loop = asyncio.get_event_loop()
self.broker = str(self.host) + ":" + str(self.port)
+ self.group_id = config.get("group_id")
except Exception as e: # TODO refine
raise MsgException(str(e))
except Exception as e: # TODO refine
raise MsgException(str(e))
else:
topic_list = (topic,)
else:
topic_list = (topic,)
- self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+ self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=self.group_id)
await self.consumer.start()
self.consumer.subscribe(topic_list)
await self.consumer.start()
self.consumer.subscribe(topic_list)