X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fnotifications.py;h=22413d0b4f97a5481e2dd415242bddf7770a4e78;hp=47a24bab5461dfdfa4fb869cd8064dfce6acccd9;hb=HEAD;hpb=f100459221b95cdaaa543793623e556a9abd4852 diff --git a/osm_nbi/notifications.py b/osm_nbi/notifications.py index 47a24ba..22413d0 100644 --- a/osm_nbi/notifications.py +++ b/osm_nbi/notifications.py @@ -41,7 +41,6 @@ class NotificationException(Exception): class NotificationBase: - response_models = None # Common HTTP payload header for all notifications. payload_header = {"Content-Type": "application/json", "Accept": "application/json"} @@ -109,12 +108,12 @@ class NotificationBase: return payload async def send_notifications( - self, subscribers: list, loop: asyncio.AbstractEventLoop = None + self, + subscribers: list, ): """ Generate tasks for all notification for an event. :param subscribers: A list of subscribers who want to be notified for event. - :param loop: Event loop object. """ notifications = [] for subscriber in subscribers: @@ -155,21 +154,19 @@ class NotificationBase: if notifications: tasks = [] - async with aiohttp.ClientSession(loop=loop) as session: + async with aiohttp.ClientSession() as session: for notification in notifications: tasks.append( asyncio.ensure_future( - self.send_notification(session, notification, loop=loop), - loop=loop, + self.send_notification(session, notification), ) ) - await asyncio.gather(*tasks, loop=loop) + await asyncio.gather(*tasks) async def send_notification( self, session: aiohttp.ClientSession, notification: dict, - loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0, ): @@ -178,7 +175,6 @@ class NotificationBase: after maximum number of reties, then notification is dropped. :param session: An aiohttp client session object to maintain http session. :param notification: A dictionary containing all necessary data to make POST request. - :param loop: Event loop object. :param retry_count: An integer specifying the maximum number of reties for a notification. :param timeout: A float representing client timeout of each HTTP request. """ @@ -227,7 +223,7 @@ class NotificationBase: notification["payload"]["subscriptionId"], backoff_delay ) ) - await asyncio.sleep(backoff_delay, loop=loop) + await asyncio.sleep(backoff_delay) # Dropping notification self.logger.debug( "Notification {} sent failed to subscriber:{}.".format( @@ -239,7 +235,14 @@ class NotificationBase: class NsLcmNotification(NotificationBase): - + # maps kafka commands of completed operations to the original operation type + completed_operation_map = { + "INSTANTIATED": "INSTANTIATE", + "SCALED": "SCALE", + "TERMINATED": "TERMINATE", + "UPDATED": "UPDATE", + "HEALED": "HEAL", + } # SOL005 response model for nslcm notifications response_models = { "NsLcmOperationOccurrenceNotification": { @@ -352,18 +355,19 @@ class NsLcmNotification(NotificationBase): "NsLcmOperationOccurrenceNotification", "NsChangeNotification", "NsIdentifierCreationNotification", - "NsIdentifierDeletionNotification" + "NsIdentifierDeletionNotification", ] filter_q = { "identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"], - "notificationType": notification_type - } + "notificationType": notification_type, + } if op_state: filter_q["operationStates"].append(op_state) if command: - filter_q["operationTypes"].append(command) + op_type = self.completed_operation_map.get(command, command) + filter_q["operationTypes"].append(op_type) # self.logger.debug("Db query is: {}".format(filter_q)) subscribers = [] try: @@ -398,7 +402,7 @@ class VnfLcmNotification(NotificationBase): "changedExtConnectivity", "modificationsTriggeredByVnfPkgChange", "error", - "_links" + "_links", }, "VnfIdentifierCreationNotification": { "id", @@ -406,7 +410,7 @@ class VnfLcmNotification(NotificationBase): "subscriptionId", "timeStamp", "vnfInstanceId", - "_links" + "_links", }, "VnfIdentifierDeletionNotification": { "id", @@ -414,7 +418,7 @@ class VnfLcmNotification(NotificationBase): "subscriptionId", "timeStamp", "vnfInstanceId", - "_links" + "_links", }, } @@ -434,7 +438,9 @@ class VnfLcmNotification(NotificationBase): """ return self.response_models - def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list: + def _format_vnflcm_subscribers( + self, subscribers: list, event_details: dict + ) -> list: """ Formats the raw event details from kafka message and subscriber details. :param subscribers: A list of subscribers whom the event needs to be notified. @@ -454,8 +460,14 @@ class VnfLcmNotification(NotificationBase): subscriber.update(event_details["params"]) return subscribers - def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str, - event_details: dict) -> list: + def get_subscribers( + self, + vnfd_id: str, + vnf_instance_id: str, + command: str, + op_state: str, + event_details: dict, + ) -> list: """ Queries database and returns list of subscribers. :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc) @@ -468,13 +480,13 @@ class VnfLcmNotification(NotificationBase): notification_type = [ "VnfIdentifierCreationNotification", "VnfLcmOperationOccurrenceNotification", - "VnfIdentifierDeletionNotification" + "VnfIdentifierDeletionNotification", ] filter_q = { "identifier": [vnfd_id, vnf_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"], - "notificationType": notification_type + "notificationType": notification_type, } if op_state: filter_q["operationStates"].append(op_state)