From 8280121bd3684354e83cd43eb91ad616405e2e8d Mon Sep 17 00:00:00 2001 From: Benjamin Diaz Date: Wed, 17 Oct 2018 12:45:02 -0300 Subject: [PATCH] Removes threading from common_consumer It appears KafkaConsumer is not threadsafe, and it was generating issues regarding the sending of heartbeats, causing kafka to kill connections and causing coordinator dead errors. Also removes heartbeat and session timeouts from consumer config. They were generating conflicts with different default request timeouts, and were in place to leverage issues regarding what is mentioned above. Signed-off-by: Benjamin Diaz --- osm_mon/core/message_bus/common_consumer.py | 4 +--- osm_mon/core/message_bus/consumer.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index b8e33d2..0b61e1e 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -22,7 +22,6 @@ import json import logging import sys -import threading from json import JSONDecodeError import six @@ -95,8 +94,7 @@ class CommonConsumer: log.info("Listening for messages...") for message in common_consumer: - t = threading.Thread(target=self.consume_message, args=(message,)) - t.start() + self.consume_message(message) def consume_message(self, message): log.info("Message arrived: %s", message) diff --git a/osm_mon/core/message_bus/consumer.py b/osm_mon/core/message_bus/consumer.py index b12cd88..363b9fc 100644 --- a/osm_mon/core/message_bus/consumer.py +++ b/osm_mon/core/message_bus/consumer.py @@ -10,6 +10,4 @@ class Consumer(KafkaConsumer): super().__init__(bootstrap_servers=cfg.BROKER_URI, key_deserializer=bytes.decode, value_deserializer=bytes.decode, - group_id=group_id, - session_timeout_ms=60000, - heartbeat_interval_ms=20000) + group_id=group_id) -- 2.25.1