Coverage for NG-RO/osm_ng_ro/vim_admin.py: 17%

223 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-14 12:04 +0000

1# -*- coding: utf-8 -*- 

2 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16""" 

17This module implements a thread that reads from kafka bus reading VIM messages. 

18It is based on asyncio. 

19It is in charge of loading tasks assigned to VIMs that nobody is in charge of 

20""" 

21 

22import asyncio 

23from http import HTTPStatus 

24import logging 

25import threading 

26from time import time 

27 

28from osm_common import dbmemory, dbmongo, msgkafka, msglocal 

29from osm_common.dbbase import DbException 

30from osm_common.msgbase import MsgException 

31 

32__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" 

33 

34 

class VimAdminException(Exception):
    """Error raised by the VIM admin thread, carrying an HTTP status code.

    :param message: human readable description of the error
    :param http_code: HTTP status associated with the error; it is stored in
        the ``http_code`` attribute and defaults to ``HTTPStatus.BAD_REQUEST``
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        super().__init__(message)
        self.http_code = http_code

39 

40 

class LockRenew:
    """Periodically renew database locks that are still held by a live thread.

    Lock descriptors are registered with ``add_lock_object`` and kept in the
    class-level ``renew_list``. The ``renew_locks`` coroutine extends the
    ``locked_at`` value of the entry at the head of the list shortly before
    the lock would expire, and ``stop`` releases what is still registered.
    """

    renew_list = []
    # ^ class attribute, shared by every instance. Time ordered list of dictionaries with information of locks that
    # need to be renewed. The time order is achieved as entries are always appended at the end

    def __init__(self, config, logger):
        """
        Constructor of class
        :param config: configuration parameters; the keys 'task_locked_time', 'task_relock_time' and
            'task_max_locked_time' are read from config["global"]
        :param logger: logger used to report renewals and failures
        """
        self.config = config
        self.logger = logger
        self.to_terminate = False
        self.db = None
        self.task_locked_time = config["global"]["task_locked_time"]
        self.task_relock_time = config["global"]["task_relock_time"]
        self.task_max_locked_time = config["global"]["task_max_locked_time"]

    def start(self, db):
        """
        Provide the database connection used to renew and release locks
        :param db: database object; its 'set_one' method is used
        """
        self.db = db

    @staticmethod
    def add_lock_object(database_table, database_object, thread_object):
        """
        Insert a task to renew the locking
        :param database_table: database collection where to maintain the lock
        :param database_object: database object. '_id' and 'locked_at' are mandatory keys
        :param thread_object: Thread object that has locked to check if it is alive
        :return: a locked_object needed for calling remove_lock_object. It will contain the up-to-date
            database 'locked_at'
        """
        lock_object = {
            "table": database_table,
            "_id": database_object["_id"],
            "initial_lock_time": database_object["locked_at"],
            "locked_at": database_object["locked_at"],
            "thread": thread_object,
            "unlocked": False,  # True when it is not needed any more
        }
        LockRenew.renew_list.append(lock_object)

        return lock_object

    @staticmethod
    def remove_lock_object(lock_object):
        """
        Mark a lock as no longer needed; renew_locks discards it when it reaches the head of the list
        :param lock_object: the dictionary returned by add_lock_object
        """
        lock_object["unlocked"] = True

    async def renew_locks(self):
        """
        Loop, until 'to_terminate' is set, renewing the lock at the head of 'renew_list' when it is about
        to expire. A lock is dropped from the list when its owner has released it, when the owner thread
        is missing or dead, when it has been held longer than 'task_max_locked_time', or when the
        database update fails.
        """
        while not self.to_terminate:
            if not self.renew_list:
                await asyncio.sleep(self.task_locked_time - self.task_relock_time)
                continue

            lock_object = self.renew_list[0]

            if (
                lock_object["unlocked"]
                or not lock_object["thread"]
                or not lock_object["thread"].is_alive()
            ):
                # task has been finished or locker thread is dead, not needed to re-locked.
                self.renew_list.pop(0)
                continue

            locked_at = lock_object["locked_at"]
            now = time()
            time_to_relock = (
                locked_at + self.task_locked_time - self.task_relock_time - now
            )

            if time_to_relock >= 1:
                # wait until it is time to re-lock it
                await asyncio.sleep(time_to_relock)
                continue

            # The head entry is due. Always take it out of the list so that every iteration makes
            # progress; it is appended again only after a successful renewal.
            self.renew_list.pop(0)

            if lock_object["initial_lock_time"] + self.task_max_locked_time < now:
                # held for too long: stop renewing and let the lock expire
                self.logger.info(
                    "Maximum locked time exceeded for {}.{}. Lock is not renewed".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                continue

            # re-lock
            new_locked_at = locked_at + self.task_locked_time

            try:
                # the filter on the previous 'locked_at' makes the update a no-op if the database
                # entry was modified in the meantime
                renewed = self.db.set_one(
                    lock_object["table"],
                    update_dict={
                        "locked_at": new_locked_at,
                        "modified_at": now,
                    },
                    q_filter={
                        "_id": lock_object["_id"],
                        "locked_at": locked_at,
                    },
                    fail_on_empty=False,
                )
            except Exception as e:
                self.logger.error(
                    "Exception when trying to renew lock for {}.{}: {}".format(
                        lock_object["table"], lock_object["_id"], e
                    )
                )
                continue

            if renewed:
                self.logger.debug(
                    "Renew lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )
                lock_object["locked_at"] = new_locked_at
                self.renew_list.append(lock_object)
            else:
                self.logger.info(
                    "Cannot renew lock for {}.{}".format(
                        lock_object["table"], lock_object["_id"]
                    )
                )

    def stop(self):
        """
        Release in the database every registered lock that is still held or has not expired yet,
        by setting its 'locked_at' to 0. Does nothing if no database was provided with 'start'.
        """
        if self.db is None:
            # start() was never called, there is no connection to release anything with
            return

        # unlock all locked items
        now = time()

        for lock_object in self.renew_list:
            locked_at = lock_object["locked_at"]

            if not lock_object["unlocked"] or locked_at + self.task_locked_time >= now:
                self.db.set_one(
                    lock_object["table"],
                    update_dict={"locked_at": 0},
                    q_filter={"_id": lock_object["_id"], "locked_at": locked_at},
                    fail_on_empty=False,
                )

166 

167 

class VimAdminThread(threading.Thread):
    """Thread that runs three asyncio tasks for VIM/WIM/SDN administration.

    The tasks are: a subscription to the topics in ``kafka_topics`` (handled by
    ``_msg_callback``), a periodic database watcher (``vim_watcher``) and the
    lock renewal loop of ``LockRenew``. Call ``terminate`` from any thread to
    finish it.
    """

    MAX_TIME_UNATTENDED = 600  # 10min
    TIME_CHECK_UNUSED_VIM = 3600 * 2  # 2h
    kafka_topics = ("vim_account", "wim_account", "sdn")

    def __init__(self, config, engine):
        """
        Constructor of class
        :param config: configuration parameters of database and messaging
        :param engine: an instance of Engine class, used for deleting instances
        """
        threading.Thread.__init__(self)
        self.to_terminate = False
        self.config = config
        self.db = None
        self.msg = None
        self.engine = engine
        self.loop = None
        self.last_rotask_time = 0
        self.next_check_unused_vim = time() + self.TIME_CHECK_UNUSED_VIM
        self.logger = logging.getLogger("ro.vimadmin")
        # asyncio task for receiving vim actions from kafka bus
        self.aiomain_task_kafka = None
        # asyncio task for watching ro_tasks not processed by nobody
        self.aiomain_task_vim = None
        self.aiomain_task_renew_lock = None
        # ^asyncio task for maintain an ro_task locked when VIM plugin takes too much time processing an order
        self.lock_renew = LockRenew(config, self.logger)
        self.task_locked_time = config["global"]["task_locked_time"]

    async def vim_watcher(self):
        """Reads database periodically looking for tasks not processed by nobody because of a reboot
        in order to load this vim"""
        # firstly read VIMS not processed
        for target_database in ("vim_accounts", "wim_accounts", "sdns"):
            unattended_targets = self.db.get_list(
                target_database,
                q_filter={"_admin.operations.operationState": "PROCESSING"},
            )

            for target in unattended_targets:
                # target identifiers are "<first 3 letters of the collection>:<_id>", e.g. "vim:<_id>"
                target_id = "{}:{}".format(target_database[:3], target["_id"])
                self.logger.info("ordered to check {}".format(target_id))
                self.engine.check_vim(target_id)

        while not self.to_terminate:
            now = time()
            # targets already examined in this round; a set gives constant time membership tests
            processed_vims = set()

            if not self.last_rotask_time:
                self.last_rotask_time = 0

            ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "target_id.ncont": self.engine.get_assigned_vims(),
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.gt": self.last_rotask_time,
                    "to_check_at.lte": now - self.MAX_TIME_UNATTENDED,
                },
            )
            self.last_rotask_time = now - self.MAX_TIME_UNATTENDED

            for ro_task in ro_tasks:
                target_id = ro_task["target_id"]

                # if already checked ignore
                if target_id in processed_vims:
                    continue

                processed_vims.add(target_id)

                # if already assigned ignore
                if target_id in self.engine.get_assigned_vims():
                    continue

                # if there is some task locked on this VIM, there is an RO working on it, so ignore
                if self.db.get_list(
                    "ro_tasks",
                    q_filter={
                        "target_id": target_id,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at.gt": now - self.task_locked_time,
                    },
                ):
                    continue

                # unattended, assign vim
                self.engine.assign_vim(target_id)
                self.logger.debug(
                    "ordered to load {}. Inactivity detected".format(target_id)
                )

            # every 2 hours check if there are vims without any ro_task and unload it
            if now > self.next_check_unused_vim:
                self.next_check_unused_vim = now + self.TIME_CHECK_UNUSED_VIM
                self.engine.unload_unused_vims()

            await asyncio.sleep(self.MAX_TIME_UNATTENDED)

    async def aiomain(self):
        """
        Create (or re-create after they finish) the kafka, watcher and lock renewal tasks and wait for
        any of them to end, until 'to_terminate' is set. Errors are logged and retried after 10 seconds.
        """
        kafka_working = True
        while not self.to_terminate:
            try:
                if not self.aiomain_task_kafka:
                    # write a dummy message on every topic before subscribing
                    for kafka_topic in self.kafka_topics:
                        await self.msg.aiowrite(kafka_topic, "echo", "dummy message")
                    kafka_working = True
                    self.logger.debug("Starting vim_account subscription task")
                    self.aiomain_task_kafka = asyncio.ensure_future(
                        self.msg.aioread(
                            self.kafka_topics,
                            group_id=False,
                            aiocallback=self._msg_callback,
                        ),
                    )

                if not self.aiomain_task_vim:
                    self.aiomain_task_vim = asyncio.ensure_future(self.vim_watcher())

                if not self.aiomain_task_renew_lock:
                    self.aiomain_task_renew_lock = asyncio.ensure_future(
                        self.lock_renew.renew_locks()
                    )

                done, _ = await asyncio.wait(
                    [
                        self.aiomain_task_kafka,
                        self.aiomain_task_vim,
                        self.aiomain_task_renew_lock,
                    ],
                    timeout=None,
                    return_when=asyncio.FIRST_COMPLETED,
                )

                try:
                    # a finished task is logged and forgotten so that the next iteration restarts it;
                    # Task.exception() raises CancelledError when the task was cancelled
                    if self.aiomain_task_kafka in done:
                        exc = self.aiomain_task_kafka.exception()
                        self.logger.error(
                            "kafka subscription task exception: {}".format(exc)
                        )
                        self.aiomain_task_kafka = None

                    if self.aiomain_task_vim in done:
                        exc = self.aiomain_task_vim.exception()
                        self.logger.error(
                            "vim_account watcher task exception: {}".format(exc)
                        )
                        self.aiomain_task_vim = None

                    if self.aiomain_task_renew_lock in done:
                        exc = self.aiomain_task_renew_lock.exception()
                        self.logger.error("renew_locks task exception: {}".format(exc))
                        self.aiomain_task_renew_lock = None
                except asyncio.CancelledError:
                    self.logger.exception("asyncio.CancelledError occured.")

            except Exception as e:
                if self.to_terminate:
                    return

                if kafka_working:
                    # logging only first time
                    self.logger.critical(
                        "Error accessing kafka '{}'. Retrying ...".format(e)
                    )
                    kafka_working = False

                await asyncio.sleep(10)

    def run(self):
        """
        Start of the thread
        :return: None
        :raises VimAdminException: on an invalid database/message driver or a connection error
        """
        # NOTE: asyncio.run() below creates and uses its own event loop; this one is only kept in
        # 'self.loop' and closed at the end.
        self.loop = asyncio.new_event_loop()
        try:
            if not self.db:
                if self.config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(self.config["database"])
                elif self.config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(self.config["database"])
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[database]':'driver'".format(
                            self.config["database"]["driver"]
                        )
                    )

            self.lock_renew.start(self.db)

            if not self.msg:
                config_msg = self.config["message"].copy()

                if config_msg["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config_msg)
                elif config_msg["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config_msg)
                else:
                    raise VimAdminException(
                        "Invalid configuration param '{}' at '[message]':'driver'".format(
                            config_msg["driver"]
                        )
                    )
        except (DbException, MsgException) as e:
            self.loop.close()
            raise VimAdminException(str(e), http_code=e.http_code) from e
        except Exception:
            # do not leak the loop on configuration errors either
            self.loop.close()
            raise

        self.logger.info("Starting")
        while not self.to_terminate:
            try:
                asyncio.run(self.main_task())
            except Exception as e:
                if not self.to_terminate:
                    self.logger.exception(
                        "Exception '{}' at messaging read loop".format(e), exc_info=True
                    )

        self.logger.info("Finishing")
        try:
            self._stop()
        finally:
            self.loop.close()

    async def main_task(self):
        """Run 'aiomain' as a task and wait until it finishes"""
        task = asyncio.ensure_future(self.aiomain())
        await task

    async def _msg_callback(self, topic, command, params):
        """
        Callback to process a received message from kafka
        :param topic: topic received
        :param command: command received
        :param params: rest of parameters
        :return: None
        """
        try:
            if command == "echo":
                return

            if topic in self.kafka_topics:
                target = topic[0:3]  # vim, wim or sdn
                target_id = target + ":" + params["_id"]

                if command in ("edited", "edit"):
                    self.engine.reload_vim(target_id)
                    self.logger.debug("ordered to reload {}".format(target_id))
                elif command in ("deleted", "delete"):
                    self.engine.unload_vim(target_id)
                    self.logger.debug("ordered to unload {}".format(target_id))
                elif command in ("create", "created"):
                    self.engine.check_vim(target_id)
                    self.logger.debug("ordered to check {}".format(target_id))
        except (DbException, MsgException) as e:
            self.logger.error(
                "Error while processing topic={} command={}: {}".format(
                    topic, command, e
                )
            )
        except Exception as e:
            self.logger.exception(
                "Exception while processing topic={} command={}: {}".format(
                    topic, command, e
                ),
                exc_info=True,
            )

    def _stop(self):
        """
        Close all connections
        :return: None
        :raises VimAdminException: if disconnecting the database or the message bus fails
        """
        try:
            if self.db:
                self.db.db_disconnect()

            if self.msg:
                self.msg.disconnect()
        except (DbException, MsgException) as e:
            raise VimAdminException(str(e), http_code=e.http_code) from e

    def terminate(self):
        """
        This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
        but not immediately.
        :return: None
        """
        self.to_terminate = True
        self.lock_renew.to_terminate = True

        for task in (
            self.aiomain_task_kafka,
            self.aiomain_task_vim,
            self.aiomain_task_renew_lock,
        ):
            if not task:
                continue

            try:
                # Pass the bound method (not its result) so that the cancellation is executed by the
                # loop that owns the task, which is the only thread-safe way of cancelling it.
                task.get_loop().call_soon_threadsafe(task.cancel)
            except RuntimeError as e:
                # the loop of the task is already closed: nothing left to cancel
                self.logger.debug("Cannot cancel task on terminate: {}".format(e))

        self.lock_renew.stop()