Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/NBI.git] / osm_nbi / notifications.py
1 # Copyright 2020 K Sai Kiran (Tata Elxsi)
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
# Module-level metadata: author contact and date stamp of this file.
__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
__date__ = "$28-Apr-2020 23:59:59$"
18
19 import asyncio
20 import aiohttp
21 from http import HTTPStatus
22 import json
23 import logging
24 import time
25 from uuid import uuid4
26
27
class NotificationException(Exception):
    """
    Error raised by the notification code, carrying an HTTP status code
    next to the usual exception message.
    """

    def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
        """
        Build a notification exception.
        :param message: Text describing what went wrong.
        :param http_code: HTTP status code associated with the error
                          (defaults to 400 Bad Request).
        """
        super().__init__(message)
        # Exposed as an attribute so handlers can read the status code.
        self.http_code = http_code
41
42
class NotificationBase:
    """
    Base class of all notification classes.

    It builds the notification payload from a response model and delivers it
    to the subscribers' callback URIs with HTTP POST requests.
    """

    # Mapping of notification type -> set of keys allowed in its payload.
    # None here; subclasses provide the actual models.
    response_models = None
    # Common HTTP payload header for all notifications.
    payload_header = {"Content-Type": "application/json", "Accept": "application/json"}

    def __init__(self, db) -> None:
        """
        Constructor of NotificationBase class.
        :param db: Database handler.
        """
        self.db = db
        self.logger = logging.getLogger("nbi.notifications")
        self.subscriber_collection = None

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :return: dict of SOL005 data model (None when no model is defined)
        """
        # Looked up through the instance so that a subclass (or an instance)
        # defining its own response_models is honoured without overriding
        # this method.
        return self.response_models

    def get_subscribers(self, **kwargs) -> list:
        """
        Method should be implemented by all notification subclasses
        :param kwargs: any keyword arguments needed for db query.
        :return: List of subscribers
        :raises NotificationException: always, with HTTP code NOT_IMPLEMENTED,
                                       since the base class has no implementation.
        """
        raise NotificationException(
            "Method get_subscribers() is not implemented",
            http_code=HTTPStatus.NOT_IMPLEMENTED,
        )

    @staticmethod
    def _get_basic_auth(username: str, password: str) -> "aiohttp.BasicAuth":
        """
        Builds the HTTP basic authentication helper object.
        :param username: user name of the subscriber.
        :param password: plain text password of the subscriber.
        :return: aiohttp.BasicAuth object for the given credentials.
        """
        return aiohttp.BasicAuth(username, password)

    def _decrypt_password(
        self, hashed: str, salt: str, schema_version: str = "1.1"
    ) -> str:
        """
        Decrypts a stored password by delegating to the database handler.
        :param hashed: stored (encrypted) password.
        :param salt: salt passed to the database handler's decrypt().
        :param schema_version: schema version passed to decrypt().
        :return: whatever self.db.decrypt() returns for these arguments.
        """
        return self.db.decrypt(hashed, schema_version, salt=salt)

    def get_payload(self, meta_notification: dict) -> dict:
        """
        Generates SOL005 compliant payload structure and returns them in dictionary.
        :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
        :return: A dictionary which is SOL005 compliant.
        :raises NotificationException: with HTTP code NOT_IMPLEMENTED when no
                                       response model exists for the notification type.
        """
        model_name = meta_notification["notificationType"]
        response_models = self.get_models()
        if not response_models or not response_models.get(model_name):
            raise NotificationException(
                "Response model {} is not defined.".format(model_name),
                HTTPStatus.NOT_IMPLEMENTED,
            )
        model_keys = response_models[model_name]
        # Only keys of the model are kept; keys of the model that are missing
        # in the meta data are filled with the "N/A" placeholder.
        payload = {key: meta_notification.get(key, "N/A") for key in model_keys}
        self.logger.debug(
            "Payload generated for subscriber: {} for {}".format(
                payload["subscriptionId"], payload["notificationType"]
            )
        )
        return payload

    def _build_notification(self, subscriber: dict):
        """
        Builds the data needed to POST a notification to one subscriber.
        :param subscriber: subscriber details, including the notification meta data.
        :return: dict with "headers", "payload", "CallbackUri" (plus "auth_basic"
                 for basic authentication), or None when the subscriber uses an
                 authentication type which is not implemented.
        """
        authentication = subscriber.get("authentication")
        auth_basic = None
        if authentication:
            if authentication["authType"] != "basic":
                # TODO add support for AuthType OAuth and TLS after support is added in subscription.
                self.logger.debug(
                    "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
                        subscriber["subscriptionId"],
                        authentication["authType"],
                    )
                )
                return None
            # The subscription id is the salt handed to the password decryption.
            salt = subscriber["subscriptionId"]
            basic_params = authentication["paramsBasic"]
            password = self._decrypt_password(basic_params["password"], salt)
            auth_basic = self._get_basic_auth(basic_params["userName"], password)

        notification = {
            "headers": self.payload_header,
            "payload": self.get_payload(subscriber),
            "CallbackUri": subscriber["CallbackUri"],
        }
        if auth_basic is not None:
            notification["auth_basic"] = auth_basic
        return notification

    async def send_notifications(
        self, subscribers: list, loop: asyncio.AbstractEventLoop = None
    ):
        """
        Generate tasks for all notification for an event.
        :param subscribers: A list of subscribers who want to be notified for event.
        :param loop: Event loop object. Kept for backward compatibility; it is
                     not forwarded to asyncio.gather(), whose "loop" parameter
                     was removed in Python 3.10.
        """
        notifications = []
        for subscriber in subscribers:
            notification = self._build_notification(subscriber)
            if notification is not None:
                notifications.append(notification)

        if not notifications:
            return

        async with aiohttp.ClientSession(loop=loop) as session:
            tasks = [
                asyncio.ensure_future(
                    self.send_notification(session, notification, loop=loop),
                    loop=loop,
                )
                for notification in notifications
            ]
            await asyncio.gather(*tasks)

    async def send_notification(
        self,
        session: "aiohttp.ClientSession",
        notification: dict,
        loop: asyncio.AbstractEventLoop = None,
        retry_count: int = 5,
        timeout: float = 5.0,
    ):
        """
        Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
        after maximum number of retries, then notification is dropped.
        :param session: An aiohttp client session object to maintain http session.
        :param notification: A dictionary containing all necessary data to make POST request.
        :param loop: Event loop object. Kept for backward compatibility; it is
                     not forwarded to asyncio.sleep(), whose "loop" parameter
                     was removed in Python 3.10.
        :param retry_count: An integer specifying the maximum number of attempts for a notification.
        :param timeout: A float representing client timeout of each HTTP request.
        :return: True when the subscriber answered 204 No Content, False when
                 the notification was dropped.
        """
        payload = notification["payload"]
        backoff_delay = 1
        while retry_count > 0:
            try:
                async with session.post(
                    url=notification["CallbackUri"],
                    headers=notification["headers"],
                    auth=notification.get("auth_basic", None),
                    data=json.dumps(payload),
                    timeout=timeout,
                ) as resp:
                    # Only 204 No Content is treated as a successful delivery.
                    if resp.status != HTTPStatus.NO_CONTENT:
                        error_text = "Erroneous response code: {}, ".format(resp.status)
                        error_text += await resp.text()
                        raise NotificationException(error_text)
                    self.logger.debug(
                        "Notification sent successfully to subscriber {}".format(
                            payload["subscriptionId"]
                        )
                    )
                    return True
            except Exception as e:
                error_text = type(e).__name__ + ": " + str(e)
                self.logger.debug(
                    "Unable to send notification to subscriber {}. Details: {}".format(
                        payload["subscriptionId"], error_text
                    )
                )
                error_detail = {
                    "error": type(e).__name__,
                    "error_text": str(e),
                    "timestamp": time.time(),
                }
                # NOTE(review): the error history is stored inside the payload,
                # so it is part of the body posted on the following attempts.
                payload.setdefault("error_details", []).append(error_detail)
                retry_count -= 1
                # Back off only when another attempt follows; waiting after the
                # last failed attempt would just delay dropping the notification.
                if retry_count > 0:
                    backoff_delay *= 2
                    self.logger.debug(
                        "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
                            payload["subscriptionId"], backoff_delay
                        )
                    )
                    await asyncio.sleep(backoff_delay)
        # Dropping notification
        self.logger.debug(
            "Notification {} sent failed to subscriber:{}.".format(
                payload["notificationType"],
                payload["subscriptionId"],
            )
        )
        return False
239
240
class NsLcmNotification(NotificationBase):
    """
    Notification class for NS lifecycle management (nslcm) events.
    """

    # SOL005 response model for nslcm notifications
    response_models = {
        "NsLcmOperationOccurrenceNotification": {
            "id",
            "nsInstanceId",
            "nsLcmOpOccId",
            "operation",
            "notificationType",
            "subscriptionId",
            "timestamp",
            "notificationStatus",
            "operationState",
            "isAutomaticInvocation",
            "affectedVnf",
            "affectedVl",
            "affectedVnffg",
            "affectedNs",
            "affectedSap",
            "error",
            "_links",
        },
        "NsIdentifierCreationNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsIdentifierDeletionNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsChangeNotification": {
            "nsInstanceId",
            "nsComponentType",
            "nsComponentId",
            "lcmOpOccIdImpactngNsComponent",
            "lcmOpNameImpactingNsComponent",
            "lcmOpOccStatusImpactingNsComponent",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "error",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of NsLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :return: dict of SOL005 data model
        """
        return NsLcmNotification.response_models

    @staticmethod
    def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        Every subscriber dict is updated in place with the notification id, the
        event timestamp, links and operation, and the remaining event parameters.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict containing all meta data of event. It is
                              only read, never modified.
        :return: The same list of subscribers, formatted.
        """
        params = event_details["params"]
        # One id is shared by all notifications generated for this event.
        notification_id = str(uuid4())
        event_timestamp = params["startTime"]
        resource_links = params["links"]
        event_operation = event_details["command"]
        # Internal keys are left out of the notification. A filtered copy is
        # used so that the caller's event_details is not changed.
        excluded_keys = {"_admin", "_id", "id", "links"}
        event_params = {
            key: value for key, value in params.items() if key not in excluded_keys
        }
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timestamp"] = event_timestamp
            subscriber["_links"] = resource_links
            subscriber["subscriptionId"] = subscriber.pop("reference")
            subscriber["operation"] = event_operation
            del subscriber["_id"]
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(
        self,
        nsd_id: str,
        ns_instance_id: str,
        command: str,
        op_state: str,
        event_details: dict,
    ) -> list:
        """
        Queries database and returns list of subscribers.
        :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
        :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of NS.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event. An empty
                 list is returned when the query or the formatting fails.
        """
        filter_q = {
            "identifier": [nsd_id, ns_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_nslcm_subscribers(subscribers, event_details)
        except Exception as e:
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
            # Never hand back raw or half-formatted database rows: they lack
            # the keys the notification sender relies on.
            return []
370
371
class NsdNotification(NotificationBase):
    """
    Notification class for NSD. No response model or subscriber collection
    is configured yet.
    """

    def __init__(self, db):
        """
        Constructor of the class
        :param db: Database handler, handed over to the base class.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.response_models, self.subscriber_collection = {}, None
381
382
class VnfdNotification(NotificationBase):
    """
    Notification class for VNFD. No response model or subscriber collection
    is configured yet.
    """

    def __init__(self, db):
        """
        Constructor of the class
        :param db: Database handler, handed over to the base class.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.response_models, self.subscriber_collection = {}, None