Code Coverage

Cobertura Coverage Report > osm_nbi >

notifications.py

Trend

Classes: 0%
 
Lines: 0%
 
Conditionals: 100%
 

File Coverage summary

Name | Classes | Lines | Conditionals
notifications.py
0%
0/1
0%
0/168
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
notifications.py
0%
0/168
N/A

Source

osm_nbi/notifications.py
1 # Copyright 2020 K Sai Kiran (Tata Elxsi)
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 0 __author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
17 0 __date__ = "$28-Apr-2020 23:59:59$"
18
19 0 import asyncio
20 0 import aiohttp
21 0 from http import HTTPStatus
22 0 import json
23 0 import logging
24 0 import time
25 0 from uuid import uuid4
26
27
class NotificationException(Exception):
    """
    Error raised by the notification classes; carries an HTTP status code.
    """

    def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
        """
        Build the exception from a message and an HTTP status code.
        :param message: Text describing what went wrong.
        :param http_code: HTTP status code associated with the error,
            stored on the instance as ``http_code``.
        """
        super().__init__(message)
        self.http_code = http_code
41
42
class NotificationBase:
    """
    Common behaviour for notification senders: payload generation from a
    response model and HTTP delivery (with retries) to subscriber callbacks.
    """

    # Mapping of notification type name -> collection of payload keys.
    # Subclasses are expected to provide their own mapping.
    response_models = None
    # Common HTTP payload header for all notifications.
    payload_header = {"Content-Type": "application/json", "Accept": "application/json"}

    def __init__(self, db) -> None:
        """
        Constructor of NotificationBase class.
        :param db: Database handler.
        """
        self.db = db
        self.logger = logging.getLogger("nbi.notifications")
        self.subscriber_collection = None

    def get_models(self) -> dict:
        """
        Returns the response models of the notification class.
        :return: dict of data models (None when the class defines none).
        """
        # Resolved through the instance so that a subclass (or instance) that
        # sets its own ``response_models`` is honoured without overriding this.
        return self.response_models

    def get_subscribers(self, **kwargs) -> list:
        """
        Method should be implemented by all notification subclasses
        :param kwargs: any keyword arguments needed for db query.
        :return: List of subscribers
        :raises NotificationException: always, with HTTP 501, in this base class.
        """
        raise NotificationException(
            "Method get_subscribers() is not implemented",
            http_code=HTTPStatus.NOT_IMPLEMENTED,
        )

    @staticmethod
    def _get_basic_auth(username: str, password: str) -> tuple:
        """
        Build the aiohttp basic-auth object for the given credentials.
        :param username: user name for HTTP basic authentication.
        :param password: clear-text password for HTTP basic authentication.
        :return: an ``aiohttp.BasicAuth`` instance.
        """
        return aiohttp.BasicAuth(username, password)

    def _decrypt_password(
        self, hashed: str, salt: str, schema_version: str = "1.1"
    ) -> str:
        """
        Decrypt a stored password through the database handler.
        :param hashed: encrypted password as stored in the subscription.
        :param salt: salt passed to ``db.decrypt``.
        :param schema_version: schema version passed to ``db.decrypt``.
        :return: whatever ``db.decrypt`` returns (expected: clear-text password).
        """
        return self.db.decrypt(hashed, schema_version, salt=salt)

    def get_payload(self, meta_notification: dict) -> dict:
        """
        Generates the payload structure for a notification and returns it as a dictionary.
        Every key of the response model is present in the result; keys missing from
        meta_notification are filled with "N/A", keys not in the model are dropped.
        :param meta_notification: notification meta data; must contain "notificationType".
        :return: A dictionary restricted to the keys of the matching response model.
        :raises NotificationException: (HTTP 501) when no model exists for the type.
        """
        model_name = meta_notification["notificationType"]
        response_models = self.get_models()
        if not response_models or not response_models.get(model_name):
            raise NotificationException(
                "Response model {} is not defined.".format(model_name),
                HTTPStatus.NOT_IMPLEMENTED,
            )
        model_keys = response_models[model_name]
        payload = {key: meta_notification.get(key, "N/A") for key in model_keys}
        self.logger.debug(
            "Payload generated for subscriber: {} for {}".format(
                payload.get("subscriptionId"), payload.get("notificationType")
            )
        )
        return payload

    def _build_notification(self, subscriber: dict):
        """
        Build the request description (headers, payload, callback, auth) for one subscriber.
        :param subscriber: formatted subscriber record.
        :return: notification dict, or None when the subscriber's auth type is unsupported.
        """
        authentication = subscriber.get("authentication")
        auth_basic = None
        if authentication:
            auth_type = authentication["authType"]
            # TODO add support for AuthType OAuth and TLS after support is added in subscription.
            if auth_type != "basic":
                self.logger.debug(
                    "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
                        subscriber["subscriptionId"], auth_type
                    )
                )
                return None
            params_basic = authentication["paramsBasic"]
            # The subscription id is used as salt for the stored password.
            password = self._decrypt_password(
                params_basic["password"], subscriber["subscriptionId"]
            )
            auth_basic = self._get_basic_auth(params_basic["userName"], password)

        notification = {
            "headers": self.payload_header,
            "payload": self.get_payload(subscriber),
            "CallbackUri": subscriber["CallbackUri"],
        }
        if auth_basic is not None:
            notification["auth_basic"] = auth_basic
        return notification

    async def send_notifications(
        self, subscribers: list, loop: asyncio.AbstractEventLoop = None
    ):
        """
        Generate and run the delivery tasks for all notifications of an event.
        :param subscribers: A list of subscribers who want to be notified for event.
        :param loop: Kept for backward compatibility and ignored. The running event
            loop is always used, because asyncio removed the ``loop`` argument of
            ``gather``/``sleep`` in Python 3.10.
        """
        notifications = []
        for subscriber in subscribers:
            notification = self._build_notification(subscriber)
            if notification is not None:
                notifications.append(notification)

        if not notifications:
            return
        async with aiohttp.ClientSession() as session:
            await asyncio.gather(
                *(
                    self.send_notification(session, notification)
                    for notification in notifications
                )
            )

    async def send_notification(
        self,
        session: "aiohttp.ClientSession",
        notification: dict,
        loop: asyncio.AbstractEventLoop = None,
        retry_count: int = 5,
        timeout: float = 5.0,
    ):
        """
        Performs HTTP Post request to notify subscriber. If the notification is not sent
        successfully after the maximum number of attempts, the notification is dropped.
        Only an HTTP 204 (No Content) response counts as success.
        :param session: An aiohttp client session object to maintain http session.
        :param notification: A dictionary containing all necessary data to make POST request.
        :param loop: Kept for backward compatibility and ignored (running loop is used).
        :param retry_count: An integer specifying the maximum number of attempts for a notification.
        :param timeout: A float representing client timeout of each HTTP request.
        :return: True when the subscriber acknowledged the notification, False when dropped.
        """
        payload = notification["payload"]
        subscription_id = payload.get("subscriptionId")
        backoff_delay = 1
        attempts_left = retry_count
        while attempts_left > 0:
            try:
                async with session.post(
                    url=notification["CallbackUri"],
                    headers=notification["headers"],
                    auth=notification.get("auth_basic", None),
                    data=json.dumps(payload),
                    timeout=timeout,
                ) as resp:
                    if resp.status != HTTPStatus.NO_CONTENT:
                        error_text = "Erroneous response code: {}, ".format(resp.status)
                        error_text += await resp.text()
                        raise NotificationException(error_text)
                    self.logger.debug(
                        "Notification sent successfully to subscriber {}".format(
                            subscription_id
                        )
                    )
                return True
            except asyncio.CancelledError:
                # Never treat task cancellation as a delivery failure to retry
                # (CancelledError derives from Exception before Python 3.8).
                raise
            except Exception as e:
                error_text = type(e).__name__ + ": " + str(e)
                self.logger.debug(
                    "Unable to send notification to subscriber {}. Details: {}".format(
                        subscription_id, error_text
                    )
                )
                # NOTE(review): error_details is stored inside the payload, so it is
                # also included in the body of subsequent retries - confirm intended.
                payload.setdefault("error_details", []).append(
                    {
                        "error": type(e).__name__,
                        "error_text": str(e),
                        "timestamp": time.time(),
                    }
                )
                attempts_left -= 1
                if attempts_left <= 0:
                    # No attempt left: do not wait for a retry that never happens.
                    break
                backoff_delay *= 2
                self.logger.debug(
                    "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
                        subscription_id, backoff_delay
                    )
                )
                await asyncio.sleep(backoff_delay)
        # Dropping notification
        self.logger.debug(
            "Notification {} sent failed to subscriber:{}.".format(
                payload.get("notificationType"), subscription_id
            )
        )
        return False
239
240
class NsLcmNotification(NotificationBase):
    """
    Notification handler for NS lifecycle (nslcm) events.
    """

    # SOL005 response model for nslcm notifications
    # NOTE(review): "NsChangeNotification" lists "timeStamp" while the formatter
    # below sets "timestamp", and "lcmOpOccIdImpactngNsComponent" looks like a
    # misspelling of "...Impacting..." - confirm against SOL005 before changing.
    response_models = {
        "NsLcmOperationOccurrenceNotification": {
            "id",
            "nsInstanceId",
            "nsLcmOpOccId",
            "operation",
            "notificationType",
            "subscriptionId",
            "timestamp",
            "notificationStatus",
            "operationState",
            "isAutomaticInvocation",
            "affectedVnf",
            "affectedVl",
            "affectedVnffg",
            "affectedNs",
            "affectedSap",
            "error",
            "_links",
        },
        "NsIdentifierCreationNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsIdentifierDeletionNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsChangeNotification": {
            "nsInstanceId",
            "nsComponentType",
            "nsComponentId",
            "lcmOpOccIdImpactngNsComponent",
            "lcmOpNameImpactingNsComponent",
            "lcmOpOccStatusImpactingNsComponent",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "error",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of NsLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :return: dict of SOL005 data model
        """
        return NsLcmNotification.response_models

    @staticmethod
    def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
        """
        Merges the raw event details from the kafka message into each subscriber record.
        The subscriber dicts are updated in place; event_details is left untouched.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict with "command" and "params" ("startTime" and
            "links" are required inside "params").
        :return: the same list of subscribers, each carrying the notification data.
        :raises KeyError: when a required key is missing from the event or a subscriber.
        """
        notification_id = str(uuid4())
        params = event_details["params"]
        event_timestamp = params["startTime"]
        resource_links = params["links"]
        event_operation = event_details["command"]
        # Work on a filtered copy so the caller's event dict is not modified.
        excluded_keys = {"_admin", "_id", "id", "links"}
        event_params = {
            key: value for key, value in params.items() if key not in excluded_keys
        }
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timestamp"] = event_timestamp
            subscriber["_links"] = resource_links
            # The stored "reference" is exposed as the subscription id.
            subscriber["subscriptionId"] = subscriber.pop("reference")
            subscriber["operation"] = event_operation
            subscriber.pop("_id", None)
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(
        self,
        nsd_id: str,
        ns_instance_id: str,
        command: str,
        op_state: str,
        event_details: dict,
    ) -> list:
        """
        Queries database and returns list of subscribers.
        :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
        :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of NS.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event; an empty list
            when the query or the formatting fails (the error is logged).
        """
        notification_type = [
            "NsLcmOperationOccurrenceNotification",
            "NsChangeNotification",
            "NsIdentifierCreationNotification",
            "NsIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [nsd_id, ns_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_nslcm_subscribers(subscribers, event_details)
        except Exception as e:
            # Best effort: a failing lookup must not break the caller, but raw or
            # half-formatted records must not be handed out as valid subscribers.
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
            return []
377
378
class VnfLcmNotification(NotificationBase):
    """
    Notification handler for VNF lifecycle (vnflcm) events.
    """

    # SOL003 response model for vnflcm notifications
    response_models = {
        "VnfLcmOperationOccurrenceNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "notificationStatus",
            "operationState",
            "vnfInstanceId",
            "operation",
            "isAutomaticInvocation",
            "vnfLcmOpOccId",
            "affectedVnfcs",
            "affectedVirtualLinks",
            "affectedExtLinkPorts",
            "affectedVirtualStorages",
            "changedInfo",
            "changedExtConnectivity",
            "modificationsTriggeredByVnfPkgChange",
            "error",
            "_links",
        },
        "VnfIdentifierCreationNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
        "VnfIdentifierDeletionNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of VnfLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL003 model of notification class
        :return: dict of SOL003 data model
        """
        return self.response_models

    def _format_vnflcm_subscribers(self, subscribers: list, event_details: dict) -> list:
        """
        Merges the raw event details from the kafka message into each subscriber record.
        The subscriber dicts are updated in place.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict with "command" and "params" of the event.
        :return: the same list of subscribers, each carrying the notification data.
        :raises KeyError: when a required key is missing from the event or a subscriber.
        """
        notification_id = str(uuid4())
        # The notification time is the time of formatting, not taken from the event.
        event_timestamp = time.time()
        event_operation = event_details["command"]
        event_params = event_details["params"]
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timeStamp"] = event_timestamp
            # The stored "reference" is exposed as the subscription id.
            subscriber["subscriptionId"] = subscriber.pop("reference")
            subscriber["operation"] = event_operation
            subscriber.pop("_id", None)
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(self, vnfd_id: str, vnf_instance_id: str, command: str, op_state: str,
                        event_details: dict) -> list:
        """
        Queries database and returns list of subscribers.
        :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
        :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of VNF.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event; an empty list
            when the query or the formatting fails (the error is logged).
        """
        notification_type = [
            "VnfIdentifierCreationNotification",
            "VnfLcmOperationOccurrenceNotification",
            "VnfIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [vnfd_id, vnf_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_vnflcm_subscribers(subscribers, event_details)
        except Exception as e:
            # Best effort: a failing lookup must not break the caller, but raw or
            # half-formatted records must not be handed out as valid subscribers.
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
            return []
492
493
class NsdNotification(NotificationBase):
    def __init__(self, db):
        """
        Set up the NSD notification handler.
        No response models or subscriber collection are configured yet.
        :param db: Database handler, handed over to NotificationBase.
        """
        NotificationBase.__init__(self, db)
        # TODO will update this once support is there from subscription
        self.subscriber_collection = None
        self.response_models = {}
503
504
class VnfdNotification(NotificationBase):
    def __init__(self, db):
        """
        Set up the VNFD notification handler.
        No response models or subscriber collection are configured yet.
        :param db: Database handler, handed over to NotificationBase.
        """
        NotificationBase.__init__(self, db)
        # TODO will update this once support is there from subscription
        self.subscriber_collection = None
        self.response_models = {}