projects
/
osm
/
NBI.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Bug 1830 fixed: maps completed operations to original operation types
[osm/NBI.git]
/
osm_nbi
/
notifications.py
diff --git
a/osm_nbi/notifications.py
b/osm_nbi/notifications.py
index
a62670b
..
22413d0
100644
(file)
--- a/
osm_nbi/notifications.py
+++ b/
osm_nbi/notifications.py
@@
-108,12
+108,12
@@
class NotificationBase:
return payload
async def send_notifications(
return payload
async def send_notifications(
- self, subscribers: list, loop: asyncio.AbstractEventLoop = None
+ self,
+ subscribers: list,
):
"""
Generate tasks for all notification for an event.
:param subscribers: A list of subscribers who want to be notified for event.
):
"""
Generate tasks for all notification for an event.
:param subscribers: A list of subscribers who want to be notified for event.
- :param loop: Event loop object.
"""
notifications = []
for subscriber in subscribers:
"""
notifications = []
for subscriber in subscribers:
@@
-154,21
+154,19
@@
class NotificationBase:
if notifications:
tasks = []
if notifications:
tasks = []
- async with aiohttp.ClientSession(
loop=loop
) as session:
+ async with aiohttp.ClientSession() as session:
for notification in notifications:
tasks.append(
asyncio.ensure_future(
for notification in notifications:
tasks.append(
asyncio.ensure_future(
- self.send_notification(session, notification, loop=loop),
- loop=loop,
+ self.send_notification(session, notification),
)
)
)
)
- await asyncio.gather(*tasks
, loop=loop
)
+ await asyncio.gather(*tasks)
async def send_notification(
self,
session: aiohttp.ClientSession,
notification: dict,
async def send_notification(
self,
session: aiohttp.ClientSession,
notification: dict,
- loop: asyncio.AbstractEventLoop = None,
retry_count: int = 5,
timeout: float = 5.0,
):
retry_count: int = 5,
timeout: float = 5.0,
):
@@
-177,7
+175,6
@@
class NotificationBase:
after maximum number of reties, then notification is dropped.
:param session: An aiohttp client session object to maintain http session.
:param notification: A dictionary containing all necessary data to make POST request.
after maximum number of reties, then notification is dropped.
:param session: An aiohttp client session object to maintain http session.
:param notification: A dictionary containing all necessary data to make POST request.
- :param loop: Event loop object.
:param retry_count: An integer specifying the maximum number of reties for a notification.
:param timeout: A float representing client timeout of each HTTP request.
"""
:param retry_count: An integer specifying the maximum number of reties for a notification.
:param timeout: A float representing client timeout of each HTTP request.
"""
@@
-226,7
+223,7
@@
class NotificationBase:
notification["payload"]["subscriptionId"], backoff_delay
)
)
notification["payload"]["subscriptionId"], backoff_delay
)
)
- await asyncio.sleep(backoff_delay
, loop=loop
)
+ await asyncio.sleep(backoff_delay)
# Dropping notification
self.logger.debug(
"Notification {} sent failed to subscriber:{}.".format(
# Dropping notification
self.logger.debug(
"Notification {} sent failed to subscriber:{}.".format(
@@
-238,6
+235,14
@@
class NotificationBase:
class NsLcmNotification(NotificationBase):
class NsLcmNotification(NotificationBase):
+ # maps kafka commands of completed operations to the original operation type
+ completed_operation_map = {
+ "INSTANTIATED": "INSTANTIATE",
+ "SCALED": "SCALE",
+ "TERMINATED": "TERMINATE",
+ "UPDATED": "UPDATE",
+ "HEALED": "HEAL",
+ }
# SOL005 response model for nslcm notifications
response_models = {
"NsLcmOperationOccurrenceNotification": {
# SOL005 response model for nslcm notifications
response_models = {
"NsLcmOperationOccurrenceNotification": {
@@
-361,7
+366,8
@@
class NsLcmNotification(NotificationBase):
if op_state:
filter_q["operationStates"].append(op_state)
if command:
if op_state:
filter_q["operationStates"].append(op_state)
if command:
- filter_q["operationTypes"].append(command)
+ op_type = self.completed_operation_map.get(command, command)
+ filter_q["operationTypes"].append(op_type)
# self.logger.debug("Db query is: {}".format(filter_q))
subscribers = []
try:
# self.logger.debug("Db query is: {}".format(filter_q))
subscribers = []
try: