Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/NBI.git] / osm_nbi / notifications.py
index 2e7ba94..7b681a1 100644 (file)
@@ -44,10 +44,7 @@ class NotificationBase:
 
     response_models = None
     # Common HTTP payload header for all notifications.
 
     response_models = None
     # Common HTTP payload header for all notifications.
-    payload_header = {
-        "Content-Type": "application/json",
-        "Accept": "application/json"
-    }
+    payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
 
     def __init__(self, db) -> None:
         """
 
     def __init__(self, db) -> None:
         """
@@ -72,13 +69,18 @@ class NotificationBase:
         :param kwargs: any keyword arguments needed for db query.
         :return: List of subscribers
         """
         :param kwargs: any keyword arguments needed for db query.
         :return: List of subscribers
         """
-        raise NotificationException("Method get_subscribers() is not implemented", http_code=HTTPStatus.NOT_IMPLEMENTED)
+        raise NotificationException(
+            "Method get_subscribers() is not implemented",
+            http_code=HTTPStatus.NOT_IMPLEMENTED,
+        )
 
     @staticmethod
     def _get_basic_auth(username: str, password: str) -> tuple:
         return aiohttp.BasicAuth(username, password)
 
 
     @staticmethod
     def _get_basic_auth(username: str, password: str) -> tuple:
         return aiohttp.BasicAuth(username, password)
 
-    def _decrypt_password(self, hashed: str, salt: str, schema_version: str = "1.1") -> str:
+    def _decrypt_password(
+        self, hashed: str, salt: str, schema_version: str = "1.1"
+    ) -> str:
         return self.db.decrypt(hashed, schema_version, salt=salt)
 
     def get_payload(self, meta_notification: dict) -> dict:
         return self.db.decrypt(hashed, schema_version, salt=salt)
 
     def get_payload(self, meta_notification: dict) -> dict:
@@ -90,18 +92,25 @@ class NotificationBase:
         model_name = meta_notification["notificationType"]
         response_models = self.get_models()
         if not response_models or not response_models.get(model_name):
         model_name = meta_notification["notificationType"]
         response_models = self.get_models()
         if not response_models or not response_models.get(model_name):
-            raise NotificationException("Response model {} is not defined.".format(model_name),
-                                        HTTPStatus.NOT_IMPLEMENTED)
+            raise NotificationException(
+                "Response model {} is not defined.".format(model_name),
+                HTTPStatus.NOT_IMPLEMENTED,
+            )
         model_keys = response_models[model_name]
         payload = dict.fromkeys(model_keys, "N/A")
         notification_keys = set(meta_notification.keys())
         for model_key in model_keys.intersection(notification_keys):
             payload[model_key] = meta_notification[model_key]
         model_keys = response_models[model_name]
         payload = dict.fromkeys(model_keys, "N/A")
         notification_keys = set(meta_notification.keys())
         for model_key in model_keys.intersection(notification_keys):
             payload[model_key] = meta_notification[model_key]
-        self.logger.debug("Payload generated for subscriber: {} for {}".format(payload["subscriptionId"],
-                                                                               payload["notificationType"]))
+        self.logger.debug(
+            "Payload generated for subscriber: {} for {}".format(
+                payload["subscriptionId"], payload["notificationType"]
+            )
+        )
         return payload
 
         return payload
 
-    async def send_notifications(self, subscribers: list, loop: asyncio.AbstractEventLoop = None):
+    async def send_notifications(
+        self, subscribers: list, loop: asyncio.AbstractEventLoop = None
+    ):
         """
         Generate tasks for all notification for an event.
         :param subscribers: A list of subscribers who want to be notified for event.
         """
         Generate tasks for all notification for an event.
         :param subscribers: A list of subscribers who want to be notified for event.
@@ -111,38 +120,59 @@ class NotificationBase:
         for subscriber in subscribers:
             # Notify without auth
             if not subscriber.get("authentication"):
         for subscriber in subscribers:
             # Notify without auth
             if not subscriber.get("authentication"):
-                notifications.append({
-                    "headers": self.payload_header,
-                    "payload": self.get_payload(subscriber),
-                    "CallbackUri": subscriber["CallbackUri"]
-                })
+                notifications.append(
+                    {
+                        "headers": self.payload_header,
+                        "payload": self.get_payload(subscriber),
+                        "CallbackUri": subscriber["CallbackUri"],
+                    }
+                )
             elif subscriber["authentication"]["authType"] == "basic":
                 salt = subscriber["subscriptionId"]
             elif subscriber["authentication"]["authType"] == "basic":
                 salt = subscriber["subscriptionId"]
-                hashed_password = subscriber["authentication"]["paramsBasic"]["password"]
+                hashed_password = subscriber["authentication"]["paramsBasic"][
+                    "password"
+                ]
                 password = self._decrypt_password(hashed_password, salt)
                 password = self._decrypt_password(hashed_password, salt)
-                auth_basic = self._get_basic_auth(subscriber["authentication"]["paramsBasic"]["userName"], password)
-                notifications.append({
-                    "headers": self.payload_header,
-                    "payload": self.get_payload(subscriber),
-                    "auth_basic": auth_basic,
-                    "CallbackUri": subscriber["CallbackUri"]
-                })
+                auth_basic = self._get_basic_auth(
+                    subscriber["authentication"]["paramsBasic"]["userName"], password
+                )
+                notifications.append(
+                    {
+                        "headers": self.payload_header,
+                        "payload": self.get_payload(subscriber),
+                        "auth_basic": auth_basic,
+                        "CallbackUri": subscriber["CallbackUri"],
+                    }
+                )
             # TODO add support for AuthType OAuth and TLS after support is added in subscription.
             else:
             # TODO add support for AuthType OAuth and TLS after support is added in subscription.
             else:
-                self.logger.debug("Subscriber {} can not be notified. {} notification auth type is not implemented"
-                                  .format(subscriber["subscriptionId"],
-                                          subscriber["authentication"]["authType"]))
+                self.logger.debug(
+                    "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
+                        subscriber["subscriptionId"],
+                        subscriber["authentication"]["authType"],
+                    )
+                )
 
         if notifications:
             tasks = []
             async with aiohttp.ClientSession(loop=loop) as session:
                 for notification in notifications:
 
         if notifications:
             tasks = []
             async with aiohttp.ClientSession(loop=loop) as session:
                 for notification in notifications:
-                    tasks.append(asyncio.ensure_future(self.send_notification(session, notification, loop=loop),
-                                                       loop=loop))
+                    tasks.append(
+                        asyncio.ensure_future(
+                            self.send_notification(session, notification, loop=loop),
+                            loop=loop,
+                        )
+                    )
                 await asyncio.gather(*tasks, loop=loop)
 
                 await asyncio.gather(*tasks, loop=loop)
 
-    async def send_notification(self, session: aiohttp.ClientSession, notification: dict,
-                                loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0):
+    async def send_notification(
+        self,
+        session: aiohttp.ClientSession,
+        notification: dict,
+        loop: asyncio.AbstractEventLoop = None,
+        retry_count: int = 5,
+        timeout: float = 5.0,
+    ):
         """
         Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
         after maximum number of reties, then notification is dropped.
         """
         Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
         after maximum number of reties, then notification is dropped.
@@ -155,14 +185,20 @@ class NotificationBase:
         backoff_delay = 1
         while retry_count > 0:
             try:
         backoff_delay = 1
         while retry_count > 0:
             try:
-                async with session.post(url=notification["CallbackUri"], headers=notification["headers"],
-                                        auth=notification.get("auth_basic", None),
-                                        data=json.dumps(notification["payload"]),
-                                        timeout=timeout) as resp:
+                async with session.post(
+                    url=notification["CallbackUri"],
+                    headers=notification["headers"],
+                    auth=notification.get("auth_basic", None),
+                    data=json.dumps(notification["payload"]),
+                    timeout=timeout,
+                ) as resp:
                     # self.logger.debug("Notification response: {}".format(resp.status))
                     if resp.status == HTTPStatus.NO_CONTENT:
                     # self.logger.debug("Notification response: {}".format(resp.status))
                     if resp.status == HTTPStatus.NO_CONTENT:
-                        self.logger.debug("Notification sent successfully to subscriber {}"
-                                          .format(notification["payload"]["subscriptionId"]))
+                        self.logger.debug(
+                            "Notification sent successfully to subscriber {}".format(
+                                notification["payload"]["subscriptionId"]
+                            )
+                        )
                     else:
                         error_text = "Erroneous response code: {}, ".format(resp.status)
                         error_text += await resp.text()
                     else:
                         error_text = "Erroneous response code: {}, ".format(resp.status)
                         error_text += await resp.text()
@@ -170,12 +206,15 @@ class NotificationBase:
                 return True
             except Exception as e:
                 error_text = type(e).__name__ + ": " + str(e)
                 return True
             except Exception as e:
                 error_text = type(e).__name__ + ": " + str(e)
-                self.logger.debug("Unable to send notification to subscriber {}. Details: {}"
-                                  .format(notification["payload"]["subscriptionId"], error_text))
+                self.logger.debug(
+                    "Unable to send notification to subscriber {}. Details: {}".format(
+                        notification["payload"]["subscriptionId"], error_text
+                    )
+                )
                 error_detail = {
                     "error": type(e).__name__,
                     "error_text": str(e),
                 error_detail = {
                     "error": type(e).__name__,
                     "error_text": str(e),
-                    "timestamp": time.time()
+                    "timestamp": time.time(),
                 }
                 if "error_details" in notification["payload"].keys():
                     notification["payload"]["error_details"].append(error_detail)
                 }
                 if "error_details" in notification["payload"].keys():
                     notification["payload"]["error_details"].append(error_detail)
@@ -183,13 +222,19 @@ class NotificationBase:
                     notification["payload"]["error_details"] = [error_detail]
                 retry_count -= 1
                 backoff_delay *= 2
                     notification["payload"]["error_details"] = [error_detail]
                 retry_count -= 1
                 backoff_delay *= 2
-                self.logger.debug("Retry Notification for subscriber: {} after backoff delay: {} seconds."
-                                  .format(notification["payload"]["subscriptionId"], backoff_delay))
+                self.logger.debug(
+                    "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
+                        notification["payload"]["subscriptionId"], backoff_delay
+                    )
+                )
                 await asyncio.sleep(backoff_delay, loop=loop)
         # Dropping notification
                 await asyncio.sleep(backoff_delay, loop=loop)
         # Dropping notification
-        self.logger.debug("Notification {} sent failed to subscriber:{}."
-                          .format(notification["payload"]["notificationType"],
-                                  notification["payload"]["subscriptionId"]))
+        self.logger.debug(
+            "Notification {} sent failed to subscriber:{}.".format(
+                notification["payload"]["notificationType"],
+                notification["payload"]["subscriptionId"],
+            )
+        )
         return False
 
 
         return False
 
 
@@ -197,22 +242,52 @@ class NsLcmNotification(NotificationBase):
 
     # SOL005 response model for nslcm notifications
     response_models = {
 
     # SOL005 response model for nslcm notifications
     response_models = {
-        "NsLcmOperationOccurrenceNotification": {"id", "nsInstanceId", "nsLcmOpOccId", "operation",
-                                                 "notificationType", "subscriptionId", "timestamp",
-                                                 "notificationStatus", "operationState", "isAutomaticInvocation",
-                                                 "affectedVnf", "affectedVl", "affectedVnffg", "affectedNs",
-                                                 "affectedSap", "error", "_links"},
-
-        "NsIdentifierCreationNotification": {"notificationType", "subscriptionId", "timestamp",
-                                             "nsInstanceId", "_links"},
-
-        "NsIdentifierDeletionNotification": {"notificationType", "subscriptionId", "timestamp",
-                                             "nsInstanceId", "_links"},
-
-        "NsChangeNotification": {"nsInstanceId", "nsComponentType", "nsComponentId",
-                                 "lcmOpOccIdImpactngNsComponent", "lcmOpNameImpactingNsComponent",
-                                 "lcmOpOccStatusImpactingNsComponent", "notificationType", "subscriptionId",
-                                 "timeStamp", "error", "_links"}
+        "NsLcmOperationOccurrenceNotification": {
+            "id",
+            "nsInstanceId",
+            "nsLcmOpOccId",
+            "operation",
+            "notificationType",
+            "subscriptionId",
+            "timestamp",
+            "notificationStatus",
+            "operationState",
+            "isAutomaticInvocation",
+            "affectedVnf",
+            "affectedVl",
+            "affectedVnffg",
+            "affectedNs",
+            "affectedSap",
+            "error",
+            "_links",
+        },
+        "NsIdentifierCreationNotification": {
+            "notificationType",
+            "subscriptionId",
+            "timestamp",
+            "nsInstanceId",
+            "_links",
+        },
+        "NsIdentifierDeletionNotification": {
+            "notificationType",
+            "subscriptionId",
+            "timestamp",
+            "nsInstanceId",
+            "_links",
+        },
+        "NsChangeNotification": {
+            "nsInstanceId",
+            "nsComponentType",
+            "nsComponentId",
+            "lcmOpOccIdImpactngNsComponent",
+            "lcmOpNameImpactingNsComponent",
+            "lcmOpOccStatusImpactingNsComponent",
+            "notificationType",
+            "subscriptionId",
+            "timeStamp",
+            "error",
+            "_links",
+        },
     }
 
     def __init__(self, db) -> None:
     }
 
     def __init__(self, db) -> None:
@@ -256,8 +331,14 @@ class NsLcmNotification(NotificationBase):
             subscriber.update(event_details["params"])
         return subscribers
 
             subscriber.update(event_details["params"])
         return subscribers
 
-    def get_subscribers(self, nsd_id: str, ns_instance_id: str, command: str, op_state: str,
-                        event_details: dict) -> list:
+    def get_subscribers(
+        self,
+        nsd_id: str,
+        ns_instance_id: str,
+        command: str,
+        op_state: str,
+        event_details: dict,
+    ) -> list:
         """
         Queries database and returns list of subscribers.
         :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
         """
         Queries database and returns list of subscribers.
         :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
@@ -267,7 +348,11 @@ class NsLcmNotification(NotificationBase):
         :param event_details: dict containing raw data of event occured.
         :return: List of interested subscribers for occurred event.
         """
         :param event_details: dict containing raw data of event occured.
         :return: List of interested subscribers for occurred event.
         """
-        filter_q = {"identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"]}
+        filter_q = {
+            "identifier": [nsd_id, ns_instance_id],
+            "operationStates": ["ANY"],
+            "operationTypes": ["ANY"],
+        }
         if op_state:
             filter_q["operationStates"].append(op_state)
         if command:
         if op_state:
             filter_q["operationStates"].append(op_state)
         if command:
@@ -285,7 +370,6 @@ class NsLcmNotification(NotificationBase):
 
 
 class NsdNotification(NotificationBase):
 
 
 class NsdNotification(NotificationBase):
-
     def __init__(self, db):
         """
         Constructor of the class
     def __init__(self, db):
         """
         Constructor of the class
@@ -297,7 +381,6 @@ class NsdNotification(NotificationBase):
 
 
 class VnfdNotification(NotificationBase):
 
 
 class VnfdNotification(NotificationBase):
-
     def __init__(self, db):
         """
         Constructor of the class
     def __init__(self, db):
         """
         Constructor of the class