Coverage for NG-RO/osm_ng_ro/ns_thread.py: 31%

1428 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-27 18:44 +0000

1# -*- coding: utf-8 -*- 

2 

3## 

4# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); you may 

7# not use this file except in compliance with the License. You may obtain 

8# a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

15# License for the specific language governing permissions and limitations 

16# under the License. 

17# 

18## 

19 

20""" 

21This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM. 

22The tasks are stored at database in table ro_tasks 

23A single ro_task refers to a VIM element (flavor, image, network, ...). 

24A ro_task can contain several 'tasks', each one with a target, where to store the results 

25""" 

26 

27from copy import deepcopy 

28from http import HTTPStatus 

29import logging 

30from os import makedirs 

31from os import path 

32import queue 

33import threading 

34import time 

35import traceback 

36from typing import Dict 

37from unittest.mock import Mock 

38 

39from importlib_metadata import entry_points 

40from osm_common.dbbase import DbException 

41from osm_ng_ro.vim_admin import LockRenew 

42from osm_ro_plugin import sdnconn 

43from osm_ro_plugin import vimconn 

44from osm_ro_plugin.sdn_dummy import SdnDummyConnector 

45from osm_ro_plugin.vim_dummy import VimDummyConnector 

46import yaml 

47 

48__author__ = "Alfonso Tierno" 

49__date__ = "$28-Sep-2017 12:07:15$" 

50 

51 

def deep_get(target_dict, *args, **kwargs):
    """
    Follow a path of nested keys inside target_dict and return what is stored there.
    Example target_dict={a: {b: 5}}; keys a,b return 5; keys a,b,c and keys f,h both return the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may only carry default=value, returned when the path cannot be followed
    :return: the nested value when every key exists; otherwise the default (None when not given)
    """
    current = target_dict
    for key in args:
        # stop as soon as the current level is not a dict or lacks the key
        can_descend = isinstance(current, dict) and key in current
        if not can_descend:
            return kwargs.get("default")
        current = current[key]
    return current

66 

67 

class NsWorkerException(Exception):
    """Error raised by the VIM interaction code of this module"""

70 

71 

class FailingConnector:
    """Connector stand-in whose public connector methods all fail with error_msg.

    Every public attribute name of vimconn.VimConnector is bound to a Mock raising
    vimconn.VimConnException(error_msg); afterwards every public attribute name of
    sdnconn.SdnConnectorBase is bound to a Mock raising sdnconn.SdnConnectorError(error_msg),
    so a name present in both classes ends up raising the SDN error.
    """

    def __init__(self, error_msg):
        self.error_msg = error_msg

        # order matters: the SDN pass overrides names it shares with the VIM pass
        connector_errors = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )
        for connector_class, error_class in connector_errors:
            for method in dir(connector_class):
                if method[0] == "_":
                    continue
                setattr(self, method, Mock(side_effect=error_class(error_msg)))

87 

88 

class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException raised when the requested VIM element cannot be found"""

91 

92 

class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    Its methods do nothing against the VIM and simply report a successful result"""

    def __init__(self, db, my_vims, db_vims, logger):
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created; returns the status "BUILD" and an empty update"""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling the VIM to get image, flavor status. Only a stored VIM_ERROR is reported as failed"""
        failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
        return ("FAILED" if failed else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling the VIM to delete the item. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Nothing is executed; returns "DONE" with neither item update nor task update"""
        return "DONE", None, None

119 

120 

class VimInteractionNet(VimInteractionBase):
    """Find, create, refresh and delete networks at the VIM referenced by ro_task["target_id"]"""

    def new(self, ro_task, task_index, task_depends):
        """Find an existing VIM network or create a new one.

        When the task carries "find_params" the network is looked for with get_network_list, using:
          - find_params["filter_dict"] if present; or
          - for find_params["mgmt"], the "management_network_id" or "management_network_name" stored
            in the "config" of the VIM at self.db_vims, or else find_params["name"].
        If nothing is found and the task has no "params", a management network that is not mapped
        in the VIM config is created with that name; any other case raises "not found".
        If nothing is found and the task does have "params", the network is created from them.
        Without "find_params" the network is always created from task["params"].

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: ("BUILD", update) on success, ("FAILED", update) with vim_status "VIM_ERROR" when
            a vimconn.VimConnException or NsWorkerException is raised
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        mgmtnet = False
        mgmtnet_defined_in_vim = False
        found = False

        try:
            # FIND
            if task.get("find_params"):
                find_params = task["find_params"]

                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                # management network: prefer the mapping configured at the VIM
                elif find_params.get("mgmt"):
                    mgmtnet = True
                    if deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_id",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "id": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_id"
                            ]
                        }
                    elif deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_name",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "name": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_name"
                            ]
                        }
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if mgmtnet and not mgmtnet_defined_in_vim:
                        net_name = (
                            vim_filter.get("name")
                            if vim_filter.get("name")
                            else vim_filter.get("id")[:16]
                        )
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
                    found = True

            if not found and not created:
                # CREATE: either nothing had to be searched, or the search found nothing but the
                # task supplies "params" (otherwise an exception was raised above)
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """Call VIM to get network status

        :param ro_task: ro_task document; "_id", "target_id" and "vim_info" are read
        :return: (task status, update) where task status is "DONE" for a VIM status "ACTIVE",
            "BUILD" for "BUILD" and "FAILED" otherwise (including a vimconn.VimConnException), and
            update holds only the vim_info fields that differ from the ones stored in ro_task
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # report only what changed with respect to the stored vim_info
        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    (
                        ro_vim_item_update.get("vim_message")
                        if ro_vim_item_update.get("vim_status") != "ACTIVE"
                        else ""
                    ),
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network (and its created_items) at the VIM.

        Nothing is requested to the VIM when there is neither a vim_id nor created_items.
        A vimconn.VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update) on success, ("FAILED", update) on any other vimconn.VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

340 

341 

class VimInteractionClassification(VimInteractionBase):
    """Create and delete flow classifications at the VIM referenced by ro_task["target_id"]"""

    def new(self, ro_task, task_index, task_depends):
        """Create a classification of type "legacy_flow_classifier" at the VIM.

        "name" and "logical_source_port_index" are taken out of a copy of task["params"]; the rest
        of the params are passed to new_classification. A "logical_source_port" starting with
        "TASK-" is replaced by the "vim_interface_id" of the interface at that index of the VM
        task_depends[logical_source_port], as reported by refresh_vms_status.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dictionary task reference -> vim id of the item created by that task
        :return: ("DONE", update) on success, ("FAILED", update) with vim_status "VIM_ERROR" when
            a vimconn.VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params_copy = deepcopy(task["params"])

            name = params_copy.pop("name")
            logical_source_port_index = int(
                params_copy.pop("logical_source_port_index")
            )
            logical_source_port = params_copy["logical_source_port"]

            if logical_source_port.startswith("TASK-"):
                vm_id = task_depends[logical_source_port]
                vm_interfaces = target_vim.refresh_vms_status([vm_id])[vm_id][
                    "interfaces"
                ]
                params_copy["logical_source_port"] = vm_interfaces[
                    logical_source_port_index
                ]["vim_interface_id"]

            vim_classification_id = target_vim.new_classification(
                name, "legacy_flow_classifier", params_copy
            )

            ro_vim_item_update = {
                "vim_id": vim_classification_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-classification: {}".format(
                    task_id, ro_task["target_id"], e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the classification at the VIM.

        Nothing is requested to the VIM when there is no vim_id.
        A vimconn.VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update) on success, ("FAILED", update) on any other vimconn.VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        classification_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if classification_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_classification(classification_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-classification={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], classification_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-classification={} {}".format(
                task_id,
                ro_task["target_id"],
                classification_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

435 

436 

class VimInteractionSfi(VimInteractionBase):
    """Create and delete service function instances (SFI) at the VIM referenced by ro_task["target_id"]"""

    @staticmethod
    def _vim_interface_id(target_vim, vm_id, interface_index):
        """Return the "vim_interface_id" of the interface at interface_index of VM vm_id,
        as reported by target_vim.refresh_vms_status"""
        vm_info = target_vim.refresh_vms_status([vm_id])[vm_id]
        return vm_info["interfaces"][interface_index]["vim_interface_id"]

    def new(self, ro_task, task_index, task_depends):
        """Create a SFI at the VIM with one ingress and one egress port.

        A port given as "TASK-..." is translated into the "vim_interface_id" of the interface at
        the corresponding port index of the VM task_depends[port]; any other value is passed to
        the VIM unchanged and does not need an entry at task_depends.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dictionary task reference -> vim id of the item created by that task
        :return: ("DONE", update) on success, ("FAILED", update) with vim_status "VIM_ERROR" when
            a vimconn.VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            ingress_port = params_copy["ingress_port"]
            egress_port = params_copy["egress_port"]
            ingress_port_index = params_copy["ingress_port_index"]
            egress_port_index = params_copy["egress_port_index"]

            ingress_port_id = ingress_port
            egress_port_id = egress_port

            # only task references are looked up at task_depends; a literal port id is used as is
            if ingress_port.startswith("TASK-"):
                ingress_port_id = self._vim_interface_id(
                    target_vim, task_depends[ingress_port], ingress_port_index
                )

            if ingress_port == egress_port:
                # NOTE(review): egress_port_index is ignored when both ports name the same
                # reference; kept as it was - confirm this is intended
                egress_port_id = ingress_port_id
            elif egress_port.startswith("TASK-"):
                # the egress port is resolved against its own VM, not against the ingress one
                egress_port_id = self._vim_interface_id(
                    target_vim, task_depends[egress_port], egress_port_index
                )

            ingress_port_id_list = [ingress_port_id]
            egress_port_id_list = [egress_port_id]

            vim_sfi_id = target_vim.new_sfi(
                name, ingress_port_id_list, egress_port_id_list, sfc_encap=False
            )

            ro_vim_item_update = {
                "vim_id": vim_sfi_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-sfi: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the SFI at the VIM.

        Nothing is requested to the VIM when there is no vim_id.
        A vimconn.VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update) on success, ("FAILED", update) on any other vimconn.VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sfi_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sfi_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sfi(sfi_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sfi={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sfi_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sfi={} {}".format(
                task_id,
                ro_task["target_id"],
                sfi_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

544 

545 

class VimInteractionSf(VimInteractionBase):
    """Create and delete service functions (SF) at the VIM referenced by ro_task["target_id"]"""

    def new(self, ro_task, task_index, task_depends):
        """Create a SF at the VIM out of the SFIs listed at task["params"]["sfis"].

        Each SFI given as "TASK-..." is replaced by task_depends[sfi]; any other value is used as is.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dictionary task reference -> vim id of the item created by that task
        :return: ("DONE", update) on success, ("FAILED", update) with vim_status "VIM_ERROR" when
            a vimconn.VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            sfi_id_list = [
                task_depends[sfi] if sfi.startswith("TASK-") else sfi
                for sfi in params_copy["sfis"]
            ]

            vim_sf_id = target_vim.new_sf(name, sfi_id_list, sfc_encap=False)

            ro_vim_item_update = {
                "vim_id": vim_sf_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-sf: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the SF at the VIM.

        Nothing is requested to the VIM when there is no vim_id.
        A vimconn.VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update) on success, ("FAILED", update) on any other vimconn.VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sf_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sf_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sf(sf_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sf={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sf_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sf={} {}".format(
                task_id,
                ro_task["target_id"],
                sf_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

632 

633 

class VimInteractionSfp(VimInteractionBase):
    """Create and delete service function paths (SFP) at the VIM referenced by ro_task["target_id"]"""

    def new(self, ro_task, task_index, task_depends):
        """Create a SFP at the VIM out of task["params"]["classifications"] and task["params"]["sfs"].

        Each classification or SF given as "TASK-..." is replaced by its task_depends entry; any
        other value is used as is.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dictionary task reference -> vim id of the item created by that task
        :return: ("DONE", update) on success, ("FAILED", update) with vim_status "VIM_ERROR" when
            a vimconn.VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            sf_list = params_copy["sfs"]
            classification_list = params_copy["classifications"]

            classification_id_list = [
                (
                    task_depends[classification]
                    if classification.startswith("TASK-")
                    else classification
                )
                for classification in classification_list
            ]
            sf_id_list = [
                task_depends[sf] if sf.startswith("TASK-") else sf for sf in sf_list
            ]

            vim_sfp_id = target_vim.new_sfp(
                name, classification_id_list, sf_id_list, sfc_encap=False
            )

            ro_vim_item_update = {
                "vim_id": vim_sfp_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-sfp: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the SFP at the VIM.

        Nothing is requested to the VIM when there is no vim_id.
        A vimconn.VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update) on success, ("FAILED", update) on any other vimconn.VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sfp_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sfp_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sfp(sfp_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sfp={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sfp_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sfp={} {}".format(
                task_id,
                ro_task["target_id"],
                sfp_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

733 

734 

class VimInteractionVdu(VimInteractionBase):
    """Create, delete and refresh VMs, and inject ssh keys into them, at the VIM referenced by
    ro_task["target_id"]"""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM at the VIM with a copy of task["params"].

        "TASK-..." references at the net_list "net_id", at "image_id", at "flavor_id" and at the
        affinity_group_list "affinity_group_id" are replaced by their task_depends entries before
        calling new_vminstance. A missing "affinity_group_list" is treated as empty.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dictionary task reference -> vim id of the item created by that task
        :return: ("BUILD", update) on success, ("FAILED", update) with vim_status "VIM_ERROR" when
            a vimconn.VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            created = True
            params_copy = deepcopy(task["params"])

            for net in params_copy["net_list"]:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            for affinity_group in params_copy.get("affinity_group_list") or []:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" is read from each net_list entry after the call; presumably new_vminstance
            # fills it in - verify against the connector
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                created_items.update(task["previous_created_volumes"])

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM (and its created_items) at the VIM, keeping "volumes_to_hold".

        Nothing is requested to the VIM when there is neither a vim_id nor created_items.
        A vimconn.VimConnNotFoundException is treated as success ("already deleted").

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update) on success, ("FAILED", update) on any other vimconn.VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def _flag_interface(self, vim_interfaces, index, flag, ro_task_id):
        """Set vim_interfaces[index][flag] = True when that entry exists.

        The flag is skipped, with a warning, when index is None, out of range, or points to an
        interface that the VIM did not report (stored as None).
        """
        try:
            iface = vim_interfaces[index]
        except (IndexError, TypeError):
            iface = None

        if iface is None:
            self.logger.warning(
                "ro_task={} cannot set {} on interface index {}".format(
                    ro_task_id, flag, index
                )
            )
            return

        iface[flag] = True

    def refresh(self, ro_task):
        """Call VIM to get vm status

        :param ro_task: ro_task document; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when there is no vim_id. Otherwise (task status, update) where task
            status is "DONE" for a VIM status "ACTIVE", "BUILD" for "BUILD" and "FAILED" otherwise
            (including a vimconn.VimConnException), and update holds only the vim_info fields that
            differ from the ones stored in ro_task
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                # None is stored for an interface that the VIM does not report
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        if vim_interfaces:
            # the unfinished CREATE task tells which interfaces are the management ones; without
            # such a task the defaults apply instead of failing with StopIteration
            task_create = next(
                (
                    t
                    for t in ro_task["tasks"]
                    if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
                ),
                None,
            ) or {}

            if task_create.get("mgmt_vnf_interface") is not None:
                self._flag_interface(
                    vim_interfaces,
                    task_create["mgmt_vnf_interface"],
                    "mgmt_vnf_interface",
                    ro_task_id,
                )

            mgmt_vdu_iface = task_create.get(
                "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
            )
            self._flag_interface(
                vim_interfaces, mgmt_vdu_iface, "mgmt_vdu_interface", ro_task_id
            )

        # report only what changed with respect to the stored vim_info
        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    (
                        ro_vim_item_update.get("vim_message")
                        if ro_vim_item_update.get("vim_status") != "ACTIVE"
                        else ""
                    ),
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject a ssh key into the VM with target_vim.inject_user_key.

        The call receives a copy of task["params"] where "private_key", "schema_version" and
        "salt" are replaced by "ro_key" (the result of self.db.decrypt on them) and "ip_address"
        is renamed to "ip_addr".

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used here
        :return: (task status, ro_vim_item_update, db_task_update):
            ("DONE", None, {"retries": 0}) on success;
            ("BUILD", None, {"retries": n, "next_retry": seconds}) after a failure while n is
            below max_retries_inject_ssh_key;
            ("FAILED", {"vim_message": error}, {"retries": 0}) once the retries are exhausted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params_copy = deepcopy(task["params"])
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update

1030 

1031 

class VimInteractionImage(VimInteractionBase):
    """
    Interaction with the VIM for items of type 'image'.
    This class only looks for existing images; it never creates them ('created' is always False).
    """

    def new(self, ro_task, task_index, task_depends):
        """
        Look for an image at the target VIM using task["find_params"]["filter_dict"].
        When the task carries no 'find_params' no VIM call is done and an empty vim_id is reported.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (task_status, ro_vim_item_update): "DONE" with the image id found, or
            "FAILED" with the error message when none or more than one image matches, or when the
            VIM connector raises
        """
        image_task = ro_task["tasks"][task_index]
        task_id = image_task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_image_id = ""
            find_params = image_task.get("find_params")

            if find_params:
                found_images = target_vim.get_image_list(
                    find_params.get("filter_dict", {})
                )

                # exactly one image must match the criteria
                if not found_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(find_params)
                    )

                if len(found_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                vim_image_id = found_images[0]["id"]

            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_image_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

1089 

1090 

class VimInteractionSharedVolume(VimInteractionBase):
    """
    Interaction with the VIM for items of type 'shared-volumes': creation and deletion.
    """

    def delete(self, ro_task, task_index):
        """
        Delete the shared volume stored at ro_task["vim_info"]["vim_id"], unless it is marked as
        'keep' at ro_task["vim_info"]["created_items"].
        :param ro_task: ro_task content; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: tuple (task_status, ro_vim_item_update): "DONE" when the volume is deleted, was
            already deleted or must be kept; "FAILED" when the VIM connector reports another error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        created_items = ro_task["vim_info"].get("created_items") or {}
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        # The entry of this volume can be missing at created_items (e.g. volume not created by
        # this task), so an empty dict is used instead of calling .get() over None
        volume_info = created_items.get(shared_volume_vim_id) or {}

        if volume_info.get("keep"):
            # volume must persist: nothing is deleted at VIM
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }

            return "DONE", ro_vim_item_update_ok

        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Create a shared volume at the target VIM using task["params"] as the volume data.
        When the task has no params nothing is created and a None vim_id is reported.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (task_status, ro_vim_item_update): "DONE" with the id of the volume and
            the 'created_items' (name and 'keep' flag indexed by volume id), or "FAILED" with the
            error message
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                # 'keep' is stored so that delete() can later skip the removal at VIM
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

1195 

1196 

class VimInteractionFlavor(VimInteractionBase):
    """
    Interaction with the VIM for items of type 'flavor': lookup, creation and deletion.
    """

    def delete(self, ro_task, task_index):
        """
        Delete at the target VIM the flavor stored at ro_task["vim_info"]["vim_id"].
        :param ro_task: ro_task content; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: tuple (task_status, ro_vim_item_update): "DONE" when the flavor is deleted or was
            already deleted; "FAILED" when the VIM connector reports another error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Obtain a flavor at the target VIM. It is looked up first, using
        task["find_params"]["vim_flavor_id"] or task["find_params"]["flavor_data"]; when it is not
        found and the task has 'params', it is created from task["params"]["flavor_data"].
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (task_status, ro_vim_item_update): "DONE" with the flavor id and whether
            it has been created, or "FAILED" with the error message
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            vim_flavor_id = None
            # 'find_params' can be missing or stored as None; both are treated as "no criteria"
            find_params = task.get("find_params") or {}

            if find_params.get("vim_flavor_id"):
                vim_flavor_id = find_params["vim_flavor_id"]
            elif find_params.get("flavor_data"):
                try:
                    flavor_data = find_params["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
                except vimconn.VimConnNotFoundException as flavor_not_found_msg:
                    # not found is not an error here: the flavor is created below
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {flavor_not_found_msg}"
                    )

            if not vim_flavor_id and task.get("params"):
                # CREATE
                flavor_data = task["params"]["flavor_data"]
                vim_flavor_id = target_vim.new_flavor(flavor_data)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_flavor_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

1292 

1293 

class VimInteractionAffinityGroup(VimInteractionBase):
    """
    Interaction with the VIM for items of type 'affinity-or-anti-affinity-group': lookup,
    creation and deletion.
    """

    def delete(self, ro_task, task_index):
        """
        Delete at the target VIM the affinity group stored at ro_task["vim_info"]["vim_id"].
        :param ro_task: ro_task content; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: tuple (task_status, ro_vim_item_update): "DONE" when the group is deleted or was
            already deleted; "FAILED" when the VIM connector reports another error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Obtain an affinity group at the target VIM. When
        task["params"]["affinity_group_data"] contains 'vim-affinity-group-id', that group is
        looked up; if it is not provided or not found, a new group is created from
        'affinity_group_data'.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (task_status, ro_vim_item_update): "DONE" with the group id and whether
            it has been created, or "FAILED" with the error message
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None
            param_affinity_group_id = ""

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = affinity_group_data.get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # the provided id does not exist: a new group is created below.
                    # Note the leading space of the second literal; without it the id and
                    # the following word were logged glued together
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
                        " could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

1400 

1401 

class VimInteractionUpdateVdu(VimInteractionBase):
    """
    Interaction with the VIM for items of type 'update': runs an action over an existing VM.
    """

    def exec(self, ro_task, task_index, task_depends):
        """
        Call 'action_vminstance' of the target VIM connector for the VM task["params"]["vim_vm_id"],
        passing a dict whose only key and value are task["params"]["action"].
        When the task has no params no VIM call is done.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (task_status, ro_vim_item_update, db_task_update): "DONE" or "FAILED",
            the vim item changes and {"retries": 0}
        """
        update_task = ro_task["tasks"][task_index]
        task_id = update_task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            update_params = update_task.get("params")

            if update_params:
                vim_vm_id = update_params.get("vim_vm_id")
                action = update_params.get("action")
                # the action name is used both as key and as value
                target_vim.action_vminstance(vim_vm_id, {action: action})

            # NOTE(review): log texts below mention "migration" although this class runs a
            # generic VM action; kept as they are, confirm whether they should be reworded
            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                {"vim_id": vim_vm_id, "vim_status": "ACTIVE"},
                db_task_update,
            )
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )

            return (
                "FAILED",
                {"vim_status": "VIM_ERROR", "vim_message": str(e)},
                db_task_update,
            )

1436 

1437 

class VimInteractionSdnNet(VimInteractionBase):
    """
    Interaction with an SDN/WIM connector for items of type 'sdn_net': creation, edition,
    status refresh and deletion of connectivity services.
    """

    @staticmethod
    def _match_pci(port_pci, mapping):
        """
        Check if port_pci matches with mapping.
        The mapping can have brackets to indicate that several chars are accepted. e.g
        pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
        :param port_pci: text
        :param mapping: text, can contain brackets to indicate several chars are available
        :return: True if matches, False otherwise
        """
        if not port_pci or not mapping:
            return False
        if port_pci == mapping:
            return True

        mapping_index = 0
        pci_index = 0
        while True:
            bracket_start = mapping.find("[", mapping_index)

            if bracket_start == -1:
                break

            bracket_end = mapping.find("]", bracket_start)
            if bracket_end == -1:
                # unbalanced bracket: the rest of the mapping is compared literally below
                break

            # literal text placed before the bracket must be equal at both sides
            length = bracket_start - mapping_index
            if (
                length
                and port_pci[pci_index : pci_index + length]
                != mapping[mapping_index:bracket_start]
            ):
                return False

            # A slice is used instead of an index, so that a port_pci that ends just where
            # the bracket group starts is reported as "no match" and not as an IndexError.
            # The emptiness check is needed because '"" in text' is always True
            pci_char = port_pci[pci_index + length : pci_index + length + 1]
            if not pci_char or pci_char not in mapping[bracket_start + 1 : bracket_end]:
                return False

            pci_index += length + 1
            mapping_index = bracket_end + 1

        if port_pci[pci_index:] != mapping[mapping_index:]:
            return False

        return True

    def _get_interfaces(self, vlds_to_connect, vim_account_id):
        """
        Obtain from the database table 'vnfrs' the SR-IOV and PCI-PASSTHROUGH interfaces
        connected to the provided vlds.
        :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
        :param vim_account_id: only vnfrs with this 'vim-account-id' are considered
        :return: list of interfaces (copies of the database content). Each one gets an "id" with
            the format vnfrs:<vnfr_id>:vdu.<vdu_index>.interfaces.<iface_index>, and
            "status" set to "ERROR" when its vdur is at that status
        """
        interfaces = []

        for vld in vlds_to_connect:
            table, _, db_id = vld.partition(":")
            db_id, _, vld = db_id.partition(":")
            _, _, vld_id = vld.partition(".")

            if table == "vnfrs":
                q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
                iface_key = "vnf-vld-id"
            else:  # table == "nsrs"
                q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
                iface_key = "ns-vld-id"

            db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)

            for db_vnfr in db_vnfrs:
                for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                    for iface_index, interface in enumerate(vdur["interfaces"]):
                        if interface.get(iface_key) == vld_id and interface.get(
                            "type"
                        ) in ("SR-IOV", "PCI-PASSTHROUGH"):
                            # only SR-IOV or PT
                            interface_ = interface.copy()
                            interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                                db_vnfr["_id"], vdu_index, iface_index
                            )

                            if vdur.get("status") == "ERROR":
                                interface_["status"] = "ERROR"

                            interfaces.append(interface_)

        return interfaces

    def refresh(self, ro_task):
        """
        Refresh the sdn net by running again 'new' over the first task with action "CREATE"
        that is not at status "FINISHED".
        :param ro_task: ro_task content
        :return: the result of 'new': tuple (task_status, ro_vim_item_update)
        """
        # look for task create
        task_create_index, _ = next(
            i_t
            for i_t in enumerate(ro_task["tasks"])
            if i_t[1]
            and i_t[1]["action"] == "CREATE"
            and i_t[1]["status"] != "FINISHED"
        )

        return self.new(ro_task, task_create_index, None)

    def new(self, ro_task, task_index, task_depends):
        """
        Create or update the connectivity service with the ports that are currently known, or
        get its status when the set of ports has not changed.
        The ports come from the SR-IOV/PCI-PASSTHROUGH interfaces of the vlds at
        task["params"]["vlds"], translated with the 'sdn-port-mapping' of the associated VIM
        config, plus the external ports at task["params"]["sdn-ports"].
        :param ro_task: ro_task content; "tasks", "target_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (sdn_status, ro_vim_item_update), or ("FAILED", ro_vim_item_update)
            when any exception is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            db_vim = {}
            params = task["params"]
            vlds_to_connect = params.get("vlds", [])
            associated_vim = params.get("target_vim")
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            _, _, vim_account_id = (
                (None, None, None)
                if associated_vim is None
                else associated_vim.partition(":")
            )

            if associated_vim:
                # get associated VIM; it is read from database only the first time
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)
            # print(ports)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            sdn_need_update = False

            for port in ports:
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                if not port.get("compute_node") or not port.get("pci"):
                    # location of the interface is not known yet
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": (
                        "dot1q" if port["type"] == "SR-IOV" else None
                    ),
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # nothing to connect yet: no service is created for less than 2 ports
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                # same ports as before: only the status is refreshed
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # it cannot be considered ACTIVE while there are ports without location
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # traceback is logged only for unexpected exceptions, not for connector errors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the connectivity service stored at ro_task["vim_info"]["vim_id"].
        :param ro_task: ro_task content; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: tuple (task_status, ro_vim_item_update): "DONE" when it is deleted or the
            connector answers with a NOT_FOUND http code; "FAILED" for any other exception
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sdn_vim_id = ro_task["vim_info"].get("vim_id")
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sdn_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_connectivity_service(
                    sdn_vim_id, ro_task["vim_info"].get("created_items")
                )

        except Exception as e:
            if (
                isinstance(e, sdnconn.SdnConnectorError)
                and e.http_code == HTTPStatus.NOT_FOUND.value
            ):
                ro_vim_item_update_ok["vim_message"] = "already deleted"
            else:
                # traceback is logged only for unexpected exceptions
                self.logger.error(
                    "ro_task={} vim={} del-sdn-net={}: {}".format(
                        ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                    ),
                    exc_info=not isinstance(
                        e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                    ),
                )
                ro_vim_item_update = {
                    "vim_status": "VIM_ERROR",
                    "vim_message": "Error while deleting: {}".format(e),
                }

                return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sdn-net={} {}".format(
                task_id,
                ro_task["target_id"],
                sdn_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

1832 

1833 

class VimInteractionMigration(VimInteractionBase):
    """
    Interaction with the VIM for items of type 'migrate': migration of a VM to another host.
    """

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM task["params"]["vim_vm_id"] to the host task["params"]["migrate_host"].
        When the connector reports a compute node, the VM is refreshed and its interfaces are
        reported, following the order of the interfaces stored at
        task["params"]["vdu_vim_info"][<target_id>].
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (task_status, ro_vim_item_update, db_task_update): "DONE" or "FAILED",
            the vim item changes and {"retries": 0}
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        refreshed_vim_info = {}

        try:
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                migrate_host = task["params"].get("migrate_host")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, migrate_host
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated.
                    # The previous info of this target, or its interfaces, can be missing;
                    # then there are no old interfaces to match
                    vdu_old_vim_info = (
                        task["params"]["vdu_vim_info"].get(ro_task["target_id"]) or {}
                    )
                    old_interfaces = vdu_old_vim_info.get("interfaces") or ()

                    # Refresh VM to get new vim_info
                    vm_to_refresh_list = [vim_vm_id]
                    vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                    refreshed_vim_info = vim_dict[vim_vm_id]

                    if refreshed_vim_info.get("interfaces"):
                        for old_iface in old_interfaces:
                            # NOTE(review): None is stored when the old interface is not
                            # among the refreshed ones; confirm that consumers expect it
                            iface = next(
                                (
                                    iface
                                    for iface in refreshed_vim_info["interfaces"]
                                    if old_iface["vim_interface_id"]
                                    == iface["vim_interface_id"]
                                ),
                                None,
                            )
                            vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                # .get() is used as the refreshed info could come without 'vim_info'
                ro_vim_item_update["vim_details"] = refreshed_vim_info.get("vim_info")

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update

1909 

1910 

class VimInteractionResize(VimInteractionBase):
    """
    Interaction with the VIM for items of type 'verticalscale': resize of a VM to a new flavor.
    """

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM task["params"]["vim_vm_id"] to the flavor obtained from
        task_depends[task["params"]["flavor_id"]], and refresh the VM info when the connector
        reports that the resize has been done.
        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: dict indexed by task["params"]["flavor_id"], whose value is the
            flavor identifier passed to the connector
        :return: tuple (task_status, ro_vim_item_update, db_task_update): "DONE" or "FAILED",
            the vim item changes and {"retries": 0}
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_flavor_uuid = None
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task["params"]
            target_flavor_uuid = task_depends[params["flavor_id"]]
            vim_vm_id = ""
            if task.get("params"):
                # The VM identifier must be taken from the task params, as it is done by the
                # other VM actions; otherwise the connector is called with an empty id
                vim_vm_id = params.get("vim_vm_id", "")
                self.logger.info("vim_vm_id %s", vim_vm_id)

                if target_flavor_uuid is not None:
                    resized_status = target_vim.resize_instance(
                        vim_vm_id, target_flavor_uuid
                    )

                    # only evaluated when resize has been requested, as 'resized_status'
                    # does not exist otherwise
                    if resized_status:
                        # Refresh VM to get new vim_info
                        vm_to_refresh_list = [vim_vm_id]
                        vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                        refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info.get("vim_info")

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update

1966 

1967 

class ConfigValidate:
    """
    Read-only view over the "period" section of the configuration.
    """

    def __init__(self, config: Dict):
        """
        :param config: configuration dict; its "period" section is read by the properties
        """
        self.conf = config

    def _period(self, key):
        """
        :param key: name of the entry inside the "period" section
        :return: the configured value, as it is
        """
        return self.conf["period"][key]

    @property
    def active(self):
        """
        :return: configured 'refresh_active' when it is >= 60 or -1; otherwise 60
        """
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        allowed = self._period("refresh_active") >= 60 or (
            self._period("refresh_active") == -1
        )

        return self._period("refresh_active") if allowed else 60

    @property
    def build(self):
        """
        :return: configured 'refresh_build'
        """
        return self._period("refresh_build")

    @property
    def image(self):
        """
        :return: configured 'refresh_image'
        """
        return self._period("refresh_image")

    @property
    def error(self):
        """
        :return: configured 'refresh_error'
        """
        return self._period("refresh_error")

    @property
    def queue_size(self):
        """
        :return: configured 'queue_size'
        """
        return self._period("queue_size")

1998 

1999 

2000class NsWorker(threading.Thread): 

def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.worker_index = worker_index
    self.config = config
    self.plugins = plugins
    self.db = db
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.my_id = config["process_id"] + ":" + str(worker_index)

    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)

    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []

    # item type: class in charge of the interaction with the VIM for that item.
    # All the instances share the same db, my_vims, db_vims and logger
    interaction_classes = (
        ("net", VimInteractionNet),
        ("shared-volumes", VimInteractionSharedVolume),
        ("classification", VimInteractionClassification),
        ("sfi", VimInteractionSfi),
        ("sf", VimInteractionSf),
        ("sfp", VimInteractionSfp),
        ("vdu", VimInteractionVdu),
        ("image", VimInteractionImage),
        ("flavor", VimInteractionFlavor),
        ("sdn_net", VimInteractionSdnNet),
        ("update", VimInteractionUpdateVdu),
        ("affinity-or-anti-affinity-group", VimInteractionAffinityGroup),
        ("migrate", VimInteractionMigration),
        ("verticalscale", VimInteractionResize),
    )
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]

2065 

def insert_task(self, task):
    """
    Put a task in the worker queue without blocking
    :param task: element to enqueue
    :return: None
    :raises NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None

2072 

def terminate(self):
    """Insert the "exit" marker in the task queue through insert_task()."""
    exit_marker = "exit"
    self.insert_task(exit_marker)

2075 

def del_task(self, task):
    """
    Mark a task as superseded if it has not started yet
    :param task: task dictionary; its "status" is changed to "SUPERSEDED" when it was "SCHEDULED"
    :return: True if the task has been superseded, False if it was in any other status (e.g. processing)
    """
    with self.task_lock:
        if task["status"] != "SCHEDULED":
            # The lock must not be released by hand here: the 'with' statement releases it on exit, and a
            # second release of an unlocked lock raises RuntimeError
            return False

        task["status"] = "SUPERSEDED"
        return True

2084 

2085 def _process_vim_config(self, target_id: str, db_vim: dict) -> None: 

2086 """ 

2087 Process vim config, creating vim configuration files as ca_cert 

2088 :param target_id: vim/sdn/wim + id 

2089 :param db_vim: Vim dictionary obtained from database 

2090 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files 

2091 """ 

2092 if not db_vim.get("config"): 

2093 return 

2094 

2095 file_name = "" 

2096 work_dir = "/app/osm_ro/certs" 

2097 

2098 try: 

2099 if db_vim["config"].get("ca_cert_content"): 

2100 file_name = f"{work_dir}/{target_id}:{self.worker_index}" 

2101 

2102 if not path.isdir(file_name): 

2103 makedirs(file_name) 

2104 

2105 file_name = file_name + "/ca_cert" 

2106 

2107 with open(file_name, "w") as f: 

2108 f.write(db_vim["config"]["ca_cert_content"]) 

2109 del db_vim["config"]["ca_cert_content"] 

2110 db_vim["config"]["ca_cert"] = file_name 

2111 except Exception as e: 

2112 raise NsWorkerException( 

2113 "Error writing to file '{}': {}".format(file_name, e) 

2114 ) 

2115 

2116 def _load_plugin(self, name, type="vim"): 

2117 # type can be vim or sdn 

2118 if "rovim_dummy" not in self.plugins: 

2119 self.plugins["rovim_dummy"] = VimDummyConnector 

2120 

2121 if "rosdn_dummy" not in self.plugins: 

2122 self.plugins["rosdn_dummy"] = SdnDummyConnector 

2123 

2124 if name in self.plugins: 

2125 return self.plugins[name] 

2126 

2127 try: 

2128 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name): 

2129 self.plugins[name] = ep.load() 

2130 except Exception as e: 

2131 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e)) 

2132 

2133 if name and name not in self.plugins: 

2134 raise NsWorkerException( 

2135 "Plugin 'osm_{n}' has not been installed".format(n=name) 

2136 ) 

2137 

2138 return self.plugins[name] 

2139 

2140 def _unload_vim(self, target_id): 

2141 """ 

2142 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list 

2143 :param target_id: Contains type:_id; where type can be 'vim', ... 

2144 :return: None. 

2145 """ 

2146 try: 

2147 self.db_vims.pop(target_id, None) 

2148 self.my_vims.pop(target_id, None) 

2149 

2150 if target_id in self.vim_targets: 

2151 self.vim_targets.remove(target_id) 

2152 

2153 self.logger.info("Unloaded {}".format(target_id)) 

2154 except Exception as e: 

2155 self.logger.error("Cannot unload {}: {}".format(target_id, e)) 

2156 

2157 def _check_vim(self, target_id): 

2158 """ 

2159 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR 

2160 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim' 

2161 :return: None. 

2162 """ 

2163 target, _, _id = target_id.partition(":") 

2164 now = time.time() 

2165 update_dict = {} 

2166 unset_dict = {} 

2167 op_text = "" 

2168 step = "" 

2169 loaded = target_id in self.vim_targets 

2170 target_database = ( 

2171 "vim_accounts" 

2172 if target == "vim" 

2173 else "wim_accounts" if target == "wim" else "sdns" 

2174 ) 

2175 error_text = "" 

2176 

2177 try: 

2178 step = "Getting {} from db".format(target_id) 

2179 db_vim = self.db.get_one(target_database, {"_id": _id}) 

2180 

2181 for op_index, operation in enumerate( 

2182 db_vim["_admin"].get("operations", ()) 

2183 ): 

2184 if operation["operationState"] != "PROCESSING": 

2185 continue 

2186 

2187 locked_at = operation.get("locked_at") 

2188 

2189 if locked_at is not None and locked_at >= now - self.task_locked_time: 

2190 # some other thread is doing this operation 

2191 return 

2192 

2193 # lock 

2194 op_text = "_admin.operations.{}.".format(op_index) 

2195 

2196 if not self.db.set_one( 

2197 target_database, 

2198 q_filter={ 

2199 "_id": _id, 

2200 op_text + "operationState": "PROCESSING", 

2201 op_text + "locked_at": locked_at, 

2202 }, 

2203 update_dict={ 

2204 op_text + "locked_at": now, 

2205 "admin.current_operation": op_index, 

2206 }, 

2207 fail_on_empty=False, 

2208 ): 

2209 return 

2210 

2211 unset_dict[op_text + "locked_at"] = None 

2212 unset_dict["current_operation"] = None 

2213 step = "Loading " + target_id 

2214 error_text = self._load_vim(target_id) 

2215 

2216 if not error_text: 

2217 step = "Checking connectivity" 

2218 

2219 if target == "vim": 

2220 self.my_vims[target_id].check_vim_connectivity() 

2221 else: 

2222 self.my_vims[target_id].check_credentials() 

2223 

2224 update_dict["_admin.operationalState"] = "ENABLED" 

2225 update_dict["_admin.detailed-status"] = "" 

2226 unset_dict[op_text + "detailed-status"] = None 

2227 update_dict[op_text + "operationState"] = "COMPLETED" 

2228 

2229 return 

2230 

2231 except Exception as e: 

2232 error_text = "{}: {}".format(step, e) 

2233 self.logger.error("{} for {}: {}".format(step, target_id, e)) 

2234 

2235 finally: 

2236 if update_dict or unset_dict: 

2237 if error_text: 

2238 update_dict[op_text + "operationState"] = "FAILED" 

2239 update_dict[op_text + "detailed-status"] = error_text 

2240 unset_dict.pop(op_text + "detailed-status", None) 

2241 update_dict["_admin.operationalState"] = "ERROR" 

2242 update_dict["_admin.detailed-status"] = error_text 

2243 

2244 if op_text: 

2245 update_dict[op_text + "statusEnteredTime"] = now 

2246 

2247 self.db.set_one( 

2248 target_database, 

2249 q_filter={"_id": _id}, 

2250 update_dict=update_dict, 

2251 unset=unset_dict, 

2252 fail_on_empty=False, 

2253 ) 

2254 

2255 if not loaded: 

2256 self._unload_vim(target_id) 

2257 

2258 def _reload_vim(self, target_id): 

2259 if target_id in self.vim_targets: 

2260 self._load_vim(target_id) 

2261 else: 

2262 # if the vim is not loaded, but database information of VIM is cached at self.db_vims, 

2263 # just remove it to force load again next time it is needed 

2264 self.db_vims.pop(target_id, None) 

2265 

2266 def _load_vim(self, target_id): 

2267 """ 

2268 Load or reload a vim_account, sdn_controller or wim_account. 

2269 Read content from database, load the plugin if not loaded. 

2270 In case of error loading the plugin, it loads a failing VIM_connector 

2271 It fills self db_vims dictionary, my_vims dictionary and vim_targets list 

2272 :param target_id: Contains type:_id; where type can be 'vim', ... 

2273 :return: None if ok, descriptive text if error 

2274 """ 

2275 target, _, _id = target_id.partition(":") 

2276 target_database = ( 

2277 "vim_accounts" 

2278 if target == "vim" 

2279 else "wim_accounts" if target == "wim" else "sdns" 

2280 ) 

2281 plugin_name = "" 

2282 vim = None 

2283 step = "Getting {}={} from db".format(target, _id) 

2284 

2285 try: 

2286 # TODO process for wim, sdnc, ... 

2287 vim = self.db.get_one(target_database, {"_id": _id}) 

2288 

2289 # if deep_get(vim, "config", "sdn-controller"): 

2290 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"]) 

2291 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]}) 

2292 

2293 step = "Decrypting password" 

2294 schema_version = vim.get("schema_version") 

2295 self.db.encrypt_decrypt_fields( 

2296 vim, 

2297 "decrypt", 

2298 fields=("password", "secret"), 

2299 schema_version=schema_version, 

2300 salt=_id, 

2301 ) 

2302 self._process_vim_config(target_id, vim) 

2303 

2304 if target == "vim": 

2305 plugin_name = "rovim_" + vim["vim_type"] 

2306 step = "Loading plugin '{}'".format(plugin_name) 

2307 vim_module_conn = self._load_plugin(plugin_name) 

2308 step = "Loading {}'".format(target_id) 

2309 self.my_vims[target_id] = vim_module_conn( 

2310 uuid=vim["_id"], 

2311 name=vim["name"], 

2312 tenant_id=vim.get("vim_tenant_id"), 

2313 tenant_name=vim.get("vim_tenant_name"), 

2314 url=vim["vim_url"], 

2315 url_admin=None, 

2316 user=vim["vim_user"], 

2317 passwd=vim["vim_password"], 

2318 config=vim.get("config") or {}, 

2319 persistent_info={}, 

2320 ) 

2321 else: # sdn 

2322 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type")) 

2323 step = "Loading plugin '{}'".format(plugin_name) 

2324 vim_module_conn = self._load_plugin(plugin_name, "sdn") 

2325 step = "Loading {}'".format(target_id) 

2326 wim = deepcopy(vim) 

2327 wim_config = wim.pop("config", {}) or {} 

2328 wim["uuid"] = wim["_id"] 

2329 if "url" in wim and "wim_url" not in wim: 

2330 wim["wim_url"] = wim["url"] 

2331 elif "url" not in wim and "wim_url" in wim: 

2332 wim["url"] = wim["wim_url"] 

2333 

2334 if wim.get("dpid"): 

2335 wim_config["dpid"] = wim.pop("dpid") 

2336 

2337 if wim.get("switch_id"): 

2338 wim_config["switch_id"] = wim.pop("switch_id") 

2339 

2340 # wim, wim_account, config 

2341 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config) 

2342 self.db_vims[target_id] = vim 

2343 self.error_status = None 

2344 

2345 self.logger.info( 

2346 "Connector loaded for {}, plugin={}".format(target_id, plugin_name) 

2347 ) 

2348 except Exception as e: 

2349 self.logger.error( 

2350 "Cannot load {} plugin={}: {} {}".format( 

2351 target_id, plugin_name, step, e 

2352 ) 

2353 ) 

2354 

2355 self.db_vims[target_id] = vim or {} 

2356 self.db_vims[target_id] = FailingConnector(str(e)) 

2357 error_status = "{} Error: {}".format(step, e) 

2358 

2359 return error_status 

2360 finally: 

2361 if target_id not in self.vim_targets: 

2362 self.vim_targets.append(target_id) 

2363 

2364 def _get_db_task(self): 

2365 """ 

2366 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions 

2367 :return: None 

2368 """ 

2369 now = time.time() 

2370 

2371 if not self.time_last_task_processed: 

2372 self.time_last_task_processed = now 

2373 

2374 try: 

2375 while True: 

2376 """ 

2377 # Log RO tasks only when loglevel is DEBUG 

2378 if self.logger.getEffectiveLevel() == logging.DEBUG: 

2379 self._log_ro_task( 

2380 None, 

2381 None, 

2382 None, 

2383 "TASK_WF", 

2384 "task_locked_time=" 

2385 + str(self.task_locked_time) 

2386 + " " 

2387 + "time_last_task_processed=" 

2388 + str(self.time_last_task_processed) 

2389 + " " 

2390 + "now=" 

2391 + str(now), 

2392 ) 

2393 """ 

2394 locked = self.db.set_one( 

2395 "ro_tasks", 

2396 q_filter={ 

2397 "target_id": self.vim_targets, 

2398 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], 

2399 "locked_at.lt": now - self.task_locked_time, 

2400 "to_check_at.lt": self.time_last_task_processed, 

2401 "to_check_at.gt": -1, 

2402 }, 

2403 update_dict={"locked_by": self.my_id, "locked_at": now}, 

2404 fail_on_empty=False, 

2405 ) 

2406 

2407 if locked: 

2408 # read and return 

2409 ro_task = self.db.get_one( 

2410 "ro_tasks", 

2411 q_filter={ 

2412 "target_id": self.vim_targets, 

2413 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], 

2414 "locked_at": now, 

2415 }, 

2416 ) 

2417 return ro_task 

2418 

2419 if self.time_last_task_processed == now: 

2420 self.time_last_task_processed = None 

2421 return None 

2422 else: 

2423 self.time_last_task_processed = now 

2424 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now) 

2425 

2426 except DbException as e: 

2427 self.logger.error("Database exception at _get_db_task: {}".format(e)) 

2428 except Exception as e: 

2429 self.logger.critical( 

2430 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True 

2431 ) 

2432 

2433 return None 

2434 

2435 def _delete_task(self, ro_task, task_index, task_depends, db_update): 

2436 """ 

2437 Determine if this task need to be done or superseded 

2438 :return: None 

2439 """ 

2440 my_task = ro_task["tasks"][task_index] 

2441 task_id = my_task["task_id"] 

2442 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get( 

2443 "created_items", False 

2444 ) 

2445 

2446 self.logger.debug("Needed delete: {}".format(needed_delete)) 

2447 if my_task["status"] == "FAILED": 

2448 return None, None # TODO need to be retry?? 

2449 

2450 try: 

2451 for index, task in enumerate(ro_task["tasks"]): 

2452 if index == task_index or not task: 

2453 continue # own task 

2454 

2455 if ( 

2456 my_task["target_record"] == task["target_record"] 

2457 and task["action"] == "CREATE" 

2458 ): 

2459 # set to finished 

2460 db_update["tasks.{}.status".format(index)] = task["status"] = ( 

2461 "FINISHED" 

2462 ) 

2463 elif task["action"] == "CREATE" and task["status"] not in ( 

2464 "FINISHED", 

2465 "SUPERSEDED", 

2466 ): 

2467 needed_delete = False 

2468 

2469 if needed_delete: 

2470 self.logger.debug( 

2471 "Deleting ro_task={} task_index={}".format(ro_task, task_index) 

2472 ) 

2473 return self.item2class[my_task["item"]].delete(ro_task, task_index) 

2474 else: 

2475 return "SUPERSEDED", None 

2476 except Exception as e: 

2477 if not isinstance(e, NsWorkerException): 

2478 self.logger.critical( 

2479 "Unexpected exception at _delete_task task={}: {}".format( 

2480 task_id, e 

2481 ), 

2482 exc_info=True, 

2483 ) 

2484 

2485 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)} 

2486 

2487 def _create_task(self, ro_task, task_index, task_depends, db_update): 

2488 """ 

2489 Determine if this task need to create something at VIM 

2490 :return: None 

2491 """ 

2492 my_task = ro_task["tasks"][task_index] 

2493 task_id = my_task["task_id"] 

2494 

2495 if my_task["status"] == "FAILED": 

2496 return None, None # TODO need to be retry?? 

2497 elif my_task["status"] == "SCHEDULED": 

2498 # check if already created by another task 

2499 for index, task in enumerate(ro_task["tasks"]): 

2500 if index == task_index or not task: 

2501 continue # own task 

2502 

2503 if task["action"] == "CREATE" and task["status"] not in ( 

2504 "SCHEDULED", 

2505 "FINISHED", 

2506 "SUPERSEDED", 

2507 ): 

2508 return task["status"], "COPY_VIM_INFO" 

2509 

2510 try: 

2511 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new( 

2512 ro_task, task_index, task_depends 

2513 ) 

2514 # TODO update other CREATE tasks 

2515 except Exception as e: 

2516 if not isinstance(e, NsWorkerException): 

2517 self.logger.error( 

2518 "Error executing task={}: {}".format(task_id, e), exc_info=True 

2519 ) 

2520 

2521 task_status = "FAILED" 

2522 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)} 

2523 # TODO update ro_vim_item_update 

2524 

2525 return task_status, ro_vim_item_update 

2526 else: 

2527 return None, None 

2528 

2529 def _get_dependency(self, task_id, ro_task=None, target_id=None): 

2530 """ 

2531 Look for dependency task 

2532 :param task_id: Can be one of 

2533 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>" 

2534 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>" 

2535 3. task.task_id: "<action_id>:number" 

2536 :param ro_task: 

2537 :param target_id: 

2538 :return: database ro_task plus index of task 

2539 """ 

2540 if ( 

2541 task_id.startswith("vim:") 

2542 or task_id.startswith("sdn:") 

2543 or task_id.startswith("wim:") 

2544 ): 

2545 target_id, _, task_id = task_id.partition(" ") 

2546 

2547 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"): 

2548 ro_task_dependency = self.db.get_one( 

2549 "ro_tasks", 

2550 q_filter={"target_id": target_id, "tasks.target_record_id": task_id}, 

2551 fail_on_empty=False, 

2552 ) 

2553 

2554 if ro_task_dependency: 

2555 for task_index, task in enumerate(ro_task_dependency["tasks"]): 

2556 if task["target_record_id"] == task_id: 

2557 return ro_task_dependency, task_index 

2558 

2559 else: 

2560 if ro_task: 

2561 for task_index, task in enumerate(ro_task["tasks"]): 

2562 if task and task["task_id"] == task_id: 

2563 return ro_task, task_index 

2564 

2565 ro_task_dependency = self.db.get_one( 

2566 "ro_tasks", 

2567 q_filter={ 

2568 "tasks.ANYINDEX.task_id": task_id, 

2569 "tasks.ANYINDEX.target_record.ne": None, 

2570 }, 

2571 fail_on_empty=False, 

2572 ) 

2573 

2574 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency)) 

2575 if ro_task_dependency: 

2576 for task_index, task in enumerate(ro_task_dependency["tasks"]): 

2577 if task["task_id"] == task_id: 

2578 return ro_task_dependency, task_index 

2579 raise NsWorkerException("Cannot get depending task {}".format(task_id)) 

2580 

def update_vm_refresh(self, ro_task):
    """Enables the VM status updates if self._get_next_refresh does not return -1 for
    this ro_task, and then updates the DB accordingly.

    Every ro_task with "tasks.status" DONE and a negative "to_check_at" gets a new
    "vim_info.refresh_at" and "to_check_at". Any error is logged, not raised.

    Args:
        ro_task (dict): ro_task details, used to compute the next refresh time
    """
    try:
        self.logger.debug("Checking if VM status update config")
        next_refresh = self._get_next_refresh(ro_task, time.time())

        if next_refresh == -1:
            return

        # never wait more than one day for the next check
        one_day_later = time.time() + (24 * 60 * 60)
        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            "to_check_at": min(one_day_later, next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        refresh_tasks = self.db.get_list(
            "ro_tasks",
            q_filter={
                "tasks.status": "DONE",
                "to_check_at.lt": 0,
            },
        )
        self.logger.debug("Updating tasks to change the to_check_at status")
        for pending in refresh_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": pending["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")

2623 

2624 def _get_next_refresh(self, ro_task: dict, next_refresh: float): 

2625 """Decide the next_refresh according to vim type and refresh config period. 

2626 Args: 

2627 ro_task (dict): ro_task details 

2628 next_refresh (float): next refresh time as epoch format 

2629 

2630 Returns: 

2631 next_refresh (float) -1 if vm updates are disabled or vim type is openstack. 

2632 """ 

2633 target_vim = ro_task["target_id"] 

2634 vim_type = self.db_vims[target_vim]["vim_type"] 

2635 if self.refresh_config.active == -1 or vim_type == "openstack": 

2636 next_refresh = -1 

2637 else: 

2638 next_refresh += self.refresh_config.active 

2639 return next_refresh 

2640 

2641 def _process_pending_tasks(self, ro_task): 

2642 ro_task_id = ro_task["_id"] 

2643 now = time.time() 

2644 # one day 

2645 next_check_at = now + (24 * 60 * 60) 

2646 db_ro_task_update = {} 

2647 

2648 def _update_refresh(new_status): 

2649 # compute next_refresh 

2650 nonlocal task 

2651 nonlocal next_check_at 

2652 nonlocal db_ro_task_update 

2653 nonlocal ro_task 

2654 

2655 next_refresh = time.time() 

2656 

2657 if task["item"] in ("image", "flavor"): 

2658 next_refresh += self.refresh_config.image 

2659 elif new_status == "BUILD": 

2660 next_refresh += self.refresh_config.build 

2661 elif new_status == "DONE": 

2662 next_refresh = self._get_next_refresh(ro_task, next_refresh) 

2663 else: 

2664 next_refresh += self.refresh_config.error 

2665 

2666 next_check_at = min(next_check_at, next_refresh) 

2667 db_ro_task_update["vim_info.refresh_at"] = next_refresh 

2668 ro_task["vim_info"]["refresh_at"] = next_refresh 

2669 

2670 try: 

2671 """ 

2672 # Log RO tasks only when loglevel is DEBUG 

2673 if self.logger.getEffectiveLevel() == logging.DEBUG: 

2674 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK") 

2675 """ 

2676 # Check if vim status refresh is enabled again 

2677 self.update_vm_refresh(ro_task) 

2678 # 0: get task_status_create 

2679 lock_object = None 

2680 task_status_create = None 

2681 task_create = next( 

2682 ( 

2683 t 

2684 for t in ro_task["tasks"] 

2685 if t 

2686 and t["action"] == "CREATE" 

2687 and t["status"] in ("BUILD", "DONE") 

2688 ), 

2689 None, 

2690 ) 

2691 

2692 if task_create: 

2693 task_status_create = task_create["status"] 

2694 

2695 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD 

2696 for task_action in ("DELETE", "CREATE", "EXEC"): 

2697 db_vim_update = None 

2698 new_status = None 

2699 

2700 for task_index, task in enumerate(ro_task["tasks"]): 

2701 if not task: 

2702 continue # task deleted 

2703 

2704 task_depends = {} 

2705 target_update = None 

2706 

2707 if ( 

2708 ( 

2709 task_action in ("DELETE", "EXEC") 

2710 and task["status"] not in ("SCHEDULED", "BUILD") 

2711 ) 

2712 or task["action"] != task_action 

2713 or ( 

2714 task_action == "CREATE" 

2715 and task["status"] in ("FINISHED", "SUPERSEDED") 

2716 ) 

2717 ): 

2718 continue 

2719 

2720 task_path = "tasks.{}.status".format(task_index) 

2721 try: 

2722 db_vim_info_update = None 

2723 dependency_ro_task = {} 

2724 

2725 if task["status"] == "SCHEDULED": 

2726 # check if tasks that this depends on have been completed 

2727 dependency_not_completed = False 

2728 

2729 for dependency_task_id in task.get("depends_on") or (): 

2730 ( 

2731 dependency_ro_task, 

2732 dependency_task_index, 

2733 ) = self._get_dependency( 

2734 dependency_task_id, target_id=ro_task["target_id"] 

2735 ) 

2736 dependency_task = dependency_ro_task["tasks"][ 

2737 dependency_task_index 

2738 ] 

2739 self.logger.debug( 

2740 "dependency_ro_task={} dependency_task_index={}".format( 

2741 dependency_ro_task, dependency_task_index 

2742 ) 

2743 ) 

2744 

2745 if dependency_task["status"] == "SCHEDULED": 

2746 dependency_not_completed = True 

2747 next_check_at = min( 

2748 next_check_at, dependency_ro_task["to_check_at"] 

2749 ) 

2750 # must allow dependent task to be processed first 

2751 # to do this set time after last_task_processed 

2752 next_check_at = max( 

2753 self.time_last_task_processed, next_check_at 

2754 ) 

2755 break 

2756 elif dependency_task["status"] == "FAILED": 

2757 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format( 

2758 task["action"], 

2759 task["item"], 

2760 dependency_task["action"], 

2761 dependency_task["item"], 

2762 dependency_task_id, 

2763 dependency_ro_task["vim_info"].get( 

2764 "vim_message" 

2765 ), 

2766 ) 

2767 self.logger.error( 

2768 "task={} {}".format(task["task_id"], error_text) 

2769 ) 

2770 raise NsWorkerException(error_text) 

2771 

2772 task_depends[dependency_task_id] = dependency_ro_task[ 

2773 "vim_info" 

2774 ]["vim_id"] 

2775 task_depends["TASK-{}".format(dependency_task_id)] = ( 

2776 dependency_ro_task["vim_info"]["vim_id"] 

2777 ) 

2778 

2779 if dependency_not_completed: 

2780 self.logger.warning( 

2781 "DEPENDENCY NOT COMPLETED {}".format( 

2782 dependency_ro_task["vim_info"]["vim_id"] 

2783 ) 

2784 ) 

2785 # TODO set at vim_info.vim_details that it is waiting 

2786 continue 

2787 

2788 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew 

2789 # the task of renew this locking. It will update database locket_at periodically 

2790 if not lock_object: 

2791 lock_object = LockRenew.add_lock_object( 

2792 "ro_tasks", ro_task, self 

2793 ) 

2794 if task["action"] == "DELETE": 

2795 ( 

2796 new_status, 

2797 db_vim_info_update, 

2798 ) = self._delete_task( 

2799 ro_task, task_index, task_depends, db_ro_task_update 

2800 ) 

2801 new_status = ( 

2802 "FINISHED" if new_status == "DONE" else new_status 

2803 ) 

2804 # ^with FINISHED instead of DONE it will not be refreshing 

2805 

2806 if new_status in ("FINISHED", "SUPERSEDED"): 

2807 target_update = "DELETE" 

2808 elif task["action"] == "EXEC": 

2809 ( 

2810 new_status, 

2811 db_vim_info_update, 

2812 db_task_update, 

2813 ) = self.item2class[task["item"]].exec( 

2814 ro_task, task_index, task_depends 

2815 ) 

2816 new_status = ( 

2817 "FINISHED" if new_status == "DONE" else new_status 

2818 ) 

2819 # ^with FINISHED instead of DONE it will not be refreshing 

2820 

2821 if db_task_update: 

2822 # load into database the modified db_task_update "retries" and "next_retry" 

2823 if db_task_update.get("retries"): 

2824 db_ro_task_update[ 

2825 "tasks.{}.retries".format(task_index) 

2826 ] = db_task_update["retries"] 

2827 

2828 next_check_at = time.time() + db_task_update.get( 

2829 "next_retry", 60 

2830 ) 

2831 target_update = None 

2832 elif task["action"] == "CREATE": 

2833 if task["status"] == "SCHEDULED": 

2834 if task_status_create: 

2835 new_status = task_status_create 

2836 target_update = "COPY_VIM_INFO" 

2837 else: 

2838 new_status, db_vim_info_update = self.item2class[ 

2839 task["item"] 

2840 ].new(ro_task, task_index, task_depends) 

2841 _update_refresh(new_status) 

2842 else: 

2843 refresh_at = ro_task["vim_info"]["refresh_at"] 

2844 if refresh_at and refresh_at != -1 and now > refresh_at: 

2845 ( 

2846 new_status, 

2847 db_vim_info_update, 

2848 ) = self.item2class[ 

2849 task["item"] 

2850 ].refresh(ro_task) 

2851 _update_refresh(new_status) 

2852 else: 

2853 # The refresh is updated to avoid set the value of "refresh_at" to 

2854 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD, 

2855 # because it can happen that in this case the task is never processed 

2856 _update_refresh(task["status"]) 

2857 

2858 except Exception as e: 

2859 new_status = "FAILED" 

2860 db_vim_info_update = { 

2861 "vim_status": "VIM_ERROR", 

2862 "vim_message": str(e), 

2863 } 

2864 

2865 if not isinstance( 

2866 e, (NsWorkerException, vimconn.VimConnException) 

2867 ): 

2868 self.logger.error( 

2869 "Unexpected exception at _delete_task task={}: {}".format( 

2870 task["task_id"], e 

2871 ), 

2872 exc_info=True, 

2873 ) 

2874 

2875 try: 

2876 if db_vim_info_update: 

2877 db_vim_update = db_vim_info_update.copy() 

2878 db_ro_task_update.update( 

2879 { 

2880 "vim_info." + k: v 

2881 for k, v in db_vim_info_update.items() 

2882 } 

2883 ) 

2884 ro_task["vim_info"].update(db_vim_info_update) 

2885 

2886 if new_status: 

2887 if task_action == "CREATE": 

2888 task_status_create = new_status 

2889 db_ro_task_update[task_path] = new_status 

2890 

2891 if target_update or db_vim_update: 

2892 if target_update == "DELETE": 

2893 self._update_target(task, None) 

2894 elif target_update == "COPY_VIM_INFO": 

2895 self._update_target(task, ro_task["vim_info"]) 

2896 else: 

2897 self._update_target(task, db_vim_update) 

2898 

2899 except Exception as e: 

2900 if ( 

2901 isinstance(e, DbException) 

2902 and e.http_code == HTTPStatus.NOT_FOUND 

2903 ): 

2904 # if the vnfrs or nsrs has been removed from database, this task must be removed 

2905 self.logger.debug( 

2906 "marking to delete task={}".format(task["task_id"]) 

2907 ) 

2908 self.tasks_to_delete.append(task) 

2909 else: 

2910 self.logger.error( 

2911 "Unexpected exception at _update_target task={}: {}".format( 

2912 task["task_id"], e 

2913 ), 

2914 exc_info=True, 

2915 ) 

2916 

2917 locked_at = ro_task["locked_at"] 

2918 

2919 if lock_object: 

2920 locked_at = [ 

2921 lock_object["locked_at"], 

2922 lock_object["locked_at"] + self.task_locked_time, 

2923 ] 

2924 # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will 

2925 # contain exactly locked_at + self.task_locked_time 

2926 LockRenew.remove_lock_object(lock_object) 

2927 

2928 q_filter = { 

2929 "_id": ro_task["_id"], 

2930 "to_check_at": ro_task["to_check_at"], 

2931 "locked_at": locked_at, 

2932 } 

2933 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified, 

2934 # outside this task (by ro_nbi) do not update it 

2935 db_ro_task_update["locked_by"] = None 

2936 # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked 

2937 db_ro_task_update["locked_at"] = int(now - self.task_locked_time) 

2938 db_ro_task_update["modified_at"] = now 

2939 db_ro_task_update["to_check_at"] = next_check_at 

2940 

2941 """ 

2942 # Log RO tasks only when loglevel is DEBUG 

2943 if self.logger.getEffectiveLevel() == logging.DEBUG: 

2944 db_ro_task_update_log = db_ro_task_update.copy() 

2945 db_ro_task_update_log["_id"] = q_filter["_id"] 

2946 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK") 

2947 """ 

2948 

2949 if not self.db.set_one( 

2950 "ro_tasks", 

2951 update_dict=db_ro_task_update, 

2952 q_filter=q_filter, 

2953 fail_on_empty=False, 

2954 ): 

2955 del db_ro_task_update["to_check_at"] 

2956 del q_filter["to_check_at"] 

2957 """ 

2958 # Log RO tasks only when loglevel is DEBUG 

2959 if self.logger.getEffectiveLevel() == logging.DEBUG: 

2960 self._log_ro_task( 

2961 None, 

2962 db_ro_task_update_log, 

2963 None, 

2964 "TASK_WF", 

2965 "SET_TASK " + str(q_filter), 

2966 ) 

2967 """ 

2968 self.db.set_one( 

2969 "ro_tasks", 

2970 q_filter=q_filter, 

2971 update_dict=db_ro_task_update, 

2972 fail_on_empty=True, 

2973 ) 

2974 except DbException as e: 

2975 self.logger.error( 

2976 "ro_task={} Error updating database {}".format(ro_task_id, e) 

2977 ) 

2978 except Exception as e: 

2979 self.logger.error( 

2980 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True 

2981 ) 

2982 

2983 def _update_target(self, task, ro_vim_item_update): 

2984 table, _, temp = task["target_record"].partition(":") 

2985 _id, _, path_vim_status = temp.partition(":") 

2986 path_item = path_vim_status[: path_vim_status.rfind(".")] 

2987 path_item = path_item[: path_item.rfind(".")] 

2988 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id" 

2989 # path_item: dot separated list targeting record information, e.g. "vdur.10" 

2990 

2991 if ro_vim_item_update: 

2992 update_dict = { 

2993 path_vim_status + "." + k: v 

2994 for k, v in ro_vim_item_update.items() 

2995 if k 

2996 in ( 

2997 "vim_id", 

2998 "vim_details", 

2999 "vim_message", 

3000 "vim_name", 

3001 "vim_status", 

3002 "interfaces", 

3003 "interfaces_backup", 

3004 ) 

3005 } 

3006 

3007 if path_vim_status.startswith("vdur."): 

3008 # for backward compatibility, add vdur.name apart from vdur.vim_name 

3009 if ro_vim_item_update.get("vim_name"): 

3010 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"] 

3011 

3012 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id 

3013 if ro_vim_item_update.get("vim_id"): 

3014 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"] 

3015 

3016 # update general status 

3017 if ro_vim_item_update.get("vim_status"): 

3018 update_dict[path_item + ".status"] = ro_vim_item_update[ 

3019 "vim_status" 

3020 ] 

3021 

3022 if ro_vim_item_update.get("interfaces"): 

3023 path_interfaces = path_item + ".interfaces" 

3024 

3025 for i, iface in enumerate(ro_vim_item_update.get("interfaces")): 

3026 if iface: 

3027 update_dict.update( 

3028 { 

3029 path_interfaces + ".{}.".format(i) + k: v 

3030 for k, v in iface.items() 

3031 if k in ("vlan", "compute_node", "pci") 

3032 } 

3033 ) 

3034 

3035 # put ip_address and mac_address with ip-address and mac-address 

3036 if iface.get("ip_address"): 

3037 update_dict[ 

3038 path_interfaces + ".{}.".format(i) + "ip-address" 

3039 ] = iface["ip_address"] 

3040 

3041 if iface.get("mac_address"): 

3042 update_dict[ 

3043 path_interfaces + ".{}.".format(i) + "mac-address" 

3044 ] = iface["mac_address"] 

3045 

3046 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"): 

3047 update_dict["ip-address"] = iface.get("ip_address").split( 

3048 ";" 

3049 )[0] 

3050 

3051 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"): 

3052 update_dict[path_item + ".ip-address"] = iface.get( 

3053 "ip_address" 

3054 ).split(";")[0] 

3055 

3056 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict) 

3057 

3058 # If interfaces exists, it backups VDU interfaces in the DB for healing operations 

3059 if ro_vim_item_update.get("interfaces"): 

3060 search_key = path_vim_status + ".interfaces" 

3061 if update_dict.get(search_key): 

3062 interfaces_backup_update = { 

3063 path_vim_status + ".interfaces_backup": update_dict[search_key] 

3064 } 

3065 

3066 self.db.set_one( 

3067 table, 

3068 q_filter={"_id": _id}, 

3069 update_dict=interfaces_backup_update, 

3070 ) 

3071 

3072 else: 

3073 update_dict = {path_item + ".status": "DELETED"} 

3074 self.db.set_one( 

3075 table, 

3076 q_filter={"_id": _id}, 

3077 update_dict=update_dict, 

3078 unset={path_vim_status: None}, 

3079 ) 

3080 

3081 def _process_delete_db_tasks(self): 

3082 """ 

3083 Delete task from database because vnfrs or nsrs or both have been deleted 

3084 :return: None. Uses and modify self.tasks_to_delete 

3085 """ 

3086 while self.tasks_to_delete: 

3087 task = self.tasks_to_delete[0] 

3088 vnfrs_deleted = None 

3089 nsr_id = task["nsr_id"] 

3090 

3091 if task["target_record"].startswith("vnfrs:"): 

3092 # check if nsrs is present 

3093 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False): 

3094 vnfrs_deleted = task["target_record"].split(":")[1] 

3095 

3096 try: 

3097 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted) 

3098 except Exception as e: 

3099 self.logger.error( 

3100 "Error deleting task={}: {}".format(task["task_id"], e) 

3101 ) 

3102 self.tasks_to_delete.pop(0) 

3103 

@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Static method because it is called from osm_ng_ro.ns
    Remove from the "ro_tasks" table the tasks of a nsr, or only those targeting one of its vnfrs. A ro_task is
    deleted when none of its tasks remains; otherwise only the affected tasks are set to None
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None, exception if fails (NsWorkerException when all the retries end with conflicts)
    """
    retries = 5

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        all_applied = True

        for ro_task in ro_tasks:
            db_update = {}
            used_by_others = False

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    # already emptied slot
                    continue

                if vnfrs_deleted:
                    affected = task["target_record"].startswith(
                        "vnfrs:" + vnfrs_deleted
                    )
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    used_by_others = True

            if used_by_others and not db_update:
                # nothing of this ro_task is affected
                continue

            # delete or update only if nobody has changed ro_task meanwhile; modified_at is used to know
            # whether it has changed
            unchanged_filter = {
                "_id": ro_task["_id"],
                "modified_at": ro_task["modified_at"],
            }
            if used_by_others:
                db_update["modified_at"] = now
                applied = db.set_one(
                    "ro_tasks",
                    q_filter=unchanged_filter,
                    update_dict=db_update,
                    fail_on_empty=False,
                )
            else:
                applied = db.del_one(
                    "ro_tasks",
                    q_filter=unchanged_filter,
                    fail_on_empty=False,
                )

            if not applied:
                all_applied = False

        if all_applied:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))

3162 

def run(self):
    """
    Main loop of the thread. Each iteration first attends one order from self.task_queue, if any, and only
    when there is none it processes the tasks pending at database. The loop ends with the "terminate" order
    :return: None
    """
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                # there are vims loaded: do not block, pending tasks must be attended
                task = self.task_queue.get(block=False)
            else:
                # no vim loaded: wait until an order arrives
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            command = task[0]

            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif command == "reload_vim":
                self._reload_vim(task[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])

            # an order has been attended; look for the next one before processing tasks
            continue
        except queue.Empty:
            # no order is waiting, go on with the pending tasks
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            ro_task = self._get_db_task()

            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # nothing to process, wait before asking the database again
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")