# Check if kafka is ready
while(True):
port_in_use = False
- # Logic to check kafka is ready
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
- if s.connect_ex(("kafka", int(config.conf["message"].get("port")))) == 0:
- port_in_use = True
+ try:
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ if (
+ s.connect_ex(
+ (
+ config.conf.get("message", {}).get("host", "kafka"),
+ int(config.conf["message"].get("port")),
+ )
+ )
+ == 0
+ ):
+ port_in_use = True
+ except Exception as e:
+ logging.info("Error when trying to get kafka status.")
+ logging.debug("Exception when trying to get kafka status: %s", str(e))
if port_in_use:
break
else: