1 # Copyright 2020 K Sai Kiran (Tata Elxsi)
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
# Module metadata: author contact and date stamp.
__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
__date__ = "$28-Apr-2020 23:59:59$"
21 from http
import HTTPStatus
25 from uuid
import uuid4
class NotificationException(Exception):
    """
    Error raised by the notification code; carries an HTTP status code
    alongside the usual exception message.
    """

    def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
        """
        Build the exception.
        :param message: Text describing what went wrong.
        :param http_code: HTTP status code associated with the failure
                          (HTTPStatus.BAD_REQUEST when omitted).
        """
        super().__init__(message)
        self.http_code = http_code
class NotificationBase:
    """
    Common behaviour shared by the notification classes.

    Subclasses supply the response models (see get_models) and the subscriber
    lookup (see get_subscribers). This class turns subscriber records into
    HTTP POST notifications and delivers them with a bounded number of retries.
    """

    # Mapping of notification type name -> set of payload keys; None means
    # that no model is defined for this class.
    response_models = None
    # Common HTTP payload header for all notifications.
    payload_header = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    def __init__(self, db) -> None:
        """
        Constructor of NotificationBase class.
        :param db: Database handler. It is stored on the instance and used by
                   _decrypt_password().
        """
        self.db = db
        self.logger = logging.getLogger("nbi.notifications")
        self.subscriber_collection = None

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :return: dict of SOL005 data model (None for the base class)
        """
        return NotificationBase.response_models

    def get_subscribers(self, **kwargs) -> list:
        """
        Method should be implemented by all notification subclasses
        :param kwargs: any keyword arguments needed for db query.
        :return: List of subscribers
        :raises NotificationException: always, with HTTP status NOT_IMPLEMENTED,
                                       because the base class has no lookup.
        """
        raise NotificationException("Method get_subscribers() is not implemented",
                                    http_code=HTTPStatus.NOT_IMPLEMENTED)

    @staticmethod
    def _get_basic_auth(username: str, password: str) -> tuple:
        """
        Wraps the credentials in an aiohttp.BasicAuth object.
        :param username: User name for HTTP basic authentication.
        :param password: Clear-text password for HTTP basic authentication.
        :return: The aiohttp.BasicAuth instance built from both values.
        """
        return aiohttp.BasicAuth(username, password)

    def _decrypt_password(self, hashed: str, salt: str, schema_version: str = "1.1") -> str:
        """
        Delegates to the database handler's decrypt().
        :param hashed: Stored (encrypted) password.
        :param salt: Salt forwarded to decrypt() as keyword argument.
        :param schema_version: Schema version forwarded to decrypt().
        :return: Whatever self.db.decrypt() returns.
        """
        return self.db.decrypt(hashed, schema_version, salt=salt)

    def get_payload(self, meta_notification: dict) -> dict:
        """
        Generates SOL005 compliant payload structure and returns them in dictionary.
        :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
        :return: A dictionary holding every key of the model; keys missing from
                 meta_notification are filled with "N/A".
        :raises NotificationException: if no model is defined for the
                                       notification type (HTTP NOT_IMPLEMENTED).
        """
        model_name = meta_notification["notificationType"]
        response_models = self.get_models()
        if not response_models or not response_models.get(model_name):
            raise NotificationException("Response model {} is not defined.".format(model_name),
                                        HTTPStatus.NOT_IMPLEMENTED)
        model_keys = response_models[model_name]
        # Start from the complete model, then overwrite the keys we have data for.
        payload = dict.fromkeys(model_keys, "N/A")
        notification_keys = set(meta_notification.keys())
        for model_key in model_keys.intersection(notification_keys):
            payload[model_key] = meta_notification[model_key]
        self.logger.debug("Payload generated for subscriber: {} for {}".format(payload["subscriptionId"],
                                                                               payload["notificationType"]))
        return payload

    async def send_notifications(self, subscribers: list, loop: asyncio.AbstractEventLoop = None):
        """
        Generate tasks for all notification for an event.
        :param subscribers: A list of subscribers who want to be notified for event.
        :param loop: Event loop object. Kept for backward compatibility only:
                     the coroutine always runs on the currently running loop,
                     and asyncio.gather() no longer accepts a loop argument
                     since Python 3.10.
        """
        notifications = []
        for subscriber in subscribers:
            authentication = subscriber.get("authentication")
            if not authentication:
                # Notify without auth
                notifications.append({
                    "headers": self.payload_header,
                    "payload": self.get_payload(subscriber),
                    "CallbackUri": subscriber["CallbackUri"]
                })
            elif authentication["authType"] == "basic":
                # The subscription id is used as salt of the stored password.
                salt = subscriber["subscriptionId"]
                hashed_password = authentication["paramsBasic"]["password"]
                password = self._decrypt_password(hashed_password, salt)
                auth_basic = self._get_basic_auth(authentication["paramsBasic"]["userName"], password)
                notifications.append({
                    "headers": self.payload_header,
                    "payload": self.get_payload(subscriber),
                    "auth_basic": auth_basic,
                    "CallbackUri": subscriber["CallbackUri"]
                })
            # TODO add support for AuthType OAuth and TLS after support is added in subscription.
            else:
                self.logger.debug("Subscriber {} can not be notified. {} notification auth type is not implemented"
                                  .format(subscriber["subscriptionId"],
                                          authentication["authType"]))

        if not notifications:
            # Nothing deliverable: avoid opening an HTTP session for no work.
            return

        async with aiohttp.ClientSession() as session:
            tasks = [asyncio.ensure_future(self.send_notification(session, notification, loop=loop))
                     for notification in notifications]
            await asyncio.gather(*tasks)

    async def send_notification(self, session: "aiohttp.ClientSession", notification: dict,
                                loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0):
        """
        Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
        after maximum number of retries, then notification is dropped.
        Every failed attempt is recorded under notification["payload"]["error_details"].
        :param session: An aiohttp client session object to maintain http session.
        :param notification: A dictionary containing all necessary data to make POST request.
        :param loop: Event loop object. Kept for backward compatibility only;
                     asyncio.sleep() no longer accepts a loop argument since Python 3.10.
        :param retry_count: An integer specifying the maximum number of attempts for a notification.
        :param timeout: A float representing client timeout of each HTTP request.
        :return: True when the subscriber answered with 204 No Content, False when the notification was dropped.
        """
        backoff_delay = 1
        while retry_count > 0:
            try:
                async with session.post(url=notification["CallbackUri"], headers=notification["headers"],
                                        auth=notification.get("auth_basic", None),
                                        data=json.dumps(notification["payload"]),
                                        timeout=timeout) as resp:
                    # self.logger.debug("Notification response: {}".format(resp.status))
                    if resp.status == HTTPStatus.NO_CONTENT:
                        self.logger.debug("Notification sent successfully to subscriber {}"
                                          .format(notification["payload"]["subscriptionId"]))
                    else:
                        error_text = "Erroneous response code: {}, ".format(resp.status)
                        error_text += await resp.text()
                        raise NotificationException(error_text)
                return True
            except Exception as e:
                error_text = type(e).__name__ + ": " + str(e)
                self.logger.debug("Unable to send notification to subscriber {}. Details: {}"
                                  .format(notification["payload"]["subscriptionId"], error_text))
                error_detail = {
                    "error": type(e).__name__,
                    "error_text": str(e),
                    "timestamp": time.time()
                }
                if "error_details" in notification["payload"].keys():
                    notification["payload"]["error_details"].append(error_detail)
                else:
                    notification["payload"]["error_details"] = [error_detail]
                retry_count -= 1
                if retry_count > 0:
                    # Only wait when another attempt follows; the delay doubles
                    # after every failure.
                    backoff_delay *= 2
                    self.logger.debug("Retry Notification for subscriber: {} after backoff delay: {} seconds."
                                      .format(notification["payload"]["subscriptionId"], backoff_delay))
                    await asyncio.sleep(backoff_delay)
        # Dropping notification
        self.logger.debug("Notification {} sent failed to subscriber:{}."
                          .format(notification["payload"]["notificationType"],
                                  notification["payload"]["subscriptionId"]))
        return False
class NsLcmNotification(NotificationBase):
    """
    Notification handler for NS lifecycle-management (nslcm) events.
    """

    # SOL005 response model for nslcm notifications
    response_models = {
        "NsLcmOperationOccurrenceNotification": {"id", "nsInstanceId", "nsLcmOpOccId", "operation",
                                                 "notificationType", "subscriptionId", "timestamp",
                                                 "notificationStatus", "operationState", "isAutomaticInvocation",
                                                 "affectedVnf", "affectedVl", "affectedVnffg", "affectedNs",
                                                 "affectedSap", "error", "_links"},

        "NsIdentifierCreationNotification": {"notificationType", "subscriptionId", "timestamp",
                                             "nsInstanceId", "_links"},

        "NsIdentifierDeletionNotification": {"notificationType", "subscriptionId", "timestamp",
                                             "nsInstanceId", "_links"},

        # NOTE(review): "lcmOpOccIdImpactngNsComponent" looks like a misspelling of
        # "...ImpactingNsComponent" - confirm against SOL005 before renaming,
        # since the key is part of the emitted payload.
        "NsChangeNotification": {"nsInstanceId", "nsComponentType", "nsComponentId",
                                 "lcmOpOccIdImpactngNsComponent", "lcmOpNameImpactingNsComponent",
                                 "lcmOpOccStatusImpactingNsComponent", "notificationType", "subscriptionId",
                                 "timeStamp", "error", "_links"}
    }

    def __init__(self, db) -> None:
        """
        Constructor of NsLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :return: dict of SOL005 data model
        """
        return NsLcmNotification.response_models

    @staticmethod
    def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        Each subscriber dict is updated in place; event_details is left unmodified.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict containing all meta data of event.
        :return: The same list of subscribers, each merged with the event data.
        """
        # One id is generated per call and shared by all subscribers.
        notification_id = str(uuid4())
        event_operation = event_details["command"]
        params = event_details["params"]
        event_timestamp = params["startTime"]
        resource_links = params["links"]
        # Filtered copy: the bookkeeping keys must not reach the subscriber
        # records, and the caller's dict must not be altered.
        event_params = {key: value for key, value in params.items()
                        if key not in ("_admin", "_id", "id", "links")}
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timestamp"] = event_timestamp
            subscriber["_links"] = resource_links
            subscriber["subscriptionId"] = subscriber["reference"]
            subscriber["operation"] = event_operation
            del subscriber["reference"]
            del subscriber["_id"]
            # Applied last, so event parameters win over the values set above.
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(self, nsd_id: str, ns_instance_id: str, command: str, op_state: str,
                        event_details: dict) -> list:
        """
        Queries database and returns list of subscribers.
        :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
        :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of NS.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event; an empty
                 list when the lookup or the formatting fails (the error is logged).
        """
        filter_q = {"identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"]}
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        # self.logger.debug("Db query is: {}".format(filter_q))
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_nslcm_subscribers(subscribers, event_details)
        except Exception as e:
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
            # Never hand raw or half-formatted db records to the sender.
            return []
class NsdNotification(NotificationBase):
    """
    Notification handler for NSD events. No response model or subscriber
    collection is configured yet (see the TODO in the constructor).
    """

    def __init__(self, db):
        """
        Build the NSD notification handler.
        :param db: Database handler, forwarded to NotificationBase.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.subscriber_collection = None
        self.response_models = {}
class VnfdNotification(NotificationBase):
    """
    Notification handler for VNFD events. No response model or subscriber
    collection is configured yet (see the TODO in the constructor).
    """

    def __init__(self, db):
        """
        Build the VNFD notification handler.
        :param db: Database handler, forwarded to NotificationBase.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.subscriber_collection = None
        self.response_models = {}