class NotificationBase:
-
response_models = None
# Common HTTP payload header for all notifications.
payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
return payload
async def send_notifications(
- self, subscribers: list, loop: asyncio.AbstractEventLoop = None
+ self,
+ subscribers: list,
):
"""
Generate tasks for all notification for an event.
:param subscribers: A list of subscribers who want to be notified for event.
- :param loop: Event loop object.
"""
notifications = []
for subscriber in subscribers:
if notifications:
tasks = []
- async with aiohttp.ClientSession(loop=loop) as session:
+ async with aiohttp.ClientSession() as session:
for notification in notifications:
tasks.append(
asyncio.ensure_future(
- self.send_notification(session, notification, loop=loop),
- loop=loop,
+ self.send_notification(session, notification),
)
)
- await asyncio.gather(*tasks, loop=loop)
+ await asyncio.gather(*tasks)
async def send_notification(
self,
session: aiohttp.ClientSession,
notification: dict,
- loop: asyncio.AbstractEventLoop = None,
retry_count: int = 5,
timeout: float = 5.0,
):
after maximum number of reties, then notification is dropped.
:param session: An aiohttp client session object to maintain http session.
:param notification: A dictionary containing all necessary data to make POST request.
- :param loop: Event loop object.
:param retry_count: An integer specifying the maximum number of reties for a notification.
:param timeout: A float representing client timeout of each HTTP request.
"""
notification["payload"]["subscriptionId"], backoff_delay
)
)
- await asyncio.sleep(backoff_delay, loop=loop)
+ await asyncio.sleep(backoff_delay)
# Dropping notification
self.logger.debug(
"Notification {} sent failed to subscriber:{}.".format(
class NsLcmNotification(NotificationBase):
-
# SOL005 response model for nslcm notifications
response_models = {
"NsLcmOperationOccurrenceNotification": {
"NsLcmOperationOccurrenceNotification",
"NsChangeNotification",
"NsIdentifierCreationNotification",
- "NsIdentifierDeletionNotification"
+ "NsIdentifierDeletionNotification",
]
filter_q = {
"identifier": [nsd_id, ns_instance_id],
"operationStates": ["ANY"],
"operationTypes": ["ANY"],
- "notificationType": notification_type
- }
+ "notificationType": notification_type,
+ }
if op_state:
filter_q["operationStates"].append(op_state)
if command:
"changedExtConnectivity",
"modificationsTriggeredByVnfPkgChange",
"error",
- "_links"
+ "_links",
},
"VnfIdentifierCreationNotification": {
"id",
"subscriptionId",
"timeStamp",
"vnfInstanceId",
- "_links"
+ "_links",
},
"VnfIdentifierDeletionNotification": {
"id",
"subscriptionId",
"timeStamp",
"vnfInstanceId",
- "_links"
+ "_links",
},
}
"""
return self.response_models
- def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list:
+ def _format_vnflcm_subscribers(
+ self, subscribers: list, event_details: dict
+ ) -> list:
"""
Formats the raw event details from kafka message and subscriber details.
:param subscribers: A list of subscribers whom the event needs to be notified.
subscriber.update(event_details["params"])
return subscribers
- def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str,
- event_details: dict) -> list:
+ def get_subscribers(
+ self,
+ vnfd_id: str,
+ vnf_instance_id: str,
+ command: str,
+ op_state: str,
+ event_details: dict,
+ ) -> list:
"""
Queries database and returns list of subscribers.
:param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
notification_type = [
"VnfIdentifierCreationNotification",
"VnfLcmOperationOccurrenceNotification",
- "VnfIdentifierDeletionNotification"
+ "VnfIdentifierDeletionNotification",
]
filter_q = {
"identifier": [vnfd_id, vnf_instance_id],
"operationStates": ["ANY"],
"operationTypes": ["ANY"],
- "notificationType": notification_type
+ "notificationType": notification_type,
}
if op_state:
filter_q["operationStates"].append(op_state)