Coverage for NG-RO/osm_ng_ro/ns_thread.py: 30%

1457 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-14 12:04 +0000

1# -*- coding: utf-8 -*- 

2 

3## 

4# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); you may 

7# not use this file except in compliance with the License. You may obtain 

8# a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

15# License for the specific language governing permissions and limitations 

16# under the License. 

17# 

18## 

19 

20""" 

21This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. 

22The tasks are stored in the database, in the ro_tasks table. 

23A single ro_task refers to a VIM element (flavor, image, network, ...). 

24A ro_task can contain several 'tasks', each one with a target that says where to store the results. 

25""" 

26 

27from copy import deepcopy 

28from http import HTTPStatus 

29import logging 

30from os import makedirs 

31from os import path 

32import queue 

33import threading 

34import time 

35import traceback 

36from typing import Dict 

37from unittest.mock import Mock 

38 

39from importlib_metadata import entry_points 

40from osm_common.dbbase import DbException 

41from osm_ng_ro.vim_admin import LockRenew 

42from osm_ro_plugin import sdnconn 

43from osm_ro_plugin import vimconn 

44from osm_ro_plugin.sdn_dummy import SdnDummyConnector 

45from osm_ro_plugin.vim_dummy import VimDummyConnector 

46import yaml 

47 

48__author__ = "Alfonso Tierno" 

49__date__ = "$28-Sep-2017 12:07:15$" 

50 

51 

def deep_get(target_dict, *args, **kwargs):
    """
    Walk target_dict following the nested keys given in args and return what is found there.
    Example target_dict={a: {b: 5}}; keys (a, b) return 5; both keys (a, b, c) and (f, h) return the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, one per nesting level
    :param kwargs: may contain default=value, returned when a key is not present in the nested dictionary
    :return: The wanted value if it exists; otherwise the default (None when no default is given)
    """
    fallback = kwargs.get("default")
    current = target_dict

    for level_key in args:
        # stop as soon as the current level is not a dictionary or lacks the key
        if isinstance(current, dict) and level_key in current:
            current = current[level_key]
        else:
            return fallback

    return current

66 

67 

class NsWorkerException(Exception):
    """Generic error raised by the VIM interaction classes while processing a task"""

70 

71 

class FailingConnector:
    """Connector replacement whose public VIM/SDN methods always raise.

    Every public name found in vimconn.VimConnector and in sdnconn.SdnConnectorBase is set on the
    instance as a Mock that raises the corresponding connector exception carrying error_msg.
    """

    def __init__(self, error_msg):
        self.error_msg = error_msg

        # SDN names are installed last, so they win for any name present in both base classes
        failure_table = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, error_class in failure_table:
            for attr_name in dir(connector_class):
                if attr_name.startswith("_"):
                    continue

                setattr(self, attr_name, Mock(side_effect=error_class(error_msg)))

87 

88 

class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised when a searched VIM element is not found or the search is not valid"""

91 

92 

class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    Its methods do not contact the VIM; they just report a fixed result"""

    def __init__(self, db, my_vims, db_vims, logger):
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created; returns the "BUILD" status with an empty update"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling VIM to get image, flavor status. Fails only if the stored status is VIM_ERROR"""
        in_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if in_error else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling VIM to delete the element. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Nothing is executed; returns "DONE" with neither vim_info nor task update"""
        return "DONE", None, None

119 

120 

class VimInteractionNet(VimInteractionBase):
    """VIM interaction for networks: find or create, refresh the status and delete"""

    def new(self, ro_task, task_index, task_depends):
        """Find the VIM network described by the task "find_params", or create a new one.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (task status, vim_info fields to update). Status is "BUILD" on success and
            "FAILED" when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        vim_net_id = None
        created = False
        created_items = {}

        try:
            find_params = task.get("find_params")

            if not find_params:
                # CREATE
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True
            else:
                # FIND
                mgmtnet = False
                mgmtnet_defined_in_vim = False

                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the configuration of the VIM has preference,
                    # first by network id and then by network name
                    mgmtnet = True
                    db_vim = self.db_vims[target_id]

                    for filter_key, config_key in (
                        ("id", "management_network_id"),
                        ("name", "management_network_name"),
                    ):
                        configured = deep_get(db_vim, "config", config_key)

                        if configured:
                            mgmtnet_defined_in_vim = True
                            vim_filter = {filter_key: configured}
                            break
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    # A mgmt-network of the descriptor is created only when it is not mapped
                    # to a VIM network neither in the descriptor, nor in the "--config"
                    # parameter, nor at VIM creation
                    if not mgmtnet or mgmtnet_defined_in_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )

                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

    def refresh(self, ro_task):
        """Call VIM to get network status

        :param ro_task: ro_task content; "_id", "target_id" and "vim_info" are read
        :return: tuple (task status, vim_info fields that have changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        stored_info = ro_task["vim_info"]
        vim_id = stored_info["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            vim_status = vim_info["status"]

            if vim_status in ("ACTIVE", "BUILD"):
                task_status = "DONE" if vim_status == "ACTIVE" else "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # only the fields that differ from the stored vim_info are reported
        ro_vim_item_update = {}

        if stored_info["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if stored_info["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if stored_info["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        elif stored_info["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            new_status = ro_vim_item_update.get("vim_status")
            shown_message = (
                "" if new_status == "ACTIVE" else ro_vim_item_update.get("vim_message")
            )
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    new_status,
                    shown_message,
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network at VIM when there is a vim_id or created items stored.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: tuple (task status, vim_info fields to update). A network not found at VIM is
            reported as "DONE"; any other VimConnException as "FAILED"
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

340 

341 

class VimInteractionClassification(VimInteractionBase):
    """VIM interaction for classifications: create and delete"""

    def new(self, ro_task, task_index, task_depends):
        """Create a classification at VIM from the task "params".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary used to translate a "TASK-..." logical source port into a VM id
        :return: tuple (task status, vim_info fields to update). Status is "DONE" on success and
            "FAILED" when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            # work on a copy, so that the params stored at the task are not modified
            params_copy = deepcopy(task["params"])

            name = params_copy.pop("name")
            logical_source_port_index = int(
                params_copy.pop("logical_source_port_index")
            )
            logical_source_port = params_copy["logical_source_port"]

            if logical_source_port.startswith("TASK-"):
                # the port is a reference to a VM task: use the VIM id of the VM interface
                vm_id = task_depends[logical_source_port]
                vm_info = target_vim.refresh_vms_status([vm_id])[vm_id]
                params_copy["logical_source_port"] = vm_info["interfaces"][
                    logical_source_port_index
                ]["vim_interface_id"]

            vim_classification_id = target_vim.new_classification(
                name, "legacy_flow_classifier", params_copy
            )

            ro_vim_item_update = {
                "vim_id": vim_classification_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # labelled as classification (it was "new-vm"), in line with "del-classification"
            self.logger.error(
                "task={} {} new-classification: {}".format(
                    task_id, ro_task["target_id"], e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the classification at VIM when there is a vim_id stored.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: tuple (task status, vim_info fields to update). A classification not found at VIM
            is reported as "DONE"; any other VimConnException as "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        classification_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if classification_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_classification(classification_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-classification={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], classification_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-classification={} {}".format(
                task_id,
                ro_task["target_id"],
                classification_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

435 

436 

class VimInteractionSfi(VimInteractionBase):
    """VIM interaction for service function instances (sfi): create and delete"""

    def new(self, ro_task, task_index, task_depends):
        """Create a service function instance at VIM from the task "params".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary used to translate "TASK-..." ports into VM ids
        :return: tuple (task status, vim_info fields to update). Status is "DONE" on success and
            "FAILED" when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            ingress_port = params_copy["ingress_port"]
            egress_port = params_copy["egress_port"]
            ingress_port_index = params_copy["ingress_port_index"]
            egress_port_index = params_copy["egress_port_index"]

            # ports that are not a "TASK-..." reference are used as they are
            ingress_port_id = ingress_port
            egress_port_id = egress_port

            if ingress_port.startswith("TASK-"):
                # the dependency is looked up only for task references; a literal port
                # is not a key of task_depends
                ingress_vm_id = task_depends[ingress_port]
                ingress_port_id = target_vim.refresh_vms_status([ingress_vm_id])[
                    ingress_vm_id
                ]["interfaces"][ingress_port_index]["vim_interface_id"]

            if ingress_port == egress_port:
                egress_port_id = ingress_port_id
            elif egress_port.startswith("TASK-"):
                # a different task reference is resolved against its own VM
                egress_vm_id = task_depends[egress_port]
                egress_port_id = target_vim.refresh_vms_status([egress_vm_id])[
                    egress_vm_id
                ]["interfaces"][egress_port_index]["vim_interface_id"]

            vim_sfi_id = target_vim.new_sfi(
                name, [ingress_port_id], [egress_port_id], sfc_encap=False
            )

            ro_vim_item_update = {
                "vim_id": vim_sfi_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # labelled as sfi (it was "new-vm"), in line with "del-sfi"
            self.logger.error(
                "task={} {} new-sfi: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the service function instance at VIM when there is a vim_id stored.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: tuple (task status, vim_info fields to update). An sfi not found at VIM is
            reported as "DONE"; any other VimConnException as "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sfi_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sfi_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sfi(sfi_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sfi={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sfi_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sfi={} {}".format(
                task_id,
                ro_task["target_id"],
                sfi_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

544 

545 

class VimInteractionSf(VimInteractionBase):
    """VIM interaction for service functions (sf): create and delete"""

    def new(self, ro_task, task_index, task_depends):
        """Create a service function at VIM from the task "params".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary used to translate "TASK-..." sfi references into ids
        :return: tuple (task status, vim_info fields to update). Status is "DONE" on success and
            "FAILED" when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            # change every task reference into the id stored at task_depends
            sfi_id_list = [
                task_depends[sfi] if sfi.startswith("TASK-") else sfi
                for sfi in params_copy["sfis"]
            ]

            vim_sf_id = target_vim.new_sf(name, sfi_id_list, sfc_encap=False)

            ro_vim_item_update = {
                "vim_id": vim_sf_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # labelled as sf (it was "new-vm"), in line with "del-sf"
            self.logger.error(
                "task={} {} new-sf: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the service function at VIM when there is a vim_id stored.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: tuple (task status, vim_info fields to update). An sf not found at VIM is
            reported as "DONE"; any other VimConnException as "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sf_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sf_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sf(sf_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sf={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sf_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sf={} {}".format(
                task_id,
                ro_task["target_id"],
                sf_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

632 

633 

class VimInteractionSfp(VimInteractionBase):
    """VIM interaction for service function paths (sfp): create and delete"""

    def new(self, ro_task, task_index, task_depends):
        """Create a service function path at VIM from the task "params".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary used to translate "TASK-..." references into ids
        :return: tuple (task status, vim_info fields to update). Status is "DONE" on success and
            "FAILED" when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            sf_list = params_copy["sfs"]
            classification_list = params_copy["classifications"]

            # change every task reference into the id stored at task_depends
            classification_id_list = [
                (
                    task_depends[classification]
                    if classification.startswith("TASK-")
                    else classification
                )
                for classification in classification_list
            ]
            sf_id_list = [
                task_depends[sf] if sf.startswith("TASK-") else sf for sf in sf_list
            ]

            vim_sfp_id = target_vim.new_sfp(
                name, classification_id_list, sf_id_list, sfc_encap=False
            )

            ro_vim_item_update = {
                "vim_id": vim_sfp_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # labelled as sfp (it was "new-vm"), in line with "del-sfp"
            self.logger.error(
                "task={} {} new-sfp: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the service function path at VIM when there is a vim_id stored.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: tuple (task status, vim_info fields to update). An sfp not found at VIM is
            reported as "DONE"; any other VimConnException as "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sfp_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sfp_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sfp(sfp_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sfp={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sfp_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sfp={} {}".format(
                task_id,
                ro_task["target_id"],
                sfp_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

733 

734 

class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VDUs (VMs): create, delete, refresh the status and inject ssh keys"""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM at VIM from the task "params".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: dictionary used to translate "TASK-..." references (networks, image,
            flavor, affinity groups) into ids
        :return: tuple (task status, vim_info fields to update). Status is "BUILD" on success and
            "FAILED" when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            created = True
            # work on a copy, so that the params stored at the task are not modified
            params_copy = deepcopy(task["params"])
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # complete message (it was just "found for ..."), same wording as
                        # the one used for networks
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id
            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # NOTE(review): every net of net_list is expected to contain "vim_id" after
            # new_vminstance; presumably the connector fills it - confirm
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM at VIM when there is a vim_id or created items stored.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: tuple (task status, vim_info fields to update). A VM not found at VIM is
            reported as "DONE"; any other VimConnException as "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status

        :param ro_task: ro_task content; "_id", "target_id", "tasks" and "vim_info" are read
        :return: tuple (task status, vim_info fields that have changed), or (None, None) when
            there is no vim_id stored
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information; a failure here is only logged
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order of interfaces_vim_ids; an id not reported by the VIM gives None
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # NOTE(review): next() without default raises StopIteration when there is no CREATE
        # task pending to be FINISHED - confirm that callers always provide one
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        # NOTE(review): the entries of vim_interfaces can be None (see above); the two
        # assignments below fail with TypeError for such an entry - confirm if it can happen
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # the VDU management interface defaults to the VNF management one, and then to 0
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        # only the fields that differ from the stored vim_info are reported
        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    (
                        ro_vim_item_update.get("vim_message")
                        if ro_vim_item_update.get("vim_status") != "ACTIVE"
                        else ""
                    ),
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject a ssh key into the VM using the task "params".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (task status, vim_info fields to update or None, task fields to update).
            On a VimConnException or NsWorkerException the status is "BUILD", with the new
            "retries" and "next_retry", until max_retries_inject_ssh_key is reached; then "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params_copy = deepcopy(task["params"])
            # the stored private key is decrypted and passed as "ro_key"
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update

1030 

1031 

class VimInteractionImage(VimInteractionBase):
    """Image tasks: images are only looked up at the target VIM."""

    def new(self, ro_task, task_index, task_depends):
        """
        Look for the VIM image that matches the task "find_params".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this action
        :return: tuple (task_status, ro_vim_item_update). "DONE" with the image id
            (empty string when the task has no "find_params"), or "FAILED" with
            "vim_status" set to "VIM_ERROR" when none or several images match or
            when the VIM connector reports an error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        # nothing is created at the VIM by this action
        created = False
        created_items = {}
        vim_connector = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_image_id = ""
            find_params = task.get("find_params")

            if find_params:
                matches = vim_connector.get_image_list(
                    find_params.get("filter_dict", {})
                )

                if not matches:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(find_params)
                    )

                if len(matches) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                vim_image_id = matches[0]["id"]

            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_image_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

1089 

1090 

class VimInteractionSharedVolume(VimInteractionBase):
    """Creation and deletion of shared volumes at the target VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete the shared volume of this ro_task, unless it is marked to be kept.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info"
            ("vim_id", "created_items") are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :return: tuple (task_status, ro_vim_item_update). "DONE" when the volume is
            deleted, was already deleted or must be kept; "FAILED" with
            "vim_status" set to "VIM_ERROR" when the VIM connector reports an error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"]["created_items"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        # created_items may hold no entry for this id (e.g. vim_id is None), so
        # fall back to an empty dict instead of calling .get() on None
        volume_info = (created_items or {}).get(shared_volume_vim_id) or {}

        if volume_info.get("keep"):
            # the volume must survive: report it as active and do not touch the VIM
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }

            return "DONE", ro_vim_item_update_ok

        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Create a shared volume at the target VIM from the task "params".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this action
        :return: tuple (task_status, ro_vim_item_update). "DONE" with the new volume
            id (None when the task has no "params"), or "FAILED" with "vim_status"
            set to "VIM_ERROR" when the VIM connector reports an error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                # remember the "keep" flag: delete() reads it back from here
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

1195 

1196 

class VimInteractionFlavor(VimInteractionBase):
    """Lookup, creation and deletion of flavors at the target VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete the flavor of this ro_task from the target VIM.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and
            "vim_info" ("vim_id") are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :return: tuple (task_status, ro_vim_item_update). "DONE" when the flavor is
            deleted or was already deleted; "FAILED" with "vim_status" set to
            "VIM_ERROR" when the VIM connector reports an error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def _refresh_nsr_flavors(self, task, target_vim, osm_key, by_name):
        """
        Copy ram/disk/vcpus of the VIM flavors requested by the vdurs into the nsr.

        For every vdur of the constituent vnfrs whose "ns-flavor-id" matches a
        flavor of the nsr and whose additionalParams.OSM contains osm_key, the VIM
        flavor is obtained and its sizes are written in the nsr flavor entry. The
        nsr is stored back in the database at the end.

        :param task: task content; "nsr_id" is read
        :param target_vim: VIM connector used to obtain the flavor details
        :param osm_key: key of additionalParams.OSM holding the VIM flavor id/name
        :param by_name: True to look the flavor up by name, False to do it by id
        :return: "id" of the last VIM flavor obtained, None if there was none
        """
        db_nsr = self.db.get_one("nsrs", {"_id": task["nsr_id"]})
        last_vim_flavor_id = None

        # a nsr without "constituent-vnfr-ref" simply has nothing to refresh
        for vnfr_id in db_nsr.get("constituent-vnfr-ref") or ():
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            for each_flavor in db_nsr["flavor"]:
                nsd_flavor_id = each_flavor["id"]

                for vdur in db_vnfr["vdur"]:
                    ns_flavor_ref = vdur.get("ns-flavor-id")

                    if ns_flavor_ref != nsd_flavor_id:
                        continue

                    # the lookup by id has always skipped vdurs with an unset
                    # flavor reference; the lookup by name has not
                    if not by_name and not ns_flavor_ref:
                        continue

                    # vdurs without additionalParams/OSM are skipped rather than
                    # raising a KeyError that the caller would not catch
                    osm_params = (vdur.get("additionalParams") or {}).get("OSM") or {}
                    requested_flavor = osm_params.get(osm_key)

                    if not requested_flavor:
                        continue

                    if by_name:
                        flavor_details = target_vim.get_flavor(
                            flavor_name=requested_flavor
                        )
                    else:
                        flavor_details = target_vim.get_flavor(requested_flavor)

                    each_flavor.update(
                        {
                            "memory-mb": flavor_details["ram"],
                            "storage-gb": flavor_details["disk"],
                            "vcpu-count": flavor_details["vcpus"],
                        }
                    )
                    last_vim_flavor_id = flavor_details.get("id")

        self.db.set_one("nsrs", {"_id": task["nsr_id"]}, db_nsr)

        return last_vim_flavor_id

    def new(self, ro_task, task_index, task_depends):
        """
        Find a suitable flavor at the target VIM, or create it if none is found.

        The task "find_params" select the lookup: an explicit "vim_flavor_id", a
        "vim_flavor_name" or generic "flavor_data". When no flavor is found and the
        task has "params", a flavor is created from params["flavor_data"].

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this action
        :return: tuple (task_status, ro_vim_item_update). "DONE" with the flavor id,
            or "FAILED" with "vim_status" set to "VIM_ERROR" when the VIM connector
            reports an error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_flavor_id = None
            # tolerate a task where "find_params" is present but set to None
            find_params = task.get("find_params") or {}

            if find_params.get("vim_flavor_id"):
                vim_flavor_id = find_params["vim_flavor_id"]
                self._refresh_nsr_flavors(
                    task, target_vim, "vim_flavor_id", by_name=False
                )
            elif find_params.get("vim_flavor_name"):
                vim_flavor_id = self._refresh_nsr_flavors(
                    task, target_vim, "vim_flavor_name", by_name=True
                )
            elif find_params.get("flavor_data"):
                try:
                    flavor_data = find_params["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # not finding it is fine: the flavor is created below
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if not vim_flavor_id and task.get("params"):
                # CREATE
                flavor_data = task["params"]["flavor_data"]
                vim_flavor_id = target_vim.new_flavor(flavor_data)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_flavor_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

1339 

1340 

class VimInteractionAffinityGroup(VimInteractionBase):
    """Lookup, creation and deletion of affinity/anti-affinity groups at the VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete the affinity group of this ro_task from the target VIM.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and
            "vim_info" ("vim_id") are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :return: tuple (task_status, ro_vim_item_update). "DONE" when the group is
            deleted or was already deleted; "FAILED" with "vim_status" set to
            "VIM_ERROR" when the VIM connector reports an error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Reuse the affinity group given by the user, or create a new one.

        When params["affinity_group_data"] carries a "vim-affinity-group-id" the
        group is looked up at the VIM; if it is not found, or no id was given, a
        new group is created from the affinity group data.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this action
        :return: tuple (task_status, ro_vim_item_update). "DONE" with the group id
            (None when the task carries no affinity group data), or "FAILED" with
            "vim_status" set to "VIM_ERROR" when the VIM connector reports an error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None
            param_affinity_group_id = ""

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                param_affinity_group_id = affinity_group_data.get(
                    "vim-affinity-group-id"
                )

                try:
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # the message fragments are concatenated: the second one must
                    # start with a space, otherwise the id is glued to "could"
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
                        " could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

1447 

1448 

class VimInteractionUpdateVdu(VimInteractionBase):
    """Run an action over an existing VM of the target VIM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Call action_vminstance at the target VIM for the VM given in the task.

        NOTE: the log messages of this method mention "migration" although the
        connector call made here is action_vminstance.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this action
        :return: tuple (task_status, ro_vim_item_update, db_task_update). "DONE"
            with "vim_status" "ACTIVE", or "FAILED" with "vim_status" "VIM_ERROR"
            when the VIM connector reports an error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        vim_connector = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                requested_action = params.get("action")
                # the dict handed to the connector is keyed by the action value
                vim_connector.action_vminstance(
                    vim_vm_id, {requested_action: requested_action}
                )

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                {"vim_id": vim_vm_id, "vim_status": "ACTIVE"},
                db_task_update,
            )
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )

            return (
                "FAILED",
                {"vim_status": "VIM_ERROR", "vim_message": str(e)},
                db_task_update,
            )

1483 

1484 

class VimInteractionSdnNet(VimInteractionBase):
    """Creation, update and deletion of SDN connectivity services."""

    @staticmethod
    def _match_pci(port_pci, mapping):
        """
        Check if port_pci matches with mapping.
        The mapping can have brackets to indicate that several chars are accepted. e.g
        pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
        :param port_pci: text
        :param mapping: text, can contain brackets to indicate several chars are available
        :return: True if matches, False otherwise
        """
        if not port_pci or not mapping:
            return False
        if port_pci == mapping:
            return True

        mapping_index = 0
        pci_index = 0
        while True:
            bracket_start = mapping.find("[", mapping_index)

            if bracket_start == -1:
                break

            bracket_end = mapping.find("]", bracket_start)
            if bracket_end == -1:
                # unbalanced bracket: the remainder is compared literally below
                break

            # literal text placed before the bracket must match exactly
            length = bracket_start - mapping_index
            if (
                length
                and port_pci[pci_index : pci_index + length]
                != mapping[mapping_index:bracket_start]
            ):
                return False

            # a pci shorter than the pattern has no char left for this bracket:
            # it is a mismatch, not an IndexError
            bracket_pos = pci_index + length
            if bracket_pos >= len(port_pci):
                return False

            if port_pci[bracket_pos] not in mapping[bracket_start + 1 : bracket_end]:
                return False

            pci_index = bracket_pos + 1
            mapping_index = bracket_end + 1

        # whatever follows the last bracket must match literally
        return port_pci[pci_index:] == mapping[mapping_index:]

    def _get_interfaces(self, vlds_to_connect, vim_account_id):
        """
        Collect the SR-IOV and PCI-PASSTHROUGH interfaces connected to some vlds.

        :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
        :param vim_account_id: only vnfrs of this vim account are considered
        :return: list of interface dicts (copies of the vdur interfaces) with an
            extra "id" and, when the vdur is on ERROR, "status" set to "ERROR"
        """
        interfaces = []

        for vld in vlds_to_connect:
            table, _, db_id = vld.partition(":")
            db_id, _, vld = db_id.partition(":")
            _, _, vld_id = vld.partition(".")

            if table == "vnfrs":
                q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
                iface_key = "vnf-vld-id"
            else:  # table == "nsrs"
                q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
                iface_key = "ns-vld-id"

            db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)

            for db_vnfr in db_vnfrs:
                for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                    for iface_index, interface in enumerate(vdur["interfaces"]):
                        if interface.get(iface_key) != vld_id:
                            continue

                        # only SR-IOV or PT
                        if interface.get("type") not in ("SR-IOV", "PCI-PASSTHROUGH"):
                            continue

                        interface_ = interface.copy()
                        interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                            db_vnfr["_id"], vdu_index, iface_index
                        )

                        if vdur.get("status") == "ERROR":
                            interface_["status"] = "ERROR"

                        interfaces.append(interface_)

        return interfaces

    def refresh(self, ro_task):
        """
        Refresh the SDN net by running new() again over its pending CREATE task.

        :param ro_task: ro_task content; "tasks" is read
        :return: whatever new() returns for the first task with action "CREATE"
            and status different from "FINISHED". StopIteration is raised by
            next() when there is no such task
        """
        # look for task create
        task_create_index, _ = next(
            i_t
            for i_t in enumerate(ro_task["tasks"])
            if i_t[1]
            and i_t[1]["action"] == "CREATE"
            and i_t[1]["status"] != "FINISHED"
        )

        return self.new(ro_task, task_create_index, None)

    def new(self, ro_task, task_index, task_depends):
        """
        Create or update the SDN connectivity service with the ports located so far.

        The ports are the SR-IOV/PCI-PASSTHROUGH interfaces of the vlds in
        params["vlds"], translated with the "sdn-port-mapping" of the associated
        VIM, plus the external ports in params["sdn-ports"]. When the set of ports
        has not changed and the service exists, only its status is obtained.

        :param ro_task: ro_task content; "tasks", "target_id" and "vim_info" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this action
        :return: tuple (sdn_status, ro_vim_item_update), or ("FAILED", ...) with
            "vim_status" set to "VIM_ERROR" when any exception is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            db_vim = {}
            params = task["params"]
            vlds_to_connect = params.get("vlds", [])
            associated_vim = params.get("target_vim")
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            _, _, vim_account_id = (
                (None, None, None)
                if associated_vim is None
                else associated_vim.partition(":")
            )

            if associated_vim:
                # get associated VIM; self.db_vims works as a cache
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            sdn_need_update = False

            for port in ports:
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                if not port.get("compute_node") or not port.get("pci"):
                    # the location of this interface is not known yet
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": (
                        "dot1q" if port["type"] == "SR-IOV" else None
                    ),
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # no connectivity service is requested for fewer than 2 ports
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                # nothing to change: just obtain the current status
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # not finished while there are interfaces without location
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # the traceback is logged only for exceptions other than connector errors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the SDN connectivity service of this ro_task.

        :param ro_task: ro_task content; "tasks", "target_id", "_id" and "vim_info"
            ("vim_id", "created_items") are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :return: tuple (task_status, ro_vim_item_update). "DONE" when the service is
            deleted or the connector answers not found; "FAILED" with "vim_status"
            set to "VIM_ERROR" for any other exception
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sdn_vim_id = ro_task["vim_info"].get("vim_id")
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sdn_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_connectivity_service(
                    sdn_vim_id, ro_task["vim_info"].get("created_items")
                )

        except Exception as e:
            if (
                isinstance(e, sdnconn.SdnConnectorError)
                and e.http_code == HTTPStatus.NOT_FOUND.value
            ):
                ro_vim_item_update_ok["vim_message"] = "already deleted"
            else:
                self.logger.error(
                    "ro_task={} vim={} del-sdn-net={}: {}".format(
                        ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                    ),
                    exc_info=not isinstance(
                        e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                    ),
                )
                ro_vim_item_update = {
                    "vim_status": "VIM_ERROR",
                    "vim_message": "Error while deleting: {}".format(e),
                }

                return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sdn-net={} {}".format(
                task_id,
                ro_task["target_id"],
                sdn_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

1879 

1880 

class VimInteractionMigration(VimInteractionBase):
    """Migration of a VM to another compute host of the target VIM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM given in the task and collect its refreshed information.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this action
        :return: tuple (task_status, ro_vim_item_update, db_task_update). "DONE"
            with "vim_status" "ACTIVE" and, when available, the refreshed
            "vim_details" and "interfaces"; "FAILED" with "vim_status" "VIM_ERROR"
            when the VIM connector reports an error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        vim_connector = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        refreshed_vim_info = {}

        try:
            vim_vm_id = ""
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                _, migrated_compute_node = vim_connector.migrate_instance(
                    vim_vm_id, params.get("migrate_host")
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated
                    vdu_old_vim_info = params["vdu_vim_info"].get(ro_task["target_id"])

                    # Refresh VM to get new vim_info
                    refreshed_vim_info = vim_connector.refresh_vms_status([vim_vm_id])[
                        vim_vm_id
                    ]

                    if refreshed_vim_info.get("interfaces"):
                        # one entry per old interface, in the same order; None is
                        # stored when the interface is not among the refreshed ones
                        for old_iface in vdu_old_vim_info.get("interfaces"):
                            matching_iface = None

                            for candidate in refreshed_vim_info["interfaces"]:
                                if (
                                    old_iface["vim_interface_id"]
                                    == candidate["vim_interface_id"]
                                ):
                                    matching_iface = candidate
                                    break

                            vim_interfaces.append(matching_iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }

            refreshed_ok = refreshed_vim_info and refreshed_vim_info.get(
                "status"
            ) not in ("ERROR", "VIM_ERROR")

            if refreshed_ok:
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )

            return (
                "FAILED",
                {"vim_status": "VIM_ERROR", "vim_message": str(e)},
                db_task_update,
            )

1956 

1957 

class VimInteractionResize(VimInteractionBase):
    """Resize of a VM of the target VIM to another flavor."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM given in the task to the flavor of the task it depends on.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: dict indexed by params["flavor_id"] that provides the
            VIM id of the target flavor
        :return: tuple (task_status, ro_vim_item_update, db_task_update). "DONE"
            with "vim_status" "ACTIVE" and, when the resize is done, the refreshed
            "vim_details"; "FAILED" with "vim_status" "VIM_ERROR" when the VIM
            connector reports an error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_flavor_uuid = None
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # params are only read, so no copy is needed
            params = task["params"]
            target_flavor_uuid = task_depends[params["flavor_id"]]
            # the VM to resize comes in the task parameters, as in the other VM
            # actions; it used to be left empty, so the VIM was asked to resize ""
            vim_vm_id = params.get("vim_vm_id") or ""
            self.logger.info("vim_vm_id %s", vim_vm_id)

            if target_flavor_uuid is not None:
                resized_status = target_vim.resize_instance(
                    vim_vm_id, target_flavor_uuid
                )

                if resized_status:
                    # Refresh VM to get new vim_info
                    vm_to_refresh_list = [vim_vm_id]
                    vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                    refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update

2013 

2014 

class ConfigValidate:
    """Read-only view over the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        # Full configuration dictionary; only config["period"] is read here
        self.conf = config

    def _period(self, key):
        """Return the raw value stored at config["period"][key]."""
        return self.conf["period"][key]

    @property
    def active(self):
        """Refresh period for active items.

        Values >= 60 and the special value -1 (periodic checks disabled) are
        returned unchanged; anything else falls back to 60 seconds.
        """
        refresh_active = self._period("refresh_active")
        accepted = refresh_active >= 60 or refresh_active == -1
        return refresh_active if accepted else 60

    @property
    def build(self):
        """Refresh period configured as "refresh_build"."""
        return self._period("refresh_build")

    @property
    def image(self):
        """Refresh period configured as "refresh_image"."""
        return self._period("refresh_image")

    @property
    def error(self):
        """Refresh period configured as "refresh_error"."""
        return self._period("refresh_error")

    @property
    def queue_size(self):
        """Value configured as "queue_size" inside the "period" section."""
        return self._period("queue_size")

2045 

2046 

2047class NsWorker(threading.Thread): 

2048 def __init__(self, worker_index, config, plugins, db): 

2049 """ 

2050 :param worker_index: thread index 

2051 :param config: general configuration of RO, among others the process_id with the docker id where it runs 

2052 :param plugins: global shared dict with the loaded plugins 

2053 :param db: database class instance to use 

2054 """ 

2055 threading.Thread.__init__(self) 

2056 self.config = config 

2057 self.plugins = plugins 

2058 self.plugin_name = "unknown" 

2059 self.logger = logging.getLogger("ro.worker{}".format(worker_index)) 

2060 self.worker_index = worker_index 

2061 # refresh periods for created items 

2062 self.refresh_config = ConfigValidate(config) 

2063 self.task_queue = queue.Queue(self.refresh_config.queue_size) 

2064 # targetvim: vimplugin class 

2065 self.my_vims = {} 

2066 # targetvim: vim information from database 

2067 self.db_vims = {} 

2068 # targetvim list 

2069 self.vim_targets = [] 

2070 self.my_id = config["process_id"] + ":" + str(worker_index) 

2071 self.db = db 

2072 self.item2class = { 

2073 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger), 

2074 "shared-volumes": VimInteractionSharedVolume( 

2075 self.db, self.my_vims, self.db_vims, self.logger 

2076 ), 

2077 "classification": VimInteractionClassification( 

2078 self.db, self.my_vims, self.db_vims, self.logger 

2079 ), 

2080 "sfi": VimInteractionSfi(self.db, self.my_vims, self.db_vims, self.logger), 

2081 "sf": VimInteractionSf(self.db, self.my_vims, self.db_vims, self.logger), 

2082 "sfp": VimInteractionSfp(self.db, self.my_vims, self.db_vims, self.logger), 

2083 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger), 

2084 "image": VimInteractionImage( 

2085 self.db, self.my_vims, self.db_vims, self.logger 

2086 ), 

2087 "flavor": VimInteractionFlavor( 

2088 self.db, self.my_vims, self.db_vims, self.logger 

2089 ), 

2090 "sdn_net": VimInteractionSdnNet( 

2091 self.db, self.my_vims, self.db_vims, self.logger 

2092 ), 

2093 "update": VimInteractionUpdateVdu( 

2094 self.db, self.my_vims, self.db_vims, self.logger 

2095 ), 

2096 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup( 

2097 self.db, self.my_vims, self.db_vims, self.logger 

2098 ), 

2099 "migrate": VimInteractionMigration( 

2100 self.db, self.my_vims, self.db_vims, self.logger 

2101 ), 

2102 "verticalscale": VimInteractionResize( 

2103 self.db, self.my_vims, self.db_vims, self.logger 

2104 ), 

2105 } 

2106 self.time_last_task_processed = None 

2107 # lists of tasks to delete because nsrs or vnfrs has been deleted from db 

2108 self.tasks_to_delete = [] 

2109 # it is idle when there are not vim_targets associated 

2110 self.idle = True 

2111 self.task_locked_time = config["global"]["task_locked_time"] 

2112 

2113 def insert_task(self, task): 

2114 try: 

2115 self.task_queue.put(task, False) 

2116 return None 

2117 except queue.Full: 

2118 raise NsWorkerException("timeout inserting a task") 

2119 

2120 def terminate(self): 

2121 self.insert_task("exit") 

2122 

2123 def del_task(self, task): 

2124 with self.task_lock: 

2125 if task["status"] == "SCHEDULED": 

2126 task["status"] = "SUPERSEDED" 

2127 return True 

2128 else: # task["status"] == "processing" 

2129 self.task_lock.release() 

2130 return False 

2131 

2132 def _process_vim_config(self, target_id: str, db_vim: dict) -> None: 

2133 """ 

2134 Process vim config, creating vim configuration files as ca_cert 

2135 :param target_id: vim/sdn/wim + id 

2136 :param db_vim: Vim dictionary obtained from database 

2137 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files 

2138 """ 

2139 if not db_vim.get("config"): 

2140 return 

2141 

2142 file_name = "" 

2143 work_dir = "/app/osm_ro/certs" 

2144 

2145 try: 

2146 if db_vim["config"].get("ca_cert_content"): 

2147 file_name = f"{work_dir}/{target_id}:{self.worker_index}" 

2148 

2149 if not path.isdir(file_name): 

2150 makedirs(file_name) 

2151 

2152 file_name = file_name + "/ca_cert" 

2153 

2154 with open(file_name, "w") as f: 

2155 f.write(db_vim["config"]["ca_cert_content"]) 

2156 del db_vim["config"]["ca_cert_content"] 

2157 db_vim["config"]["ca_cert"] = file_name 

2158 except Exception as e: 

2159 raise NsWorkerException( 

2160 "Error writing to file '{}': {}".format(file_name, e) 

2161 ) 

2162 

2163 def _load_plugin(self, name, type="vim"): 

2164 # type can be vim or sdn 

2165 if "rovim_dummy" not in self.plugins: 

2166 self.plugins["rovim_dummy"] = VimDummyConnector 

2167 

2168 if "rosdn_dummy" not in self.plugins: 

2169 self.plugins["rosdn_dummy"] = SdnDummyConnector 

2170 

2171 if name in self.plugins: 

2172 return self.plugins[name] 

2173 

2174 try: 

2175 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name): 

2176 self.plugins[name] = ep.load() 

2177 except Exception as e: 

2178 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e)) 

2179 

2180 if name and name not in self.plugins: 

2181 raise NsWorkerException( 

2182 "Plugin 'osm_{n}' has not been installed".format(n=name) 

2183 ) 

2184 

2185 return self.plugins[name] 

2186 

2187 def _unload_vim(self, target_id): 

2188 """ 

2189 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list 

2190 :param target_id: Contains type:_id; where type can be 'vim', ... 

2191 :return: None. 

2192 """ 

2193 try: 

2194 self.db_vims.pop(target_id, None) 

2195 self.my_vims.pop(target_id, None) 

2196 

2197 if target_id in self.vim_targets: 

2198 self.vim_targets.remove(target_id) 

2199 

2200 self.logger.info("Unloaded {}".format(target_id)) 

2201 except Exception as e: 

2202 self.logger.error("Cannot unload {}: {}".format(target_id, e)) 

2203 

    def _check_vim(self, target_id):
        """
        Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR

        Only the first operation found in state "PROCESSING" whose lock is free or expired is
        handled. The database record is always updated in the 'finally' block, also when the
        check raises.
        :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
        :return: None.
        """
        target, _, _id = target_id.partition(":")
        now = time.time()
        update_dict = {}
        unset_dict = {}
        op_text = ""
        step = ""
        # remember whether the target was already loaded, to restore that state at the end
        loaded = target_id in self.vim_targets
        target_database = (
            "vim_accounts"
            if target == "vim"
            else "wim_accounts" if target == "wim" else "sdns"
        )
        error_text = ""

        try:
            step = "Getting {} from db".format(target_id)
            db_vim = self.db.get_one(target_database, {"_id": _id})

            for op_index, operation in enumerate(
                db_vim["_admin"].get("operations", ())
            ):
                if operation["operationState"] != "PROCESSING":
                    continue

                locked_at = operation.get("locked_at")

                if locked_at is not None and locked_at >= now - self.task_locked_time:
                    # some other thread is doing this operation
                    return

                # lock
                op_text = "_admin.operations.{}.".format(op_index)

                # The filter includes the locked_at value just read, so the update only
                # succeeds if nobody changed the lock in between.
                # NOTE(review): "admin.current_operation" has no leading underscore, unlike
                # the other "_admin." keys used here - confirm whether this is intended.
                if not self.db.set_one(
                    target_database,
                    q_filter={
                        "_id": _id,
                        op_text + "operationState": "PROCESSING",
                        op_text + "locked_at": locked_at,
                    },
                    update_dict={
                        op_text + "locked_at": now,
                        "admin.current_operation": op_index,
                    },
                    fail_on_empty=False,
                ):
                    return

                unset_dict[op_text + "locked_at"] = None
                # NOTE(review): this unsets "current_operation", which is neither the key
                # written above nor an "_admin." key - confirm.
                unset_dict["current_operation"] = None
                step = "Loading " + target_id
                error_text = self._load_vim(target_id)

                if not error_text:
                    step = "Checking connectivity"

                    if target == "vim":
                        self.my_vims[target_id].check_vim_connectivity()
                    else:
                        self.my_vims[target_id].check_credentials()

                # These success values are also set when _load_vim returned an error text;
                # in that case the 'finally' block overwrites them with FAILED/ERROR.
                update_dict["_admin.operationalState"] = "ENABLED"
                update_dict["_admin.detailed-status"] = ""
                unset_dict[op_text + "detailed-status"] = None
                update_dict[op_text + "operationState"] = "COMPLETED"

                # only one operation is processed per call
                return

        except Exception as e:
            error_text = "{}: {}".format(step, e)
            self.logger.error("{} for {}: {}".format(step, target_id, e))

        finally:
            # runs on every exit path, including the early returns above
            if update_dict or unset_dict:
                if error_text:
                    update_dict[op_text + "operationState"] = "FAILED"
                    update_dict[op_text + "detailed-status"] = error_text
                    # the same key cannot be both set and unset
                    unset_dict.pop(op_text + "detailed-status", None)
                    update_dict["_admin.operationalState"] = "ERROR"
                    update_dict["_admin.detailed-status"] = error_text

                if op_text:
                    update_dict[op_text + "statusEnteredTime"] = now

                self.db.set_one(
                    target_database,
                    q_filter={"_id": _id},
                    update_dict=update_dict,
                    unset=unset_dict,
                    fail_on_empty=False,
                )

            if not loaded:
                # the target was loaded only for this check
                self._unload_vim(target_id)

2304 

2305 def _reload_vim(self, target_id): 

2306 if target_id in self.vim_targets: 

2307 self._load_vim(target_id) 

2308 else: 

2309 # if the vim is not loaded, but database information of VIM is cached at self.db_vims, 

2310 # just remove it to force load again next time it is needed 

2311 self.db_vims.pop(target_id, None) 

2312 

2313 def _load_vim(self, target_id): 

2314 """ 

2315 Load or reload a vim_account, sdn_controller or wim_account. 

2316 Read content from database, load the plugin if not loaded. 

2317 In case of error loading the plugin, it loads a failing VIM_connector 

2318 It fills self db_vims dictionary, my_vims dictionary and vim_targets list 

2319 :param target_id: Contains type:_id; where type can be 'vim', ... 

2320 :return: None if ok, descriptive text if error 

2321 """ 

2322 target, _, _id = target_id.partition(":") 

2323 target_database = ( 

2324 "vim_accounts" 

2325 if target == "vim" 

2326 else "wim_accounts" if target == "wim" else "sdns" 

2327 ) 

2328 plugin_name = "" 

2329 vim = None 

2330 step = "Getting {}={} from db".format(target, _id) 

2331 

2332 try: 

2333 # TODO process for wim, sdnc, ... 

2334 vim = self.db.get_one(target_database, {"_id": _id}) 

2335 

2336 # if deep_get(vim, "config", "sdn-controller"): 

2337 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"]) 

2338 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]}) 

2339 

2340 step = "Decrypting password" 

2341 schema_version = vim.get("schema_version") 

2342 self.db.encrypt_decrypt_fields( 

2343 vim, 

2344 "decrypt", 

2345 fields=("password", "secret"), 

2346 schema_version=schema_version, 

2347 salt=_id, 

2348 ) 

2349 self._process_vim_config(target_id, vim) 

2350 

2351 if target == "vim": 

2352 plugin_name = "rovim_" + vim["vim_type"] 

2353 step = "Loading plugin '{}'".format(plugin_name) 

2354 vim_module_conn = self._load_plugin(plugin_name) 

2355 step = "Loading {}'".format(target_id) 

2356 self.my_vims[target_id] = vim_module_conn( 

2357 uuid=vim["_id"], 

2358 name=vim["name"], 

2359 tenant_id=vim.get("vim_tenant_id"), 

2360 tenant_name=vim.get("vim_tenant_name"), 

2361 url=vim["vim_url"], 

2362 url_admin=None, 

2363 user=vim["vim_user"], 

2364 passwd=vim["vim_password"], 

2365 config=vim.get("config") or {}, 

2366 persistent_info={}, 

2367 ) 

2368 else: # sdn 

2369 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type")) 

2370 step = "Loading plugin '{}'".format(plugin_name) 

2371 vim_module_conn = self._load_plugin(plugin_name, "sdn") 

2372 step = "Loading {}'".format(target_id) 

2373 wim = deepcopy(vim) 

2374 wim_config = wim.pop("config", {}) or {} 

2375 wim["uuid"] = wim["_id"] 

2376 if "url" in wim and "wim_url" not in wim: 

2377 wim["wim_url"] = wim["url"] 

2378 elif "url" not in wim and "wim_url" in wim: 

2379 wim["url"] = wim["wim_url"] 

2380 

2381 if wim.get("dpid"): 

2382 wim_config["dpid"] = wim.pop("dpid") 

2383 

2384 if wim.get("switch_id"): 

2385 wim_config["switch_id"] = wim.pop("switch_id") 

2386 

2387 # wim, wim_account, config 

2388 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config) 

2389 self.db_vims[target_id] = vim 

2390 self.error_status = None 

2391 

2392 self.logger.info( 

2393 "Connector loaded for {}, plugin={}".format(target_id, plugin_name) 

2394 ) 

2395 except Exception as e: 

2396 self.logger.error( 

2397 "Cannot load {} plugin={}: {} {}".format( 

2398 target_id, plugin_name, step, e 

2399 ) 

2400 ) 

2401 

2402 self.db_vims[target_id] = vim or {} 

2403 self.db_vims[target_id] = FailingConnector(str(e)) 

2404 error_status = "{} Error: {}".format(step, e) 

2405 

2406 return error_status 

2407 finally: 

2408 if target_id not in self.vim_targets: 

2409 self.vim_targets.append(target_id) 

2410 

2411 def _get_db_task(self): 

2412 """ 

2413 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions 

2414 :return: None 

2415 """ 

2416 now = time.time() 

2417 

2418 if not self.time_last_task_processed: 

2419 self.time_last_task_processed = now 

2420 

2421 try: 

2422 while True: 

2423 """ 

2424 # Log RO tasks only when loglevel is DEBUG 

2425 if self.logger.getEffectiveLevel() == logging.DEBUG: 

2426 self._log_ro_task( 

2427 None, 

2428 None, 

2429 None, 

2430 "TASK_WF", 

2431 "task_locked_time=" 

2432 + str(self.task_locked_time) 

2433 + " " 

2434 + "time_last_task_processed=" 

2435 + str(self.time_last_task_processed) 

2436 + " " 

2437 + "now=" 

2438 + str(now), 

2439 ) 

2440 """ 

2441 locked = self.db.set_one( 

2442 "ro_tasks", 

2443 q_filter={ 

2444 "target_id": self.vim_targets, 

2445 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], 

2446 "locked_at.lt": now - self.task_locked_time, 

2447 "to_check_at.lt": self.time_last_task_processed, 

2448 "to_check_at.gt": -1, 

2449 }, 

2450 update_dict={"locked_by": self.my_id, "locked_at": now}, 

2451 fail_on_empty=False, 

2452 ) 

2453 

2454 if locked: 

2455 # read and return 

2456 ro_task = self.db.get_one( 

2457 "ro_tasks", 

2458 q_filter={ 

2459 "target_id": self.vim_targets, 

2460 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], 

2461 "locked_at": now, 

2462 }, 

2463 ) 

2464 return ro_task 

2465 

2466 if self.time_last_task_processed == now: 

2467 self.time_last_task_processed = None 

2468 return None 

2469 else: 

2470 self.time_last_task_processed = now 

2471 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now) 

2472 

2473 except DbException as e: 

2474 self.logger.error("Database exception at _get_db_task: {}".format(e)) 

2475 except Exception as e: 

2476 self.logger.critical( 

2477 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True 

2478 ) 

2479 

2480 return None 

2481 

2482 def _delete_task(self, ro_task, task_index, task_depends, db_update): 

2483 """ 

2484 Determine if this task need to be done or superseded 

2485 :return: None 

2486 """ 

2487 my_task = ro_task["tasks"][task_index] 

2488 task_id = my_task["task_id"] 

2489 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get( 

2490 "created_items", False 

2491 ) 

2492 

2493 self.logger.debug("Needed delete: {}".format(needed_delete)) 

2494 if my_task["status"] == "FAILED": 

2495 return None, None # TODO need to be retry?? 

2496 

2497 try: 

2498 for index, task in enumerate(ro_task["tasks"]): 

2499 if index == task_index or not task: 

2500 continue # own task 

2501 

2502 if ( 

2503 my_task["target_record"] == task["target_record"] 

2504 and task["action"] == "CREATE" 

2505 ): 

2506 # set to finished 

2507 db_update["tasks.{}.status".format(index)] = task["status"] = ( 

2508 "FINISHED" 

2509 ) 

2510 elif task["action"] == "CREATE" and task["status"] not in ( 

2511 "FINISHED", 

2512 "SUPERSEDED", 

2513 ): 

2514 needed_delete = False 

2515 

2516 if needed_delete: 

2517 self.logger.debug( 

2518 "Deleting ro_task={} task_index={}".format(ro_task, task_index) 

2519 ) 

2520 return self.item2class[my_task["item"]].delete(ro_task, task_index) 

2521 else: 

2522 return "SUPERSEDED", None 

2523 except Exception as e: 

2524 if not isinstance(e, NsWorkerException): 

2525 self.logger.critical( 

2526 "Unexpected exception at _delete_task task={}: {}".format( 

2527 task_id, e 

2528 ), 

2529 exc_info=True, 

2530 ) 

2531 

2532 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)} 

2533 

2534 def _create_task(self, ro_task, task_index, task_depends, db_update): 

2535 """ 

2536 Determine if this task need to create something at VIM 

2537 :return: None 

2538 """ 

2539 my_task = ro_task["tasks"][task_index] 

2540 task_id = my_task["task_id"] 

2541 

2542 if my_task["status"] == "FAILED": 

2543 return None, None # TODO need to be retry?? 

2544 elif my_task["status"] == "SCHEDULED": 

2545 # check if already created by another task 

2546 for index, task in enumerate(ro_task["tasks"]): 

2547 if index == task_index or not task: 

2548 continue # own task 

2549 

2550 if task["action"] == "CREATE" and task["status"] not in ( 

2551 "SCHEDULED", 

2552 "FINISHED", 

2553 "SUPERSEDED", 

2554 ): 

2555 return task["status"], "COPY_VIM_INFO" 

2556 

2557 try: 

2558 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new( 

2559 ro_task, task_index, task_depends 

2560 ) 

2561 # TODO update other CREATE tasks 

2562 except Exception as e: 

2563 if not isinstance(e, NsWorkerException): 

2564 self.logger.error( 

2565 "Error executing task={}: {}".format(task_id, e), exc_info=True 

2566 ) 

2567 

2568 task_status = "FAILED" 

2569 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)} 

2570 # TODO update ro_vim_item_update 

2571 

2572 return task_status, ro_vim_item_update 

2573 else: 

2574 return None, None 

2575 

2576 def _get_dependency(self, task_id, ro_task=None, target_id=None): 

2577 """ 

2578 Look for dependency task 

2579 :param task_id: Can be one of 

2580 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>" 

2581 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>" 

2582 3. task.task_id: "<action_id>:number" 

2583 :param ro_task: 

2584 :param target_id: 

2585 :return: database ro_task plus index of task 

2586 """ 

2587 if ( 

2588 task_id.startswith("vim:") 

2589 or task_id.startswith("sdn:") 

2590 or task_id.startswith("wim:") 

2591 ): 

2592 target_id, _, task_id = task_id.partition(" ") 

2593 

2594 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"): 

2595 ro_task_dependency = self.db.get_one( 

2596 "ro_tasks", 

2597 q_filter={"target_id": target_id, "tasks.target_record_id": task_id}, 

2598 fail_on_empty=False, 

2599 ) 

2600 

2601 if ro_task_dependency: 

2602 for task_index, task in enumerate(ro_task_dependency["tasks"]): 

2603 if task["target_record_id"] == task_id: 

2604 return ro_task_dependency, task_index 

2605 

2606 else: 

2607 if ro_task: 

2608 for task_index, task in enumerate(ro_task["tasks"]): 

2609 if task and task["task_id"] == task_id: 

2610 return ro_task, task_index 

2611 

2612 ro_task_dependency = self.db.get_one( 

2613 "ro_tasks", 

2614 q_filter={ 

2615 "tasks.ANYINDEX.task_id": task_id, 

2616 "tasks.ANYINDEX.target_record.ne": None, 

2617 }, 

2618 fail_on_empty=False, 

2619 ) 

2620 

2621 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency)) 

2622 if ro_task_dependency: 

2623 for task_index, task in enumerate(ro_task_dependency["tasks"]): 

2624 if task["task_id"] == task_id: 

2625 return ro_task_dependency, task_index 

2626 raise NsWorkerException("Cannot get depending task {}".format(task_id)) 

2627 

2628 def update_vm_refresh(self, ro_task): 

2629 """Enables the VM status updates if self.refresh_config.active parameter 

2630 is not -1 and then updates the DB accordingly 

2631 

2632 """ 

2633 try: 

2634 self.logger.debug("Checking if VM status update config") 

2635 next_refresh = time.time() 

2636 next_refresh = self._get_next_refresh(ro_task, next_refresh) 

2637 

2638 if next_refresh != -1: 

2639 db_ro_task_update = {} 

2640 now = time.time() 

2641 next_check_at = now + (24 * 60 * 60) 

2642 next_check_at = min(next_check_at, next_refresh) 

2643 db_ro_task_update["vim_info.refresh_at"] = next_refresh 

2644 db_ro_task_update["to_check_at"] = next_check_at 

2645 

2646 self.logger.debug( 

2647 "Finding tasks which to be updated to enable VM status updates" 

2648 ) 

2649 refresh_tasks = self.db.get_list( 

2650 "ro_tasks", 

2651 q_filter={ 

2652 "tasks.status": "DONE", 

2653 "to_check_at.lt": 0, 

2654 }, 

2655 ) 

2656 self.logger.debug("Updating tasks to change the to_check_at status") 

2657 for task in refresh_tasks: 

2658 q_filter = { 

2659 "_id": task["_id"], 

2660 } 

2661 self.db.set_one( 

2662 "ro_tasks", 

2663 q_filter=q_filter, 

2664 update_dict=db_ro_task_update, 

2665 fail_on_empty=True, 

2666 ) 

2667 

2668 except Exception as e: 

2669 self.logger.error(f"Error updating tasks to enable VM status updates: {e}") 

2670 

2671 def _get_next_refresh(self, ro_task: dict, next_refresh: float): 

2672 """Decide the next_refresh according to vim type and refresh config period. 

2673 Args: 

2674 ro_task (dict): ro_task details 

2675 next_refresh (float): next refresh time as epoch format 

2676 

2677 Returns: 

2678 next_refresh (float) -1 if vm updates are disabled or vim type is openstack. 

2679 """ 

2680 target_vim = ro_task["target_id"] 

2681 vim_type = self.db_vims[target_vim]["vim_type"] 

2682 if self.refresh_config.active == -1 or vim_type == "openstack": 

2683 next_refresh = -1 

2684 else: 

2685 next_refresh += self.refresh_config.active 

2686 return next_refresh 

2687 

    def _process_pending_tasks(self, ro_task):
        """
        Process the pending tasks of an ro_task and write the outcome back to the database.

        Tasks are visited grouped by action, in the order DELETE, CREATE, EXEC. For each
        eligible task the dependencies are checked, the matching handler is called
        (_delete_task, or the exec/new/refresh method of the item class), the target record
        is updated through _update_target and the changes are accumulated in a single
        update of the ro_task document, which also releases the lock.
        :param ro_task: ro_task document; it is also modified in memory
        :return: None. Exceptions raised inside the main try block are logged, not propagated
        """
        ro_task_id = ro_task["_id"]
        now = time.time()
        # one day
        next_check_at = now + (24 * 60 * 60)
        # accumulates every change to write at the ro_tasks document at the end
        db_ro_task_update = {}

        def _update_refresh(new_status):
            # compute next_refresh
            # 'task' is the loop variable of the enclosing function at the time of the call
            nonlocal task
            nonlocal next_check_at
            nonlocal db_ro_task_update
            nonlocal ro_task

            next_refresh = time.time()

            if task["item"] in ("image", "flavor"):
                next_refresh += self.refresh_config.image
            elif new_status == "BUILD":
                next_refresh += self.refresh_config.build
            elif new_status == "DONE":
                # may return -1 (refresh disabled), which then also becomes next_check_at
                next_refresh = self._get_next_refresh(ro_task, next_refresh)
            else:
                next_refresh += self.refresh_config.error

            next_check_at = min(next_check_at, next_refresh)
            db_ro_task_update["vim_info.refresh_at"] = next_refresh
            ro_task["vim_info"]["refresh_at"] = next_refresh

        try:
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
            """
            # Check if vim status refresh is enabled again
            self.update_vm_refresh(ro_task)
            # 0: get task_status_create
            lock_object = None
            task_status_create = None
            # first CREATE task that already reached BUILD or DONE, if any
            task_create = next(
                (
                    t
                    for t in ro_task["tasks"]
                    if t
                    and t["action"] == "CREATE"
                    and t["status"] in ("BUILD", "DONE")
                ),
                None,
            )

            if task_create:
                task_status_create = task_create["status"]

            # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
            for task_action in ("DELETE", "CREATE", "EXEC"):
                # NOTE(review): these two are reset once per action, not per task, so a task
                # whose branch assigns nothing sees the values left by the previous task of
                # the same action - confirm this is intended.
                db_vim_update = None
                new_status = None

                for task_index, task in enumerate(ro_task["tasks"]):
                    if not task:
                        continue  # task deleted

                    task_depends = {}
                    target_update = None

                    # skip tasks of another action, DELETE/EXEC tasks that are neither
                    # SCHEDULED nor BUILD, and CREATE tasks already FINISHED or SUPERSEDED
                    if (
                        (
                            task_action in ("DELETE", "EXEC")
                            and task["status"] not in ("SCHEDULED", "BUILD")
                        )
                        or task["action"] != task_action
                        or (
                            task_action == "CREATE"
                            and task["status"] in ("FINISHED", "SUPERSEDED")
                        )
                    ):
                        continue

                    task_path = "tasks.{}.status".format(task_index)
                    try:
                        db_vim_info_update = None
                        dependency_ro_task = {}

                        if task["status"] == "SCHEDULED":
                            # check if tasks that this depends on have been completed
                            dependency_not_completed = False

                            for dependency_task_id in task.get("depends_on") or ():
                                (
                                    dependency_ro_task,
                                    dependency_task_index,
                                ) = self._get_dependency(
                                    dependency_task_id, target_id=ro_task["target_id"]
                                )
                                dependency_task = dependency_ro_task["tasks"][
                                    dependency_task_index
                                ]
                                self.logger.debug(
                                    "dependency_ro_task={} dependency_task_index={}".format(
                                        dependency_ro_task, dependency_task_index
                                    )
                                )

                                if dependency_task["status"] == "SCHEDULED":
                                    dependency_not_completed = True
                                    next_check_at = min(
                                        next_check_at, dependency_ro_task["to_check_at"]
                                    )
                                    # must allow dependent task to be processed first
                                    # to do this set time after last_task_processed
                                    next_check_at = max(
                                        self.time_last_task_processed, next_check_at
                                    )
                                    break
                                elif dependency_task["status"] == "FAILED":
                                    error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                        task["action"],
                                        task["item"],
                                        dependency_task["action"],
                                        dependency_task["item"],
                                        dependency_task_id,
                                        dependency_ro_task["vim_info"].get(
                                            "vim_message"
                                        ),
                                    )
                                    self.logger.error(
                                        "task={} {}".format(task["task_id"], error_text)
                                    )
                                    raise NsWorkerException(error_text)

                                # the vim_id of the dependency is offered under both the
                                # plain id and the "TASK-" prefixed id
                                task_depends[dependency_task_id] = dependency_ro_task[
                                    "vim_info"
                                ]["vim_id"]
                                task_depends["TASK-{}".format(dependency_task_id)] = (
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )

                            if dependency_not_completed:
                                self.logger.warning(
                                    "DEPENDENCY NOT COMPLETED {}".format(
                                        dependency_ro_task["vim_info"]["vim_id"]
                                    )
                                )
                                # TODO set at vim_info.vim_details that it is waiting
                                continue

                        # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                        # the task of renew this locking. It will update database locket_at periodically
                        if not lock_object:
                            lock_object = LockRenew.add_lock_object(
                                "ro_tasks", ro_task, self
                            )
                        if task["action"] == "DELETE":
                            (
                                new_status,
                                db_vim_info_update,
                            ) = self._delete_task(
                                ro_task, task_index, task_depends, db_ro_task_update
                            )
                            new_status = (
                                "FINISHED" if new_status == "DONE" else new_status
                            )
                            # ^with FINISHED instead of DONE it will not be refreshing

                            if new_status in ("FINISHED", "SUPERSEDED"):
                                target_update = "DELETE"
                        elif task["action"] == "EXEC":
                            (
                                new_status,
                                db_vim_info_update,
                                db_task_update,
                            ) = self.item2class[task["item"]].exec(
                                ro_task, task_index, task_depends
                            )
                            new_status = (
                                "FINISHED" if new_status == "DONE" else new_status
                            )
                            # ^with FINISHED instead of DONE it will not be refreshing

                            if db_task_update:
                                # load into database the modified db_task_update "retries" and "next_retry"
                                if db_task_update.get("retries"):
                                    db_ro_task_update[
                                        "tasks.{}.retries".format(task_index)
                                    ] = db_task_update["retries"]

                                # retry after "next_retry" seconds, 60 by default
                                next_check_at = time.time() + db_task_update.get(
                                    "next_retry", 60
                                )
                            target_update = None
                        elif task["action"] == "CREATE":
                            if task["status"] == "SCHEDULED":
                                if task_status_create:
                                    # another CREATE task already built the element:
                                    # reuse its status and copy the vim_info to the target
                                    new_status = task_status_create
                                    target_update = "COPY_VIM_INFO"
                                else:
                                    new_status, db_vim_info_update = self.item2class[
                                        task["item"]
                                    ].new(ro_task, task_index, task_depends)
                                    _update_refresh(new_status)
                            else:
                                refresh_at = ro_task["vim_info"]["refresh_at"]
                                if refresh_at and refresh_at != -1 and now > refresh_at:
                                    (
                                        new_status,
                                        db_vim_info_update,
                                    ) = self.item2class[
                                        task["item"]
                                    ].refresh(ro_task)
                                    _update_refresh(new_status)
                                else:
                                    # The refresh is updated to avoid set the value of "refresh_at" to
                                    # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                    # because it can happen that in this case the task is never processed
                                    _update_refresh(task["status"])

                    except Exception as e:
                        # any failure of this task marks it FAILED; processing continues
                        new_status = "FAILED"
                        db_vim_info_update = {
                            "vim_status": "VIM_ERROR",
                            "vim_message": str(e),
                        }

                        # NOTE(review): the message names _delete_task although this handler
                        # also covers the CREATE and EXEC branches.
                        if not isinstance(
                            e, (NsWorkerException, vimconn.VimConnException)
                        ):
                            self.logger.error(
                                "Unexpected exception at _delete_task task={}: {}".format(
                                    task["task_id"], e
                                ),
                                exc_info=True,
                            )

                    try:
                        if db_vim_info_update:
                            db_vim_update = db_vim_info_update.copy()
                            # keys are stored under the "vim_info." prefix at ro_tasks
                            db_ro_task_update.update(
                                {
                                    "vim_info." + k: v
                                    for k, v in db_vim_info_update.items()
                                }
                            )
                            ro_task["vim_info"].update(db_vim_info_update)

                        if new_status:
                            if task_action == "CREATE":
                                task_status_create = new_status
                            db_ro_task_update[task_path] = new_status

                        if target_update or db_vim_update:
                            if target_update == "DELETE":
                                self._update_target(task, None)
                            elif target_update == "COPY_VIM_INFO":
                                self._update_target(task, ro_task["vim_info"])
                            else:
                                self._update_target(task, db_vim_update)

                    except Exception as e:
                        if (
                            isinstance(e, DbException)
                            and e.http_code == HTTPStatus.NOT_FOUND
                        ):
                            # if the vnfrs or nsrs has been removed from database, this task must be removed
                            self.logger.debug(
                                "marking to delete task={}".format(task["task_id"])
                            )
                            self.tasks_to_delete.append(task)
                        else:
                            self.logger.error(
                                "Unexpected exception at _update_target task={}: {}".format(
                                    task["task_id"], e
                                ),
                                exc_info=True,
                            )

            locked_at = ro_task["locked_at"]

            if lock_object:
                locked_at = [
                    lock_object["locked_at"],
                    lock_object["locked_at"] + self.task_locked_time,
                ]
                # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
                # contain exactly locked_at + self.task_locked_time
                LockRenew.remove_lock_object(lock_object)

            q_filter = {
                "_id": ro_task["_id"],
                "to_check_at": ro_task["to_check_at"],
                "locked_at": locked_at,
            }
            # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
            # outside this task (by ro_nbi) do not update it
            db_ro_task_update["locked_by"] = None
            # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
            db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
            db_ro_task_update["modified_at"] = now
            db_ro_task_update["to_check_at"] = next_check_at

            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                db_ro_task_update_log = db_ro_task_update.copy()
                db_ro_task_update_log["_id"] = q_filter["_id"]
                self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
            """

            # first attempt: only if to_check_at is unchanged; otherwise retry below without
            # touching to_check_at
            if not self.db.set_one(
                "ro_tasks",
                update_dict=db_ro_task_update,
                q_filter=q_filter,
                fail_on_empty=False,
            ):
                del db_ro_task_update["to_check_at"]
                del q_filter["to_check_at"]
                """
                # Log RO tasks only when loglevel is DEBUG
                if self.logger.getEffectiveLevel() == logging.DEBUG:
                    self._log_ro_task(
                        None,
                        db_ro_task_update_log,
                        None,
                        "TASK_WF",
                        "SET_TASK " + str(q_filter),
                    )
                """
                self.db.set_one(
                    "ro_tasks",
                    q_filter=q_filter,
                    update_dict=db_ro_task_update,
                    fail_on_empty=True,
                )
        except DbException as e:
            self.logger.error(
                "ro_task={} Error updating database {}".format(ro_task_id, e)
            )
        except Exception as e:
            self.logger.error(
                "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
            )

3029 

3030 def _update_target(self, task, ro_vim_item_update): 

3031 table, _, temp = task["target_record"].partition(":") 

3032 _id, _, path_vim_status = temp.partition(":") 

3033 path_item = path_vim_status[: path_vim_status.rfind(".")] 

3034 path_item = path_item[: path_item.rfind(".")] 

3035 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id" 

3036 # path_item: dot separated list targeting record information, e.g. "vdur.10" 

3037 

3038 if ro_vim_item_update: 

3039 update_dict = { 

3040 path_vim_status + "." + k: v 

3041 for k, v in ro_vim_item_update.items() 

3042 if k 

3043 in ( 

3044 "vim_id", 

3045 "vim_details", 

3046 "vim_message", 

3047 "vim_name", 

3048 "vim_status", 

3049 "interfaces", 

3050 "interfaces_backup", 

3051 ) 

3052 } 

3053 

3054 if path_vim_status.startswith("vdur."): 

3055 # for backward compatibility, add vdur.name apart from vdur.vim_name 

3056 if ro_vim_item_update.get("vim_name"): 

3057 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"] 

3058 

3059 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id 

3060 if ro_vim_item_update.get("vim_id"): 

3061 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"] 

3062 

3063 # update general status 

3064 if ro_vim_item_update.get("vim_status"): 

3065 update_dict[path_item + ".status"] = ro_vim_item_update[ 

3066 "vim_status" 

3067 ] 

3068 

3069 if ro_vim_item_update.get("interfaces"): 

3070 path_interfaces = path_item + ".interfaces" 

3071 

3072 for i, iface in enumerate(ro_vim_item_update.get("interfaces")): 

3073 if iface: 

3074 update_dict.update( 

3075 { 

3076 path_interfaces + ".{}.".format(i) + k: v 

3077 for k, v in iface.items() 

3078 if k in ("vlan", "compute_node", "pci") 

3079 } 

3080 ) 

3081 

3082 # put ip_address and mac_address with ip-address and mac-address 

3083 if iface.get("ip_address"): 

3084 update_dict[ 

3085 path_interfaces + ".{}.".format(i) + "ip-address" 

3086 ] = iface["ip_address"] 

3087 

3088 if iface.get("mac_address"): 

3089 update_dict[ 

3090 path_interfaces + ".{}.".format(i) + "mac-address" 

3091 ] = iface["mac_address"] 

3092 

3093 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"): 

3094 update_dict["ip-address"] = iface.get("ip_address").split( 

3095 ";" 

3096 )[0] 

3097 

3098 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"): 

3099 update_dict[path_item + ".ip-address"] = iface.get( 

3100 "ip_address" 

3101 ).split(";")[0] 

3102 

3103 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict) 

3104 

3105 # If interfaces exists, it backups VDU interfaces in the DB for healing operations 

3106 if ro_vim_item_update.get("interfaces"): 

3107 search_key = path_vim_status + ".interfaces" 

3108 if update_dict.get(search_key): 

3109 interfaces_backup_update = { 

3110 path_vim_status + ".interfaces_backup": update_dict[search_key] 

3111 } 

3112 

3113 self.db.set_one( 

3114 table, 

3115 q_filter={"_id": _id}, 

3116 update_dict=interfaces_backup_update, 

3117 ) 

3118 

3119 else: 

3120 update_dict = {path_item + ".status": "DELETED"} 

3121 self.db.set_one( 

3122 table, 

3123 q_filter={"_id": _id}, 

3124 update_dict=update_dict, 

3125 unset={path_vim_status: None}, 

3126 ) 

3127 

3128 def _process_delete_db_tasks(self): 

3129 """ 

3130 Delete task from database because vnfrs or nsrs or both have been deleted 

3131 :return: None. Uses and modify self.tasks_to_delete 

3132 """ 

3133 while self.tasks_to_delete: 

3134 task = self.tasks_to_delete[0] 

3135 vnfrs_deleted = None 

3136 nsr_id = task["nsr_id"] 

3137 

3138 if task["target_record"].startswith("vnfrs:"): 

3139 # check if nsrs is present 

3140 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False): 

3141 vnfrs_deleted = task["target_record"].split(":")[1] 

3142 

3143 try: 

3144 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted) 

3145 except Exception as e: 

3146 self.logger.error( 

3147 "Error deleting task={}: {}".format(task["task_id"], e) 

3148 ) 

3149 self.tasks_to_delete.pop(0) 

3150 

3151 @staticmethod 

3152 def delete_db_tasks(db, nsr_id, vnfrs_deleted): 

3153 """ 

3154 Static method because it is called from osm_ng_ro.ns 

3155 :param db: instance of database to use 

3156 :param nsr_id: affected nsrs id 

3157 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id 

3158 :return: None, exception is fails 

3159 """ 

3160 retries = 5 

3161 for retry in range(retries): 

3162 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id}) 

3163 now = time.time() 

3164 conflict = False 

3165 

3166 for ro_task in ro_tasks: 

3167 db_update = {} 

3168 to_delete_ro_task = True 

3169 

3170 for index, task in enumerate(ro_task["tasks"]): 

3171 if not task: 

3172 pass 

3173 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or ( 

3174 vnfrs_deleted 

3175 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted) 

3176 ): 

3177 db_update["tasks.{}".format(index)] = None 

3178 else: 

3179 # used by other nsr, ro_task cannot be deleted 

3180 to_delete_ro_task = False 

3181 

3182 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed 

3183 if to_delete_ro_task: 

3184 if not db.del_one( 

3185 "ro_tasks", 

3186 q_filter={ 

3187 "_id": ro_task["_id"], 

3188 "modified_at": ro_task["modified_at"], 

3189 }, 

3190 fail_on_empty=False, 

3191 ): 

3192 conflict = True 

3193 elif db_update: 

3194 db_update["modified_at"] = now 

3195 if not db.set_one( 

3196 "ro_tasks", 

3197 q_filter={ 

3198 "_id": ro_task["_id"], 

3199 "modified_at": ro_task["modified_at"], 

3200 }, 

3201 update_dict=db_update, 

3202 fail_on_empty=False, 

3203 ): 

3204 conflict = True 

3205 if not conflict: 

3206 return 

3207 else: 

3208 raise NsWorkerException("Exceeded {} retries".format(retries)) 

3209 

def run(self):
    """
    Main loop of the worker. Each iteration performs two steps:
      step 1: get one command from self.task_queue ("terminate", "load_vim", "unload_vim", "reload_vim"
        or "check_vim") and execute it. The queue is read without blocking when there are vim targets
        loaded, and blocking (idle state) otherwise. After a command the loop starts again.
      step 2: only when the queue is empty, or the command has failed: delete the not needed tasks and
        process one pending ro_task obtained from database, sleeping 5 seconds if there is none.
    Exceptions are logged and never stop the loop; it only finishes with the "terminate" command.
    :return: None
    """
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                order = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")

                self.idle = True
                order = self.task_queue.get(block=True)
                self.idle = False

            command = order[0]

            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(order[1]))
                self._load_vim(order[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(order[1]))
                self._unload_vim(order[1])
            elif command == "reload_vim":
                self._reload_vim(order[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(order[1]))
                self._check_vim(order[1])

            # a command has been attended (unknown ones are ignored): look for the next one
            continue
        except queue.Empty:
            # no commands pending: go on with the database tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # NOTE: a debug-only dump of all tasks (self._get_db_all_tasks()) is disabled here
            ro_task = self._get_db_task()

            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do: wait before asking the database again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")