Bug 1830 fixed: maps completed operations to original operation types
[osm/NBI.git] / osm_nbi / subscriptions.py
index 6810ccd..846e7d3 100644 (file)
@@ -29,7 +29,7 @@ from osm_common import dbmongo, dbmemory, msglocal, msgkafka
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
 from osm_nbi.engine import EngineException
-from osm_nbi.notifications import NsLcmNotification
+from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -53,7 +53,6 @@ class SubscriptionThread(threading.Thread):
         self.db = None
         self.msg = None
         self.engine = engine
-        self.loop = None
         self.logger = logging.getLogger("nbi.subscriptions")
         self.aiomain_task_admin = (
             None  # asyncio task for receiving admin actions from kafka bus
@@ -70,6 +69,7 @@ class SubscriptionThread(threading.Thread):
             "method": "delete",
         }
         self.nslcm = None
+        self.vnflcm = None
 
     async def start_kafka(self):
         # timeout_wait_for_kafka = 3*60
@@ -80,40 +80,38 @@ class SubscriptionThread(threading.Thread):
                 # created.
                 # Before subscribe, send dummy messages
                 await self.msg.aiowrite(
-                    "admin", "echo", "dummy message", loop=self.loop
+                    "admin",
+                    "echo",
+                    "dummy message",
                 )
-                await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
-                await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
+                await self.msg.aiowrite("ns", "echo", "dummy message")
+                await self.msg.aiowrite("nsi", "echo", "dummy message")
+                await self.msg.aiowrite("vnf", "echo", "dummy message")
                 if not kafka_working:
                     self.logger.critical("kafka is working again")
                     kafka_working = True
                 if not self.aiomain_task_admin:
-                    await asyncio.sleep(10, loop=self.loop)
+                    await asyncio.sleep(10)
                     self.logger.debug("Starting admin subscription task")
                     self.aiomain_task_admin = asyncio.ensure_future(
                         self.msg.aioread(
                             ("admin",),
-                            loop=self.loop,
                             group_id=False,
                             aiocallback=self._msg_callback,
                         ),
-                        loop=self.loop,
                     )
                 if not self.aiomain_task:
-                    await asyncio.sleep(10, loop=self.loop)
+                    await asyncio.sleep(10)
                     self.logger.debug("Starting non-admin subscription task")
                     self.aiomain_task = asyncio.ensure_future(
                         self.msg.aioread(
-                            ("ns", "nsi"),
-                            loop=self.loop,
+                            ("ns", "nsi", "vnf"),
                             aiocallback=self._msg_callback,
                         ),
-                        loop=self.loop,
                     )
                 done, _ = await asyncio.wait(
                     [self.aiomain_task, self.aiomain_task_admin],
                     timeout=None,
-                    loop=self.loop,
                     return_when=asyncio.FIRST_COMPLETED,
                 )
                 try:
@@ -140,14 +138,13 @@ class SubscriptionThread(threading.Thread):
                         "Error accessing kafka '{}'. Retrying ...".format(e)
                     )
                     kafka_working = False
-            await asyncio.sleep(10, loop=self.loop)
+            await asyncio.sleep(10)
 
     def run(self):
         """
         Start of the thread
         :return: None
         """
-        self.loop = asyncio.new_event_loop()
         try:
             if not self.db:
                 if self.config["database"]["driver"] == "mongo":
@@ -164,7 +161,6 @@ class SubscriptionThread(threading.Thread):
                     )
             if not self.msg:
                 config_msg = self.config["message"].copy()
-                config_msg["loop"] = self.loop
                 if config_msg["driver"] == "local":
                     self.msg = msglocal.MsgLocal()
                     self.msg.connect(config_msg)
@@ -178,18 +174,14 @@ class SubscriptionThread(threading.Thread):
                         )
                     )
             self.nslcm = NsLcmNotification(self.db)
+            self.vnflcm = VnfLcmNotification(self.db)
         except (DbException, MsgException) as e:
             raise SubscriptionException(str(e), http_code=e.http_code)
 
         self.logger.debug("Starting")
         while not self.to_terminate:
             try:
-
-                self.loop.run_until_complete(
-                    asyncio.ensure_future(self.start_kafka(), loop=self.loop)
-                )
-            # except asyncio.CancelledError:
-            #     break  # if cancelled it should end, breaking loop
+                asyncio.run(self.start_kafka())
             except Exception as e:
                 if not self.to_terminate:
                     self.logger.exception(
@@ -198,7 +190,6 @@ class SubscriptionThread(threading.Thread):
 
         self.logger.debug("Finishing")
         self._stop()
-        self.loop.close()
 
     async def _msg_callback(self, topic, command, params):
         """
@@ -263,15 +254,36 @@ class SubscriptionThread(threading.Thread):
                             # self.logger.debug(subscribers)
                             if subscribers:
                                 asyncio.ensure_future(
-                                    self.nslcm.send_notifications(
-                                        subscribers, loop=self.loop
-                                    ),
-                                    loop=self.loop,
+                                    self.nslcm.send_notifications(subscribers),
                                 )
                 else:
                     self.logger.debug(
                         "Message can not be used for notification of nslcm"
                     )
+            elif topic == "vnf":
+                if isinstance(params, dict):
+                    vnfd_id = params["vnfdId"]
+                    vnf_instance_id = params["vnfInstanceId"]
+                    if command == "create" or command == "delete":
+                        op_state = command
+                    else:
+                        op_state = params["operationState"]
+                    event_details = {
+                        "topic": topic,
+                        "command": command.upper(),
+                        "params": params,
+                    }
+                    subscribers = self.vnflcm.get_subscribers(
+                        vnfd_id,
+                        vnf_instance_id,
+                        command.upper(),
+                        op_state,
+                        event_details,
+                    )
+                    if subscribers:
+                        asyncio.ensure_future(
+                            self.vnflcm.send_notifications(subscribers),
+                        )
             elif topic == "nsi":
                 if command == "terminated" and params["operationState"] in (
                     "COMPLETED",
@@ -316,7 +328,7 @@ class SubscriptionThread(threading.Thread):
             # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
             # but content to be written is stored at msg_to_send
             for msg in msg_to_send:
-                await self.msg.aiowrite(*msg, loop=self.loop)
+                await self.msg.aiowrite(*msg)
         except (EngineException, DbException, MsgException) as e:
             self.logger.error(
                 "Error while processing topic={} command={}: {}".format(
@@ -352,6 +364,8 @@ class SubscriptionThread(threading.Thread):
         """
         self.to_terminate = True
         if self.aiomain_task:
-            self.loop.call_soon_threadsafe(self.aiomain_task.cancel)
+            asyncio.get_event_loop().call_soon_threadsafe(self.aiomain_task.cancel)
         if self.aiomain_task_admin:
-            self.loop.call_soon_threadsafe(self.aiomain_task_admin.cancel)
+            asyncio.get_event_loop().call_soon_threadsafe(
+                self.aiomain_task_admin.cancel
+            )