X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fnotifications.py;h=22413d0b4f97a5481e2dd415242bddf7770a4e78;hp=7b681a1f8325c4960e5c3c26d59a45aa325b900a;hb=HEAD;hpb=4568a372eb5a204e04d917213de03ec51f9110c1 diff --git a/osm_nbi/notifications.py b/osm_nbi/notifications.py index 7b681a1..22413d0 100644 --- a/osm_nbi/notifications.py +++ b/osm_nbi/notifications.py @@ -41,7 +41,6 @@ class NotificationException(Exception): class NotificationBase: - response_models = None # Common HTTP payload header for all notifications. payload_header = {"Content-Type": "application/json", "Accept": "application/json"} @@ -109,12 +108,12 @@ class NotificationBase: return payload async def send_notifications( - self, subscribers: list, loop: asyncio.AbstractEventLoop = None + self, + subscribers: list, ): """ Generate tasks for all notification for an event. :param subscribers: A list of subscribers who want to be notified for event. - :param loop: Event loop object. """ notifications = [] for subscriber in subscribers: @@ -155,21 +154,19 @@ class NotificationBase: if notifications: tasks = [] - async with aiohttp.ClientSession(loop=loop) as session: + async with aiohttp.ClientSession() as session: for notification in notifications: tasks.append( asyncio.ensure_future( - self.send_notification(session, notification, loop=loop), - loop=loop, + self.send_notification(session, notification), ) ) - await asyncio.gather(*tasks, loop=loop) + await asyncio.gather(*tasks) async def send_notification( self, session: aiohttp.ClientSession, notification: dict, - loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0, ): @@ -178,7 +175,6 @@ class NotificationBase: after maximum number of reties, then notification is dropped. :param session: An aiohttp client session object to maintain http session. :param notification: A dictionary containing all necessary data to make POST request. - :param loop: Event loop object. :param retry_count: An integer specifying the maximum number of reties for a notification. :param timeout: A float representing client timeout of each HTTP request. """ @@ -227,7 +223,7 @@ class NotificationBase: notification["payload"]["subscriptionId"], backoff_delay ) ) - await asyncio.sleep(backoff_delay, loop=loop) + await asyncio.sleep(backoff_delay) # Dropping notification self.logger.debug( "Notification {} sent failed to subscriber:{}.".format( @@ -239,7 +235,14 @@ class NotificationBase: class NsLcmNotification(NotificationBase): - + # maps kafka commands of completed operations to the original operation type + completed_operation_map = { + "INSTANTIATED": "INSTANTIATE", + "SCALED": "SCALE", + "TERMINATED": "TERMINATE", + "UPDATED": "UPDATE", + "HEALED": "HEAL", + } # SOL005 response model for nslcm notifications response_models = { "NsLcmOperationOccurrenceNotification": { @@ -348,15 +351,23 @@ class NsLcmNotification(NotificationBase): :param event_details: dict containing raw data of event occured. :return: List of interested subscribers for occurred event. """ + notification_type = [ + "NsLcmOperationOccurrenceNotification", + "NsChangeNotification", + "NsIdentifierCreationNotification", + "NsIdentifierDeletionNotification", + ] filter_q = { "identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"], + "notificationType": notification_type, } if op_state: filter_q["operationStates"].append(op_state) if command: - filter_q["operationTypes"].append(command) + op_type = self.completed_operation_map.get(command, command) + filter_q["operationTypes"].append(op_type) # self.logger.debug("Db query is: {}".format(filter_q)) subscribers = [] try: @@ -369,6 +380,129 @@ class NsLcmNotification(NotificationBase): return subscribers +class VnfLcmNotification(NotificationBase): + # SOL003 response model for vnflcm notifications + response_models = { + "VnfLcmOperationOccurrenceNotification": { + "id", + "notificationType", + "subscriptionId", + "timeStamp", + "notificationStatus", + "operationState", + "vnfInstanceId", + "operation", + "isAutomaticInvocation", + "vnfLcmOpOccId", + "affectedVnfcs", + "affectedVirtualLinks", + "affectedExtLinkPorts", + "affectedVirtualStorages", + "changedInfo", + "changedExtConnectivity", + "modificationsTriggeredByVnfPkgChange", + "error", + "_links", + }, + "VnfIdentifierCreationNotification": { + "id", + "notificationType", + "subscriptionId", + "timeStamp", + "vnfInstanceId", + "_links", + }, + "VnfIdentifierDeletionNotification": { + "id", + "notificationType", + "subscriptionId", + "timeStamp", + "vnfInstanceId", + "_links", + }, + } + + def __init__(self, db) -> None: + """ + Constructor of VnfLcmNotification class. + :param db: Database handler. + """ + super().__init__(db) + self.subscriber_collection = "mapped_subscriptions" + + def get_models(self) -> dict: + """ + Returns the SOL003 model of notification class + :param None + :return: dict of SOL003 data model + """ + return self.response_models + + def _format_vnflcm_subscribers( + self, subscribers: list, event_details: dict + ) -> list: + """ + Formats the raw event details from kafka message and subscriber details. + :param subscribers: A list of subscribers whom the event needs to be notified. + :param event_details: A dict containing all meta data of event. + :return: + """ + notification_id = str(uuid4()) + event_timestamp = time.time() + event_operation = event_details["command"] + for subscriber in subscribers: + subscriber["id"] = notification_id + subscriber["timeStamp"] = event_timestamp + subscriber["subscriptionId"] = subscriber["reference"] + subscriber["operation"] = event_operation + del subscriber["reference"] + del subscriber["_id"] + subscriber.update(event_details["params"]) + return subscribers + + def get_subscribers( + self, + vnfd_id: str, + vnf_instance_id: str, + command: str, + op_state: str, + event_details: dict, + ) -> list: + """ + Queries database and returns list of subscribers. + :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc) + :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed. + :param command: the command for event. + :param op_state: the operation state of VNF. + :param event_details: dict containing raw data of event occurred. + :return: List of interested subscribers for occurred event. + """ + notification_type = [ + "VnfIdentifierCreationNotification", + "VnfLcmOperationOccurrenceNotification", + "VnfIdentifierDeletionNotification", + ] + filter_q = { + "identifier": [vnfd_id, vnf_instance_id], + "operationStates": ["ANY"], + "operationTypes": ["ANY"], + "notificationType": notification_type, + } + if op_state: + filter_q["operationStates"].append(op_state) + if command: + filter_q["operationTypes"].append(command) + subscribers = [] + try: + subscribers = self.db.get_list(self.subscriber_collection, filter_q) + subscribers = self._format_vnflcm_subscribers(subscribers, event_details) + except Exception as e: + error_text = type(e).__name__ + ": " + str(e) + self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text)) + finally: + return subscribers + + class NsdNotification(NotificationBase): def __init__(self, db): """