blob: 6d20968ce6deb412aeeb7ff310f1ecba7dcbde2b [file] [log] [blame]
K Sai Kiranbb70c812020-04-28 14:48:31 +05301# Copyright 2020 K Sai Kiran (Tata Elxsi)
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
17__date__ = "$28-Apr-2020 23:59:59$"
18
19import asyncio
20import aiohttp
Adurti9103e542024-05-08 05:37:59 +000021import ssl
22import certifi
K Sai Kiranbb70c812020-04-28 14:48:31 +053023from http import HTTPStatus
24import json
25import logging
26import time
27from uuid import uuid4
28
29
30class NotificationException(Exception):
31 """
32 Notification Exception
33 """
34
35 def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
36 """
37 Constructor of notification exception
38 :param message: String text containing exception details.
39 :param http_code: HTTP status code of exception.
40 """
41 self.http_code = http_code
42 Exception.__init__(self, message)
43
44
45class NotificationBase:
K Sai Kiranbb70c812020-04-28 14:48:31 +053046 response_models = None
47 # Common HTTP payload header for all notifications.
garciadeblas4568a372021-03-24 09:19:48 +010048 payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
K Sai Kiranbb70c812020-04-28 14:48:31 +053049
50 def __init__(self, db) -> None:
51 """
52 Constructor of NotificationBase class.
53 :param db: Database handler.
54 """
55 self.db = db
56 self.logger = logging.getLogger("nbi.notifications")
57 self.subscriber_collection = None
58
59 def get_models(self) -> dict:
60 """
61 Returns the SOL005 model of notification class
62 :param None
63 :return: dict of SOL005 data model
64 """
65 return NotificationBase.response_models
66
67 def get_subscribers(self, **kwargs) -> NotificationException:
68 """
69 Method should be implemented by all notification subclasses
70 :param kwargs: any keyword arguments needed for db query.
71 :return: List of subscribers
72 """
garciadeblas4568a372021-03-24 09:19:48 +010073 raise NotificationException(
74 "Method get_subscribers() is not implemented",
75 http_code=HTTPStatus.NOT_IMPLEMENTED,
76 )
K Sai Kiranbb70c812020-04-28 14:48:31 +053077
78 @staticmethod
79 def _get_basic_auth(username: str, password: str) -> tuple:
80 return aiohttp.BasicAuth(username, password)
81
garciadeblas4568a372021-03-24 09:19:48 +010082 def _decrypt_password(
83 self, hashed: str, salt: str, schema_version: str = "1.1"
84 ) -> str:
K Sai Kiranbb70c812020-04-28 14:48:31 +053085 return self.db.decrypt(hashed, schema_version, salt=salt)
86
87 def get_payload(self, meta_notification: dict) -> dict:
88 """
89 Generates SOL005 compliant payload structure and returns them in dictionary.
90 :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
91 :return: A dictionary which is SOL005 compliant.
92 """
93 model_name = meta_notification["notificationType"]
94 response_models = self.get_models()
95 if not response_models or not response_models.get(model_name):
garciadeblas4568a372021-03-24 09:19:48 +010096 raise NotificationException(
97 "Response model {} is not defined.".format(model_name),
98 HTTPStatus.NOT_IMPLEMENTED,
99 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530100 model_keys = response_models[model_name]
101 payload = dict.fromkeys(model_keys, "N/A")
102 notification_keys = set(meta_notification.keys())
103 for model_key in model_keys.intersection(notification_keys):
104 payload[model_key] = meta_notification[model_key]
garciadeblas4568a372021-03-24 09:19:48 +0100105 self.logger.debug(
106 "Payload generated for subscriber: {} for {}".format(
107 payload["subscriptionId"], payload["notificationType"]
108 )
109 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530110 return payload
111
garciadeblas4568a372021-03-24 09:19:48 +0100112 async def send_notifications(
Mark Beierl375aeb22023-05-10 13:55:55 -0400113 self,
114 subscribers: list,
garciadeblas4568a372021-03-24 09:19:48 +0100115 ):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530116 """
117 Generate tasks for all notification for an event.
118 :param subscribers: A list of subscribers who want to be notified for event.
K Sai Kiranbb70c812020-04-28 14:48:31 +0530119 """
120 notifications = []
121 for subscriber in subscribers:
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530122 # Notify without auth
123 if not subscriber.get("authentication"):
garciadeblas4568a372021-03-24 09:19:48 +0100124 notifications.append(
125 {
126 "headers": self.payload_header,
127 "payload": self.get_payload(subscriber),
128 "CallbackUri": subscriber["CallbackUri"],
129 }
130 )
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530131 elif subscriber["authentication"]["authType"] == "basic":
132 salt = subscriber["subscriptionId"]
garciadeblas4568a372021-03-24 09:19:48 +0100133 hashed_password = subscriber["authentication"]["paramsBasic"][
134 "password"
135 ]
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530136 password = self._decrypt_password(hashed_password, salt)
garciadeblas4568a372021-03-24 09:19:48 +0100137 auth_basic = self._get_basic_auth(
138 subscriber["authentication"]["paramsBasic"]["userName"], password
139 )
140 notifications.append(
141 {
142 "headers": self.payload_header,
143 "payload": self.get_payload(subscriber),
144 "auth_basic": auth_basic,
145 "CallbackUri": subscriber["CallbackUri"],
146 }
147 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530148 # TODO add support for AuthType OAuth and TLS after support is added in subscription.
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530149 else:
garciadeblas4568a372021-03-24 09:19:48 +0100150 self.logger.debug(
151 "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
152 subscriber["subscriptionId"],
153 subscriber["authentication"]["authType"],
154 )
155 )
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530156
tierno2278fa42020-08-10 13:53:57 +0000157 if notifications:
158 tasks = []
Adurti9103e542024-05-08 05:37:59 +0000159 ssl_context = ssl.create_default_context(cafile=certifi.where())
160 conn = aiohttp.TCPConnector(ssl=ssl_context)
161 async with aiohttp.ClientSession(connector=conn) as session:
tierno2278fa42020-08-10 13:53:57 +0000162 for notification in notifications:
garciadeblas4568a372021-03-24 09:19:48 +0100163 tasks.append(
164 asyncio.ensure_future(
Mark Beierl375aeb22023-05-10 13:55:55 -0400165 self.send_notification(session, notification),
garciadeblas4568a372021-03-24 09:19:48 +0100166 )
167 )
Mark Beierl375aeb22023-05-10 13:55:55 -0400168 await asyncio.gather(*tasks)
K Sai Kiranbb70c812020-04-28 14:48:31 +0530169
garciadeblas4568a372021-03-24 09:19:48 +0100170 async def send_notification(
171 self,
172 session: aiohttp.ClientSession,
173 notification: dict,
garciadeblas4568a372021-03-24 09:19:48 +0100174 retry_count: int = 5,
175 timeout: float = 5.0,
176 ):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530177 """
178 Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
179 after maximum number of reties, then notification is dropped.
180 :param session: An aiohttp client session object to maintain http session.
181 :param notification: A dictionary containing all necessary data to make POST request.
K Sai Kiranbb70c812020-04-28 14:48:31 +0530182 :param retry_count: An integer specifying the maximum number of reties for a notification.
183 :param timeout: A float representing client timeout of each HTTP request.
184 """
185 backoff_delay = 1
186 while retry_count > 0:
187 try:
garciadeblas4568a372021-03-24 09:19:48 +0100188 async with session.post(
189 url=notification["CallbackUri"],
190 headers=notification["headers"],
191 auth=notification.get("auth_basic", None),
192 data=json.dumps(notification["payload"]),
193 timeout=timeout,
194 ) as resp:
K Sai Kiranbb70c812020-04-28 14:48:31 +0530195 # self.logger.debug("Notification response: {}".format(resp.status))
196 if resp.status == HTTPStatus.NO_CONTENT:
garciadeblas4568a372021-03-24 09:19:48 +0100197 self.logger.debug(
198 "Notification sent successfully to subscriber {}".format(
199 notification["payload"]["subscriptionId"]
200 )
201 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530202 else:
203 error_text = "Erroneous response code: {}, ".format(resp.status)
204 error_text += await resp.text()
205 raise NotificationException(error_text)
206 return True
207 except Exception as e:
208 error_text = type(e).__name__ + ": " + str(e)
garciadeblas4568a372021-03-24 09:19:48 +0100209 self.logger.debug(
210 "Unable to send notification to subscriber {}. Details: {}".format(
211 notification["payload"]["subscriptionId"], error_text
212 )
213 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530214 error_detail = {
215 "error": type(e).__name__,
216 "error_text": str(e),
garciadeblas4568a372021-03-24 09:19:48 +0100217 "timestamp": time.time(),
K Sai Kiranbb70c812020-04-28 14:48:31 +0530218 }
219 if "error_details" in notification["payload"].keys():
220 notification["payload"]["error_details"].append(error_detail)
221 else:
222 notification["payload"]["error_details"] = [error_detail]
223 retry_count -= 1
224 backoff_delay *= 2
garciadeblas4568a372021-03-24 09:19:48 +0100225 self.logger.debug(
226 "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
227 notification["payload"]["subscriptionId"], backoff_delay
228 )
229 )
Mark Beierl375aeb22023-05-10 13:55:55 -0400230 await asyncio.sleep(backoff_delay)
K Sai Kiranbb70c812020-04-28 14:48:31 +0530231 # Dropping notification
garciadeblas4568a372021-03-24 09:19:48 +0100232 self.logger.debug(
233 "Notification {} sent failed to subscriber:{}.".format(
234 notification["payload"]["notificationType"],
235 notification["payload"]["subscriptionId"],
236 )
237 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530238 return False
239
240
241class NsLcmNotification(NotificationBase):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530242 # SOL005 response model for nslcm notifications
243 response_models = {
garciadeblas4568a372021-03-24 09:19:48 +0100244 "NsLcmOperationOccurrenceNotification": {
245 "id",
246 "nsInstanceId",
247 "nsLcmOpOccId",
248 "operation",
249 "notificationType",
250 "subscriptionId",
251 "timestamp",
252 "notificationStatus",
253 "operationState",
254 "isAutomaticInvocation",
255 "affectedVnf",
256 "affectedVl",
257 "affectedVnffg",
258 "affectedNs",
259 "affectedSap",
260 "error",
261 "_links",
262 },
263 "NsIdentifierCreationNotification": {
264 "notificationType",
265 "subscriptionId",
266 "timestamp",
267 "nsInstanceId",
268 "_links",
269 },
270 "NsIdentifierDeletionNotification": {
271 "notificationType",
272 "subscriptionId",
273 "timestamp",
274 "nsInstanceId",
275 "_links",
276 },
277 "NsChangeNotification": {
278 "nsInstanceId",
279 "nsComponentType",
280 "nsComponentId",
281 "lcmOpOccIdImpactngNsComponent",
282 "lcmOpNameImpactingNsComponent",
283 "lcmOpOccStatusImpactingNsComponent",
284 "notificationType",
285 "subscriptionId",
286 "timeStamp",
287 "error",
288 "_links",
289 },
K Sai Kiranbb70c812020-04-28 14:48:31 +0530290 }
291
292 def __init__(self, db) -> None:
293 """
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530294 Constructor of NsLcmNotification class.
K Sai Kiranbb70c812020-04-28 14:48:31 +0530295 :param db: Database handler.
296 """
297 super().__init__(db)
298 self.subscriber_collection = "mapped_subscriptions"
299
300 def get_models(self) -> dict:
301 """
302 Returns the SOL005 model of notification class
303 :param None
304 :return: dict of SOL005 data model
305 """
306 return NsLcmNotification.response_models
307
308 @staticmethod
309 def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
310 """
311 Formats the raw event details from kakfa message and subscriber details.
312 :param subscribers: A list of subscribers whom the event needs to be notified.
313 :param event_details: A dict containing all meta data of event.
314 :return:
315 """
316 notification_id = str(uuid4())
317 event_timestamp = event_details["params"]["startTime"]
318 resource_links = event_details["params"]["links"]
319 event_operation = event_details["command"]
320 for key in ["_admin", "_id", "id", "links"]:
321 event_details["params"].pop(key, None)
322 for subscriber in subscribers:
323 subscriber["id"] = notification_id
324 subscriber["timestamp"] = event_timestamp
325 subscriber["_links"] = resource_links
326 subscriber["subscriptionId"] = subscriber["reference"]
327 subscriber["operation"] = event_operation
328 del subscriber["reference"]
329 del subscriber["_id"]
330 subscriber.update(event_details["params"])
331 return subscribers
332
garciadeblas4568a372021-03-24 09:19:48 +0100333 def get_subscribers(
334 self,
335 nsd_id: str,
336 ns_instance_id: str,
337 command: str,
338 op_state: str,
339 event_details: dict,
340 ) -> list:
K Sai Kiranbb70c812020-04-28 14:48:31 +0530341 """
342 Queries database and returns list of subscribers.
343 :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
344 :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
345 :param command: the command for event.
346 :param op_state: the operation state of NS.
347 :param event_details: dict containing raw data of event occured.
348 :return: List of interested subscribers for occurred event.
349 """
selvi.jf1004592022-04-29 05:42:35 +0000350 notification_type = [
351 "NsLcmOperationOccurrenceNotification",
352 "NsChangeNotification",
353 "NsIdentifierCreationNotification",
garciadeblasf2af4a12023-01-24 16:56:54 +0100354 "NsIdentifierDeletionNotification",
selvi.jf1004592022-04-29 05:42:35 +0000355 ]
garciadeblas4568a372021-03-24 09:19:48 +0100356 filter_q = {
357 "identifier": [nsd_id, ns_instance_id],
358 "operationStates": ["ANY"],
359 "operationTypes": ["ANY"],
garciadeblasf2af4a12023-01-24 16:56:54 +0100360 "notificationType": notification_type,
361 }
K Sai Kiranbb70c812020-04-28 14:48:31 +0530362 if op_state:
363 filter_q["operationStates"].append(op_state)
364 if command:
365 filter_q["operationTypes"].append(command)
366 # self.logger.debug("Db query is: {}".format(filter_q))
367 subscribers = []
368 try:
369 subscribers = self.db.get_list(self.subscriber_collection, filter_q)
370 subscribers = self._format_nslcm_subscribers(subscribers, event_details)
371 except Exception as e:
372 error_text = type(e).__name__ + ": " + str(e)
373 self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
374 finally:
375 return subscribers
376
377
selvi.jf1004592022-04-29 05:42:35 +0000378class VnfLcmNotification(NotificationBase):
379 # SOL003 response model for vnflcm notifications
380 response_models = {
381 "VnfLcmOperationOccurrenceNotification": {
382 "id",
383 "notificationType",
384 "subscriptionId",
385 "timeStamp",
386 "notificationStatus",
387 "operationState",
388 "vnfInstanceId",
389 "operation",
390 "isAutomaticInvocation",
391 "vnfLcmOpOccId",
392 "affectedVnfcs",
393 "affectedVirtualLinks",
394 "affectedExtLinkPorts",
395 "affectedVirtualStorages",
396 "changedInfo",
397 "changedExtConnectivity",
398 "modificationsTriggeredByVnfPkgChange",
399 "error",
garciadeblasf2af4a12023-01-24 16:56:54 +0100400 "_links",
selvi.jf1004592022-04-29 05:42:35 +0000401 },
402 "VnfIdentifierCreationNotification": {
403 "id",
404 "notificationType",
405 "subscriptionId",
406 "timeStamp",
407 "vnfInstanceId",
garciadeblasf2af4a12023-01-24 16:56:54 +0100408 "_links",
selvi.jf1004592022-04-29 05:42:35 +0000409 },
410 "VnfIdentifierDeletionNotification": {
411 "id",
412 "notificationType",
413 "subscriptionId",
414 "timeStamp",
415 "vnfInstanceId",
garciadeblasf2af4a12023-01-24 16:56:54 +0100416 "_links",
selvi.jf1004592022-04-29 05:42:35 +0000417 },
418 }
419
420 def __init__(self, db) -> None:
421 """
422 Constructor of VnfLcmNotification class.
423 :param db: Database handler.
424 """
425 super().__init__(db)
426 self.subscriber_collection = "mapped_subscriptions"
427
428 def get_models(self) -> dict:
429 """
430 Returns the SOL003 model of notification class
431 :param None
432 :return: dict of SOL003 data model
433 """
434 return self.response_models
435
garciadeblasf2af4a12023-01-24 16:56:54 +0100436 def _format_vnflcm_subscribers(
437 self, subscribers: list, event_details: dict
438 ) -> list:
selvi.jf1004592022-04-29 05:42:35 +0000439 """
440 Formats the raw event details from kafka message and subscriber details.
441 :param subscribers: A list of subscribers whom the event needs to be notified.
442 :param event_details: A dict containing all meta data of event.
443 :return:
444 """
445 notification_id = str(uuid4())
446 event_timestamp = time.time()
447 event_operation = event_details["command"]
448 for subscriber in subscribers:
449 subscriber["id"] = notification_id
450 subscriber["timeStamp"] = event_timestamp
451 subscriber["subscriptionId"] = subscriber["reference"]
452 subscriber["operation"] = event_operation
453 del subscriber["reference"]
454 del subscriber["_id"]
455 subscriber.update(event_details["params"])
456 return subscribers
457
garciadeblasf2af4a12023-01-24 16:56:54 +0100458 def get_subscribers(
459 self,
460 vnfd_id: str,
461 vnf_instance_id: str,
462 command: str,
463 op_state: str,
464 event_details: dict,
465 ) -> list:
selvi.jf1004592022-04-29 05:42:35 +0000466 """
467 Queries database and returns list of subscribers.
468 :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
469 :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
470 :param command: the command for event.
471 :param op_state: the operation state of VNF.
472 :param event_details: dict containing raw data of event occurred.
473 :return: List of interested subscribers for occurred event.
474 """
475 notification_type = [
476 "VnfIdentifierCreationNotification",
477 "VnfLcmOperationOccurrenceNotification",
garciadeblasf2af4a12023-01-24 16:56:54 +0100478 "VnfIdentifierDeletionNotification",
selvi.jf1004592022-04-29 05:42:35 +0000479 ]
480 filter_q = {
481 "identifier": [vnfd_id, vnf_instance_id],
482 "operationStates": ["ANY"],
483 "operationTypes": ["ANY"],
garciadeblasf2af4a12023-01-24 16:56:54 +0100484 "notificationType": notification_type,
selvi.jf1004592022-04-29 05:42:35 +0000485 }
486 if op_state:
487 filter_q["operationStates"].append(op_state)
488 if command:
489 filter_q["operationTypes"].append(command)
490 subscribers = []
491 try:
492 subscribers = self.db.get_list(self.subscriber_collection, filter_q)
493 subscribers = self._format_vnflcm_subscribers(subscribers, event_details)
494 except Exception as e:
495 error_text = type(e).__name__ + ": " + str(e)
496 self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
497 finally:
498 return subscribers
499
500
K Sai Kiranbb70c812020-04-28 14:48:31 +0530501class NsdNotification(NotificationBase):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530502 def __init__(self, db):
503 """
504 Constructor of the class
505 """
506 super().__init__(db)
507 # TODO will update this once support is there from subscription
508 self.response_models = {}
509 self.subscriber_collection = None
510
511
512class VnfdNotification(NotificationBase):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530513 def __init__(self, db):
514 """
515 Constructor of the class
516 """
517 super().__init__(db)
518 # TODO will update this once support is there from subscription
519 self.response_models = {}
520 self.subscriber_collection = None