Code Coverage

Cobertura Coverage Report > osm_nbi >

notifications.py

Trend

File Coverage summary

Name | Classes | Lines | Conditionals
notifications.py
0%
0/1
0%
0/170
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
notifications.py
0%
0/170
N/A

Source

osm_nbi/notifications.py
1 # Copyright 2020 K Sai Kiran (Tata Elxsi)
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 0 __author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
17 0 __date__ = "$28-Apr-2020 23:59:59$"
18
19 0 import asyncio
20 0 import aiohttp
21 0 from http import HTTPStatus
22 0 import json
23 0 import logging
24 0 import time
25 0 from uuid import uuid4
26
27
class NotificationException(Exception):
    """
    Error raised by the notification classes, carrying an HTTP status code.
    """

    def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
        """
        Build the exception.
        :param message: String text containing exception details.
        :param http_code: HTTP status code of exception, 400 (Bad Request) when omitted.
        """
        super().__init__(message)
        self.http_code = http_code
41
42
class NotificationBase:
    """
    Base class of all notification classes.

    Builds notification payloads out of the response models of the concrete
    class and delivers them to subscribers with HTTP POST requests. Subclasses
    provide ``response_models`` and implement ``get_subscribers()``.
    """

    # Maps every notificationType to the set of keys allowed in its payload.
    # Subclasses override it at class level or per instance.
    response_models = None
    # Common HTTP payload header for all notifications.
    payload_header = {"Content-Type": "application/json", "Accept": "application/json"}

    def __init__(self, db) -> None:
        """
        Constructor of NotificationBase class.
        :param db: Database handler.
        """
        self.db = db
        self.logger = logging.getLogger("nbi.notifications")
        self.subscriber_collection = None

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :return: dict of SOL005 data model, or None when the class defines none.
        """
        # Resolved through the instance so that models assigned on a subclass
        # or on the instance itself are honoured without overriding this method.
        return self.response_models

    def get_subscribers(self, **kwargs) -> list:
        """
        Method should be implemented by all notification subclasses
        :param kwargs: any keyword arguments needed for db query.
        :return: List of subscribers
        :raises NotificationException: always, with HTTP status NOT_IMPLEMENTED.
        """
        raise NotificationException(
            "Method get_subscribers() is not implemented",
            http_code=HTTPStatus.NOT_IMPLEMENTED,
        )

    @staticmethod
    def _get_basic_auth(username: str, password: str) -> "aiohttp.BasicAuth":
        """
        Wraps the credentials into an aiohttp basic-auth object.
        :param username: user name of the subscriber endpoint.
        :param password: clear text password of the subscriber endpoint.
        :return: aiohttp.BasicAuth built from both values.
        """
        return aiohttp.BasicAuth(username, password)

    def _decrypt_password(
        self, hashed: str, salt: str, schema_version: str = "1.1"
    ) -> str:
        """
        Decrypts a stored password through the database handler.
        :param hashed: the stored (encrypted) password.
        :param salt: salt handed over to the database handler.
        :param schema_version: schema version handed over to the database handler.
        :return: whatever self.db.decrypt() returns for these arguments.
        """
        return self.db.decrypt(hashed, schema_version, salt=salt)

    def get_payload(self, meta_notification: dict) -> dict:
        """
        Generates SOL005 compliant payload structure and returns them in dictionary.
        :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
        :return: A dictionary which is SOL005 compliant. Model keys missing in
                 meta_notification are filled with "N/A"; keys outside of the model are dropped.
        :raises NotificationException: when no response model exists for the notification type.
        """
        model_name = meta_notification["notificationType"]
        response_models = self.get_models()
        if not response_models or not response_models.get(model_name):
            raise NotificationException(
                "Response model {} is not defined.".format(model_name),
                HTTPStatus.NOT_IMPLEMENTED,
            )
        model_keys = response_models[model_name]
        payload = dict.fromkeys(model_keys, "N/A")
        for model_key in model_keys.intersection(meta_notification.keys()):
            payload[model_key] = meta_notification[model_key]
        self.logger.debug(
            "Payload generated for subscriber: {} for {}".format(
                payload["subscriptionId"], payload["notificationType"]
            )
        )
        return payload

    async def send_notifications(
        self,
        subscribers: list,
    ):
        """
        Generate tasks for all notification for an event.
        :param subscribers: A list of subscribers who want to be notified for event.
        """
        notifications = []
        for subscriber in subscribers:
            # Notify without auth
            if not subscriber.get("authentication"):
                notifications.append(
                    {
                        "headers": self.payload_header,
                        "payload": self.get_payload(subscriber),
                        "CallbackUri": subscriber["CallbackUri"],
                    }
                )
            elif subscriber["authentication"]["authType"] == "basic":
                # The subscription id is used as salt of the stored password.
                salt = subscriber["subscriptionId"]
                params_basic = subscriber["authentication"]["paramsBasic"]
                password = self._decrypt_password(params_basic["password"], salt)
                auth_basic = self._get_basic_auth(params_basic["userName"], password)
                notifications.append(
                    {
                        "headers": self.payload_header,
                        "payload": self.get_payload(subscriber),
                        "auth_basic": auth_basic,
                        "CallbackUri": subscriber["CallbackUri"],
                    }
                )
            # TODO add support for AuthType OAuth and TLS after support is added in subscription.
            else:
                self.logger.debug(
                    "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
                        subscriber["subscriptionId"],
                        subscriber["authentication"]["authType"],
                    )
                )

        if notifications:
            # One HTTP session is shared by all notifications of the event and
            # stays open until every delivery has finished or was dropped.
            async with aiohttp.ClientSession() as session:
                tasks = [
                    asyncio.ensure_future(self.send_notification(session, notification))
                    for notification in notifications
                ]
                await asyncio.gather(*tasks)

    async def send_notification(
        self,
        session: "aiohttp.ClientSession",
        notification: dict,
        retry_count: int = 5,
        timeout: float = 5.0,
    ):
        """
        Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
        after maximum number of retries, then notification is dropped.
        Every failed attempt is recorded under notification["payload"]["error_details"].
        :param session: An aiohttp client session object to maintain http session.
        :param notification: A dictionary containing all necessary data to make POST request.
        :param retry_count: An integer specifying the maximum number of attempts for a notification.
        :param timeout: A float representing client timeout of each HTTP request.
        :return: True when the subscriber answered with 204 (No Content), False when the notification was dropped.
        """
        backoff_delay = 1
        # Serialised once, on the first attempt, so that the "error_details"
        # bookkeeping added to the payload below is never sent to the subscriber.
        body = None
        while retry_count > 0:
            try:
                if body is None:
                    body = json.dumps(notification["payload"])
                async with session.post(
                    url=notification["CallbackUri"],
                    headers=notification["headers"],
                    auth=notification.get("auth_basic", None),
                    data=body,
                    timeout=timeout,
                ) as resp:
                    if resp.status == HTTPStatus.NO_CONTENT:
                        self.logger.debug(
                            "Notification sent successfully to subscriber {}".format(
                                notification["payload"]["subscriptionId"]
                            )
                        )
                    else:
                        error_text = "Erroneous response code: {}, ".format(resp.status)
                        error_text += await resp.text()
                        raise NotificationException(error_text)
                return True
            except Exception as e:
                error_text = type(e).__name__ + ": " + str(e)
                self.logger.debug(
                    "Unable to send notification to subscriber {}. Details: {}".format(
                        notification["payload"]["subscriptionId"], error_text
                    )
                )
                notification["payload"].setdefault("error_details", []).append(
                    {
                        "error": type(e).__name__,
                        "error_text": str(e),
                        "timestamp": time.time(),
                    }
                )
                retry_count -= 1
                if retry_count <= 0:
                    # Last attempt failed: nothing to wait for, drop right away.
                    break
                backoff_delay *= 2
                self.logger.debug(
                    "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
                        notification["payload"]["subscriptionId"], backoff_delay
                    )
                )
                await asyncio.sleep(backoff_delay)
        # Dropping notification
        self.logger.debug(
            "Notification {} sent failed to subscriber:{}.".format(
                notification["payload"]["notificationType"],
                notification["payload"]["subscriptionId"],
            )
        )
        return False
235
236
class NsLcmNotification(NotificationBase):
    """
    Notification class for NS lifecycle management (nslcm) events.
    """

    # maps kafka commands of completed operations to the original operation type
    completed_operation_map = {
        "INSTANTIATED": "INSTANTIATE",
        "SCALED": "SCALE",
        "TERMINATED": "TERMINATE",
        "UPDATED": "UPDATE",
        "HEALED": "HEAL",
    }
    # SOL005 response model for nslcm notifications
    # NOTE(review): "NsChangeNotification" lists "timeStamp" while
    # _format_nslcm_subscribers() fills "timestamp"; unless the event params
    # carry "timeStamp" it stays "N/A" in the payload - confirm which is wanted.
    response_models = {
        "NsLcmOperationOccurrenceNotification": {
            "id",
            "nsInstanceId",
            "nsLcmOpOccId",
            "operation",
            "notificationType",
            "subscriptionId",
            "timestamp",
            "notificationStatus",
            "operationState",
            "isAutomaticInvocation",
            "affectedVnf",
            "affectedVl",
            "affectedVnffg",
            "affectedNs",
            "affectedSap",
            "error",
            "_links",
        },
        "NsIdentifierCreationNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsIdentifierDeletionNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsChangeNotification": {
            "nsInstanceId",
            "nsComponentType",
            "nsComponentId",
            "lcmOpOccIdImpactngNsComponent",
            "lcmOpNameImpactingNsComponent",
            "lcmOpOccStatusImpactingNsComponent",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "error",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of NsLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :return: dict of SOL005 data model
        """
        return self.response_models

    @staticmethod
    def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        Every subscriber dict is updated in place; event_details is left untouched.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict containing all meta data of event.
        :return: The same list of subscribers, each one enriched with the event data.
        """
        notification_id = str(uuid4())
        params = event_details["params"]
        event_timestamp = params["startTime"]
        resource_links = params["links"]
        event_operation = event_details["command"]
        # Record bookkeeping keys of the event must not overwrite the
        # notification fields set below; filter them into a copy instead of
        # popping them out of the caller's dictionary.
        excluded_keys = ("_admin", "_id", "id", "links")
        event_params = {
            key: value for key, value in params.items() if key not in excluded_keys
        }
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timestamp"] = event_timestamp
            subscriber["_links"] = resource_links
            subscriber["subscriptionId"] = subscriber["reference"]
            subscriber["operation"] = event_operation
            del subscriber["reference"]
            del subscriber["_id"]
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(
        self,
        nsd_id: str,
        ns_instance_id: str,
        command: str,
        op_state: str,
        event_details: dict,
    ) -> list:
        """
        Queries database and returns list of subscribers.
        :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
        :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of NS.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event. Empty when the
                 query or the formatting of its result fails.
        """
        notification_type = [
            "NsLcmOperationOccurrenceNotification",
            "NsChangeNotification",
            "NsIdentifierCreationNotification",
            "NsIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [nsd_id, ns_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            # Completed-operation commands are matched by their operation type.
            op_type = self.completed_operation_map.get(command, command)
            filter_q["operationTypes"].append(op_type)
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_nslcm_subscribers(subscribers, event_details)
        except Exception as e:
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
            # Raw or half formatted database records must not be handed out as
            # notifications, so a failure yields no subscribers at all.
            return []
381
382
class VnfLcmNotification(NotificationBase):
    """
    Notification class for VNF lifecycle management (vnflcm) events.
    """

    # SOL003 response model for vnflcm notifications
    response_models = {
        "VnfLcmOperationOccurrenceNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "notificationStatus",
            "operationState",
            "vnfInstanceId",
            "operation",
            "isAutomaticInvocation",
            "vnfLcmOpOccId",
            "affectedVnfcs",
            "affectedVirtualLinks",
            "affectedExtLinkPorts",
            "affectedVirtualStorages",
            "changedInfo",
            "changedExtConnectivity",
            "modificationsTriggeredByVnfPkgChange",
            "error",
            "_links",
        },
        "VnfIdentifierCreationNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
        "VnfIdentifierDeletionNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of VnfLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL003 model of notification class
        :return: dict of SOL003 data model
        """
        return self.response_models

    def _format_vnflcm_subscribers(
        self, subscribers: list, event_details: dict
    ) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        Every subscriber dict is updated in place.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict containing all meta data of event.
        :return: The same list of subscribers, each one enriched with the event data.
        """
        notification_id = str(uuid4())
        # The time of formatting is used as event time stamp.
        event_timestamp = time.time()
        event_operation = event_details["command"]
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timeStamp"] = event_timestamp
            subscriber["subscriptionId"] = subscriber["reference"]
            subscriber["operation"] = event_operation
            del subscriber["reference"]
            del subscriber["_id"]
            # Applied last: event params win over the fields set above.
            subscriber.update(event_details["params"])
        return subscribers

    def get_subscribers(
        self,
        vnfd_id: str,
        vnf_instance_id: str,
        command: str,
        op_state: str,
        event_details: dict,
    ) -> list:
        """
        Queries database and returns list of subscribers.
        :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
        :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of VNF.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event. Empty when the
                 query or the formatting of its result fails.
        """
        notification_type = [
            "VnfIdentifierCreationNotification",
            "VnfLcmOperationOccurrenceNotification",
            "VnfIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [vnfd_id, vnf_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_vnflcm_subscribers(subscribers, event_details)
        except Exception as e:
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
            # Raw or half formatted database records must not be handed out as
            # notifications, so a failure yields no subscribers at all.
            return []
504
505
class NsdNotification(NotificationBase):
    """
    Notification class for NSD events. It defines neither response models
    nor a subscriber collection so far.
    """

    def __init__(self, db):
        """
        Constructor of the class
        :param db: Database handler, passed on to NotificationBase.
        """
        super().__init__(db)
        self.subscriber_collection = None
        # TODO will update this once support is there from subscription
        self.response_models = {}
515
516
class VnfdNotification(NotificationBase):
    """
    Notification class for VNFD events. It defines neither response models
    nor a subscriber collection so far.
    """

    def __init__(self, db):
        """
        Constructor of the class
        :param db: Database handler, passed on to NotificationBase.
        """
        super().__init__(db)
        self.subscriber_collection = None
        # TODO will update this once support is there from subscription
        self.response_models = {}