Dockerfile and tox.ini for devops-stages modified to work on ubuntu18.04
[osm/NBI.git] / osm_nbi / notifications.py
1 # Copyright 2020 K Sai Kiran (Tata Elxsi)
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 __author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
17 __date__ = "$28-Apr-2020 23:59:59$"
18
19 import asyncio
20 import aiohttp
21 from http import HTTPStatus
22 import json
23 import logging
24 import time
25 from uuid import uuid4
26
27
28 class NotificationException(Exception):
29 """
30 Notification Exception
31 """
32
33 def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
34 """
35 Constructor of notification exception
36 :param message: String text containing exception details.
37 :param http_code: HTTP status code of exception.
38 """
39 self.http_code = http_code
40 Exception.__init__(self, message)
41
42
43 class NotificationBase:
44
45 response_models = None
46 # Common HTTP payload header for all notifications.
47 payload_header = {
48 "Content-Type": "application/json",
49 "Accept": "application/json"
50 }
51
52 def __init__(self, db) -> None:
53 """
54 Constructor of NotificationBase class.
55 :param db: Database handler.
56 """
57 self.db = db
58 self.logger = logging.getLogger("nbi.notifications")
59 self.subscriber_collection = None
60
61 def get_models(self) -> dict:
62 """
63 Returns the SOL005 model of notification class
64 :param None
65 :return: dict of SOL005 data model
66 """
67 return NotificationBase.response_models
68
69 def get_subscribers(self, **kwargs) -> NotificationException:
70 """
71 Method should be implemented by all notification subclasses
72 :param kwargs: any keyword arguments needed for db query.
73 :return: List of subscribers
74 """
75 raise NotificationException("Method get_subscribers() is not implemented", http_code=HTTPStatus.NOT_IMPLEMENTED)
76
77 @staticmethod
78 def _get_basic_auth(username: str, password: str) -> tuple:
79 return aiohttp.BasicAuth(username, password)
80
81 def _decrypt_password(self, hashed: str, salt: str, schema_version: str = "1.1") -> str:
82 return self.db.decrypt(hashed, schema_version, salt=salt)
83
84 def get_payload(self, meta_notification: dict) -> dict:
85 """
86 Generates SOL005 compliant payload structure and returns them in dictionary.
87 :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
88 :return: A dictionary which is SOL005 compliant.
89 """
90 model_name = meta_notification["notificationType"]
91 response_models = self.get_models()
92 if not response_models or not response_models.get(model_name):
93 raise NotificationException("Response model {} is not defined.".format(model_name),
94 HTTPStatus.NOT_IMPLEMENTED)
95 model_keys = response_models[model_name]
96 payload = dict.fromkeys(model_keys, "N/A")
97 notification_keys = set(meta_notification.keys())
98 for model_key in model_keys.intersection(notification_keys):
99 payload[model_key] = meta_notification[model_key]
100 self.logger.debug("Payload generated for subscriber: {} for {}".format(payload["subscriptionId"],
101 payload["notificationType"]))
102 return payload
103
104 async def send_notifications(self, subscribers: list, loop: asyncio.AbstractEventLoop = None):
105 """
106 Generate tasks for all notification for an event.
107 :param subscribers: A list of subscribers who want to be notified for event.
108 :param loop: Event loop object.
109 """
110 notifications = []
111 for subscriber in subscribers:
112 # Notify without auth
113 if not subscriber.get("authentication"):
114 notifications.append({
115 "headers": self.payload_header,
116 "payload": self.get_payload(subscriber),
117 "CallbackUri": subscriber["CallbackUri"]
118 })
119 elif subscriber["authentication"]["authType"] == "basic":
120 salt = subscriber["subscriptionId"]
121 hashed_password = subscriber["authentication"]["paramsBasic"]["password"]
122 password = self._decrypt_password(hashed_password, salt)
123 auth_basic = self._get_basic_auth(subscriber["authentication"]["paramsBasic"]["userName"], password)
124 notifications.append({
125 "headers": self.payload_header,
126 "payload": self.get_payload(subscriber),
127 "auth_basic": auth_basic,
128 "CallbackUri": subscriber["CallbackUri"]
129 })
130 # TODO add support for AuthType OAuth and TLS after support is added in subscription.
131 else:
132 self.logger.debug("Subscriber {} can not be notified. {} notification auth type is not implemented"
133 .format(subscriber["subscriptionId"],
134 subscriber["authentication"]["authType"]))
135
136 tasks = []
137 async with aiohttp.ClientSession(loop=loop) as session:
138 for notification in notifications:
139 tasks.append(asyncio.ensure_future(self.send_notification(session, notification, loop=loop), loop=loop))
140 await asyncio.gather(*tasks, loop=loop)
141
142 async def send_notification(self, session: aiohttp.ClientSession, notification: dict,
143 loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0):
144 """
145 Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
146 after maximum number of reties, then notification is dropped.
147 :param session: An aiohttp client session object to maintain http session.
148 :param notification: A dictionary containing all necessary data to make POST request.
149 :param loop: Event loop object.
150 :param retry_count: An integer specifying the maximum number of reties for a notification.
151 :param timeout: A float representing client timeout of each HTTP request.
152 """
153 backoff_delay = 1
154 while retry_count > 0:
155 try:
156 async with session.post(url=notification["CallbackUri"], headers=notification["headers"],
157 auth=notification.get("auth_basic", None),
158 data=json.dumps(notification["payload"]),
159 timeout=timeout) as resp:
160 # self.logger.debug("Notification response: {}".format(resp.status))
161 if resp.status == HTTPStatus.NO_CONTENT:
162 self.logger.debug("Notification sent successfully to subscriber {}"
163 .format(notification["payload"]["subscriptionId"]))
164 else:
165 error_text = "Erroneous response code: {}, ".format(resp.status)
166 error_text += await resp.text()
167 raise NotificationException(error_text)
168 return True
169 except Exception as e:
170 error_text = type(e).__name__ + ": " + str(e)
171 self.logger.debug("Unable to send notification to subscriber {}. Details: {}"
172 .format(notification["payload"]["subscriptionId"], error_text))
173 error_detail = {
174 "error": type(e).__name__,
175 "error_text": str(e),
176 "timestamp": time.time()
177 }
178 if "error_details" in notification["payload"].keys():
179 notification["payload"]["error_details"].append(error_detail)
180 else:
181 notification["payload"]["error_details"] = [error_detail]
182 retry_count -= 1
183 backoff_delay *= 2
184 self.logger.debug("Retry Notification for subscriber: {} after backoff delay: {} seconds."
185 .format(notification["payload"]["subscriptionId"], backoff_delay))
186 await asyncio.sleep(backoff_delay, loop=loop)
187 # Dropping notification
188 self.logger.debug("Notification {} sent failed to subscriber:{}."
189 .format(notification["payload"]["notificationType"],
190 notification["payload"]["subscriptionId"]))
191 return False
192
193
194 class NsLcmNotification(NotificationBase):
195
196 # SOL005 response model for nslcm notifications
197 response_models = {
198 "NsLcmOperationOccurrenceNotification": {"id", "nsInstanceId", "nsLcmOpOccId", "operation",
199 "notificationType", "subscriptionId", "timestamp",
200 "notificationStatus", "operationState", "isAutomaticInvocation",
201 "affectedVnf", "affectedVl", "affectedVnffg", "affectedNs",
202 "affectedSap", "error", "_links"},
203
204 "NsIdentifierCreationNotification": {"notificationType", "subscriptionId", "timestamp",
205 "nsInstanceId", "_links"},
206
207 "NsIdentifierDeletionNotification": {"notificationType", "subscriptionId", "timestamp",
208 "nsInstanceId", "_links"},
209
210 "NsChangeNotification": {"nsInstanceId", "nsComponentType", "nsComponentId",
211 "lcmOpOccIdImpactngNsComponent", "lcmOpNameImpactingNsComponent",
212 "lcmOpOccStatusImpactingNsComponent", "notificationType", "subscriptionId",
213 "timeStamp", "error", "_links"}
214 }
215
216 def __init__(self, db) -> None:
217 """
218 Constructor of NsLcmNotification class.
219 :param db: Database handler.
220 """
221 super().__init__(db)
222 self.subscriber_collection = "mapped_subscriptions"
223
224 def get_models(self) -> dict:
225 """
226 Returns the SOL005 model of notification class
227 :param None
228 :return: dict of SOL005 data model
229 """
230 return NsLcmNotification.response_models
231
232 @staticmethod
233 def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
234 """
235 Formats the raw event details from kakfa message and subscriber details.
236 :param subscribers: A list of subscribers whom the event needs to be notified.
237 :param event_details: A dict containing all meta data of event.
238 :return:
239 """
240 notification_id = str(uuid4())
241 event_timestamp = event_details["params"]["startTime"]
242 resource_links = event_details["params"]["links"]
243 event_operation = event_details["command"]
244 for key in ["_admin", "_id", "id", "links"]:
245 event_details["params"].pop(key, None)
246 for subscriber in subscribers:
247 subscriber["id"] = notification_id
248 subscriber["timestamp"] = event_timestamp
249 subscriber["_links"] = resource_links
250 subscriber["subscriptionId"] = subscriber["reference"]
251 subscriber["operation"] = event_operation
252 del subscriber["reference"]
253 del subscriber["_id"]
254 subscriber.update(event_details["params"])
255 return subscribers
256
257 def get_subscribers(self, nsd_id: str, ns_instance_id: str, command: str, op_state: str,
258 event_details: dict) -> list:
259 """
260 Queries database and returns list of subscribers.
261 :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
262 :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
263 :param command: the command for event.
264 :param op_state: the operation state of NS.
265 :param event_details: dict containing raw data of event occured.
266 :return: List of interested subscribers for occurred event.
267 """
268 filter_q = {"identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"]}
269 if op_state:
270 filter_q["operationStates"].append(op_state)
271 if command:
272 filter_q["operationTypes"].append(command)
273 # self.logger.debug("Db query is: {}".format(filter_q))
274 subscribers = []
275 try:
276 subscribers = self.db.get_list(self.subscriber_collection, filter_q)
277 subscribers = self._format_nslcm_subscribers(subscribers, event_details)
278 except Exception as e:
279 error_text = type(e).__name__ + ": " + str(e)
280 self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
281 finally:
282 return subscribers
283
284
285 class NsdNotification(NotificationBase):
286
287 def __init__(self, db):
288 """
289 Constructor of the class
290 """
291 super().__init__(db)
292 # TODO will update this once support is there from subscription
293 self.response_models = {}
294 self.subscriber_collection = None
295
296
297 class VnfdNotification(NotificationBase):
298
299 def __init__(self, db):
300 """
301 Constructor of the class
302 """
303 super().__init__(db)
304 # TODO will update this once support is there from subscription
305 self.response_models = {}
306 self.subscriber_collection = None