import logging
from json import JSONDecodeError
+import peewee
import yaml
from aiokafka import AIOKafkaConsumer
await self._process_msg(msg.topic, msg.key, msg.value)
finally:
await consumer.stop()
+ log.critical("Exiting...")
async def _process_msg(self, topic, key, msg):
log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
await self._handle_alarm_notification(content)
else:
log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
+ except peewee.PeeweeException:
+ log.exception("Database error consuming message: ")
+ raise
except Exception:
log.exception("Error consuming message: ")