Code Coverage

Cobertura Coverage Report > osm_nbi >

notifications.py

Trend

Classes: 0%
 
Lines: 0%
 
Conditionals: 100%
 

File Coverage summary

Name | Classes | Lines | Conditionals
notifications.py | 0% (0/1) | 0% (0/132) | 100% (0/0)

Coverage Breakdown by Class

Name | Lines | Conditionals
notifications.py | 0% (0/132) | N/A

Source

osm_nbi/notifications.py
1 # Copyright 2020 K Sai Kiran (Tata Elxsi)
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 0 __author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
17 0 __date__ = "$28-Apr-2020 23:59:59$"
18
19 0 import asyncio
20 0 import aiohttp
21 0 from http import HTTPStatus
22 0 import json
23 0 import logging
24 0 import time
25 0 from uuid import uuid4
26
27
28 0 class NotificationException(Exception):
29     """
30     Notification Exception
31     """
32
33 0     def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
34         """
35         Constructor of notification exception
36         :param message: String text containing exception details.
37         :param http_code: HTTP status code of exception.
38         """
39 0         self.http_code = http_code
40 0         Exception.__init__(self, message)
41
42
43 0 class NotificationBase:
44
45 0     response_models = None
46     # Common HTTP payload header for all notifications.
47 0     payload_header = {
48         "Content-Type": "application/json",
49         "Accept": "application/json"
50     }
51
52 0     def __init__(self, db) -> None:
53         """
54         Constructor of NotificationBase class.
55         :param db: Database handler.
56         """
57 0         self.db = db
58 0         self.logger = logging.getLogger("nbi.notifications")
59 0         self.subscriber_collection = None
60
61 0     def get_models(self) -> dict:
62         """
63         Returns the SOL005 model of notification class
64         :param None
65         :return: dict of SOL005 data model
66         """
67 0         return NotificationBase.response_models
68
69 0     def get_subscribers(self, **kwargs) -> NotificationException:
70         """
71         Method should be implemented by all notification subclasses
72         :param kwargs: any keyword arguments needed for db query.
73         :return: List of subscribers
74         """
75 0         raise NotificationException("Method get_subscribers() is not implemented", http_code=HTTPStatus.NOT_IMPLEMENTED)
76
77 0     @staticmethod
78 0     def _get_basic_auth(username: str, password: str) -> tuple:
79 0         return aiohttp.BasicAuth(username, password)
80
81 0     def _decrypt_password(self, hashed: str, salt: str, schema_version: str = "1.1") -> str:
82 0         return self.db.decrypt(hashed, schema_version, salt=salt)
83
84 0     def get_payload(self, meta_notification: dict) -> dict:
85         """
86         Generates SOL005 compliant payload structure and returns them in dictionary.
87         :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
88         :return: A dictionary which is SOL005 compliant.
89         """
90 0         model_name = meta_notification["notificationType"]
91 0         response_models = self.get_models()
92 0         if not response_models or not response_models.get(model_name):
93 0             raise NotificationException("Response model {} is not defined.".format(model_name),
94                                         HTTPStatus.NOT_IMPLEMENTED)
95 0         model_keys = response_models[model_name]
96 0         payload = dict.fromkeys(model_keys, "N/A")
97 0         notification_keys = set(meta_notification.keys())
98 0         for model_key in model_keys.intersection(notification_keys):
99 0             payload[model_key] = meta_notification[model_key]
100 0         self.logger.debug("Payload generated for subscriber: {} for {}".format(payload["subscriptionId"],
101                                                                                payload["notificationType"]))
102 0         return payload
103
104 0     async def send_notifications(self, subscribers: list, loop: asyncio.AbstractEventLoop = None):
105         """
106         Generate tasks for all notification for an event.
107         :param subscribers: A list of subscribers who want to be notified for event.
108         :param loop: Event loop object.
109         """
110 0         notifications = []
111 0         for subscriber in subscribers:
112             # Notify without auth
113 0             if not subscriber.get("authentication"):
114 0                 notifications.append({
115                     "headers": self.payload_header,
116                     "payload": self.get_payload(subscriber),
117                     "CallbackUri": subscriber["CallbackUri"]
118                 })
119 0             elif subscriber["authentication"]["authType"] == "basic":
120 0                 salt = subscriber["subscriptionId"]
121 0                 hashed_password = subscriber["authentication"]["paramsBasic"]["password"]
122 0                 password = self._decrypt_password(hashed_password, salt)
123 0                 auth_basic = self._get_basic_auth(subscriber["authentication"]["paramsBasic"]["userName"], password)
124 0                 notifications.append({
125                     "headers": self.payload_header,
126                     "payload": self.get_payload(subscriber),
127                     "auth_basic": auth_basic,
128                     "CallbackUri": subscriber["CallbackUri"]
129                 })
130             # TODO add support for AuthType OAuth and TLS after support is added in subscription.
131             else:
132 0                 self.logger.debug("Subscriber {} can not be notified. {} notification auth type is not implemented"
133                                   .format(subscriber["subscriptionId"],
134                                           subscriber["authentication"]["authType"]))
135
136 0         if notifications:
137 0             tasks = []
138 0             async with aiohttp.ClientSession(loop=loop) as session:
139 0                 for notification in notifications:
140 0                     tasks.append(asyncio.ensure_future(self.send_notification(session, notification, loop=loop),
141                                                        loop=loop))
142 0                 await asyncio.gather(*tasks, loop=loop)
143
144 0     async def send_notification(self, session: aiohttp.ClientSession, notification: dict,
145                                 loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0):
146         """
147         Performs HTTP POST request to notify subscriber. If for any reason the notification is not sent successfully
148         after the maximum number of retries, then the notification is dropped.
149         :param session: An aiohttp client session object to maintain http session.
150         :param notification: A dictionary containing all necessary data to make POST request.
151         :param loop: Event loop object.
152         :param retry_count: An integer specifying the maximum number of retries for a notification.
153         :param timeout: A float representing client timeout of each HTTP request.
154         """
155 0         backoff_delay = 1
156 0         while retry_count > 0:
157 0             try:
158 0                 async with session.post(url=notification["CallbackUri"], headers=notification["headers"],
159                                         auth=notification.get("auth_basic", None),
160                                         data=json.dumps(notification["payload"]),
161                                         timeout=timeout) as resp:
162                     # self.logger.debug("Notification response: {}".format(resp.status))
163 0                     if resp.status == HTTPStatus.NO_CONTENT:
164 0                         self.logger.debug("Notification sent successfully to subscriber {}"
165                                           .format(notification["payload"]["subscriptionId"]))
166                     else:
167 0                         error_text = "Erroneous response code: {}, ".format(resp.status)
168 0                         error_text += await resp.text()
169 0                         raise NotificationException(error_text)
170 0                 return True
171 0             except Exception as e:
172 0                 error_text = type(e).__name__ + ": " + str(e)
173 0                 self.logger.debug("Unable to send notification to subscriber {}. Details: {}"
174                                   .format(notification["payload"]["subscriptionId"], error_text))
175 0                 error_detail = {
176                     "error": type(e).__name__,
177                     "error_text": str(e),
178                     "timestamp": time.time()
179                 }
180 0                 if "error_details" in notification["payload"].keys():
181 0                     notification["payload"]["error_details"].append(error_detail)
182                 else:
183 0                     notification["payload"]["error_details"] = [error_detail]
184 0                 retry_count -= 1
185 0                 backoff_delay *= 2
186 0                 self.logger.debug("Retry Notification for subscriber: {} after backoff delay: {} seconds."
187                                   .format(notification["payload"]["subscriptionId"], backoff_delay))
188 0                 await asyncio.sleep(backoff_delay, loop=loop)
189         # Dropping notification
190 0         self.logger.debug("Notification {} sent failed to subscriber:{}."
191                           .format(notification["payload"]["notificationType"],
192                                   notification["payload"]["subscriptionId"]))
193 0         return False
194
195
196 0 class NsLcmNotification(NotificationBase):
197
198     # SOL005 response model for nslcm notifications
199 0     response_models = {
200         "NsLcmOperationOccurrenceNotification": {"id", "nsInstanceId", "nsLcmOpOccId", "operation",
201                                                  "notificationType", "subscriptionId", "timestamp",
202                                                  "notificationStatus", "operationState", "isAutomaticInvocation",
203                                                  "affectedVnf", "affectedVl", "affectedVnffg", "affectedNs",
204                                                  "affectedSap", "error", "_links"},
205
206         "NsIdentifierCreationNotification": {"notificationType", "subscriptionId", "timestamp",
207                                              "nsInstanceId", "_links"},
208
209         "NsIdentifierDeletionNotification": {"notificationType", "subscriptionId", "timestamp",
210                                              "nsInstanceId", "_links"},
211
212         "NsChangeNotification": {"nsInstanceId", "nsComponentType", "nsComponentId",
213                                  "lcmOpOccIdImpactngNsComponent", "lcmOpNameImpactingNsComponent",
214                                  "lcmOpOccStatusImpactingNsComponent", "notificationType", "subscriptionId",
215                                  "timeStamp", "error", "_links"}
216     }
217
218 0     def __init__(self, db) -> None:
219         """
220         Constructor of NsLcmNotification class.
221         :param db: Database handler.
222         """
223 0         super().__init__(db)
224 0         self.subscriber_collection = "mapped_subscriptions"
225
226 0     def get_models(self) -> dict:
227         """
228         Returns the SOL005 model of notification class
229         :param None
230         :return: dict of SOL005 data model
231         """
232 0         return NsLcmNotification.response_models
233
234 0     @staticmethod
235 0     def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
236         """
237         Formats the raw event details from kafka message and subscriber details.
238         :param subscribers: A list of subscribers whom the event needs to be notified.
239         :param event_details: A dict containing all meta data of event.
240         :return:
241         """
242 0         notification_id = str(uuid4())
243 0         event_timestamp = event_details["params"]["startTime"]
244 0         resource_links = event_details["params"]["links"]
245 0         event_operation = event_details["command"]
246 0         for key in ["_admin", "_id", "id", "links"]:
247 0             event_details["params"].pop(key, None)
248 0         for subscriber in subscribers:
249 0             subscriber["id"] = notification_id
250 0             subscriber["timestamp"] = event_timestamp
251 0             subscriber["_links"] = resource_links
252 0             subscriber["subscriptionId"] = subscriber["reference"]
253 0             subscriber["operation"] = event_operation
254 0             del subscriber["reference"]
255 0             del subscriber["_id"]
256 0             subscriber.update(event_details["params"])
257 0         return subscribers
258
259 0     def get_subscribers(self, nsd_id: str, ns_instance_id: str, command: str, op_state: str,
260                         event_details: dict) -> list:
261         """
262         Queries database and returns list of subscribers.
263         :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
264         :param ns_instance_id: NS instance id of an NS whose lifecycle has changed.
265         :param command: the command for event.
266         :param op_state: the operation state of NS.
267         :param event_details: dict containing raw data of event occurred.
268         :return: List of interested subscribers for occurred event.
269         """
270 0         filter_q = {"identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"]}
271 0         if op_state:
272 0             filter_q["operationStates"].append(op_state)
273 0         if command:
274 0             filter_q["operationTypes"].append(command)
275         # self.logger.debug("Db query is: {}".format(filter_q))
276 0         subscribers = []
277 0         try:
278 0             subscribers = self.db.get_list(self.subscriber_collection, filter_q)
279 0             subscribers = self._format_nslcm_subscribers(subscribers, event_details)
280 0         except Exception as e:
281 0             error_text = type(e).__name__ + ": " + str(e)
282 0             self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
283         finally:
284 0             return subscribers
285
286
287 0 class NsdNotification(NotificationBase):
288
289 0     def __init__(self, db):
290         """
291         Constructor of the class
292         """
293 0         super().__init__(db)
294         # TODO will update this once support is there from subscription
295 0         self.response_models = {}
296 0         self.subscriber_collection = None
297
298
299 0 class VnfdNotification(NotificationBase):
300
301 0     def __init__(self, db):
302         """
303         Constructor of the class
304         """
305 0         super().__init__(db)
306         # TODO will update this once support is there from subscription
307 0         self.response_models = {}
308 0         self.subscriber_collection = None