X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fsubscriptions.py;h=03bb92b65f4f084b001e18389d65af1b5dc26d4b;hp=39eed76f18e6a3e7403f347bddfc65f3cd8461a2;hb=bee3bad8d15fe0893855d0dff92cef4351629edb;hpb=23acf4001306e92a587de566be4bab00931104ba diff --git a/osm_nbi/subscriptions.py b/osm_nbi/subscriptions.py index 39eed76..03bb92b 100644 --- a/osm_nbi/subscriptions.py +++ b/osm_nbi/subscriptions.py @@ -81,7 +81,7 @@ class SubscriptionThread(threading.Thread): kafka_working = True await asyncio.sleep(10, loop=self.loop) self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop, - callback=self._msg_callback), + aiocallback=self._msg_callback), loop=self.loop) await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop) except Exception as e: @@ -141,7 +141,7 @@ class SubscriptionThread(threading.Thread): self._stop() self.loop.close() - def _msg_callback(self, topic, command, params): + async def _msg_callback(self, topic, command, params): """ Callback to process a received message from kafka :param topic: topic received @@ -149,21 +149,27 @@ class SubscriptionThread(threading.Thread): :param params: rest of parameters :return: None """ + msg_to_send = [] try: if topic == "ns": if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"): self.logger.debug("received ns terminated {}".format(params)) if params.get("autoremove"): - self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"]) + self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"], + not_send_msg=msg_to_send) self.logger.debug("ns={} deleted from database".format(params["nsr_id"])) - return - if topic == "nsi": + elif topic == "nsi": if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"): self.logger.debug("received nsi terminated {}".format(params)) if params.get("autoremove"): - self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"]) + self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"], + not_send_msg=msg_to_send) self.logger.debug("nsis={} deleted from database".format(params["nsir_id"])) - return + + # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that, + # but content to be written is stored at msg_to_send + for msg in msg_to_send: + await self.msg.aiowrite(*msg, loop=self.loop) except (EngineException, DbException, MsgException) as e: self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e)) except Exception as e: