Bug 1830 fixed: maps completed operations to original operation types
[osm/NBI.git] / osm_nbi / notifications.py
1 # Copyright 2020 K Sai Kiran (Tata Elxsi)
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 __author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
17 __date__ = "$28-Apr-2020 23:59:59$"
18
19 import asyncio
20 import aiohttp
21 from http import HTTPStatus
22 import json
23 import logging
24 import time
25 from uuid import uuid4
26
27
class NotificationException(Exception):
    """
    Error raised while preparing or delivering a notification.

    Besides the message, each instance carries the HTTP status code that
    describes the failure in the ``http_code`` attribute.
    """

    def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
        """
        Constructor of notification exception
        :param message: String text containing exception details.
        :param http_code: HTTP status code of exception, BAD_REQUEST when omitted.
        """
        super().__init__(message)
        self.http_code = http_code
41
42
class NotificationBase:
    """
    Base class of all notification senders.

    It turns subscriber records into payloads restricted to the keys of a
    response model and POSTs them to each subscriber's callback URI, retrying
    with an increasing delay. Subclasses supply the response models and
    implement get_subscribers().
    """

    # Mapping of notification type name -> set of payload keys. Subclasses override it.
    response_models = None
    # Common HTTP payload header for all notifications.
    payload_header = {"Content-Type": "application/json", "Accept": "application/json"}

    def __init__(self, db) -> None:
        """
        Constructor of NotificationBase class.
        :param db: Database handler.
        """
        self.db = db
        self.logger = logging.getLogger("nbi.notifications")
        self.subscriber_collection = None

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :param None
        :return: dict of SOL005 data model, or None when no model is defined.
        """
        # Resolved through the instance so that a class-level or an
        # instance-level "response_models" of a subclass is honoured.
        return self.response_models

    def get_subscribers(self, **kwargs) -> list:
        """
        Method should be implemented by all notification subclasses
        :param kwargs: any keyword arguments needed for db query.
        :return: List of subscribers
        :raises NotificationException: always, with http_code NOT_IMPLEMENTED.
        """
        raise NotificationException(
            "Method get_subscribers() is not implemented",
            http_code=HTTPStatus.NOT_IMPLEMENTED,
        )

    @staticmethod
    def _get_basic_auth(username: str, password: str) -> tuple:
        """
        Builds the HTTP basic authentication object used for the POST request.
        :param username: user name of the subscriber endpoint.
        :param password: clear text password of the subscriber endpoint.
        :return: an aiohttp.BasicAuth instance.
        """
        return aiohttp.BasicAuth(username, password)

    def _decrypt_password(
        self, hashed: str, salt: str, schema_version: str = "1.1"
    ) -> str:
        """
        Decrypts a stored subscriber password through the database handler.
        :param hashed: encrypted password as stored with the subscription.
        :param salt: salt passed to the database handler for decryption.
        :param schema_version: schema version passed to the database handler.
        :return: whatever self.db.decrypt() returns for these arguments.
        """
        return self.db.decrypt(hashed, schema_version, salt=salt)

    def get_payload(self, meta_notification: dict) -> dict:
        """
        Generates SOL005 compliant payload structure and returns them in dictionary.
        Only the keys of the response model named by "notificationType" are kept;
        model keys missing from meta_notification are set to "N/A".
        :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
        :return: A dictionary which is SOL005 compliant.
        :raises NotificationException: if no response model exists for the notification type.
        """
        model_name = meta_notification["notificationType"]
        response_models = self.get_models()
        if not response_models or not response_models.get(model_name):
            raise NotificationException(
                "Response model {} is not defined.".format(model_name),
                HTTPStatus.NOT_IMPLEMENTED,
            )
        payload = {
            model_key: meta_notification.get(model_key, "N/A")
            for model_key in response_models[model_name]
        }
        self.logger.debug(
            "Payload generated for subscriber: {} for {}".format(
                payload["subscriptionId"], payload["notificationType"]
            )
        )
        return payload

    def _prepare_notification(self, subscriber: dict):
        """
        Builds the request data (headers, payload, callback URI and optional
        basic auth) for a single subscriber.
        :param subscriber: formatted subscriber record.
        :return: dict with the request data, or None when the subscriber asks for
                 an authentication type that is not implemented.
        """
        authentication = subscriber.get("authentication")
        auth_basic = None
        if authentication:
            # TODO add support for AuthType OAuth and TLS after support is added in subscription.
            if authentication["authType"] != "basic":
                self.logger.debug(
                    "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
                        subscriber["subscriptionId"],
                        authentication["authType"],
                    )
                )
                return None
            params_basic = authentication["paramsBasic"]
            # The subscription id is the salt handed to the db handler for decryption.
            password = self._decrypt_password(
                params_basic["password"], subscriber["subscriptionId"]
            )
            auth_basic = self._get_basic_auth(params_basic["userName"], password)

        notification = {
            "headers": self.payload_header,
            "payload": self.get_payload(subscriber),
            "CallbackUri": subscriber["CallbackUri"],
        }
        if auth_basic is not None:
            notification["auth_basic"] = auth_basic
        return notification

    async def send_notifications(
        self, subscribers: list, loop: asyncio.AbstractEventLoop = None
    ):
        """
        Generate tasks for all notification for an event.
        :param subscribers: A list of subscribers who want to be notified for event.
        :param loop: Event loop object.
        """
        notifications = []
        for subscriber in subscribers or []:
            notification = self._prepare_notification(subscriber)
            if notification is not None:
                notifications.append(notification)

        if not notifications:
            return

        async with aiohttp.ClientSession(loop=loop) as session:
            tasks = [
                asyncio.ensure_future(
                    self.send_notification(session, notification, loop=loop),
                    loop=loop,
                )
                for notification in notifications
            ]
            # gather() lost its "loop" argument in Python 3.10; the futures
            # already belong to the loop given to ensure_future().
            await asyncio.gather(*tasks)

    async def send_notification(
        self,
        session: "aiohttp.ClientSession",
        notification: dict,
        loop: asyncio.AbstractEventLoop = None,
        retry_count: int = 5,
        timeout: float = 5.0,
    ):
        """
        Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent
        successfully after maximum number of retries, then notification is dropped.
        Every failed attempt is recorded under the "error_details" key of the notification payload.
        :param session: An aiohttp client session object to maintain http session.
        :param notification: A dictionary containing all necessary data to make POST request.
        :param loop: Event loop object. Kept for backward compatibility; not used by this method.
        :param retry_count: An integer specifying the maximum number of attempts for a notification.
        :param timeout: A float representing client timeout of each HTTP request.
        :return: True if the subscriber answered with 204 No Content, False if the notification was dropped.
        """
        backoff_delay = 1
        while retry_count > 0:
            try:
                async with session.post(
                    url=notification["CallbackUri"],
                    headers=notification["headers"],
                    auth=notification.get("auth_basic", None),
                    data=json.dumps(notification["payload"]),
                    timeout=timeout,
                ) as resp:
                    if resp.status != HTTPStatus.NO_CONTENT:
                        error_text = "Erroneous response code: {}, ".format(resp.status)
                        error_text += await resp.text()
                        raise NotificationException(error_text)
                    self.logger.debug(
                        "Notification sent successfully to subscriber {}".format(
                            notification["payload"]["subscriptionId"]
                        )
                    )
                return True
            except Exception as e:
                error_text = type(e).__name__ + ": " + str(e)
                self.logger.debug(
                    "Unable to send notification to subscriber {}. Details: {}".format(
                        notification["payload"]["subscriptionId"], error_text
                    )
                )
                error_detail = {
                    "error": type(e).__name__,
                    "error_text": str(e),
                    "timestamp": time.time(),
                }
                notification["payload"].setdefault("error_details", []).append(
                    error_detail
                )
                retry_count -= 1
                # Only wait when another attempt will follow; sleeping after the
                # last failure would just delay dropping the notification.
                if retry_count > 0:
                    backoff_delay *= 2
                    self.logger.debug(
                        "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
                            notification["payload"]["subscriptionId"], backoff_delay
                        )
                    )
                    # sleep() lost its "loop" argument in Python 3.10; it always
                    # uses the running loop.
                    await asyncio.sleep(backoff_delay)
        # Dropping notification
        self.logger.debug(
            "Notification {} sent failed to subscriber:{}.".format(
                notification["payload"]["notificationType"],
                notification["payload"]["subscriptionId"],
            )
        )
        return False
239
240
class NsLcmNotification(NotificationBase):
    """
    Notification sender for NS lifecycle management (nslcm) events.
    """

    # SOL005 response model for nslcm notifications
    # NOTE(review): "NsChangeNotification" uses the key "timeStamp" while the other
    # models use "timestamp", and "lcmOpOccIdImpactngNsComponent" looks misspelled.
    # Both are left untouched because they are part of the emitted payload; confirm
    # against the SOL005 data model before changing them.
    response_models = {
        "NsLcmOperationOccurrenceNotification": {
            "id",
            "nsInstanceId",
            "nsLcmOpOccId",
            "operation",
            "notificationType",
            "subscriptionId",
            "timestamp",
            "notificationStatus",
            "operationState",
            "isAutomaticInvocation",
            "affectedVnf",
            "affectedVl",
            "affectedVnffg",
            "affectedNs",
            "affectedSap",
            "error",
            "_links",
        },
        "NsIdentifierCreationNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsIdentifierDeletionNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsChangeNotification": {
            "nsInstanceId",
            "nsComponentType",
            "nsComponentId",
            "lcmOpOccIdImpactngNsComponent",
            "lcmOpNameImpactingNsComponent",
            "lcmOpOccStatusImpactingNsComponent",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "error",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of NsLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :param None
        :return: dict of SOL005 data model
        """
        return NsLcmNotification.response_models

    @staticmethod
    def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        The subscriber dicts are updated in place, and the keys "_admin", "_id",
        "id" and "links" are removed from event_details["params"].
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict containing all meta data of event.
        :return: The same list, every subscriber merged with the event data.
        """
        params = event_details["params"]
        # One notification id is shared by all subscribers of this event.
        notification_id = str(uuid4())
        event_timestamp = params["startTime"]
        resource_links = params["links"]
        event_operation = event_details["command"]
        for key in ("_admin", "_id", "id", "links"):
            params.pop(key, None)
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timestamp"] = event_timestamp
            subscriber["_links"] = resource_links
            # The db field "reference" becomes the subscription id of the payload.
            subscriber["subscriptionId"] = subscriber.pop("reference")
            subscriber["operation"] = event_operation
            del subscriber["_id"]
            # Applied last: remaining event params override keys set above.
            subscriber.update(params)
        return subscribers

    def get_subscribers(
        self,
        nsd_id: str,
        ns_instance_id: str,
        command: str,
        op_state: str,
        event_details: dict,
    ) -> list:
        """
        Queries database and returns list of subscribers.
        :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
        :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of NS.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event. Empty when the
                 query or the formatting of the subscribers fails.
        """
        notification_type = [
            "NsLcmOperationOccurrenceNotification",
            "NsChangeNotification",
            "NsIdentifierCreationNotification",
            "NsIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [nsd_id, ns_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        try:
            db_subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_nslcm_subscribers(db_subscribers, event_details)
        except Exception as e:
            # Best effort: a failing lookup must not break event processing, but
            # half-formatted db documents must not be handed out as subscribers.
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
            return []
377
378
class VnfLcmNotification(NotificationBase):
    """
    Notification sender for VNF lifecycle management (vnflcm) events.
    """

    # SOL003 response model for vnflcm notifications
    response_models = {
        "VnfLcmOperationOccurrenceNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "notificationStatus",
            "operationState",
            "vnfInstanceId",
            "operation",
            "isAutomaticInvocation",
            "vnfLcmOpOccId",
            "affectedVnfcs",
            "affectedVirtualLinks",
            "affectedExtLinkPorts",
            "affectedVirtualStorages",
            "changedInfo",
            "changedExtConnectivity",
            "modificationsTriggeredByVnfPkgChange",
            "error",
            "_links",
        },
        "VnfIdentifierCreationNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
        "VnfIdentifierDeletionNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of VnfLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL003 model of notification class
        :param None
        :return: dict of SOL003 data model
        """
        return self.response_models

    def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        The subscriber dicts are updated in place.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict containing all meta data of event.
        :return: The same list, every subscriber merged with the event data.
        """
        # One notification id and one timestamp are shared by all subscribers.
        notification_id = str(uuid4())
        event_timestamp = time.time()
        event_operation = event_details["command"]
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timeStamp"] = event_timestamp
            # The db field "reference" becomes the subscription id of the payload.
            subscriber["subscriptionId"] = subscriber.pop("reference")
            subscriber["operation"] = event_operation
            del subscriber["_id"]
            # Applied last: event params override keys set above.
            subscriber.update(event_details["params"])
        return subscribers

    def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str,
                        event_details: dict) -> list:
        """
        Queries database and returns list of subscribers.
        :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
        :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of VNF.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event. Empty when the
                 query or the formatting of the subscribers fails.
        """
        notification_type = [
            "VnfIdentifierCreationNotification",
            "VnfLcmOperationOccurrenceNotification",
            "VnfIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [vnfd_id, vnf_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        try:
            db_subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_vnflcm_subscribers(db_subscribers, event_details)
        except Exception as e:
            # Best effort: a failing lookup must not break event processing, but
            # half-formatted db documents must not be handed out as subscribers.
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
            return []
492
493
class NsdNotification(NotificationBase):
    """
    Placeholder notification class for NSD events; it defines no response
    models and no subscriber collection yet.
    """

    def __init__(self, db):
        """
        Constructor of NsdNotification class.
        :param db: Database handler, passed on to NotificationBase.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.subscriber_collection = None
        self.response_models = {}
503
504
class VnfdNotification(NotificationBase):
    """
    Placeholder notification class for VNFD events; it defines no response
    models and no subscriber collection yet.
    """

    def __init__(self, db):
        """
        Constructor of VnfdNotification class.
        :param db: Database handler, passed on to NotificationBase.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.subscriber_collection = None
        self.response_models = {}