self.producer = None
self.loop = None
self.broker = None
+ self.group_id = None
def connect(self, config):
try:
self.port = config["port"]
self.loop = asyncio.get_event_loop()
self.broker = str(self.host) + ":" + str(self.port)
+ self.group_id = config.get("group_id")
except Exception as e: # TODO refine
raise MsgException(str(e))
else:
topic_list = (topic,)
- self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+ self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=self.group_id)
await self.consumer.start()
self.consumer.subscribe(topic_list)