class NotificationBase:
-
response_models = None
# Common HTTP payload header for all notifications.
payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
return payload
async def send_notifications(
- self, subscribers: list, loop: asyncio.AbstractEventLoop = None
+ self,
+ subscribers: list,
):
"""
Generate tasks for all notification for an event.
:param subscribers: A list of subscribers who want to be notified for event.
- :param loop: Event loop object.
"""
notifications = []
for subscriber in subscribers:
if notifications:
tasks = []
- async with aiohttp.ClientSession(loop=loop) as session:
+ async with aiohttp.ClientSession() as session:
for notification in notifications:
tasks.append(
asyncio.ensure_future(
- self.send_notification(session, notification, loop=loop),
- loop=loop,
+ self.send_notification(session, notification),
)
)
- await asyncio.gather(*tasks, loop=loop)
+ await asyncio.gather(*tasks)
async def send_notification(
self,
session: aiohttp.ClientSession,
notification: dict,
- loop: asyncio.AbstractEventLoop = None,
retry_count: int = 5,
timeout: float = 5.0,
):
after maximum number of reties, then notification is dropped.
:param session: An aiohttp client session object to maintain http session.
:param notification: A dictionary containing all necessary data to make POST request.
- :param loop: Event loop object.
:param retry_count: An integer specifying the maximum number of reties for a notification.
:param timeout: A float representing client timeout of each HTTP request.
"""
notification["payload"]["subscriptionId"], backoff_delay
)
)
- await asyncio.sleep(backoff_delay, loop=loop)
+ await asyncio.sleep(backoff_delay)
# Dropping notification
self.logger.debug(
"Notification {} sent failed to subscriber:{}.".format(
class NsLcmNotification(NotificationBase):
-
# SOL005 response model for nslcm notifications
response_models = {
"NsLcmOperationOccurrenceNotification": {