blob: 63d4ce832f61e21066c30159579efe1eddc5cf45 [file] [log] [blame]
K Sai Kiranbb70c812020-04-28 14:48:31 +05301# Copyright 2020 K Sai Kiran (Tata Elxsi)
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
17__date__ = "$28-Apr-2020 23:59:59$"
18
19import asyncio
20import aiohttp
21from http import HTTPStatus
22import json
23import logging
24import time
25from uuid import uuid4
26
27
28class NotificationException(Exception):
29 """
30 Notification Exception
31 """
32
33 def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
34 """
35 Constructor of notification exception
36 :param message: String text containing exception details.
37 :param http_code: HTTP status code of exception.
38 """
39 self.http_code = http_code
40 Exception.__init__(self, message)
41
42
43class NotificationBase:
K Sai Kiranbb70c812020-04-28 14:48:31 +053044 response_models = None
45 # Common HTTP payload header for all notifications.
garciadeblas4568a372021-03-24 09:19:48 +010046 payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
K Sai Kiranbb70c812020-04-28 14:48:31 +053047
48 def __init__(self, db) -> None:
49 """
50 Constructor of NotificationBase class.
51 :param db: Database handler.
52 """
53 self.db = db
54 self.logger = logging.getLogger("nbi.notifications")
55 self.subscriber_collection = None
56
57 def get_models(self) -> dict:
58 """
59 Returns the SOL005 model of notification class
60 :param None
61 :return: dict of SOL005 data model
62 """
63 return NotificationBase.response_models
64
65 def get_subscribers(self, **kwargs) -> NotificationException:
66 """
67 Method should be implemented by all notification subclasses
68 :param kwargs: any keyword arguments needed for db query.
69 :return: List of subscribers
70 """
garciadeblas4568a372021-03-24 09:19:48 +010071 raise NotificationException(
72 "Method get_subscribers() is not implemented",
73 http_code=HTTPStatus.NOT_IMPLEMENTED,
74 )
K Sai Kiranbb70c812020-04-28 14:48:31 +053075
76 @staticmethod
77 def _get_basic_auth(username: str, password: str) -> tuple:
78 return aiohttp.BasicAuth(username, password)
79
garciadeblas4568a372021-03-24 09:19:48 +010080 def _decrypt_password(
81 self, hashed: str, salt: str, schema_version: str = "1.1"
82 ) -> str:
K Sai Kiranbb70c812020-04-28 14:48:31 +053083 return self.db.decrypt(hashed, schema_version, salt=salt)
84
85 def get_payload(self, meta_notification: dict) -> dict:
86 """
87 Generates SOL005 compliant payload structure and returns them in dictionary.
88 :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
89 :return: A dictionary which is SOL005 compliant.
90 """
91 model_name = meta_notification["notificationType"]
92 response_models = self.get_models()
93 if not response_models or not response_models.get(model_name):
garciadeblas4568a372021-03-24 09:19:48 +010094 raise NotificationException(
95 "Response model {} is not defined.".format(model_name),
96 HTTPStatus.NOT_IMPLEMENTED,
97 )
K Sai Kiranbb70c812020-04-28 14:48:31 +053098 model_keys = response_models[model_name]
99 payload = dict.fromkeys(model_keys, "N/A")
100 notification_keys = set(meta_notification.keys())
101 for model_key in model_keys.intersection(notification_keys):
102 payload[model_key] = meta_notification[model_key]
garciadeblas4568a372021-03-24 09:19:48 +0100103 self.logger.debug(
104 "Payload generated for subscriber: {} for {}".format(
105 payload["subscriptionId"], payload["notificationType"]
106 )
107 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530108 return payload
109
garciadeblas4568a372021-03-24 09:19:48 +0100110 async def send_notifications(
Dario Faccin5584b642023-05-24 10:21:37 +0200111 self,
112 subscribers: list,
garciadeblas4568a372021-03-24 09:19:48 +0100113 ):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530114 """
115 Generate tasks for all notification for an event.
116 :param subscribers: A list of subscribers who want to be notified for event.
K Sai Kiranbb70c812020-04-28 14:48:31 +0530117 """
118 notifications = []
119 for subscriber in subscribers:
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530120 # Notify without auth
121 if not subscriber.get("authentication"):
garciadeblas4568a372021-03-24 09:19:48 +0100122 notifications.append(
123 {
124 "headers": self.payload_header,
125 "payload": self.get_payload(subscriber),
126 "CallbackUri": subscriber["CallbackUri"],
127 }
128 )
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530129 elif subscriber["authentication"]["authType"] == "basic":
130 salt = subscriber["subscriptionId"]
garciadeblas4568a372021-03-24 09:19:48 +0100131 hashed_password = subscriber["authentication"]["paramsBasic"][
132 "password"
133 ]
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530134 password = self._decrypt_password(hashed_password, salt)
garciadeblas4568a372021-03-24 09:19:48 +0100135 auth_basic = self._get_basic_auth(
136 subscriber["authentication"]["paramsBasic"]["userName"], password
137 )
138 notifications.append(
139 {
140 "headers": self.payload_header,
141 "payload": self.get_payload(subscriber),
142 "auth_basic": auth_basic,
143 "CallbackUri": subscriber["CallbackUri"],
144 }
145 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530146 # TODO add support for AuthType OAuth and TLS after support is added in subscription.
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530147 else:
garciadeblas4568a372021-03-24 09:19:48 +0100148 self.logger.debug(
149 "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
150 subscriber["subscriptionId"],
151 subscriber["authentication"]["authType"],
152 )
153 )
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530154
tierno2278fa42020-08-10 13:53:57 +0000155 if notifications:
156 tasks = []
Dario Faccin5584b642023-05-24 10:21:37 +0200157 async with aiohttp.ClientSession() as session:
tierno2278fa42020-08-10 13:53:57 +0000158 for notification in notifications:
garciadeblas4568a372021-03-24 09:19:48 +0100159 tasks.append(
160 asyncio.ensure_future(
Dario Faccin5584b642023-05-24 10:21:37 +0200161 self.send_notification(session, notification),
garciadeblas4568a372021-03-24 09:19:48 +0100162 )
163 )
Dario Faccin5584b642023-05-24 10:21:37 +0200164 await asyncio.gather(*tasks)
K Sai Kiranbb70c812020-04-28 14:48:31 +0530165
garciadeblas4568a372021-03-24 09:19:48 +0100166 async def send_notification(
167 self,
168 session: aiohttp.ClientSession,
169 notification: dict,
garciadeblas4568a372021-03-24 09:19:48 +0100170 retry_count: int = 5,
171 timeout: float = 5.0,
172 ):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530173 """
174 Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
175 after maximum number of reties, then notification is dropped.
176 :param session: An aiohttp client session object to maintain http session.
177 :param notification: A dictionary containing all necessary data to make POST request.
K Sai Kiranbb70c812020-04-28 14:48:31 +0530178 :param retry_count: An integer specifying the maximum number of reties for a notification.
179 :param timeout: A float representing client timeout of each HTTP request.
180 """
181 backoff_delay = 1
182 while retry_count > 0:
183 try:
garciadeblas4568a372021-03-24 09:19:48 +0100184 async with session.post(
185 url=notification["CallbackUri"],
186 headers=notification["headers"],
187 auth=notification.get("auth_basic", None),
188 data=json.dumps(notification["payload"]),
189 timeout=timeout,
190 ) as resp:
K Sai Kiranbb70c812020-04-28 14:48:31 +0530191 # self.logger.debug("Notification response: {}".format(resp.status))
192 if resp.status == HTTPStatus.NO_CONTENT:
garciadeblas4568a372021-03-24 09:19:48 +0100193 self.logger.debug(
194 "Notification sent successfully to subscriber {}".format(
195 notification["payload"]["subscriptionId"]
196 )
197 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530198 else:
199 error_text = "Erroneous response code: {}, ".format(resp.status)
200 error_text += await resp.text()
201 raise NotificationException(error_text)
202 return True
203 except Exception as e:
204 error_text = type(e).__name__ + ": " + str(e)
garciadeblas4568a372021-03-24 09:19:48 +0100205 self.logger.debug(
206 "Unable to send notification to subscriber {}. Details: {}".format(
207 notification["payload"]["subscriptionId"], error_text
208 )
209 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530210 error_detail = {
211 "error": type(e).__name__,
212 "error_text": str(e),
garciadeblas4568a372021-03-24 09:19:48 +0100213 "timestamp": time.time(),
K Sai Kiranbb70c812020-04-28 14:48:31 +0530214 }
215 if "error_details" in notification["payload"].keys():
216 notification["payload"]["error_details"].append(error_detail)
217 else:
218 notification["payload"]["error_details"] = [error_detail]
219 retry_count -= 1
220 backoff_delay *= 2
garciadeblas4568a372021-03-24 09:19:48 +0100221 self.logger.debug(
222 "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
223 notification["payload"]["subscriptionId"], backoff_delay
224 )
225 )
Dario Faccin5584b642023-05-24 10:21:37 +0200226 await asyncio.sleep(backoff_delay)
K Sai Kiranbb70c812020-04-28 14:48:31 +0530227 # Dropping notification
garciadeblas4568a372021-03-24 09:19:48 +0100228 self.logger.debug(
229 "Notification {} sent failed to subscriber:{}.".format(
230 notification["payload"]["notificationType"],
231 notification["payload"]["subscriptionId"],
232 )
233 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530234 return False
235
236
237class NsLcmNotification(NotificationBase):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530238 # SOL005 response model for nslcm notifications
239 response_models = {
garciadeblas4568a372021-03-24 09:19:48 +0100240 "NsLcmOperationOccurrenceNotification": {
241 "id",
242 "nsInstanceId",
243 "nsLcmOpOccId",
244 "operation",
245 "notificationType",
246 "subscriptionId",
247 "timestamp",
248 "notificationStatus",
249 "operationState",
250 "isAutomaticInvocation",
251 "affectedVnf",
252 "affectedVl",
253 "affectedVnffg",
254 "affectedNs",
255 "affectedSap",
256 "error",
257 "_links",
258 },
259 "NsIdentifierCreationNotification": {
260 "notificationType",
261 "subscriptionId",
262 "timestamp",
263 "nsInstanceId",
264 "_links",
265 },
266 "NsIdentifierDeletionNotification": {
267 "notificationType",
268 "subscriptionId",
269 "timestamp",
270 "nsInstanceId",
271 "_links",
272 },
273 "NsChangeNotification": {
274 "nsInstanceId",
275 "nsComponentType",
276 "nsComponentId",
277 "lcmOpOccIdImpactngNsComponent",
278 "lcmOpNameImpactingNsComponent",
279 "lcmOpOccStatusImpactingNsComponent",
280 "notificationType",
281 "subscriptionId",
282 "timeStamp",
283 "error",
284 "_links",
285 },
K Sai Kiranbb70c812020-04-28 14:48:31 +0530286 }
287
288 def __init__(self, db) -> None:
289 """
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530290 Constructor of NsLcmNotification class.
K Sai Kiranbb70c812020-04-28 14:48:31 +0530291 :param db: Database handler.
292 """
293 super().__init__(db)
294 self.subscriber_collection = "mapped_subscriptions"
295
296 def get_models(self) -> dict:
297 """
298 Returns the SOL005 model of notification class
299 :param None
300 :return: dict of SOL005 data model
301 """
302 return NsLcmNotification.response_models
303
304 @staticmethod
305 def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
306 """
307 Formats the raw event details from kakfa message and subscriber details.
308 :param subscribers: A list of subscribers whom the event needs to be notified.
309 :param event_details: A dict containing all meta data of event.
310 :return:
311 """
312 notification_id = str(uuid4())
313 event_timestamp = event_details["params"]["startTime"]
314 resource_links = event_details["params"]["links"]
315 event_operation = event_details["command"]
316 for key in ["_admin", "_id", "id", "links"]:
317 event_details["params"].pop(key, None)
318 for subscriber in subscribers:
319 subscriber["id"] = notification_id
320 subscriber["timestamp"] = event_timestamp
321 subscriber["_links"] = resource_links
322 subscriber["subscriptionId"] = subscriber["reference"]
323 subscriber["operation"] = event_operation
324 del subscriber["reference"]
325 del subscriber["_id"]
326 subscriber.update(event_details["params"])
327 return subscribers
328
garciadeblas4568a372021-03-24 09:19:48 +0100329 def get_subscribers(
330 self,
331 nsd_id: str,
332 ns_instance_id: str,
333 command: str,
334 op_state: str,
335 event_details: dict,
336 ) -> list:
K Sai Kiranbb70c812020-04-28 14:48:31 +0530337 """
338 Queries database and returns list of subscribers.
339 :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
340 :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
341 :param command: the command for event.
342 :param op_state: the operation state of NS.
343 :param event_details: dict containing raw data of event occured.
344 :return: List of interested subscribers for occurred event.
345 """
selvi.jf1004592022-04-29 05:42:35 +0000346 notification_type = [
347 "NsLcmOperationOccurrenceNotification",
348 "NsChangeNotification",
349 "NsIdentifierCreationNotification",
garciadeblasf2af4a12023-01-24 16:56:54 +0100350 "NsIdentifierDeletionNotification",
selvi.jf1004592022-04-29 05:42:35 +0000351 ]
garciadeblas4568a372021-03-24 09:19:48 +0100352 filter_q = {
353 "identifier": [nsd_id, ns_instance_id],
354 "operationStates": ["ANY"],
355 "operationTypes": ["ANY"],
garciadeblasf2af4a12023-01-24 16:56:54 +0100356 "notificationType": notification_type,
357 }
K Sai Kiranbb70c812020-04-28 14:48:31 +0530358 if op_state:
359 filter_q["operationStates"].append(op_state)
360 if command:
361 filter_q["operationTypes"].append(command)
362 # self.logger.debug("Db query is: {}".format(filter_q))
363 subscribers = []
364 try:
365 subscribers = self.db.get_list(self.subscriber_collection, filter_q)
366 subscribers = self._format_nslcm_subscribers(subscribers, event_details)
367 except Exception as e:
368 error_text = type(e).__name__ + ": " + str(e)
369 self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
370 finally:
371 return subscribers
372
373
selvi.jf1004592022-04-29 05:42:35 +0000374class VnfLcmNotification(NotificationBase):
375 # SOL003 response model for vnflcm notifications
376 response_models = {
377 "VnfLcmOperationOccurrenceNotification": {
378 "id",
379 "notificationType",
380 "subscriptionId",
381 "timeStamp",
382 "notificationStatus",
383 "operationState",
384 "vnfInstanceId",
385 "operation",
386 "isAutomaticInvocation",
387 "vnfLcmOpOccId",
388 "affectedVnfcs",
389 "affectedVirtualLinks",
390 "affectedExtLinkPorts",
391 "affectedVirtualStorages",
392 "changedInfo",
393 "changedExtConnectivity",
394 "modificationsTriggeredByVnfPkgChange",
395 "error",
garciadeblasf2af4a12023-01-24 16:56:54 +0100396 "_links",
selvi.jf1004592022-04-29 05:42:35 +0000397 },
398 "VnfIdentifierCreationNotification": {
399 "id",
400 "notificationType",
401 "subscriptionId",
402 "timeStamp",
403 "vnfInstanceId",
garciadeblasf2af4a12023-01-24 16:56:54 +0100404 "_links",
selvi.jf1004592022-04-29 05:42:35 +0000405 },
406 "VnfIdentifierDeletionNotification": {
407 "id",
408 "notificationType",
409 "subscriptionId",
410 "timeStamp",
411 "vnfInstanceId",
garciadeblasf2af4a12023-01-24 16:56:54 +0100412 "_links",
selvi.jf1004592022-04-29 05:42:35 +0000413 },
414 }
415
416 def __init__(self, db) -> None:
417 """
418 Constructor of VnfLcmNotification class.
419 :param db: Database handler.
420 """
421 super().__init__(db)
422 self.subscriber_collection = "mapped_subscriptions"
423
424 def get_models(self) -> dict:
425 """
426 Returns the SOL003 model of notification class
427 :param None
428 :return: dict of SOL003 data model
429 """
430 return self.response_models
431
garciadeblasf2af4a12023-01-24 16:56:54 +0100432 def _format_vnflcm_subscribers(
433 self, subscribers: list, event_details: dict
434 ) -> list:
selvi.jf1004592022-04-29 05:42:35 +0000435 """
436 Formats the raw event details from kafka message and subscriber details.
437 :param subscribers: A list of subscribers whom the event needs to be notified.
438 :param event_details: A dict containing all meta data of event.
439 :return:
440 """
441 notification_id = str(uuid4())
442 event_timestamp = time.time()
443 event_operation = event_details["command"]
444 for subscriber in subscribers:
445 subscriber["id"] = notification_id
446 subscriber["timeStamp"] = event_timestamp
447 subscriber["subscriptionId"] = subscriber["reference"]
448 subscriber["operation"] = event_operation
449 del subscriber["reference"]
450 del subscriber["_id"]
451 subscriber.update(event_details["params"])
452 return subscribers
453
garciadeblasf2af4a12023-01-24 16:56:54 +0100454 def get_subscribers(
455 self,
456 vnfd_id: str,
457 vnf_instance_id: str,
458 command: str,
459 op_state: str,
460 event_details: dict,
461 ) -> list:
selvi.jf1004592022-04-29 05:42:35 +0000462 """
463 Queries database and returns list of subscribers.
464 :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
465 :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
466 :param command: the command for event.
467 :param op_state: the operation state of VNF.
468 :param event_details: dict containing raw data of event occurred.
469 :return: List of interested subscribers for occurred event.
470 """
471 notification_type = [
472 "VnfIdentifierCreationNotification",
473 "VnfLcmOperationOccurrenceNotification",
garciadeblasf2af4a12023-01-24 16:56:54 +0100474 "VnfIdentifierDeletionNotification",
selvi.jf1004592022-04-29 05:42:35 +0000475 ]
476 filter_q = {
477 "identifier": [vnfd_id, vnf_instance_id],
478 "operationStates": ["ANY"],
479 "operationTypes": ["ANY"],
garciadeblasf2af4a12023-01-24 16:56:54 +0100480 "notificationType": notification_type,
selvi.jf1004592022-04-29 05:42:35 +0000481 }
482 if op_state:
483 filter_q["operationStates"].append(op_state)
484 if command:
485 filter_q["operationTypes"].append(command)
486 subscribers = []
487 try:
488 subscribers = self.db.get_list(self.subscriber_collection, filter_q)
489 subscribers = self._format_vnflcm_subscribers(subscribers, event_details)
490 except Exception as e:
491 error_text = type(e).__name__ + ": " + str(e)
492 self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
493 finally:
494 return subscribers
495
496
K Sai Kiranbb70c812020-04-28 14:48:31 +0530497class NsdNotification(NotificationBase):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530498 def __init__(self, db):
499 """
500 Constructor of the class
501 """
502 super().__init__(db)
503 # TODO will update this once support is there from subscription
504 self.response_models = {}
505 self.subscriber_collection = None
506
507
508class VnfdNotification(NotificationBase):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530509 def __init__(self, db):
510 """
511 Constructor of the class
512 """
513 super().__init__(db)
514 # TODO will update this once support is there from subscription
515 self.response_models = {}
516 self.subscriber_collection = None