Coverage for NG-RO/osm_ng_ro/ns_thread.py: 35%

1258 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-06-04 15:25 +0000

1# -*- coding: utf-8 -*- 

2 

3## 

4# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); you may 

7# not use this file except in compliance with the License. You may obtain 

8# a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

15# License for the specific language governing permissions and limitations 

16# under the License. 

17# 

18## 

19 

20""" 

21This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. 

22The tasks are stored in the database, in table ro_tasks 

23A single ro_task refers to a VIM element (flavor, image, network, ...). 

24A ro_task can contain several 'tasks', each one with a target, where to store the results 

25""" 

26 

27from copy import deepcopy 

28from http import HTTPStatus 

29import logging 

30from os import makedirs 

31from os import path 

32import queue 

33import threading 

34import time 

35import traceback 

36from typing import Dict 

37from unittest.mock import Mock 

38 

39from importlib_metadata import entry_points 

40from osm_common.dbbase import DbException 

41from osm_ng_ro.vim_admin import LockRenew 

42from osm_ro_plugin import sdnconn 

43from osm_ro_plugin import vimconn 

44from osm_ro_plugin.sdn_dummy import SdnDummyConnector 

45from osm_ro_plugin.vim_dummy import VimDummyConnector 

46import yaml 

47 

48__author__ = "Alfonso Tierno" 

49__date__ = "$28-Sep-2017 12:07:15$" 

50 

51 

def deep_get(target_dict, *args, **kwargs):
    """
    Walk a nested dictionary following a chain of keys and return what is found there.

    Example target_dict={a: {b: 5}}; keys (a, b) give 5; keys (a, b, c) and (f, h) both give
    the default, because the chain cannot be followed to the end.

    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may carry default=value, returned when the chain of keys cannot be followed
    :return: the value reached, or the default (None when no default is given)
    """
    current = target_dict
    for step in args:
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            # chain broken: either not a dict at this level or the key is missing
            return kwargs.get("default")
    return current

66 

67 

class NsWorkerException(Exception):
    """Generic error raised by the task handlers defined in this module."""

70 

71 

class FailingConnector:
    """Connector stand-in whose public methods always raise.

    For every public attribute name of vimconn.VimConnector, and then of
    sdnconn.SdnConnectorBase, the instance gets a Mock that raises the matching
    connector exception carrying error_msg.
    """

    def __init__(self, error_msg):
        self.error_msg = error_msg

        # (connector class whose public names are mirrored, exception its mocks raise);
        # the SDN names are applied last, so they win on any name shared with the VIM class
        mirrored = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )
        for connector_class, error_class in mirrored:
            for name in dir(connector_class):
                if name.startswith("_"):
                    continue
                setattr(self, name, Mock(side_effect=error_class(error_msg)))

87 

88 

class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised when a searched VIM element cannot be found."""

91 

92 

class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...

    Every method here is a no-op that reports success; subclasses override what they need.
    """

    def __init__(self, db, my_vims, db_vims, logger):
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Do nothing and report the item as being built."""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling the VIM to get the item status; assume ok unless already at VIM_ERROR."""
        already_failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
        return ("FAILED" if already_failed else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling the VIM to delete the item; assume ok."""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Do nothing and report the action as done."""
        return "DONE", None, None

119 

120 

class VimInteractionNet(VimInteractionBase):
    """Network tasks against a VIM: find-or-create, status refresh and deletion."""

    def new(self, ro_task, task_index, task_depends):
        """Find an existing network at the VIM, or create one.

        When the task carries "find_params" the network is searched with a filter built from
        "filter_dict", or (for "mgmt") from the VIM config "management_network_id" /
        "management_network_name", falling back to the name in find_params. Without
        "find_params" the network is created from task["params"].

        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", update dict) on success, or ("FAILED", update dict) when a
            VimConnException or NsWorkerException is raised
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        mgmtnet = False
        mgmtnet_defined_in_vim = False

        try:
            # FIND
            if task.get("find_params"):
                # if management, get configuration of VIM
                if task["find_params"].get("filter_dict"):
                    vim_filter = task["find_params"]["filter_dict"]
                # management network
                elif task["find_params"].get("mgmt"):
                    mgmtnet = True
                    if deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_id",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "id": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_id"
                            ]
                        }
                    elif deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_name",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "name": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_name"
                            ]
                        }
                    else:
                        vim_filter = {"name": task["find_params"]["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(task["find_params"])
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if mgmtnet and not mgmtnet_defined_in_vim:
                        # NOTE(review): on this path vim_filter only holds "name", so the
                        # "id" fallback would slice None if the name is empty - confirm
                        net_name = (
                            vim_filter.get("name")
                            if vim_filter.get("name")
                            else vim_filter.get("id")[:16]
                        )
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                # NOTE(review): with find_params and params both set and nothing found,
                # no network is created here and vim_net_id stays None - confirm intended
                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """Call VIM to get network status.

        :param ro_task: ro_task document; "_id", "target_id" and "vim_info" are read here
        :return: (task_status, update dict) where task_status is "DONE", "BUILD" or "FAILED"
            and the update dict only holds the vim_info fields that differ from ro_task
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # collect only the fields whose value changed with respect to the stored vim_info
        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network (and its created items) at the VIM.

        Nothing is called at the VIM when there is neither a vim_id nor created items.
        A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update dict) on success, ("FAILED", update dict) on VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

338 

339 

class VimInteractionVdu(VimInteractionBase):
    """VM (VDU) tasks against a VIM: create, delete, status refresh and ssh-key injection."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM at the VIM from task["params"].

        References of the form "TASK-..." in the networks, image, flavor and affinity groups
        are replaced by the ids found in task_depends before calling the VIM.

        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dict mapping "TASK-..." references to the VIM ids they resolved to
        :return: ("BUILD", update dict) on success, or ("FAILED", update dict) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # NOTE(review): set before the VIM call, so a failure is also reported with
            # created=True - confirm this is intended
            created = True
            params = task["params"]
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # full sentence, mirroring the network dependency error above
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id
            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # reads "vim_id" from every net_list entry after the VIM call
            # (presumably filled in by new_vminstance - verify against the connector)
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM (and its created items) at the VIM.

        Nothing is called at the VIM when there is neither a vim_id nor created items.
        A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update dict) on success, ("FAILED", update dict) on VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status.

        :param ro_task: ro_task document; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when the ro_task has no vim_id; otherwise (task_status, update
            dict) where task_status is "DONE", "BUILD" or "FAILED" and the update dict only
            holds the vim_info fields that differ from ro_task
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information; a failure here is logged and ignored
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                # keep the order of interfaces_vim_ids; None marks an interface not reported
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # if iface:
                #     iface.pop("vim_info", None)
                vim_interfaces.append(iface)

        # NOTE(review): raises StopIteration when no unfinished CREATE task exists - confirm
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject a user ssh key into the VM through the VIM connector.

        The private key in task["params"] is decrypted with self.db.decrypt before the call.
        On failure the task stays in "BUILD" with an increased retry counter until
        max_retries_inject_ssh_key is reached; then it is reported as "FAILED".

        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (task_status, vim update dict or None, db task update dict)
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update

633 

634 

class VimInteractionImage(VimInteractionBase):
    """Image tasks against a VIM: images are only looked up, never created."""

    def new(self, ro_task, task_index, task_depends):
        """Look up the image described by task["find_params"] at the VIM.

        Exactly one image must match. Without "find_params" no lookup is done and the
        resulting vim_id is an empty string.

        :return: ("DONE", update dict) on success, or ("FAILED", update dict) when an
            NsWorkerException or VimConnException is raised
        """
        job = ro_task["tasks"][task_index]
        job_id = job["task_id"]
        was_created = False
        items = {}
        connector = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            image_id = ""
            criteria = job.get("find_params")
            if criteria:
                matches = connector.get_image_list(**criteria)

                if not matches:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(criteria)
                    )
                if len(matches) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            criteria
                        )
                    )
                image_id = matches[0]["id"]

            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    job_id, ro_task["target_id"], image_id, was_created
                )
            )

            return "DONE", {
                "vim_id": image_id,
                "vim_status": "ACTIVE",
                "created": was_created,
                "created_items": items,
                "vim_details": None,
                "vim_message": None,
            }
        except (NsWorkerException, vimconn.VimConnException) as error:
            self.logger.error(
                "task={} {} new-image: {}".format(job_id, ro_task["target_id"], error)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": was_created,
                "vim_message": str(error),
            }

690 

691 

class VimInteractionSharedVolume(VimInteractionBase):
    """Shared-volume tasks against a VIM: creation and deletion (honouring the "keep" flag)."""

    def delete(self, ro_task, task_index):
        """Delete the shared volume at the VIM unless it is flagged to be kept.

        A volume whose created_items entry has a truthy "keep" is left at the VIM and
        reported as "ACTIVE". A VimConnNotFoundException is treated as success.

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update dict) on success, ("FAILED", update dict) on VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"]["created_items"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }
        # created_items may be empty or lack an entry for this volume id; treat both as
        # "no keep flag" instead of failing on None.get(...)
        volume_info = (created_items or {}).get(shared_volume_vim_id) or {}
        if volume_info.get("keep"):
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }
            return "DONE", ro_vim_item_update_ok
        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Create a shared volume at the VIM from task["params"].

        Without "params" nothing is created and the resulting vim_id is None.

        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update dict) on success, or ("FAILED", update dict) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                # remember name and keep flag under the volume id; delete() reads "keep"
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

796 

797 

class VimInteractionFlavor(VimInteractionBase):
    """Flavor tasks against a VIM: find-or-create and deletion."""

    def delete(self, ro_task, task_index):
        """Delete the flavor at the VIM.

        Nothing is called at the VIM when the ro_task has no vim_id.
        A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update dict) on success, ("FAILED", update dict) on VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Find a flavor at the VIM, or create it when none is found.

        The flavor id comes from find_params["vim_flavor_id"], else from a VIM lookup with
        find_params["flavor_data"]; when neither yields an id and the task has "params",
        a new flavor is created from params["flavor_data"].

        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update dict) on success, or ("FAILED", update dict) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # FIND
            vim_flavor_id = None
            # "find_params" may be missing or explicitly None; both mean "nothing to find"
            find_params = task.get("find_params") or {}

            if find_params.get("vim_flavor_id"):
                vim_flavor_id = find_params["vim_flavor_id"]
            elif find_params.get("flavor_data"):
                try:
                    flavor_data = find_params["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # not finding a matching flavor is expected; creation is tried below
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if not vim_flavor_id and task.get("params"):
                # CREATE
                flavor_data = task["params"]["flavor_data"]
                vim_flavor_id = target_vim.new_flavor(flavor_data)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_flavor_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

893 

894 

class VimInteractionAffinityGroup(VimInteractionBase):
    """Affinity / anti-affinity group tasks against a VIM: find-or-create and deletion."""

    def delete(self, ro_task, task_index):
        """Delete the affinity group at the VIM.

        Nothing is called at the VIM when the ro_task has no vim_id.
        A VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update dict) on success, ("FAILED", update dict) on VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the affinity group named in the task, or create a new one.

        When params["affinity_group_data"] carries "vim-affinity-group-id", that group is
        looked up at the VIM; if it is not found (or no id is given) and there is
        affinity_group_data, a new group is created from it.

        :param ro_task: ro_task document; "tasks" and "target_id" are read here
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update dict) on success, or ("FAILED", update dict) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None
            param_affinity_group_id = ""

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = affinity_group_data.get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # the literals below are concatenated, hence the leading space on the
                    # second one so the id is not glued to the next word
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
                        " could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

1001 

1002 

class VimInteractionUpdateVdu(VimInteractionBase):
    """Runs an action on an existing VM through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """Call action_vminstance at the VIM with the action named in task["params"].

        Without "params" the VIM is not called and the resulting vim_id is an empty string.

        :return: ("DONE", update dict, db task update) on success, or
            ("FAILED", update dict, db task update) when a VimConnException or
            NsWorkerException is raised
        """
        job = ro_task["tasks"][task_index]
        job_id = job["task_id"]
        db_changes = {"retries": 0}
        was_created = False
        items = {}
        connector = self.my_vims[ro_task["target_id"]]

        try:
            vm_id = ""
            request = job.get("params")
            if request:
                vm_id = request.get("vim_vm_id")
                wanted = request.get("action")
                # the action value is used both as key and as value of the context dict
                connector.action_vminstance(vm_id, {wanted: wanted})
                # created = True
            self.logger.debug(
                "task={} {} vm-migration done".format(job_id, ro_task["target_id"])
            )
            return (
                "DONE",
                {
                    "vim_id": vm_id,
                    "vim_status": "ACTIVE",
                    "created": was_created,
                    "created_items": items,
                    "vim_details": None,
                    "vim_message": None,
                },
                db_changes,
            )
        except (vimconn.VimConnException, NsWorkerException) as error:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(job_id, ro_task["target_id"], error)
            )
            failure = {
                "vim_status": "VIM_ERROR",
                "created": was_created,
                "vim_message": str(error),
            }

            return "FAILED", failure, db_changes

1044 

1045 

1046class VimInteractionSdnNet(VimInteractionBase): 

1047 @staticmethod 

1048 def _match_pci(port_pci, mapping): 

1049 """ 

1050 Check if port_pci matches with mapping. 

1051 The mapping can have brackets to indicate that several chars are accepted. e.g 

1052 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]' 

1053 :param port_pci: text 

1054 :param mapping: text, can contain brackets to indicate several chars are available 

1055 :return: True if matches, False otherwise 

1056 """ 

1057 if not port_pci or not mapping: 

1058 return False 

1059 if port_pci == mapping: 

1060 return True 

1061 

1062 mapping_index = 0 

1063 pci_index = 0 

1064 while True: 

1065 bracket_start = mapping.find("[", mapping_index) 

1066 

1067 if bracket_start == -1: 

1068 break 

1069 

1070 bracket_end = mapping.find("]", bracket_start) 

1071 if bracket_end == -1: 

1072 break 

1073 

1074 length = bracket_start - mapping_index 

1075 if ( 

1076 length 

1077 and port_pci[pci_index : pci_index + length] 

1078 != mapping[mapping_index:bracket_start] 

1079 ): 

1080 return False 

1081 

1082 if ( 

1083 port_pci[pci_index + length] 

1084 not in mapping[bracket_start + 1 : bracket_end] 

1085 ): 

1086 return False 

1087 

1088 pci_index += length + 1 

1089 mapping_index = bracket_end + 1 

1090 

1091 if port_pci[pci_index:] != mapping[mapping_index:]: 

1092 return False 

1093 

1094 return True 

1095 

1096 def _get_interfaces(self, vlds_to_connect, vim_account_id): 

1097 """ 

1098 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id> 

1099 :param vim_account_id: 

1100 :return: 

1101 """ 

1102 interfaces = [] 

1103 

1104 for vld in vlds_to_connect: 

1105 table, _, db_id = vld.partition(":") 

1106 db_id, _, vld = db_id.partition(":") 

1107 _, _, vld_id = vld.partition(".") 

1108 

1109 if table == "vnfrs": 

1110 q_filter = {"vim-account-id": vim_account_id, "_id": db_id} 

1111 iface_key = "vnf-vld-id" 

1112 else: # table == "nsrs" 

1113 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id} 

1114 iface_key = "ns-vld-id" 

1115 

1116 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter) 

1117 

1118 for db_vnfr in db_vnfrs: 

1119 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())): 

1120 for iface_index, interface in enumerate(vdur["interfaces"]): 

1121 if interface.get(iface_key) == vld_id and interface.get( 

1122 "type" 

1123 ) in ("SR-IOV", "PCI-PASSTHROUGH"): 

1124 # only SR-IOV o PT 

1125 interface_ = interface.copy() 

1126 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format( 

1127 db_vnfr["_id"], vdu_index, iface_index 

1128 ) 

1129 

1130 if vdur.get("status") == "ERROR": 

1131 interface_["status"] = "ERROR" 

1132 

1133 interfaces.append(interface_) 

1134 

1135 return interfaces 

1136 

1137 def refresh(self, ro_task): 

1138 # look for task create 

1139 task_create_index, _ = next( 

1140 i_t 

1141 for i_t in enumerate(ro_task["tasks"]) 

1142 if i_t[1] 

1143 and i_t[1]["action"] == "CREATE" 

1144 and i_t[1]["status"] != "FINISHED" 

1145 ) 

1146 

1147 return self.new(ro_task, task_create_index, None) 

1148 

    def new(self, ro_task, task_index, task_depends):
        """
        Create or update the SDN connectivity service of a network, or poll its status.

        The ports to connect are the dataplane interfaces returned by _get_interfaces()
        for the vlds of the task plus the external "sdn-ports" of the task params.
        Depending on the state it calls create_connectivity_service(),
        edit_connectivity_service() or get_connectivity_service_status() of the connector.

        :param ro_task: ro_task document; "tasks", "target_id" and "vim_info" are read
        :param task_index: position inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update). On any exception status is "FAILED"
            and the update only contains "vim_status", "created" and "vim_message"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        # state stored by previous executions of this ro_task
        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            db_vim = {}
            params = task["params"]
            vlds_to_connect = params.get("vlds", [])
            associated_vim = params.get("target_vim")
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            # associated_vim has the format <type>:<id>; only the id part is kept
            _, _, vim_account_id = (
                (None, None, None)
                if associated_vim is None
                else associated_vim.partition(":")
            )

            if associated_vim:
                # get associated VIM
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)
            # print(ports)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            # never set to True in this method (see the commented TODO below)
            sdn_need_update = False

            for port in ports:
                # remember the last vlan seen; it is the default for the external ports
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                if not port.get("compute_node") or not port.get("pci"):
                    # location not known yet: count it as failed or as still pending
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                # NOTE(review): db_vim is {} when the task has no "target_vim", so
                # db_vim["config"] would raise KeyError here (caught below -> "FAILED")
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    # mapping not needed: every value falls back to the port own data
                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": "dot1q"
                    if port["type"] == "SR-IOV"
                    else None,
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # fewer than 2 ports: the connector is not called at all
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    # the service already exists: update its connection points
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                # nothing changed: just poll the status of the existing service
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                # an error message, when present, replaces the sdn_info text
                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # the net cannot be reported as ACTIVE while ports are still pending
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # the traceback is logged only for exceptions that are not connector errors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

1389 

1390 def delete(self, ro_task, task_index): 

1391 task = ro_task["tasks"][task_index] 

1392 task_id = task["task_id"] 

1393 sdn_vim_id = ro_task["vim_info"].get("vim_id") 

1394 ro_vim_item_update_ok = { 

1395 "vim_status": "DELETED", 

1396 "created": False, 

1397 "vim_message": "DELETED", 

1398 "vim_id": None, 

1399 } 

1400 

1401 try: 

1402 if sdn_vim_id: 

1403 target_vim = self.my_vims[ro_task["target_id"]] 

1404 target_vim.delete_connectivity_service( 

1405 sdn_vim_id, ro_task["vim_info"].get("created_items") 

1406 ) 

1407 

1408 except Exception as e: 

1409 if ( 

1410 isinstance(e, sdnconn.SdnConnectorError) 

1411 and e.http_code == HTTPStatus.NOT_FOUND.value 

1412 ): 

1413 ro_vim_item_update_ok["vim_message"] = "already deleted" 

1414 else: 

1415 self.logger.error( 

1416 "ro_task={} vim={} del-sdn-net={}: {}".format( 

1417 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e 

1418 ), 

1419 exc_info=not isinstance( 

1420 e, (sdnconn.SdnConnectorError, vimconn.VimConnException) 

1421 ), 

1422 ) 

1423 ro_vim_item_update = { 

1424 "vim_status": "VIM_ERROR", 

1425 "vim_message": "Error while deleting: {}".format(e), 

1426 } 

1427 

1428 return "FAILED", ro_vim_item_update 

1429 

1430 self.logger.debug( 

1431 "task={} {} del-sdn-net={} {}".format( 

1432 task_id, 

1433 ro_task["target_id"], 

1434 sdn_vim_id, 

1435 ro_vim_item_update_ok.get("vim_message", ""), 

1436 ) 

1437 ) 

1438 

1439 return "DONE", ro_vim_item_update_ok 

1440 

1441 

class VimInteractionMigration(VimInteractionBase):
    """Handler that migrates a VM to another host and reports its refreshed VIM data."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM referenced by the task and collect its new VIM information.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this handler
        :return: tuple (status, ro_vim_item_update, db_task_update), where status is
            "DONE" on success or "FAILED" when the connector raises VimConnException
            or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            vim_vm_id = ""
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                migrate_host = params.get("migrate_host")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, migrate_host
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated
                    vdu_old_vim_info = params["vdu_vim_info"].get(ro_task["target_id"])

                    # Refresh VM to get new vim_info
                    vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                    refreshed_vim_info = vim_dict[vim_vm_id]
                    refreshed_interfaces = refreshed_vim_info.get("interfaces")

                    if refreshed_interfaces:
                        # tolerate a missing previous vim_info for this target or a
                        # missing "interfaces" entry: there is nothing to pair then
                        old_interfaces = (vdu_old_vim_info or {}).get("interfaces") or ()

                        for old_iface in old_interfaces:
                            # an old interface without a refreshed counterpart yields
                            # None, so list positions follow the old interfaces order
                            vim_interfaces.append(
                                next(
                                    (
                                        new_iface
                                        for new_iface in refreshed_interfaces
                                        if old_iface["vim_interface_id"]
                                        == new_iface["vim_interface_id"]
                                    ),
                                    None,
                                )
                            )

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update

1522 

1523 

class VimInteractionResize(VimInteractionBase):
    """Handler that resizes (vertical scale) a VM to the flavor described in the task."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM referenced by the task to a flavor matching params["flavor_dict"].

        The flavor is looked up at the VIM and created when the lookup fails. If no
        flavor can be obtained the task is reported as "FAILED"; previously the resize
        was silently skipped and the task was still reported as "DONE".

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position inside ro_task["tasks"] of the task to execute
        :param task_depends: not used by this handler
        :return: tuple (status, ro_vim_item_update, db_task_update), where status is
            "DONE" on success or "FAILED" on VimConnException or NsWorkerException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                flavor_dict = params.get("flavor_dict")
                self.logger.info("flavor_dict %s", flavor_dict)

                try:
                    target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
                except Exception as e:
                    self.logger.info("Cannot find any flavor matching %s.", str(e))
                    try:
                        target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                    except Exception as e:
                        self.logger.error("Error creating flavor at VIM %s.", str(e))

                if target_flavor_uuid is None:
                    # without a flavor nothing can be resized: report it as a failure
                    raise NsWorkerException(
                        "Cannot resize vm '{}': no flavor could be found or created at VIM".format(
                            vim_vm_id
                        )
                    )

                resized_status = target_vim.resize_instance(
                    vim_vm_id, target_flavor_uuid
                )

                if resized_status:
                    # Refresh VM to get new vim_info
                    vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                    refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update

1592 

1593 

class ConfigValidate:
    """Read-only view over the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        """
        :param config: configuration dictionary; it must contain a "period" dictionary
        """
        self.conf = config

    @property
    def active(self):
        """Refresh period for active items: values >= 60 or -1 are kept, others become 60."""
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        refresh_active = self.conf["period"]["refresh_active"]
        accepted = refresh_active >= 60 or refresh_active == -1

        return refresh_active if accepted else 60

    @property
    def build(self):
        """Configured "refresh_build" value."""
        period = self.conf["period"]
        return period["refresh_build"]

    @property
    def image(self):
        """Configured "refresh_image" value."""
        period = self.conf["period"]
        return period["refresh_image"]

    @property
    def error(self):
        """Configured "refresh_error" value."""
        period = self.conf["period"]
        return period["refresh_error"]

    @property
    def queue_size(self):
        """Configured "queue_size" value."""
        period = self.conf["period"]
        return period["queue_size"]

1624 

1625 

1626class NsWorker(threading.Thread): 

1627 def __init__(self, worker_index, config, plugins, db): 

1628 """ 

1629 :param worker_index: thread index 

1630 :param config: general configuration of RO, among others the process_id with the docker id where it runs 

1631 :param plugins: global shared dict with the loaded plugins 

1632 :param db: database class instance to use 

1633 """ 

1634 threading.Thread.__init__(self) 

1635 self.config = config 

1636 self.plugins = plugins 

1637 self.plugin_name = "unknown" 

1638 self.logger = logging.getLogger("ro.worker{}".format(worker_index)) 

1639 self.worker_index = worker_index 

1640 # refresh periods for created items 

1641 self.refresh_config = ConfigValidate(config) 

1642 self.task_queue = queue.Queue(self.refresh_config.queue_size) 

1643 # targetvim: vimplugin class 

1644 self.my_vims = {} 

1645 # targetvim: vim information from database 

1646 self.db_vims = {} 

1647 # targetvim list 

1648 self.vim_targets = [] 

1649 self.my_id = config["process_id"] + ":" + str(worker_index) 

1650 self.db = db 

1651 self.item2class = { 

1652 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger), 

1653 "shared-volumes": VimInteractionSharedVolume( 

1654 self.db, self.my_vims, self.db_vims, self.logger 

1655 ), 

1656 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger), 

1657 "image": VimInteractionImage( 

1658 self.db, self.my_vims, self.db_vims, self.logger 

1659 ), 

1660 "flavor": VimInteractionFlavor( 

1661 self.db, self.my_vims, self.db_vims, self.logger 

1662 ), 

1663 "sdn_net": VimInteractionSdnNet( 

1664 self.db, self.my_vims, self.db_vims, self.logger 

1665 ), 

1666 "update": VimInteractionUpdateVdu( 

1667 self.db, self.my_vims, self.db_vims, self.logger 

1668 ), 

1669 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup( 

1670 self.db, self.my_vims, self.db_vims, self.logger 

1671 ), 

1672 "migrate": VimInteractionMigration( 

1673 self.db, self.my_vims, self.db_vims, self.logger 

1674 ), 

1675 "verticalscale": VimInteractionResize( 

1676 self.db, self.my_vims, self.db_vims, self.logger 

1677 ), 

1678 } 

1679 self.time_last_task_processed = None 

1680 # lists of tasks to delete because nsrs or vnfrs has been deleted from db 

1681 self.tasks_to_delete = [] 

1682 # it is idle when there are not vim_targets associated 

1683 self.idle = True 

1684 self.task_locked_time = config["global"]["task_locked_time"] 

1685 

1686 def insert_task(self, task): 

1687 try: 

1688 self.task_queue.put(task, False) 

1689 return None 

1690 except queue.Full: 

1691 raise NsWorkerException("timeout inserting a task") 

1692 

1693 def terminate(self): 

1694 self.insert_task("exit") 

1695 

1696 def del_task(self, task): 

1697 with self.task_lock: 

1698 if task["status"] == "SCHEDULED": 

1699 task["status"] = "SUPERSEDED" 

1700 return True 

1701 else: # task["status"] == "processing" 

1702 self.task_lock.release() 

1703 return False 

1704 

1705 def _process_vim_config(self, target_id: str, db_vim: dict) -> None: 

1706 """ 

1707 Process vim config, creating vim configuration files as ca_cert 

1708 :param target_id: vim/sdn/wim + id 

1709 :param db_vim: Vim dictionary obtained from database 

1710 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files 

1711 """ 

1712 if not db_vim.get("config"): 

1713 return 

1714 

1715 file_name = "" 

1716 work_dir = "/app/osm_ro/certs" 

1717 

1718 try: 

1719 if db_vim["config"].get("ca_cert_content"): 

1720 file_name = f"{work_dir}/{target_id}:{self.worker_index}" 

1721 

1722 if not path.isdir(file_name): 

1723 makedirs(file_name) 

1724 

1725 file_name = file_name + "/ca_cert" 

1726 

1727 with open(file_name, "w") as f: 

1728 f.write(db_vim["config"]["ca_cert_content"]) 

1729 del db_vim["config"]["ca_cert_content"] 

1730 db_vim["config"]["ca_cert"] = file_name 

1731 except Exception as e: 

1732 raise NsWorkerException( 

1733 "Error writing to file '{}': {}".format(file_name, e) 

1734 ) 

1735 

1736 def _load_plugin(self, name, type="vim"): 

1737 # type can be vim or sdn 

1738 if "rovim_dummy" not in self.plugins: 

1739 self.plugins["rovim_dummy"] = VimDummyConnector 

1740 

1741 if "rosdn_dummy" not in self.plugins: 

1742 self.plugins["rosdn_dummy"] = SdnDummyConnector 

1743 

1744 if name in self.plugins: 

1745 return self.plugins[name] 

1746 

1747 try: 

1748 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name): 

1749 self.plugins[name] = ep.load() 

1750 except Exception as e: 

1751 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e)) 

1752 

1753 if name and name not in self.plugins: 

1754 raise NsWorkerException( 

1755 "Plugin 'osm_{n}' has not been installed".format(n=name) 

1756 ) 

1757 

1758 return self.plugins[name] 

1759 

1760 def _unload_vim(self, target_id): 

1761 """ 

1762 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list 

1763 :param target_id: Contains type:_id; where type can be 'vim', ... 

1764 :return: None. 

1765 """ 

1766 try: 

1767 self.db_vims.pop(target_id, None) 

1768 self.my_vims.pop(target_id, None) 

1769 

1770 if target_id in self.vim_targets: 

1771 self.vim_targets.remove(target_id) 

1772 

1773 self.logger.info("Unloaded {}".format(target_id)) 

1774 except Exception as e: 

1775 self.logger.error("Cannot unload {}: {}".format(target_id, e)) 

1776 

    def _check_vim(self, target_id):
        """
        Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR

        Only the first operation found in PROCESSING state that is not locked by another
        worker is handled; the method returns right after it. The result is written to
        the database in the finally clause, and the target is unloaded again if it was
        not loaded when the method was called.

        :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
        :return: None.
        """
        target, _, _id = target_id.partition(":")
        now = time.time()
        update_dict = {}
        unset_dict = {}
        op_text = ""
        step = ""
        # remember whether it was already loaded, to restore that state at the end
        loaded = target_id in self.vim_targets
        target_database = (
            "vim_accounts"
            if target == "vim"
            else "wim_accounts"
            if target == "wim"
            else "sdns"
        )
        error_text = ""

        try:
            step = "Getting {} from db".format(target_id)
            db_vim = self.db.get_one(target_database, {"_id": _id})

            for op_index, operation in enumerate(
                db_vim["_admin"].get("operations", ())
            ):
                if operation["operationState"] != "PROCESSING":
                    continue

                locked_at = operation.get("locked_at")

                if locked_at is not None and locked_at >= now - self.task_locked_time:
                    # some other thread is doing this operation
                    return

                # lock
                op_text = "_admin.operations.{}.".format(op_index)

                # the filter includes the locked_at value read above, so the update only
                # applies if nobody else has changed the lock in the meantime
                # NOTE(review): the key "admin.current_operation" has no leading
                # underscore, unlike the other "_admin." keys - confirm it is intended
                if not self.db.set_one(
                    target_database,
                    q_filter={
                        "_id": _id,
                        op_text + "operationState": "PROCESSING",
                        op_text + "locked_at": locked_at,
                    },
                    update_dict={
                        op_text + "locked_at": now,
                        "admin.current_operation": op_index,
                    },
                    fail_on_empty=False,
                ):
                    return

                # NOTE(review): the key unset here is "current_operation", which differs
                # from the "admin.current_operation" written above - confirm
                unset_dict[op_text + "locked_at"] = None
                unset_dict["current_operation"] = None
                step = "Loading " + target_id
                error_text = self._load_vim(target_id)

                if not error_text:
                    step = "Checking connectivity"

                    if target == "vim":
                        self.my_vims[target_id].check_vim_connectivity()
                    else:
                        self.my_vims[target_id].check_credentials()

                # optimistic values; the finally clause overwrites them if error_text is set
                update_dict["_admin.operationalState"] = "ENABLED"
                update_dict["_admin.detailed-status"] = ""
                unset_dict[op_text + "detailed-status"] = None
                update_dict[op_text + "operationState"] = "COMPLETED"

                return

        except Exception as e:
            error_text = "{}: {}".format(step, e)
            self.logger.error("{} for {}: {}".format(step, target_id, e))

        finally:
            # it also runs for the early returns above; nothing is written to the
            # database unless an operation was locked by this call
            if update_dict or unset_dict:
                if error_text:
                    update_dict[op_text + "operationState"] = "FAILED"
                    update_dict[op_text + "detailed-status"] = error_text
                    # detailed-status is now being set, so it must not be unset as well
                    unset_dict.pop(op_text + "detailed-status", None)
                    update_dict["_admin.operationalState"] = "ERROR"
                    update_dict["_admin.detailed-status"] = error_text

                if op_text:
                    update_dict[op_text + "statusEnteredTime"] = now

                self.db.set_one(
                    target_database,
                    q_filter={"_id": _id},
                    update_dict=update_dict,
                    unset=unset_dict,
                    fail_on_empty=False,
                )

            if not loaded:
                self._unload_vim(target_id)

1879 

1880 def _reload_vim(self, target_id): 

1881 if target_id in self.vim_targets: 

1882 self._load_vim(target_id) 

1883 else: 

1884 # if the vim is not loaded, but database information of VIM is cached at self.db_vims, 

1885 # just remove it to force load again next time it is needed 

1886 self.db_vims.pop(target_id, None) 

1887 

1888 def _load_vim(self, target_id): 

1889 """ 

1890 Load or reload a vim_account, sdn_controller or wim_account. 

1891 Read content from database, load the plugin if not loaded. 

1892 In case of error loading the plugin, it loads a failing VIM_connector 

1893 It fills self db_vims dictionary, my_vims dictionary and vim_targets list 

1894 :param target_id: Contains type:_id; where type can be 'vim', ... 

1895 :return: None if ok, descriptive text if error 

1896 """ 

1897 target, _, _id = target_id.partition(":") 

1898 target_database = ( 

1899 "vim_accounts" 

1900 if target == "vim" 

1901 else "wim_accounts" 

1902 if target == "wim" 

1903 else "sdns" 

1904 ) 

1905 plugin_name = "" 

1906 vim = None 

1907 step = "Getting {}={} from db".format(target, _id) 

1908 

1909 try: 

1910 # TODO process for wim, sdnc, ... 

1911 vim = self.db.get_one(target_database, {"_id": _id}) 

1912 

1913 # if deep_get(vim, "config", "sdn-controller"): 

1914 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"]) 

1915 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]}) 

1916 

1917 step = "Decrypting password" 

1918 schema_version = vim.get("schema_version") 

1919 self.db.encrypt_decrypt_fields( 

1920 vim, 

1921 "decrypt", 

1922 fields=("password", "secret"), 

1923 schema_version=schema_version, 

1924 salt=_id, 

1925 ) 

1926 self._process_vim_config(target_id, vim) 

1927 

1928 if target == "vim": 

1929 plugin_name = "rovim_" + vim["vim_type"] 

1930 step = "Loading plugin '{}'".format(plugin_name) 

1931 vim_module_conn = self._load_plugin(plugin_name) 

1932 step = "Loading {}'".format(target_id) 

1933 self.my_vims[target_id] = vim_module_conn( 

1934 uuid=vim["_id"], 

1935 name=vim["name"], 

1936 tenant_id=vim.get("vim_tenant_id"), 

1937 tenant_name=vim.get("vim_tenant_name"), 

1938 url=vim["vim_url"], 

1939 url_admin=None, 

1940 user=vim["vim_user"], 

1941 passwd=vim["vim_password"], 

1942 config=vim.get("config") or {}, 

1943 persistent_info={}, 

1944 ) 

1945 else: # sdn 

1946 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type")) 

1947 step = "Loading plugin '{}'".format(plugin_name) 

1948 vim_module_conn = self._load_plugin(plugin_name, "sdn") 

1949 step = "Loading {}'".format(target_id) 

1950 wim = deepcopy(vim) 

1951 wim_config = wim.pop("config", {}) or {} 

1952 wim["uuid"] = wim["_id"] 

1953 if "url" in wim and "wim_url" not in wim: 

1954 wim["wim_url"] = wim["url"] 

1955 elif "url" not in wim and "wim_url" in wim: 

1956 wim["url"] = wim["wim_url"] 

1957 

1958 if wim.get("dpid"): 

1959 wim_config["dpid"] = wim.pop("dpid") 

1960 

1961 if wim.get("switch_id"): 

1962 wim_config["switch_id"] = wim.pop("switch_id") 

1963 

1964 # wim, wim_account, config 

1965 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config) 

1966 self.db_vims[target_id] = vim 

1967 self.error_status = None 

1968 

1969 self.logger.info( 

1970 "Connector loaded for {}, plugin={}".format(target_id, plugin_name) 

1971 ) 

1972 except Exception as e: 

1973 self.logger.error( 

1974 "Cannot load {} plugin={}: {} {}".format( 

1975 target_id, plugin_name, step, e 

1976 ) 

1977 ) 

1978 

1979 self.db_vims[target_id] = vim or {} 

1980 self.db_vims[target_id] = FailingConnector(str(e)) 

1981 error_status = "{} Error: {}".format(step, e) 

1982 

1983 return error_status 

1984 finally: 

1985 if target_id not in self.vim_targets: 

1986 self.vim_targets.append(target_id) 

1987 

1988 def _get_db_task(self): 

1989 """ 

1990 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions 

1991 :return: None 

1992 """ 

1993 now = time.time() 

1994 

1995 if not self.time_last_task_processed: 

1996 self.time_last_task_processed = now 

1997 

1998 try: 

1999 while True: 

2000 """ 

2001 # Log RO tasks only when loglevel is DEBUG 

2002 if self.logger.getEffectiveLevel() == logging.DEBUG: 

2003 self._log_ro_task( 

2004 None, 

2005 None, 

2006 None, 

2007 "TASK_WF", 

2008 "task_locked_time=" 

2009 + str(self.task_locked_time) 

2010 + " " 

2011 + "time_last_task_processed=" 

2012 + str(self.time_last_task_processed) 

2013 + " " 

2014 + "now=" 

2015 + str(now), 

2016 ) 

2017 """ 

2018 locked = self.db.set_one( 

2019 "ro_tasks", 

2020 q_filter={ 

2021 "target_id": self.vim_targets, 

2022 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], 

2023 "locked_at.lt": now - self.task_locked_time, 

2024 "to_check_at.lt": self.time_last_task_processed, 

2025 "to_check_at.gt": -1, 

2026 }, 

2027 update_dict={"locked_by": self.my_id, "locked_at": now}, 

2028 fail_on_empty=False, 

2029 ) 

2030 

2031 if locked: 

2032 # read and return 

2033 ro_task = self.db.get_one( 

2034 "ro_tasks", 

2035 q_filter={ 

2036 "target_id": self.vim_targets, 

2037 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], 

2038 "locked_at": now, 

2039 }, 

2040 ) 

2041 return ro_task 

2042 

2043 if self.time_last_task_processed == now: 

2044 self.time_last_task_processed = None 

2045 return None 

2046 else: 

2047 self.time_last_task_processed = now 

2048 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now) 

2049 

2050 except DbException as e: 

2051 self.logger.error("Database exception at _get_db_task: {}".format(e)) 

2052 except Exception as e: 

2053 self.logger.critical( 

2054 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True 

2055 ) 

2056 

2057 return None 

2058 

2059 def _delete_task(self, ro_task, task_index, task_depends, db_update): 

2060 """ 

2061 Determine if this task need to be done or superseded 

2062 :return: None 

2063 """ 

2064 my_task = ro_task["tasks"][task_index] 

2065 task_id = my_task["task_id"] 

2066 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get( 

2067 "created_items", False 

2068 ) 

2069 

2070 self.logger.debug("Needed delete: {}".format(needed_delete)) 

2071 if my_task["status"] == "FAILED": 

2072 return None, None # TODO need to be retry?? 

2073 

2074 try: 

2075 for index, task in enumerate(ro_task["tasks"]): 

2076 if index == task_index or not task: 

2077 continue # own task 

2078 

2079 if ( 

2080 my_task["target_record"] == task["target_record"] 

2081 and task["action"] == "CREATE" 

2082 ): 

2083 # set to finished 

2084 db_update["tasks.{}.status".format(index)] = task[ 

2085 "status" 

2086 ] = "FINISHED" 

2087 elif task["action"] == "CREATE" and task["status"] not in ( 

2088 "FINISHED", 

2089 "SUPERSEDED", 

2090 ): 

2091 needed_delete = False 

2092 

2093 if needed_delete: 

2094 self.logger.debug( 

2095 "Deleting ro_task={} task_index={}".format(ro_task, task_index) 

2096 ) 

2097 return self.item2class[my_task["item"]].delete(ro_task, task_index) 

2098 else: 

2099 return "SUPERSEDED", None 

2100 except Exception as e: 

2101 if not isinstance(e, NsWorkerException): 

2102 self.logger.critical( 

2103 "Unexpected exception at _delete_task task={}: {}".format( 

2104 task_id, e 

2105 ), 

2106 exc_info=True, 

2107 ) 

2108 

2109 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)} 

2110 

2111 def _create_task(self, ro_task, task_index, task_depends, db_update): 

2112 """ 

2113 Determine if this task need to create something at VIM 

2114 :return: None 

2115 """ 

2116 my_task = ro_task["tasks"][task_index] 

2117 task_id = my_task["task_id"] 

2118 

2119 if my_task["status"] == "FAILED": 

2120 return None, None # TODO need to be retry?? 

2121 elif my_task["status"] == "SCHEDULED": 

2122 # check if already created by another task 

2123 for index, task in enumerate(ro_task["tasks"]): 

2124 if index == task_index or not task: 

2125 continue # own task 

2126 

2127 if task["action"] == "CREATE" and task["status"] not in ( 

2128 "SCHEDULED", 

2129 "FINISHED", 

2130 "SUPERSEDED", 

2131 ): 

2132 return task["status"], "COPY_VIM_INFO" 

2133 

2134 try: 

2135 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new( 

2136 ro_task, task_index, task_depends 

2137 ) 

2138 # TODO update other CREATE tasks 

2139 except Exception as e: 

2140 if not isinstance(e, NsWorkerException): 

2141 self.logger.error( 

2142 "Error executing task={}: {}".format(task_id, e), exc_info=True 

2143 ) 

2144 

2145 task_status = "FAILED" 

2146 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)} 

2147 # TODO update ro_vim_item_update 

2148 

2149 return task_status, ro_vim_item_update 

2150 else: 

2151 return None, None 

2152 

2153 def _get_dependency(self, task_id, ro_task=None, target_id=None): 

2154 """ 

2155 Look for dependency task 

2156 :param task_id: Can be one of 

2157 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>" 

2158 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>" 

2159 3. task.task_id: "<action_id>:number" 

2160 :param ro_task: 

2161 :param target_id: 

2162 :return: database ro_task plus index of task 

2163 """ 

2164 if ( 

2165 task_id.startswith("vim:") 

2166 or task_id.startswith("sdn:") 

2167 or task_id.startswith("wim:") 

2168 ): 

2169 target_id, _, task_id = task_id.partition(" ") 

2170 

2171 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"): 

2172 ro_task_dependency = self.db.get_one( 

2173 "ro_tasks", 

2174 q_filter={"target_id": target_id, "tasks.target_record_id": task_id}, 

2175 fail_on_empty=False, 

2176 ) 

2177 

2178 if ro_task_dependency: 

2179 for task_index, task in enumerate(ro_task_dependency["tasks"]): 

2180 if task["target_record_id"] == task_id: 

2181 return ro_task_dependency, task_index 

2182 

2183 else: 

2184 if ro_task: 

2185 for task_index, task in enumerate(ro_task["tasks"]): 

2186 if task and task["task_id"] == task_id: 

2187 return ro_task, task_index 

2188 

2189 ro_task_dependency = self.db.get_one( 

2190 "ro_tasks", 

2191 q_filter={ 

2192 "tasks.ANYINDEX.task_id": task_id, 

2193 "tasks.ANYINDEX.target_record.ne": None, 

2194 }, 

2195 fail_on_empty=False, 

2196 ) 

2197 

2198 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency)) 

2199 if ro_task_dependency: 

2200 for task_index, task in enumerate(ro_task_dependency["tasks"]): 

2201 if task["task_id"] == task_id: 

2202 return ro_task_dependency, task_index 

2203 raise NsWorkerException("Cannot get depending task {}".format(task_id)) 

2204 

def update_vm_refresh(self, ro_task):
    """Re-enable the periodic check of tasks when the status refresh is active.

    When self._get_next_refresh does not return -1, every "ro_tasks" entry matching
    "tasks.status": "DONE" and "to_check_at.lt": 0 gets a new "vim_info.refresh_at" and
    "to_check_at". Any exception is logged and not raised.
    """
    one_day = 24 * 60 * 60

    try:
        self.logger.debug("Checking if VM status update config")
        next_refresh = self._get_next_refresh(ro_task, time.time())

        if next_refresh == -1:
            return

        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            # never wait more than one day for the next check
            "to_check_at": min(time.time() + one_day, next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        disabled_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={
                "tasks.status": "DONE",
                "to_check_at.lt": 0,
            },
        )
        self.logger.debug("Updating tasks to change the to_check_at status")
        for disabled_task in disabled_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": disabled_task["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")

2247 

2248 def _get_next_refresh(self, ro_task: dict, next_refresh: float): 

2249 """Decide the next_refresh according to vim type and refresh config period. 

2250 Args: 

2251 ro_task (dict): ro_task details 

2252 next_refresh (float): next refresh time as epoch format 

2253 

2254 Returns: 

2255 next_refresh (float) -1 if vm updates are disabled or vim type is openstack. 

2256 """ 

2257 target_vim = ro_task["target_id"] 

2258 vim_type = self.db_vims[target_vim]["vim_type"] 

2259 if self.refresh_config.active == -1 or vim_type == "openstack": 

2260 next_refresh = -1 

2261 else: 

2262 next_refresh += self.refresh_config.active 

2263 return next_refresh 

2264 

def _process_pending_tasks(self, ro_task):
    """
    Process the tasks of a ro_task and store the outcome at database.

    Tasks are visited grouped by action, in the order DELETE, CREATE, EXEC. For each selected task the
    dependencies are checked (only when SCHEDULED), the proper item class method is called, and the
    resulting status / vim_info is accumulated in a single update of the "ro_tasks" entry, which is also
    unlocked at the end. The target record is updated through self._update_target.
    :param ro_task: ro_task content read from database. Its "vim_info" is modified in place
    :return: None. Exceptions raised inside the main try block are logged and not propagated
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        """Compute the next refresh time of the current task and lower next_check_at accordingly."""
        # compute next_refresh
        # 'task' is the loop variable of the enclosing method at the moment of the call
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            # can return -1, in which case next_check_at below becomes -1 too
            next_refresh = self._get_next_refresh(ro_task, next_refresh)
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # Check if vim status refresh is enabled again
        self.update_vm_refresh(ro_task)
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        # first CREATE task already in BUILD or DONE, if any
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            # NOTE(review): these two are reset per action, not per task, so a value set while
            # processing one task is still visible for the next task of the same action - confirm
            # this is intended
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip tasks of other actions, DELETE/EXEC tasks that are not SCHEDULED or BUILD,
                # and CREATE tasks already FINISHED or SUPERSEDED
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None
                    dependency_ro_task = {}

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the dependency vim_id is stored under both the raw id and "TASK-<id>"
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )
                    if task["action"] == "DELETE":
                        (
                            new_status,
                            db_vim_info_update,
                        ) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            # this overwrites (does not take the minimum with) any next_check_at
                            # computed before
                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # another CREATE task already did the work: copy its result
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (
                                    new_status,
                                    db_vim_info_update,
                                ) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    # any failure of this task (dependency, plugin call, refresh computation) marks
                    # the task as FAILED
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        # NOTE(review): the message names _delete_task although this handler covers
                        # DELETE, CREATE and EXEC tasks
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            # following SCHEDULED CREATE tasks will copy this result
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # nothing matched: retry without touching nor filtering by to_check_at
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )

2606 

2607 def _update_target(self, task, ro_vim_item_update): 

2608 table, _, temp = task["target_record"].partition(":") 

2609 _id, _, path_vim_status = temp.partition(":") 

2610 path_item = path_vim_status[: path_vim_status.rfind(".")] 

2611 path_item = path_item[: path_item.rfind(".")] 

2612 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id" 

2613 # path_item: dot separated list targeting record information, e.g. "vdur.10" 

2614 

2615 if ro_vim_item_update: 

2616 update_dict = { 

2617 path_vim_status + "." + k: v 

2618 for k, v in ro_vim_item_update.items() 

2619 if k 

2620 in ( 

2621 "vim_id", 

2622 "vim_details", 

2623 "vim_message", 

2624 "vim_name", 

2625 "vim_status", 

2626 "interfaces", 

2627 "interfaces_backup", 

2628 ) 

2629 } 

2630 

2631 if path_vim_status.startswith("vdur."): 

2632 # for backward compatibility, add vdur.name apart from vdur.vim_name 

2633 if ro_vim_item_update.get("vim_name"): 

2634 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"] 

2635 

2636 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id 

2637 if ro_vim_item_update.get("vim_id"): 

2638 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"] 

2639 

2640 # update general status 

2641 if ro_vim_item_update.get("vim_status"): 

2642 update_dict[path_item + ".status"] = ro_vim_item_update[ 

2643 "vim_status" 

2644 ] 

2645 

2646 if ro_vim_item_update.get("interfaces"): 

2647 path_interfaces = path_item + ".interfaces" 

2648 

2649 for i, iface in enumerate(ro_vim_item_update.get("interfaces")): 

2650 if iface: 

2651 update_dict.update( 

2652 { 

2653 path_interfaces + ".{}.".format(i) + k: v 

2654 for k, v in iface.items() 

2655 if k in ("vlan", "compute_node", "pci") 

2656 } 

2657 ) 

2658 

2659 # put ip_address and mac_address with ip-address and mac-address 

2660 if iface.get("ip_address"): 

2661 update_dict[ 

2662 path_interfaces + ".{}.".format(i) + "ip-address" 

2663 ] = iface["ip_address"] 

2664 

2665 if iface.get("mac_address"): 

2666 update_dict[ 

2667 path_interfaces + ".{}.".format(i) + "mac-address" 

2668 ] = iface["mac_address"] 

2669 

2670 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"): 

2671 update_dict["ip-address"] = iface.get("ip_address").split( 

2672 ";" 

2673 )[0] 

2674 

2675 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"): 

2676 update_dict[path_item + ".ip-address"] = iface.get( 

2677 "ip_address" 

2678 ).split(";")[0] 

2679 

2680 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict) 

2681 

2682 # If interfaces exists, it backups VDU interfaces in the DB for healing operations 

2683 if ro_vim_item_update.get("interfaces"): 

2684 search_key = path_vim_status + ".interfaces" 

2685 if update_dict.get(search_key): 

2686 interfaces_backup_update = { 

2687 path_vim_status + ".interfaces_backup": update_dict[search_key] 

2688 } 

2689 

2690 self.db.set_one( 

2691 table, 

2692 q_filter={"_id": _id}, 

2693 update_dict=interfaces_backup_update, 

2694 ) 

2695 

2696 else: 

2697 update_dict = {path_item + ".status": "DELETED"} 

2698 self.db.set_one( 

2699 table, 

2700 q_filter={"_id": _id}, 

2701 update_dict=update_dict, 

2702 unset={path_vim_status: None}, 

2703 ) 

2704 

2705 def _process_delete_db_tasks(self): 

2706 """ 

2707 Delete task from database because vnfrs or nsrs or both have been deleted 

2708 :return: None. Uses and modify self.tasks_to_delete 

2709 """ 

2710 while self.tasks_to_delete: 

2711 task = self.tasks_to_delete[0] 

2712 vnfrs_deleted = None 

2713 nsr_id = task["nsr_id"] 

2714 

2715 if task["target_record"].startswith("vnfrs:"): 

2716 # check if nsrs is present 

2717 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False): 

2718 vnfrs_deleted = task["target_record"].split(":")[1] 

2719 

2720 try: 

2721 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted) 

2722 except Exception as e: 

2723 self.logger.error( 

2724 "Error deleting task={}: {}".format(task["task_id"], e) 

2725 ) 

2726 self.tasks_to_delete.pop(0) 

2727 

2728 @staticmethod 

2729 def delete_db_tasks(db, nsr_id, vnfrs_deleted): 

2730 """ 

2731 Static method because it is called from osm_ng_ro.ns 

2732 :param db: instance of database to use 

2733 :param nsr_id: affected nsrs id 

2734 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id 

2735 :return: None, exception is fails 

2736 """ 

2737 retries = 5 

2738 for retry in range(retries): 

2739 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id}) 

2740 now = time.time() 

2741 conflict = False 

2742 

2743 for ro_task in ro_tasks: 

2744 db_update = {} 

2745 to_delete_ro_task = True 

2746 

2747 for index, task in enumerate(ro_task["tasks"]): 

2748 if not task: 

2749 pass 

2750 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or ( 

2751 vnfrs_deleted 

2752 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted) 

2753 ): 

2754 db_update["tasks.{}".format(index)] = None 

2755 else: 

2756 # used by other nsr, ro_task cannot be deleted 

2757 to_delete_ro_task = False 

2758 

2759 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed 

2760 if to_delete_ro_task: 

2761 if not db.del_one( 

2762 "ro_tasks", 

2763 q_filter={ 

2764 "_id": ro_task["_id"], 

2765 "modified_at": ro_task["modified_at"], 

2766 }, 

2767 fail_on_empty=False, 

2768 ): 

2769 conflict = True 

2770 elif db_update: 

2771 db_update["modified_at"] = now 

2772 if not db.set_one( 

2773 "ro_tasks", 

2774 q_filter={ 

2775 "_id": ro_task["_id"], 

2776 "modified_at": ro_task["modified_at"], 

2777 }, 

2778 update_dict=db_update, 

2779 fail_on_empty=False, 

2780 ): 

2781 conflict = True 

2782 if not conflict: 

2783 return 

2784 else: 

2785 raise NsWorkerException("Exceeded {} retries".format(retries)) 

2786 

def run(self):
    """
    Main loop of the worker.

    Each iteration first attends one command from self.task_queue ("terminate", "load_vim",
    "unload_vim", "reload_vim", "check_vim"); when no command is waiting it removes the tasks queued
    for deletion and processes one pending ro_task from database. The loop ends with "terminate".
    :return: None
    """
    # load database
    self.logger.info("Starting")
    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                # without targets there is nothing to poll: wait for a command
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            command = task[0]
            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif command == "reload_vim":
                self._reload_vim(task[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])
            # attend the next command before looking at database
            continue
        except queue.Empty:
            pass  # no command waiting: go on with the database tasks
        except Exception as error:
            self.logger.critical(
                "Error processing task: {}".format(error), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # NOTE: a debug dump of all tasks through self._get_db_all_tasks used to be done here;
            # it is disabled
            ro_task = self._get_db_task()
            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # not busy: wait before polling again
                time.sleep(5)
        except Exception as error:
            self.logger.critical(
                "Unexpected exception at run: " + str(error), exc_info=True
            )

    self.logger.info("Finishing")