Coverage for osm_nbi/notifications.py: 0%

170 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-29 09:05 +0000

1# Copyright 2020 K Sai Kiran (Tata Elxsi) 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>" 

17__date__ = "$28-Apr-2020 23:59:59$" 

18 

19import asyncio 

20import aiohttp 

21from http import HTTPStatus 

22import json 

23import logging 

24import time 

25from uuid import uuid4 

26 

27 

class NotificationException(Exception):
    """
    Error raised by the notification classes.

    Besides the usual exception message it carries the HTTP status code
    associated with the failure.
    """

    def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
        """
        Build the exception.
        :param message: String text describing what went wrong.
        :param http_code: HTTP status code of the failure (BAD_REQUEST when omitted).
        """
        super().__init__(message)
        self.http_code = http_code

41 

42 

class NotificationBase:
    """
    Base class of all notification senders.

    It knows how to turn subscriber records into payloads restricted to the
    keys of a response model and how to POST them to the subscribers' callback
    URIs with retries. Subclasses provide the response models and the database
    query that selects the subscribers.
    """

    # Mapping "notification type" -> set of keys allowed in its payload.
    # None means that no model is defined; subclasses are expected to override it.
    response_models = None
    # Common HTTP payload header for all notifications.
    payload_header = {"Content-Type": "application/json", "Accept": "application/json"}

    def __init__(self, db) -> None:
        """
        Constructor of NotificationBase class.
        :param db: Database handler.
        """
        self.db = db
        self.logger = logging.getLogger("nbi.notifications")
        self.subscriber_collection = None

    def get_models(self) -> dict:
        """
        Returns the response models of the notification class.
        The lookup goes through the instance, so a subclass (or instance) that
        only defines ``response_models`` is honoured without overriding this method.
        :return: dict of data models, or None when the class defines none.
        """
        return self.response_models

    def get_subscribers(self, **kwargs) -> list:
        """
        Method should be implemented by all notification subclasses
        :param kwargs: any keyword arguments needed for db query.
        :return: List of subscribers
        :raises NotificationException: always, with NOT_IMPLEMENTED, in this base class.
        """
        raise NotificationException(
            "Method get_subscribers() is not implemented",
            http_code=HTTPStatus.NOT_IMPLEMENTED,
        )

    @staticmethod
    def _get_basic_auth(username: str, password: str) -> "aiohttp.BasicAuth":
        """
        Builds the basic authentication object handed to the HTTP client.
        :param username: user name of the subscriber endpoint.
        :param password: clear text password of the subscriber endpoint.
        :return: an aiohttp.BasicAuth instance.
        """
        return aiohttp.BasicAuth(username, password)

    def _decrypt_password(
        self, hashed: str, salt: str, schema_version: str = "1.1"
    ) -> str:
        """
        Decrypts a stored password by delegating to the database handler.
        :param hashed: the stored (encrypted) password.
        :param salt: salt passed to the database handler's decrypt().
        :param schema_version: schema version passed to decrypt().
        :return: whatever self.db.decrypt() returns for these arguments.
        """
        return self.db.decrypt(hashed, schema_version, salt=salt)

    def get_payload(self, meta_notification: dict) -> dict:
        """
        Generates the payload of a notification: exactly the keys of the response
        model named by meta_notification["notificationType"], each one taken from
        meta_notification when present and set to "N/A" otherwise.
        :param meta_notification: notification meta data which needs to be formatted
        :return: A dictionary holding only the keys of the response model.
        :raises NotificationException: (NOT_IMPLEMENTED) if no response model is
                 defined for the notification type.
        """
        model_name = meta_notification["notificationType"]
        response_models = self.get_models()
        if not response_models or not response_models.get(model_name):
            raise NotificationException(
                "Response model {} is not defined.".format(model_name),
                HTTPStatus.NOT_IMPLEMENTED,
            )
        model_keys = response_models[model_name]
        payload = {key: meta_notification.get(key, "N/A") for key in model_keys}
        self.logger.debug(
            "Payload generated for subscriber: %s for %s",
            payload["subscriptionId"],
            payload["notificationType"],
        )
        return payload

    async def send_notifications(
        self,
        subscribers: list,
    ):
        """
        Generate tasks for all notification for an event and wait for them.
        Subscribers without authentication and subscribers with "basic"
        authentication are notified; any other authentication type is skipped.
        :param subscribers: A list of subscribers who want to be notified for event.
        """
        notifications = []
        for subscriber in subscribers:
            authentication = subscriber.get("authentication")
            auth_basic = None
            if authentication:
                if authentication["authType"] != "basic":
                    # TODO add support for AuthType OAuth and TLS after support is added in subscription.
                    self.logger.debug(
                        "Subscriber %s can not be notified. %s notification auth type is not implemented",
                        subscriber["subscriptionId"],
                        authentication["authType"],
                    )
                    continue
                params_basic = authentication["paramsBasic"]
                # The subscription id is the salt used to decrypt the password.
                password = self._decrypt_password(
                    params_basic["password"], subscriber["subscriptionId"]
                )
                auth_basic = self._get_basic_auth(params_basic["userName"], password)

            notification = {
                "headers": self.payload_header,
                "payload": self.get_payload(subscriber),
                "CallbackUri": subscriber["CallbackUri"],
            }
            if auth_basic is not None:
                notification["auth_basic"] = auth_basic
            notifications.append(notification)

        if notifications:
            # One HTTP session is shared by all notifications of the event.
            async with aiohttp.ClientSession() as session:
                await asyncio.gather(
                    *(
                        self.send_notification(session, notification)
                        for notification in notifications
                    )
                )

    async def send_notification(
        self,
        session: "aiohttp.ClientSession",
        notification: dict,
        retry_count: int = 5,
        timeout: float = 5.0,
    ):
        """
        Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
        after maximum number of retries, then notification is dropped.
        Only a 204 (NO_CONTENT) response counts as success. Between two attempts the
        coroutine sleeps for a delay that doubles each time (2, 4, 8, ... seconds);
        no delay is spent after the last attempt.
        :param session: An aiohttp client session object to maintain http session.
        :param notification: A dictionary containing all necessary data to make POST request.
        :param retry_count: An integer specifying the maximum number of attempts for a notification.
        :param timeout: A float representing client timeout of each HTTP request.
        :return: True if the notification was delivered, False if it was dropped.
        """
        backoff_delay = 1
        payload = notification["payload"]
        subscription_id = payload["subscriptionId"]
        while retry_count > 0:
            try:
                async with session.post(
                    url=notification["CallbackUri"],
                    headers=notification["headers"],
                    auth=notification.get("auth_basic"),
                    data=json.dumps(payload),
                    timeout=timeout,
                ) as resp:
                    if resp.status != HTTPStatus.NO_CONTENT:
                        error_text = "Erroneous response code: {}, ".format(resp.status)
                        error_text += await resp.text()
                        raise NotificationException(error_text)
                    self.logger.debug(
                        "Notification sent successfully to subscriber %s",
                        subscription_id,
                    )
                return True
            except Exception as e:
                self.logger.debug(
                    "Unable to send notification to subscriber %s. Details: %s: %s",
                    subscription_id,
                    type(e).__name__,
                    e,
                )
                # NOTE(review): the error history is stored inside the payload, so it
                # is also serialized into the body of the following retries - confirm
                # that subscribers are meant to receive it.
                payload.setdefault("error_details", []).append(
                    {
                        "error": type(e).__name__,
                        "error_text": str(e),
                        "timestamp": time.time(),
                    }
                )
                retry_count -= 1
                if retry_count <= 0:
                    # Last attempt failed: nothing left to retry, so do not wait.
                    break
                backoff_delay *= 2
                self.logger.debug(
                    "Retry Notification for subscriber: %s after backoff delay: %s seconds.",
                    subscription_id,
                    backoff_delay,
                )
                await asyncio.sleep(backoff_delay)
        # Dropping notification
        self.logger.debug(
            "Notification %s sent failed to subscriber:%s.",
            payload["notificationType"],
            subscription_id,
        )
        return False

235 

236 

class NsLcmNotification(NotificationBase):
    """
    Notifications about NS lifecycle management events.
    """

    # maps kafka commands of completed operations to the original operation type
    completed_operation_map = {
        "INSTANTIATED": "INSTANTIATE",
        "SCALED": "SCALE",
        "TERMINATED": "TERMINATE",
        "UPDATED": "UPDATE",
        "HEALED": "HEAL",
    }
    # SOL005 response model for nslcm notifications
    response_models = {
        "NsLcmOperationOccurrenceNotification": {
            "id",
            "nsInstanceId",
            "nsLcmOpOccId",
            "operation",
            "notificationType",
            "subscriptionId",
            "timestamp",
            "notificationStatus",
            "operationState",
            "isAutomaticInvocation",
            "affectedVnf",
            "affectedVl",
            "affectedVnffg",
            "affectedNs",
            "affectedSap",
            "error",
            "_links",
        },
        "NsIdentifierCreationNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsIdentifierDeletionNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsChangeNotification": {
            "nsInstanceId",
            "nsComponentType",
            "nsComponentId",
            # NOTE(review): spelled "Impactng" while the two keys below say
            # "Impacting" - verify against the SOL005 data model.
            "lcmOpOccIdImpactngNsComponent",
            "lcmOpNameImpactingNsComponent",
            "lcmOpOccStatusImpactingNsComponent",
            "notificationType",
            "subscriptionId",
            # NOTE(review): the other models of this class use "timestamp", which is
            # also the key filled in by _format_nslcm_subscribers() - verify.
            "timeStamp",
            "error",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of NsLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :return: dict of SOL005 data model
        """
        return NsLcmNotification.response_models

    @staticmethod
    def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        Every subscriber dict is updated in place: it receives a notification id
        shared by all subscribers of the event, the event timestamp, links and
        operation, its "reference" is renamed to "subscriptionId", its "_id" is
        removed and the event params (minus "_admin", "_id", "id" and "links") are
        merged in. event_details itself is left untouched.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict containing all meta data of event. It must hold
               "command" and "params", the latter with "startTime" and "links".
        :return: the same list of subscribers, formatted.
        """
        params = event_details["params"]
        notification_id = str(uuid4())
        event_timestamp = params["startTime"]
        resource_links = params["links"]
        event_operation = event_details["command"]
        # Work on a filtered copy so that the caller's event is not modified.
        excluded_keys = ("_admin", "_id", "id", "links")
        event_params = {
            key: value for key, value in params.items() if key not in excluded_keys
        }
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timestamp"] = event_timestamp
            subscriber["_links"] = resource_links
            subscriber["subscriptionId"] = subscriber.pop("reference")
            subscriber["operation"] = event_operation
            subscriber.pop("_id", None)
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(
        self,
        nsd_id: str,
        ns_instance_id: str,
        command: str,
        op_state: str,
        event_details: dict,
    ) -> list:
        """
        Queries database and returns list of subscribers.
        :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
        :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of NS.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event. An empty list is
                 returned when the query or the formatting of its result fails.
        """
        notification_type = [
            "NsLcmOperationOccurrenceNotification",
            "NsChangeNotification",
            "NsIdentifierCreationNotification",
            "NsIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [nsd_id, ns_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            # Commands of completed operations are queried by their operation type.
            op_type = self.completed_operation_map.get(command, command)
            filter_q["operationTypes"].append(op_type)
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_nslcm_subscribers(subscribers, event_details)
        except Exception as e:
            # Best effort: a failure here must not break the caller, but half
            # formatted subscribers must not be notified either.
            self.logger.debug(
                "Error getting nslcm subscribers: %s: %s", type(e).__name__, e
            )
            return []

381 

382 

class VnfLcmNotification(NotificationBase):
    """
    Notifications about VNF lifecycle management events.
    """

    # SOL003 response model for vnflcm notifications
    response_models = {
        "VnfLcmOperationOccurrenceNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "notificationStatus",
            "operationState",
            "vnfInstanceId",
            "operation",
            "isAutomaticInvocation",
            "vnfLcmOpOccId",
            "affectedVnfcs",
            "affectedVirtualLinks",
            "affectedExtLinkPorts",
            "affectedVirtualStorages",
            "changedInfo",
            "changedExtConnectivity",
            "modificationsTriggeredByVnfPkgChange",
            "error",
            "_links",
        },
        "VnfIdentifierCreationNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
        "VnfIdentifierDeletionNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of VnfLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL003 model of notification class
        :return: dict of SOL003 data model
        """
        return self.response_models

    def _format_vnflcm_subscribers(
        self, subscribers: list, event_details: dict
    ) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        Every subscriber dict is updated in place: it receives a notification id
        shared by all subscribers of the event, the current time as timeStamp and
        the event operation, its "reference" is renamed to "subscriptionId", its
        "_id" is removed and the event params are merged in.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict containing all meta data of event. It must hold
               "command" and "params".
        :return: the same list of subscribers, formatted.
        """
        notification_id = str(uuid4())
        event_timestamp = time.time()
        event_operation = event_details["command"]
        event_params = event_details["params"]
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timeStamp"] = event_timestamp
            subscriber["subscriptionId"] = subscriber.pop("reference")
            subscriber["operation"] = event_operation
            subscriber.pop("_id", None)
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(
        self,
        vnfd_id: str,
        vnf_instance_id: str,
        command: str,
        op_state: str,
        event_details: dict,
    ) -> list:
        """
        Queries database and returns list of subscribers.
        :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
        :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of VNF.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event. An empty list is
                 returned when the query or the formatting of its result fails.
        """
        notification_type = [
            "VnfIdentifierCreationNotification",
            "VnfLcmOperationOccurrenceNotification",
            "VnfIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [vnfd_id, vnf_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_vnflcm_subscribers(subscribers, event_details)
        except Exception as e:
            # Best effort: a failure here must not break the caller, but half
            # formatted subscribers must not be notified either.
            self.logger.debug(
                "Error getting vnflcm subscribers: %s: %s", type(e).__name__, e
            )
            return []

504 

505 

class NsdNotification(NotificationBase):
    def __init__(self, db):
        """
        Constructor of NsdNotification class.
        No response model and no subscriber collection are configured yet.
        :param db: Database handler.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.subscriber_collection = None
        self.response_models = {}

515 

516 

class VnfdNotification(NotificationBase):
    def __init__(self, db):
        """
        Constructor of VnfdNotification class.
        No response model and no subscriber collection are configured yet.
        :param db: Database handler.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.subscriber_collection = None
        self.response_models = {}