Removes threading from common_consumer 13/6713/1
authorBenjamin Diaz <bdiaz@whitestack.com>
Wed, 17 Oct 2018 15:45:02 +0000 (12:45 -0300)
committerBenjamin Diaz <bdiaz@whitestack.com>
Wed, 17 Oct 2018 15:45:02 +0000 (12:45 -0300)
It appears KafkaConsumer is not threadsafe, and it was generating
issues regarding the sending of heartbeats, causing kafka to kill
connections and causing coordinator dead errors.
Also removes heartbeat and session timeouts from consumer config.
They were generating conflicts with different default request
timeouts, and were in place to leverage issues regarding what is
mentioned above.

Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
osm_mon/core/message_bus/common_consumer.py
osm_mon/core/message_bus/consumer.py

index b8e33d2..0b61e1e 100755 (executable)
@@ -22,7 +22,6 @@
 import json
 import logging
 import sys
-import threading
 from json import JSONDecodeError
 
 import six
@@ -95,8 +94,7 @@ class CommonConsumer:
 
         log.info("Listening for messages...")
         for message in common_consumer:
-            t = threading.Thread(target=self.consume_message, args=(message,))
-            t.start()
+            self.consume_message(message)
 
     def consume_message(self, message):
         log.info("Message arrived: %s", message)
index b12cd88..363b9fc 100644 (file)
@@ -10,6 +10,4 @@ class Consumer(KafkaConsumer):
         super().__init__(bootstrap_servers=cfg.BROKER_URI,
                          key_deserializer=bytes.decode,
                          value_deserializer=bytes.decode,
-                         group_id=group_id,
-                         session_timeout_ms=60000,
-                         heartbeat_interval_ms=20000)
+                         group_id=group_id)