From: Benjamin Diaz Date: Wed, 17 Oct 2018 15:45:02 +0000 (-0300) Subject: Removes threading from common_consumer X-Git-Tag: v5.0.0~22 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=8280121bd3684354e83cd43eb91ad616405e2e8d;p=osm%2FMON.git Removes threading from common_consumer It appears KafkaConsumer is not threadsafe, and it was generating issues regarding the sending of heartbeats, causing kafka to kill connections and causing coordinator dead errors. Also removes heartbeat and session timeouts from consumer config. They were generating conflicts with different default request timeouts, and were in place to leverage issues regarding what is mentioned above. Signed-off-by: Benjamin Diaz --- diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index b8e33d2..0b61e1e 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -22,7 +22,6 @@ import json import logging import sys -import threading from json import JSONDecodeError import six @@ -95,8 +94,7 @@ class CommonConsumer: log.info("Listening for messages...") for message in common_consumer: - t = threading.Thread(target=self.consume_message, args=(message,)) - t.start() + self.consume_message(message) def consume_message(self, message): log.info("Message arrived: %s", message) diff --git a/osm_mon/core/message_bus/consumer.py b/osm_mon/core/message_bus/consumer.py index b12cd88..363b9fc 100644 --- a/osm_mon/core/message_bus/consumer.py +++ b/osm_mon/core/message_bus/consumer.py @@ -10,6 +10,4 @@ class Consumer(KafkaConsumer): super().__init__(bootstrap_servers=cfg.BROKER_URI, key_deserializer=bytes.decode, value_deserializer=bytes.decode, - group_id=group_id, - session_timeout_ms=60000, - heartbeat_interval_ms=20000) + group_id=group_id)