fix(vdu): vdu number of instances now is taking into account. Bug 1477
[osm/NBI.git] / osm_nbi / notifications.py
1 # Copyright 2020 K Sai Kiran (Tata Elxsi)
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 __author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
17 __date__ = "$28-Apr-2020 23:59:59$"
18
19 import asyncio
20 import aiohttp
21 from http import HTTPStatus
22 import json
23 import logging
24 import time
25 from uuid import uuid4
26
27
28 class NotificationException(Exception):
29 """
30 Notification Exception
31 """
32
33 def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
34 """
35 Constructor of notification exception
36 :param message: String text containing exception details.
37 :param http_code: HTTP status code of exception.
38 """
39 self.http_code = http_code
40 Exception.__init__(self, message)
41
42
43 class NotificationBase:
44
45 response_models = None
46 # Common HTTP payload header for all notifications.
47 payload_header = {
48 "Content-Type": "application/json",
49 "Accept": "application/json"
50 }
51
52 def __init__(self, db) -> None:
53 """
54 Constructor of NotificationBase class.
55 :param db: Database handler.
56 """
57 self.db = db
58 self.logger = logging.getLogger("nbi.notifications")
59 self.subscriber_collection = None
60
61 def get_models(self) -> dict:
62 """
63 Returns the SOL005 model of notification class
64 :param None
65 :return: dict of SOL005 data model
66 """
67 return NotificationBase.response_models
68
69 def get_subscribers(self, **kwargs) -> NotificationException:
70 """
71 Method should be implemented by all notification subclasses
72 :param kwargs: any keyword arguments needed for db query.
73 :return: List of subscribers
74 """
75 raise NotificationException("Method get_subscribers() is not implemented", http_code=HTTPStatus.NOT_IMPLEMENTED)
76
77 @staticmethod
78 def _get_basic_auth(username: str, password: str) -> tuple:
79 return aiohttp.BasicAuth(username, password)
80
81 def _decrypt_password(self, hashed: str, salt: str, schema_version: str = "1.1") -> str:
82 return self.db.decrypt(hashed, schema_version, salt=salt)
83
84 def get_payload(self, meta_notification: dict) -> dict:
85 """
86 Generates SOL005 compliant payload structure and returns them in dictionary.
87 :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
88 :return: A dictionary which is SOL005 compliant.
89 """
90 model_name = meta_notification["notificationType"]
91 response_models = self.get_models()
92 if not response_models or not response_models.get(model_name):
93 raise NotificationException("Response model {} is not defined.".format(model_name),
94 HTTPStatus.NOT_IMPLEMENTED)
95 model_keys = response_models[model_name]
96 payload = dict.fromkeys(model_keys, "N/A")
97 notification_keys = set(meta_notification.keys())
98 for model_key in model_keys.intersection(notification_keys):
99 payload[model_key] = meta_notification[model_key]
100 self.logger.debug("Payload generated for subscriber: {} for {}".format(payload["subscriptionId"],
101 payload["notificationType"]))
102 return payload
103
104 async def send_notifications(self, subscribers: list, loop: asyncio.AbstractEventLoop = None):
105 """
106 Generate tasks for all notification for an event.
107 :param subscribers: A list of subscribers who want to be notified for event.
108 :param loop: Event loop object.
109 """
110 notifications = []
111 for subscriber in subscribers:
112 # Notify without auth
113 if not subscriber.get("authentication"):
114 notifications.append({
115 "headers": self.payload_header,
116 "payload": self.get_payload(subscriber),
117 "CallbackUri": subscriber["CallbackUri"]
118 })
119 elif subscriber["authentication"]["authType"] == "basic":
120 salt = subscriber["subscriptionId"]
121 hashed_password = subscriber["authentication"]["paramsBasic"]["password"]
122 password = self._decrypt_password(hashed_password, salt)
123 auth_basic = self._get_basic_auth(subscriber["authentication"]["paramsBasic"]["userName"], password)
124 notifications.append({
125 "headers": self.payload_header,
126 "payload": self.get_payload(subscriber),
127 "auth_basic": auth_basic,
128 "CallbackUri": subscriber["CallbackUri"]
129 })
130 # TODO add support for AuthType OAuth and TLS after support is added in subscription.
131 else:
132 self.logger.debug("Subscriber {} can not be notified. {} notification auth type is not implemented"
133 .format(subscriber["subscriptionId"],
134 subscriber["authentication"]["authType"]))
135
136 if notifications:
137 tasks = []
138 async with aiohttp.ClientSession(loop=loop) as session:
139 for notification in notifications:
140 tasks.append(asyncio.ensure_future(self.send_notification(session, notification, loop=loop),
141 loop=loop))
142 await asyncio.gather(*tasks, loop=loop)
143
144 async def send_notification(self, session: aiohttp.ClientSession, notification: dict,
145 loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0):
146 """
147 Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
148 after maximum number of reties, then notification is dropped.
149 :param session: An aiohttp client session object to maintain http session.
150 :param notification: A dictionary containing all necessary data to make POST request.
151 :param loop: Event loop object.
152 :param retry_count: An integer specifying the maximum number of reties for a notification.
153 :param timeout: A float representing client timeout of each HTTP request.
154 """
155 backoff_delay = 1
156 while retry_count > 0:
157 try:
158 async with session.post(url=notification["CallbackUri"], headers=notification["headers"],
159 auth=notification.get("auth_basic", None),
160 data=json.dumps(notification["payload"]),
161 timeout=timeout) as resp:
162 # self.logger.debug("Notification response: {}".format(resp.status))
163 if resp.status == HTTPStatus.NO_CONTENT:
164 self.logger.debug("Notification sent successfully to subscriber {}"
165 .format(notification["payload"]["subscriptionId"]))
166 else:
167 error_text = "Erroneous response code: {}, ".format(resp.status)
168 error_text += await resp.text()
169 raise NotificationException(error_text)
170 return True
171 except Exception as e:
172 error_text = type(e).__name__ + ": " + str(e)
173 self.logger.debug("Unable to send notification to subscriber {}. Details: {}"
174 .format(notification["payload"]["subscriptionId"], error_text))
175 error_detail = {
176 "error": type(e).__name__,
177 "error_text": str(e),
178 "timestamp": time.time()
179 }
180 if "error_details" in notification["payload"].keys():
181 notification["payload"]["error_details"].append(error_detail)
182 else:
183 notification["payload"]["error_details"] = [error_detail]
184 retry_count -= 1
185 backoff_delay *= 2
186 self.logger.debug("Retry Notification for subscriber: {} after backoff delay: {} seconds."
187 .format(notification["payload"]["subscriptionId"], backoff_delay))
188 await asyncio.sleep(backoff_delay, loop=loop)
189 # Dropping notification
190 self.logger.debug("Notification {} sent failed to subscriber:{}."
191 .format(notification["payload"]["notificationType"],
192 notification["payload"]["subscriptionId"]))
193 return False
194
195
196 class NsLcmNotification(NotificationBase):
197
198 # SOL005 response model for nslcm notifications
199 response_models = {
200 "NsLcmOperationOccurrenceNotification": {"id", "nsInstanceId", "nsLcmOpOccId", "operation",
201 "notificationType", "subscriptionId", "timestamp",
202 "notificationStatus", "operationState", "isAutomaticInvocation",
203 "affectedVnf", "affectedVl", "affectedVnffg", "affectedNs",
204 "affectedSap", "error", "_links"},
205
206 "NsIdentifierCreationNotification": {"notificationType", "subscriptionId", "timestamp",
207 "nsInstanceId", "_links"},
208
209 "NsIdentifierDeletionNotification": {"notificationType", "subscriptionId", "timestamp",
210 "nsInstanceId", "_links"},
211
212 "NsChangeNotification": {"nsInstanceId", "nsComponentType", "nsComponentId",
213 "lcmOpOccIdImpactngNsComponent", "lcmOpNameImpactingNsComponent",
214 "lcmOpOccStatusImpactingNsComponent", "notificationType", "subscriptionId",
215 "timeStamp", "error", "_links"}
216 }
217
218 def __init__(self, db) -> None:
219 """
220 Constructor of NsLcmNotification class.
221 :param db: Database handler.
222 """
223 super().__init__(db)
224 self.subscriber_collection = "mapped_subscriptions"
225
226 def get_models(self) -> dict:
227 """
228 Returns the SOL005 model of notification class
229 :param None
230 :return: dict of SOL005 data model
231 """
232 return NsLcmNotification.response_models
233
234 @staticmethod
235 def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
236 """
237 Formats the raw event details from kakfa message and subscriber details.
238 :param subscribers: A list of subscribers whom the event needs to be notified.
239 :param event_details: A dict containing all meta data of event.
240 :return:
241 """
242 notification_id = str(uuid4())
243 event_timestamp = event_details["params"]["startTime"]
244 resource_links = event_details["params"]["links"]
245 event_operation = event_details["command"]
246 for key in ["_admin", "_id", "id", "links"]:
247 event_details["params"].pop(key, None)
248 for subscriber in subscribers:
249 subscriber["id"] = notification_id
250 subscriber["timestamp"] = event_timestamp
251 subscriber["_links"] = resource_links
252 subscriber["subscriptionId"] = subscriber["reference"]
253 subscriber["operation"] = event_operation
254 del subscriber["reference"]
255 del subscriber["_id"]
256 subscriber.update(event_details["params"])
257 return subscribers
258
259 def get_subscribers(self, nsd_id: str, ns_instance_id: str, command: str, op_state: str,
260 event_details: dict) -> list:
261 """
262 Queries database and returns list of subscribers.
263 :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
264 :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
265 :param command: the command for event.
266 :param op_state: the operation state of NS.
267 :param event_details: dict containing raw data of event occured.
268 :return: List of interested subscribers for occurred event.
269 """
270 filter_q = {"identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"]}
271 if op_state:
272 filter_q["operationStates"].append(op_state)
273 if command:
274 filter_q["operationTypes"].append(command)
275 # self.logger.debug("Db query is: {}".format(filter_q))
276 subscribers = []
277 try:
278 subscribers = self.db.get_list(self.subscriber_collection, filter_q)
279 subscribers = self._format_nslcm_subscribers(subscribers, event_details)
280 except Exception as e:
281 error_text = type(e).__name__ + ": " + str(e)
282 self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
283 finally:
284 return subscribers
285
286
287 class NsdNotification(NotificationBase):
288
289 def __init__(self, db):
290 """
291 Constructor of the class
292 """
293 super().__init__(db)
294 # TODO will update this once support is there from subscription
295 self.response_models = {}
296 self.subscriber_collection = None
297
298
299 class VnfdNotification(NotificationBase):
300
301 def __init__(self, db):
302 """
303 Constructor of the class
304 """
305 super().__init__(db)
306 # TODO will update this once support is there from subscription
307 self.response_models = {}
308 self.subscriber_collection = None