From: Benjamin Diaz Date: Wed, 17 Oct 2018 15:45:02 +0000 (-0300) Subject: Removes threading from common_consumer X-Git-Tag: v5.0.0~22 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=8280121bd3684354e83cd43eb91ad616405e2e8d;p=osm%2FMON.git Removes threading from common_consumer It appears KafkaConsumer is not threadsafe, and it was generating issues regarding the sending of heartbeats, causing kafka to kill connections and causing coordinator dead errors. Also removes heartbeat and session timeouts from consumer config. They were generating conflicts with different default request timeouts, and were in place to leverage issues regarding what is mentioned above. Signed-off-by: Benjamin Diaz --- diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index b8e33d23..0b61e1ee 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -22,7 +22,6 @@ import json import logging import sys -import threading from json import JSONDecodeError import six @@ -95,8 +94,7 @@ class CommonConsumer: log.info("Listening for messages...") for message in common_consumer: - t = threading.Thread(target=self.consume_message, args=(message,)) - t.start() + self.consume_message(message) def consume_message(self, message): log.info("Message arrived: %s", message) diff --git a/osm_mon/core/message_bus/consumer.py b/osm_mon/core/message_bus/consumer.py index b12cd888..363b9fc7 100644 --- a/osm_mon/core/message_bus/consumer.py +++ b/osm_mon/core/message_bus/consumer.py @@ -10,6 +10,4 @@ class Consumer(KafkaConsumer): super().__init__(bootstrap_servers=cfg.BROKER_URI, key_deserializer=bytes.decode, value_deserializer=bytes.decode, - group_id=group_id, - session_timeout_ms=60000, - heartbeat_interval_ms=20000) + group_id=group_id)