1 # Copyright 2020 K Sai Kiran (Tata Elxsi)
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 __author__
= "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
17 __date__
= "$28-Apr-2020 23:59:59$"
21 from http
import HTTPStatus
25 from uuid
import uuid4
28 class NotificationException(Exception):
30 Notification Exception
33 def __init__(self
, message
: str, http_code
: int = HTTPStatus
.BAD_REQUEST
) -> None:
35 Constructor of notification exception
36 :param message: String text containing exception details.
37 :param http_code: HTTP status code of exception.
39 self
.http_code
= http_code
40 Exception.__init
__(self
, message
)
43 class NotificationBase
:
45 response_models
= None
46 # Common HTTP payload header for all notifications.
47 payload_header
= {"Content-Type": "application/json", "Accept": "application/json"}
49 def __init__(self
, db
) -> None:
51 Constructor of NotificationBase class.
52 :param db: Database handler.
55 self
.logger
= logging
.getLogger("nbi.notifications")
56 self
.subscriber_collection
= None
58 def get_models(self
) -> dict:
60 Returns the SOL005 model of notification class
62 :return: dict of SOL005 data model
64 return NotificationBase
.response_models
66 def get_subscribers(self
, **kwargs
) -> NotificationException
:
68 Method should be implemented by all notification subclasses
69 :param kwargs: any keyword arguments needed for db query.
70 :return: List of subscribers
72 raise NotificationException(
73 "Method get_subscribers() is not implemented",
74 http_code
=HTTPStatus
.NOT_IMPLEMENTED
,
78 def _get_basic_auth(username
: str, password
: str) -> tuple:
79 return aiohttp
.BasicAuth(username
, password
)
81 def _decrypt_password(
82 self
, hashed
: str, salt
: str, schema_version
: str = "1.1"
84 return self
.db
.decrypt(hashed
, schema_version
, salt
=salt
)
86 def get_payload(self
, meta_notification
: dict) -> dict:
88 Generates SOL005 compliant payload structure and returns them in dictionary.
89 :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
90 :return: A dictionary which is SOL005 compliant.
92 model_name
= meta_notification
["notificationType"]
93 response_models
= self
.get_models()
94 if not response_models
or not response_models
.get(model_name
):
95 raise NotificationException(
96 "Response model {} is not defined.".format(model_name
),
97 HTTPStatus
.NOT_IMPLEMENTED
,
99 model_keys
= response_models
[model_name
]
100 payload
= dict.fromkeys(model_keys
, "N/A")
101 notification_keys
= set(meta_notification
.keys())
102 for model_key
in model_keys
.intersection(notification_keys
):
103 payload
[model_key
] = meta_notification
[model_key
]
105 "Payload generated for subscriber: {} for {}".format(
106 payload
["subscriptionId"], payload
["notificationType"]
111 async def send_notifications(
112 self
, subscribers
: list, loop
: asyncio
.AbstractEventLoop
= None
115 Generate tasks for all notification for an event.
116 :param subscribers: A list of subscribers who want to be notified for event.
117 :param loop: Event loop object.
120 for subscriber
in subscribers
:
121 # Notify without auth
122 if not subscriber
.get("authentication"):
123 notifications
.append(
125 "headers": self
.payload_header
,
126 "payload": self
.get_payload(subscriber
),
127 "CallbackUri": subscriber
["CallbackUri"],
130 elif subscriber
["authentication"]["authType"] == "basic":
131 salt
= subscriber
["subscriptionId"]
132 hashed_password
= subscriber
["authentication"]["paramsBasic"][
135 password
= self
._decrypt
_password
(hashed_password
, salt
)
136 auth_basic
= self
._get
_basic
_auth
(
137 subscriber
["authentication"]["paramsBasic"]["userName"], password
139 notifications
.append(
141 "headers": self
.payload_header
,
142 "payload": self
.get_payload(subscriber
),
143 "auth_basic": auth_basic
,
144 "CallbackUri": subscriber
["CallbackUri"],
147 # TODO add support for AuthType OAuth and TLS after support is added in subscription.
150 "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
151 subscriber
["subscriptionId"],
152 subscriber
["authentication"]["authType"],
158 async with aiohttp
.ClientSession(loop
=loop
) as session
:
159 for notification
in notifications
:
161 asyncio
.ensure_future(
162 self
.send_notification(session
, notification
, loop
=loop
),
166 await asyncio
.gather(*tasks
, loop
=loop
)
168 async def send_notification(
170 session
: aiohttp
.ClientSession
,
172 loop
: asyncio
.AbstractEventLoop
= None,
173 retry_count
: int = 5,
174 timeout
: float = 5.0,
177 Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
178 after maximum number of reties, then notification is dropped.
179 :param session: An aiohttp client session object to maintain http session.
180 :param notification: A dictionary containing all necessary data to make POST request.
181 :param loop: Event loop object.
182 :param retry_count: An integer specifying the maximum number of reties for a notification.
183 :param timeout: A float representing client timeout of each HTTP request.
186 while retry_count
> 0:
188 async with session
.post(
189 url
=notification
["CallbackUri"],
190 headers
=notification
["headers"],
191 auth
=notification
.get("auth_basic", None),
192 data
=json
.dumps(notification
["payload"]),
195 # self.logger.debug("Notification response: {}".format(resp.status))
196 if resp
.status
== HTTPStatus
.NO_CONTENT
:
198 "Notification sent successfully to subscriber {}".format(
199 notification
["payload"]["subscriptionId"]
203 error_text
= "Erroneous response code: {}, ".format(resp
.status
)
204 error_text
+= await resp
.text()
205 raise NotificationException(error_text
)
207 except Exception as e
:
208 error_text
= type(e
).__name
__ + ": " + str(e
)
210 "Unable to send notification to subscriber {}. Details: {}".format(
211 notification
["payload"]["subscriptionId"], error_text
215 "error": type(e
).__name
__,
216 "error_text": str(e
),
217 "timestamp": time
.time(),
219 if "error_details" in notification
["payload"].keys():
220 notification
["payload"]["error_details"].append(error_detail
)
222 notification
["payload"]["error_details"] = [error_detail
]
226 "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
227 notification
["payload"]["subscriptionId"], backoff_delay
230 await asyncio
.sleep(backoff_delay
, loop
=loop
)
231 # Dropping notification
233 "Notification {} sent failed to subscriber:{}.".format(
234 notification
["payload"]["notificationType"],
235 notification
["payload"]["subscriptionId"],
241 class NsLcmNotification(NotificationBase
):
243 # SOL005 response model for nslcm notifications
245 "NsLcmOperationOccurrenceNotification": {
253 "notificationStatus",
255 "isAutomaticInvocation",
264 "NsIdentifierCreationNotification": {
271 "NsIdentifierDeletionNotification": {
278 "NsChangeNotification": {
282 "lcmOpOccIdImpactngNsComponent",
283 "lcmOpNameImpactingNsComponent",
284 "lcmOpOccStatusImpactingNsComponent",
293 def __init__(self
, db
) -> None:
295 Constructor of NsLcmNotification class.
296 :param db: Database handler.
299 self
.subscriber_collection
= "mapped_subscriptions"
301 def get_models(self
) -> dict:
303 Returns the SOL005 model of notification class
305 :return: dict of SOL005 data model
307 return NsLcmNotification
.response_models
310 def _format_nslcm_subscribers(subscribers
: list, event_details
: dict) -> list:
312 Formats the raw event details from kakfa message and subscriber details.
313 :param subscribers: A list of subscribers whom the event needs to be notified.
314 :param event_details: A dict containing all meta data of event.
317 notification_id
= str(uuid4())
318 event_timestamp
= event_details
["params"]["startTime"]
319 resource_links
= event_details
["params"]["links"]
320 event_operation
= event_details
["command"]
321 for key
in ["_admin", "_id", "id", "links"]:
322 event_details
["params"].pop(key
, None)
323 for subscriber
in subscribers
:
324 subscriber
["id"] = notification_id
325 subscriber
["timestamp"] = event_timestamp
326 subscriber
["_links"] = resource_links
327 subscriber
["subscriptionId"] = subscriber
["reference"]
328 subscriber
["operation"] = event_operation
329 del subscriber
["reference"]
330 del subscriber
["_id"]
331 subscriber
.update(event_details
["params"])
343 Queries database and returns list of subscribers.
344 :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
345 :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
346 :param command: the command for event.
347 :param op_state: the operation state of NS.
348 :param event_details: dict containing raw data of event occured.
349 :return: List of interested subscribers for occurred event.
351 notification_type
= [
352 "NsLcmOperationOccurrenceNotification",
353 "NsChangeNotification",
354 "NsIdentifierCreationNotification",
355 "NsIdentifierDeletionNotification"
358 "identifier": [nsd_id
, ns_instance_id
],
359 "operationStates": ["ANY"],
360 "operationTypes": ["ANY"],
361 "notificationType": notification_type
364 filter_q
["operationStates"].append(op_state
)
366 filter_q
["operationTypes"].append(command
)
367 # self.logger.debug("Db query is: {}".format(filter_q))
370 subscribers
= self
.db
.get_list(self
.subscriber_collection
, filter_q
)
371 subscribers
= self
._format
_nslcm
_subscribers
(subscribers
, event_details
)
372 except Exception as e
:
373 error_text
= type(e
).__name
__ + ": " + str(e
)
374 self
.logger
.debug("Error getting nslcm subscribers: {}".format(error_text
))
379 class VnfLcmNotification(NotificationBase
):
380 # SOL003 response model for vnflcm notifications
382 "VnfLcmOperationOccurrenceNotification": {
387 "notificationStatus",
391 "isAutomaticInvocation",
394 "affectedVirtualLinks",
395 "affectedExtLinkPorts",
396 "affectedVirtualStorages",
398 "changedExtConnectivity",
399 "modificationsTriggeredByVnfPkgChange",
403 "VnfIdentifierCreationNotification": {
411 "VnfIdentifierDeletionNotification": {
421 def __init__(self
, db
) -> None:
423 Constructor of VnfLcmNotification class.
424 :param db: Database handler.
427 self
.subscriber_collection
= "mapped_subscriptions"
429 def get_models(self
) -> dict:
431 Returns the SOL003 model of notification class
433 :return: dict of SOL003 data model
435 return self
.response_models
437 def _format_vnflcm_subscribers(self
, subscribers
: list, event_details
: dict) -> list:
439 Formats the raw event details from kafka message and subscriber details.
440 :param subscribers: A list of subscribers whom the event needs to be notified.
441 :param event_details: A dict containing all meta data of event.
444 notification_id
= str(uuid4())
445 event_timestamp
= time
.time()
446 event_operation
= event_details
["command"]
447 for subscriber
in subscribers
:
448 subscriber
["id"] = notification_id
449 subscriber
["timeStamp"] = event_timestamp
450 subscriber
["subscriptionId"] = subscriber
["reference"]
451 subscriber
["operation"] = event_operation
452 del subscriber
["reference"]
453 del subscriber
["_id"]
454 subscriber
.update(event_details
["params"])
457 def get_subscribers(self
, vnfd_id
: str, vnf_instance_id
: str, command
: str, op_state
: str,
458 event_details
: dict) -> list:
460 Queries database and returns list of subscribers.
461 :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
462 :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
463 :param command: the command for event.
464 :param op_state: the operation state of VNF.
465 :param event_details: dict containing raw data of event occurred.
466 :return: List of interested subscribers for occurred event.
468 notification_type
= [
469 "VnfIdentifierCreationNotification",
470 "VnfLcmOperationOccurrenceNotification",
471 "VnfIdentifierDeletionNotification"
474 "identifier": [vnfd_id
, vnf_instance_id
],
475 "operationStates": ["ANY"],
476 "operationTypes": ["ANY"],
477 "notificationType": notification_type
480 filter_q
["operationStates"].append(op_state
)
482 filter_q
["operationTypes"].append(command
)
485 subscribers
= self
.db
.get_list(self
.subscriber_collection
, filter_q
)
486 subscribers
= self
._format
_vnflcm
_subscribers
(subscribers
, event_details
)
487 except Exception as e
:
488 error_text
= type(e
).__name
__ + ": " + str(e
)
489 self
.logger
.debug("Error getting vnflcm subscribers: {}".format(error_text
))
494 class NsdNotification(NotificationBase
):
495 def __init__(self
, db
):
497 Constructor of the class
500 # TODO will update this once support is there from subscription
501 self
.response_models
= {}
502 self
.subscriber_collection
= None
505 class VnfdNotification(NotificationBase
):
506 def __init__(self
, db
):
508 Constructor of the class
511 # TODO will update this once support is there from subscription
512 self
.response_models
= {}
513 self
.subscriber_collection
= None