kafka_working = True
await asyncio.sleep(10, loop=self.loop)
self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
- callback=self._msg_callback),
+ aiocallback=self._msg_callback),
loop=self.loop)
await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop)
except Exception as e:
self._stop()
self.loop.close()
- def _msg_callback(self, topic, command, params):
+ async def _msg_callback(self, topic, command, params):
"""
Callback to process a received message from kafka
:param topic: topic received
:param params: rest of parameters
:return: None
"""
+ msg_to_send = []
try:
if topic == "ns":
if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
self.logger.debug("received ns terminated {}".format(params))
if params.get("autoremove"):
- self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"])
+ self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"],
+ not_send_msg=msg_to_send)
self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
- return
- if topic == "nsi":
+ elif topic == "nsi":
if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
self.logger.debug("received nsi terminated {}".format(params))
if params.get("autoremove"):
- self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"])
+ self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
+ not_send_msg=msg_to_send)
self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
- return
+
+ # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
+ # but content to be written is stored at msg_to_send
+ for msg in msg_to_send:
+ await self.msg.aiowrite(*msg, loop=self.loop)
except (EngineException, DbException, MsgException) as e:
self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
except Exception as e: