self.logger.error("Task Kafka Exception {}".format(e))
await asyncio.sleep(1, loop=self.loop)
self.logger.debug("Task Kafka terminating")
+ self.logger.debug("Task Kafka exit")
+
+ def start(self):
+ self.loop = asyncio.get_event_loop()
+ self.loop.run_until_complete(self.read_kafka())
# TODO
+ # self.logger.debug("Terminating cancelling creation tasks")
# self.cancel_tasks("ALL", "create")
# timeout = 200
# while self.is_pending_tasks():
# timeout -= 2
# if not timeout:
# self.cancel_tasks("ALL", "ALL")
- self.logger.debug("Task Kafka exit")
-
- def start(self):
- self.loop = asyncio.get_event_loop()
- self.loop.run_until_complete(self.read_kafka())
self.loop.close()
self.loop = None
if self.db: