1 |
|
# Copyright 2020 K Sai Kiran (Tata Elxsi) |
2 |
|
# |
3 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
# you may not use this file except in compliance with the License. |
5 |
|
# You may obtain a copy of the License at |
6 |
|
# |
7 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
# |
9 |
|
# Unless required by applicable law or agreed to in writing, software |
10 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
12 |
|
# implied. |
13 |
|
# See the License for the specific language governing permissions and |
14 |
|
# limitations under the License. |
15 |
|
|
16 |
0 |
__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>" |
17 |
0 |
__date__ = "$28-Apr-2020 23:59:59$" |
18 |
|
|
19 |
0 |
import asyncio |
20 |
0 |
import aiohttp |
21 |
0 |
from http import HTTPStatus |
22 |
0 |
import json |
23 |
0 |
import logging |
24 |
0 |
import time |
25 |
0 |
from uuid import uuid4 |
26 |
|
|
27 |
|
|
28 |
0 |
class NotificationException(Exception): |
29 |
|
""" |
30 |
|
Notification Exception |
31 |
|
""" |
32 |
|
|
33 |
0 |
def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None: |
34 |
|
""" |
35 |
|
Constructor of notification exception |
36 |
|
:param message: String text containing exception details. |
37 |
|
:param http_code: HTTP status code of exception. |
38 |
|
""" |
39 |
0 |
self.http_code = http_code |
40 |
0 |
Exception.__init__(self, message) |
41 |
|
|
42 |
|
|
43 |
0 |
class NotificationBase: |
44 |
|
|
45 |
0 |
response_models = None |
46 |
|
# Common HTTP payload header for all notifications. |
47 |
0 |
payload_header = { |
48 |
|
"Content-Type": "application/json", |
49 |
|
"Accept": "application/json" |
50 |
|
} |
51 |
|
|
52 |
0 |
def __init__(self, db) -> None: |
53 |
|
""" |
54 |
|
Constructor of NotificationBase class. |
55 |
|
:param db: Database handler. |
56 |
|
""" |
57 |
0 |
self.db = db |
58 |
0 |
self.logger = logging.getLogger("nbi.notifications") |
59 |
0 |
self.subscriber_collection = None |
60 |
|
|
61 |
0 |
def get_models(self) -> dict: |
62 |
|
""" |
63 |
|
Returns the SOL005 model of notification class |
64 |
|
:param None |
65 |
|
:return: dict of SOL005 data model |
66 |
|
""" |
67 |
0 |
return NotificationBase.response_models |
68 |
|
|
69 |
0 |
def get_subscribers(self, **kwargs) -> NotificationException: |
70 |
|
""" |
71 |
|
Method should be implemented by all notification subclasses |
72 |
|
:param kwargs: any keyword arguments needed for db query. |
73 |
|
:return: List of subscribers |
74 |
|
""" |
75 |
0 |
raise NotificationException("Method get_subscribers() is not implemented", http_code=HTTPStatus.NOT_IMPLEMENTED) |
76 |
|
|
77 |
0 |
@staticmethod |
78 |
0 |
def _get_basic_auth(username: str, password: str) -> tuple: |
79 |
0 |
return aiohttp.BasicAuth(username, password) |
80 |
|
|
81 |
0 |
def _decrypt_password(self, hashed: str, salt: str, schema_version: str = "1.1") -> str: |
82 |
0 |
return self.db.decrypt(hashed, schema_version, salt=salt) |
83 |
|
|
84 |
0 |
def get_payload(self, meta_notification: dict) -> dict: |
85 |
|
""" |
86 |
|
Generates SOL005 compliant payload structure and returns them in dictionary. |
87 |
|
:param meta_notification: notification meta data which needs to be formatted as SOL005 compliant |
88 |
|
:return: A dictionary which is SOL005 compliant. |
89 |
|
""" |
90 |
0 |
model_name = meta_notification["notificationType"] |
91 |
0 |
response_models = self.get_models() |
92 |
0 |
if not response_models or not response_models.get(model_name): |
93 |
0 |
raise NotificationException("Response model {} is not defined.".format(model_name), |
94 |
|
HTTPStatus.NOT_IMPLEMENTED) |
95 |
0 |
model_keys = response_models[model_name] |
96 |
0 |
payload = dict.fromkeys(model_keys, "N/A") |
97 |
0 |
notification_keys = set(meta_notification.keys()) |
98 |
0 |
for model_key in model_keys.intersection(notification_keys): |
99 |
0 |
payload[model_key] = meta_notification[model_key] |
100 |
0 |
self.logger.debug("Payload generated for subscriber: {} for {}".format(payload["subscriptionId"], |
101 |
|
payload["notificationType"])) |
102 |
0 |
return payload |
103 |
|
|
104 |
0 |
async def send_notifications(self, subscribers: list, loop: asyncio.AbstractEventLoop = None): |
105 |
|
""" |
106 |
|
Generate tasks for all notification for an event. |
107 |
|
:param subscribers: A list of subscribers who want to be notified for event. |
108 |
|
:param loop: Event loop object. |
109 |
|
""" |
110 |
0 |
notifications = [] |
111 |
0 |
for subscriber in subscribers: |
112 |
|
# Notify without auth |
113 |
0 |
if not subscriber.get("authentication"): |
114 |
0 |
notifications.append({ |
115 |
|
"headers": self.payload_header, |
116 |
|
"payload": self.get_payload(subscriber), |
117 |
|
"CallbackUri": subscriber["CallbackUri"] |
118 |
|
}) |
119 |
0 |
elif subscriber["authentication"]["authType"] == "basic": |
120 |
0 |
salt = subscriber["subscriptionId"] |
121 |
0 |
hashed_password = subscriber["authentication"]["paramsBasic"]["password"] |
122 |
0 |
password = self._decrypt_password(hashed_password, salt) |
123 |
0 |
auth_basic = self._get_basic_auth(subscriber["authentication"]["paramsBasic"]["userName"], password) |
124 |
0 |
notifications.append({ |
125 |
|
"headers": self.payload_header, |
126 |
|
"payload": self.get_payload(subscriber), |
127 |
|
"auth_basic": auth_basic, |
128 |
|
"CallbackUri": subscriber["CallbackUri"] |
129 |
|
}) |
130 |
|
# TODO add support for AuthType OAuth and TLS after support is added in subscription. |
131 |
|
else: |
132 |
0 |
self.logger.debug("Subscriber {} can not be notified. {} notification auth type is not implemented" |
133 |
|
.format(subscriber["subscriptionId"], |
134 |
|
subscriber["authentication"]["authType"])) |
135 |
|
|
136 |
0 |
if notifications: |
137 |
0 |
tasks = [] |
138 |
0 |
async with aiohttp.ClientSession(loop=loop) as session: |
139 |
0 |
for notification in notifications: |
140 |
0 |
tasks.append(asyncio.ensure_future(self.send_notification(session, notification, loop=loop), |
141 |
|
loop=loop)) |
142 |
0 |
await asyncio.gather(*tasks, loop=loop) |
143 |
|
|
144 |
0 |
async def send_notification(self, session: aiohttp.ClientSession, notification: dict, |
145 |
|
loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0): |
146 |
|
""" |
147 |
|
Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully |
148 |
|
after maximum number of reties, then notification is dropped. |
149 |
|
:param session: An aiohttp client session object to maintain http session. |
150 |
|
:param notification: A dictionary containing all necessary data to make POST request. |
151 |
|
:param loop: Event loop object. |
152 |
|
:param retry_count: An integer specifying the maximum number of reties for a notification. |
153 |
|
:param timeout: A float representing client timeout of each HTTP request. |
154 |
|
""" |
155 |
0 |
backoff_delay = 1 |
156 |
0 |
while retry_count > 0: |
157 |
0 |
try: |
158 |
0 |
async with session.post(url=notification["CallbackUri"], headers=notification["headers"], |
159 |
|
auth=notification.get("auth_basic", None), |
160 |
|
data=json.dumps(notification["payload"]), |
161 |
|
timeout=timeout) as resp: |
162 |
|
# self.logger.debug("Notification response: {}".format(resp.status)) |
163 |
0 |
if resp.status == HTTPStatus.NO_CONTENT: |
164 |
0 |
self.logger.debug("Notification sent successfully to subscriber {}" |
165 |
|
.format(notification["payload"]["subscriptionId"])) |
166 |
|
else: |
167 |
0 |
error_text = "Erroneous response code: {}, ".format(resp.status) |
168 |
0 |
error_text += await resp.text() |
169 |
0 |
raise NotificationException(error_text) |
170 |
0 |
return True |
171 |
0 |
except Exception as e: |
172 |
0 |
error_text = type(e).__name__ + ": " + str(e) |
173 |
0 |
self.logger.debug("Unable to send notification to subscriber {}. Details: {}" |
174 |
|
.format(notification["payload"]["subscriptionId"], error_text)) |
175 |
0 |
error_detail = { |
176 |
|
"error": type(e).__name__, |
177 |
|
"error_text": str(e), |
178 |
|
"timestamp": time.time() |
179 |
|
} |
180 |
0 |
if "error_details" in notification["payload"].keys(): |
181 |
0 |
notification["payload"]["error_details"].append(error_detail) |
182 |
|
else: |
183 |
0 |
notification["payload"]["error_details"] = [error_detail] |
184 |
0 |
retry_count -= 1 |
185 |
0 |
backoff_delay *= 2 |
186 |
0 |
self.logger.debug("Retry Notification for subscriber: {} after backoff delay: {} seconds." |
187 |
|
.format(notification["payload"]["subscriptionId"], backoff_delay)) |
188 |
0 |
await asyncio.sleep(backoff_delay, loop=loop) |
189 |
|
# Dropping notification |
190 |
0 |
self.logger.debug("Notification {} sent failed to subscriber:{}." |
191 |
|
.format(notification["payload"]["notificationType"], |
192 |
|
notification["payload"]["subscriptionId"])) |
193 |
0 |
return False |
194 |
|
|
195 |
|
|
196 |
0 |
class NsLcmNotification(NotificationBase): |
197 |
|
|
198 |
|
# SOL005 response model for nslcm notifications |
199 |
0 |
response_models = { |
200 |
|
"NsLcmOperationOccurrenceNotification": {"id", "nsInstanceId", "nsLcmOpOccId", "operation", |
201 |
|
"notificationType", "subscriptionId", "timestamp", |
202 |
|
"notificationStatus", "operationState", "isAutomaticInvocation", |
203 |
|
"affectedVnf", "affectedVl", "affectedVnffg", "affectedNs", |
204 |
|
"affectedSap", "error", "_links"}, |
205 |
|
|
206 |
|
"NsIdentifierCreationNotification": {"notificationType", "subscriptionId", "timestamp", |
207 |
|
"nsInstanceId", "_links"}, |
208 |
|
|
209 |
|
"NsIdentifierDeletionNotification": {"notificationType", "subscriptionId", "timestamp", |
210 |
|
"nsInstanceId", "_links"}, |
211 |
|
|
212 |
|
"NsChangeNotification": {"nsInstanceId", "nsComponentType", "nsComponentId", |
213 |
|
"lcmOpOccIdImpactngNsComponent", "lcmOpNameImpactingNsComponent", |
214 |
|
"lcmOpOccStatusImpactingNsComponent", "notificationType", "subscriptionId", |
215 |
|
"timeStamp", "error", "_links"} |
216 |
|
} |
217 |
|
|
218 |
0 |
def __init__(self, db) -> None: |
219 |
|
""" |
220 |
|
Constructor of NsLcmNotification class. |
221 |
|
:param db: Database handler. |
222 |
|
""" |
223 |
0 |
super().__init__(db) |
224 |
0 |
self.subscriber_collection = "mapped_subscriptions" |
225 |
|
|
226 |
0 |
def get_models(self) -> dict: |
227 |
|
""" |
228 |
|
Returns the SOL005 model of notification class |
229 |
|
:param None |
230 |
|
:return: dict of SOL005 data model |
231 |
|
""" |
232 |
0 |
return NsLcmNotification.response_models |
233 |
|
|
234 |
0 |
@staticmethod |
235 |
0 |
def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list: |
236 |
|
""" |
237 |
|
Formats the raw event details from kakfa message and subscriber details. |
238 |
|
:param subscribers: A list of subscribers whom the event needs to be notified. |
239 |
|
:param event_details: A dict containing all meta data of event. |
240 |
|
:return: |
241 |
|
""" |
242 |
0 |
notification_id = str(uuid4()) |
243 |
0 |
event_timestamp = event_details["params"]["startTime"] |
244 |
0 |
resource_links = event_details["params"]["links"] |
245 |
0 |
event_operation = event_details["command"] |
246 |
0 |
for key in ["_admin", "_id", "id", "links"]: |
247 |
0 |
event_details["params"].pop(key, None) |
248 |
0 |
for subscriber in subscribers: |
249 |
0 |
subscriber["id"] = notification_id |
250 |
0 |
subscriber["timestamp"] = event_timestamp |
251 |
0 |
subscriber["_links"] = resource_links |
252 |
0 |
subscriber["subscriptionId"] = subscriber["reference"] |
253 |
0 |
subscriber["operation"] = event_operation |
254 |
0 |
del subscriber["reference"] |
255 |
0 |
del subscriber["_id"] |
256 |
0 |
subscriber.update(event_details["params"]) |
257 |
0 |
return subscribers |
258 |
|
|
259 |
0 |
def get_subscribers(self, nsd_id: str, ns_instance_id: str, command: str, op_state: str, |
260 |
|
event_details: dict) -> list: |
261 |
|
""" |
262 |
|
Queries database and returns list of subscribers. |
263 |
|
:param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc) |
264 |
|
:param ns_instance_id: NS instance id an NS whose lifecycle has changed. |
265 |
|
:param command: the command for event. |
266 |
|
:param op_state: the operation state of NS. |
267 |
|
:param event_details: dict containing raw data of event occured. |
268 |
|
:return: List of interested subscribers for occurred event. |
269 |
|
""" |
270 |
0 |
filter_q = {"identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"]} |
271 |
0 |
if op_state: |
272 |
0 |
filter_q["operationStates"].append(op_state) |
273 |
0 |
if command: |
274 |
0 |
filter_q["operationTypes"].append(command) |
275 |
|
# self.logger.debug("Db query is: {}".format(filter_q)) |
276 |
0 |
subscribers = [] |
277 |
0 |
try: |
278 |
0 |
subscribers = self.db.get_list(self.subscriber_collection, filter_q) |
279 |
0 |
subscribers = self._format_nslcm_subscribers(subscribers, event_details) |
280 |
0 |
except Exception as e: |
281 |
0 |
error_text = type(e).__name__ + ": " + str(e) |
282 |
0 |
self.logger.debug("Error getting nslcm subscribers: {}".format(error_text)) |
283 |
|
finally: |
284 |
0 |
return subscribers |
285 |
|
|
286 |
|
|
287 |
0 |
class NsdNotification(NotificationBase): |
288 |
|
|
289 |
0 |
def __init__(self, db): |
290 |
|
""" |
291 |
|
Constructor of the class |
292 |
|
""" |
293 |
0 |
super().__init__(db) |
294 |
|
# TODO will update this once support is there from subscription |
295 |
0 |
self.response_models = {} |
296 |
0 |
self.subscriber_collection = None |
297 |
|
|
298 |
|
|
299 |
0 |
class VnfdNotification(NotificationBase): |
300 |
|
|
301 |
0 |
def __init__(self, db): |
302 |
|
""" |
303 |
|
Constructor of the class |
304 |
|
""" |
305 |
0 |
super().__init__(db) |
306 |
|
# TODO will update this once support is there from subscription |
307 |
0 |
self.response_models = {} |
308 |
0 |
self.subscriber_collection = None |