"""
notifications = []
for subscriber in subscribers:
+ # Notify without auth
+ if not subscriber.get("authentication"):
+ notifications.append({
+ "headers": self.payload_header,
+ "payload": self.get_payload(subscriber),
+ "CallbackUri": subscriber["CallbackUri"]
+ })
+ elif subscriber["authentication"]["authType"] == "basic":
+ salt = subscriber["subscriptionId"]
+ hashed_password = subscriber["authentication"]["paramsBasic"]["password"]
+ password = self._decrypt_password(hashed_password, salt)
+ auth_basic = self._get_basic_auth(subscriber["authentication"]["paramsBasic"]["userName"], password)
+ notifications.append({
+ "headers": self.payload_header,
+ "payload": self.get_payload(subscriber),
+ "auth_basic": auth_basic,
+ "CallbackUri": subscriber["CallbackUri"]
+ })
# TODO add support for AuthType OAuth and TLS after support is added in subscription.
- if subscriber["authentication"]["authType"] != "basic":
- self.logger.debug("Subscriber {} can not be notified {} notification auth type is not implemented"
+ else:
+ self.logger.debug("Subscriber {} can not be notified. {} notification auth type is not implemented"
.format(subscriber["subscriptionId"],
subscriber["authentication"]["authType"]))
- continue
- salt = subscriber["subscriptionId"]
- hashed_password = subscriber["authentication"]["paramsBasic"]["password"]
- password = self._decrypt_password(hashed_password, salt)
- auth_basic = self._get_basic_auth(subscriber["authentication"]["paramsBasic"]["userName"], password)
- notifications.append({
- "headers": self.payload_header,
- "payload": self.get_payload(subscriber),
- "auth_basic": auth_basic,
- "CallbackUri": subscriber["CallbackUri"]
- })
+
tasks = []
async with aiohttp.ClientSession(loop=loop) as session:
for notification in notifications:
- tasks.append(asyncio.ensure_future(self.send_notification(session, notification), loop=loop))
+ tasks.append(asyncio.ensure_future(self.send_notification(session, notification, loop=loop), loop=loop))
await asyncio.gather(*tasks, loop=loop)
async def send_notification(self, session: aiohttp.ClientSession, notification: dict,
while retry_count > 0:
try:
async with session.post(url=notification["CallbackUri"], headers=notification["headers"],
- auth=notification["auth_basic"], data=json.dumps(notification["payload"]),
+ auth=notification.get("auth_basic", None),
+ data=json.dumps(notification["payload"]),
timeout=timeout) as resp:
# self.logger.debug("Notification response: {}".format(resp.status))
if resp.status == HTTPStatus.NO_CONTENT:
return True
except Exception as e:
error_text = type(e).__name__ + ": " + str(e)
- self.logger.debug("Unable while sending notification to subscriber {}. Details: {}"
+ self.logger.debug("Unable to send notification to subscriber {}. Details: {}"
.format(notification["payload"]["subscriptionId"], error_text))
error_detail = {
"error": type(e).__name__,
def __init__(self, db) -> None:
"""
- Constructor of NotificationBase class.
+ Constructor of NsLcmNotification class.
:param db: Database handler.
"""
super().__init__(db)