From 73da4fadae4efc8b32375723f37e482293d847ac Mon Sep 17 00:00:00 2001 From: tierno Date: Fri, 31 Aug 2018 13:50:59 +0000 Subject: [PATCH] fix bug 534: allow group_id for kafka consumer Change-Id: I451d271bfca48047d4441347a9aa1969f42c3330 Signed-off-by: tierno --- osm_common/__init__.py | 4 ++-- osm_common/msgkafka.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/osm_common/__init__.py b/osm_common/__init__.py index 4855fe7..601b140 100644 --- a/osm_common/__init__.py +++ b/osm_common/__init__.py @@ -1,2 +1,2 @@ -version = '0.1.5' -date_version = '2018-05-14' +version = '0.1.6' +date_version = '2018-08-31' diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py index 2d82f97..3b38956 100644 --- a/osm_common/msgkafka.py +++ b/osm_common/msgkafka.py @@ -20,6 +20,7 @@ class MsgKafka(MsgBase): self.producer = None self.loop = None self.broker = None + self.group_id = None def connect(self, config): try: @@ -29,6 +30,7 @@ class MsgKafka(MsgBase): self.port = config["port"] self.loop = asyncio.get_event_loop() self.broker = str(self.host) + ":" + str(self.port) + self.group_id = config.get("group_id") except Exception as e: # TODO refine raise MsgException(str(e)) @@ -99,7 +101,7 @@ class MsgKafka(MsgBase): else: topic_list = (topic,) - self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) + self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=self.group_id) await self.consumer.start() self.consumer.subscribe(topic_list) -- 2.25.1