Database errors could mean database node failure. In this cases, the process
should exit so it could be automatically restarted (when handled by Docker Swarm, K8s, etc)
so it can obtain a new DB connection to an alive node.
Change-Id: I42a3954fe238a0445101eb71add79aaef4315c2e
Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
import logging
from json import JSONDecodeError
import logging
from json import JSONDecodeError
import yaml
from aiokafka import AIOKafkaConsumer
import yaml
from aiokafka import AIOKafkaConsumer
await self._process_msg(msg.topic, msg.key, msg.value)
finally:
await consumer.stop()
await self._process_msg(msg.topic, msg.key, msg.value)
finally:
await consumer.stop()
+ log.critical("Exiting...")
async def _process_msg(self, topic, key, msg):
log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
async def _process_msg(self, topic, key, msg):
log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
await self._handle_alarm_notification(content)
else:
log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
await self._handle_alarm_notification(content)
else:
log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
+ except peewee.PeeweeException:
+ log.exception("Database error consuming message: ")
+ raise
except Exception:
log.exception("Error consuming message: ")
except Exception:
log.exception("Error consuming message: ")