Fix Bug 2229 Set fixed IP address for VDU through VNFD and the instantiation params
[osm/NBI.git] / osm_nbi / subscriptions.py
index 6810ccd..b178e5b 100644 (file)
@@ -29,7 +29,7 @@ from osm_common import dbmongo, dbmemory, msglocal, msgkafka
 from osm_common.dbbase import DbException
 from osm_common.msgbase import MsgException
 from osm_nbi.engine import EngineException
-from osm_nbi.notifications import NsLcmNotification
+from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
@@ -70,6 +70,7 @@ class SubscriptionThread(threading.Thread):
             "method": "delete",
         }
         self.nslcm = None
+        self.vnflcm = None
 
     async def start_kafka(self):
         # timeout_wait_for_kafka = 3*60
@@ -84,6 +85,7 @@ class SubscriptionThread(threading.Thread):
                 )
                 await self.msg.aiowrite("ns", "echo", "dummy message", loop=self.loop)
                 await self.msg.aiowrite("nsi", "echo", "dummy message", loop=self.loop)
+                await self.msg.aiowrite("vnf", "echo", "dummy message", loop=self.loop)
                 if not kafka_working:
                     self.logger.critical("kafka is working again")
                     kafka_working = True
@@ -104,7 +106,7 @@ class SubscriptionThread(threading.Thread):
                     self.logger.debug("Starting non-admin subscription task")
                     self.aiomain_task = asyncio.ensure_future(
                         self.msg.aioread(
-                            ("ns", "nsi"),
+                            ("ns", "nsi", "vnf"),
                             loop=self.loop,
                             aiocallback=self._msg_callback,
                         ),
@@ -178,13 +180,13 @@ class SubscriptionThread(threading.Thread):
                         )
                     )
             self.nslcm = NsLcmNotification(self.db)
+            self.vnflcm = VnfLcmNotification(self.db)
         except (DbException, MsgException) as e:
             raise SubscriptionException(str(e), http_code=e.http_code)
 
         self.logger.debug("Starting")
         while not self.to_terminate:
             try:
-
                 self.loop.run_until_complete(
                     asyncio.ensure_future(self.start_kafka(), loop=self.loop)
                 )
@@ -272,6 +274,31 @@ class SubscriptionThread(threading.Thread):
                     self.logger.debug(
                         "Message can not be used for notification of nslcm"
                     )
+            elif topic == "vnf":
+                if isinstance(params, dict):
+                    vnfd_id = params["vnfdId"]
+                    vnf_instance_id = params["vnfInstanceId"]
+                    if command == "create" or command == "delete":
+                        op_state = command
+                    else:
+                        op_state = params["operationState"]
+                    event_details = {
+                        "topic": topic,
+                        "command": command.upper(),
+                        "params": params,
+                    }
+                    subscribers = self.vnflcm.get_subscribers(
+                        vnfd_id,
+                        vnf_instance_id,
+                        command.upper(),
+                        op_state,
+                        event_details,
+                    )
+                    if subscribers:
+                        asyncio.ensure_future(
+                            self.vnflcm.send_notifications(subscribers, loop=self.loop),
+                            loop=self.loop,
+                        )
             elif topic == "nsi":
                 if command == "terminated" and params["operationState"] in (
                     "COMPLETED",