Coverage for osm_nbi/subscriptions.py: 0%

172 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-27 02:46 +0000

1# -*- coding: utf-8 -*- 

2 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16""" 

17This module implements a thread that reads from kafka bus implementing all the subscriptions. 

18It is based on asyncio. 

19To avoid race conditions it uses same engine class as the main module for database changes 

20For the moment this module only deletes NS instances when they are terminated with the autoremove flag 

21""" 

22 

23import logging 

24import threading 

25import asyncio 

26from http import HTTPStatus 

27 

28from osm_common import dbmongo, dbmemory, msglocal, msgkafka 

29from osm_common.dbbase import DbException 

30from osm_common.msgbase import MsgException 

31from osm_nbi.engine import EngineException 

32from osm_nbi.notifications import NsLcmNotification, VnfLcmNotification 

33 

34__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" 

35 

36 

37class SubscriptionException(Exception): 

38 def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST): 

39 self.http_code = http_code 

40 Exception.__init__(self, message) 

41 

42 

43class SubscriptionThread(threading.Thread): 

44 def __init__(self, config, engine): 

45 """ 

46 Constructor of class 

47 :param config: configuration parameters of database and messaging 

48 :param engine: an instance of Engine class, used for deleting instances 

49 """ 

50 threading.Thread.__init__(self) 

51 self.to_terminate = False 

52 self.config = config 

53 self.db = None 

54 self.msg = None 

55 self.engine = engine 

56 self.logger = logging.getLogger("nbi.subscriptions") 

57 self.aiomain_task_admin = ( 

58 None # asyncio task for receiving admin actions from kafka bus 

59 ) 

60 self.aiomain_task = ( 

61 None # asyncio task for receiving normal actions from kafka bus 

62 ) 

63 self.internal_session = { # used for a session to the engine methods 

64 "project_id": (), 

65 "set_project": (), 

66 "admin": True, 

67 "force": False, 

68 "public": None, 

69 "method": "delete", 

70 } 

71 self.nslcm = None 

72 self.vnflcm = None 

73 

74 async def start_kafka(self): 

75 # timeout_wait_for_kafka = 3*60 

76 kafka_working = True 

77 while not self.to_terminate: 

78 try: 

79 # bug 710 635. The library aiokafka does not recieve anything when the topci at kafka has not been 

80 # created. 

81 # Before subscribe, send dummy messages 

82 await self.msg.aiowrite( 

83 "admin", 

84 "echo", 

85 "dummy message", 

86 ) 

87 await self.msg.aiowrite("ns", "echo", "dummy message") 

88 await self.msg.aiowrite("nsi", "echo", "dummy message") 

89 await self.msg.aiowrite("vnf", "echo", "dummy message") 

90 if not kafka_working: 

91 self.logger.critical("kafka is working again") 

92 kafka_working = True 

93 if not self.aiomain_task_admin: 

94 await asyncio.sleep(10) 

95 self.logger.debug("Starting admin subscription task") 

96 self.aiomain_task_admin = asyncio.ensure_future( 

97 self.msg.aioread( 

98 ("admin",), 

99 group_id=False, 

100 aiocallback=self._msg_callback, 

101 ), 

102 ) 

103 if not self.aiomain_task: 

104 await asyncio.sleep(10) 

105 self.logger.debug("Starting non-admin subscription task") 

106 self.aiomain_task = asyncio.ensure_future( 

107 self.msg.aioread( 

108 ("ns", "nsi", "vnf"), 

109 aiocallback=self._msg_callback, 

110 ), 

111 ) 

112 done, _ = await asyncio.wait( 

113 [self.aiomain_task, self.aiomain_task_admin], 

114 timeout=None, 

115 return_when=asyncio.FIRST_COMPLETED, 

116 ) 

117 try: 

118 if self.aiomain_task_admin in done: 

119 exc = self.aiomain_task_admin.exception() 

120 self.logger.error( 

121 "admin subscription task exception: {}".format(exc) 

122 ) 

123 self.aiomain_task_admin = None 

124 if self.aiomain_task in done: 

125 exc = self.aiomain_task.exception() 

126 self.logger.error( 

127 "non-admin subscription task exception: {}".format(exc) 

128 ) 

129 self.aiomain_task = None 

130 except asyncio.CancelledError: 

131 pass 

132 except Exception as e: 

133 if self.to_terminate: 

134 return 

135 if kafka_working: 

136 # logging only first time 

137 self.logger.critical( 

138 "Error accessing kafka '{}'. Retrying ...".format(e) 

139 ) 

140 kafka_working = False 

141 await asyncio.sleep(10) 

142 

143 def run(self): 

144 """ 

145 Start of the thread 

146 :return: None 

147 """ 

148 try: 

149 if not self.db: 

150 if self.config["database"]["driver"] == "mongo": 

151 self.db = dbmongo.DbMongo() 

152 self.db.db_connect(self.config["database"]) 

153 elif self.config["database"]["driver"] == "memory": 

154 self.db = dbmemory.DbMemory() 

155 self.db.db_connect(self.config["database"]) 

156 else: 

157 raise SubscriptionException( 

158 "Invalid configuration param '{}' at '[database]':'driver'".format( 

159 self.config["database"]["driver"] 

160 ) 

161 ) 

162 if not self.msg: 

163 config_msg = self.config["message"].copy() 

164 if config_msg["driver"] == "local": 

165 self.msg = msglocal.MsgLocal() 

166 self.msg.connect(config_msg) 

167 elif config_msg["driver"] == "kafka": 

168 self.msg = msgkafka.MsgKafka() 

169 self.msg.connect(config_msg) 

170 else: 

171 raise SubscriptionException( 

172 "Invalid configuration param '{}' at '[message]':'driver'".format( 

173 config_msg["driver"] 

174 ) 

175 ) 

176 self.nslcm = NsLcmNotification(self.db) 

177 self.vnflcm = VnfLcmNotification(self.db) 

178 except (DbException, MsgException) as e: 

179 raise SubscriptionException(str(e), http_code=e.http_code) 

180 

181 self.logger.debug("Starting") 

182 while not self.to_terminate: 

183 try: 

184 asyncio.run(self.start_kafka()) 

185 except Exception as e: 

186 if not self.to_terminate: 

187 self.logger.exception( 

188 "Exception '{}' at messaging read loop".format(e), exc_info=True 

189 ) 

190 

191 self.logger.debug("Finishing") 

192 self._stop() 

193 

194 async def _msg_callback(self, topic, command, params): 

195 """ 

196 Callback to process a received message from kafka 

197 :param topic: topic received 

198 :param command: command received 

199 :param params: rest of parameters 

200 :return: None 

201 """ 

202 msg_to_send = [] 

203 try: 

204 if topic == "ns": 

205 if command == "terminated" and params["operationState"] in ( 

206 "COMPLETED", 

207 "PARTIALLY_COMPLETED", 

208 ): 

209 self.logger.debug("received ns terminated {}".format(params)) 

210 if params.get("autoremove"): 

211 self.engine.del_item( 

212 self.internal_session, 

213 "nsrs", 

214 _id=params["nsr_id"], 

215 not_send_msg=msg_to_send, 

216 ) 

217 self.logger.debug( 

218 "ns={} deleted from database".format(params["nsr_id"]) 

219 ) 

220 # Check for nslcm notification 

221 if isinstance(params, dict): 

222 # Check availability of operationState and command 

223 if ( 

224 (not params.get("operationState")) 

225 or (not command) 

226 or (not params.get("operationParams")) 

227 ): 

228 self.logger.debug( 

229 "Message can not be used for notification of nslcm" 

230 ) 

231 else: 

232 nsd_id = params["operationParams"].get("nsdId") 

233 ns_instance_id = params["operationParams"].get("nsInstanceId") 

234 # Any one among nsd_id, ns_instance_id should be present. 

235 if not (nsd_id or ns_instance_id): 

236 self.logger.debug( 

237 "Message can not be used for notification of nslcm" 

238 ) 

239 else: 

240 op_state = params["operationState"] 

241 event_details = { 

242 "topic": topic, 

243 "command": command.upper(), 

244 "params": params, 

245 } 

246 subscribers = self.nslcm.get_subscribers( 

247 nsd_id, 

248 ns_instance_id, 

249 command.upper(), 

250 op_state, 

251 event_details, 

252 ) 

253 # self.logger.debug("subscribers list: ") 

254 # self.logger.debug(subscribers) 

255 if subscribers: 

256 asyncio.ensure_future( 

257 self.nslcm.send_notifications(subscribers), 

258 ) 

259 else: 

260 self.logger.debug( 

261 "Message can not be used for notification of nslcm" 

262 ) 

263 elif topic == "vnf": 

264 if isinstance(params, dict): 

265 vnfd_id = params["vnfdId"] 

266 vnf_instance_id = params["vnfInstanceId"] 

267 if command == "create" or command == "delete": 

268 op_state = command 

269 else: 

270 op_state = params["operationState"] 

271 event_details = { 

272 "topic": topic, 

273 "command": command.upper(), 

274 "params": params, 

275 } 

276 subscribers = self.vnflcm.get_subscribers( 

277 vnfd_id, 

278 vnf_instance_id, 

279 command.upper(), 

280 op_state, 

281 event_details, 

282 ) 

283 if subscribers: 

284 asyncio.ensure_future( 

285 self.vnflcm.send_notifications(subscribers), 

286 ) 

287 elif topic == "nsi": 

288 if command == "terminated" and params["operationState"] in ( 

289 "COMPLETED", 

290 "PARTIALLY_COMPLETED", 

291 ): 

292 self.logger.debug("received nsi terminated {}".format(params)) 

293 if params.get("autoremove"): 

294 self.engine.del_item( 

295 self.internal_session, 

296 "nsis", 

297 _id=params["nsir_id"], 

298 not_send_msg=msg_to_send, 

299 ) 

300 self.logger.debug( 

301 "nsis={} deleted from database".format(params["nsir_id"]) 

302 ) 

303 elif topic == "admin": 

304 self.logger.debug("received {} {} {}".format(topic, command, params)) 

305 if command in ["echo", "ping"]: # ignored commands 

306 pass 

307 elif command == "revoke_token": 

308 if params: 

309 if isinstance(params, dict) and "_id" in params: 

310 tid = params.get("_id") 

311 self.engine.authenticator.tokens_cache.pop(tid, None) 

312 self.logger.debug( 

313 "token '{}' removed from token_cache".format(tid) 

314 ) 

315 else: 

316 self.logger.debug( 

317 "unrecognized params in command '{} {}': {}".format( 

318 topic, command, params 

319 ) 

320 ) 

321 else: 

322 self.engine.authenticator.tokens_cache.clear() 

323 self.logger.debug("token_cache cleared") 

324 else: 

325 self.logger.debug( 

326 "unrecognized command '{} {}'".format(topic, command) 

327 ) 

328 # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that, 

329 # but content to be written is stored at msg_to_send 

330 for msg in msg_to_send: 

331 await self.msg.aiowrite(*msg) 

332 except (EngineException, DbException, MsgException) as e: 

333 self.logger.error( 

334 "Error while processing topic={} command={}: {}".format( 

335 topic, command, e 

336 ) 

337 ) 

338 except Exception as e: 

339 self.logger.exception( 

340 "Exception while processing topic={} command={}: {}".format( 

341 topic, command, e 

342 ), 

343 exc_info=True, 

344 ) 

345 

346 def _stop(self): 

347 """ 

348 Close all connections 

349 :return: None 

350 """ 

351 try: 

352 if self.db: 

353 self.db.db_disconnect() 

354 if self.msg: 

355 self.msg.disconnect() 

356 except (DbException, MsgException) as e: 

357 raise SubscriptionException(str(e), http_code=e.http_code) 

358 

359 def terminate(self): 

360 """ 

361 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards, 

362 but not immediately. 

363 :return: None 

364 """ 

365 self.to_terminate = True 

366 if self.aiomain_task: 

367 asyncio.get_event_loop().call_soon_threadsafe(self.aiomain_task.cancel) 

368 if self.aiomain_task_admin: 

369 asyncio.get_event_loop().call_soon_threadsafe( 

370 self.aiomain_task_admin.cancel 

371 )