X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fnotifications.py;h=7b681a1f8325c4960e5c3c26d59a45aa325b900a;hp=9156c2ce659d15cf586e896be7fe210401f782b9;hb=5758955b7b394517ff5caf5506a4400cdc5aa372;hpb=42c84ea6d88ecb94c2a7d2e3e94e86522cbd19aa diff --git a/osm_nbi/notifications.py b/osm_nbi/notifications.py index 9156c2c..7b681a1 100644 --- a/osm_nbi/notifications.py +++ b/osm_nbi/notifications.py @@ -44,10 +44,7 @@ class NotificationBase: response_models = None # Common HTTP payload header for all notifications. - payload_header = { - "Content-Type": "application/json", - "Accept": "application/json" - } + payload_header = {"Content-Type": "application/json", "Accept": "application/json"} def __init__(self, db) -> None: """ @@ -72,13 +69,18 @@ class NotificationBase: :param kwargs: any keyword arguments needed for db query. :return: List of subscribers """ - raise NotificationException("Method get_subscribers() is not implemented", http_code=HTTPStatus.NOT_IMPLEMENTED) + raise NotificationException( + "Method get_subscribers() is not implemented", + http_code=HTTPStatus.NOT_IMPLEMENTED, + ) @staticmethod def _get_basic_auth(username: str, password: str) -> tuple: return aiohttp.BasicAuth(username, password) - def _decrypt_password(self, hashed: str, salt: str, schema_version: str = "1.1") -> str: + def _decrypt_password( + self, hashed: str, salt: str, schema_version: str = "1.1" + ) -> str: return self.db.decrypt(hashed, schema_version, salt=salt) def get_payload(self, meta_notification: dict) -> dict: @@ -90,18 +92,25 @@ class NotificationBase: model_name = meta_notification["notificationType"] response_models = self.get_models() if not response_models or not response_models.get(model_name): - raise NotificationException("Response model {} is not defined.".format(model_name), - HTTPStatus.NOT_IMPLEMENTED) + raise NotificationException( + "Response model {} is not defined.".format(model_name), + HTTPStatus.NOT_IMPLEMENTED, + ) model_keys = response_models[model_name] payload = dict.fromkeys(model_keys, "N/A") notification_keys = set(meta_notification.keys()) for model_key in model_keys.intersection(notification_keys): payload[model_key] = meta_notification[model_key] - self.logger.debug("Payload generated for subscriber: {} for {}".format(payload["subscriptionId"], - payload["notificationType"])) + self.logger.debug( + "Payload generated for subscriber: {} for {}".format( + payload["subscriptionId"], payload["notificationType"] + ) + ) return payload - async def send_notifications(self, subscribers: list, loop: asyncio.AbstractEventLoop = None): + async def send_notifications( + self, subscribers: list, loop: asyncio.AbstractEventLoop = None + ): """ Generate tasks for all notification for an event. :param subscribers: A list of subscribers who want to be notified for event. @@ -111,36 +120,59 @@ class NotificationBase: for subscriber in subscribers: # Notify without auth if not subscriber.get("authentication"): - notifications.append({ - "headers": self.payload_header, - "payload": self.get_payload(subscriber), - "CallbackUri": subscriber["CallbackUri"] - }) + notifications.append( + { + "headers": self.payload_header, + "payload": self.get_payload(subscriber), + "CallbackUri": subscriber["CallbackUri"], + } + ) elif subscriber["authentication"]["authType"] == "basic": salt = subscriber["subscriptionId"] - hashed_password = subscriber["authentication"]["paramsBasic"]["password"] + hashed_password = subscriber["authentication"]["paramsBasic"][ + "password" + ] password = self._decrypt_password(hashed_password, salt) - auth_basic = self._get_basic_auth(subscriber["authentication"]["paramsBasic"]["userName"], password) - notifications.append({ - "headers": self.payload_header, - "payload": self.get_payload(subscriber), - "auth_basic": auth_basic, - "CallbackUri": subscriber["CallbackUri"] - }) + auth_basic = self._get_basic_auth( + subscriber["authentication"]["paramsBasic"]["userName"], password + ) + notifications.append( + { + "headers": self.payload_header, + "payload": self.get_payload(subscriber), + "auth_basic": auth_basic, + "CallbackUri": subscriber["CallbackUri"], + } + ) # TODO add support for AuthType OAuth and TLS after support is added in subscription. else: - self.logger.debug("Subscriber {} can not be notified. {} notification auth type is not implemented" - .format(subscriber["subscriptionId"], - subscriber["authentication"]["authType"])) - - tasks = [] - async with aiohttp.ClientSession(loop=loop) as session: - for notification in notifications: - tasks.append(asyncio.ensure_future(self.send_notification(session, notification, loop=loop), loop=loop)) - await asyncio.gather(*tasks, loop=loop) - - async def send_notification(self, session: aiohttp.ClientSession, notification: dict, - loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0): + self.logger.debug( + "Subscriber {} can not be notified. {} notification auth type is not implemented".format( + subscriber["subscriptionId"], + subscriber["authentication"]["authType"], + ) + ) + + if notifications: + tasks = [] + async with aiohttp.ClientSession(loop=loop) as session: + for notification in notifications: + tasks.append( + asyncio.ensure_future( + self.send_notification(session, notification, loop=loop), + loop=loop, + ) + ) + await asyncio.gather(*tasks, loop=loop) + + async def send_notification( + self, + session: aiohttp.ClientSession, + notification: dict, + loop: asyncio.AbstractEventLoop = None, + retry_count: int = 5, + timeout: float = 5.0, + ): """ Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully after maximum number of reties, then notification is dropped. @@ -153,14 +185,20 @@ class NotificationBase: backoff_delay = 1 while retry_count > 0: try: - async with session.post(url=notification["CallbackUri"], headers=notification["headers"], - auth=notification.get("auth_basic", None), - data=json.dumps(notification["payload"]), - timeout=timeout) as resp: + async with session.post( + url=notification["CallbackUri"], + headers=notification["headers"], + auth=notification.get("auth_basic", None), + data=json.dumps(notification["payload"]), + timeout=timeout, + ) as resp: # self.logger.debug("Notification response: {}".format(resp.status)) if resp.status == HTTPStatus.NO_CONTENT: - self.logger.debug("Notification sent successfully to subscriber {}" - .format(notification["payload"]["subscriptionId"])) + self.logger.debug( + "Notification sent successfully to subscriber {}".format( + notification["payload"]["subscriptionId"] + ) + ) else: error_text = "Erroneous response code: {}, ".format(resp.status) error_text += await resp.text() @@ -168,12 +206,15 @@ class NotificationBase: return True except Exception as e: error_text = type(e).__name__ + ": " + str(e) - self.logger.debug("Unable to send notification to subscriber {}. Details: {}" - .format(notification["payload"]["subscriptionId"], error_text)) + self.logger.debug( + "Unable to send notification to subscriber {}. Details: {}".format( + notification["payload"]["subscriptionId"], error_text + ) + ) error_detail = { "error": type(e).__name__, "error_text": str(e), - "timestamp": time.time() + "timestamp": time.time(), } if "error_details" in notification["payload"].keys(): notification["payload"]["error_details"].append(error_detail) @@ -181,13 +222,19 @@ class NotificationBase: notification["payload"]["error_details"] = [error_detail] retry_count -= 1 backoff_delay *= 2 - self.logger.debug("Retry Notification for subscriber: {} after backoff delay: {} seconds." - .format(notification["payload"]["subscriptionId"], backoff_delay)) + self.logger.debug( + "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format( + notification["payload"]["subscriptionId"], backoff_delay + ) + ) await asyncio.sleep(backoff_delay, loop=loop) # Dropping notification - self.logger.debug("Notification {} sent failed to subscriber:{}." - .format(notification["payload"]["notificationType"], - notification["payload"]["subscriptionId"])) + self.logger.debug( + "Notification {} sent failed to subscriber:{}.".format( + notification["payload"]["notificationType"], + notification["payload"]["subscriptionId"], + ) + ) return False @@ -195,22 +242,52 @@ class NsLcmNotification(NotificationBase): # SOL005 response model for nslcm notifications response_models = { - "NsLcmOperationOccurrenceNotification": {"id", "nsInstanceId", "nsLcmOpOccId", "operation", - "notificationType", "subscriptionId", "timestamp", - "notificationStatus", "operationState", "isAutomaticInvocation", - "affectedVnf", "affectedVl", "affectedVnffg", "affectedNs", - "affectedSap", "error", "_links"}, - - "NsIdentifierCreationNotification": {"notificationType", "subscriptionId", "timestamp", - "nsInstanceId", "_links"}, - - "NsIdentifierDeletionNotification": {"notificationType", "subscriptionId", "timestamp", - "nsInstanceId", "_links"}, - - "NsChangeNotification": {"nsInstanceId", "nsComponentType", "nsComponentId", - "lcmOpOccIdImpactngNsComponent", "lcmOpNameImpactingNsComponent", - "lcmOpOccStatusImpactingNsComponent", "notificationType", "subscriptionId", - "timeStamp", "error", "_links"} + "NsLcmOperationOccurrenceNotification": { + "id", + "nsInstanceId", + "nsLcmOpOccId", + "operation", + "notificationType", + "subscriptionId", + "timestamp", + "notificationStatus", + "operationState", + "isAutomaticInvocation", + "affectedVnf", + "affectedVl", + "affectedVnffg", + "affectedNs", + "affectedSap", + "error", + "_links", + }, + "NsIdentifierCreationNotification": { + "notificationType", + "subscriptionId", + "timestamp", + "nsInstanceId", + "_links", + }, + "NsIdentifierDeletionNotification": { + "notificationType", + "subscriptionId", + "timestamp", + "nsInstanceId", + "_links", + }, + "NsChangeNotification": { + "nsInstanceId", + "nsComponentType", + "nsComponentId", + "lcmOpOccIdImpactngNsComponent", + "lcmOpNameImpactingNsComponent", + "lcmOpOccStatusImpactingNsComponent", + "notificationType", + "subscriptionId", + "timeStamp", + "error", + "_links", + }, } def __init__(self, db) -> None: @@ -254,8 +331,14 @@ class NsLcmNotification(NotificationBase): subscriber.update(event_details["params"]) return subscribers - def get_subscribers(self, nsd_id: str, ns_instance_id: str, command: str, op_state: str, - event_details: dict) -> list: + def get_subscribers( + self, + nsd_id: str, + ns_instance_id: str, + command: str, + op_state: str, + event_details: dict, + ) -> list: """ Queries database and returns list of subscribers. :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc) @@ -265,7 +348,11 @@ class NsLcmNotification(NotificationBase): :param event_details: dict containing raw data of event occured. :return: List of interested subscribers for occurred event. """ - filter_q = {"identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"]} + filter_q = { + "identifier": [nsd_id, ns_instance_id], + "operationStates": ["ANY"], + "operationTypes": ["ANY"], + } if op_state: filter_q["operationStates"].append(op_state) if command: @@ -283,7 +370,6 @@ class NsLcmNotification(NotificationBase): class NsdNotification(NotificationBase): - def __init__(self, db): """ Constructor of the class @@ -295,7 +381,6 @@ class NsdNotification(NotificationBase): class VnfdNotification(NotificationBase): - def __init__(self, db): """ Constructor of the class