1 |
|
# Copyright 2020 K Sai Kiran (Tata Elxsi) |
2 |
|
# |
3 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
# you may not use this file except in compliance with the License. |
5 |
|
# You may obtain a copy of the License at |
6 |
|
# |
7 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
# |
9 |
|
# Unless required by applicable law or agreed to in writing, software |
10 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
12 |
|
# implied. |
13 |
|
# See the License for the specific language governing permissions and |
14 |
|
# limitations under the License. |
15 |
|
|
16 |
0 |
__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>" |
17 |
0 |
__date__ = "$28-Apr-2020 23:59:59$" |
18 |
|
|
19 |
0 |
import asyncio |
20 |
0 |
import aiohttp |
21 |
0 |
from http import HTTPStatus |
22 |
0 |
import json |
23 |
0 |
import logging |
24 |
0 |
import time |
25 |
0 |
from uuid import uuid4 |
26 |
|
|
27 |
|
|
28 |
0 |
class NotificationException(Exception): |
29 |
|
""" |
30 |
|
Notification Exception |
31 |
|
""" |
32 |
|
|
33 |
0 |
def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None: |
34 |
|
""" |
35 |
|
Constructor of notification exception |
36 |
|
:param message: String text containing exception details. |
37 |
|
:param http_code: HTTP status code of exception. |
38 |
|
""" |
39 |
0 |
self.http_code = http_code |
40 |
0 |
Exception.__init__(self, message) |
41 |
|
|
42 |
|
|
43 |
0 |
class NotificationBase: |
44 |
0 |
response_models = None |
45 |
|
# Common HTTP payload header for all notifications. |
46 |
0 |
payload_header = {"Content-Type": "application/json", "Accept": "application/json"} |
47 |
|
|
48 |
0 |
def __init__(self, db) -> None: |
49 |
|
""" |
50 |
|
Constructor of NotificationBase class. |
51 |
|
:param db: Database handler. |
52 |
|
""" |
53 |
0 |
self.db = db |
54 |
0 |
self.logger = logging.getLogger("nbi.notifications") |
55 |
0 |
self.subscriber_collection = None |
56 |
|
|
57 |
0 |
def get_models(self) -> dict: |
58 |
|
""" |
59 |
|
Returns the SOL005 model of notification class |
60 |
|
:param None |
61 |
|
:return: dict of SOL005 data model |
62 |
|
""" |
63 |
0 |
return NotificationBase.response_models |
64 |
|
|
65 |
0 |
def get_subscribers(self, **kwargs) -> NotificationException: |
66 |
|
""" |
67 |
|
Method should be implemented by all notification subclasses |
68 |
|
:param kwargs: any keyword arguments needed for db query. |
69 |
|
:return: List of subscribers |
70 |
|
""" |
71 |
0 |
raise NotificationException( |
72 |
|
"Method get_subscribers() is not implemented", |
73 |
|
http_code=HTTPStatus.NOT_IMPLEMENTED, |
74 |
|
) |
75 |
|
|
76 |
0 |
@staticmethod |
77 |
0 |
def _get_basic_auth(username: str, password: str) -> tuple: |
78 |
0 |
return aiohttp.BasicAuth(username, password) |
79 |
|
|
80 |
0 |
def _decrypt_password( |
81 |
|
self, hashed: str, salt: str, schema_version: str = "1.1" |
82 |
|
) -> str: |
83 |
0 |
return self.db.decrypt(hashed, schema_version, salt=salt) |
84 |
|
|
85 |
0 |
def get_payload(self, meta_notification: dict) -> dict: |
86 |
|
""" |
87 |
|
Generates SOL005 compliant payload structure and returns them in dictionary. |
88 |
|
:param meta_notification: notification meta data which needs to be formatted as SOL005 compliant |
89 |
|
:return: A dictionary which is SOL005 compliant. |
90 |
|
""" |
91 |
0 |
model_name = meta_notification["notificationType"] |
92 |
0 |
response_models = self.get_models() |
93 |
0 |
if not response_models or not response_models.get(model_name): |
94 |
0 |
raise NotificationException( |
95 |
|
"Response model {} is not defined.".format(model_name), |
96 |
|
HTTPStatus.NOT_IMPLEMENTED, |
97 |
|
) |
98 |
0 |
model_keys = response_models[model_name] |
99 |
0 |
payload = dict.fromkeys(model_keys, "N/A") |
100 |
0 |
notification_keys = set(meta_notification.keys()) |
101 |
0 |
for model_key in model_keys.intersection(notification_keys): |
102 |
0 |
payload[model_key] = meta_notification[model_key] |
103 |
0 |
self.logger.debug( |
104 |
|
"Payload generated for subscriber: {} for {}".format( |
105 |
|
payload["subscriptionId"], payload["notificationType"] |
106 |
|
) |
107 |
|
) |
108 |
0 |
return payload |
109 |
|
|
110 |
0 |
async def send_notifications( |
111 |
|
self, |
112 |
|
subscribers: list, |
113 |
|
): |
114 |
|
""" |
115 |
|
Generate tasks for all notification for an event. |
116 |
|
:param subscribers: A list of subscribers who want to be notified for event. |
117 |
|
""" |
118 |
0 |
notifications = [] |
119 |
0 |
for subscriber in subscribers: |
120 |
|
# Notify without auth |
121 |
0 |
if not subscriber.get("authentication"): |
122 |
0 |
notifications.append( |
123 |
|
{ |
124 |
|
"headers": self.payload_header, |
125 |
|
"payload": self.get_payload(subscriber), |
126 |
|
"CallbackUri": subscriber["CallbackUri"], |
127 |
|
} |
128 |
|
) |
129 |
0 |
elif subscriber["authentication"]["authType"] == "basic": |
130 |
0 |
salt = subscriber["subscriptionId"] |
131 |
0 |
hashed_password = subscriber["authentication"]["paramsBasic"][ |
132 |
|
"password" |
133 |
|
] |
134 |
0 |
password = self._decrypt_password(hashed_password, salt) |
135 |
0 |
auth_basic = self._get_basic_auth( |
136 |
|
subscriber["authentication"]["paramsBasic"]["userName"], password |
137 |
|
) |
138 |
0 |
notifications.append( |
139 |
|
{ |
140 |
|
"headers": self.payload_header, |
141 |
|
"payload": self.get_payload(subscriber), |
142 |
|
"auth_basic": auth_basic, |
143 |
|
"CallbackUri": subscriber["CallbackUri"], |
144 |
|
} |
145 |
|
) |
146 |
|
# TODO add support for AuthType OAuth and TLS after support is added in subscription. |
147 |
|
else: |
148 |
0 |
self.logger.debug( |
149 |
|
"Subscriber {} can not be notified. {} notification auth type is not implemented".format( |
150 |
|
subscriber["subscriptionId"], |
151 |
|
subscriber["authentication"]["authType"], |
152 |
|
) |
153 |
|
) |
154 |
|
|
155 |
0 |
if notifications: |
156 |
0 |
tasks = [] |
157 |
0 |
async with aiohttp.ClientSession() as session: |
158 |
0 |
for notification in notifications: |
159 |
0 |
tasks.append( |
160 |
|
asyncio.ensure_future( |
161 |
|
self.send_notification(session, notification), |
162 |
|
) |
163 |
|
) |
164 |
0 |
await asyncio.gather(*tasks) |
165 |
|
|
166 |
0 |
async def send_notification( |
167 |
|
self, |
168 |
|
session: aiohttp.ClientSession, |
169 |
|
notification: dict, |
170 |
|
retry_count: int = 5, |
171 |
|
timeout: float = 5.0, |
172 |
|
): |
173 |
|
""" |
174 |
|
Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully |
175 |
|
after maximum number of reties, then notification is dropped. |
176 |
|
:param session: An aiohttp client session object to maintain http session. |
177 |
|
:param notification: A dictionary containing all necessary data to make POST request. |
178 |
|
:param retry_count: An integer specifying the maximum number of reties for a notification. |
179 |
|
:param timeout: A float representing client timeout of each HTTP request. |
180 |
|
""" |
181 |
0 |
backoff_delay = 1 |
182 |
0 |
while retry_count > 0: |
183 |
0 |
try: |
184 |
0 |
async with session.post( |
185 |
|
url=notification["CallbackUri"], |
186 |
|
headers=notification["headers"], |
187 |
|
auth=notification.get("auth_basic", None), |
188 |
|
data=json.dumps(notification["payload"]), |
189 |
|
timeout=timeout, |
190 |
|
) as resp: |
191 |
|
# self.logger.debug("Notification response: {}".format(resp.status)) |
192 |
0 |
if resp.status == HTTPStatus.NO_CONTENT: |
193 |
0 |
self.logger.debug( |
194 |
|
"Notification sent successfully to subscriber {}".format( |
195 |
|
notification["payload"]["subscriptionId"] |
196 |
|
) |
197 |
|
) |
198 |
|
else: |
199 |
0 |
error_text = "Erroneous response code: {}, ".format(resp.status) |
200 |
0 |
error_text += await resp.text() |
201 |
0 |
raise NotificationException(error_text) |
202 |
0 |
return True |
203 |
0 |
except Exception as e: |
204 |
0 |
error_text = type(e).__name__ + ": " + str(e) |
205 |
0 |
self.logger.debug( |
206 |
|
"Unable to send notification to subscriber {}. Details: {}".format( |
207 |
|
notification["payload"]["subscriptionId"], error_text |
208 |
|
) |
209 |
|
) |
210 |
0 |
error_detail = { |
211 |
|
"error": type(e).__name__, |
212 |
|
"error_text": str(e), |
213 |
|
"timestamp": time.time(), |
214 |
|
} |
215 |
0 |
if "error_details" in notification["payload"].keys(): |
216 |
0 |
notification["payload"]["error_details"].append(error_detail) |
217 |
|
else: |
218 |
0 |
notification["payload"]["error_details"] = [error_detail] |
219 |
0 |
retry_count -= 1 |
220 |
0 |
backoff_delay *= 2 |
221 |
0 |
self.logger.debug( |
222 |
|
"Retry Notification for subscriber: {} after backoff delay: {} seconds.".format( |
223 |
|
notification["payload"]["subscriptionId"], backoff_delay |
224 |
|
) |
225 |
|
) |
226 |
0 |
await asyncio.sleep(backoff_delay) |
227 |
|
# Dropping notification |
228 |
0 |
self.logger.debug( |
229 |
|
"Notification {} sent failed to subscriber:{}.".format( |
230 |
|
notification["payload"]["notificationType"], |
231 |
|
notification["payload"]["subscriptionId"], |
232 |
|
) |
233 |
|
) |
234 |
0 |
return False |
235 |
|
|
236 |
|
|
237 |
0 |
class NsLcmNotification(NotificationBase): |
238 |
|
# SOL005 response model for nslcm notifications |
239 |
0 |
response_models = { |
240 |
|
"NsLcmOperationOccurrenceNotification": { |
241 |
|
"id", |
242 |
|
"nsInstanceId", |
243 |
|
"nsLcmOpOccId", |
244 |
|
"operation", |
245 |
|
"notificationType", |
246 |
|
"subscriptionId", |
247 |
|
"timestamp", |
248 |
|
"notificationStatus", |
249 |
|
"operationState", |
250 |
|
"isAutomaticInvocation", |
251 |
|
"affectedVnf", |
252 |
|
"affectedVl", |
253 |
|
"affectedVnffg", |
254 |
|
"affectedNs", |
255 |
|
"affectedSap", |
256 |
|
"error", |
257 |
|
"_links", |
258 |
|
}, |
259 |
|
"NsIdentifierCreationNotification": { |
260 |
|
"notificationType", |
261 |
|
"subscriptionId", |
262 |
|
"timestamp", |
263 |
|
"nsInstanceId", |
264 |
|
"_links", |
265 |
|
}, |
266 |
|
"NsIdentifierDeletionNotification": { |
267 |
|
"notificationType", |
268 |
|
"subscriptionId", |
269 |
|
"timestamp", |
270 |
|
"nsInstanceId", |
271 |
|
"_links", |
272 |
|
}, |
273 |
|
"NsChangeNotification": { |
274 |
|
"nsInstanceId", |
275 |
|
"nsComponentType", |
276 |
|
"nsComponentId", |
277 |
|
"lcmOpOccIdImpactngNsComponent", |
278 |
|
"lcmOpNameImpactingNsComponent", |
279 |
|
"lcmOpOccStatusImpactingNsComponent", |
280 |
|
"notificationType", |
281 |
|
"subscriptionId", |
282 |
|
"timeStamp", |
283 |
|
"error", |
284 |
|
"_links", |
285 |
|
}, |
286 |
|
} |
287 |
|
|
288 |
0 |
def __init__(self, db) -> None: |
289 |
|
""" |
290 |
|
Constructor of NsLcmNotification class. |
291 |
|
:param db: Database handler. |
292 |
|
""" |
293 |
0 |
super().__init__(db) |
294 |
0 |
self.subscriber_collection = "mapped_subscriptions" |
295 |
|
|
296 |
0 |
def get_models(self) -> dict: |
297 |
|
""" |
298 |
|
Returns the SOL005 model of notification class |
299 |
|
:param None |
300 |
|
:return: dict of SOL005 data model |
301 |
|
""" |
302 |
0 |
return NsLcmNotification.response_models |
303 |
|
|
304 |
0 |
@staticmethod |
305 |
0 |
def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list: |
306 |
|
""" |
307 |
|
Formats the raw event details from kakfa message and subscriber details. |
308 |
|
:param subscribers: A list of subscribers whom the event needs to be notified. |
309 |
|
:param event_details: A dict containing all meta data of event. |
310 |
|
:return: |
311 |
|
""" |
312 |
0 |
notification_id = str(uuid4()) |
313 |
0 |
event_timestamp = event_details["params"]["startTime"] |
314 |
0 |
resource_links = event_details["params"]["links"] |
315 |
0 |
event_operation = event_details["command"] |
316 |
0 |
for key in ["_admin", "_id", "id", "links"]: |
317 |
0 |
event_details["params"].pop(key, None) |
318 |
0 |
for subscriber in subscribers: |
319 |
0 |
subscriber["id"] = notification_id |
320 |
0 |
subscriber["timestamp"] = event_timestamp |
321 |
0 |
subscriber["_links"] = resource_links |
322 |
0 |
subscriber["subscriptionId"] = subscriber["reference"] |
323 |
0 |
subscriber["operation"] = event_operation |
324 |
0 |
del subscriber["reference"] |
325 |
0 |
del subscriber["_id"] |
326 |
0 |
subscriber.update(event_details["params"]) |
327 |
0 |
return subscribers |
328 |
|
|
329 |
0 |
def get_subscribers( |
330 |
|
self, |
331 |
|
nsd_id: str, |
332 |
|
ns_instance_id: str, |
333 |
|
command: str, |
334 |
|
op_state: str, |
335 |
|
event_details: dict, |
336 |
|
) -> list: |
337 |
|
""" |
338 |
|
Queries database and returns list of subscribers. |
339 |
|
:param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc) |
340 |
|
:param ns_instance_id: NS instance id an NS whose lifecycle has changed. |
341 |
|
:param command: the command for event. |
342 |
|
:param op_state: the operation state of NS. |
343 |
|
:param event_details: dict containing raw data of event occured. |
344 |
|
:return: List of interested subscribers for occurred event. |
345 |
|
""" |
346 |
0 |
notification_type = [ |
347 |
|
"NsLcmOperationOccurrenceNotification", |
348 |
|
"NsChangeNotification", |
349 |
|
"NsIdentifierCreationNotification", |
350 |
|
"NsIdentifierDeletionNotification", |
351 |
|
] |
352 |
0 |
filter_q = { |
353 |
|
"identifier": [nsd_id, ns_instance_id], |
354 |
|
"operationStates": ["ANY"], |
355 |
|
"operationTypes": ["ANY"], |
356 |
|
"notificationType": notification_type, |
357 |
|
} |
358 |
0 |
if op_state: |
359 |
0 |
filter_q["operationStates"].append(op_state) |
360 |
0 |
if command: |
361 |
0 |
filter_q["operationTypes"].append(command) |
362 |
|
# self.logger.debug("Db query is: {}".format(filter_q)) |
363 |
0 |
subscribers = [] |
364 |
0 |
try: |
365 |
0 |
subscribers = self.db.get_list(self.subscriber_collection, filter_q) |
366 |
0 |
subscribers = self._format_nslcm_subscribers(subscribers, event_details) |
367 |
0 |
except Exception as e: |
368 |
0 |
error_text = type(e).__name__ + ": " + str(e) |
369 |
0 |
self.logger.debug("Error getting nslcm subscribers: {}".format(error_text)) |
370 |
|
finally: |
371 |
0 |
return subscribers |
372 |
|
|
373 |
|
|
374 |
0 |
class VnfLcmNotification(NotificationBase): |
375 |
|
# SOL003 response model for vnflcm notifications |
376 |
0 |
response_models = { |
377 |
|
"VnfLcmOperationOccurrenceNotification": { |
378 |
|
"id", |
379 |
|
"notificationType", |
380 |
|
"subscriptionId", |
381 |
|
"timeStamp", |
382 |
|
"notificationStatus", |
383 |
|
"operationState", |
384 |
|
"vnfInstanceId", |
385 |
|
"operation", |
386 |
|
"isAutomaticInvocation", |
387 |
|
"vnfLcmOpOccId", |
388 |
|
"affectedVnfcs", |
389 |
|
"affectedVirtualLinks", |
390 |
|
"affectedExtLinkPorts", |
391 |
|
"affectedVirtualStorages", |
392 |
|
"changedInfo", |
393 |
|
"changedExtConnectivity", |
394 |
|
"modificationsTriggeredByVnfPkgChange", |
395 |
|
"error", |
396 |
|
"_links", |
397 |
|
}, |
398 |
|
"VnfIdentifierCreationNotification": { |
399 |
|
"id", |
400 |
|
"notificationType", |
401 |
|
"subscriptionId", |
402 |
|
"timeStamp", |
403 |
|
"vnfInstanceId", |
404 |
|
"_links", |
405 |
|
}, |
406 |
|
"VnfIdentifierDeletionNotification": { |
407 |
|
"id", |
408 |
|
"notificationType", |
409 |
|
"subscriptionId", |
410 |
|
"timeStamp", |
411 |
|
"vnfInstanceId", |
412 |
|
"_links", |
413 |
|
}, |
414 |
|
} |
415 |
|
|
416 |
0 |
def __init__(self, db) -> None: |
417 |
|
""" |
418 |
|
Constructor of VnfLcmNotification class. |
419 |
|
:param db: Database handler. |
420 |
|
""" |
421 |
0 |
super().__init__(db) |
422 |
0 |
self.subscriber_collection = "mapped_subscriptions" |
423 |
|
|
424 |
0 |
def get_models(self) -> dict: |
425 |
|
""" |
426 |
|
Returns the SOL003 model of notification class |
427 |
|
:param None |
428 |
|
:return: dict of SOL003 data model |
429 |
|
""" |
430 |
0 |
return self.response_models |
431 |
|
|
432 |
0 |
def _format_vnflcm_subscribers( |
433 |
|
self, subscribers: list, event_details: dict |
434 |
|
) -> list: |
435 |
|
""" |
436 |
|
Formats the raw event details from kafka message and subscriber details. |
437 |
|
:param subscribers: A list of subscribers whom the event needs to be notified. |
438 |
|
:param event_details: A dict containing all meta data of event. |
439 |
|
:return: |
440 |
|
""" |
441 |
0 |
notification_id = str(uuid4()) |
442 |
0 |
event_timestamp = time.time() |
443 |
0 |
event_operation = event_details["command"] |
444 |
0 |
for subscriber in subscribers: |
445 |
0 |
subscriber["id"] = notification_id |
446 |
0 |
subscriber["timeStamp"] = event_timestamp |
447 |
0 |
subscriber["subscriptionId"] = subscriber["reference"] |
448 |
0 |
subscriber["operation"] = event_operation |
449 |
0 |
del subscriber["reference"] |
450 |
0 |
del subscriber["_id"] |
451 |
0 |
subscriber.update(event_details["params"]) |
452 |
0 |
return subscribers |
453 |
|
|
454 |
0 |
def get_subscribers( |
455 |
|
self, |
456 |
|
vnfd_id: str, |
457 |
|
vnf_instance_id: str, |
458 |
|
command: str, |
459 |
|
op_state: str, |
460 |
|
event_details: dict, |
461 |
|
) -> list: |
462 |
|
""" |
463 |
|
Queries database and returns list of subscribers. |
464 |
|
:param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc) |
465 |
|
:param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed. |
466 |
|
:param command: the command for event. |
467 |
|
:param op_state: the operation state of VNF. |
468 |
|
:param event_details: dict containing raw data of event occurred. |
469 |
|
:return: List of interested subscribers for occurred event. |
470 |
|
""" |
471 |
0 |
notification_type = [ |
472 |
|
"VnfIdentifierCreationNotification", |
473 |
|
"VnfLcmOperationOccurrenceNotification", |
474 |
|
"VnfIdentifierDeletionNotification", |
475 |
|
] |
476 |
0 |
filter_q = { |
477 |
|
"identifier": [vnfd_id, vnf_instance_id], |
478 |
|
"operationStates": ["ANY"], |
479 |
|
"operationTypes": ["ANY"], |
480 |
|
"notificationType": notification_type, |
481 |
|
} |
482 |
0 |
if op_state: |
483 |
0 |
filter_q["operationStates"].append(op_state) |
484 |
0 |
if command: |
485 |
0 |
filter_q["operationTypes"].append(command) |
486 |
0 |
subscribers = [] |
487 |
0 |
try: |
488 |
0 |
subscribers = self.db.get_list(self.subscriber_collection, filter_q) |
489 |
0 |
subscribers = self._format_vnflcm_subscribers(subscribers, event_details) |
490 |
0 |
except Exception as e: |
491 |
0 |
error_text = type(e).__name__ + ": " + str(e) |
492 |
0 |
self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text)) |
493 |
|
finally: |
494 |
0 |
return subscribers |
495 |
|
|
496 |
|
|
497 |
0 |
class NsdNotification(NotificationBase): |
498 |
0 |
def __init__(self, db): |
499 |
|
""" |
500 |
|
Constructor of the class |
501 |
|
""" |
502 |
0 |
super().__init__(db) |
503 |
|
# TODO will update this once support is there from subscription |
504 |
0 |
self.response_models = {} |
505 |
0 |
self.subscriber_collection = None |
506 |
|
|
507 |
|
|
508 |
0 |
class VnfdNotification(NotificationBase): |
509 |
0 |
def __init__(self, db): |
510 |
|
""" |
511 |
|
Constructor of the class |
512 |
|
""" |
513 |
0 |
super().__init__(db) |
514 |
|
# TODO will update this once support is there from subscription |
515 |
0 |
self.response_models = {} |
516 |
0 |
self.subscriber_collection = None |