projects
/
osm
/
NBI.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Update from master
[osm/NBI.git]
/
osm_nbi
/
notifications.py
diff --git
a/osm_nbi/notifications.py
b/osm_nbi/notifications.py
index
47a24ba
..
63d4ce8
100644
(file)
--- a/
osm_nbi/notifications.py
+++ b/
osm_nbi/notifications.py
@@
-41,7
+41,6
@@
class NotificationException(Exception):
class NotificationBase:
class NotificationBase:
-
response_models = None
# Common HTTP payload header for all notifications.
payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
response_models = None
# Common HTTP payload header for all notifications.
payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
@@
-109,12
+108,12
@@
class NotificationBase:
return payload
async def send_notifications(
return payload
async def send_notifications(
- self, subscribers: list, loop: asyncio.AbstractEventLoop = None
+ self,
+ subscribers: list,
):
"""
Generate tasks for all notification for an event.
:param subscribers: A list of subscribers who want to be notified for event.
):
"""
Generate tasks for all notification for an event.
:param subscribers: A list of subscribers who want to be notified for event.
- :param loop: Event loop object.
"""
notifications = []
for subscriber in subscribers:
"""
notifications = []
for subscriber in subscribers:
@@
-155,21
+154,19
@@
class NotificationBase:
if notifications:
tasks = []
if notifications:
tasks = []
- async with aiohttp.ClientSession(
loop=loop
) as session:
+ async with aiohttp.ClientSession() as session:
for notification in notifications:
tasks.append(
asyncio.ensure_future(
for notification in notifications:
tasks.append(
asyncio.ensure_future(
- self.send_notification(session, notification, loop=loop),
- loop=loop,
+ self.send_notification(session, notification),
)
)
)
)
- await asyncio.gather(*tasks
, loop=loop
)
+ await asyncio.gather(*tasks)
async def send_notification(
self,
session: aiohttp.ClientSession,
notification: dict,
async def send_notification(
self,
session: aiohttp.ClientSession,
notification: dict,
- loop: asyncio.AbstractEventLoop = None,
retry_count: int = 5,
timeout: float = 5.0,
):
retry_count: int = 5,
timeout: float = 5.0,
):
@@
-178,7
+175,6
@@
class NotificationBase:
after maximum number of reties, then notification is dropped.
:param session: An aiohttp client session object to maintain http session.
:param notification: A dictionary containing all necessary data to make POST request.
after maximum number of reties, then notification is dropped.
:param session: An aiohttp client session object to maintain http session.
:param notification: A dictionary containing all necessary data to make POST request.
- :param loop: Event loop object.
:param retry_count: An integer specifying the maximum number of reties for a notification.
:param timeout: A float representing client timeout of each HTTP request.
"""
:param retry_count: An integer specifying the maximum number of reties for a notification.
:param timeout: A float representing client timeout of each HTTP request.
"""
@@
-227,7
+223,7
@@
class NotificationBase:
notification["payload"]["subscriptionId"], backoff_delay
)
)
notification["payload"]["subscriptionId"], backoff_delay
)
)
- await asyncio.sleep(backoff_delay
, loop=loop
)
+ await asyncio.sleep(backoff_delay)
# Dropping notification
self.logger.debug(
"Notification {} sent failed to subscriber:{}.".format(
# Dropping notification
self.logger.debug(
"Notification {} sent failed to subscriber:{}.".format(
@@
-239,7
+235,6
@@
class NotificationBase:
class NsLcmNotification(NotificationBase):
class NsLcmNotification(NotificationBase):
-
# SOL005 response model for nslcm notifications
response_models = {
"NsLcmOperationOccurrenceNotification": {
# SOL005 response model for nslcm notifications
response_models = {
"NsLcmOperationOccurrenceNotification": {
@@
-352,14
+347,14
@@
class NsLcmNotification(NotificationBase):
"NsLcmOperationOccurrenceNotification",
"NsChangeNotification",
"NsIdentifierCreationNotification",
"NsLcmOperationOccurrenceNotification",
"NsChangeNotification",
"NsIdentifierCreationNotification",
- "NsIdentifierDeletionNotification"
+ "NsIdentifierDeletionNotification"
,
]
filter_q = {
"identifier": [nsd_id, ns_instance_id],
"operationStates": ["ANY"],
"operationTypes": ["ANY"],
]
filter_q = {
"identifier": [nsd_id, ns_instance_id],
"operationStates": ["ANY"],
"operationTypes": ["ANY"],
- "notificationType": notification_type
-
}
+ "notificationType": notification_type
,
+ }
if op_state:
filter_q["operationStates"].append(op_state)
if command:
if op_state:
filter_q["operationStates"].append(op_state)
if command:
@@
-398,7
+393,7
@@
class VnfLcmNotification(NotificationBase):
"changedExtConnectivity",
"modificationsTriggeredByVnfPkgChange",
"error",
"changedExtConnectivity",
"modificationsTriggeredByVnfPkgChange",
"error",
- "_links"
+ "_links"
,
},
"VnfIdentifierCreationNotification": {
"id",
},
"VnfIdentifierCreationNotification": {
"id",
@@
-406,7
+401,7
@@
class VnfLcmNotification(NotificationBase):
"subscriptionId",
"timeStamp",
"vnfInstanceId",
"subscriptionId",
"timeStamp",
"vnfInstanceId",
- "_links"
+ "_links"
,
},
"VnfIdentifierDeletionNotification": {
"id",
},
"VnfIdentifierDeletionNotification": {
"id",
@@
-414,7
+409,7
@@
class VnfLcmNotification(NotificationBase):
"subscriptionId",
"timeStamp",
"vnfInstanceId",
"subscriptionId",
"timeStamp",
"vnfInstanceId",
- "_links"
+ "_links"
,
},
}
},
}
@@
-434,7
+429,9
@@
class VnfLcmNotification(NotificationBase):
"""
return self.response_models
"""
return self.response_models
- def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list:
+ def _format_vnflcm_subscribers(
+ self, subscribers: list, event_details: dict
+ ) -> list:
"""
Formats the raw event details from kafka message and subscriber details.
:param subscribers: A list of subscribers whom the event needs to be notified.
"""
Formats the raw event details from kafka message and subscriber details.
:param subscribers: A list of subscribers whom the event needs to be notified.
@@
-454,8
+451,14
@@
class VnfLcmNotification(NotificationBase):
subscriber.update(event_details["params"])
return subscribers
subscriber.update(event_details["params"])
return subscribers
- def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str,
- event_details: dict) -> list:
+ def get_subscribers(
+ self,
+ vnfd_id: str,
+ vnf_instance_id: str,
+ command: str,
+ op_state: str,
+ event_details: dict,
+ ) -> list:
"""
Queries database and returns list of subscribers.
:param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
"""
Queries database and returns list of subscribers.
:param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
@@
-468,13
+471,13
@@
class VnfLcmNotification(NotificationBase):
notification_type = [
"VnfIdentifierCreationNotification",
"VnfLcmOperationOccurrenceNotification",
notification_type = [
"VnfIdentifierCreationNotification",
"VnfLcmOperationOccurrenceNotification",
- "VnfIdentifierDeletionNotification"
+ "VnfIdentifierDeletionNotification"
,
]
filter_q = {
"identifier": [vnfd_id, vnf_instance_id],
"operationStates": ["ANY"],
"operationTypes": ["ANY"],
]
filter_q = {
"identifier": [vnfd_id, vnf_instance_id],
"operationStates": ["ANY"],
"operationTypes": ["ANY"],
- "notificationType": notification_type
+ "notificationType": notification_type
,
}
if op_state:
filter_q["operationStates"].append(op_state)
}
if op_state:
filter_q["operationStates"].append(op_state)