Coverage for NG-RO/osm_ng_ro/ns_thread.py: 35%

1252 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-06-28 09:51 +0000

1# -*- coding: utf-8 -*- 

2 

3## 

4# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); you may 

7# not use this file except in compliance with the License. You may obtain 

8# a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

15# License for the specific language governing permissions and limitations 

16# under the License. 

17# 

18## 

19 

20""" 

21This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. 

22The tasks are stored at database in table ro_tasks 

23A single ro_task refers to a VIM element (flavor, image, network, ...). 

24A ro_task can contain several 'tasks', each one with a target, where to store the results 

25""" 

26 

27from copy import deepcopy 

28from http import HTTPStatus 

29import logging 

30from os import makedirs 

31from os import path 

32import queue 

33import threading 

34import time 

35import traceback 

36from typing import Dict 

37from unittest.mock import Mock 

38 

39from importlib_metadata import entry_points 

40from osm_common.dbbase import DbException 

41from osm_ng_ro.vim_admin import LockRenew 

42from osm_ro_plugin import sdnconn 

43from osm_ro_plugin import vimconn 

44from osm_ro_plugin.sdn_dummy import SdnDummyConnector 

45from osm_ro_plugin.vim_dummy import VimDummyConnector 

46import yaml 

47 

48__author__ = "Alfonso Tierno" 

49__date__ = "$28-Sep-2017 12:07:15$" 

50 

51 

def deep_get(target_dict, *args, **kwargs):
    """
    Follow a chain of nested keys inside target_dict and return what is found at the end.
    Example target_dict={a: {b: 5}}; keys (a, b) return 5; keys (a, b, c) and (f, h) return the default
    :param target_dict: dictionary to be read
    :param args: successive keys to descend through target_dict
    :param kwargs: may only carry default=value, returned when the key chain cannot be followed
    :return: the value reached, or the default (None when no default is supplied)
    """
    current = target_dict
    for step in args:
        # Descend only while we are still on a dict that owns the next key
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            return kwargs.get("default")
    return current

66 

67 

class NsWorkerException(Exception):
    """Base error raised by the NS worker code in this module."""

70 

71 

class FailingConnector:
    """Stand-in connector whose public methods all raise with a fixed error message.

    Every public attribute name of vimconn.VimConnector is replaced by a Mock raising
    vimconn.VimConnException; afterwards every public attribute name of sdnconn.SdnConnectorBase
    is replaced by a Mock raising sdnconn.SdnConnectorError (so names present in both end up
    raising the SDN error).
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exceptions raised from the mocked methods
        """
        self.error_msg = error_msg

        # Order matters: the SDN pass runs last and overrides names shared with the VIM connector
        failure_sources = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )
        for connector_class, error_class in failure_sources:
            for attr_name in dir(connector_class):
                if attr_name.startswith("_"):
                    continue
                setattr(self, attr_name, Mock(side_effect=error_class(error_msg)))

87 

88 

class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised when a requested element cannot be found."""

91 

92 

class VimInteractionBase:
    """Common parent for the VIM/SDN interaction classes (networks, VMs, flavors, ...).

    The methods here do not contact any VIM; they simply report a successful result so that
    subclasses only need to override the operations they really implement.
    """

    def __init__(self, db, my_vims, db_vims, logger):
        """Keep the references handed in by the caller as instance attributes."""
        self.logger = logger
        self.db = db
        self.db_vims = db_vims
        self.my_vims = my_vims

    def new(self, ro_task, task_index, task_depends):
        """Default creation: nothing is done.

        :return: tuple ("BUILD", {})
        """
        status, update = "BUILD", {}
        return status, update

    def refresh(self, ro_task):
        """Default refresh: the VIM is not queried, the stored status decides the outcome.

        :return: ("FAILED", {}) when the stored vim_status is "VIM_ERROR", ("DONE", {}) otherwise
        """
        stored_status = ro_task["vim_info"]["vim_status"]
        outcome = "FAILED" if stored_status == "VIM_ERROR" else "DONE"
        return outcome, {}

    def delete(self, ro_task, task_index):
        """Default deletion: the VIM is not called and success is assumed.

        :return: tuple ("DONE", {})
        """
        status, update = "DONE", {}
        return status, update

    def exec(self, ro_task, task_index, task_depends):
        """Default action execution: nothing is done.

        :return: tuple ("DONE", None, None)
        """
        result = ("DONE", None, None)
        return result

119 

120 

class VimInteractionNet(VimInteractionBase):
    """VIM interaction for networks: find or create, poll the status and delete."""

    def new(self, ro_task, task_index, task_depends):
        """Find an existing network at the VIM or create a new one.

        When the task carries "find_params" the network is searched (by "filter_dict", or as
        management network when "mgmt" is set); otherwise it is created from task["params"].

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", update dict) on success, ("FAILED", update dict) when a
            VimConnException or NsWorkerException is raised
        """
        net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        was_created = False
        new_items = {}
        target_id = ro_task["target_id"]
        vim = self.my_vims[target_id]
        is_mgmt = False
        mgmt_set_in_vim = False

        try:
            find_params = task.get("find_params")
            if not find_params:
                # CREATE
                net_id, new_items = vim.new_network(**task["params"])
                was_created = True
            else:
                # FIND
                if find_params.get("filter_dict"):
                    net_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM configuration has priority over the descriptor
                    is_mgmt = True
                    db_vim = self.db_vims[target_id]
                    if deep_get(db_vim, "config", "management_network_id"):
                        mgmt_set_in_vim = True
                        net_filter = {"id": db_vim["config"]["management_network_id"]}
                    elif deep_get(db_vim, "config", "management_network_name"):
                        mgmt_set_in_vim = True
                        net_filter = {
                            "name": db_vim["config"]["management_network_name"]
                        }
                    else:
                        net_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                found_nets = vim.get_network_list(net_filter)
                if not found_nets and not task.get("params"):
                    if not is_mgmt or mgmt_set_in_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )

                    # The descriptor has a mgmt-network with no mapping to a VIM network,
                    # neither in the descriptor nor in the VIM "--config": create it.
                    net_name = net_filter.get("name") or net_filter.get("id")[:16]
                    net_id, new_items = vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(net_id)
                    )
                    was_created = True
                elif len(found_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if found_nets:
                    net_id = found_nets[0]["id"]

            item_update = {
                "vim_id": net_id,
                "vim_status": "BUILD",
                "created": was_created,
                "created_items": new_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], net_id, was_created
                )
            )
            return "BUILD", item_update
        except (vimconn.VimConnException, NsWorkerException) as error:
            self.logger.error(
                "task={} vim={} new-net: {}".format(
                    task_id, ro_task["target_id"], error
                )
            )
            item_update = {
                "vim_status": "VIM_ERROR",
                "created": was_created,
                "vim_message": str(error),
            }
            return "FAILED", item_update

    def refresh(self, ro_task):
        """Ask the VIM for the network status and compute what changed.

        :param ro_task: ro_task document; "_id", "target_id" and "vim_info" are read
        :return: (task status, dict with only the vim_info fields that differ from the stored ones)
        """
        ro_task_id = ro_task["_id"]
        vim = self.my_vims[ro_task["target_id"]]
        stored = ro_task["vim_info"]
        net_id = stored["vim_id"]

        try:
            vim_info = vim.refresh_nets_status([net_id])[net_id]

            if vim_info["status"] == "BUILD":
                task_status = "BUILD"
            elif vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as error:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], net_id, error
                )
            )
            task_status = "FAILED"
            vim_info = {"status": "VIM_ERROR", "error_msg": str(error)}

        changes = {}
        if stored["vim_status"] != vim_info["status"]:
            changes["vim_status"] = vim_info["status"]

        if stored["vim_name"] != vim_info.get("name"):
            changes["vim_name"] = vim_info.get("name")

        if vim_info["status"] == "DELETED":
            changes["vim_id"] = None
            changes["vim_message"] = "Deleted externally"
        elif vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if stored["vim_message"] != vim_info.get("error_msg"):
                changes["vim_message"] = vim_info.get("error_msg")
        elif stored["vim_details"] != vim_info["vim_info"]:
            changes["vim_details"] = vim_info["vim_info"]

        if changes:
            # The message is only logged when the reported status is not ACTIVE
            if changes.get("vim_status") != "ACTIVE":
                logged_message = changes.get("vim_message")
            else:
                logged_message = ""
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    net_id,
                    changes.get("vim_status"),
                    logged_message,
                )
            )

        return task_status, changes

    def delete(self, ro_task, task_index):
        """Delete the network at the VIM when there is a vim_id or created items.

        A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update dict) on success, ("FAILED", update dict) on any other
            VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_id = ro_task["vim_info"]["vim_id"]
        success_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_id or ro_task["vim_info"]["created_items"]:
                vim = self.my_vims[ro_task["target_id"]]
                vim.delete_network(net_id, ro_task["vim_info"]["created_items"])
        except vimconn.VimConnNotFoundException:
            success_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as error:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_id, error
                )
            )
            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(error),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_id,
                success_update.get("vim_message", ""),
            )
        )
        return "DONE", success_update

338 

339 

class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VDUs (VM instances): create, delete, refresh and ssh-key injection."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM instance at the VIM.

        Identifiers starting with "TASK-" in the net list, image, flavor and affinity groups
        are replaced by the values stored in task_depends before calling the VIM.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: dict indexed by "TASK-..." identifier with the id to use instead
        :return: ("BUILD", update dict) on success, ("FAILED", update dict) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            created = True
            params = task["params"]
            # Work on a copy so that the stored task parameters are not modified
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # Full sentence (the message used to be the bare fragment "found for ...")
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(
                                affinity_group["affinity_group_id"]
                            )
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id
            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" of each net is read after the VIM call from the same net_list passed to it
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM instance at the VIM when there is a vim_id or created items.

        A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update dict) on success, ("FAILED", update dict) on any other
            VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status

        :param ro_task: ro_task document; "_id", "target_id", "vim_info" and "tasks" are read
        :return: (None, None) when there is no vim_id; otherwise (task status, dict with only
            the vim_info fields that differ from the stored ones)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            # (best effort: any failure here is only logged and the refresh goes on)
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                # None is appended when the VIM does not report this interface id
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # if iface:
                #     iface.pop("vim_info", None)
                vim_interfaces.append(iface)

        # NOTE(review): next() has no default here, so StopIteration is raised when there is
        # no unfinished CREATE task
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # The vdu management interface falls back to the vnf one, and then to index 0
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject a user ssh key into the VM through the VIM connector.

        The private key is decrypted with self.db.decrypt before calling inject_user_key.
        On failure the action is reported as "BUILD" with a retry request until
        max_retries_inject_ssh_key attempts have been made.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (task status, vim_info update dict or None, task update dict)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update

633 

634 

class VimInteractionImage(VimInteractionBase):
    """VIM interaction for images: they are only looked up, never created."""

    def new(self, ro_task, task_index, task_depends):
        """Look for the image at the VIM using task["find_params"].

        Exactly one image must match; when no "find_params" are given the resulting vim_id
        is the empty string.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update dict) on success, ("FAILED", update dict) when a
            NsWorkerException or VimConnException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        was_created = False
        new_items = {}
        vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            image_id = ""
            if task.get("find_params"):
                matches = vim.get_image_list(**task["find_params"])

                if not matches:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                if len(matches) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                image_id = matches[0]["id"]

            item_update = {
                "vim_id": image_id,
                "vim_status": "ACTIVE",
                "created": was_created,
                "created_items": new_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], image_id, was_created
                )
            )
            return "DONE", item_update
        except (NsWorkerException, vimconn.VimConnException) as error:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], error)
            )
            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": was_created,
                "vim_message": str(error),
            }

690 

691 

class VimInteractionSharedVolume(VimInteractionBase):
    """VIM interaction for shared volumes: create and delete."""

    def delete(self, ro_task, task_index):
        """Delete the shared volume at the VIM unless it is flagged to be kept.

        A volume whose created_items entry has a truthy "keep" is left untouched and reported
        as "ACTIVE". A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update dict) on success, ("FAILED", update dict) on any other
            VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"]["created_items"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }
        # "or {}" covers created_items without an entry (or with a None entry) for this
        # volume id, which otherwise fails with AttributeError on None.get("keep")
        if created_items and (created_items.get(shared_volume_vim_id) or {}).get(
            "keep"
        ):
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }
            return "DONE", ro_vim_item_update_ok
        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Create a shared volume at the VIM from task["params"], when params are present.

        The created volume is recorded in created_items under its VIM id, together with its
        name and the "keep" value taken from the params.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update dict) on success, ("FAILED", update dict) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

796 

797 

class VimInteractionFlavor(VimInteractionBase):
    """VIM interaction for flavors: find or create, and delete."""

    def delete(self, ro_task, task_index):
        """Delete the flavor at the VIM when a vim_id is stored.

        A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update dict) on success, ("FAILED", update dict) on any other
            VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_id = ro_task["vim_info"]["vim_id"]
        success_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_id:
                vim = self.my_vims[ro_task["target_id"]]
                vim.delete_flavor(flavor_id)
        except vimconn.VimConnNotFoundException:
            success_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as error:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_id, error
                )
            )
            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(error),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_id,
                success_update.get("vim_message", ""),
            )
        )
        return "DONE", success_update

    def new(self, ro_task, task_index, task_depends):
        """Find a flavor at the VIM, or create it when none is found and params are given.

        The search uses find_params["vim_flavor_id"] directly when present, otherwise
        find_params["flavor_data"] through the VIM connector.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update dict) on success, ("FAILED", update dict) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        was_created = False
        new_items = {}
        vim = self.my_vims[ro_task["target_id"]]
        try:
            # FIND
            flavor_id = None
            find_params = task.get("find_params", {})

            if find_params.get("vim_flavor_id"):
                flavor_id = find_params["vim_flavor_id"]
            elif find_params.get("flavor_data"):
                try:
                    flavor_id = vim.get_flavor_id_from_data(find_params["flavor_data"])
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # Not finding the flavor is not an error: it may be created below
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if not flavor_id and task.get("params"):
                # CREATE
                flavor_id = vim.new_flavor(task["params"]["flavor_data"])
                was_created = True

            item_update = {
                "vim_id": flavor_id,
                "vim_status": "ACTIVE",
                "created": was_created,
                "created_items": new_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], flavor_id, was_created
                )
            )
            return "DONE", item_update
        except (vimconn.VimConnException, NsWorkerException) as error:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(
                    task_id, ro_task["target_id"], error
                )
            )
            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": was_created,
                "vim_message": str(error),
            }

893 

894 

class VimInteractionAffinityGroup(VimInteractionBase):
    """VIM interaction for affinity or anti-affinity groups: find or create, and delete."""

    def delete(self, ro_task, task_index):
        """Delete the affinity group at the VIM when a vim_id is stored.

        A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", update dict) on success, ("FAILED", update dict) on any other
            VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse an existing affinity group at the VIM or create a new one.

        When params["affinity_group_data"] carries "vim-affinity-group-id" that group is
        looked up first; if it is not found, or no id was given, a new group is created from
        the affinity group data.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update dict) on success, ("FAILED", update dict) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None
            param_affinity_group_id = ""

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

                if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                    try:
                        param_affinity_group_id = task["params"]["affinity_group_data"].get(
                            "vim-affinity-group-id"
                        )
                        affinity_group_vim_id = target_vim.get_affinity_group(
                            param_affinity_group_id
                        ).get("id")
                    except vimconn.VimConnNotFoundException:
                        # Trailing space after "{}" keeps the id apart from the next words
                        self.logger.error(
                            "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                            "could not be found at VIM. Creating a new one.".format(
                                task_id, ro_task["target_id"], param_affinity_group_id
                            )
                        )

                if not affinity_group_vim_id and affinity_group_data:
                    affinity_group_vim_id = target_vim.new_affinity_group(
                        affinity_group_data
                    )
                    created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

1001 

1002 

class VimInteractionUpdateVdu(VimInteractionBase):
    """VIM interaction that runs an action on an existing VM instance."""

    def exec(self, ro_task, task_index, task_depends):
        """Run the action named in task["params"] on the VM through the VIM connector.

        The connector receives a dict whose single key and value are both the action.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (task status, vim_info update dict, task update dict)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        task_update = {"retries": 0}
        vim = self.my_vims[ro_task["target_id"]]

        try:
            vm_id = ""
            if task.get("params"):
                task_params = task["params"]
                vm_id = task_params.get("vim_vm_id")
                action = task_params.get("action")
                vim.action_vminstance(vm_id, {action: action})
            # created = True
            item_update = {
                "vim_id": vm_id,
                "vim_status": "ACTIVE",
            }
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )
            return "DONE", item_update, task_update
        except (vimconn.VimConnException, NsWorkerException) as error:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], error)
            )
            item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(error),
            }
            return "FAILED", item_update, task_update

1037 

1038 

1039class VimInteractionSdnNet(VimInteractionBase): 

1040 @staticmethod 

1041 def _match_pci(port_pci, mapping): 

1042 """ 

1043 Check if port_pci matches with mapping. 

1044 The mapping can have brackets to indicate that several chars are accepted. e.g 

1045 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]' 

1046 :param port_pci: text 

1047 :param mapping: text, can contain brackets to indicate several chars are available 

1048 :return: True if matches, False otherwise 

1049 """ 

1050 if not port_pci or not mapping: 

1051 return False 

1052 if port_pci == mapping: 

1053 return True 

1054 

1055 mapping_index = 0 

1056 pci_index = 0 

1057 while True: 

1058 bracket_start = mapping.find("[", mapping_index) 

1059 

1060 if bracket_start == -1: 

1061 break 

1062 

1063 bracket_end = mapping.find("]", bracket_start) 

1064 if bracket_end == -1: 

1065 break 

1066 

1067 length = bracket_start - mapping_index 

1068 if ( 

1069 length 

1070 and port_pci[pci_index : pci_index + length] 

1071 != mapping[mapping_index:bracket_start] 

1072 ): 

1073 return False 

1074 

1075 if ( 

1076 port_pci[pci_index + length] 

1077 not in mapping[bracket_start + 1 : bracket_end] 

1078 ): 

1079 return False 

1080 

1081 pci_index += length + 1 

1082 mapping_index = bracket_end + 1 

1083 

1084 if port_pci[pci_index:] != mapping[mapping_index:]: 

1085 return False 

1086 

1087 return True 

1088 

1089 def _get_interfaces(self, vlds_to_connect, vim_account_id): 

1090 """ 

1091 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id> 

1092 :param vim_account_id: 

1093 :return: 

1094 """ 

1095 interfaces = [] 

1096 

1097 for vld in vlds_to_connect: 

1098 table, _, db_id = vld.partition(":") 

1099 db_id, _, vld = db_id.partition(":") 

1100 _, _, vld_id = vld.partition(".") 

1101 

1102 if table == "vnfrs": 

1103 q_filter = {"vim-account-id": vim_account_id, "_id": db_id} 

1104 iface_key = "vnf-vld-id" 

1105 else: # table == "nsrs" 

1106 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id} 

1107 iface_key = "ns-vld-id" 

1108 

1109 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter) 

1110 

1111 for db_vnfr in db_vnfrs: 

1112 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())): 

1113 for iface_index, interface in enumerate(vdur["interfaces"]): 

1114 if interface.get(iface_key) == vld_id and interface.get( 

1115 "type" 

1116 ) in ("SR-IOV", "PCI-PASSTHROUGH"): 

1117 # only SR-IOV o PT 

1118 interface_ = interface.copy() 

1119 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format( 

1120 db_vnfr["_id"], vdu_index, iface_index 

1121 ) 

1122 

1123 if vdur.get("status") == "ERROR": 

1124 interface_["status"] = "ERROR" 

1125 

1126 interfaces.append(interface_) 

1127 

1128 return interfaces 

1129 

1130 def refresh(self, ro_task): 

1131 # look for task create 

1132 task_create_index, _ = next( 

1133 i_t 

1134 for i_t in enumerate(ro_task["tasks"]) 

1135 if i_t[1] 

1136 and i_t[1]["action"] == "CREATE" 

1137 and i_t[1]["status"] != "FINISHED" 

1138 ) 

1139 

1140 return self.new(ro_task, task_create_index, None) 

1141 

    def new(self, ro_task, task_index, task_depends):
        """
        Create or update the SDN connectivity service of an ro_task, or get its status.
        It looks for the SR-IOV/PCI-PASSTHROUGH interfaces of the vlds (params "vlds") and the
        external ports (params "sdn-ports"), builds the list of service endpoints and:
        - creates the service if there is no vim_id yet and there are at least 2 endpoints
        - edits the service if there is a vim_id and the set of connected ports has changed
        - otherwise, if there is a vim_id, asks the connector for the service status
        :param ro_task: ro_task content; "tasks", "target_id" and "vim_info" are read
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (sdn_status, ro_vim_item_update); ("FAILED", ro_vim_item_update) with
            vim_status "VIM_ERROR" if any exception is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        sdn_net_id = ro_task["vim_info"]["vim_id"]

        # state stored by previous executions over this ro_task
        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            db_vim = {}
            params = task["params"]
            vlds_to_connect = params.get("vlds", [])
            associated_vim = params.get("target_vim")
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            # associated_vim has the format <type>:<vim_account_id>
            _, _, vim_account_id = (
                (None, None, None)
                if associated_vim is None
                else associated_vim.partition(":")
            )

            if associated_vim:
                # get associated VIM, reading it from database only if it is not cached
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            # never set to True in this method; see the TODO inside the loop
            sdn_need_update = False

            for port in ports:
                # the last vlan found is later used for external ports without vlan
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                if not port.get("compute_node") or not port.get("pci"):
                    # location is not known yet: count it as pending, or as error if the
                    # interface is marked with ERROR status
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                # NOTE(review): db_vim is {} when there is no target_vim param, so
                # db_vim["config"] would raise KeyError here - confirm it cannot happen
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    # mapping is not needed: compute_node and pci are used as defaults below
                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": "dot1q"
                    if port["type"] == "SR-IOV"
                    else None,
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # nothing is created at the connector with less than 2 endpoints
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                # no changes at ports: just get the current status from the connector
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                # error_msg, when present, replaces sdn_info
                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # it cannot be considered ACTIVE while there are ports without location
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # traceback is logged only for exceptions that do not come from the connectors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

1382 

1383 def delete(self, ro_task, task_index): 

1384 task = ro_task["tasks"][task_index] 

1385 task_id = task["task_id"] 

1386 sdn_vim_id = ro_task["vim_info"].get("vim_id") 

1387 ro_vim_item_update_ok = { 

1388 "vim_status": "DELETED", 

1389 "created": False, 

1390 "vim_message": "DELETED", 

1391 "vim_id": None, 

1392 } 

1393 

1394 try: 

1395 if sdn_vim_id: 

1396 target_vim = self.my_vims[ro_task["target_id"]] 

1397 target_vim.delete_connectivity_service( 

1398 sdn_vim_id, ro_task["vim_info"].get("created_items") 

1399 ) 

1400 

1401 except Exception as e: 

1402 if ( 

1403 isinstance(e, sdnconn.SdnConnectorError) 

1404 and e.http_code == HTTPStatus.NOT_FOUND.value 

1405 ): 

1406 ro_vim_item_update_ok["vim_message"] = "already deleted" 

1407 else: 

1408 self.logger.error( 

1409 "ro_task={} vim={} del-sdn-net={}: {}".format( 

1410 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e 

1411 ), 

1412 exc_info=not isinstance( 

1413 e, (sdnconn.SdnConnectorError, vimconn.VimConnException) 

1414 ), 

1415 ) 

1416 ro_vim_item_update = { 

1417 "vim_status": "VIM_ERROR", 

1418 "vim_message": "Error while deleting: {}".format(e), 

1419 } 

1420 

1421 return "FAILED", ro_vim_item_update 

1422 

1423 self.logger.debug( 

1424 "task={} {} del-sdn-net={} {}".format( 

1425 task_id, 

1426 ro_task["target_id"], 

1427 sdn_vim_id, 

1428 ro_vim_item_update_ok.get("vim_message", ""), 

1429 ) 

1430 ) 

1431 

1432 return "DONE", ro_vim_item_update_ok 

1433 

1434 

class VimInteractionMigration(VimInteractionBase):
    """Migrates a VM to another compute host and collects its refreshed information."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM task["params"]["vim_vm_id"] to task["params"]["migrate_host"].
        When the connector reports the new compute node, the VM status is refreshed and
        the interfaces are returned in the same order as the old ones stored at
        task["params"]["vdu_vim_info"] (None for an old interface not found any more).

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this interaction
        :return: tuple (status, ro_vim_item_update, db_task_update), where status is
            "DONE", or "FAILED" when VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        vim_interfaces = []
        refreshed_vim_info = {}

        try:
            vim_vm_id = ""
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, params.get("migrate_host")
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated
                    vdu_old_vim_info = params["vdu_vim_info"].get(target_id)

                    # Refresh VM to get new vim_info
                    refreshed_vim_info = target_vim.refresh_vms_status([vim_vm_id])[
                        vim_vm_id
                    ]

                    if refreshed_vim_info.get("interfaces"):
                        for old_iface in vdu_old_vim_info.get("interfaces"):
                            old_iface_id = old_iface["vim_interface_id"]
                            matching_iface = None

                            for new_iface in refreshed_vim_info["interfaces"]:
                                if old_iface_id == new_iface["vim_interface_id"]:
                                    matching_iface = new_iface
                                    break

                            vim_interfaces.append(matching_iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }
            error_status = ("ERROR", "VIM_ERROR")

            if refreshed_vim_info and refreshed_vim_info.get("status") not in error_status:
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug("task={} {} vm-migration done".format(task_id, target_id))

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration: {}".format(task_id, target_id, e)
            )
            failed_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", failed_update, db_task_update

1510 

1511 

class VimInteractionResize(VimInteractionBase):
    """Resizes (vertical scale) a VM to the flavor described at the task params."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM task["params"]["vim_vm_id"] to the flavor task["params"]["flavor_dict"].
        The flavor is looked for at the VIM and it is created if it cannot be found. When
        the resize is reported as done, the VM status is refreshed to fill vim_details.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this interaction
        :return: tuple (status, ro_vim_item_update, db_task_update), where status is
            "DONE", or "FAILED" when VimConnException or NsWorkerException is raised,
            including the case where no flavor can be obtained for the resize
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_flavor_uuid = None
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                flavor_dict = task["params"].get("flavor_dict")
                self.logger.info("flavor_dict %s", flavor_dict)
                flavor_error = None

                try:
                    target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
                except Exception as e:
                    self.logger.info("Cannot find any flavor matching %s.", str(e))
                    try:
                        target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                    except Exception as e:
                        self.logger.error("Error creating flavor at VIM %s.", str(e))
                        flavor_error = e

                if target_flavor_uuid is None:
                    # Without a flavor nothing can be resized. Fail the task instead of
                    # reporting it as DONE with the VM left untouched
                    raise NsWorkerException(
                        "Cannot get a flavor to resize vm={}: {}".format(
                            vim_vm_id, flavor_error
                        )
                    )

                resized_status = target_vim.resize_instance(
                    vim_vm_id, target_flavor_uuid
                )

                if resized_status:
                    # Refresh VM to get new vim_info
                    vm_to_refresh_list = [vim_vm_id]
                    vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                    refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update

1575 

1576 

class ConfigValidate:
    """Gives access to the refresh periods and queue size of config["period"]."""

    def __init__(self, config: Dict):
        # whole configuration; only its "period" section is read by the properties
        self.conf = config

    @property
    def active(self):
        """Refresh period for active items: -1 (disabled) or at least 60, 60 otherwise."""
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        refresh_active = self.conf["period"]["refresh_active"]
        is_allowed = refresh_active >= 60 or refresh_active == -1

        return refresh_active if is_allowed else 60

    @property
    def build(self):
        """Value of "refresh_build", as it is configured."""
        return self.conf["period"]["refresh_build"]

    @property
    def image(self):
        """Value of "refresh_image", as it is configured."""
        return self.conf["period"]["refresh_image"]

    @property
    def error(self):
        """Value of "refresh_error", as it is configured."""
        return self.conf["period"]["refresh_error"]

    @property
    def queue_size(self):
        """Value of "queue_size", as it is configured."""
        return self.conf["period"]["queue_size"]

1607 

1608 

1609class NsWorker(threading.Thread): 

1610 def __init__(self, worker_index, config, plugins, db): 

1611 """ 

1612 :param worker_index: thread index 

1613 :param config: general configuration of RO, among others the process_id with the docker id where it runs 

1614 :param plugins: global shared dict with the loaded plugins 

1615 :param db: database class instance to use 

1616 """ 

1617 threading.Thread.__init__(self) 

1618 self.config = config 

1619 self.plugins = plugins 

1620 self.plugin_name = "unknown" 

1621 self.logger = logging.getLogger("ro.worker{}".format(worker_index)) 

1622 self.worker_index = worker_index 

1623 # refresh periods for created items 

1624 self.refresh_config = ConfigValidate(config) 

1625 self.task_queue = queue.Queue(self.refresh_config.queue_size) 

1626 # targetvim: vimplugin class 

1627 self.my_vims = {} 

1628 # targetvim: vim information from database 

1629 self.db_vims = {} 

1630 # targetvim list 

1631 self.vim_targets = [] 

1632 self.my_id = config["process_id"] + ":" + str(worker_index) 

1633 self.db = db 

1634 self.item2class = { 

1635 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger), 

1636 "shared-volumes": VimInteractionSharedVolume( 

1637 self.db, self.my_vims, self.db_vims, self.logger 

1638 ), 

1639 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger), 

1640 "image": VimInteractionImage( 

1641 self.db, self.my_vims, self.db_vims, self.logger 

1642 ), 

1643 "flavor": VimInteractionFlavor( 

1644 self.db, self.my_vims, self.db_vims, self.logger 

1645 ), 

1646 "sdn_net": VimInteractionSdnNet( 

1647 self.db, self.my_vims, self.db_vims, self.logger 

1648 ), 

1649 "update": VimInteractionUpdateVdu( 

1650 self.db, self.my_vims, self.db_vims, self.logger 

1651 ), 

1652 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup( 

1653 self.db, self.my_vims, self.db_vims, self.logger 

1654 ), 

1655 "migrate": VimInteractionMigration( 

1656 self.db, self.my_vims, self.db_vims, self.logger 

1657 ), 

1658 "verticalscale": VimInteractionResize( 

1659 self.db, self.my_vims, self.db_vims, self.logger 

1660 ), 

1661 } 

1662 self.time_last_task_processed = None 

1663 # lists of tasks to delete because nsrs or vnfrs has been deleted from db 

1664 self.tasks_to_delete = [] 

1665 # it is idle when there are not vim_targets associated 

1666 self.idle = True 

1667 self.task_locked_time = config["global"]["task_locked_time"] 

1668 

1669 def insert_task(self, task): 

1670 try: 

1671 self.task_queue.put(task, False) 

1672 return None 

1673 except queue.Full: 

1674 raise NsWorkerException("timeout inserting a task") 

1675 

1676 def terminate(self): 

1677 self.insert_task("exit") 

1678 

1679 def del_task(self, task): 

1680 with self.task_lock: 

1681 if task["status"] == "SCHEDULED": 

1682 task["status"] = "SUPERSEDED" 

1683 return True 

1684 else: # task["status"] == "processing" 

1685 self.task_lock.release() 

1686 return False 

1687 

1688 def _process_vim_config(self, target_id: str, db_vim: dict) -> None: 

1689 """ 

1690 Process vim config, creating vim configuration files as ca_cert 

1691 :param target_id: vim/sdn/wim + id 

1692 :param db_vim: Vim dictionary obtained from database 

1693 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files 

1694 """ 

1695 if not db_vim.get("config"): 

1696 return 

1697 

1698 file_name = "" 

1699 work_dir = "/app/osm_ro/certs" 

1700 

1701 try: 

1702 if db_vim["config"].get("ca_cert_content"): 

1703 file_name = f"{work_dir}/{target_id}:{self.worker_index}" 

1704 

1705 if not path.isdir(file_name): 

1706 makedirs(file_name) 

1707 

1708 file_name = file_name + "/ca_cert" 

1709 

1710 with open(file_name, "w") as f: 

1711 f.write(db_vim["config"]["ca_cert_content"]) 

1712 del db_vim["config"]["ca_cert_content"] 

1713 db_vim["config"]["ca_cert"] = file_name 

1714 except Exception as e: 

1715 raise NsWorkerException( 

1716 "Error writing to file '{}': {}".format(file_name, e) 

1717 ) 

1718 

1719 def _load_plugin(self, name, type="vim"): 

1720 # type can be vim or sdn 

1721 if "rovim_dummy" not in self.plugins: 

1722 self.plugins["rovim_dummy"] = VimDummyConnector 

1723 

1724 if "rosdn_dummy" not in self.plugins: 

1725 self.plugins["rosdn_dummy"] = SdnDummyConnector 

1726 

1727 if name in self.plugins: 

1728 return self.plugins[name] 

1729 

1730 try: 

1731 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name): 

1732 self.plugins[name] = ep.load() 

1733 except Exception as e: 

1734 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e)) 

1735 

1736 if name and name not in self.plugins: 

1737 raise NsWorkerException( 

1738 "Plugin 'osm_{n}' has not been installed".format(n=name) 

1739 ) 

1740 

1741 return self.plugins[name] 

1742 

1743 def _unload_vim(self, target_id): 

1744 """ 

1745 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list 

1746 :param target_id: Contains type:_id; where type can be 'vim', ... 

1747 :return: None. 

1748 """ 

1749 try: 

1750 self.db_vims.pop(target_id, None) 

1751 self.my_vims.pop(target_id, None) 

1752 

1753 if target_id in self.vim_targets: 

1754 self.vim_targets.remove(target_id) 

1755 

1756 self.logger.info("Unloaded {}".format(target_id)) 

1757 except Exception as e: 

1758 self.logger.error("Cannot unload {}: {}".format(target_id, e)) 

1759 

    def _check_vim(self, target_id):
        """
        Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
        Only the first operation found on PROCESSING state is attended, and only if it is
        not locked by another thread. The result is always written to database in the
        'finally' section, and the target is unloaded again if it was not loaded at the
        beginning.
        :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
        :return: None.
        """
        target, _, _id = target_id.partition(":")
        now = time.time()
        # changes to apply over the database entry in the 'finally' section
        update_dict = {}
        unset_dict = {}
        # prefix of the database keys of the operation being processed
        op_text = ""
        # description of what is being done, used in the error messages
        step = ""
        loaded = target_id in self.vim_targets
        target_database = (
            "vim_accounts"
            if target == "vim"
            else "wim_accounts"
            if target == "wim"
            else "sdns"
        )
        error_text = ""

        try:
            step = "Getting {} from db".format(target_id)
            db_vim = self.db.get_one(target_database, {"_id": _id})

            for op_index, operation in enumerate(
                db_vim["_admin"].get("operations", ())
            ):
                if operation["operationState"] != "PROCESSING":
                    continue

                locked_at = operation.get("locked_at")

                if locked_at is not None and locked_at >= now - self.task_locked_time:
                    # some other thread is doing this operation
                    return

                # lock
                op_text = "_admin.operations.{}.".format(op_index)

                # The filter includes the locked_at value read before, so the update is not
                # applied if the entry has been changed in the meanwhile
                # NOTE(review): "admin.current_operation" has no leading underscore, while
                # the rest of keys use "_admin." - confirm it is the intended key
                if not self.db.set_one(
                    target_database,
                    q_filter={
                        "_id": _id,
                        op_text + "operationState": "PROCESSING",
                        op_text + "locked_at": locked_at,
                    },
                    update_dict={
                        op_text + "locked_at": now,
                        "admin.current_operation": op_index,
                    },
                    fail_on_empty=False,
                ):
                    return

                unset_dict[op_text + "locked_at"] = None
                # NOTE(review): this key does not match the "admin.current_operation" key
                # set when locking - confirm which one is the right one
                unset_dict["current_operation"] = None
                step = "Loading " + target_id
                error_text = self._load_vim(target_id)

                if not error_text:
                    step = "Checking connectivity"

                    if target == "vim":
                        self.my_vims[target_id].check_vim_connectivity()
                    else:
                        self.my_vims[target_id].check_credentials()

                # These values are overwritten in the 'finally' section if error_text is set
                update_dict["_admin.operationalState"] = "ENABLED"
                update_dict["_admin.detailed-status"] = ""
                unset_dict[op_text + "detailed-status"] = None
                update_dict[op_text + "operationState"] = "COMPLETED"

                # only one operation is attended on each call
                return

        except Exception as e:
            error_text = "{}: {}".format(step, e)
            self.logger.error("{} for {}: {}".format(step, target_id, e))

        finally:
            # executed also on the 'return' statements above
            if update_dict or unset_dict:
                if error_text:
                    update_dict[op_text + "operationState"] = "FAILED"
                    update_dict[op_text + "detailed-status"] = error_text
                    # the same key cannot be both set and unset
                    unset_dict.pop(op_text + "detailed-status", None)
                    update_dict["_admin.operationalState"] = "ERROR"
                    update_dict["_admin.detailed-status"] = error_text

                if op_text:
                    update_dict[op_text + "statusEnteredTime"] = now

                self.db.set_one(
                    target_database,
                    q_filter={"_id": _id},
                    update_dict=update_dict,
                    unset=unset_dict,
                    fail_on_empty=False,
                )

            # leave the target unloaded if it was not loaded before this check
            if not loaded:
                self._unload_vim(target_id)

1862 

1863 def _reload_vim(self, target_id): 

1864 if target_id in self.vim_targets: 

1865 self._load_vim(target_id) 

1866 else: 

1867 # if the vim is not loaded, but database information of VIM is cached at self.db_vims, 

1868 # just remove it to force load again next time it is needed 

1869 self.db_vims.pop(target_id, None) 

1870 

1871 def _load_vim(self, target_id): 

1872 """ 

1873 Load or reload a vim_account, sdn_controller or wim_account. 

1874 Read content from database, load the plugin if not loaded. 

1875 In case of error loading the plugin, it loads a failing VIM_connector 

1876 It fills self db_vims dictionary, my_vims dictionary and vim_targets list 

1877 :param target_id: Contains type:_id; where type can be 'vim', ... 

1878 :return: None if ok, descriptive text if error 

1879 """ 

1880 target, _, _id = target_id.partition(":") 

1881 target_database = ( 

1882 "vim_accounts" 

1883 if target == "vim" 

1884 else "wim_accounts" 

1885 if target == "wim" 

1886 else "sdns" 

1887 ) 

1888 plugin_name = "" 

1889 vim = None 

1890 step = "Getting {}={} from db".format(target, _id) 

1891 

1892 try: 

1893 # TODO process for wim, sdnc, ... 

1894 vim = self.db.get_one(target_database, {"_id": _id}) 

1895 

1896 # if deep_get(vim, "config", "sdn-controller"): 

1897 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"]) 

1898 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]}) 

1899 

1900 step = "Decrypting password" 

1901 schema_version = vim.get("schema_version") 

1902 self.db.encrypt_decrypt_fields( 

1903 vim, 

1904 "decrypt", 

1905 fields=("password", "secret"), 

1906 schema_version=schema_version, 

1907 salt=_id, 

1908 ) 

1909 self._process_vim_config(target_id, vim) 

1910 

1911 if target == "vim": 

1912 plugin_name = "rovim_" + vim["vim_type"] 

1913 step = "Loading plugin '{}'".format(plugin_name) 

1914 vim_module_conn = self._load_plugin(plugin_name) 

1915 step = "Loading {}'".format(target_id) 

1916 self.my_vims[target_id] = vim_module_conn( 

1917 uuid=vim["_id"], 

1918 name=vim["name"], 

1919 tenant_id=vim.get("vim_tenant_id"), 

1920 tenant_name=vim.get("vim_tenant_name"), 

1921 url=vim["vim_url"], 

1922 url_admin=None, 

1923 user=vim["vim_user"], 

1924 passwd=vim["vim_password"], 

1925 config=vim.get("config") or {}, 

1926 persistent_info={}, 

1927 ) 

1928 else: # sdn 

1929 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type")) 

1930 step = "Loading plugin '{}'".format(plugin_name) 

1931 vim_module_conn = self._load_plugin(plugin_name, "sdn") 

1932 step = "Loading {}'".format(target_id) 

1933 wim = deepcopy(vim) 

1934 wim_config = wim.pop("config", {}) or {} 

1935 wim["uuid"] = wim["_id"] 

1936 if "url" in wim and "wim_url" not in wim: 

1937 wim["wim_url"] = wim["url"] 

1938 elif "url" not in wim and "wim_url" in wim: 

1939 wim["url"] = wim["wim_url"] 

1940 

1941 if wim.get("dpid"): 

1942 wim_config["dpid"] = wim.pop("dpid") 

1943 

1944 if wim.get("switch_id"): 

1945 wim_config["switch_id"] = wim.pop("switch_id") 

1946 

1947 # wim, wim_account, config 

1948 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config) 

1949 self.db_vims[target_id] = vim 

1950 self.error_status = None 

1951 

1952 self.logger.info( 

1953 "Connector loaded for {}, plugin={}".format(target_id, plugin_name) 

1954 ) 

1955 except Exception as e: 

1956 self.logger.error( 

1957 "Cannot load {} plugin={}: {} {}".format( 

1958 target_id, plugin_name, step, e 

1959 ) 

1960 ) 

1961 

1962 self.db_vims[target_id] = vim or {} 

1963 self.db_vims[target_id] = FailingConnector(str(e)) 

1964 error_status = "{} Error: {}".format(step, e) 

1965 

1966 return error_status 

1967 finally: 

1968 if target_id not in self.vim_targets: 

1969 self.vim_targets.append(target_id) 

1970 

1971 def _get_db_task(self): 

1972 """ 

1973 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions 

1974 :return: None 

1975 """ 

1976 now = time.time() 

1977 

1978 if not self.time_last_task_processed: 

1979 self.time_last_task_processed = now 

1980 

1981 try: 

1982 while True: 

1983 """ 

1984 # Log RO tasks only when loglevel is DEBUG 

1985 if self.logger.getEffectiveLevel() == logging.DEBUG: 

1986 self._log_ro_task( 

1987 None, 

1988 None, 

1989 None, 

1990 "TASK_WF", 

1991 "task_locked_time=" 

1992 + str(self.task_locked_time) 

1993 + " " 

1994 + "time_last_task_processed=" 

1995 + str(self.time_last_task_processed) 

1996 + " " 

1997 + "now=" 

1998 + str(now), 

1999 ) 

2000 """ 

2001 locked = self.db.set_one( 

2002 "ro_tasks", 

2003 q_filter={ 

2004 "target_id": self.vim_targets, 

2005 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], 

2006 "locked_at.lt": now - self.task_locked_time, 

2007 "to_check_at.lt": self.time_last_task_processed, 

2008 "to_check_at.gt": -1, 

2009 }, 

2010 update_dict={"locked_by": self.my_id, "locked_at": now}, 

2011 fail_on_empty=False, 

2012 ) 

2013 

2014 if locked: 

2015 # read and return 

2016 ro_task = self.db.get_one( 

2017 "ro_tasks", 

2018 q_filter={ 

2019 "target_id": self.vim_targets, 

2020 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], 

2021 "locked_at": now, 

2022 }, 

2023 ) 

2024 return ro_task 

2025 

2026 if self.time_last_task_processed == now: 

2027 self.time_last_task_processed = None 

2028 return None 

2029 else: 

2030 self.time_last_task_processed = now 

2031 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now) 

2032 

2033 except DbException as e: 

2034 self.logger.error("Database exception at _get_db_task: {}".format(e)) 

2035 except Exception as e: 

2036 self.logger.critical( 

2037 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True 

2038 ) 

2039 

2040 return None 

2041 

2042 def _delete_task(self, ro_task, task_index, task_depends, db_update): 

2043 """ 

2044 Determine if this task need to be done or superseded 

2045 :return: None 

2046 """ 

2047 my_task = ro_task["tasks"][task_index] 

2048 task_id = my_task["task_id"] 

2049 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get( 

2050 "created_items", False 

2051 ) 

2052 

2053 self.logger.debug("Needed delete: {}".format(needed_delete)) 

2054 if my_task["status"] == "FAILED": 

2055 return None, None # TODO need to be retry?? 

2056 

2057 try: 

2058 for index, task in enumerate(ro_task["tasks"]): 

2059 if index == task_index or not task: 

2060 continue # own task 

2061 

2062 if ( 

2063 my_task["target_record"] == task["target_record"] 

2064 and task["action"] == "CREATE" 

2065 ): 

2066 # set to finished 

2067 db_update["tasks.{}.status".format(index)] = task[ 

2068 "status" 

2069 ] = "FINISHED" 

2070 elif task["action"] == "CREATE" and task["status"] not in ( 

2071 "FINISHED", 

2072 "SUPERSEDED", 

2073 ): 

2074 needed_delete = False 

2075 

2076 if needed_delete: 

2077 self.logger.debug( 

2078 "Deleting ro_task={} task_index={}".format(ro_task, task_index) 

2079 ) 

2080 return self.item2class[my_task["item"]].delete(ro_task, task_index) 

2081 else: 

2082 return "SUPERSEDED", None 

2083 except Exception as e: 

2084 if not isinstance(e, NsWorkerException): 

2085 self.logger.critical( 

2086 "Unexpected exception at _delete_task task={}: {}".format( 

2087 task_id, e 

2088 ), 

2089 exc_info=True, 

2090 ) 

2091 

2092 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)} 

2093 

2094 def _create_task(self, ro_task, task_index, task_depends, db_update): 

2095 """ 

2096 Determine if this task need to create something at VIM 

2097 :return: None 

2098 """ 

2099 my_task = ro_task["tasks"][task_index] 

2100 task_id = my_task["task_id"] 

2101 

2102 if my_task["status"] == "FAILED": 

2103 return None, None # TODO need to be retry?? 

2104 elif my_task["status"] == "SCHEDULED": 

2105 # check if already created by another task 

2106 for index, task in enumerate(ro_task["tasks"]): 

2107 if index == task_index or not task: 

2108 continue # own task 

2109 

2110 if task["action"] == "CREATE" and task["status"] not in ( 

2111 "SCHEDULED", 

2112 "FINISHED", 

2113 "SUPERSEDED", 

2114 ): 

2115 return task["status"], "COPY_VIM_INFO" 

2116 

2117 try: 

2118 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new( 

2119 ro_task, task_index, task_depends 

2120 ) 

2121 # TODO update other CREATE tasks 

2122 except Exception as e: 

2123 if not isinstance(e, NsWorkerException): 

2124 self.logger.error( 

2125 "Error executing task={}: {}".format(task_id, e), exc_info=True 

2126 ) 

2127 

2128 task_status = "FAILED" 

2129 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)} 

2130 # TODO update ro_vim_item_update 

2131 

2132 return task_status, ro_vim_item_update 

2133 else: 

2134 return None, None 

2135 

2136 def _get_dependency(self, task_id, ro_task=None, target_id=None): 

2137 """ 

2138 Look for dependency task 

2139 :param task_id: Can be one of 

2140 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>" 

2141 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>" 

2142 3. task.task_id: "<action_id>:number" 

2143 :param ro_task: 

2144 :param target_id: 

2145 :return: database ro_task plus index of task 

2146 """ 

2147 if ( 

2148 task_id.startswith("vim:") 

2149 or task_id.startswith("sdn:") 

2150 or task_id.startswith("wim:") 

2151 ): 

2152 target_id, _, task_id = task_id.partition(" ") 

2153 

2154 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"): 

2155 ro_task_dependency = self.db.get_one( 

2156 "ro_tasks", 

2157 q_filter={"target_id": target_id, "tasks.target_record_id": task_id}, 

2158 fail_on_empty=False, 

2159 ) 

2160 

2161 if ro_task_dependency: 

2162 for task_index, task in enumerate(ro_task_dependency["tasks"]): 

2163 if task["target_record_id"] == task_id: 

2164 return ro_task_dependency, task_index 

2165 

2166 else: 

2167 if ro_task: 

2168 for task_index, task in enumerate(ro_task["tasks"]): 

2169 if task and task["task_id"] == task_id: 

2170 return ro_task, task_index 

2171 

2172 ro_task_dependency = self.db.get_one( 

2173 "ro_tasks", 

2174 q_filter={ 

2175 "tasks.ANYINDEX.task_id": task_id, 

2176 "tasks.ANYINDEX.target_record.ne": None, 

2177 }, 

2178 fail_on_empty=False, 

2179 ) 

2180 

2181 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency)) 

2182 if ro_task_dependency: 

2183 for task_index, task in enumerate(ro_task_dependency["tasks"]): 

2184 if task["task_id"] == task_id: 

2185 return ro_task_dependency, task_index 

2186 raise NsWorkerException("Cannot get depending task {}".format(task_id)) 

2187 

def update_vm_refresh(self, ro_task):
    """Re-enable the periodic status check of ro_tasks whose check was disabled.

    The next refresh time is obtained from self._get_next_refresh. Unless it is -1, every ro_task at
    database with some task in status DONE and a negative "to_check_at" gets its "vim_info.refresh_at"
    and "to_check_at" updated. Any error is logged and not propagated.

    Args:
        ro_task (dict): ro_task being processed, used to obtain the next refresh time
    """
    try:
        self.logger.debug("Checking if VM status update config")
        refresh_at = self._get_next_refresh(ro_task, time.time())

        if refresh_at != -1:
            one_day_later = time.time() + (24 * 60 * 60)
            db_ro_task_update = {
                "vim_info.refresh_at": refresh_at,
                "to_check_at": min(one_day_later, refresh_at),
            }

            self.logger.debug(
                "Finding tasks which to be updated to enable VM status updates"
            )
            disabled_ro_tasks = self.db.get_list(
                "ro_tasks",
                q_filter={
                    "tasks.status": "DONE",
                    "to_check_at.lt": 0,
                },
            )
            self.logger.debug("Updating tasks to change the to_check_at status")
            for disabled in disabled_ro_tasks:
                self.db.set_one(
                    "ro_tasks",
                    q_filter={"_id": disabled["_id"]},
                    update_dict=db_ro_task_update,
                    fail_on_empty=True,
                )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")

2230 

2231 def _get_next_refresh(self, ro_task: dict, next_refresh: float): 

2232 """Decide the next_refresh according to vim type and refresh config period. 

2233 Args: 

2234 ro_task (dict): ro_task details 

2235 next_refresh (float): next refresh time as epoch format 

2236 

2237 Returns: 

2238 next_refresh (float) -1 if vm updates are disabled or vim type is openstack. 

2239 """ 

2240 target_vim = ro_task["target_id"] 

2241 vim_type = self.db_vims[target_vim]["vim_type"] 

2242 if self.refresh_config.active == -1 or vim_type == "openstack": 

2243 next_refresh = -1 

2244 else: 

2245 next_refresh += self.refresh_config.active 

2246 return next_refresh 

2247 

2248 def _process_pending_tasks(self, ro_task): 

2249 ro_task_id = ro_task["_id"] 

2250 now = time.time() 

2251 # one day 

2252 next_check_at = now + (24 * 60 * 60) 

2253 db_ro_task_update = {} 

2254 

2255 def _update_refresh(new_status): 

2256 # compute next_refresh 

2257 nonlocal task 

2258 nonlocal next_check_at 

2259 nonlocal db_ro_task_update 

2260 nonlocal ro_task 

2261 

2262 next_refresh = time.time() 

2263 

2264 if task["item"] in ("image", "flavor"): 

2265 next_refresh += self.refresh_config.image 

2266 elif new_status == "BUILD": 

2267 next_refresh += self.refresh_config.build 

2268 elif new_status == "DONE": 

2269 next_refresh = self._get_next_refresh(ro_task, next_refresh) 

2270 else: 

2271 next_refresh += self.refresh_config.error 

2272 

2273 next_check_at = min(next_check_at, next_refresh) 

2274 db_ro_task_update["vim_info.refresh_at"] = next_refresh 

2275 ro_task["vim_info"]["refresh_at"] = next_refresh 

2276 

2277 try: 

2278 """ 

2279 # Log RO tasks only when loglevel is DEBUG 

2280 if self.logger.getEffectiveLevel() == logging.DEBUG: 

2281 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK") 

2282 """ 

2283 # Check if vim status refresh is enabled again 

2284 self.update_vm_refresh(ro_task) 

2285 # 0: get task_status_create 

2286 lock_object = None 

2287 task_status_create = None 

2288 task_create = next( 

2289 ( 

2290 t 

2291 for t in ro_task["tasks"] 

2292 if t 

2293 and t["action"] == "CREATE" 

2294 and t["status"] in ("BUILD", "DONE") 

2295 ), 

2296 None, 

2297 ) 

2298 

2299 if task_create: 

2300 task_status_create = task_create["status"] 

2301 

2302 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD 

2303 for task_action in ("DELETE", "CREATE", "EXEC"): 

2304 db_vim_update = None 

2305 new_status = None 

2306 

2307 for task_index, task in enumerate(ro_task["tasks"]): 

2308 if not task: 

2309 continue # task deleted 

2310 

2311 task_depends = {} 

2312 target_update = None 

2313 

2314 if ( 

2315 ( 

2316 task_action in ("DELETE", "EXEC") 

2317 and task["status"] not in ("SCHEDULED", "BUILD") 

2318 ) 

2319 or task["action"] != task_action 

2320 or ( 

2321 task_action == "CREATE" 

2322 and task["status"] in ("FINISHED", "SUPERSEDED") 

2323 ) 

2324 ): 

2325 continue 

2326 

2327 task_path = "tasks.{}.status".format(task_index) 

2328 try: 

2329 db_vim_info_update = None 

2330 dependency_ro_task = {} 

2331 

2332 if task["status"] == "SCHEDULED": 

2333 # check if tasks that this depends on have been completed 

2334 dependency_not_completed = False 

2335 

2336 for dependency_task_id in task.get("depends_on") or (): 

2337 ( 

2338 dependency_ro_task, 

2339 dependency_task_index, 

2340 ) = self._get_dependency( 

2341 dependency_task_id, target_id=ro_task["target_id"] 

2342 ) 

2343 dependency_task = dependency_ro_task["tasks"][ 

2344 dependency_task_index 

2345 ] 

2346 self.logger.debug( 

2347 "dependency_ro_task={} dependency_task_index={}".format( 

2348 dependency_ro_task, dependency_task_index 

2349 ) 

2350 ) 

2351 

2352 if dependency_task["status"] == "SCHEDULED": 

2353 dependency_not_completed = True 

2354 next_check_at = min( 

2355 next_check_at, dependency_ro_task["to_check_at"] 

2356 ) 

2357 # must allow dependent task to be processed first 

2358 # to do this set time after last_task_processed 

2359 next_check_at = max( 

2360 self.time_last_task_processed, next_check_at 

2361 ) 

2362 break 

2363 elif dependency_task["status"] == "FAILED": 

2364 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format( 

2365 task["action"], 

2366 task["item"], 

2367 dependency_task["action"], 

2368 dependency_task["item"], 

2369 dependency_task_id, 

2370 dependency_ro_task["vim_info"].get( 

2371 "vim_message" 

2372 ), 

2373 ) 

2374 self.logger.error( 

2375 "task={} {}".format(task["task_id"], error_text) 

2376 ) 

2377 raise NsWorkerException(error_text) 

2378 

2379 task_depends[dependency_task_id] = dependency_ro_task[ 

2380 "vim_info" 

2381 ]["vim_id"] 

2382 task_depends[ 

2383 "TASK-{}".format(dependency_task_id) 

2384 ] = dependency_ro_task["vim_info"]["vim_id"] 

2385 

2386 if dependency_not_completed: 

2387 self.logger.warning( 

2388 "DEPENDENCY NOT COMPLETED {}".format( 

2389 dependency_ro_task["vim_info"]["vim_id"] 

2390 ) 

2391 ) 

2392 # TODO set at vim_info.vim_details that it is waiting 

2393 continue 

2394 

2395 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew 

2396 # the task of renew this locking. It will update database locket_at periodically 

2397 if not lock_object: 

2398 lock_object = LockRenew.add_lock_object( 

2399 "ro_tasks", ro_task, self 

2400 ) 

2401 if task["action"] == "DELETE": 

2402 ( 

2403 new_status, 

2404 db_vim_info_update, 

2405 ) = self._delete_task( 

2406 ro_task, task_index, task_depends, db_ro_task_update 

2407 ) 

2408 new_status = ( 

2409 "FINISHED" if new_status == "DONE" else new_status 

2410 ) 

2411 # ^with FINISHED instead of DONE it will not be refreshing 

2412 

2413 if new_status in ("FINISHED", "SUPERSEDED"): 

2414 target_update = "DELETE" 

2415 elif task["action"] == "EXEC": 

2416 ( 

2417 new_status, 

2418 db_vim_info_update, 

2419 db_task_update, 

2420 ) = self.item2class[task["item"]].exec( 

2421 ro_task, task_index, task_depends 

2422 ) 

2423 new_status = ( 

2424 "FINISHED" if new_status == "DONE" else new_status 

2425 ) 

2426 # ^with FINISHED instead of DONE it will not be refreshing 

2427 

2428 if db_task_update: 

2429 # load into database the modified db_task_update "retries" and "next_retry" 

2430 if db_task_update.get("retries"): 

2431 db_ro_task_update[ 

2432 "tasks.{}.retries".format(task_index) 

2433 ] = db_task_update["retries"] 

2434 

2435 next_check_at = time.time() + db_task_update.get( 

2436 "next_retry", 60 

2437 ) 

2438 target_update = None 

2439 elif task["action"] == "CREATE": 

2440 if task["status"] == "SCHEDULED": 

2441 if task_status_create: 

2442 new_status = task_status_create 

2443 target_update = "COPY_VIM_INFO" 

2444 else: 

2445 new_status, db_vim_info_update = self.item2class[ 

2446 task["item"] 

2447 ].new(ro_task, task_index, task_depends) 

2448 _update_refresh(new_status) 

2449 else: 

2450 refresh_at = ro_task["vim_info"]["refresh_at"] 

2451 if refresh_at and refresh_at != -1 and now > refresh_at: 

2452 ( 

2453 new_status, 

2454 db_vim_info_update, 

2455 ) = self.item2class[ 

2456 task["item"] 

2457 ].refresh(ro_task) 

2458 _update_refresh(new_status) 

2459 else: 

2460 # The refresh is updated to avoid set the value of "refresh_at" to 

2461 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD, 

2462 # because it can happen that in this case the task is never processed 

2463 _update_refresh(task["status"]) 

2464 

2465 except Exception as e: 

2466 new_status = "FAILED" 

2467 db_vim_info_update = { 

2468 "vim_status": "VIM_ERROR", 

2469 "vim_message": str(e), 

2470 } 

2471 

2472 if not isinstance( 

2473 e, (NsWorkerException, vimconn.VimConnException) 

2474 ): 

2475 self.logger.error( 

2476 "Unexpected exception at _delete_task task={}: {}".format( 

2477 task["task_id"], e 

2478 ), 

2479 exc_info=True, 

2480 ) 

2481 

2482 try: 

2483 if db_vim_info_update: 

2484 db_vim_update = db_vim_info_update.copy() 

2485 db_ro_task_update.update( 

2486 { 

2487 "vim_info." + k: v 

2488 for k, v in db_vim_info_update.items() 

2489 } 

2490 ) 

2491 ro_task["vim_info"].update(db_vim_info_update) 

2492 

2493 if new_status: 

2494 if task_action == "CREATE": 

2495 task_status_create = new_status 

2496 db_ro_task_update[task_path] = new_status 

2497 

2498 if target_update or db_vim_update: 

2499 if target_update == "DELETE": 

2500 self._update_target(task, None) 

2501 elif target_update == "COPY_VIM_INFO": 

2502 self._update_target(task, ro_task["vim_info"]) 

2503 else: 

2504 self._update_target(task, db_vim_update) 

2505 

2506 except Exception as e: 

2507 if ( 

2508 isinstance(e, DbException) 

2509 and e.http_code == HTTPStatus.NOT_FOUND 

2510 ): 

2511 # if the vnfrs or nsrs has been removed from database, this task must be removed 

2512 self.logger.debug( 

2513 "marking to delete task={}".format(task["task_id"]) 

2514 ) 

2515 self.tasks_to_delete.append(task) 

2516 else: 

2517 self.logger.error( 

2518 "Unexpected exception at _update_target task={}: {}".format( 

2519 task["task_id"], e 

2520 ), 

2521 exc_info=True, 

2522 ) 

2523 

2524 locked_at = ro_task["locked_at"] 

2525 

2526 if lock_object: 

2527 locked_at = [ 

2528 lock_object["locked_at"], 

2529 lock_object["locked_at"] + self.task_locked_time, 

2530 ] 

2531 # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will 

2532 # contain exactly locked_at + self.task_locked_time 

2533 LockRenew.remove_lock_object(lock_object) 

2534 

2535 q_filter = { 

2536 "_id": ro_task["_id"], 

2537 "to_check_at": ro_task["to_check_at"], 

2538 "locked_at": locked_at, 

2539 } 

2540 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified, 

2541 # outside this task (by ro_nbi) do not update it 

2542 db_ro_task_update["locked_by"] = None 

2543 # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked 

2544 db_ro_task_update["locked_at"] = int(now - self.task_locked_time) 

2545 db_ro_task_update["modified_at"] = now 

2546 db_ro_task_update["to_check_at"] = next_check_at 

2547 

2548 """ 

2549 # Log RO tasks only when loglevel is DEBUG 

2550 if self.logger.getEffectiveLevel() == logging.DEBUG: 

2551 db_ro_task_update_log = db_ro_task_update.copy() 

2552 db_ro_task_update_log["_id"] = q_filter["_id"] 

2553 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK") 

2554 """ 

2555 

2556 if not self.db.set_one( 

2557 "ro_tasks", 

2558 update_dict=db_ro_task_update, 

2559 q_filter=q_filter, 

2560 fail_on_empty=False, 

2561 ): 

2562 del db_ro_task_update["to_check_at"] 

2563 del q_filter["to_check_at"] 

2564 """ 

2565 # Log RO tasks only when loglevel is DEBUG 

2566 if self.logger.getEffectiveLevel() == logging.DEBUG: 

2567 self._log_ro_task( 

2568 None, 

2569 db_ro_task_update_log, 

2570 None, 

2571 "TASK_WF", 

2572 "SET_TASK " + str(q_filter), 

2573 ) 

2574 """ 

2575 self.db.set_one( 

2576 "ro_tasks", 

2577 q_filter=q_filter, 

2578 update_dict=db_ro_task_update, 

2579 fail_on_empty=True, 

2580 ) 

2581 except DbException as e: 

2582 self.logger.error( 

2583 "ro_task={} Error updating database {}".format(ro_task_id, e) 

2584 ) 

2585 except Exception as e: 

2586 self.logger.error( 

2587 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True 

2588 ) 

2589 

2590 def _update_target(self, task, ro_vim_item_update): 

2591 table, _, temp = task["target_record"].partition(":") 

2592 _id, _, path_vim_status = temp.partition(":") 

2593 path_item = path_vim_status[: path_vim_status.rfind(".")] 

2594 path_item = path_item[: path_item.rfind(".")] 

2595 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id" 

2596 # path_item: dot separated list targeting record information, e.g. "vdur.10" 

2597 

2598 if ro_vim_item_update: 

2599 update_dict = { 

2600 path_vim_status + "." + k: v 

2601 for k, v in ro_vim_item_update.items() 

2602 if k 

2603 in ( 

2604 "vim_id", 

2605 "vim_details", 

2606 "vim_message", 

2607 "vim_name", 

2608 "vim_status", 

2609 "interfaces", 

2610 "interfaces_backup", 

2611 ) 

2612 } 

2613 

2614 if path_vim_status.startswith("vdur."): 

2615 # for backward compatibility, add vdur.name apart from vdur.vim_name 

2616 if ro_vim_item_update.get("vim_name"): 

2617 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"] 

2618 

2619 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id 

2620 if ro_vim_item_update.get("vim_id"): 

2621 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"] 

2622 

2623 # update general status 

2624 if ro_vim_item_update.get("vim_status"): 

2625 update_dict[path_item + ".status"] = ro_vim_item_update[ 

2626 "vim_status" 

2627 ] 

2628 

2629 if ro_vim_item_update.get("interfaces"): 

2630 path_interfaces = path_item + ".interfaces" 

2631 

2632 for i, iface in enumerate(ro_vim_item_update.get("interfaces")): 

2633 if iface: 

2634 update_dict.update( 

2635 { 

2636 path_interfaces + ".{}.".format(i) + k: v 

2637 for k, v in iface.items() 

2638 if k in ("vlan", "compute_node", "pci") 

2639 } 

2640 ) 

2641 

2642 # put ip_address and mac_address with ip-address and mac-address 

2643 if iface.get("ip_address"): 

2644 update_dict[ 

2645 path_interfaces + ".{}.".format(i) + "ip-address" 

2646 ] = iface["ip_address"] 

2647 

2648 if iface.get("mac_address"): 

2649 update_dict[ 

2650 path_interfaces + ".{}.".format(i) + "mac-address" 

2651 ] = iface["mac_address"] 

2652 

2653 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"): 

2654 update_dict["ip-address"] = iface.get("ip_address").split( 

2655 ";" 

2656 )[0] 

2657 

2658 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"): 

2659 update_dict[path_item + ".ip-address"] = iface.get( 

2660 "ip_address" 

2661 ).split(";")[0] 

2662 

2663 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict) 

2664 

2665 # If interfaces exists, it backups VDU interfaces in the DB for healing operations 

2666 if ro_vim_item_update.get("interfaces"): 

2667 search_key = path_vim_status + ".interfaces" 

2668 if update_dict.get(search_key): 

2669 interfaces_backup_update = { 

2670 path_vim_status + ".interfaces_backup": update_dict[search_key] 

2671 } 

2672 

2673 self.db.set_one( 

2674 table, 

2675 q_filter={"_id": _id}, 

2676 update_dict=interfaces_backup_update, 

2677 ) 

2678 

2679 else: 

2680 update_dict = {path_item + ".status": "DELETED"} 

2681 self.db.set_one( 

2682 table, 

2683 q_filter={"_id": _id}, 

2684 update_dict=update_dict, 

2685 unset={path_vim_status: None}, 

2686 ) 

2687 

2688 def _process_delete_db_tasks(self): 

2689 """ 

2690 Delete task from database because vnfrs or nsrs or both have been deleted 

2691 :return: None. Uses and modify self.tasks_to_delete 

2692 """ 

2693 while self.tasks_to_delete: 

2694 task = self.tasks_to_delete[0] 

2695 vnfrs_deleted = None 

2696 nsr_id = task["nsr_id"] 

2697 

2698 if task["target_record"].startswith("vnfrs:"): 

2699 # check if nsrs is present 

2700 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False): 

2701 vnfrs_deleted = task["target_record"].split(":")[1] 

2702 

2703 try: 

2704 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted) 

2705 except Exception as e: 

2706 self.logger.error( 

2707 "Error deleting task={}: {}".format(task["task_id"], e) 

2708 ) 

2709 self.tasks_to_delete.pop(0) 

2710 

2711 @staticmethod 

2712 def delete_db_tasks(db, nsr_id, vnfrs_deleted): 

2713 """ 

2714 Static method because it is called from osm_ng_ro.ns 

2715 :param db: instance of database to use 

2716 :param nsr_id: affected nsrs id 

2717 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id 

2718 :return: None, exception is fails 

2719 """ 

2720 retries = 5 

2721 for retry in range(retries): 

2722 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id}) 

2723 now = time.time() 

2724 conflict = False 

2725 

2726 for ro_task in ro_tasks: 

2727 db_update = {} 

2728 to_delete_ro_task = True 

2729 

2730 for index, task in enumerate(ro_task["tasks"]): 

2731 if not task: 

2732 pass 

2733 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or ( 

2734 vnfrs_deleted 

2735 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted) 

2736 ): 

2737 db_update["tasks.{}".format(index)] = None 

2738 else: 

2739 # used by other nsr, ro_task cannot be deleted 

2740 to_delete_ro_task = False 

2741 

2742 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed 

2743 if to_delete_ro_task: 

2744 if not db.del_one( 

2745 "ro_tasks", 

2746 q_filter={ 

2747 "_id": ro_task["_id"], 

2748 "modified_at": ro_task["modified_at"], 

2749 }, 

2750 fail_on_empty=False, 

2751 ): 

2752 conflict = True 

2753 elif db_update: 

2754 db_update["modified_at"] = now 

2755 if not db.set_one( 

2756 "ro_tasks", 

2757 q_filter={ 

2758 "_id": ro_task["_id"], 

2759 "modified_at": ro_task["modified_at"], 

2760 }, 

2761 update_dict=db_update, 

2762 fail_on_empty=False, 

2763 ): 

2764 conflict = True 

2765 if not conflict: 

2766 return 

2767 else: 

2768 raise NsWorkerException("Exceeded {} retries".format(retries)) 

2769 

def run(self):
    """
    Main loop of the worker.

    Step 1 attends the orders received at self.task_queue ("terminate", "load_vim", "unload_vim",
    "reload_vim", "check_vim"). While there are no vim targets it blocks waiting for orders (idle).
    Step 2, reached when the queue is empty, deletes the not needed tasks and processes one pending
    ro_task from database, sleeping when there is nothing to do.
    The loop only finishes with the "terminate" order.
    """
    # load database
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                order = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                order = self.task_queue.get(block=True)
                self.idle = False

            command = order[0]
            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(order[1]))
                self._load_vim(order[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(order[1]))
                self._unload_vim(order[1])
            elif command == "reload_vim":
                self._reload_vim(order[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(order[1]))
                self._check_vim(order[1])
            # keep attending orders until the queue is empty
            continue
        except queue.Empty:
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing pending, wait before asking the database again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")