It appears KafkaConsumer is not threadsafe, and it was generating
issues regarding the sending of heartbeats, causing kafka to kill
connections and causing coordinator dead errors.
Also removes heartbeat and session timeouts from consumer config.
They were generating conflicts with different default request
timeouts, and were in place to leverage issues regarding what is
mentioned above.
Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
import json
import logging
import sys
-import threading
from json import JSONDecodeError
import six
log.info("Listening for messages...")
for message in common_consumer:
- t = threading.Thread(target=self.consume_message, args=(message,))
- t.start()
+ self.consume_message(message)
def consume_message(self, message):
log.info("Message arrived: %s", message)
super().__init__(bootstrap_servers=cfg.BROKER_URI,
key_deserializer=bytes.decode,
value_deserializer=bytes.decode,
- group_id=group_id,
- session_timeout_ms=60000,
- heartbeat_interval_ms=20000)
+ group_id=group_id)