Feature 10926 - Subscription feature for SOL003 VNF-LCM
[osm/NBI.git] / osm_nbi / notifications.py
index 9156c2c..47a24ba 100644 (file)
@@ -44,10 +44,7 @@ class NotificationBase:
 
     response_models = None
     # Common HTTP payload header for all notifications.
 
     response_models = None
     # Common HTTP payload header for all notifications.
-    payload_header = {
-        "Content-Type": "application/json",
-        "Accept": "application/json"
-    }
+    payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
 
     def __init__(self, db) -> None:
         """
 
     def __init__(self, db) -> None:
         """
@@ -72,13 +69,18 @@ class NotificationBase:
         :param kwargs: any keyword arguments needed for db query.
         :return: List of subscribers
         """
         :param kwargs: any keyword arguments needed for db query.
         :return: List of subscribers
         """
-        raise NotificationException("Method get_subscribers() is not implemented", http_code=HTTPStatus.NOT_IMPLEMENTED)
+        raise NotificationException(
+            "Method get_subscribers() is not implemented",
+            http_code=HTTPStatus.NOT_IMPLEMENTED,
+        )
 
     @staticmethod
     def _get_basic_auth(username: str, password: str) -> tuple:
         return aiohttp.BasicAuth(username, password)
 
 
     @staticmethod
     def _get_basic_auth(username: str, password: str) -> tuple:
         return aiohttp.BasicAuth(username, password)
 
-    def _decrypt_password(self, hashed: str, salt: str, schema_version: str = "1.1") -> str:
+    def _decrypt_password(
+        self, hashed: str, salt: str, schema_version: str = "1.1"
+    ) -> str:
         return self.db.decrypt(hashed, schema_version, salt=salt)
 
     def get_payload(self, meta_notification: dict) -> dict:
         return self.db.decrypt(hashed, schema_version, salt=salt)
 
     def get_payload(self, meta_notification: dict) -> dict:
@@ -90,18 +92,25 @@ class NotificationBase:
         model_name = meta_notification["notificationType"]
         response_models = self.get_models()
         if not response_models or not response_models.get(model_name):
         model_name = meta_notification["notificationType"]
         response_models = self.get_models()
         if not response_models or not response_models.get(model_name):
-            raise NotificationException("Response model {} is not defined.".format(model_name),
-                                        HTTPStatus.NOT_IMPLEMENTED)
+            raise NotificationException(
+                "Response model {} is not defined.".format(model_name),
+                HTTPStatus.NOT_IMPLEMENTED,
+            )
         model_keys = response_models[model_name]
         payload = dict.fromkeys(model_keys, "N/A")
         notification_keys = set(meta_notification.keys())
         for model_key in model_keys.intersection(notification_keys):
             payload[model_key] = meta_notification[model_key]
         model_keys = response_models[model_name]
         payload = dict.fromkeys(model_keys, "N/A")
         notification_keys = set(meta_notification.keys())
         for model_key in model_keys.intersection(notification_keys):
             payload[model_key] = meta_notification[model_key]
-        self.logger.debug("Payload generated for subscriber: {} for {}".format(payload["subscriptionId"],
-                                                                               payload["notificationType"]))
+        self.logger.debug(
+            "Payload generated for subscriber: {} for {}".format(
+                payload["subscriptionId"], payload["notificationType"]
+            )
+        )
         return payload
 
         return payload
 
-    async def send_notifications(self, subscribers: list, loop: asyncio.AbstractEventLoop = None):
+    async def send_notifications(
+        self, subscribers: list, loop: asyncio.AbstractEventLoop = None
+    ):
         """
         Generate tasks for all notification for an event.
         :param subscribers: A list of subscribers who want to be notified for event.
         """
         Generate tasks for all notification for an event.
         :param subscribers: A list of subscribers who want to be notified for event.
@@ -111,36 +120,59 @@ class NotificationBase:
         for subscriber in subscribers:
             # Notify without auth
             if not subscriber.get("authentication"):
         for subscriber in subscribers:
             # Notify without auth
             if not subscriber.get("authentication"):
-                notifications.append({
-                    "headers": self.payload_header,
-                    "payload": self.get_payload(subscriber),
-                    "CallbackUri": subscriber["CallbackUri"]
-                })
+                notifications.append(
+                    {
+                        "headers": self.payload_header,
+                        "payload": self.get_payload(subscriber),
+                        "CallbackUri": subscriber["CallbackUri"],
+                    }
+                )
             elif subscriber["authentication"]["authType"] == "basic":
                 salt = subscriber["subscriptionId"]
             elif subscriber["authentication"]["authType"] == "basic":
                 salt = subscriber["subscriptionId"]
-                hashed_password = subscriber["authentication"]["paramsBasic"]["password"]
+                hashed_password = subscriber["authentication"]["paramsBasic"][
+                    "password"
+                ]
                 password = self._decrypt_password(hashed_password, salt)
                 password = self._decrypt_password(hashed_password, salt)
-                auth_basic = self._get_basic_auth(subscriber["authentication"]["paramsBasic"]["userName"], password)
-                notifications.append({
-                    "headers": self.payload_header,
-                    "payload": self.get_payload(subscriber),
-                    "auth_basic": auth_basic,
-                    "CallbackUri": subscriber["CallbackUri"]
-                })
+                auth_basic = self._get_basic_auth(
+                    subscriber["authentication"]["paramsBasic"]["userName"], password
+                )
+                notifications.append(
+                    {
+                        "headers": self.payload_header,
+                        "payload": self.get_payload(subscriber),
+                        "auth_basic": auth_basic,
+                        "CallbackUri": subscriber["CallbackUri"],
+                    }
+                )
             # TODO add support for AuthType OAuth and TLS after support is added in subscription.
             else:
             # TODO add support for AuthType OAuth and TLS after support is added in subscription.
             else:
-                self.logger.debug("Subscriber {} can not be notified. {} notification auth type is not implemented"
-                                  .format(subscriber["subscriptionId"],
-                                          subscriber["authentication"]["authType"]))
-
-        tasks = []
-        async with aiohttp.ClientSession(loop=loop) as session:
-            for notification in notifications:
-                tasks.append(asyncio.ensure_future(self.send_notification(session, notification, loop=loop), loop=loop))
-            await asyncio.gather(*tasks, loop=loop)
-
-    async def send_notification(self, session: aiohttp.ClientSession, notification: dict,
-                                loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0):
+                self.logger.debug(
+                    "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
+                        subscriber["subscriptionId"],
+                        subscriber["authentication"]["authType"],
+                    )
+                )
+
+        if notifications:
+            tasks = []
+            async with aiohttp.ClientSession(loop=loop) as session:
+                for notification in notifications:
+                    tasks.append(
+                        asyncio.ensure_future(
+                            self.send_notification(session, notification, loop=loop),
+                            loop=loop,
+                        )
+                    )
+                await asyncio.gather(*tasks, loop=loop)
+
+    async def send_notification(
+        self,
+        session: aiohttp.ClientSession,
+        notification: dict,
+        loop: asyncio.AbstractEventLoop = None,
+        retry_count: int = 5,
+        timeout: float = 5.0,
+    ):
         """
         Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
         after maximum number of reties, then notification is dropped.
         """
         Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
         after maximum number of reties, then notification is dropped.
@@ -153,14 +185,20 @@ class NotificationBase:
         backoff_delay = 1
         while retry_count > 0:
             try:
         backoff_delay = 1
         while retry_count > 0:
             try:
-                async with session.post(url=notification["CallbackUri"], headers=notification["headers"],
-                                        auth=notification.get("auth_basic", None),
-                                        data=json.dumps(notification["payload"]),
-                                        timeout=timeout) as resp:
+                async with session.post(
+                    url=notification["CallbackUri"],
+                    headers=notification["headers"],
+                    auth=notification.get("auth_basic", None),
+                    data=json.dumps(notification["payload"]),
+                    timeout=timeout,
+                ) as resp:
                     # self.logger.debug("Notification response: {}".format(resp.status))
                     if resp.status == HTTPStatus.NO_CONTENT:
                     # self.logger.debug("Notification response: {}".format(resp.status))
                     if resp.status == HTTPStatus.NO_CONTENT:
-                        self.logger.debug("Notification sent successfully to subscriber {}"
-                                          .format(notification["payload"]["subscriptionId"]))
+                        self.logger.debug(
+                            "Notification sent successfully to subscriber {}".format(
+                                notification["payload"]["subscriptionId"]
+                            )
+                        )
                     else:
                         error_text = "Erroneous response code: {}, ".format(resp.status)
                         error_text += await resp.text()
                     else:
                         error_text = "Erroneous response code: {}, ".format(resp.status)
                         error_text += await resp.text()
@@ -168,12 +206,15 @@ class NotificationBase:
                 return True
             except Exception as e:
                 error_text = type(e).__name__ + ": " + str(e)
                 return True
             except Exception as e:
                 error_text = type(e).__name__ + ": " + str(e)
-                self.logger.debug("Unable to send notification to subscriber {}. Details: {}"
-                                  .format(notification["payload"]["subscriptionId"], error_text))
+                self.logger.debug(
+                    "Unable to send notification to subscriber {}. Details: {}".format(
+                        notification["payload"]["subscriptionId"], error_text
+                    )
+                )
                 error_detail = {
                     "error": type(e).__name__,
                     "error_text": str(e),
                 error_detail = {
                     "error": type(e).__name__,
                     "error_text": str(e),
-                    "timestamp": time.time()
+                    "timestamp": time.time(),
                 }
                 if "error_details" in notification["payload"].keys():
                     notification["payload"]["error_details"].append(error_detail)
                 }
                 if "error_details" in notification["payload"].keys():
                     notification["payload"]["error_details"].append(error_detail)
@@ -181,13 +222,19 @@ class NotificationBase:
                     notification["payload"]["error_details"] = [error_detail]
                 retry_count -= 1
                 backoff_delay *= 2
                     notification["payload"]["error_details"] = [error_detail]
                 retry_count -= 1
                 backoff_delay *= 2
-                self.logger.debug("Retry Notification for subscriber: {} after backoff delay: {} seconds."
-                                  .format(notification["payload"]["subscriptionId"], backoff_delay))
+                self.logger.debug(
+                    "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
+                        notification["payload"]["subscriptionId"], backoff_delay
+                    )
+                )
                 await asyncio.sleep(backoff_delay, loop=loop)
         # Dropping notification
                 await asyncio.sleep(backoff_delay, loop=loop)
         # Dropping notification
-        self.logger.debug("Notification {} sent failed to subscriber:{}."
-                          .format(notification["payload"]["notificationType"],
-                                  notification["payload"]["subscriptionId"]))
+        self.logger.debug(
+            "Notification {} sent failed to subscriber:{}.".format(
+                notification["payload"]["notificationType"],
+                notification["payload"]["subscriptionId"],
+            )
+        )
         return False
 
 
         return False
 
 
@@ -195,22 +242,52 @@ class NsLcmNotification(NotificationBase):
 
     # SOL005 response model for nslcm notifications
     response_models = {
 
     # SOL005 response model for nslcm notifications
     response_models = {
-        "NsLcmOperationOccurrenceNotification": {"id", "nsInstanceId", "nsLcmOpOccId", "operation",
-                                                 "notificationType", "subscriptionId", "timestamp",
-                                                 "notificationStatus", "operationState", "isAutomaticInvocation",
-                                                 "affectedVnf", "affectedVl", "affectedVnffg", "affectedNs",
-                                                 "affectedSap", "error", "_links"},
-
-        "NsIdentifierCreationNotification": {"notificationType", "subscriptionId", "timestamp",
-                                             "nsInstanceId", "_links"},
-
-        "NsIdentifierDeletionNotification": {"notificationType", "subscriptionId", "timestamp",
-                                             "nsInstanceId", "_links"},
-
-        "NsChangeNotification": {"nsInstanceId", "nsComponentType", "nsComponentId",
-                                 "lcmOpOccIdImpactngNsComponent", "lcmOpNameImpactingNsComponent",
-                                 "lcmOpOccStatusImpactingNsComponent", "notificationType", "subscriptionId",
-                                 "timeStamp", "error", "_links"}
+        "NsLcmOperationOccurrenceNotification": {
+            "id",
+            "nsInstanceId",
+            "nsLcmOpOccId",
+            "operation",
+            "notificationType",
+            "subscriptionId",
+            "timestamp",
+            "notificationStatus",
+            "operationState",
+            "isAutomaticInvocation",
+            "affectedVnf",
+            "affectedVl",
+            "affectedVnffg",
+            "affectedNs",
+            "affectedSap",
+            "error",
+            "_links",
+        },
+        "NsIdentifierCreationNotification": {
+            "notificationType",
+            "subscriptionId",
+            "timestamp",
+            "nsInstanceId",
+            "_links",
+        },
+        "NsIdentifierDeletionNotification": {
+            "notificationType",
+            "subscriptionId",
+            "timestamp",
+            "nsInstanceId",
+            "_links",
+        },
+        "NsChangeNotification": {
+            "nsInstanceId",
+            "nsComponentType",
+            "nsComponentId",
+            "lcmOpOccIdImpactngNsComponent",
+            "lcmOpNameImpactingNsComponent",
+            "lcmOpOccStatusImpactingNsComponent",
+            "notificationType",
+            "subscriptionId",
+            "timeStamp",
+            "error",
+            "_links",
+        },
     }
 
     def __init__(self, db) -> None:
     }
 
     def __init__(self, db) -> None:
@@ -254,8 +331,14 @@ class NsLcmNotification(NotificationBase):
             subscriber.update(event_details["params"])
         return subscribers
 
             subscriber.update(event_details["params"])
         return subscribers
 
-    def get_subscribers(self, nsd_id: str, ns_instance_id: str, command: str, op_state: str,
-                        event_details: dict) -> list:
+    def get_subscribers(
+        self,
+        nsd_id: str,
+        ns_instance_id: str,
+        command: str,
+        op_state: str,
+        event_details: dict,
+    ) -> list:
         """
         Queries database and returns list of subscribers.
         :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
         """
         Queries database and returns list of subscribers.
         :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
@@ -265,7 +348,18 @@ class NsLcmNotification(NotificationBase):
         :param event_details: dict containing raw data of event occured.
         :return: List of interested subscribers for occurred event.
         """
         :param event_details: dict containing raw data of event occured.
         :return: List of interested subscribers for occurred event.
         """
-        filter_q = {"identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"]}
+        notification_type = [
+            "NsLcmOperationOccurrenceNotification",
+            "NsChangeNotification",
+            "NsIdentifierCreationNotification",
+            "NsIdentifierDeletionNotification"
+        ]
+        filter_q = {
+            "identifier": [nsd_id, ns_instance_id],
+            "operationStates": ["ANY"],
+            "operationTypes": ["ANY"],
+            "notificationType": notification_type
+            }
         if op_state:
             filter_q["operationStates"].append(op_state)
         if command:
         if op_state:
             filter_q["operationStates"].append(op_state)
         if command:
@@ -282,8 +376,122 @@ class NsLcmNotification(NotificationBase):
             return subscribers
 
 
             return subscribers
 
 
-class NsdNotification(NotificationBase):
+class VnfLcmNotification(NotificationBase):
+    # SOL003 response model for vnflcm notifications
+    response_models = {
+        "VnfLcmOperationOccurrenceNotification": {
+            "id",
+            "notificationType",
+            "subscriptionId",
+            "timeStamp",
+            "notificationStatus",
+            "operationState",
+            "vnfInstanceId",
+            "operation",
+            "isAutomaticInvocation",
+            "vnfLcmOpOccId",
+            "affectedVnfcs",
+            "affectedVirtualLinks",
+            "affectedExtLinkPorts",
+            "affectedVirtualStorages",
+            "changedInfo",
+            "changedExtConnectivity",
+            "modificationsTriggeredByVnfPkgChange",
+            "error",
+            "_links"
+        },
+        "VnfIdentifierCreationNotification": {
+            "id",
+            "notificationType",
+            "subscriptionId",
+            "timeStamp",
+            "vnfInstanceId",
+            "_links"
+        },
+        "VnfIdentifierDeletionNotification": {
+            "id",
+            "notificationType",
+            "subscriptionId",
+            "timeStamp",
+            "vnfInstanceId",
+            "_links"
+        },
+    }
+
+    def __init__(self, db) -> None:
+        """
+        Constructor of VnfLcmNotification class.
+        :param db: Database handler.
+        """
+        super().__init__(db)
+        self.subscriber_collection = "mapped_subscriptions"
+
+    def get_models(self) -> dict:
+        """
+        Returns the SOL003 model of notification class
+        :param None
+        :return: dict of SOL003 data model
+        """
+        return self.response_models
+
+    def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list:
+        """
+        Formats the raw event details from kafka message and subscriber details.
+        :param subscribers: A list of subscribers whom the event needs to be notified.
+        :param event_details: A dict containing all meta data of event.
+        :return:
+        """
+        notification_id = str(uuid4())
+        event_timestamp = time.time()
+        event_operation = event_details["command"]
+        for subscriber in subscribers:
+            subscriber["id"] = notification_id
+            subscriber["timeStamp"] = event_timestamp
+            subscriber["subscriptionId"] = subscriber["reference"]
+            subscriber["operation"] = event_operation
+            del subscriber["reference"]
+            del subscriber["_id"]
+            subscriber.update(event_details["params"])
+        return subscribers
+
+    def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str,
+                        event_details: dict) -> list:
+        """
+        Queries database and returns list of subscribers.
+        :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
+        :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
+        :param command: the command for event.
+        :param op_state: the operation state of VNF.
+        :param event_details: dict containing raw data of event occurred.
+        :return: List of interested subscribers for occurred event.
+        """
+        notification_type = [
+            "VnfIdentifierCreationNotification",
+            "VnfLcmOperationOccurrenceNotification",
+            "VnfIdentifierDeletionNotification"
+        ]
+        filter_q = {
+            "identifier": [vnfd_id, vnf_instance_id],
+            "operationStates": ["ANY"],
+            "operationTypes": ["ANY"],
+            "notificationType": notification_type
+        }
+        if op_state:
+            filter_q["operationStates"].append(op_state)
+        if command:
+            filter_q["operationTypes"].append(command)
+        subscribers = []
+        try:
+            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
+            subscribers = self._format_vnflcm_subscribers(subscribers, event_details)
+        except Exception as e:
+            error_text = type(e).__name__ + ": " + str(e)
+            self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
+        finally:
+            return subscribers
 
 
+
+class NsdNotification(NotificationBase):
     def __init__(self, db):
         """
         Constructor of the class
     def __init__(self, db):
         """
         Constructor of the class
@@ -295,7 +503,6 @@ class NsdNotification(NotificationBase):
 
 
 class VnfdNotification(NotificationBase):
 
 
 class VnfdNotification(NotificationBase):
-
     def __init__(self, db):
         """
         Constructor of the class
     def __init__(self, db):
         """
         Constructor of the class