Coverage for osm_nbi/notifications.py: 0%

174 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-12 20:04 +0000

1# Copyright 2020 K Sai Kiran (Tata Elxsi) 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>" 

17__date__ = "$28-Apr-2020 23:59:59$" 

18 

19import asyncio 

20import aiohttp 

21import ssl 

22import certifi 

23from http import HTTPStatus 

24import json 

25import logging 

26import time 

27from uuid import uuid4 

28 

29 

class NotificationException(Exception):
    """
    Error type used by the notification classes.

    Besides the usual exception message it carries an HTTP status code in
    the ``http_code`` attribute.
    """

    def __init__(self, message: str, http_code: int = HTTPStatus.BAD_REQUEST) -> None:
        """
        Build the exception.
        :param message: Text describing what went wrong.
        :param http_code: HTTP status code attached to the error (BAD_REQUEST when omitted).
        """
        super().__init__(message)
        self.http_code = http_code

43 

44 

class NotificationBase:
    """
    Behaviour shared by all notification classes: building a payload from a
    response model and delivering notifications to subscribers over HTTP POST.
    """

    # Maps a notificationType name to the collection of keys allowed in its
    # payload. None here; subclasses provide their own models.
    response_models = None
    # Common HTTP payload header for all notifications.
    payload_header = {"Content-Type": "application/json", "Accept": "application/json"}

    def __init__(self, db) -> None:
        """
        Constructor of NotificationBase class.
        :param db: Database handler.
        """
        self.db = db
        self.logger = logging.getLogger("nbi.notifications")
        self.subscriber_collection = None

    def get_models(self) -> dict:
        """
        Returns the response models of the notification class.
        Resolved through ``self`` so that models defined on a subclass (or on
        the instance) are honoured without having to override this method.
        :return: dict of data models, or None when the class defines none.
        """
        return self.response_models

    def get_subscribers(self, **kwargs) -> list:
        """
        Method should be implemented by all notification subclasses.
        :param kwargs: any keyword arguments needed for db query.
        :return: List of subscribers
        :raises NotificationException: always, with NOT_IMPLEMENTED, in this base class.
        """
        raise NotificationException(
            "Method get_subscribers() is not implemented",
            http_code=HTTPStatus.NOT_IMPLEMENTED,
        )

    @staticmethod
    def _get_basic_auth(username: str, password: str) -> tuple:
        """
        Wraps the credentials in an aiohttp.BasicAuth object.
        :param username: user name for HTTP basic authentication.
        :param password: clear-text password for HTTP basic authentication.
        :return: the aiohttp.BasicAuth object built from the credentials.
        """
        return aiohttp.BasicAuth(username, password)

    def _decrypt_password(
        self, hashed: str, salt: str, schema_version: str = "1.1"
    ) -> str:
        """
        Delegates password decryption to the database handler.
        :param hashed: the stored (encrypted) password.
        :param salt: salt handed to the database handler's decrypt().
        :param schema_version: schema version handed to decrypt().
        :return: whatever self.db.decrypt() returns.
        """
        return self.db.decrypt(hashed, schema_version, salt=salt)

    def get_payload(self, meta_notification: dict) -> dict:
        """
        Generates SOL005 compliant payload structure and returns them in dictionary.
        Only the keys listed in the response model of the notification type are
        kept; model keys missing from meta_notification are filled with "N/A".
        :param meta_notification: notification meta data which needs to be formatted as SOL005 compliant
        :return: A dictionary which is SOL005 compliant.
        :raises NotificationException: with NOT_IMPLEMENTED when no response model
            is defined for the notification type.
        """
        model_name = meta_notification["notificationType"]
        response_models = self.get_models()
        if not response_models or not response_models.get(model_name):
            raise NotificationException(
                "Response model {} is not defined.".format(model_name),
                HTTPStatus.NOT_IMPLEMENTED,
            )
        model_keys = response_models[model_name]
        payload = {key: meta_notification.get(key, "N/A") for key in model_keys}
        self.logger.debug(
            "Payload generated for subscriber: {} for {}".format(
                payload["subscriptionId"], payload["notificationType"]
            )
        )
        return payload

    async def send_notifications(
        self,
        subscribers: list,
    ):
        """
        Generate tasks for all notification for an event.
        Subscribers whose authentication type is neither absent nor "basic"
        are logged and skipped.
        :param subscribers: A list of subscribers who want to be notified for event.
        """
        notifications = []
        for subscriber in subscribers:
            authentication = subscriber.get("authentication")
            if not authentication:
                # Notify without auth
                auth_basic = None
            elif authentication["authType"] == "basic":
                params_basic = authentication["paramsBasic"]
                # The subscription id is passed as the salt for decryption.
                password = self._decrypt_password(
                    params_basic["password"], subscriber["subscriptionId"]
                )
                auth_basic = self._get_basic_auth(params_basic["userName"], password)
            # TODO add support for AuthType OAuth and TLS after support is added in subscription.
            else:
                self.logger.debug(
                    "Subscriber {} can not be notified. {} notification auth type is not implemented".format(
                        subscriber["subscriptionId"],
                        authentication["authType"],
                    )
                )
                continue

            notification = {
                "headers": self.payload_header,
                "payload": self.get_payload(subscriber),
                "CallbackUri": subscriber["CallbackUri"],
            }
            if auth_basic is not None:
                notification["auth_basic"] = auth_basic
            notifications.append(notification)

        if not notifications:
            return

        ssl_context = ssl.create_default_context(cafile=certifi.where())
        conn = aiohttp.TCPConnector(ssl=ssl_context)
        async with aiohttp.ClientSession(connector=conn) as session:
            # All notifications of the event are sent concurrently.
            await asyncio.gather(
                *(
                    self.send_notification(session, notification)
                    for notification in notifications
                )
            )

    async def send_notification(
        self,
        session: "aiohttp.ClientSession",
        notification: dict,
        retry_count: int = 5,
        timeout: float = 5.0,
    ):
        """
        Performs HTTP Post request to notify subscriber. In case if for any reason notification is not sent successfully
        after maximum number of retries, then notification is dropped.
        Only a 204 (NO_CONTENT) response counts as success. The wait between
        attempts doubles each time (2, 4, 8, ... seconds); no wait is done
        after the last failed attempt.
        :param session: An aiohttp client session object to maintain http session.
        :param notification: A dictionary containing all necessary data to make POST request.
        :param retry_count: An integer specifying the maximum number of attempts for a notification.
        :param timeout: A float representing client timeout of each HTTP request.
        :return: True when the subscriber acknowledged the notification, False when it was dropped.
        """
        backoff_delay = 1
        while retry_count > 0:
            try:
                async with session.post(
                    url=notification["CallbackUri"],
                    headers=notification["headers"],
                    auth=notification.get("auth_basic", None),
                    data=json.dumps(notification["payload"]),
                    timeout=timeout,
                ) as resp:
                    if resp.status == HTTPStatus.NO_CONTENT:
                        self.logger.debug(
                            "Notification sent successfully to subscriber {}".format(
                                notification["payload"]["subscriptionId"]
                            )
                        )
                    else:
                        error_text = "Erroneous response code: {}, ".format(resp.status)
                        error_text += await resp.text()
                        raise NotificationException(error_text)
                return True
            except Exception as e:
                error_text = type(e).__name__ + ": " + str(e)
                self.logger.debug(
                    "Unable to send notification to subscriber {}. Details: {}".format(
                        notification["payload"]["subscriptionId"], error_text
                    )
                )
                # The failure history is stored inside the payload itself, so
                # it is part of the body sent on the following attempts.
                notification["payload"].setdefault("error_details", []).append(
                    {
                        "error": type(e).__name__,
                        "error_text": str(e),
                        "timestamp": time.time(),
                    }
                )
                retry_count -= 1
                if retry_count <= 0:
                    # Nothing left to retry: drop right away instead of
                    # sleeping through one more backoff period.
                    break
                backoff_delay *= 2
                self.logger.debug(
                    "Retry Notification for subscriber: {} after backoff delay: {} seconds.".format(
                        notification["payload"]["subscriptionId"], backoff_delay
                    )
                )
                await asyncio.sleep(backoff_delay)
        # Dropping notification
        self.logger.debug(
            "Notification {} sent failed to subscriber:{}.".format(
                notification["payload"]["notificationType"],
                notification["payload"]["subscriptionId"],
            )
        )
        return False

239 

240 

class NsLcmNotification(NotificationBase):
    """
    Notification class for NS lifecycle events: looks up the interested
    subscribers and shapes their records for payload generation.
    """

    # maps kafka commands of completed operations to the original operation type
    completed_operation_map = {
        "INSTANTIATED": "INSTANTIATE",
        "SCALED": "SCALE",
        "TERMINATED": "TERMINATE",
        "UPDATED": "UPDATE",
        "HEALED": "HEAL",
    }
    # SOL005 response model for nslcm notifications
    response_models = {
        "NsLcmOperationOccurrenceNotification": {
            "id",
            "nsInstanceId",
            "nsLcmOpOccId",
            "operation",
            "notificationType",
            "subscriptionId",
            "timestamp",
            "notificationStatus",
            "operationState",
            "isAutomaticInvocation",
            "affectedVnf",
            "affectedVl",
            "affectedVnffg",
            "affectedNs",
            "affectedSap",
            "error",
            "_links",
        },
        "NsIdentifierCreationNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsIdentifierDeletionNotification": {
            "notificationType",
            "subscriptionId",
            "timestamp",
            "nsInstanceId",
            "_links",
        },
        "NsChangeNotification": {
            "nsInstanceId",
            "nsComponentType",
            "nsComponentId",
            # NOTE(review): "Impactng" looks like a misspelling of "Impacting";
            # confirm against SOL005 and the event producers before renaming.
            "lcmOpOccIdImpactngNsComponent",
            "lcmOpNameImpactingNsComponent",
            "lcmOpOccStatusImpactingNsComponent",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "error",
            "_links",
        },
    }

    # Keys of the event params that are not copied onto the subscriber records.
    _excluded_event_params = ("_admin", "_id", "id", "links")

    def __init__(self, db) -> None:
        """
        Constructor of NsLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL005 model of notification class
        :return: dict of SOL005 data model
        """
        return NsLcmNotification.response_models

    @staticmethod
    def _format_nslcm_subscribers(subscribers: list, event_details: dict) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        Every subscriber record is updated in place: it receives one shared
        notification id, the event timestamp, links and operation, its
        "reference" is renamed to "subscriptionId", its "_id" is removed and
        the remaining event params are merged in.
        event_details itself is left unmodified.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict containing all meta data of event.
        :return: the same list of (now formatted) subscribers.
        """
        params = event_details["params"]
        notification_id = str(uuid4())
        event_timestamp = params["startTime"]
        resource_links = params["links"]
        event_operation = event_details["command"]
        # Work on a filtered copy rather than popping keys out of the caller's dict.
        event_params = {
            key: value
            for key, value in params.items()
            if key not in NsLcmNotification._excluded_event_params
        }
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timestamp"] = event_timestamp
            subscriber["_links"] = resource_links
            subscriber["subscriptionId"] = subscriber["reference"]
            subscriber["operation"] = event_operation
            del subscriber["reference"]
            del subscriber["_id"]
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(
        self,
        nsd_id: str,
        ns_instance_id: str,
        command: str,
        op_state: str,
        event_details: dict,
    ) -> list:
        """
        Queries database and returns list of subscribers.
        Any error while querying or formatting is logged and results in an
        empty list.
        :param nsd_id: NSD id of an NS whose lifecycle has changed. (scaled, terminated. etc)
        :param ns_instance_id: NS instance id an NS whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of NS.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event.
        """
        notification_type = [
            "NsLcmOperationOccurrenceNotification",
            "NsChangeNotification",
            "NsIdentifierCreationNotification",
            "NsIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [nsd_id, ns_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            # Commands of completed operations are translated back to the
            # operation type; any other command is used as is.
            op_type = self.completed_operation_map.get(command, command)
            filter_q["operationTypes"].append(op_type)
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_nslcm_subscribers(subscribers, event_details)
        except Exception as e:
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting nslcm subscribers: {}".format(error_text))
            # Never hand out raw or half-formatted records.
            return []

385 

386 

class VnfLcmNotification(NotificationBase):
    """
    Notification class for VNF lifecycle events: looks up the interested
    subscribers and shapes their records for payload generation.
    """

    # SOL003 response model for vnflcm notifications
    response_models = {
        "VnfLcmOperationOccurrenceNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "notificationStatus",
            "operationState",
            "vnfInstanceId",
            "operation",
            "isAutomaticInvocation",
            "vnfLcmOpOccId",
            "affectedVnfcs",
            "affectedVirtualLinks",
            "affectedExtLinkPorts",
            "affectedVirtualStorages",
            "changedInfo",
            "changedExtConnectivity",
            "modificationsTriggeredByVnfPkgChange",
            "error",
            "_links",
        },
        "VnfIdentifierCreationNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
        "VnfIdentifierDeletionNotification": {
            "id",
            "notificationType",
            "subscriptionId",
            "timeStamp",
            "vnfInstanceId",
            "_links",
        },
    }

    def __init__(self, db) -> None:
        """
        Constructor of VnfLcmNotification class.
        :param db: Database handler.
        """
        super().__init__(db)
        self.subscriber_collection = "mapped_subscriptions"

    def get_models(self) -> dict:
        """
        Returns the SOL003 model of notification class
        :return: dict of SOL003 data model
        """
        return self.response_models

    def _format_vnflcm_subscribers(
        self, subscribers: list, event_details: dict
    ) -> list:
        """
        Formats the raw event details from kafka message and subscriber details.
        Every subscriber record is updated in place: it receives one shared
        notification id, the current time as "timeStamp" and the event
        operation, its "reference" is renamed to "subscriptionId", its "_id"
        is removed and the event params are merged in.
        :param subscribers: A list of subscribers whom the event needs to be notified.
        :param event_details: A dict containing all meta data of event.
        :return: the same list of (now formatted) subscribers.
        """
        notification_id = str(uuid4())
        event_timestamp = time.time()
        event_operation = event_details["command"]
        event_params = event_details["params"]
        for subscriber in subscribers:
            subscriber["id"] = notification_id
            subscriber["timeStamp"] = event_timestamp
            subscriber["subscriptionId"] = subscriber["reference"]
            subscriber["operation"] = event_operation
            del subscriber["reference"]
            del subscriber["_id"]
            subscriber.update(event_params)
        return subscribers

    def get_subscribers(
        self,
        vnfd_id: str,
        vnf_instance_id: str,
        command: str,
        op_state: str,
        event_details: dict,
    ) -> list:
        """
        Queries database and returns list of subscribers.
        Any error while querying or formatting is logged and results in an
        empty list.
        :param vnfd_id: Vnfd id of a VNF whose lifecycle has changed. (instantiated, scaled, terminated. etc)
        :param vnf_instance_id: Vnf instance id of a VNF whose lifecycle has changed.
        :param command: the command for event.
        :param op_state: the operation state of VNF.
        :param event_details: dict containing raw data of event occurred.
        :return: List of interested subscribers for occurred event.
        """
        notification_type = [
            "VnfIdentifierCreationNotification",
            "VnfLcmOperationOccurrenceNotification",
            "VnfIdentifierDeletionNotification",
        ]
        filter_q = {
            "identifier": [vnfd_id, vnf_instance_id],
            "operationStates": ["ANY"],
            "operationTypes": ["ANY"],
            "notificationType": notification_type,
        }
        if op_state:
            filter_q["operationStates"].append(op_state)
        if command:
            filter_q["operationTypes"].append(command)
        try:
            subscribers = self.db.get_list(self.subscriber_collection, filter_q)
            return self._format_vnflcm_subscribers(subscribers, event_details)
        except Exception as e:
            error_text = type(e).__name__ + ": " + str(e)
            self.logger.debug("Error getting vnflcm subscribers: {}".format(error_text))
            # Never hand out raw or half-formatted records.
            return []

508 

509 

class NsdNotification(NotificationBase):
    """
    Notification class for NSD; it currently defines no response models and
    no subscriber collection.
    """

    def __init__(self, db):
        """
        Constructor of the class
        :param db: Database handler, handed to the base class constructor.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.response_models, self.subscriber_collection = {}, None

519 

520 

class VnfdNotification(NotificationBase):
    """
    Notification class for VNFD; it currently defines no response models and
    no subscriber collection.
    """

    def __init__(self, db):
        """
        Constructor of the class
        :param db: Database handler, handed to the base class constructor.
        """
        super().__init__(db)
        # TODO will update this once support is there from subscription
        self.response_models, self.subscriber_collection = {}, None