self._topic= topic
- if "ZOOKEEPER_URI" in os.environ:
- broker = os.getenv("ZOOKEEPER_URI")
+ if "BROKER_URI" in os.environ:
+ broker = os.getenv("BROKER_URI")
else:
- broker = "localhost:2181"
+ broker = "localhost:9092"
'''
- If the zookeeper broker URI is not set in the env, by default,
+ If the broker URI is not set in the env, by default,
localhost container is taken as the host because an instance of
is already running.
'''
def publish(self, key, value, topic=None):
try:
- future = self.producer.send(key, value, topic)
+ future = self.producer.send(topic=topic, key=key, value=value)
self.producer.flush()
except Exception:
logging.exception("Error publishing to {} topic." .format(topic))