1 |
|
# Copyright 2020 K Sai Kiran (Tata Elxsi) |
2 |
|
# |
3 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
# you may not use this file except in compliance with the License. |
5 |
|
# You may obtain a copy of the License at |
6 |
|
# |
7 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
# |
9 |
|
# Unless required by applicable law or agreed to in writing, software |
10 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
12 |
|
# implied. |
13 |
|
# See the License for the specific language governing permissions and |
14 |
|
# limitations under the License. |
15 |
|
|
16 |
0 |
__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>" |
17 |
0 |
__date__ = "$28-Apr-2020 23:59:59$" |
18 |
|
|
19 |
0 |
import asyncio |
20 |
0 |
import aiohttp |
21 |
0 |
from http import HTTPStatus |
22 |
0 |
import json |
23 |
0 |
import logging |
24 |
0 |
import time |
25 |
0 |
from uuid import uuid4 |
26 |
|
|
27 |
|
|
28 |
0 |
class NotificationException(Exception):
    """
    Exception type used by the notification classes.

    Besides the message it stores an HTTP status code in ``http_code``.
    """

    def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
        """
        Build the exception.
        :param message: String text containing exception details.
        :param http_code: HTTP status code of exception (defaults to 400 Bad Request).
        """
        super().__init__(message)
        self.http_code = http_code
41 |
|
|
42 |
|
|
43 |
0 |
class NotificationBase:
    """
    Base class of all notification classes.

    It builds the payload for each subscriber from the response model of the
    notification type and POSTs it to the subscriber callback URI, retrying
    with an increasing delay when the delivery fails.
    """

    # Mapping of notification type -> set of attribute names of its payload.
    # None here; subclasses provide the actual models.
    response_models = None
    # Common HTTP payload header for all notifications.
    payload_header = {"Content-Type": "application/json", "Accept": "application/json"}

    def __init__(self, db) -> None:
        """
        Constructor of NotificationBase class.
        :param db: Database handler.
        """
        self.db = db
        self.logger = logging.getLogger("nbi.notifications")
        self.subscriber_collection = None

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :param None
        :return: dict of SOL005 data model (None when the class defines no model)
        """
        # Resolved through self so that models defined on a subclass or set on
        # the instance are honoured without having to override this method.
        return self.response_models

    def get_subscribers(self, **kwargs) -> list:
        """
        Method should be implemented by all notification subclasses
        :param kwargs: any keyword arguments needed for db query.
        :return: List of subscribers
        :raises NotificationException: always, with HTTP status NOT_IMPLEMENTED.
        """
        raise NotificationException(
            "Method get_subscribers() is not implemented",
            http_code=HTTPStatus.NOT_IMPLEMENTED,
        )

    @staticmethod
    def _get_basic_auth(username: str, password: str) -> tuple:
        """
        Builds the basic authentication object passed to the HTTP client.
        :param username: user name of the subscriber.
        :param password: clear text password of the subscriber.
        :return: an aiohttp.BasicAuth object.
        """
        return aiohttp.BasicAuth(username, password)

    def _decrypt_password(
        self, hashed: str, salt: str, schema_version: str = "1.1"
    ) -> str:
        """
        Decrypts a stored password by delegating to the database handler.
        :param hashed: stored (encrypted) password.
        :param salt: salt passed to the database handler decrypt() call.
        :param schema_version: schema version passed to decrypt().
        :return: whatever self.db.decrypt() returns.
        """
        return self.db.decrypt(hashed, schema_version, salt=salt)

    def get_payload(self, meta_notification: dict) -> dict:
        """
        Generates SOL005 compliant payload structure and returns them in dictionary.
        :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
        :return: A dictionary which is SOL005 compliant.
        :raises NotificationException: when no response model is defined for the notification type.
        """
        model_name = meta_notification["notificationType"]
        response_models = self.get_models()
        if not response_models or not response_models.get(model_name):
            raise NotificationException(
                "Response model {} is not defined.".format(model_name),
                HTTPStatus.NOT_IMPLEMENTED,
            )
        model_keys = response_models[model_name]
        # Every attribute of the model is present in the payload; the ones
        # missing from the meta data keep the "N/A" placeholder.
        payload = dict.fromkeys(model_keys, "N/A")
        payload.update(
            {
                model_key: meta_notification[model_key]
                for model_key in model_keys
                if model_key in meta_notification
            }
        )
        self.logger.debug(
            "Payload generated for subscriber: {} for {}".format(
                payload["subscriptionId"], payload["notificationType"]
            )
        )
        return payload

    async def send_notifications(
        self, subscribers: list, loop: asyncio.AbstractEventLoop = None
    ):
        """
        Generate tasks for all notification for an event.
        :param subscribers: A list of subscribers who want to be notified for event.
        :param loop: Event loop object. Kept for backward compatibility; the tasks are
                     scheduled on the running event loop.
        """
        notifications = []
        for subscriber in subscribers:
            authentication = subscriber.get("authentication")
            # Notify without auth
            if not authentication:
                notifications.append(
                    {
                        "headers": self.payload_header,
                        "payload": self.get_payload(subscriber),
                        "CallbackUri": subscriber["CallbackUri"],
                    }
                )
            elif authentication["authType"] == "basic":
                # The subscription id is used as salt of the stored password.
                salt = subscriber["subscriptionId"]
                hashed_password = authentication["paramsBasic"]["password"]
                password = self._decrypt_password(hashed_password, salt)
                auth_basic = self._get_basic_auth(
                    authentication["paramsBasic"]["userName"], password
                )
                notifications.append(
                    {
                        "headers": self.payload_header,
                        "payload": self.get_payload(subscriber),
                        "auth_basic": auth_basic,
                        "CallbackUri": subscriber["CallbackUri"],
                    }
                )
            # TODO add support for AuthType OAuth and TLS after support is added in subscription.
            else:
                self.logger.debug(
                    "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
                        subscriber["subscriptionId"],
                        authentication["authType"],
                    )
                )

        if notifications:
            # The explicit ``loop`` argument is not forwarded any more:
            # asyncio.gather() rejects it since Python 3.10 and everything
            # created inside a coroutine runs on the running loop anyway.
            async with aiohttp.ClientSession() as session:
                tasks = [
                    asyncio.ensure_future(
                        self.send_notification(session, notification, loop=loop)
                    )
                    for notification in notifications
                ]
                await asyncio.gather(*tasks)

    async def send_notification(
        self,
        session: "aiohttp.ClientSession",
        notification: dict,
        loop: asyncio.AbstractEventLoop = None,
        retry_count: int = 5,
        timeout: float = 5.0,
    ):
        """
        Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
        after maximum number of retries, then notification is dropped.
        :param session: An aiohttp client session object to maintain http session.
        :param notification: A dictionary containing all necessary data to make POST request.
        :param loop: Event loop object. Kept for backward compatibility; not used.
        :param retry_count: An integer specifying the maximum number of retries for a notification.
        :param timeout: A float representing client timeout of each HTTP request.
        :return: True when the subscriber answered 204 No Content, False when the notification is dropped.
        """
        backoff_delay = 1
        while retry_count > 0:
            try:
                async with session.post(
                    url=notification["CallbackUri"],
                    headers=notification["headers"],
                    auth=notification.get("auth_basic", None),
                    data=json.dumps(notification["payload"]),
                    timeout=timeout,
                ) as resp:
                    # Only 204 No Content counts as a successful delivery.
                    if resp.status == HTTPStatus.NO_CONTENT:
                        self.logger.debug(
                            "Notification sent successfully to subscriber {}".format(
                                notification["payload"]["subscriptionId"]
                            )
                        )
                    else:
                        error_text = "Erroneous response code: {}, ".format(resp.status)
                        error_text += await resp.text()
                        raise NotificationException(error_text)
                return True
            except Exception as e:
                error_text = type(e).__name__ + ": " + str(e)
                self.logger.debug(
                    "Unable to send notification to subscriber {}. Details: {}".format(
                        notification["payload"]["subscriptionId"], error_text
                    )
                )
                error_detail = {
                    "error": type(e).__name__,
                    "error_text": str(e),
                    "timestamp": time.time(),
                }
                # NOTE(review): the error history is stored inside the payload,
                # so it is part of the body posted on the next attempt.
                notification["payload"].setdefault("error_details", []).append(
                    error_detail
                )
                retry_count -= 1
                if retry_count <= 0:
                    # No attempt left: do not wait a back-off period for nothing.
                    break
                backoff_delay *= 2
                self.logger.debug(
                    "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
                        notification["payload"]["subscriptionId"], backoff_delay
                    )
                )
                # asyncio.sleep() does not accept ``loop`` since Python 3.10.
                await asyncio.sleep(backoff_delay)
        # Dropping notification
        self.logger.debug(
            "Notification {} sent failed to subscriber:{}.".format(
                notification["payload"]["notificationType"],
                notification["payload"]["subscriptionId"],
            )
        )
        return False
239 |
|
|
240 |
|
|
241 |
0 |
class NsLcmNotification(NotificationBase):
    """
    Notification class for NS lifecycle management events.

    Subscribers are read from the "mapped_subscriptions" collection and
    enriched with the details of the event before being notified.
    """

    # SOL005 response model for nslcm notifications
    response_models = {
        "NsLcmOperationOccurrenceNotification": {
            "id",
            "nsInstanceId",
            "nsLcmOpOccId",
            "operation",
            "notificationType",
            "subscriptionId",
            "timestamp",
            "notificationStatus",
            "operationState",
            "isAutomaticInvocation",
            "affectedVnf",
            "affectedVl",
            "affectedVnffg",
            "affectedNs",
            "affectedSap",
            "error",
            "_links",
        },
        "NsIdentifierCreationNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsIdentifierDeletionNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsChangeNotification": {
            "nsInstanceId",
            "nsComponentType",
            "nsComponentId",
            # Was misspelled "lcmOpOccIdImpactngNsComponent"; aligned with the
            # sibling "...ImpactingNsComponent" attributes of this model.
            "lcmOpOccIdImpactingNsComponent",
            "lcmOpNameImpactingNsComponent",
            "lcmOpOccStatusImpactingNsComponent",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "error",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of NsLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :param None
        :return: dict of SOL005 data model
        """
        return NsLcmNotification.response_models

    @staticmethod
    def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        :param subscribers: A list of subscribers whom the event needs to be notified.
                            The dictionaries of the list are updated in place.
        :param event_details: A dict containing all meta data of event. It is not modified.
        :return: The same list of subscribers, each one enriched with the event data.
        :raises KeyError: when a mandatory key is missing in event_details or in a subscriber.
        """
        notification_id = str(uuid4())
        params = event_details["params"]
        event_timestamp = params["startTime"]
        resource_links = params["links"]
        event_operation = event_details["command"]
        # Work on a filtered copy: these keys must not reach the subscribers,
        # and the dictionary of the caller must stay untouched.
        excluded_keys = ("_admin", "_id", "id", "links")
        event_params = {
            key: value for key, value in params.items() if key not in excluded_keys
        }
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timestamp"] = event_timestamp
            subscriber["_links"] = resource_links
            subscriber["subscriptionId"] = subscriber["reference"]
            subscriber["operation"] = event_operation
            del subscriber["reference"]
            del subscriber["_id"]
            # Applied last, so event parameters win over the values set above.
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(
        self,
        nsd_id: str,
        ns_instance_id: str,
        command: str,
        op_state: str,
        event_details: dict,
    ) -> list:
        """
        Queries database and returns list of subscribers.
        :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
        :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of NS.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event. Empty list when the
                 query or the formatting of the subscribers fails.
        """
        notification_type = [
            "NsLcmOperationOccurrenceNotification",
            "NsChangeNotification",
            "NsIdentifierCreationNotification",
            "NsIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [nsd_id, ns_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        # self.logger.debug("Db query is: {}".format(filter_q))
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_nslcm_subscribers(subscribers, event_details)
        except Exception as e:
            # Best effort: a failure is logged and nobody is notified. Returning
            # here (instead of from a finally clause) avoids handing back
            # half-formatted subscribers and lets BaseException propagate.
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
            return []
377 |
|
|
378 |
|
|
379 |
0 |
class VnfLcmNotification(NotificationBase):
    """
    Notification class for VNF lifecycle management events.

    Subscribers are read from the "mapped_subscriptions" collection and
    enriched with the details of the event before being notified.
    """

    # SOL003 response model for vnflcm notifications
    response_models = {
        "VnfLcmOperationOccurrenceNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "notificationStatus",
            "operationState",
            "vnfInstanceId",
            "operation",
            "isAutomaticInvocation",
            "vnfLcmOpOccId",
            "affectedVnfcs",
            "affectedVirtualLinks",
            "affectedExtLinkPorts",
            "affectedVirtualStorages",
            "changedInfo",
            "changedExtConnectivity",
            "modificationsTriggeredByVnfPkgChange",
            "error",
            "_links",
        },
        "VnfIdentifierCreationNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
        "VnfIdentifierDeletionNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of VnfLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL003 model of notification class
        :param None
        :return: dict of SOL003 data model
        """
        return self.response_models

    def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        :param subscribers: A list of subscribers whom the event needs to be notified.
                            The dictionaries of the list are updated in place.
        :param event_details: A dict containing all meta data of event.
        :return: The same list of subscribers, each one enriched with the event data.
        :raises KeyError: when a mandatory key is missing in event_details or in a subscriber.
        """
        notification_id = str(uuid4())
        # The time of formatting is used as time stamp of the notification.
        event_timestamp = time.time()
        event_operation = event_details["command"]
        event_params = event_details["params"]
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timeStamp"] = event_timestamp
            subscriber["subscriptionId"] = subscriber["reference"]
            subscriber["operation"] = event_operation
            del subscriber["reference"]
            del subscriber["_id"]
            # Applied last, so event parameters win over the values set above.
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str,
                        event_details: dict) -> list:
        """
        Queries database and returns list of subscribers.
        :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
        :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of VNF.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event. Empty list when the
                 query or the formatting of the subscribers fails.
        """
        notification_type = [
            "VnfIdentifierCreationNotification",
            "VnfLcmOperationOccurrenceNotification",
            "VnfIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [vnfd_id, vnf_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_vnflcm_subscribers(subscribers, event_details)
        except Exception as e:
            # Best effort: a failure is logged and nobody is notified. Returning
            # here (instead of from a finally clause) avoids handing back
            # half-formatted subscribers and lets BaseException propagate.
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
            return []
492 |
|
|
493 |
|
|
494 |
0 |
class NsdNotification(NotificationBase):
    """
    Placeholder notification class: it defines no response models and no
    subscriber collection yet.
    """

    def __init__(self, db) -> None:
        """
        Constructor of NsdNotification class.
        :param db: Database handler, passed to the base class constructor.
        """
        super().__init__(db)
        self.subscriber_collection = None
        # TODO will update this once support is there from subscription
        self.response_models = {}
503 |
|
|
504 |
|
|
505 |
0 |
class VnfdNotification(NotificationBase):
    """
    Placeholder notification class: it defines no response models and no
    subscriber collection yet.
    """

    def __init__(self, db) -> None:
        """
        Constructor of VnfdNotification class.
        :param db: Database handler, passed to the base class constructor.
        """
        super().__init__(db)
        self.subscriber_collection = None
        # TODO will update this once support is there from subscription
        self.response_models = {}