Fix bug 534: allow configuring group_id for the Kafka consumer
Change-Id: I451d271bfca48047d4441347a9aa1969f42c3330
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py
index 2d82f97..3b38956 100644
--- a/osm_common/msgkafka.py
+++ b/osm_common/msgkafka.py
@@ -20,6 +20,7 @@
self.producer = None
self.loop = None
self.broker = None
+ self.group_id = None
def connect(self, config):
try:
@@ -29,6 +30,7 @@
self.port = config["port"]
self.loop = asyncio.get_event_loop()
self.broker = str(self.host) + ":" + str(self.port)
+ self.group_id = config.get("group_id")
except Exception as e: # TODO refine
raise MsgException(str(e))
@@ -99,7 +101,7 @@
else:
topic_list = (topic,)
- self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+ self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=self.group_id)
await self.consumer.start()
self.consumer.subscribe(topic_list)