projects
/
osm
/
NBI.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
docker: Fix debian channel to stable
[osm/NBI.git]
/
osm_nbi
/
subscriptions.py
diff --git
a/osm_nbi/subscriptions.py
b/osm_nbi/subscriptions.py
index
39eed76
..
03bb92b
100644
(file)
--- a/
osm_nbi/subscriptions.py
+++ b/
osm_nbi/subscriptions.py
@@
-81,7
+81,7
@@
class SubscriptionThread(threading.Thread):
kafka_working = True
await asyncio.sleep(10, loop=self.loop)
self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
kafka_working = True
await asyncio.sleep(10, loop=self.loop)
self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
- callback=self._msg_callback),
+
aio
callback=self._msg_callback),
loop=self.loop)
await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop)
except Exception as e:
loop=self.loop)
await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop)
except Exception as e:
@@
-141,7
+141,7
@@
class SubscriptionThread(threading.Thread):
self._stop()
self.loop.close()
self._stop()
self.loop.close()
- def _msg_callback(self, topic, command, params):
+
async
def _msg_callback(self, topic, command, params):
"""
Callback to process a received message from kafka
:param topic: topic received
"""
Callback to process a received message from kafka
:param topic: topic received
@@
-149,21
+149,27
@@
class SubscriptionThread(threading.Thread):
:param params: rest of parameters
:return: None
"""
:param params: rest of parameters
:return: None
"""
+ msg_to_send = []
try:
if topic == "ns":
if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
self.logger.debug("received ns terminated {}".format(params))
if params.get("autoremove"):
try:
if topic == "ns":
if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
self.logger.debug("received ns terminated {}".format(params))
if params.get("autoremove"):
- self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"])
+ self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"],
+ not_send_msg=msg_to_send)
self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
- return
- if topic == "nsi":
+ elif topic == "nsi":
if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
self.logger.debug("received nsi terminated {}".format(params))
if params.get("autoremove"):
if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
self.logger.debug("received nsi terminated {}".format(params))
if params.get("autoremove"):
- self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"])
+ self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
+ not_send_msg=msg_to_send)
self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
- return
+
+ # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
+ # but content to be written is stored at msg_to_send
+ for msg in msg_to_send:
+ await self.msg.aiowrite(*msg, loop=self.loop)
except (EngineException, DbException, MsgException) as e:
self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
except Exception as e:
except (EngineException, DbException, MsgException) as e:
self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
except Exception as e: