Bug 1830 fixed: maps completed operations to original operation types
[osm/NBI.git] / osm_nbi / notifications.py
index 47a24ba..22413d0 100644 (file)
@@ -41,7 +41,6 @@ class NotificationException(Exception):
 
 
 class NotificationBase:
-
     response_models = None
     # Common HTTP payload header for all notifications.
     payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
@@ -109,12 +108,12 @@ class NotificationBase:
         return payload
 
     async def send_notifications(
-        self, subscribers: list, loop: asyncio.AbstractEventLoop = None
+        self,
+        subscribers: list,
     ):
         """
         Generate tasks for all notification for an event.
         :param subscribers: A list of subscribers who want to be notified for event.
-        :param loop: Event loop object.
         """
         notifications = []
         for subscriber in subscribers:
@@ -155,21 +154,19 @@ class NotificationBase:
 
         if notifications:
             tasks = []
-            async with aiohttp.ClientSession(loop=loop) as session:
+            async with aiohttp.ClientSession() as session:
                 for notification in notifications:
                     tasks.append(
                         asyncio.ensure_future(
-                            self.send_notification(session, notification, loop=loop),
-                            loop=loop,
+                            self.send_notification(session, notification),
                         )
                     )
-                await asyncio.gather(*tasks, loop=loop)
+                await asyncio.gather(*tasks)
 
     async def send_notification(
         self,
         session: aiohttp.ClientSession,
         notification: dict,
-        loop: asyncio.AbstractEventLoop = None,
         retry_count: int = 5,
         timeout: float = 5.0,
     ):
@@ -178,7 +175,6 @@ class NotificationBase:
         after maximum number of reties, then notification is dropped.
         :param session: An aiohttp client session object to maintain http session.
         :param notification: A dictionary containing all necessary data to make POST request.
-        :param loop: Event loop object.
         :param retry_count: An integer specifying the maximum number of reties for a notification.
         :param timeout: A float representing client timeout of each HTTP request.
         """
@@ -227,7 +223,7 @@ class NotificationBase:
                         notification["payload"]["subscriptionId"], backoff_delay
                     )
                 )
-                await asyncio.sleep(backoff_delay, loop=loop)
+                await asyncio.sleep(backoff_delay)
         # Dropping notification
         self.logger.debug(
             "Notification {} sent failed to subscriber:{}.".format(
@@ -239,7 +235,14 @@ class NotificationBase:
 
 
 class NsLcmNotification(NotificationBase):
-
+    # maps kafka commands of completed operations to the original operation type
+    completed_operation_map = {
+        "INSTANTIATED": "INSTANTIATE",
+        "SCALED": "SCALE",
+        "TERMINATED": "TERMINATE",
+        "UPDATED": "UPDATE",
+        "HEALED": "HEAL",
+    }
     # SOL005 response model for nslcm notifications
     response_models = {
         "NsLcmOperationOccurrenceNotification": {
@@ -352,18 +355,19 @@ class NsLcmNotification(NotificationBase):
             "NsLcmOperationOccurrenceNotification",
             "NsChangeNotification",
             "NsIdentifierCreationNotification",
-            "NsIdentifierDeletionNotification"
+            "NsIdentifierDeletionNotification",
         ]
         filter_q = {
             "identifier": [nsd_id, ns_instance_id],
             "operationStates": ["ANY"],
             "operationTypes": ["ANY"],
-            "notificationType": notification_type
-            }
+            "notificationType": notification_type,
+        }
         if op_state:
             filter_q["operationStates"].append(op_state)
         if command:
-            filter_q["operationTypes"].append(command)
+            op_type = self.completed_operation_map.get(command, command)
+            filter_q["operationTypes"].append(op_type)
         # self.logger.debug("Db query is: {}".format(filter_q))
         subscribers = []
         try:
@@ -398,7 +402,7 @@ class VnfLcmNotification(NotificationBase):
             "changedExtConnectivity",
             "modificationsTriggeredByVnfPkgChange",
             "error",
-            "_links"
+            "_links",
         },
         "VnfIdentifierCreationNotification": {
             "id",
@@ -406,7 +410,7 @@ class VnfLcmNotification(NotificationBase):
             "subscriptionId",
             "timeStamp",
             "vnfInstanceId",
-            "_links"
+            "_links",
         },
         "VnfIdentifierDeletionNotification": {
             "id",
@@ -414,7 +418,7 @@ class VnfLcmNotification(NotificationBase):
             "subscriptionId",
             "timeStamp",
             "vnfInstanceId",
-            "_links"
+            "_links",
         },
     }
 
@@ -434,7 +438,9 @@ class VnfLcmNotification(NotificationBase):
         """
         return self.response_models
 
-    def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list:
+    def _format_vnflcm_subscribers(
+        self, subscribers: list, event_details: dict
+    ) -> list:
         """
         Formats the raw event details from kafka message and subscriber details.
         :param subscribers: A list of subscribers whom the event needs to be notified.
@@ -454,8 +460,14 @@ class VnfLcmNotification(NotificationBase):
             subscriber.update(event_details["params"])
         return subscribers
 
-    def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str,
-                        event_details: dict) -> list:
+    def get_subscribers(
+        self,
+        vnfd_id: str,
+        vnf_instance_id: str,
+        command: str,
+        op_state: str,
+        event_details: dict,
+    ) -> list:
         """
         Queries database and returns list of subscribers.
         :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
@@ -468,13 +480,13 @@ class VnfLcmNotification(NotificationBase):
         notification_type = [
             "VnfIdentifierCreationNotification",
             "VnfLcmOperationOccurrenceNotification",
-            "VnfIdentifierDeletionNotification"
+            "VnfIdentifierDeletionNotification",
         ]
         filter_q = {
             "identifier": [vnfd_id, vnf_instance_id],
             "operationStates": ["ANY"],
             "operationTypes": ["ANY"],
-            "notificationType": notification_type
+            "notificationType": notification_type,
         }
         if op_state:
             filter_q["operationStates"].append(op_state)