fix bug 534: allow group_id for kafka consumer
[osm/common.git] / osm_common / msgkafka.py
index 2d82f97..3b38956 100644 (file)
@@ -20,6 +20,7 @@ class MsgKafka(MsgBase):
         self.producer = None
         self.loop = None
         self.broker = None
+        self.group_id = None
 
     def connect(self, config):
         try:
@@ -29,6 +30,7 @@ class MsgKafka(MsgBase):
             self.port = config["port"]
             self.loop = asyncio.get_event_loop()
             self.broker = str(self.host) + ":" + str(self.port)
+            self.group_id = config.get("group_id")
 
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
@@ -99,7 +101,7 @@ class MsgKafka(MsgBase):
             else:
                 topic_list = (topic,)
 
-            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=self.group_id)
             await self.consumer.start()
             self.consumer.subscribe(topic_list)