Reformat NBI to standardized format
[osm/NBI.git] / osm_nbi / notifications.py
index 2e7ba94..7b681a1 100644 (file)
@@ -44,10 +44,7 @@ class NotificationBase:
 
     response_models = None
     # Common HTTP payload header for all notifications.
-    payload_header = {
-        "Content-Type": "application/json",
-        "Accept": "application/json"
-    }
+    payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
 
     def __init__(self, db) -> None:
         """
@@ -72,13 +69,18 @@ class NotificationBase:
         :param kwargs: any keyword arguments needed for db query.
         :return: List of subscribers
         """
-        raise NotificationException("Method get_subscribers() is not implemented", http_code=HTTPStatus.NOT_IMPLEMENTED)
+        raise NotificationException(
+            "Method get_subscribers() is not implemented",
+            http_code=HTTPStatus.NOT_IMPLEMENTED,
+        )
 
     @staticmethod
     def _get_basic_auth(username: str, password: str) -> tuple:
         return aiohttp.BasicAuth(username, password)
 
-    def _decrypt_password(self, hashed: str, salt: str, schema_version: str = "1.1") -> str:
+    def _decrypt_password(
+        self, hashed: str, salt: str, schema_version: str = "1.1"
+    ) -> str:
         return self.db.decrypt(hashed, schema_version, salt=salt)
 
     def get_payload(self, meta_notification: dict) -> dict:
@@ -90,18 +92,25 @@ class NotificationBase:
         model_name = meta_notification["notificationType"]
         response_models = self.get_models()
         if not response_models or not response_models.get(model_name):
-            raise NotificationException("Response model {} is not defined.".format(model_name),
-                                        HTTPStatus.NOT_IMPLEMENTED)
+            raise NotificationException(
+                "Response model {} is not defined.".format(model_name),
+                HTTPStatus.NOT_IMPLEMENTED,
+            )
         model_keys = response_models[model_name]
         payload = dict.fromkeys(model_keys, "N/A")
         notification_keys = set(meta_notification.keys())
         for model_key in model_keys.intersection(notification_keys):
             payload[model_key] = meta_notification[model_key]
-        self.logger.debug("Payload generated for subscriber: {} for {}".format(payload["subscriptionId"],
-                                                                               payload["notificationType"]))
+        self.logger.debug(
+            "Payload generated for subscriber: {} for {}".format(
+                payload["subscriptionId"], payload["notificationType"]
+            )
+        )
         return payload
 
-    async def send_notifications(self, subscribers: list, loop: asyncio.AbstractEventLoop = None):
+    async def send_notifications(
+        self, subscribers: list, loop: asyncio.AbstractEventLoop = None
+    ):
         """
         Generate tasks for all notification for an event.
         :param subscribers: A list of subscribers who want to be notified for event.
@@ -111,38 +120,59 @@ class NotificationBase:
         for subscriber in subscribers:
             # Notify without auth
             if not subscriber.get("authentication"):
-                notifications.append({
-                    "headers": self.payload_header,
-                    "payload": self.get_payload(subscriber),
-                    "CallbackUri": subscriber["CallbackUri"]
-                })
+                notifications.append(
+                    {
+                        "headers": self.payload_header,
+                        "payload": self.get_payload(subscriber),
+                        "CallbackUri": subscriber["CallbackUri"],
+                    }
+                )
             elif subscriber["authentication"]["authType"] == "basic":
                 salt = subscriber["subscriptionId"]
-                hashed_password = subscriber["authentication"]["paramsBasic"]["password"]
+                hashed_password = subscriber["authentication"]["paramsBasic"][
+                    "password"
+                ]
                 password = self._decrypt_password(hashed_password, salt)
-                auth_basic = self._get_basic_auth(subscriber["authentication"]["paramsBasic"]["userName"], password)
-                notifications.append({
-                    "headers": self.payload_header,
-                    "payload": self.get_payload(subscriber),
-                    "auth_basic": auth_basic,
-                    "CallbackUri": subscriber["CallbackUri"]
-                })
+                auth_basic = self._get_basic_auth(
+                    subscriber["authentication"]["paramsBasic"]["userName"], password
+                )
+                notifications.append(
+                    {
+                        "headers": self.payload_header,
+                        "payload": self.get_payload(subscriber),
+                        "auth_basic": auth_basic,
+                        "CallbackUri": subscriber["CallbackUri"],
+                    }
+                )
             # TODO add support for AuthType OAuth and TLS after support is added in subscription.
             else:
-                self.logger.debug("Subscriber {} can not be notified. {} notification auth type is not implemented"
-                                  .format(subscriber["subscriptionId"],
-                                          subscriber["authentication"]["authType"]))
+                self.logger.debug(
+                    "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
+                        subscriber["subscriptionId"],
+                        subscriber["authentication"]["authType"],
+                    )
+                )
 
         if notifications:
             tasks = []
             async with aiohttp.ClientSession(loop=loop) as session:
                 for notification in notifications:
-                    tasks.append(asyncio.ensure_future(self.send_notification(session, notification, loop=loop),
-                                                       loop=loop))
+                    tasks.append(
+                        asyncio.ensure_future(
+                            self.send_notification(session, notification, loop=loop),
+                            loop=loop,
+                        )
+                    )
                 await asyncio.gather(*tasks, loop=loop)
 
-    async def send_notification(self, session: aiohttp.ClientSession, notification: dict,
-                                loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0):
+    async def send_notification(
+        self,
+        session: aiohttp.ClientSession,
+        notification: dict,
+        loop: asyncio.AbstractEventLoop = None,
+        retry_count: int = 5,
+        timeout: float = 5.0,
+    ):
         """
         Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
         after maximum number of reties, then notification is dropped.
@@ -155,14 +185,20 @@ class NotificationBase:
         backoff_delay = 1
         while retry_count > 0:
             try:
-                async with session.post(url=notification["CallbackUri"], headers=notification["headers"],
-                                        auth=notification.get("auth_basic", None),
-                                        data=json.dumps(notification["payload"]),
-                                        timeout=timeout) as resp:
+                async with session.post(
+                    url=notification["CallbackUri"],
+                    headers=notification["headers"],
+                    auth=notification.get("auth_basic", None),
+                    data=json.dumps(notification["payload"]),
+                    timeout=timeout,
+                ) as resp:
                     # self.logger.debug("Notification response: {}".format(resp.status))
                     if resp.status == HTTPStatus.NO_CONTENT:
-                        self.logger.debug("Notification sent successfully to subscriber {}"
-                                          .format(notification["payload"]["subscriptionId"]))
+                        self.logger.debug(
+                            "Notification sent successfully to subscriber {}".format(
+                                notification["payload"]["subscriptionId"]
+                            )
+                        )
                     else:
                         error_text = "Erroneous response code: {}, ".format(resp.status)
                         error_text += await resp.text()
@@ -170,12 +206,15 @@ class NotificationBase:
                 return True
             except Exception as e:
                 error_text = type(e).__name__ + ": " + str(e)
-                self.logger.debug("Unable to send notification to subscriber {}. Details: {}"
-                                  .format(notification["payload"]["subscriptionId"], error_text))
+                self.logger.debug(
+                    "Unable to send notification to subscriber {}. Details: {}".format(
+                        notification["payload"]["subscriptionId"], error_text
+                    )
+                )
                 error_detail = {
                     "error": type(e).__name__,
                     "error_text": str(e),
-                    "timestamp": time.time()
+                    "timestamp": time.time(),
                 }
                 if "error_details" in notification["payload"].keys():
                     notification["payload"]["error_details"].append(error_detail)
@@ -183,13 +222,19 @@ class NotificationBase:
                     notification["payload"]["error_details"] = [error_detail]
                 retry_count -= 1
                 backoff_delay *= 2
-                self.logger.debug("Retry Notification for subscriber: {} after backoff delay: {} seconds."
-                                  .format(notification["payload"]["subscriptionId"], backoff_delay))
+                self.logger.debug(
+                    "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
+                        notification["payload"]["subscriptionId"], backoff_delay
+                    )
+                )
                 await asyncio.sleep(backoff_delay, loop=loop)
         # Dropping notification
-        self.logger.debug("Notification {} sent failed to subscriber:{}."
-                          .format(notification["payload"]["notificationType"],
-                                  notification["payload"]["subscriptionId"]))
+        self.logger.debug(
+            "Notification {} sent failed to subscriber:{}.".format(
+                notification["payload"]["notificationType"],
+                notification["payload"]["subscriptionId"],
+            )
+        )
         return False
 
 
@@ -197,22 +242,52 @@ class NsLcmNotification(NotificationBase):
 
     # SOL005 response model for nslcm notifications
     response_models = {
-        "NsLcmOperationOccurrenceNotification": {"id", "nsInstanceId", "nsLcmOpOccId", "operation",
-                                                 "notificationType", "subscriptionId", "timestamp",
-                                                 "notificationStatus", "operationState", "isAutomaticInvocation",
-                                                 "affectedVnf", "affectedVl", "affectedVnffg", "affectedNs",
-                                                 "affectedSap", "error", "_links"},
-
-        "NsIdentifierCreationNotification": {"notificationType", "subscriptionId", "timestamp",
-                                             "nsInstanceId", "_links"},
-
-        "NsIdentifierDeletionNotification": {"notificationType", "subscriptionId", "timestamp",
-                                             "nsInstanceId", "_links"},
-
-        "NsChangeNotification": {"nsInstanceId", "nsComponentType", "nsComponentId",
-                                 "lcmOpOccIdImpactngNsComponent", "lcmOpNameImpactingNsComponent",
-                                 "lcmOpOccStatusImpactingNsComponent", "notificationType", "subscriptionId",
-                                 "timeStamp", "error", "_links"}
+        "NsLcmOperationOccurrenceNotification": {
+            "id",
+            "nsInstanceId",
+            "nsLcmOpOccId",
+            "operation",
+            "notificationType",
+            "subscriptionId",
+            "timestamp",
+            "notificationStatus",
+            "operationState",
+            "isAutomaticInvocation",
+            "affectedVnf",
+            "affectedVl",
+            "affectedVnffg",
+            "affectedNs",
+            "affectedSap",
+            "error",
+            "_links",
+        },
+        "NsIdentifierCreationNotification": {
+            "notificationType",
+            "subscriptionId",
+            "timestamp",
+            "nsInstanceId",
+            "_links",
+        },
+        "NsIdentifierDeletionNotification": {
+            "notificationType",
+            "subscriptionId",
+            "timestamp",
+            "nsInstanceId",
+            "_links",
+        },
+        "NsChangeNotification": {
+            "nsInstanceId",
+            "nsComponentType",
+            "nsComponentId",
+            "lcmOpOccIdImpactngNsComponent",
+            "lcmOpNameImpactingNsComponent",
+            "lcmOpOccStatusImpactingNsComponent",
+            "notificationType",
+            "subscriptionId",
+            "timeStamp",
+            "error",
+            "_links",
+        },
     }
 
     def __init__(self, db) -> None:
@@ -256,8 +331,14 @@ class NsLcmNotification(NotificationBase):
             subscriber.update(event_details["params"])
         return subscribers
 
-    def get_subscribers(self, nsd_id: str, ns_instance_id: str, command: str, op_state: str,
-                        event_details: dict) -> list:
+    def get_subscribers(
+        self,
+        nsd_id: str,
+        ns_instance_id: str,
+        command: str,
+        op_state: str,
+        event_details: dict,
+    ) -> list:
         """
         Queries database and returns list of subscribers.
         :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
@@ -267,7 +348,11 @@ class NsLcmNotification(NotificationBase):
         :param event_details: dict containing raw data of event occured.
         :return: List of interested subscribers for occurred event.
         """
-        filter_q = {"identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"]}
+        filter_q = {
+            "identifier": [nsd_id, ns_instance_id],
+            "operationStates": ["ANY"],
+            "operationTypes": ["ANY"],
+        }
         if op_state:
             filter_q["operationStates"].append(op_state)
         if command:
@@ -285,7 +370,6 @@ class NsLcmNotification(NotificationBase):
 
 
 class NsdNotification(NotificationBase):
-
     def __init__(self, db):
         """
         Constructor of the class
@@ -297,7 +381,6 @@ class NsdNotification(NotificationBase):
 
 
 class VnfdNotification(NotificationBase):
-
     def __init__(self, db):
         """
         Constructor of the class