fix bug 937. NSs are not deleted from database upon NSI deletion
[osm/NBI.git] / osm_nbi / subscriptions.py
index 156f466..03bb92b 100644 (file)
@@ -27,7 +27,7 @@ from http import HTTPStatus
 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
 from osm_common import dbmongo, dbmemory, msglocal, msgkafka
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
-from engine import EngineException
+from osm_nbi.engine import EngineException
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -81,7 +81,7 @@ class SubscriptionThread(threading.Thread):
                     kafka_working = True
                 await asyncio.sleep(10, loop=self.loop)
                 self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
                     kafka_working = True
                 await asyncio.sleep(10, loop=self.loop)
                 self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
-                                                                           callback=self._msg_callback),
+                                                                           aiocallback=self._msg_callback),
                                                           loop=self.loop)
                 await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop)
             except Exception as e:
                                                           loop=self.loop)
                 await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop)
             except Exception as e:
@@ -141,7 +141,7 @@ class SubscriptionThread(threading.Thread):
         self._stop()
         self.loop.close()
 
         self._stop()
         self.loop.close()
 
-    def _msg_callback(self, topic, command, params):
+    async def _msg_callback(self, topic, command, params):
         """
         Callback to process a received message from kafka
         :param topic:  topic received
         """
         Callback to process a received message from kafka
         :param topic:  topic received
@@ -149,21 +149,27 @@ class SubscriptionThread(threading.Thread):
         :param params: rest of parameters
         :return: None
         """
         :param params: rest of parameters
         :return: None
         """
+        msg_to_send = []
         try:
             if topic == "ns":
                 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
                     self.logger.debug("received ns terminated {}".format(params))
                     if params.get("autoremove"):
         try:
             if topic == "ns":
                 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
                     self.logger.debug("received ns terminated {}".format(params))
                     if params.get("autoremove"):
-                        self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"])
+                        self.engine.del_item(self.internal_session, "nsrs", _id=params["nsr_id"],
+                                             not_send_msg=msg_to_send)
                         self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
                         self.logger.debug("ns={} deleted from database".format(params["nsr_id"]))
-                    return
-            if topic == "nsi":
+            elif topic == "nsi":
                 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
                     self.logger.debug("received nsi terminated {}".format(params))
                     if params.get("autoremove"):
                 if command == "terminated" and params["operationState"] in ("COMPLETED", "PARTIALLY_COMPLETED"):
                     self.logger.debug("received nsi terminated {}".format(params))
                     if params.get("autoremove"):
-                        self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"])
+                        self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
+                                             not_send_msg=msg_to_send)
                         self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
                         self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
-                    return
+
+            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that, 
+            # but content to be written is stored at msg_to_send
+            for msg in msg_to_send:
+                await self.msg.aiowrite(*msg, loop=self.loop)
         except (EngineException, DbException, MsgException) as e:
             self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
         except Exception as e:
         except (EngineException, DbException, MsgException) as e:
             self.logger.error("Error while processing topic={} command={}: {}".format(topic, command, e))
         except Exception as e: