Bug 1830 fixed: maps completed operations to original operation types
[osm/NBI.git] / osm_nbi / notifications.py
index a62670b..22413d0 100644 (file)
@@ -108,12 +108,12 @@ class NotificationBase:
         return payload
 
     async def send_notifications(
-        self, subscribers: list, loop: asyncio.AbstractEventLoop = None
+        self,
+        subscribers: list,
     ):
         """
         Generate tasks for all notification for an event.
         :param subscribers: A list of subscribers who want to be notified for event.
-        :param loop: Event loop object.
         """
         notifications = []
         for subscriber in subscribers:
@@ -154,21 +154,19 @@ class NotificationBase:
 
         if notifications:
             tasks = []
-            async with aiohttp.ClientSession(loop=loop) as session:
+            async with aiohttp.ClientSession() as session:
                 for notification in notifications:
                     tasks.append(
                         asyncio.ensure_future(
-                            self.send_notification(session, notification, loop=loop),
-                            loop=loop,
+                            self.send_notification(session, notification),
                         )
                     )
-                await asyncio.gather(*tasks, loop=loop)
+                await asyncio.gather(*tasks)
 
     async def send_notification(
         self,
         session: aiohttp.ClientSession,
         notification: dict,
-        loop: asyncio.AbstractEventLoop = None,
         retry_count: int = 5,
         timeout: float = 5.0,
     ):
@@ -177,7 +175,6 @@ class NotificationBase:
         after maximum number of reties, then notification is dropped.
         :param session: An aiohttp client session object to maintain http session.
         :param notification: A dictionary containing all necessary data to make POST request.
-        :param loop: Event loop object.
         :param retry_count: An integer specifying the maximum number of reties for a notification.
         :param timeout: A float representing client timeout of each HTTP request.
         """
@@ -226,7 +223,7 @@ class NotificationBase:
                         notification["payload"]["subscriptionId"], backoff_delay
                     )
                 )
-                await asyncio.sleep(backoff_delay, loop=loop)
+                await asyncio.sleep(backoff_delay)
         # Dropping notification
         self.logger.debug(
             "Notification {} sent failed to subscriber:{}.".format(
@@ -238,6 +235,14 @@ class NotificationBase:
 
 
 class NsLcmNotification(NotificationBase):
+    # maps kafka commands of completed operations to the original operation type
+    completed_operation_map = {
+        "INSTANTIATED": "INSTANTIATE",
+        "SCALED": "SCALE",
+        "TERMINATED": "TERMINATE",
+        "UPDATED": "UPDATE",
+        "HEALED": "HEAL",
+    }
     # SOL005 response model for nslcm notifications
     response_models = {
         "NsLcmOperationOccurrenceNotification": {
@@ -361,7 +366,8 @@ class NsLcmNotification(NotificationBase):
         if op_state:
             filter_q["operationStates"].append(op_state)
         if command:
-            filter_q["operationTypes"].append(command)
+            op_type = self.completed_operation_map.get(command, command)
+            filter_q["operationTypes"].append(op_type)
         # self.logger.debug("Db query is: {}".format(filter_q))
         subscribers = []
         try: