bug 1092 fixing deletion of items referenced by several projects
[osm/NBI.git] / osm_nbi / notifications.py
1 # Copyright 2020 K Sai Kiran (Tata Elxsi)
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 __author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
17 __date__ = "$28-Apr-2020 23:59:59$"
18
19 import asyncio
20 import aiohttp
21 from http import HTTPStatus
22 import json
23 import logging
24 import time
25 from uuid import uuid4
26
27
class NotificationException(Exception):
    """
    Error type used by the notification classes; carries an HTTP status code
    next to the usual exception message.
    """

    def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
        """
        Build the exception.
        :param message: Text describing what went wrong.
        :param http_code: HTTP status code attached to the error (BAD_REQUEST when omitted).
        """
        super().__init__(message)
        self.http_code = http_code
41
42
class NotificationBase:
    """
    Common machinery for building notification payloads and POSTing them to
    the callback URI of each subscriber. Subclasses provide the response
    models (get_models) and the subscriber lookup (get_subscribers).
    """

    response_models = None
    # Common HTTP payload header for all notifications.
    payload_header = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    def __init__(self, db) -> None:
        """
        Constructor of NotificationBase class.
        :param db: Database handler.
        """
        self.db = db
        self.logger = logging.getLogger("nbi.notifications")
        self.subscriber_collection = None

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :return: dict of SOL005 data model (None for this base class)
        """
        return NotificationBase.response_models

    def get_subscribers(self, **kwargs) -> list:
        """
        Method should be implemented by all notification subclasses
        :param kwargs: any keyword arguments needed for db query.
        :return: List of subscribers
        :raises NotificationException: always, with NOT_IMPLEMENTED, in this base class.
        """
        raise NotificationException("Method get_subscribers() is not implemented", http_code=HTTPStatus.NOT_IMPLEMENTED)

    @staticmethod
    def _get_basic_auth(username: str, password: str) -> tuple:
        """
        Wraps the credentials in an aiohttp.BasicAuth object.
        :param username: user name for HTTP basic authentication.
        :param password: clear-text password for HTTP basic authentication.
        :return: aiohttp.BasicAuth built from both values.
        """
        return aiohttp.BasicAuth(username, password)

    def _decrypt_password(self, hashed: str, salt: str, schema_version: str = "1.1") -> str:
        """
        Decrypts a stored password through the database handler.
        :param hashed: encrypted password as stored in the subscription.
        :param salt: salt handed to db.decrypt.
        :param schema_version: schema version handed to db.decrypt.
        :return: whatever db.decrypt returns for these arguments.
        """
        return self.db.decrypt(hashed, schema_version, salt=salt)

    def get_payload(self, meta_notification: dict) -> dict:
        """
        Generates SOL005 compliant payload structure and returns them in dictionary.
        :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
        :return: A dictionary which is SOL005 compliant.
        :raises NotificationException: with NOT_IMPLEMENTED when no model is defined for the notification type.
        """
        model_name = meta_notification["notificationType"]
        response_models = self.get_models()
        if not response_models or not response_models.get(model_name):
            raise NotificationException("Response model {} is not defined.".format(model_name),
                                        HTTPStatus.NOT_IMPLEMENTED)
        model_keys = response_models[model_name]
        # Every model key is present in the payload; keys missing from the meta data stay "N/A".
        payload = dict.fromkeys(model_keys, "N/A")
        for model_key in model_keys.intersection(meta_notification.keys()):
            payload[model_key] = meta_notification[model_key]
        self.logger.debug("Payload generated for subscriber: %s for %s",
                          payload["subscriptionId"], payload["notificationType"])
        return payload

    async def send_notifications(self, subscribers: list, loop: asyncio.AbstractEventLoop = None):
        """
        Generate tasks for all notification for an event.
        :param subscribers: A list of subscribers who want to be notified for event.
        :param loop: Event loop object.
        """
        notifications = []
        for subscriber in subscribers:
            authentication = subscriber["authentication"]
            # TODO add support for AuthType OAuth and TLS after support is added in subscription.
            if authentication["authType"] != "basic":
                self.logger.debug("Subscriber %s can not be notified %s notification auth type is not implemented",
                                  subscriber["subscriptionId"], authentication["authType"])
                continue
            params_basic = authentication["paramsBasic"]
            # The subscription id is the salt used to decrypt the stored password.
            password = self._decrypt_password(params_basic["password"], subscriber["subscriptionId"])
            notifications.append({
                "headers": self.payload_header,
                "payload": self.get_payload(subscriber),
                "auth_basic": self._get_basic_auth(params_basic["userName"], password),
                "CallbackUri": subscriber["CallbackUri"]
            })
        async with aiohttp.ClientSession(loop=loop) as session:
            tasks = [asyncio.ensure_future(self.send_notification(session, notification), loop=loop)
                     for notification in notifications]
            # asyncio.gather() lost its "loop" keyword in Python 3.10; the awaiting loop is used.
            await asyncio.gather(*tasks)

    async def send_notification(self, session: "aiohttp.ClientSession", notification: dict,
                                loop: asyncio.AbstractEventLoop = None, retry_count: int = 5, timeout: float = 5.0):
        """
        Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent
        successfully after maximum number of retries, then notification is dropped.
        :param session: An aiohttp client session object to maintain http session.
        :param notification: A dictionary containing all necessary data to make POST request.
        :param loop: Event loop object. Kept for backward compatibility; no longer forwarded to asyncio.sleep().
        :param retry_count: An integer specifying the maximum number of attempts for a notification.
        :param timeout: A float representing client timeout of each HTTP request.
        :return: True when the subscriber answered 204 NO_CONTENT, False when the notification was dropped.
        """
        payload = notification["payload"]
        backoff_delay = 1
        while retry_count > 0:
            try:
                async with session.post(url=notification["CallbackUri"], headers=notification["headers"],
                                        auth=notification["auth_basic"], data=json.dumps(payload),
                                        timeout=timeout) as resp:
                    if resp.status == HTTPStatus.NO_CONTENT:
                        self.logger.debug("Notification sent successfully to subscriber %s",
                                          payload["subscriptionId"])
                    else:
                        error_text = "Erroneous response code: {}, ".format(resp.status)
                        error_text += await resp.text()
                        raise NotificationException(error_text)
                return True
            except Exception as e:
                self.logger.debug("Unable while sending notification to subscriber %s. Details: %s",
                                  payload["subscriptionId"], type(e).__name__ + ": " + str(e))
                # Every failed attempt is recorded in the payload, so later attempts carry the history.
                payload.setdefault("error_details", []).append({
                    "error": type(e).__name__,
                    "error_text": str(e),
                    "timestamp": time.time()
                })
            retry_count -= 1
            if retry_count <= 0:
                # No attempt left: do not wait out a back-off that nothing follows.
                break
            backoff_delay *= 2
            self.logger.debug("Retry Notification for subscriber: %s after backoff delay: %s seconds.",
                              payload["subscriptionId"], backoff_delay)
            # asyncio.sleep() lost its "loop" keyword in Python 3.10.
            await asyncio.sleep(backoff_delay)
        # Dropping notification
        self.logger.debug("Notification %s sent failed to subscriber:%s.",
                          payload["notificationType"], payload["subscriptionId"])
        return False
183
184
class NsLcmNotification(NotificationBase):
    """
    NS lifecycle-management notifications: holds the response models for the
    nslcm notification types and looks up interested subscribers in the
    "mapped_subscriptions" collection.
    """

    # SOL005 response model for nslcm notifications
    # NOTE(review): "lcmOpOccIdImpactngNsComponent" looks like a misspelling of
    # "lcmOpOccIdImpactingNsComponent" - confirm against SOL005 before renaming,
    # the key is emitted in payloads.
    response_models = {
        "NsLcmOperationOccurrenceNotification": {"id", "nsInstanceId", "nsLcmOpOccId", "operation",
                                                 "notificationType", "subscriptionId", "timestamp",
                                                 "notificationStatus", "operationState", "isAutomaticInvocation",
                                                 "affectedVnf", "affectedVl", "affectedVnffg", "affectedNs",
                                                 "affectedSap", "error", "_links"},

        "NsIdentifierCreationNotification": {"notificationType", "subscriptionId", "timestamp",
                                             "nsInstanceId", "_links"},

        "NsIdentifierDeletionNotification": {"notificationType", "subscriptionId", "timestamp",
                                             "nsInstanceId", "_links"},

        "NsChangeNotification": {"nsInstanceId", "nsComponentType", "nsComponentId",
                                 "lcmOpOccIdImpactngNsComponent", "lcmOpNameImpactingNsComponent",
                                 "lcmOpOccStatusImpactingNsComponent", "notificationType", "subscriptionId",
                                 "timeStamp", "error", "_links"}
    }

    def __init__(self, db) -> None:
        """
        Constructor of NsLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :return: dict of SOL005 data model
        """
        return NsLcmNotification.response_models

    @staticmethod
    def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        The subscriber dictionaries are updated in place; event_details is left untouched.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict containing all meta data of event.
        :return: The same list of subscribers, each one enriched with the event data.
        """
        # One notification id is shared by all subscribers of the same event.
        notification_id = str(uuid4())
        params = event_details["params"]
        event_timestamp = params["startTime"]
        resource_links = params["links"]
        event_operation = event_details["command"]
        # Work on a filtered copy so the caller's event dictionary is not modified.
        excluded_keys = ("_admin", "_id", "id", "links")
        event_params = {key: value for key, value in params.items() if key not in excluded_keys}
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timestamp"] = event_timestamp
            subscriber["_links"] = resource_links
            # The "reference" field of the stored record becomes the subscription id.
            subscriber["subscriptionId"] = subscriber.pop("reference")
            subscriber["operation"] = event_operation
            subscriber.pop("_id", None)
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(self, nsd_id: str, ns_instance_id: str, command: str, op_state: str,
                        event_details: dict) -> list:
        """
        Queries database and returns list of subscribers.
        :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
        :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of NS.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event; empty when the lookup or formatting fails.
        """
        filter_q = {"identifier": [nsd_id, ns_instance_id], "operationStates": ["ANY"], "operationTypes": ["ANY"]}
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        try:
            raw_subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_nslcm_subscribers(raw_subscribers, event_details)
        except Exception as e:
            # Best effort: a failed lookup must not stop event processing, and
            # half-formatted records must never be handed to the sender.
            self.logger.debug("Error getting nslcm subscribers: %s", type(e).__name__ + ": " + str(e))
            return []
274
275
class NsdNotification(NotificationBase):
    """
    Placeholder notification class for NSD events: no response models and no
    subscriber collection are configured yet.
    """

    def __init__(self, db):
        """
        Initialise the base class and reset the NSD specific settings.
        :param db: Database handler, handed over to NotificationBase.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.subscriber_collection = None
        self.response_models = {}
286
287
class VnfdNotification(NotificationBase):
    """
    Placeholder notification class for VNFD events: no response models and no
    subscriber collection are configured yet.
    """

    def __init__(self, db):
        """
        Initialise the base class and reset the VNFD specific settings.
        :param db: Database handler, handed over to NotificationBase.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.subscriber_collection = None
        self.response_models = {}