blob: a62670b6453c2f8e823af95cf38a6ecd8cd62c15 [file] [log] [blame]
K Sai Kiranbb70c812020-04-28 14:48:31 +05301# Copyright 2020 K Sai Kiran (Tata Elxsi)
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
17__date__ = "$28-Apr-2020 23:59:59$"
18
19import asyncio
20import aiohttp
21from http import HTTPStatus
22import json
23import logging
24import time
25from uuid import uuid4
26
27
28class NotificationException(Exception):
29 """
30 Notification Exception
31 """
32
33 def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
34 """
35 Constructor of notification exception
36 :param message: String text containing exception details.
37 :param http_code: HTTP status code of exception.
38 """
39 self.http_code = http_code
40 Exception.__init__(self, message)
41
42
43class NotificationBase:
K Sai Kiranbb70c812020-04-28 14:48:31 +053044 response_models = None
45 # Common HTTP payload header for all notifications.
garciadeblas4568a372021-03-24 09:19:48 +010046 payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
K Sai Kiranbb70c812020-04-28 14:48:31 +053047
48 def __init__(self, db) -> None:
49 """
50 Constructor of NotificationBase class.
51 :param db: Database handler.
52 """
53 self.db = db
54 self.logger = logging.getLogger("nbi.notifications")
55 self.subscriber_collection = None
56
57 def get_models(self) -> dict:
58 """
59 Returns the SOL005 model of notification class
60 :param None
61 :return: dict of SOL005 data model
62 """
63 return NotificationBase.response_models
64
65 def get_subscribers(self, **kwargs) -> NotificationException:
66 """
67 Method should be implemented by all notification subclasses
68 :param kwargs: any keyword arguments needed for db query.
69 :return: List of subscribers
70 """
garciadeblas4568a372021-03-24 09:19:48 +010071 raise NotificationException(
72 "Method get_subscribers() is not implemented",
73 http_code=HTTPStatus.NOT_IMPLEMENTED,
74 )
K Sai Kiranbb70c812020-04-28 14:48:31 +053075
76 @staticmethod
77 def _get_basic_auth(username: str, password: str) -> tuple:
78 return aiohttp.BasicAuth(username, password)
79
garciadeblas4568a372021-03-24 09:19:48 +010080 def _decrypt_password(
81 self, hashed: str, salt: str, schema_version: str = "1.1"
82 ) -> str:
K Sai Kiranbb70c812020-04-28 14:48:31 +053083 return self.db.decrypt(hashed, schema_version, salt=salt)
84
85 def get_payload(self, meta_notification: dict) -> dict:
86 """
87 Generates SOL005 compliant payload structure and returns them in dictionary.
88 :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
89 :return: A dictionary which is SOL005 compliant.
90 """
91 model_name = meta_notification["notificationType"]
92 response_models = self.get_models()
93 if not response_models or not response_models.get(model_name):
garciadeblas4568a372021-03-24 09:19:48 +010094 raise NotificationException(
95 "Response model {} is not defined.".format(model_name),
96 HTTPStatus.NOT_IMPLEMENTED,
97 )
K Sai Kiranbb70c812020-04-28 14:48:31 +053098 model_keys = response_models[model_name]
99 payload = dict.fromkeys(model_keys, "N/A")
100 notification_keys = set(meta_notification.keys())
101 for model_key in model_keys.intersection(notification_keys):
102 payload[model_key] = meta_notification[model_key]
garciadeblas4568a372021-03-24 09:19:48 +0100103 self.logger.debug(
104 "Payload generated for subscriber: {} for {}".format(
105 payload["subscriptionId"], payload["notificationType"]
106 )
107 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530108 return payload
109
garciadeblas4568a372021-03-24 09:19:48 +0100110 async def send_notifications(
111 self, subscribers: list, loop: asyncio.AbstractEventLoop = None
112 ):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530113 """
114 Generate tasks for all notification for an event.
115 :param subscribers: A list of subscribers who want to be notified for event.
116 :param loop: Event loop object.
117 """
118 notifications = []
119 for subscriber in subscribers:
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530120 # Notify without auth
121 if not subscriber.get("authentication"):
garciadeblas4568a372021-03-24 09:19:48 +0100122 notifications.append(
123 {
124 "headers": self.payload_header,
125 "payload": self.get_payload(subscriber),
126 "CallbackUri": subscriber["CallbackUri"],
127 }
128 )
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530129 elif subscriber["authentication"]["authType"] == "basic":
130 salt = subscriber["subscriptionId"]
garciadeblas4568a372021-03-24 09:19:48 +0100131 hashed_password = subscriber["authentication"]["paramsBasic"][
132 "password"
133 ]
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530134 password = self._decrypt_password(hashed_password, salt)
garciadeblas4568a372021-03-24 09:19:48 +0100135 auth_basic = self._get_basic_auth(
136 subscriber["authentication"]["paramsBasic"]["userName"], password
137 )
138 notifications.append(
139 {
140 "headers": self.payload_header,
141 "payload": self.get_payload(subscriber),
142 "auth_basic": auth_basic,
143 "CallbackUri": subscriber["CallbackUri"],
144 }
145 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530146 # TODO add support for AuthType OAuth and TLS after support is added in subscription.
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530147 else:
garciadeblas4568a372021-03-24 09:19:48 +0100148 self.logger.debug(
149 "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
150 subscriber["subscriptionId"],
151 subscriber["authentication"]["authType"],
152 )
153 )
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530154
tierno2278fa42020-08-10 13:53:57 +0000155 if notifications:
156 tasks = []
157 async with aiohttp.ClientSession(loop=loop) as session:
158 for notification in notifications:
garciadeblas4568a372021-03-24 09:19:48 +0100159 tasks.append(
160 asyncio.ensure_future(
161 self.send_notification(session, notification, loop=loop),
162 loop=loop,
163 )
164 )
tierno2278fa42020-08-10 13:53:57 +0000165 await asyncio.gather(*tasks, loop=loop)
K Sai Kiranbb70c812020-04-28 14:48:31 +0530166
garciadeblas4568a372021-03-24 09:19:48 +0100167 async def send_notification(
168 self,
169 session: aiohttp.ClientSession,
170 notification: dict,
171 loop: asyncio.AbstractEventLoop = None,
172 retry_count: int = 5,
173 timeout: float = 5.0,
174 ):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530175 """
176 Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
177 after maximum number of reties, then notification is dropped.
178 :param session: An aiohttp client session object to maintain http session.
179 :param notification: A dictionary containing all necessary data to make POST request.
180 :param loop: Event loop object.
181 :param retry_count: An integer specifying the maximum number of reties for a notification.
182 :param timeout: A float representing client timeout of each HTTP request.
183 """
184 backoff_delay = 1
185 while retry_count > 0:
186 try:
garciadeblas4568a372021-03-24 09:19:48 +0100187 async with session.post(
188 url=notification["CallbackUri"],
189 headers=notification["headers"],
190 auth=notification.get("auth_basic", None),
191 data=json.dumps(notification["payload"]),
192 timeout=timeout,
193 ) as resp:
K Sai Kiranbb70c812020-04-28 14:48:31 +0530194 # self.logger.debug("Notification response: {}".format(resp.status))
195 if resp.status == HTTPStatus.NO_CONTENT:
garciadeblas4568a372021-03-24 09:19:48 +0100196 self.logger.debug(
197 "Notification sent successfully to subscriber {}".format(
198 notification["payload"]["subscriptionId"]
199 )
200 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530201 else:
202 error_text = "Erroneous response code: {}, ".format(resp.status)
203 error_text += await resp.text()
204 raise NotificationException(error_text)
205 return True
206 except Exception as e:
207 error_text = type(e).__name__ + ": " + str(e)
garciadeblas4568a372021-03-24 09:19:48 +0100208 self.logger.debug(
209 "Unable to send notification to subscriber {}. Details: {}".format(
210 notification["payload"]["subscriptionId"], error_text
211 )
212 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530213 error_detail = {
214 "error": type(e).__name__,
215 "error_text": str(e),
garciadeblas4568a372021-03-24 09:19:48 +0100216 "timestamp": time.time(),
K Sai Kiranbb70c812020-04-28 14:48:31 +0530217 }
218 if "error_details" in notification["payload"].keys():
219 notification["payload"]["error_details"].append(error_detail)
220 else:
221 notification["payload"]["error_details"] = [error_detail]
222 retry_count -= 1
223 backoff_delay *= 2
garciadeblas4568a372021-03-24 09:19:48 +0100224 self.logger.debug(
225 "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
226 notification["payload"]["subscriptionId"], backoff_delay
227 )
228 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530229 await asyncio.sleep(backoff_delay, loop=loop)
230 # Dropping notification
garciadeblas4568a372021-03-24 09:19:48 +0100231 self.logger.debug(
232 "Notification {} sent failed to subscriber:{}.".format(
233 notification["payload"]["notificationType"],
234 notification["payload"]["subscriptionId"],
235 )
236 )
K Sai Kiranbb70c812020-04-28 14:48:31 +0530237 return False
238
239
240class NsLcmNotification(NotificationBase):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530241 # SOL005 response model for nslcm notifications
242 response_models = {
garciadeblas4568a372021-03-24 09:19:48 +0100243 "NsLcmOperationOccurrenceNotification": {
244 "id",
245 "nsInstanceId",
246 "nsLcmOpOccId",
247 "operation",
248 "notificationType",
249 "subscriptionId",
250 "timestamp",
251 "notificationStatus",
252 "operationState",
253 "isAutomaticInvocation",
254 "affectedVnf",
255 "affectedVl",
256 "affectedVnffg",
257 "affectedNs",
258 "affectedSap",
259 "error",
260 "_links",
261 },
262 "NsIdentifierCreationNotification": {
263 "notificationType",
264 "subscriptionId",
265 "timestamp",
266 "nsInstanceId",
267 "_links",
268 },
269 "NsIdentifierDeletionNotification": {
270 "notificationType",
271 "subscriptionId",
272 "timestamp",
273 "nsInstanceId",
274 "_links",
275 },
276 "NsChangeNotification": {
277 "nsInstanceId",
278 "nsComponentType",
279 "nsComponentId",
280 "lcmOpOccIdImpactngNsComponent",
281 "lcmOpNameImpactingNsComponent",
282 "lcmOpOccStatusImpactingNsComponent",
283 "notificationType",
284 "subscriptionId",
285 "timeStamp",
286 "error",
287 "_links",
288 },
K Sai Kiranbb70c812020-04-28 14:48:31 +0530289 }
290
291 def __init__(self, db) -> None:
292 """
K Sai Kiran42c84ea2020-06-25 14:45:50 +0530293 Constructor of NsLcmNotification class.
K Sai Kiranbb70c812020-04-28 14:48:31 +0530294 :param db: Database handler.
295 """
296 super().__init__(db)
297 self.subscriber_collection = "mapped_subscriptions"
298
299 def get_models(self) -> dict:
300 """
301 Returns the SOL005 model of notification class
302 :param None
303 :return: dict of SOL005 data model
304 """
305 return NsLcmNotification.response_models
306
307 @staticmethod
308 def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
309 """
310 Formats the raw event details from kakfa message and subscriber details.
311 :param subscribers: A list of subscribers whom the event needs to be notified.
312 :param event_details: A dict containing all meta data of event.
313 :return:
314 """
315 notification_id = str(uuid4())
316 event_timestamp = event_details["params"]["startTime"]
317 resource_links = event_details["params"]["links"]
318 event_operation = event_details["command"]
319 for key in ["_admin", "_id", "id", "links"]:
320 event_details["params"].pop(key, None)
321 for subscriber in subscribers:
322 subscriber["id"] = notification_id
323 subscriber["timestamp"] = event_timestamp
324 subscriber["_links"] = resource_links
325 subscriber["subscriptionId"] = subscriber["reference"]
326 subscriber["operation"] = event_operation
327 del subscriber["reference"]
328 del subscriber["_id"]
329 subscriber.update(event_details["params"])
330 return subscribers
331
garciadeblas4568a372021-03-24 09:19:48 +0100332 def get_subscribers(
333 self,
334 nsd_id: str,
335 ns_instance_id: str,
336 command: str,
337 op_state: str,
338 event_details: dict,
339 ) -> list:
K Sai Kiranbb70c812020-04-28 14:48:31 +0530340 """
341 Queries database and returns list of subscribers.
342 :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
343 :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
344 :param command: the command for event.
345 :param op_state: the operation state of NS.
346 :param event_details: dict containing raw data of event occured.
347 :return: List of interested subscribers for occurred event.
348 """
selvi.jf1004592022-04-29 05:42:35 +0000349 notification_type = [
350 "NsLcmOperationOccurrenceNotification",
351 "NsChangeNotification",
352 "NsIdentifierCreationNotification",
garciadeblasf2af4a12023-01-24 16:56:54 +0100353 "NsIdentifierDeletionNotification",
selvi.jf1004592022-04-29 05:42:35 +0000354 ]
garciadeblas4568a372021-03-24 09:19:48 +0100355 filter_q = {
356 "identifier": [nsd_id, ns_instance_id],
357 "operationStates": ["ANY"],
358 "operationTypes": ["ANY"],
garciadeblasf2af4a12023-01-24 16:56:54 +0100359 "notificationType": notification_type,
360 }
K Sai Kiranbb70c812020-04-28 14:48:31 +0530361 if op_state:
362 filter_q["operationStates"].append(op_state)
363 if command:
364 filter_q["operationTypes"].append(command)
365 # self.logger.debug("Db query is: {}".format(filter_q))
366 subscribers = []
367 try:
368 subscribers = self.db.get_list(self.subscriber_collection, filter_q)
369 subscribers = self._format_nslcm_subscribers(subscribers, event_details)
370 except Exception as e:
371 error_text = type(e).__name__ + ": " + str(e)
372 self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
373 finally:
374 return subscribers
375
376
selvi.jf1004592022-04-29 05:42:35 +0000377class VnfLcmNotification(NotificationBase):
378 # SOL003 response model for vnflcm notifications
379 response_models = {
380 "VnfLcmOperationOccurrenceNotification": {
381 "id",
382 "notificationType",
383 "subscriptionId",
384 "timeStamp",
385 "notificationStatus",
386 "operationState",
387 "vnfInstanceId",
388 "operation",
389 "isAutomaticInvocation",
390 "vnfLcmOpOccId",
391 "affectedVnfcs",
392 "affectedVirtualLinks",
393 "affectedExtLinkPorts",
394 "affectedVirtualStorages",
395 "changedInfo",
396 "changedExtConnectivity",
397 "modificationsTriggeredByVnfPkgChange",
398 "error",
garciadeblasf2af4a12023-01-24 16:56:54 +0100399 "_links",
selvi.jf1004592022-04-29 05:42:35 +0000400 },
401 "VnfIdentifierCreationNotification": {
402 "id",
403 "notificationType",
404 "subscriptionId",
405 "timeStamp",
406 "vnfInstanceId",
garciadeblasf2af4a12023-01-24 16:56:54 +0100407 "_links",
selvi.jf1004592022-04-29 05:42:35 +0000408 },
409 "VnfIdentifierDeletionNotification": {
410 "id",
411 "notificationType",
412 "subscriptionId",
413 "timeStamp",
414 "vnfInstanceId",
garciadeblasf2af4a12023-01-24 16:56:54 +0100415 "_links",
selvi.jf1004592022-04-29 05:42:35 +0000416 },
417 }
418
419 def __init__(self, db) -> None:
420 """
421 Constructor of VnfLcmNotification class.
422 :param db: Database handler.
423 """
424 super().__init__(db)
425 self.subscriber_collection = "mapped_subscriptions"
426
427 def get_models(self) -> dict:
428 """
429 Returns the SOL003 model of notification class
430 :param None
431 :return: dict of SOL003 data model
432 """
433 return self.response_models
434
garciadeblasf2af4a12023-01-24 16:56:54 +0100435 def _format_vnflcm_subscribers(
436 self, subscribers: list, event_details: dict
437 ) -> list:
selvi.jf1004592022-04-29 05:42:35 +0000438 """
439 Formats the raw event details from kafka message and subscriber details.
440 :param subscribers: A list of subscribers whom the event needs to be notified.
441 :param event_details: A dict containing all meta data of event.
442 :return:
443 """
444 notification_id = str(uuid4())
445 event_timestamp = time.time()
446 event_operation = event_details["command"]
447 for subscriber in subscribers:
448 subscriber["id"] = notification_id
449 subscriber["timeStamp"] = event_timestamp
450 subscriber["subscriptionId"] = subscriber["reference"]
451 subscriber["operation"] = event_operation
452 del subscriber["reference"]
453 del subscriber["_id"]
454 subscriber.update(event_details["params"])
455 return subscribers
456
garciadeblasf2af4a12023-01-24 16:56:54 +0100457 def get_subscribers(
458 self,
459 vnfd_id: str,
460 vnf_instance_id: str,
461 command: str,
462 op_state: str,
463 event_details: dict,
464 ) -> list:
selvi.jf1004592022-04-29 05:42:35 +0000465 """
466 Queries database and returns list of subscribers.
467 :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
468 :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
469 :param command: the command for event.
470 :param op_state: the operation state of VNF.
471 :param event_details: dict containing raw data of event occurred.
472 :return: List of interested subscribers for occurred event.
473 """
474 notification_type = [
475 "VnfIdentifierCreationNotification",
476 "VnfLcmOperationOccurrenceNotification",
garciadeblasf2af4a12023-01-24 16:56:54 +0100477 "VnfIdentifierDeletionNotification",
selvi.jf1004592022-04-29 05:42:35 +0000478 ]
479 filter_q = {
480 "identifier": [vnfd_id, vnf_instance_id],
481 "operationStates": ["ANY"],
482 "operationTypes": ["ANY"],
garciadeblasf2af4a12023-01-24 16:56:54 +0100483 "notificationType": notification_type,
selvi.jf1004592022-04-29 05:42:35 +0000484 }
485 if op_state:
486 filter_q["operationStates"].append(op_state)
487 if command:
488 filter_q["operationTypes"].append(command)
489 subscribers = []
490 try:
491 subscribers = self.db.get_list(self.subscriber_collection, filter_q)
492 subscribers = self._format_vnflcm_subscribers(subscribers, event_details)
493 except Exception as e:
494 error_text = type(e).__name__ + ": " + str(e)
495 self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
496 finally:
497 return subscribers
498
499
K Sai Kiranbb70c812020-04-28 14:48:31 +0530500class NsdNotification(NotificationBase):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530501 def __init__(self, db):
502 """
503 Constructor of the class
504 """
505 super().__init__(db)
506 # TODO will update this once support is there from subscription
507 self.response_models = {}
508 self.subscriber_collection = None
509
510
511class VnfdNotification(NotificationBase):
K Sai Kiranbb70c812020-04-28 14:48:31 +0530512 def __init__(self, db):
513 """
514 Constructor of the class
515 """
516 super().__init__(db)
517 # TODO will update this once support is there from subscription
518 self.response_models = {}
519 self.subscriber_collection = None