class NotificationBase:
-
response_models = None
# Common HTTP payload header for all notifications.
payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
return payload
async def send_notifications(
- self, subscribers: list, loop: asyncio.AbstractEventLoop = None
+ self,
+ subscribers: list,
):
"""
Generate tasks for all notification for an event.
:param subscribers: A list of subscribers who want to be notified for event.
- :param loop: Event loop object.
"""
notifications = []
for subscriber in subscribers:
if notifications:
tasks = []
- async with aiohttp.ClientSession(loop=loop) as session:
+ async with aiohttp.ClientSession() as session:
for notification in notifications:
tasks.append(
asyncio.ensure_future(
- self.send_notification(session, notification, loop=loop),
- loop=loop,
+ self.send_notification(session, notification),
)
)
- await asyncio.gather(*tasks, loop=loop)
+ await asyncio.gather(*tasks)
async def send_notification(
self,
session: aiohttp.ClientSession,
notification: dict,
- loop: asyncio.AbstractEventLoop = None,
retry_count: int = 5,
timeout: float = 5.0,
):
after maximum number of reties, then notification is dropped.
:param session: An aiohttp client session object to maintain http session.
:param notification: A dictionary containing all necessary data to make POST request.
- :param loop: Event loop object.
:param retry_count: An integer specifying the maximum number of reties for a notification.
:param timeout: A float representing client timeout of each HTTP request.
"""
notification["payload"]["subscriptionId"], backoff_delay
)
)
- await asyncio.sleep(backoff_delay, loop=loop)
+ await asyncio.sleep(backoff_delay)
# Dropping notification
self.logger.debug(
"Notification {} sent failed to subscriber:{}.".format(
class NsLcmNotification(NotificationBase):
-
+ # maps kafka commands of completed operations to the original operation type
+ completed_operation_map = {
+ "INSTANTIATED": "INSTANTIATE",
+ "SCALED": "SCALE",
+ "TERMINATED": "TERMINATE",
+ "UPDATED": "UPDATE",
+ "HEALED": "HEAL",
+ }
# SOL005 response model for nslcm notifications
response_models = {
"NsLcmOperationOccurrenceNotification": {
:param event_details: dict containing raw data of event occured.
:return: List of interested subscribers for occurred event.
"""
+ notification_type = [
+ "NsLcmOperationOccurrenceNotification",
+ "NsChangeNotification",
+ "NsIdentifierCreationNotification",
+ "NsIdentifierDeletionNotification",
+ ]
filter_q = {
"identifier": [nsd_id, ns_instance_id],
"operationStates": ["ANY"],
"operationTypes": ["ANY"],
+ "notificationType": notification_type,
}
if op_state:
filter_q["operationStates"].append(op_state)
if command:
- filter_q["operationTypes"].append(command)
+ op_type = self.completed_operation_map.get(command, command)
+ filter_q["operationTypes"].append(op_type)
# self.logger.debug("Db query is: {}".format(filter_q))
subscribers = []
try:
return subscribers
+class VnfLcmNotification(NotificationBase):
+ # SOL003 response model for vnflcm notifications
+ response_models = {
+ "VnfLcmOperationOccurrenceNotification": {
+ "id",
+ "notificationType",
+ "subscriptionId",
+ "timeStamp",
+ "notificationStatus",
+ "operationState",
+ "vnfInstanceId",
+ "operation",
+ "isAutomaticInvocation",
+ "vnfLcmOpOccId",
+ "affectedVnfcs",
+ "affectedVirtualLinks",
+ "affectedExtLinkPorts",
+ "affectedVirtualStorages",
+ "changedInfo",
+ "changedExtConnectivity",
+ "modificationsTriggeredByVnfPkgChange",
+ "error",
+ "_links",
+ },
+ "VnfIdentifierCreationNotification": {
+ "id",
+ "notificationType",
+ "subscriptionId",
+ "timeStamp",
+ "vnfInstanceId",
+ "_links",
+ },
+ "VnfIdentifierDeletionNotification": {
+ "id",
+ "notificationType",
+ "subscriptionId",
+ "timeStamp",
+ "vnfInstanceId",
+ "_links",
+ },
+ }
+
+ def __init__(self, db) -> None:
+ """
+ Constructor of VnfLcmNotification class.
+ :param db: Database handler.
+ """
+ super().__init__(db)
+ self.subscriber_collection = "mapped_subscriptions"
+
+ def get_models(self) -> dict:
+ """
+ Returns the SOL003 model of notification class
+ :param None
+ :return: dict of SOL003 data model
+ """
+ return self.response_models
+
+ def _format_vnflcm_subscribers(
+ self, subscribers: list, event_details: dict
+ ) -> list:
+ """
+ Formats the raw event details from kafka message and subscriber details.
+ :param subscribers: A list of subscribers whom the event needs to be notified.
+ :param event_details: A dict containing all meta data of event.
+ :return:
+ """
+ notification_id = str(uuid4())
+ event_timestamp = time.time()
+ event_operation = event_details["command"]
+ for subscriber in subscribers:
+ subscriber["id"] = notification_id
+ subscriber["timeStamp"] = event_timestamp
+ subscriber["subscriptionId"] = subscriber["reference"]
+ subscriber["operation"] = event_operation
+ del subscriber["reference"]
+ del subscriber["_id"]
+ subscriber.update(event_details["params"])
+ return subscribers
+
+ def get_subscribers(
+ self,
+ vnfd_id: str,
+ vnf_instance_id: str,
+ command: str,
+ op_state: str,
+ event_details: dict,
+ ) -> list:
+ """
+ Queries database and returns list of subscribers.
+ :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
+ :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
+ :param command: the command for event.
+ :param op_state: the operation state of VNF.
+ :param event_details: dict containing raw data of event occurred.
+ :return: List of interested subscribers for occurred event.
+ """
+ notification_type = [
+ "VnfIdentifierCreationNotification",
+ "VnfLcmOperationOccurrenceNotification",
+ "VnfIdentifierDeletionNotification",
+ ]
+ filter_q = {
+ "identifier": [vnfd_id, vnf_instance_id],
+ "operationStates": ["ANY"],
+ "operationTypes": ["ANY"],
+ "notificationType": notification_type,
+ }
+ if op_state:
+ filter_q["operationStates"].append(op_state)
+ if command:
+ filter_q["operationTypes"].append(command)
+ subscribers = []
+ try:
+ subscribers = self.db.get_list(self.subscriber_collection, filter_q)
+ subscribers = self._format_vnflcm_subscribers(subscribers, event_details)
+ except Exception as e:
+ error_text = type(e).__name__ + ": " + str(e)
+ self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
+ finally:
+ return subscribers
+
+
class NsdNotification(NotificationBase):
def __init__(self, db):
"""