X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fnotifications.py;h=22413d0b4f97a5481e2dd415242bddf7770a4e78;hp=2e7ba94b1b4790cbf9b5b09190b72d2ddc0b2092;hb=HEAD;hpb=2278fa4af665ea7720797e13ef0055ee6e96d59f diff --git a/osm_nbi/notifications.py b/osm_nbi/notifications.py index 2e7ba94..22413d0 100644 --- a/osm_nbi/notifications.py +++ b/osm_nbi/notifications.py @@ -41,13 +41,9 @@ class NotificationException(Exception): class NotificationBase: - response_models = None # Common HTTP payload header for all notifications. - payload_header = { - "Content-Type": "application/json", - "Accept": "application/json" - } + payload_header = {"Content-Type": "application/json", "Accept": "application/json"} def __init__(self, db) -> None: """ @@ -72,13 +68,18 @@ class NotificationBase: :param kwargs: any keyword arguments needed for db query. :return: List of subscribers """ - raise NotificationException("Method get_subscribers() is not implemented", http_code=HTTPStatus.NOT_IMPLEMENTED) + raise NotificationException( + "Method get_subscribers() is not implemented", + http_code=HTTPStatus.NOT_IMPLEMENTED, + ) @staticmethod def _get_basic_auth(username: str, password: str) -> tuple: return aiohttp.BasicAuth(username, password) - def _decrypt_password(self, hashed: str, salt: str, schema_version: str = "1.1") -> str: + def _decrypt_password( + self, hashed: str, salt: str, schema_version: str = "1.1" + ) -> str: return self.db.decrypt(hashed, schema_version, salt=salt) def get_payload(self, meta_notification: dict) -> dict: @@ -90,79 +91,110 @@ class NotificationBase: model_name = meta_notification["notificationType"] response_models = self.get_models() if not response_models or not response_models.get(model_name): - raise NotificationException("Response model {} is not defined.".format(model_name), - HTTPStatus.NOT_IMPLEMENTED) + raise NotificationException( + "Response model {} is not defined.".format(model_name), + HTTPStatus.NOT_IMPLEMENTED, + ) model_keys = response_models[model_name] payload = dict.fromkeys(model_keys, "N/A") notification_keys = set(meta_notification.keys()) for model_key in model_keys.intersection(notification_keys): payload[model_key] = meta_notification[model_key] - self.logger.debug("Payload generated for subscriber: {} for {}".format(payload["subscriptionId"], - payload["notificationType"])) + self.logger.debug( + "Payload generated for subscriber: {} for {}".format( + payload["subscriptionId"], payload["notificationType"] + ) + ) return payload - async def send_notifications(self, subscribers: list, loop: asyncio.AbstractEventLoop = None): + async def send_notifications( + self, + subscribers: list, + ): """ Generate tasks for all notification for an event. :param subscribers: A list of subscribers who want to be notified for event. - :param loop: Event loop object. """ notifications = [] for subscriber in subscribers: # Notify without auth if not subscriber.get("authentication"): - notifications.append({ - "headers": self.payload_header, - "payload": self.get_payload(subscriber), - "CallbackUri": subscriber["CallbackUri"] - }) + notifications.append( + { + "headers": self.payload_header, + "payload": self.get_payload(subscriber), + "CallbackUri": subscriber["CallbackUri"], + } + ) elif subscriber["authentication"]["authType"] == "basic": salt = subscriber["subscriptionId"] - hashed_password = subscriber["authentication"]["paramsBasic"]["password"] + hashed_password = subscriber["authentication"]["paramsBasic"][ + "password" + ] password = self._decrypt_password(hashed_password, salt) - auth_basic = self._get_basic_auth(subscriber["authentication"]["paramsBasic"]["userName"], password) - notifications.append({ - "headers": self.payload_header, - "payload": self.get_payload(subscriber), - "auth_basic": auth_basic, - "CallbackUri": subscriber["CallbackUri"] - }) + auth_basic = self._get_basic_auth( + subscriber["authentication"]["paramsBasic"]["userName"], password + ) + notifications.append( + { + "headers": self.payload_header, + "payload": self.get_payload(subscriber), + "auth_basic": auth_basic, + "CallbackUri": subscriber["CallbackUri"], + } + ) # TODO add support for AuthType OAuth and TLS after support is added in subscription. else: - self.logger.debug("Subscriber {} can not be notified. {} notification auth type is not implemented" - .format(subscriber["subscriptionId"], - subscriber["authentication"]["authType"])) + self.logger.debug( + "Subscriber {} can not be notified. {} notification auth type is not implemented".format( + subscriber["subscriptionId"], + subscriber["authentication"]["authType"], + ) + ) if notifications: tasks = [] - async with aiohttp.ClientSession(loop=loop) as session: + async with aiohttp.ClientSession() as session: for notification in notifications: - tasks.append(asyncio.ensure_future(self.send_notification(session, notification, loop=loop), - loop=loop)) - await asyncio.gather(*tasks, loop=loop) - - async def send_notification(self, session: aiohttp.ClientSession, notification: dict, - loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0): + tasks.append( + asyncio.ensure_future( + self.send_notification(session, notification), + ) + ) + await asyncio.gather(*tasks) + + async def send_notification( + self, + session: aiohttp.ClientSession, + notification: dict, + retry_count: int = 5, + timeout: float = 5.0, + ): """ Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully after maximum number of reties, then notification is dropped. :param session: An aiohttp client session object to maintain http session. :param notification: A dictionary containing all necessary data to make POST request. - :param loop: Event loop object. :param retry_count: An integer specifying the maximum number of reties for a notification. :param timeout: A float representing client timeout of each HTTP request. """ backoff_delay = 1 while retry_count > 0: try: - async with session.post(url=notification["CallbackUri"], headers=notification["headers"], - auth=notification.get("auth_basic", None), - data=json.dumps(notification["payload"]), - timeout=timeout) as resp: + async with session.post( + url=notification["CallbackUri"], + headers=notification["headers"], + auth=notification.get("auth_basic", None), + data=json.dumps(notification["payload"]), + timeout=timeout, + ) as resp: # self.logger.debug("Notification response: {}".format(resp.status)) if resp.status == HTTPStatus.NO_CONTENT: - self.logger.debug("Notification sent successfully to subscriber {}" - .format(notification["payload"]["subscriptionId"])) + self.logger.debug( + "Notification sent successfully to subscriber {}".format( + notification["payload"]["subscriptionId"] + ) + ) else: error_text = "Erroneous response code: {}, ".format(resp.status) error_text += await resp.text() @@ -170,12 +202,15 @@ class NotificationBase: return True except Exception as e: error_text = type(e).__name__ + ": " + str(e) - self.logger.debug("Unable to send notification to subscriber {}. Details: {}" - .format(notification["payload"]["subscriptionId"], error_text)) + self.logger.debug( + "Unable to send notification to subscriber {}. Details: {}".format( + notification["payload"]["subscriptionId"], error_text + ) + ) error_detail = { "error": type(e).__name__, "error_text": str(e), - "timestamp": time.time() + "timestamp": time.time(), } if "error_details" in notification["payload"].keys(): notification["payload"]["error_details"].append(error_detail) @@ -183,36 +218,79 @@ class NotificationBase: notification["payload"]["error_details"] = [error_detail] retry_count -= 1 backoff_delay *= 2 - self.logger.debug("Retry Notification for subscriber: {} after backoff delay: {} seconds." - .format(notification["payload"]["subscriptionId"], backoff_delay)) - await asyncio.sleep(backoff_delay, loop=loop) + self.logger.debug( + "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format( + notification["payload"]["subscriptionId"], backoff_delay + ) + ) + await asyncio.sleep(backoff_delay) # Dropping notification - self.logger.debug("Notification {} sent failed to subscriber:{}." - .format(notification["payload"]["notificationType"], - notification["payload"]["subscriptionId"])) + self.logger.debug( + "Notification {} sent failed to subscriber:{}.".format( + notification["payload"]["notificationType"], + notification["payload"]["subscriptionId"], + ) + ) return False class NsLcmNotification(NotificationBase): - + # maps kafka commands of completed operations to the original operation type + completed_operation_map = { + "INSTANTIATED": "INSTANTIATE", + "SCALED": "SCALE", + "TERMINATED": "TERMINATE", + "UPDATED": "UPDATE", + "HEALED": "HEAL", + } # SOL005 response model for nslcm notifications response_models = { - "NsLcmOperationOccurrenceNotification": {"id", "nsInstanceId", "nsLcmOpOccId", "operation", - "notificationType", "subscriptionId", "timestamp", - "notificationStatus", "operationState", "isAutomaticInvocation", - "affectedVnf", "affectedVl", "affectedVnffg", "affectedNs", - "affectedSap", "error", "_links"}, - - "NsIdentifierCreationNotification": {"notificationType", "subscriptionId", "timestamp", - "nsInstanceId", "_links"}, - - "NsIdentifierDeletionNotification": {"notificationType", "subscriptionId", "timestamp", - "nsInstanceId", "_links"}, - - "NsChangeNotification": {"nsInstanceId", "nsComponentType", "nsComponentId", - "lcmOpOccIdImpactngNsComponent", "lcmOpNameImpactingNsComponent", - "lcmOpOccStatusImpactingNsComponent", "notificationType", "subscriptionId", - "timeStamp", "error", "_links"} + "NsLcmOperationOccurrenceNotification": { + "id", + "nsInstanceId", + "nsLcmOpOccId", + "operation", + "notificationType", + "subscriptionId", + "timestamp", + "notificationStatus", + "operationState", + "isAutomaticInvocation", + "affectedVnf", + "affectedVl", + "affectedVnffg", + "affectedNs", + "affectedSap", + "error", + "_links", + }, + "NsIdentifierCreationNotification": { + "notificationType", + "subscriptionId", + "timestamp", + "nsInstanceId", + "_links", + }, + "NsIdentifierDeletionNotification": { + "notificationType", + "subscriptionId", + "timestamp", + "nsInstanceId", + "_links", + }, + "NsChangeNotification": { + "nsInstanceId", + "nsComponentType", + "nsComponentId", + "lcmOpOccIdImpactngNsComponent", + "lcmOpNameImpactingNsComponent", + "lcmOpOccStatusImpactingNsComponent", + "notificationType", + "subscriptionId", + "timeStamp", + "error", + "_links", + }, } def __init__(self, db) -> None: @@ -256,8 +334,14 @@ class NsLcmNotification(NotificationBase): subscriber.update(event_details["params"]) return subscribers - def get_subscribers(self, nsd_id: str, ns_instance_id: str, command: str, op_state: str, - event_details: dict) -> list: + def get_subscribers( + self, + nsd_id: str, + ns_instance_id: str, + command: str, + op_state: str, + event_details: dict, + ) -> list: """ Queries database and returns list of subscribers. :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc) @@ -267,11 +351,23 @@ class NsLcmNotification(NotificationBase): :param event_details: dict containing raw data of event occured. :return: List of interested subscribers for occurred event. """ - filter_q = {"identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"]} + notification_type = [ + "NsLcmOperationOccurrenceNotification", + "NsChangeNotification", + "NsIdentifierCreationNotification", + "NsIdentifierDeletionNotification", + ] + filter_q = { + "identifier": [nsd_id, ns_instance_id], + "operationStates": ["ANY"], + "operationTypes": ["ANY"], + "notificationType": notification_type, + } if op_state: filter_q["operationStates"].append(op_state) if command: - filter_q["operationTypes"].append(command) + op_type = self.completed_operation_map.get(command, command) + filter_q["operationTypes"].append(op_type) # self.logger.debug("Db query is: {}".format(filter_q)) subscribers = [] try: @@ -284,8 +380,130 @@ class NsLcmNotification(NotificationBase): return subscribers -class NsdNotification(NotificationBase): +class VnfLcmNotification(NotificationBase): + # SOL003 response model for vnflcm notifications + response_models = { + "VnfLcmOperationOccurrenceNotification": { + "id", + "notificationType", + "subscriptionId", + "timeStamp", + "notificationStatus", + "operationState", + "vnfInstanceId", + "operation", + "isAutomaticInvocation", + "vnfLcmOpOccId", + "affectedVnfcs", + "affectedVirtualLinks", + "affectedExtLinkPorts", + "affectedVirtualStorages", + "changedInfo", + "changedExtConnectivity", + "modificationsTriggeredByVnfPkgChange", + "error", + "_links", + }, + "VnfIdentifierCreationNotification": { + "id", + "notificationType", + "subscriptionId", + "timeStamp", + "vnfInstanceId", + "_links", + }, + "VnfIdentifierDeletionNotification": { + "id", + "notificationType", + "subscriptionId", + "timeStamp", + "vnfInstanceId", + "_links", + }, + } + def __init__(self, db) -> None: + """ + Constructor of VnfLcmNotification class. + :param db: Database handler. + """ + super().__init__(db) + self.subscriber_collection = "mapped_subscriptions" + + def get_models(self) -> dict: + """ + Returns the SOL003 model of notification class + :param None + :return: dict of SOL003 data model + """ + return self.response_models + + def _format_vnflcm_subscribers( + self, subscribers: list, event_details: dict + ) -> list: + """ + Formats the raw event details from kafka message and subscriber details. + :param subscribers: A list of subscribers whom the event needs to be notified. + :param event_details: A dict containing all meta data of event. + :return: + """ + notification_id = str(uuid4()) + event_timestamp = time.time() + event_operation = event_details["command"] + for subscriber in subscribers: + subscriber["id"] = notification_id + subscriber["timeStamp"] = event_timestamp + subscriber["subscriptionId"] = subscriber["reference"] + subscriber["operation"] = event_operation + del subscriber["reference"] + del subscriber["_id"] + subscriber.update(event_details["params"]) + return subscribers + + def get_subscribers( + self, + vnfd_id: str, + vnf_instance_id: str, + command: str, + op_state: str, + event_details: dict, + ) -> list: + """ + Queries database and returns list of subscribers. + :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc) + :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed. + :param command: the command for event. + :param op_state: the operation state of VNF. + :param event_details: dict containing raw data of event occurred. + :return: List of interested subscribers for occurred event. + """ + notification_type = [ + "VnfIdentifierCreationNotification", + "VnfLcmOperationOccurrenceNotification", + "VnfIdentifierDeletionNotification", + ] + filter_q = { + "identifier": [vnfd_id, vnf_instance_id], + "operationStates": ["ANY"], + "operationTypes": ["ANY"], + "notificationType": notification_type, + } + if op_state: + filter_q["operationStates"].append(op_state) + if command: + filter_q["operationTypes"].append(command) + subscribers = [] + try: + subscribers = self.db.get_list(self.subscriber_collection, filter_q) + subscribers = self._format_vnflcm_subscribers(subscribers, event_details) + except Exception as e: + error_text = type(e).__name__ + ": " + str(e) + self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text)) + finally: + return subscribers + + +class NsdNotification(NotificationBase): def __init__(self, db): """ Constructor of the class @@ -297,7 +515,6 @@ class NsdNotification(NotificationBase): class VnfdNotification(NotificationBase): - def __init__(self, db): """ Constructor of the class