fix bug 534: allow group_id for kafka consumer 51/6451/1
authortierno <alfonso.tiernosepulveda@telefonica.com>
Fri, 31 Aug 2018 13:50:59 +0000 (13:50 +0000)
committertierno <alfonso.tiernosepulveda@telefonica.com>
Fri, 31 Aug 2018 13:52:49 +0000 (13:52 +0000)
Change-Id: I451d271bfca48047d4441347a9aa1969f42c3330
Signed-off-by: tierno <alfonso.tiernosepulveda@telefonica.com>
osm_common/__init__.py
osm_common/msgkafka.py

index 4855fe7..601b140 100644 (file)
@@ -1,2 +1,2 @@
-version = '0.1.5'
-date_version = '2018-05-14'
+version = '0.1.6'
+date_version = '2018-08-31'
index 2d82f97..3b38956 100644 (file)
@@ -20,6 +20,7 @@ class MsgKafka(MsgBase):
         self.producer = None
         self.loop = None
         self.broker = None
+        self.group_id = None
 
     def connect(self, config):
         try:
@@ -29,6 +30,7 @@ class MsgKafka(MsgBase):
             self.port = config["port"]
             self.loop = asyncio.get_event_loop()
             self.broker = str(self.host) + ":" + str(self.port)
+            self.group_id = config.get("group_id")
 
         except Exception as e:  # TODO refine
             raise MsgException(str(e))
@@ -99,7 +101,7 @@ class MsgKafka(MsgBase):
             else:
                 topic_list = (topic,)
 
-            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=self.group_id)
             await self.consumer.start()
             self.consumer.subscribe(topic_list)