self.producer = kaf(
key_serializer=str.encode,
value_serializer=str.encode,
- bootstrap_servers=broker, api_version=(0, 10))
+ bootstrap_servers=broker, api_version=(0, 10, 1))
def publish(self, key, value, topic=None):
"""Send the required message on the Kafka message bus."""
try:
future = self.producer.send(topic=topic, key=key, value=value)
- record_metadata = future.get(timeout=10)
+ future.get(timeout=10)
except Exception:
logging.exception("Error publishing to {} topic." .format(topic))
raise
- try:
- logging.debug("TOPIC:", record_metadata.topic)
- logging.debug("PARTITION:", record_metadata.partition)
- logging.debug("OFFSET:", record_metadata.offset)
- except KafkaError:
- pass
def publish_alarm_request(self, key, message):
"""Publish an alarm request."""