Feature 10926 - Subscription feature for SOL003 VNF-LCM
[osm/NBI.git] / osm_nbi / subscriptions.py
index 6810ccd..1f172dd 100644 (file)
@@ -29,7 +29,7 @@ from osm_common import dbmongo, dbmemory, msglocal, msgkafka
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
 from osm_nbi.engine import EngineException
-from osm_nbi.notifications import NsLcmNotification
+from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -70,6 +70,7 @@ class SubscriptionThread(threading.Thread):
             "method": "delete",
         }
         self.nslcm = None
+        self.vnflcm = None
 
     async def start_kafka(self):
         # timeout_wait_for_kafka = 3*60
@@ -84,6 +85,7 @@ class SubscriptionThread(threading.Thread):
                 )
                 await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
                 await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
+                await self.msg.aiowrite("vnf", "echo", "dummy message", loop=self.loop)
                 if not kafka_working:
                     self.logger.critical("kafka is working again")
                     kafka_working = True
@@ -104,7 +106,7 @@ class SubscriptionThread(threading.Thread):
                     self.logger.debug("Starting non-admin subscription task")
                     self.aiomain_task = asyncio.ensure_future(
                         self.msg.aioread(
-                            ("ns", "nsi"),
+                            ("ns", "nsi", "vnf"),
                             loop=self.loop,
                             aiocallback=self._msg_callback,
                         ),
@@ -178,6 +180,7 @@ class SubscriptionThread(threading.Thread):
                         )
                     )
             self.nslcm = NsLcmNotification(self.db)
+            self.vnflcm = VnfLcmNotification(self.db)
         except (DbException, MsgException) as e:
             raise SubscriptionException(str(e), http_code=e.http_code)
 
@@ -272,6 +275,33 @@ class SubscriptionThread(threading.Thread):
                     self.logger.debug(
                         "Message can not be used for notification of nslcm"
                     )
+            elif topic == "vnf":
+                if isinstance(params, dict):
+                    vnfd_id = params["vnfdId"]
+                    vnf_instance_id = params["vnfInstanceId"]
+                    if command == "create" or command == "delete":
+                        op_state = command
+                    else:
+                        op_state = params["operationState"]
+                    event_details = {
+                            "topic": topic,
+                            "command": command.upper(),
+                            "params": params,
+                            }
+                    subscribers = self.vnflcm.get_subscribers(
+                            vnfd_id,
+                            vnf_instance_id,
+                            command.upper(),
+                            op_state,
+                            event_details
+                            )
+                    if subscribers:
+                        asyncio.ensure_future(
+                                self.vnflcm.send_notifications(
+                                    subscribers, loop=self.loop
+                                ),
+                                loop=self.loop
+                            )
             elif topic == "nsi":
                 if command == "terminated" and params["operationState"] in (
                     "COMPLETED",