Coverage for NG-RO/osm_ng_ro/ns.py: 44%

1493 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-14 12:04 +0000

1# -*- coding: utf-8 -*- 

2 

3## 

4# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

14# implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17## 

18 

19from copy import deepcopy 

20from http import HTTPStatus 

21from itertools import product 

22import logging 

23from random import choice as random_choice 

24from threading import Lock 

25from time import time 

26from traceback import format_exc as traceback_format_exc 

27from typing import Any, Dict, List, Optional, Tuple, Type 

28from uuid import uuid4 

29 

30from cryptography.hazmat.backends import default_backend as crypto_default_backend 

31from cryptography.hazmat.primitives import serialization as crypto_serialization 

32from cryptography.hazmat.primitives.asymmetric import rsa 

33from jinja2 import ( 

34 Environment, 

35 select_autoescape, 

36 StrictUndefined, 

37 TemplateError, 

38 TemplateNotFound, 

39 UndefinedError, 

40) 

41from osm_common import ( 

42 dbmemory, 

43 dbmongo, 

44 fslocal, 

45 fsmongo, 

46 msgkafka, 

47 msglocal, 

48 version as common_version, 

49) 

50from osm_common.dbbase import DbBase, DbException 

51from osm_common.fsbase import FsBase, FsException 

52from osm_common.msgbase import MsgException 

53from osm_ng_ro.ns_thread import deep_get, NsWorker, NsWorkerException 

54from osm_ng_ro.validation import deploy_schema, validate_input 

55import yaml 

56 

57__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>" 

58min_common_version = "0.1.16" 

59 

60 

61class NsException(Exception): 

62 def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST): 

63 self.http_code = http_code 

64 super(Exception, self).__init__(message) 

65 

66 

67def get_process_id(): 

68 """ 

69 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it 

70 will provide a random one 

71 :return: Obtained ID 

72 """ 

73 # Try getting docker id. If fails, get pid 

74 try: 

75 with open("/proc/self/cgroup", "r") as f: 

76 text_id_ = f.readline() 

77 _, _, text_id = text_id_.rpartition("/") 

78 text_id = text_id.replace("\n", "")[:12] 

79 

80 if text_id: 

81 return text_id 

82 except Exception as error: 

83 logging.exception(f"{error} occured while getting process id") 

84 

85 # Return a random id 

86 return "".join(random_choice("0123456789abcdef") for _ in range(12)) 

87 

88 

89def versiontuple(v): 

90 """utility for compare dot separate versions. Fills with zeros to proper number comparison""" 

91 filled = [] 

92 

93 for point in v.split("."): 

94 filled.append(point.zfill(8)) 

95 

96 return tuple(filled) 

97 

98 

99class Ns(object): 

100 def __init__(self): 

101 self.db = None 

102 self.fs = None 

103 self.msg = None 

104 self.config = None 

105 # self.operations = None 

106 self.logger = None 

107 # ^ Getting logger inside method self.start because parent logger (ro) is not available yet. 

108 # If done now it will not be linked to parent not getting its handler and level 

109 self.map_topic = {} 

110 self.write_lock = None 

111 self.vims_assigned = {} 

112 self.next_worker = 0 

113 self.plugins = {} 

114 self.workers = [] 

115 self.process_params_function_map = { 

116 "net": Ns._process_net_params, 

117 "image": Ns._process_image_params, 

118 "flavor": Ns._process_flavor_params, 

119 "vdu": Ns._process_vdu_params, 

120 "classification": Ns._process_classification_params, 

121 "sfi": Ns._process_sfi_params, 

122 "sf": Ns._process_sf_params, 

123 "sfp": Ns._process_sfp_params, 

124 "affinity-or-anti-affinity-group": Ns._process_affinity_group_params, 

125 "shared-volumes": Ns._process_shared_volumes_params, 

126 } 

127 self.db_path_map = { 

128 "net": "vld", 

129 "image": "image", 

130 "flavor": "flavor", 

131 "vdu": "vdur", 

132 "classification": "classification", 

133 "sfi": "sfi", 

134 "sf": "sf", 

135 "sfp": "sfp", 

136 "affinity-or-anti-affinity-group": "affinity-or-anti-affinity-group", 

137 "shared-volumes": "shared-volumes", 

138 } 

139 

140 def init_db(self, target_version): 

141 pass 

142 

143 def start(self, config): 

144 """ 

145 Connect to database, filesystem storage, and messaging 

146 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', 

147 :param config: Configuration of db, storage, etc 

148 :return: None 

149 """ 

150 self.config = config 

151 self.config["process_id"] = get_process_id() # used for HA identity 

152 self.logger = logging.getLogger("ro.ns") 

153 

154 # check right version of common 

155 if versiontuple(common_version) < versiontuple(min_common_version): 

156 raise NsException( 

157 "Not compatible osm/common version '{}'. Needed '{}' or higher".format( 

158 common_version, min_common_version 

159 ) 

160 ) 

161 

162 try: 

163 if not self.db: 

164 if config["database"]["driver"] == "mongo": 

165 self.db = dbmongo.DbMongo() 

166 self.db.db_connect(config["database"]) 

167 elif config["database"]["driver"] == "memory": 

168 self.db = dbmemory.DbMemory() 

169 self.db.db_connect(config["database"]) 

170 else: 

171 raise NsException( 

172 "Invalid configuration param '{}' at '[database]':'driver'".format( 

173 config["database"]["driver"] 

174 ) 

175 ) 

176 

177 if not self.fs: 

178 if config["storage"]["driver"] == "local": 

179 self.fs = fslocal.FsLocal() 

180 self.fs.fs_connect(config["storage"]) 

181 elif config["storage"]["driver"] == "mongo": 

182 self.fs = fsmongo.FsMongo() 

183 self.fs.fs_connect(config["storage"]) 

184 elif config["storage"]["driver"] is None: 

185 pass 

186 else: 

187 raise NsException( 

188 "Invalid configuration param '{}' at '[storage]':'driver'".format( 

189 config["storage"]["driver"] 

190 ) 

191 ) 

192 

193 if not self.msg: 

194 if config["message"]["driver"] == "local": 

195 self.msg = msglocal.MsgLocal() 

196 self.msg.connect(config["message"]) 

197 elif config["message"]["driver"] == "kafka": 

198 self.msg = msgkafka.MsgKafka() 

199 self.msg.connect(config["message"]) 

200 else: 

201 raise NsException( 

202 "Invalid configuration param '{}' at '[message]':'driver'".format( 

203 config["message"]["driver"] 

204 ) 

205 ) 

206 

207 # TODO load workers to deal with exising database tasks 

208 

209 self.write_lock = Lock() 

210 except (DbException, FsException, MsgException) as e: 

211 raise NsException(str(e), http_code=e.http_code) 

212 

213 def get_assigned_vims(self): 

214 return list(self.vims_assigned.keys()) 

215 

216 def stop(self): 

217 try: 

218 if self.db: 

219 self.db.db_disconnect() 

220 

221 if self.fs: 

222 self.fs.fs_disconnect() 

223 

224 if self.msg: 

225 self.msg.disconnect() 

226 

227 self.write_lock = None 

228 except (DbException, FsException, MsgException) as e: 

229 raise NsException(str(e), http_code=e.http_code) 

230 

231 for worker in self.workers: 

232 worker.insert_task(("terminate",)) 

233 

234 def _create_worker(self): 

235 """ 

236 Look for a worker thread in idle status. If not found it creates one unless the number of threads reach the 

237 limit of 'server.ns_threads' configuration. If reached, it just assigns one existing thread 

238 return the index of the assigned worker thread. Worker threads are storead at self.workers 

239 """ 

240 # Look for a thread in idle status 

241 worker_id = next( 

242 ( 

243 i 

244 for i in range(len(self.workers)) 

245 if self.workers[i] and self.workers[i].idle 

246 ), 

247 None, 

248 ) 

249 

250 if worker_id is not None: 

251 # unset idle status to avoid race conditions 

252 self.workers[worker_id].idle = False 

253 else: 

254 worker_id = len(self.workers) 

255 

256 if worker_id < self.config["global"]["server.ns_threads"]: 

257 # create a new worker 

258 self.workers.append( 

259 NsWorker(worker_id, self.config, self.plugins, self.db) 

260 ) 

261 self.workers[worker_id].start() 

262 else: 

263 # reached maximum number of threads, assign VIM to an existing one 

264 worker_id = self.next_worker 

265 self.next_worker = (self.next_worker + 1) % self.config["global"][ 

266 "server.ns_threads" 

267 ] 

268 

269 return worker_id 

270 

271 def assign_vim(self, target_id): 

272 with self.write_lock: 

273 return self._assign_vim(target_id) 

274 

275 def _assign_vim(self, target_id): 

276 if target_id not in self.vims_assigned: 

277 worker_id = self.vims_assigned[target_id] = self._create_worker() 

278 self.workers[worker_id].insert_task(("load_vim", target_id)) 

279 

280 def reload_vim(self, target_id): 

281 # send reload_vim to the thread working with this VIM and inform all that a VIM has been changed, 

282 # this is because database VIM information is cached for threads working with SDN 

283 with self.write_lock: 

284 for worker in self.workers: 

285 if worker and not worker.idle: 

286 worker.insert_task(("reload_vim", target_id)) 

287 

288 def unload_vim(self, target_id): 

289 with self.write_lock: 

290 return self._unload_vim(target_id) 

291 

292 def _unload_vim(self, target_id): 

293 if target_id in self.vims_assigned: 

294 worker_id = self.vims_assigned[target_id] 

295 self.workers[worker_id].insert_task(("unload_vim", target_id)) 

296 del self.vims_assigned[target_id] 

297 

298 def check_vim(self, target_id): 

299 with self.write_lock: 

300 if target_id in self.vims_assigned: 

301 worker_id = self.vims_assigned[target_id] 

302 else: 

303 worker_id = self._create_worker() 

304 

305 worker = self.workers[worker_id] 

306 worker.insert_task(("check_vim", target_id)) 

307 

308 def unload_unused_vims(self): 

309 with self.write_lock: 

310 vims_to_unload = [] 

311 

312 for target_id in self.vims_assigned: 

313 if not self.db.get_one( 

314 "ro_tasks", 

315 q_filter={ 

316 "target_id": target_id, 

317 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"], 

318 }, 

319 fail_on_empty=False, 

320 ): 

321 vims_to_unload.append(target_id) 

322 

323 for target_id in vims_to_unload: 

324 self._unload_vim(target_id) 

325 

326 @staticmethod 

327 def _get_cloud_init( 

328 db: Type[DbBase], 

329 fs: Type[FsBase], 

330 location: str, 

331 ) -> str: 

332 """This method reads cloud init from a file. 

333 

334 Note: Not used as cloud init content is provided in the http body. 

335 

336 Args: 

337 db (Type[DbBase]): [description] 

338 fs (Type[FsBase]): [description] 

339 location (str): can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex' 

340 

341 Raises: 

342 NsException: [description] 

343 NsException: [description] 

344 

345 Returns: 

346 str: [description] 

347 """ 

348 vnfd_id, _, other = location.partition(":") 

349 _type, _, name = other.partition(":") 

350 vnfd = db.get_one("vnfds", {"_id": vnfd_id}) 

351 

352 if _type == "file": 

353 base_folder = vnfd["_admin"]["storage"] 

354 cloud_init_file = "{}/{}/cloud_init/{}".format( 

355 base_folder["folder"], base_folder["pkg-dir"], name 

356 ) 

357 

358 if not fs: 

359 raise NsException( 

360 "Cannot read file '{}'. Filesystem not loaded, change configuration at storage.driver".format( 

361 cloud_init_file 

362 ) 

363 ) 

364 

365 with fs.file_open(cloud_init_file, "r") as ci_file: 

366 cloud_init_content = ci_file.read() 

367 elif _type == "vdu": 

368 cloud_init_content = vnfd["vdu"][int(name)]["cloud-init"] 

369 else: 

370 raise NsException("Mismatch descriptor for cloud init: {}".format(location)) 

371 

372 return cloud_init_content 

373 

374 @staticmethod 

375 def _parse_jinja2( 

376 cloud_init_content: str, 

377 params: Dict[str, Any], 

378 context: str, 

379 ) -> str: 

380 """Function that processes the cloud init to replace Jinja2 encoded parameters. 

381 

382 Args: 

383 cloud_init_content (str): [description] 

384 params (Dict[str, Any]): [description] 

385 context (str): [description] 

386 

387 Raises: 

388 NsException: [description] 

389 NsException: [description] 

390 

391 Returns: 

392 str: [description] 

393 """ 

394 try: 

395 env = Environment( 

396 undefined=StrictUndefined, 

397 autoescape=select_autoescape(default_for_string=True, default=True), 

398 ) 

399 template = env.from_string(cloud_init_content) 

400 

401 return template.render(params or {}) 

402 except UndefinedError as e: 

403 raise NsException( 

404 "Variable '{}' defined at vnfd='{}' must be provided in the instantiation parameters" 

405 "inside the 'additionalParamsForVnf' block".format(e, context) 

406 ) 

407 except (TemplateError, TemplateNotFound) as e: 

408 raise NsException( 

409 "Error parsing Jinja2 to cloud-init content at vnfd='{}': {}".format( 

410 context, e 

411 ) 

412 ) 

413 

414 def _create_db_ro_nsrs(self, nsr_id, now): 

415 try: 

416 key = rsa.generate_private_key( 

417 backend=crypto_default_backend(), public_exponent=65537, key_size=2048 

418 ) 

419 private_key = key.private_bytes( 

420 crypto_serialization.Encoding.PEM, 

421 crypto_serialization.PrivateFormat.PKCS8, 

422 crypto_serialization.NoEncryption(), 

423 ) 

424 public_key = key.public_key().public_bytes( 

425 crypto_serialization.Encoding.OpenSSH, 

426 crypto_serialization.PublicFormat.OpenSSH, 

427 ) 

428 private_key = private_key.decode("utf8") 

429 # Change first line because Paramiko needs a explicit start with 'BEGIN RSA PRIVATE KEY' 

430 i = private_key.find("\n") 

431 private_key = "-----BEGIN RSA PRIVATE KEY-----" + private_key[i:] 

432 public_key = public_key.decode("utf8") 

433 except Exception as e: 

434 raise NsException("Cannot create ssh-keys: {}".format(e)) 

435 

436 schema_version = "1.1" 

437 private_key_encrypted = self.db.encrypt( 

438 private_key, schema_version=schema_version, salt=nsr_id 

439 ) 

440 db_content = { 

441 "_id": nsr_id, 

442 "_admin": { 

443 "created": now, 

444 "modified": now, 

445 "schema_version": schema_version, 

446 }, 

447 "public_key": public_key, 

448 "private_key": private_key_encrypted, 

449 "actions": [], 

450 } 

451 self.db.create("ro_nsrs", db_content) 

452 

453 return db_content 

454 

455 @staticmethod 

456 def _create_task( 

457 deployment_info: Dict[str, Any], 

458 target_id: str, 

459 item: str, 

460 action: str, 

461 target_record: str, 

462 target_record_id: str, 

463 extra_dict: Dict[str, Any] = None, 

464 ) -> Dict[str, Any]: 

465 """Function to create task dict from deployment information. 

466 

467 Args: 

468 deployment_info (Dict[str, Any]): [description] 

469 target_id (str): [description] 

470 item (str): [description] 

471 action (str): [description] 

472 target_record (str): [description] 

473 target_record_id (str): [description] 

474 extra_dict (Dict[str, Any], optional): [description]. Defaults to None. 

475 

476 Returns: 

477 Dict[str, Any]: [description] 

478 """ 

479 task = { 

480 "target_id": target_id, # it will be removed before pushing at database 

481 "action_id": deployment_info.get("action_id"), 

482 "nsr_id": deployment_info.get("nsr_id"), 

483 "task_id": f"{deployment_info.get('action_id')}:{deployment_info.get('task_index')}", 

484 "status": "SCHEDULED", 

485 "action": action, 

486 "item": item, 

487 "target_record": target_record, 

488 "target_record_id": target_record_id, 

489 } 

490 

491 if extra_dict: 

492 task.update(extra_dict) # params, find_params, depends_on 

493 

494 deployment_info["task_index"] = deployment_info.get("task_index", 0) + 1 

495 

496 return task 

497 

498 @staticmethod 

499 def _create_ro_task( 

500 target_id: str, 

501 task: Dict[str, Any], 

502 ) -> Dict[str, Any]: 

503 """Function to create an RO task from task information. 

504 

505 Args: 

506 target_id (str): [description] 

507 task (Dict[str, Any]): [description] 

508 

509 Returns: 

510 Dict[str, Any]: [description] 

511 """ 

512 now = time() 

513 

514 _id = task.get("task_id") 

515 db_ro_task = { 

516 "_id": _id, 

517 "locked_by": None, 

518 "locked_at": 0.0, 

519 "target_id": target_id, 

520 "vim_info": { 

521 "created": False, 

522 "created_items": None, 

523 "vim_id": None, 

524 "vim_name": None, 

525 "vim_status": None, 

526 "vim_details": None, 

527 "vim_message": None, 

528 "refresh_at": None, 

529 }, 

530 "modified_at": now, 

531 "created_at": now, 

532 "to_check_at": now, 

533 "tasks": [task], 

534 } 

535 

536 return db_ro_task 

537 

538 @staticmethod 

539 def _process_image_params( 

540 target_image: Dict[str, Any], 

541 indata: Dict[str, Any], 

542 vim_info: Dict[str, Any], 

543 target_record_id: str, 

544 **kwargs: Dict[str, Any], 

545 ) -> Dict[str, Any]: 

546 """Function to process VDU image parameters. 

547 

548 Args: 

549 target_image (Dict[str, Any]): [description] 

550 indata (Dict[str, Any]): [description] 

551 vim_info (Dict[str, Any]): [description] 

552 target_record_id (str): [description] 

553 

554 Returns: 

555 Dict[str, Any]: [description] 

556 """ 

557 find_params = {} 

558 

559 if target_image.get("image"): 

560 find_params["filter_dict"] = {"name": target_image.get("image")} 

561 

562 if target_image.get("vim_image_id"): 

563 find_params["filter_dict"] = {"id": target_image.get("vim_image_id")} 

564 

565 if target_image.get("image_checksum"): 

566 find_params["filter_dict"] = { 

567 "checksum": target_image.get("image_checksum") 

568 } 

569 

570 return {"find_params": find_params} 

571 

572 @staticmethod 

573 def _get_resource_allocation_params( 

574 quota_descriptor: Dict[str, Any], 

575 ) -> Dict[str, Any]: 

576 """Read the quota_descriptor from vnfd and fetch the resource allocation properties from the 

577 descriptor object. 

578 

579 Args: 

580 quota_descriptor (Dict[str, Any]): cpu/mem/vif/disk-io quota descriptor 

581 

582 Returns: 

583 Dict[str, Any]: quota params for limit, reserve, shares from the descriptor object 

584 """ 

585 quota = {} 

586 

587 if quota_descriptor.get("limit"): 

588 quota["limit"] = int(quota_descriptor["limit"]) 

589 

590 if quota_descriptor.get("reserve"): 

591 quota["reserve"] = int(quota_descriptor["reserve"]) 

592 

593 if quota_descriptor.get("shares"): 

594 quota["shares"] = int(quota_descriptor["shares"]) 

595 

596 return quota 

597 

598 @staticmethod 

599 def _process_guest_epa_quota_params( 

600 guest_epa_quota: Dict[str, Any], 

601 epa_vcpu_set: bool, 

602 ) -> Dict[str, Any]: 

603 """Function to extract the guest epa quota parameters. 

604 

605 Args: 

606 guest_epa_quota (Dict[str, Any]): [description] 

607 epa_vcpu_set (bool): [description] 

608 

609 Returns: 

610 Dict[str, Any]: [description] 

611 """ 

612 result = {} 

613 

614 if guest_epa_quota.get("cpu-quota") and not epa_vcpu_set: 

615 cpuquota = Ns._get_resource_allocation_params( 

616 guest_epa_quota.get("cpu-quota") 

617 ) 

618 

619 if cpuquota: 

620 result["cpu-quota"] = cpuquota 

621 

622 if guest_epa_quota.get("mem-quota"): 

623 vduquota = Ns._get_resource_allocation_params( 

624 guest_epa_quota.get("mem-quota") 

625 ) 

626 

627 if vduquota: 

628 result["mem-quota"] = vduquota 

629 

630 if guest_epa_quota.get("disk-io-quota"): 

631 diskioquota = Ns._get_resource_allocation_params( 

632 guest_epa_quota.get("disk-io-quota") 

633 ) 

634 

635 if diskioquota: 

636 result["disk-io-quota"] = diskioquota 

637 

638 if guest_epa_quota.get("vif-quota"): 

639 vifquota = Ns._get_resource_allocation_params( 

640 guest_epa_quota.get("vif-quota") 

641 ) 

642 

643 if vifquota: 

644 result["vif-quota"] = vifquota 

645 

646 return result 

647 

648 @staticmethod 

649 def _process_guest_epa_numa_params( 

650 guest_epa_quota: Dict[str, Any], 

651 ) -> Tuple[Dict[str, Any], bool]: 

652 """[summary] 

653 

654 Args: 

655 guest_epa_quota (Dict[str, Any]): [description] 

656 

657 Returns: 

658 Tuple[Dict[str, Any], bool]: [description] 

659 """ 

660 numa = {} 

661 numa_list = [] 

662 epa_vcpu_set = False 

663 

664 if guest_epa_quota.get("numa-node-policy"): 

665 numa_node_policy = guest_epa_quota.get("numa-node-policy") 

666 

667 if numa_node_policy.get("node"): 

668 for numa_node in numa_node_policy["node"]: 

669 vcpu_list = [] 

670 if numa_node.get("id"): 

671 numa["id"] = int(numa_node["id"]) 

672 

673 if numa_node.get("vcpu"): 

674 for vcpu in numa_node.get("vcpu"): 

675 vcpu_id = int(vcpu.get("id")) 

676 vcpu_list.append(vcpu_id) 

677 numa["vcpu"] = vcpu_list 

678 

679 if numa_node.get("num-cores"): 

680 numa["cores"] = numa_node["num-cores"] 

681 epa_vcpu_set = True 

682 

683 paired_threads = numa_node.get("paired-threads", {}) 

684 if paired_threads.get("num-paired-threads"): 

685 numa["paired_threads"] = int( 

686 numa_node["paired-threads"]["num-paired-threads"] 

687 ) 

688 epa_vcpu_set = True 

689 

690 if paired_threads.get("paired-thread-ids"): 

691 numa["paired-threads-id"] = [] 

692 

693 for pair in paired_threads["paired-thread-ids"]: 

694 numa["paired-threads-id"].append( 

695 ( 

696 str(pair["thread-a"]), 

697 str(pair["thread-b"]), 

698 ) 

699 ) 

700 

701 if numa_node.get("num-threads"): 

702 numa["threads"] = int(numa_node["num-threads"]) 

703 epa_vcpu_set = True 

704 

705 if numa_node.get("memory-mb"): 

706 numa["memory"] = max(int(int(numa_node["memory-mb"]) / 1024), 1) 

707 

708 numa_list.append(numa) 

709 numa = {} 

710 

711 return numa_list, epa_vcpu_set 

712 

713 @staticmethod 

714 def _process_guest_epa_cpu_pinning_params( 

715 guest_epa_quota: Dict[str, Any], 

716 vcpu_count: int, 

717 epa_vcpu_set: bool, 

718 ) -> Tuple[Dict[str, Any], bool]: 

719 """[summary] 

720 

721 Args: 

722 guest_epa_quota (Dict[str, Any]): [description] 

723 vcpu_count (int): [description] 

724 epa_vcpu_set (bool): [description] 

725 

726 Returns: 

727 Tuple[Dict[str, Any], bool]: [description] 

728 """ 

729 numa = {} 

730 local_epa_vcpu_set = epa_vcpu_set 

731 

732 if ( 

733 guest_epa_quota.get("cpu-pinning-policy") == "DEDICATED" 

734 and not epa_vcpu_set 

735 ): 

736 # Pinning policy "REQUIRE" uses threads as host should support SMT architecture 

737 # Pinning policy "ISOLATE" uses cores as host should not support SMT architecture 

738 # Pinning policy "PREFER" uses threads in case host supports SMT architecture 

739 numa[ 

740 ( 

741 "cores" 

742 if guest_epa_quota.get("cpu-thread-pinning-policy") == "ISOLATE" 

743 else "threads" 

744 ) 

745 ] = max(vcpu_count, 1) 

746 local_epa_vcpu_set = True 

747 

748 return numa, local_epa_vcpu_set 

749 

750 @staticmethod 

751 def _process_epa_params( 

752 target_flavor: Dict[str, Any], 

753 ) -> Dict[str, Any]: 

754 """[summary] 

755 

756 Args: 

757 target_flavor (Dict[str, Any]): [description] 

758 

759 Returns: 

760 Dict[str, Any]: [description] 

761 """ 

762 extended = {} 

763 numa = {} 

764 numa_list = [] 

765 

766 if target_flavor.get("guest-epa"): 

767 guest_epa = target_flavor["guest-epa"] 

768 

769 numa_list, epa_vcpu_set = Ns._process_guest_epa_numa_params( 

770 guest_epa_quota=guest_epa 

771 ) 

772 

773 if guest_epa.get("mempage-size"): 

774 extended["mempage-size"] = guest_epa.get("mempage-size") 

775 

776 if guest_epa.get("cpu-pinning-policy"): 

777 extended["cpu-pinning-policy"] = guest_epa.get("cpu-pinning-policy") 

778 

779 if guest_epa.get("cpu-thread-pinning-policy"): 

780 extended["cpu-thread-pinning-policy"] = guest_epa.get( 

781 "cpu-thread-pinning-policy" 

782 ) 

783 

784 if guest_epa.get("numa-node-policy"): 

785 if guest_epa.get("numa-node-policy").get("mem-policy"): 

786 extended["mem-policy"] = guest_epa.get("numa-node-policy").get( 

787 "mem-policy" 

788 ) 

789 

790 tmp_numa, epa_vcpu_set = Ns._process_guest_epa_cpu_pinning_params( 

791 guest_epa_quota=guest_epa, 

792 vcpu_count=int(target_flavor.get("vcpu-count", 1)), 

793 epa_vcpu_set=epa_vcpu_set, 

794 ) 

795 for numa in numa_list: 

796 numa.update(tmp_numa) 

797 

798 extended.update( 

799 Ns._process_guest_epa_quota_params( 

800 guest_epa_quota=guest_epa, 

801 epa_vcpu_set=epa_vcpu_set, 

802 ) 

803 ) 

804 

805 if numa: 

806 extended["numas"] = numa_list 

807 

808 return extended 

809 

810 @staticmethod 

811 def _process_flavor_params( 

812 target_flavor: Dict[str, Any], 

813 indata: Dict[str, Any], 

814 vim_info: Dict[str, Any], 

815 target_record_id: str, 

816 **kwargs: Dict[str, Any], 

817 ) -> Dict[str, Any]: 

818 """[summary] 

819 

820 Args: 

821 target_flavor (Dict[str, Any]): [description] 

822 indata (Dict[str, Any]): [description] 

823 vim_info (Dict[str, Any]): [description] 

824 target_record_id (str): [description] 

825 

826 Returns: 

827 Dict[str, Any]: [description] 

828 """ 

829 db = kwargs.get("db") 

830 target_vdur = {} 

831 

832 for vnf in indata.get("vnf", []): 

833 for vdur in vnf.get("vdur", []): 

834 if vdur.get("ns-flavor-id") == target_flavor.get("id"): 

835 target_vdur = vdur 

836 

837 vim_flavor_id = ( 

838 target_vdur.get("additionalParams", {}).get("OSM", {}).get("vim_flavor_id") 

839 ) 

840 if vim_flavor_id: # vim-flavor-id was passed so flavor won't be created 

841 return {"find_params": {"vim_flavor_id": vim_flavor_id}} 

842 

843 vim_flavor_name = ( 

844 target_vdur.get("additionalParams", {}) 

845 .get("OSM", {}) 

846 .get("vim_flavor_name") 

847 ) 

848 if vim_flavor_name: # vim-flavor-name was passed so flavor won't be created 

849 return {"find_params": {"vim_flavor_name": vim_flavor_name}} 

850 

851 flavor_data = { 

852 "disk": int(target_flavor["storage-gb"]), 

853 "ram": int(target_flavor["memory-mb"]), 

854 "vcpus": int(target_flavor["vcpu-count"]), 

855 } 

856 

857 if db and isinstance(indata.get("vnf"), list): 

858 vnfd_id = indata.get("vnf")[0].get("vnfd-id") 

859 vnfd = db.get_one("vnfds", {"_id": vnfd_id}) 

860 # check if there is persistent root disk 

861 for vdu in vnfd.get("vdu", ()): 

862 if vdu["name"] == target_vdur.get("vdu-name"): 

863 for vsd in vnfd.get("virtual-storage-desc", ()): 

864 if vsd.get("id") == vdu.get("virtual-storage-desc", [[]])[0]: 

865 root_disk = vsd 

866 if root_disk.get("type-of-storage", "").endswith( 

867 "persistent-storage" 

868 ): 

869 flavor_data["disk"] = 0 

870 

871 for storage in target_vdur.get("virtual-storages", []): 

872 if ( 

873 storage.get("type-of-storage") 

874 == "etsi-nfv-descriptors:ephemeral-storage" 

875 ): 

876 flavor_data["ephemeral"] = int(storage.get("size-of-storage", 0)) 

877 elif storage.get("type-of-storage") == "etsi-nfv-descriptors:swap-storage": 

878 flavor_data["swap"] = int(storage.get("size-of-storage", 0)) 

879 

880 extended = Ns._process_epa_params(target_flavor) 

881 if extended: 

882 flavor_data["extended"] = extended 

883 

884 extra_dict = {"find_params": {"flavor_data": flavor_data}} 

885 flavor_data_name = flavor_data.copy() 

886 flavor_data_name["name"] = target_flavor["name"] 

887 extra_dict["params"] = {"flavor_data": flavor_data_name} 

888 return extra_dict 

889 

890 @staticmethod 

891 def _prefix_ip_address(ip_address): 

892 if "/" not in ip_address: 

893 ip_address += "/32" 

894 return ip_address 

895 

896 @staticmethod 

897 def _process_ip_proto(ip_proto): 

898 if ip_proto: 

899 if ip_proto == 1: 

900 ip_proto = "icmp" 

901 elif ip_proto == 6: 

902 ip_proto = "tcp" 

903 elif ip_proto == 17: 

904 ip_proto = "udp" 

905 return ip_proto 

906 

907 @staticmethod 

908 def _process_classification_params( 

909 target_classification: Dict[str, Any], 

910 indata: Dict[str, Any], 

911 vim_info: Dict[str, Any], 

912 target_record_id: str, 

913 **kwargs: Dict[str, Any], 

914 ) -> Dict[str, Any]: 

915 """[summary] 

916 

917 Args: 

918 target_classification (Dict[str, Any]): Classification dictionary parameters that needs to be processed to create resource on VIM 

919 indata (Dict[str, Any]): Deployment info 

920 vim_info (Dict[str, Any]):To add items created by OSM on the VIM. 

921 target_record_id (str): Task record ID. 

922 **kwargs (Dict[str, Any]): Used to send additional information to the task. 

923 

924 Returns: 

925 Dict[str, Any]: Return parameters required to create classification and Items on which classification is dependent. 

926 """ 

927 vnfr_id = target_classification["vnfr_id"] 

928 vdur_id = target_classification["vdur_id"] 

929 port_index = target_classification["ingress_port_index"] 

930 extra_dict = {} 

931 

932 classification_data = { 

933 "name": target_classification["id"], 

934 "source_port_range_min": target_classification["source-port"], 

935 "source_port_range_max": target_classification["source-port"], 

936 "destination_port_range_min": target_classification["destination-port"], 

937 "destination_port_range_max": target_classification["destination-port"], 

938 } 

939 

940 classification_data["source_ip_prefix"] = Ns._prefix_ip_address( 

941 target_classification["source-ip-address"] 

942 ) 

943 

944 classification_data["destination_ip_prefix"] = Ns._prefix_ip_address( 

945 target_classification["destination-ip-address"] 

946 ) 

947 

948 classification_data["protocol"] = Ns._process_ip_proto( 

949 int(target_classification["ip-proto"]) 

950 ) 

951 

952 db = kwargs.get("db") 

953 vdu_text = Ns._get_vnfr_vdur_text(db, vnfr_id, vdur_id) 

954 

955 extra_dict = {"depends_on": [vdu_text]} 

956 

957 extra_dict = {"depends_on": [vdu_text]} 

958 classification_data["logical_source_port"] = "TASK-" + vdu_text 

959 classification_data["logical_source_port_index"] = port_index 

960 

961 extra_dict["params"] = classification_data 

962 

963 return extra_dict 

964 

965 @staticmethod 

966 def _process_sfi_params( 

967 target_sfi: Dict[str, Any], 

968 indata: Dict[str, Any], 

969 vim_info: Dict[str, Any], 

970 target_record_id: str, 

971 **kwargs: Dict[str, Any], 

972 ) -> Dict[str, Any]: 

973 """[summary] 

974 

975 Args: 

976 target_sfi (Dict[str, Any]): SFI dictionary parameters that needs to be processed to create resource on VIM 

977 indata (Dict[str, Any]): deployment info 

978 vim_info (Dict[str, Any]): To add items created by OSM on the VIM. 

979 target_record_id (str): Task record ID. 

980 **kwargs (Dict[str, Any]): Used to send additional information to the task. 

981 

982 Returns: 

983 Dict[str, Any]: Return parameters required to create SFI and Items on which SFI is dependent. 

984 """ 

985 

986 vnfr_id = target_sfi["vnfr_id"] 

987 vdur_id = target_sfi["vdur_id"] 

988 

989 sfi_data = { 

990 "name": target_sfi["id"], 

991 "ingress_port_index": target_sfi["ingress_port_index"], 

992 "egress_port_index": target_sfi["egress_port_index"], 

993 } 

994 

995 db = kwargs.get("db") 

996 vdu_text = Ns._get_vnfr_vdur_text(db, vnfr_id, vdur_id) 

997 

998 extra_dict = {"depends_on": [vdu_text]} 

999 sfi_data["ingress_port"] = "TASK-" + vdu_text 

1000 sfi_data["egress_port"] = "TASK-" + vdu_text 

1001 

1002 extra_dict["params"] = sfi_data 

1003 

1004 return extra_dict 

1005 

1006 @staticmethod 

1007 def _get_vnfr_vdur_text(db, vnfr_id, vdur_id): 

1008 vnf_preffix = "vnfrs:{}".format(vnfr_id) 

1009 db_vnfr = db.get_one("vnfrs", {"_id": vnfr_id}) 

1010 vdur_list = [] 

1011 vdu_text = "" 

1012 

1013 if db_vnfr: 

1014 vdur_list = [ 

1015 vdur["id"] for vdur in db_vnfr["vdur"] if vdur["vdu-id-ref"] == vdur_id 

1016 ] 

1017 

1018 if vdur_list: 

1019 vdu_text = vnf_preffix + ":vdur." + vdur_list[0] 

1020 

1021 return vdu_text 

1022 

1023 @staticmethod 

1024 def _process_sf_params( 

1025 target_sf: Dict[str, Any], 

1026 indata: Dict[str, Any], 

1027 vim_info: Dict[str, Any], 

1028 target_record_id: str, 

1029 **kwargs: Dict[str, Any], 

1030 ) -> Dict[str, Any]: 

1031 """[summary] 

1032 

1033 Args: 

1034 target_sf (Dict[str, Any]): SF dictionary parameters that needs to be processed to create resource on VIM 

1035 indata (Dict[str, Any]): Deployment info. 

1036 vim_info (Dict[str, Any]):To add items created by OSM on the VIM. 

1037 target_record_id (str): Task record ID. 

1038 **kwargs (Dict[str, Any]): Used to send additional information to the task. 

1039 

1040 Returns: 

1041 Dict[str, Any]: Return parameters required to create SF and Items on which SF is dependent. 

1042 """ 

1043 

1044 nsr_id = kwargs.get("nsr_id", "") 

1045 sfis = target_sf["sfis"] 

1046 ns_preffix = "nsrs:{}".format(nsr_id) 

1047 extra_dict = {"depends_on": [], "params": []} 

1048 sf_data = {"name": target_sf["id"], "sfis": sfis} 

1049 

1050 for count, sfi in enumerate(sfis): 

1051 sfi_text = ns_preffix + ":sfi." + sfi 

1052 sfis[count] = "TASK-" + sfi_text 

1053 extra_dict["depends_on"].append(sfi_text) 

1054 

1055 extra_dict["params"] = sf_data 

1056 

1057 return extra_dict 

1058 

1059 @staticmethod 

1060 def _process_sfp_params( 

1061 target_sfp: Dict[str, Any], 

1062 indata: Dict[str, Any], 

1063 vim_info: Dict[str, Any], 

1064 target_record_id: str, 

1065 **kwargs: Dict[str, Any], 

1066 ) -> Dict[str, Any]: 

1067 """[summary] 

1068 

1069 Args: 

1070 target_sfp (Dict[str, Any]): SFP dictionary parameters that needs to be processed to create resource on VIM. 

1071 indata (Dict[str, Any]): Deployment info 

1072 vim_info (Dict[str, Any]):To add items created by OSM on the VIM. 

1073 target_record_id (str): Task record ID. 

1074 **kwargs (Dict[str, Any]): Used to send additional information to the task. 

1075 

1076 Returns: 

1077 Dict[str, Any]: Return parameters required to create SFP and Items on which SFP is dependent. 

1078 """ 

1079 

1080 nsr_id = kwargs.get("nsr_id") 

1081 sfs = target_sfp["sfs"] 

1082 classifications = target_sfp["classifications"] 

1083 ns_preffix = "nsrs:{}".format(nsr_id) 

1084 extra_dict = {"depends_on": [], "params": []} 

1085 sfp_data = { 

1086 "name": target_sfp["id"], 

1087 "sfs": sfs, 

1088 "classifications": classifications, 

1089 } 

1090 

1091 for count, sf in enumerate(sfs): 

1092 sf_text = ns_preffix + ":sf." + sf 

1093 sfs[count] = "TASK-" + sf_text 

1094 extra_dict["depends_on"].append(sf_text) 

1095 

1096 for count, classi in enumerate(classifications): 

1097 classi_text = ns_preffix + ":classification." + classi 

1098 classifications[count] = "TASK-" + classi_text 

1099 extra_dict["depends_on"].append(classi_text) 

1100 

1101 extra_dict["params"] = sfp_data 

1102 

1103 return extra_dict 

1104 

1105 @staticmethod 

1106 def _process_net_params( 

1107 target_vld: Dict[str, Any], 

1108 indata: Dict[str, Any], 

1109 vim_info: Dict[str, Any], 

1110 target_record_id: str, 

1111 **kwargs: Dict[str, Any], 

1112 ) -> Dict[str, Any]: 

1113 """Function to process network parameters. 

1114 

1115 Args: 

1116 target_vld (Dict[str, Any]): [description] 

1117 indata (Dict[str, Any]): [description] 

1118 vim_info (Dict[str, Any]): [description] 

1119 target_record_id (str): [description] 

1120 

1121 Returns: 

1122 Dict[str, Any]: [description] 

1123 """ 

1124 extra_dict = {} 

1125 

1126 if vim_info.get("sdn"): 

1127 # vnf_preffix = "vnfrs:{}".format(vnfr_id) 

1128 # ns_preffix = "nsrs:{}".format(nsr_id) 

1129 # remove the ending ".sdn 

1130 vld_target_record_id, _, _ = target_record_id.rpartition(".") 

1131 extra_dict["params"] = { 

1132 k: vim_info[k] 

1133 for k in ("sdn-ports", "target_vim", "vlds", "type") 

1134 if vim_info.get(k) 

1135 } 

1136 

1137 # TODO needed to add target_id in the dependency. 

1138 if vim_info.get("target_vim"): 

1139 extra_dict["depends_on"] = [ 

1140 f"{vim_info.get('target_vim')} {vld_target_record_id}" 

1141 ] 

1142 

1143 return extra_dict 

1144 

1145 if vim_info.get("vim_network_name"): 

1146 extra_dict["find_params"] = { 

1147 "filter_dict": { 

1148 "name": vim_info.get("vim_network_name"), 

1149 }, 

1150 } 

1151 elif vim_info.get("vim_network_id"): 

1152 extra_dict["find_params"] = { 

1153 "filter_dict": { 

1154 "id": vim_info.get("vim_network_id"), 

1155 }, 

1156 } 

1157 elif target_vld.get("mgmt-network") and not vim_info.get("provider_network"): 

1158 extra_dict["find_params"] = { 

1159 "mgmt": True, 

1160 "name": target_vld["id"], 

1161 } 

1162 else: 

1163 # create 

1164 extra_dict["params"] = { 

1165 "net_name": ( 

1166 f"{indata.get('name')[:16]}-{target_vld.get('name', target_vld.get('id'))[:16]}" 

1167 ), 

1168 "ip_profile": vim_info.get("ip_profile"), 

1169 "provider_network_profile": vim_info.get("provider_network"), 

1170 } 

1171 

1172 if not target_vld.get("underlay"): 

1173 extra_dict["params"]["net_type"] = "bridge" 

1174 else: 

1175 extra_dict["params"]["net_type"] = ( 

1176 "ptp" if target_vld.get("type") == "ELINE" else "data" 

1177 ) 

1178 

1179 return extra_dict 

1180 

1181 @staticmethod 

1182 def find_persistent_root_volumes( 

1183 vnfd: dict, 

1184 target_vdu: dict, 

1185 vdu_instantiation_volumes_list: list, 

1186 disk_list: list, 

1187 ) -> Dict[str, any]: 

1188 """Find the persistent root volumes and add them to the disk_list 

1189 by parsing the instantiation parameters. 

1190 

1191 Args: 

1192 vnfd (dict): VNF descriptor 

1193 target_vdu (dict): processed VDU 

1194 vdu_instantiation_volumes_list (list): instantiation parameters for the each VDU as a list 

1195 disk_list (list): to be filled up 

1196 

1197 Returns: 

1198 persistent_root_disk (dict): Details of persistent root disk 

1199 

1200 """ 

1201 persistent_root_disk = {} 

1202 # There can be only one root disk, when we find it, it will return the result 

1203 

1204 for vdu, vsd in product( 

1205 vnfd.get("vdu", ()), vnfd.get("virtual-storage-desc", ()) 

1206 ): 

1207 if ( 

1208 vdu["name"] == target_vdu["vdu-name"] 

1209 and vsd.get("id") == vdu.get("virtual-storage-desc", [[]])[0] 

1210 ): 

1211 root_disk = vsd 

1212 if root_disk.get("type-of-storage", "").endswith("persistent-storage"): 

1213 for vdu_volume in vdu_instantiation_volumes_list: 

1214 if ( 

1215 vdu_volume["vim-volume-id"] 

1216 and root_disk["id"] == vdu_volume["name"] 

1217 ): 

1218 persistent_root_disk[vsd["id"]] = { 

1219 "vim_volume_id": vdu_volume["vim-volume-id"], 

1220 "image_id": vdu.get("sw-image-desc"), 

1221 } 

1222 

1223 disk_list.append(persistent_root_disk[vsd["id"]]) 

1224 

1225 return persistent_root_disk 

1226 

1227 else: 

1228 if root_disk.get("size-of-storage"): 

1229 persistent_root_disk[vsd["id"]] = { 

1230 "image_id": vdu.get("sw-image-desc"), 

1231 "size": root_disk.get("size-of-storage"), 

1232 "keep": Ns.is_volume_keeping_required(root_disk), 

1233 } 

1234 

1235 disk_list.append(persistent_root_disk[vsd["id"]]) 

1236 

1237 return persistent_root_disk 

1238 return persistent_root_disk 

1239 

1240 @staticmethod 

1241 def find_persistent_volumes( 

1242 persistent_root_disk: dict, 

1243 target_vdu: dict, 

1244 vdu_instantiation_volumes_list: list, 

1245 disk_list: list, 

1246 ) -> None: 

1247 """Find the ordinary persistent volumes and add them to the disk_list 

1248 by parsing the instantiation parameters. 

1249 

1250 Args: 

1251 persistent_root_disk: persistent root disk dictionary 

1252 target_vdu: processed VDU 

1253 vdu_instantiation_volumes_list: instantiation parameters for the each VDU as a list 

1254 disk_list: to be filled up 

1255 

1256 """ 

1257 # Find the ordinary volumes which are not added to the persistent_root_disk 

1258 persistent_disk = {} 

1259 for disk in target_vdu.get("virtual-storages", {}): 

1260 if ( 

1261 disk.get("type-of-storage", "").endswith("persistent-storage") 

1262 and disk["id"] not in persistent_root_disk.keys() 

1263 ): 

1264 for vdu_volume in vdu_instantiation_volumes_list: 

1265 if vdu_volume["vim-volume-id"] and disk["id"] == vdu_volume["name"]: 

1266 persistent_disk[disk["id"]] = { 

1267 "vim_volume_id": vdu_volume["vim-volume-id"], 

1268 } 

1269 disk_list.append(persistent_disk[disk["id"]]) 

1270 

1271 else: 

1272 if disk["id"] not in persistent_disk.keys(): 

1273 persistent_disk[disk["id"]] = { 

1274 "size": disk.get("size-of-storage"), 

1275 "keep": Ns.is_volume_keeping_required(disk), 

1276 } 

1277 disk_list.append(persistent_disk[disk["id"]]) 

1278 

1279 @staticmethod 

1280 def is_volume_keeping_required(virtual_storage_desc: Dict[str, Any]) -> bool: 

1281 """Function to decide keeping persistent volume 

1282 upon VDU deletion. 

1283 

1284 Args: 

1285 virtual_storage_desc (Dict[str, Any]): virtual storage description dictionary 

1286 

1287 Returns: 

1288 bool (True/False) 

1289 """ 

1290 

1291 if not virtual_storage_desc.get("vdu-storage-requirements"): 

1292 return False 

1293 for item in virtual_storage_desc.get("vdu-storage-requirements", {}): 

1294 if item.get("key") == "keep-volume" and item.get("value").lower() == "true": 

1295 return True 

1296 return False 

1297 

1298 @staticmethod 

1299 def is_shared_volume( 

1300 virtual_storage_desc: Dict[str, Any], vnfd_id: str 

1301 ) -> (str, bool): 

1302 """Function to decide if the volume type is multi attached or not . 

1303 

1304 Args: 

1305 virtual_storage_desc (Dict[str, Any]): virtual storage description dictionary 

1306 vnfd_id (str): vnfd id 

1307 

1308 Returns: 

1309 bool (True/False) 

1310 name (str) New name if it is a multiattach disk 

1311 """ 

1312 

1313 if vdu_storage_requirements := virtual_storage_desc.get( 

1314 "vdu-storage-requirements", {} 

1315 ): 

1316 for item in vdu_storage_requirements: 

1317 if ( 

1318 item.get("key") == "multiattach" 

1319 and item.get("value").lower() == "true" 

1320 ): 

1321 name = f"shared-{virtual_storage_desc['id']}-{vnfd_id}" 

1322 return name, True 

1323 return virtual_storage_desc["id"], False 

1324 

1325 @staticmethod 

1326 def _sort_vdu_interfaces(target_vdu: dict) -> None: 

1327 """Sort the interfaces according to position number. 

1328 

1329 Args: 

1330 target_vdu (dict): Details of VDU to be created 

1331 

1332 """ 

1333 # If the position info is provided for all the interfaces, it will be sorted 

1334 # according to position number ascendingly. 

1335 sorted_interfaces = sorted( 

1336 target_vdu["interfaces"], 

1337 key=lambda x: (x.get("position") is None, x.get("position")), 

1338 ) 

1339 target_vdu["interfaces"] = sorted_interfaces 

1340 

1341 @staticmethod 

1342 def _partially_locate_vdu_interfaces(target_vdu: dict) -> None: 

1343 """Only place the interfaces which has specific position. 

1344 

1345 Args: 

1346 target_vdu (dict): Details of VDU to be created 

1347 

1348 """ 

1349 # If the position info is provided for some interfaces but not all of them, the interfaces 

1350 # which has specific position numbers will be placed and others' positions will not be taken care. 

1351 if any( 

1352 i.get("position") + 1 

1353 for i in target_vdu["interfaces"] 

1354 if i.get("position") is not None 

1355 ): 

1356 n = len(target_vdu["interfaces"]) 

1357 sorted_interfaces = [-1] * n 

1358 k, m = 0, 0 

1359 

1360 while k < n: 

1361 if target_vdu["interfaces"][k].get("position") is not None: 

1362 if any(i.get("position") == 0 for i in target_vdu["interfaces"]): 

1363 idx = target_vdu["interfaces"][k]["position"] + 1 

1364 else: 

1365 idx = target_vdu["interfaces"][k]["position"] 

1366 sorted_interfaces[idx - 1] = target_vdu["interfaces"][k] 

1367 k += 1 

1368 

1369 while m < n: 

1370 if target_vdu["interfaces"][m].get("position") is None: 

1371 idy = sorted_interfaces.index(-1) 

1372 sorted_interfaces[idy] = target_vdu["interfaces"][m] 

1373 m += 1 

1374 

1375 target_vdu["interfaces"] = sorted_interfaces 

1376 

1377 @staticmethod 

1378 def _prepare_vdu_cloud_init( 

1379 target_vdu: dict, vdu2cloud_init: dict, db: object, fs: object 

1380 ) -> Dict: 

1381 """Fill cloud_config dict with cloud init details. 

1382 

1383 Args: 

1384 target_vdu (dict): Details of VDU to be created 

1385 vdu2cloud_init (dict): Cloud init dict 

1386 db (object): DB object 

1387 fs (object): FS object 

1388 

1389 Returns: 

1390 cloud_config (dict): Cloud config details of VDU 

1391 

1392 """ 

1393 # cloud config 

1394 cloud_config = {} 

1395 

1396 if target_vdu.get("cloud-init"): 

1397 if target_vdu["cloud-init"] not in vdu2cloud_init: 

1398 vdu2cloud_init[target_vdu["cloud-init"]] = Ns._get_cloud_init( 

1399 db=db, 

1400 fs=fs, 

1401 location=target_vdu["cloud-init"], 

1402 ) 

1403 

1404 cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]] 

1405 cloud_config["user-data"] = Ns._parse_jinja2( 

1406 cloud_init_content=cloud_content_, 

1407 params=target_vdu.get("additionalParams"), 

1408 context=target_vdu["cloud-init"], 

1409 ) 

1410 

1411 if target_vdu.get("boot-data-drive"): 

1412 cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive") 

1413 

1414 return cloud_config 

1415 

1416 @staticmethod 

1417 def _check_vld_information_of_interfaces( 

1418 interface: dict, ns_preffix: str, vnf_preffix: str 

1419 ) -> Optional[str]: 

1420 """Prepare the net_text by the virtual link information for vnf and ns level. 

1421 Args: 

1422 interface (dict): Interface details 

1423 ns_preffix (str): Prefix of NS 

1424 vnf_preffix (str): Prefix of VNF 

1425 

1426 Returns: 

1427 net_text (str): information of net 

1428 

1429 """ 

1430 net_text = "" 

1431 if interface.get("ns-vld-id"): 

1432 net_text = ns_preffix + ":vld." + interface["ns-vld-id"] 

1433 elif interface.get("vnf-vld-id"): 

1434 net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"] 

1435 

1436 return net_text 

1437 

1438 @staticmethod 

1439 def _prepare_interface_port_security(interface: dict) -> None: 

1440 """ 

1441 

1442 Args: 

1443 interface (dict): Interface details 

1444 

1445 """ 

1446 if "port-security-enabled" in interface: 

1447 interface["port_security"] = interface.pop("port-security-enabled") 

1448 

1449 if "port-security-disable-strategy" in interface: 

1450 interface["port_security_disable_strategy"] = interface.pop( 

1451 "port-security-disable-strategy" 

1452 ) 

1453 

1454 @staticmethod 

1455 def _create_net_item_of_interface(interface: dict, net_text: str) -> dict: 

1456 """Prepare net item including name, port security, floating ip etc. 

1457 

1458 Args: 

1459 interface (dict): Interface details 

1460 net_text (str): information of net 

1461 

1462 Returns: 

1463 net_item (dict): Dict including net details 

1464 

1465 """ 

1466 

1467 net_item = { 

1468 x: v 

1469 for x, v in interface.items() 

1470 if x 

1471 in ( 

1472 "name", 

1473 "vpci", 

1474 "port_security", 

1475 "port_security_disable_strategy", 

1476 "floating_ip", 

1477 ) 

1478 } 

1479 net_item["net_id"] = "TASK-" + net_text 

1480 net_item["type"] = "virtual" 

1481 

1482 return net_item 

1483 

1484 @staticmethod 

1485 def _prepare_type_of_interface( 

1486 interface: dict, tasks_by_target_record_id: dict, net_text: str, net_item: dict 

1487 ) -> None: 

1488 """Fill the net item type by interface type such as SR-IOV, OM-MGMT, bridge etc. 

1489 

1490 Args: 

1491 interface (dict): Interface details 

1492 tasks_by_target_record_id (dict): Task details 

1493 net_text (str): information of net 

1494 net_item (dict): Dict including net details 

1495 

1496 """ 

1497 # TODO mac_address: used for SR-IOV ifaces #TODO for other types 

1498 # TODO floating_ip: True/False (or it can be None) 

1499 

1500 if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"): 

1501 # Mark the net create task as type data 

1502 if deep_get( 

1503 tasks_by_target_record_id, 

1504 net_text, 

1505 "extra_dict", 

1506 "params", 

1507 "net_type", 

1508 ): 

1509 tasks_by_target_record_id[net_text]["extra_dict"]["params"][ 

1510 "net_type" 

1511 ] = "data" 

1512 

1513 net_item["use"] = "data" 

1514 net_item["model"] = interface["type"] 

1515 net_item["type"] = interface["type"] 

1516 

1517 elif ( 

1518 interface.get("type") == "OM-MGMT" 

1519 or interface.get("mgmt-interface") 

1520 or interface.get("mgmt-vnf") 

1521 ): 

1522 net_item["use"] = "mgmt" 

1523 

1524 else: 

1525 # If interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"): 

1526 net_item["use"] = "bridge" 

1527 net_item["model"] = interface.get("type") 

1528 

1529 @staticmethod 

1530 def _prepare_vdu_interfaces( 

1531 target_vdu: dict, 

1532 extra_dict: dict, 

1533 ns_preffix: str, 

1534 vnf_preffix: str, 

1535 logger: object, 

1536 tasks_by_target_record_id: dict, 

1537 net_list: list, 

1538 ) -> None: 

1539 """Prepare the net_item and add net_list, add mgmt interface to extra_dict. 

1540 

1541 Args: 

1542 target_vdu (dict): VDU to be created 

1543 extra_dict (dict): Dictionary to be filled 

1544 ns_preffix (str): NS prefix as string 

1545 vnf_preffix (str): VNF prefix as string 

1546 logger (object): Logger Object 

1547 tasks_by_target_record_id (dict): Task details 

1548 net_list (list): Net list of VDU 

1549 """ 

1550 for iface_index, interface in enumerate(target_vdu["interfaces"]): 

1551 net_text = Ns._check_vld_information_of_interfaces( 

1552 interface, ns_preffix, vnf_preffix 

1553 ) 

1554 if not net_text: 

1555 # Interface not connected to any vld 

1556 logger.error( 

1557 "Interface {} from vdu {} not connected to any vld".format( 

1558 iface_index, target_vdu["vdu-name"] 

1559 ) 

1560 ) 

1561 continue 

1562 

1563 extra_dict["depends_on"].append(net_text) 

1564 

1565 Ns._prepare_interface_port_security(interface) 

1566 

1567 net_item = Ns._create_net_item_of_interface(interface, net_text) 

1568 

1569 Ns._prepare_type_of_interface( 

1570 interface, tasks_by_target_record_id, net_text, net_item 

1571 ) 

1572 

1573 if interface.get("ip-address"): 

1574 net_item["ip_address"] = interface["ip-address"] 

1575 

1576 if interface.get("mac-address"): 

1577 net_item["mac_address"] = interface["mac-address"] 

1578 

1579 net_list.append(net_item) 

1580 

1581 if interface.get("mgmt-vnf"): 

1582 extra_dict["mgmt_vnf_interface"] = iface_index 

1583 elif interface.get("mgmt-interface"): 

1584 extra_dict["mgmt_vdu_interface"] = iface_index 

1585 

1586 @staticmethod 

1587 def _prepare_vdu_ssh_keys( 

1588 target_vdu: dict, ro_nsr_public_key: dict, cloud_config: dict 

1589 ) -> None: 

1590 """Add ssh keys to cloud config. 

1591 

1592 Args: 

1593 target_vdu (dict): Details of VDU to be created 

1594 ro_nsr_public_key (dict): RO NSR public Key 

1595 cloud_config (dict): Cloud config details 

1596 

1597 """ 

1598 ssh_keys = [] 

1599 

1600 if target_vdu.get("ssh-keys"): 

1601 ssh_keys += target_vdu.get("ssh-keys") 

1602 

1603 if target_vdu.get("ssh-access-required"): 

1604 ssh_keys.append(ro_nsr_public_key) 

1605 

1606 if ssh_keys: 

1607 cloud_config["key-pairs"] = ssh_keys 

1608 

1609 @staticmethod 

1610 def _select_persistent_root_disk(vsd: dict, vdu: dict) -> dict: 

1611 """Selects the persistent root disk if exists. 

1612 Args: 

1613 vsd (dict): Virtual storage descriptors in VNFD 

1614 vdu (dict): VNF descriptor 

1615 

1616 Returns: 

1617 root_disk (dict): Selected persistent root disk 

1618 """ 

1619 if vsd.get("id") == vdu.get("virtual-storage-desc", [[]])[0]: 

1620 root_disk = vsd 

1621 if root_disk.get("type-of-storage", "").endswith( 

1622 "persistent-storage" 

1623 ) and root_disk.get("size-of-storage"): 

1624 return root_disk 

1625 

1626 @staticmethod 

1627 def _add_persistent_root_disk_to_disk_list( 

1628 vnfd: dict, target_vdu: dict, persistent_root_disk: dict, disk_list: list 

1629 ) -> None: 

1630 """Find the persistent root disk and add to disk list. 

1631 

1632 Args: 

1633 vnfd (dict): VNF descriptor 

1634 target_vdu (dict): Details of VDU to be created 

1635 persistent_root_disk (dict): Details of persistent root disk 

1636 disk_list (list): Disks of VDU 

1637 

1638 """ 

1639 for vdu in vnfd.get("vdu", ()): 

1640 if vdu["name"] == target_vdu["vdu-name"]: 

1641 for vsd in vnfd.get("virtual-storage-desc", ()): 

1642 root_disk = Ns._select_persistent_root_disk(vsd, vdu) 

1643 if not root_disk: 

1644 continue 

1645 

1646 persistent_root_disk[vsd["id"]] = { 

1647 "image_id": vdu.get("sw-image-desc"), 

1648 "size": root_disk["size-of-storage"], 

1649 "keep": Ns.is_volume_keeping_required(root_disk), 

1650 } 

1651 disk_list.append(persistent_root_disk[vsd["id"]]) 

1652 break 

1653 

1654 @staticmethod 

1655 def _add_persistent_ordinary_disks_to_disk_list( 

1656 target_vdu: dict, 

1657 persistent_root_disk: dict, 

1658 persistent_ordinary_disk: dict, 

1659 disk_list: list, 

1660 extra_dict: dict, 

1661 vnf_id: str = None, 

1662 nsr_id: str = None, 

1663 ) -> None: 

1664 """Fill the disk list by adding persistent ordinary disks. 

1665 

1666 Args: 

1667 target_vdu (dict): Details of VDU to be created 

1668 persistent_root_disk (dict): Details of persistent root disk 

1669 persistent_ordinary_disk (dict): Details of persistent ordinary disk 

1670 disk_list (list): Disks of VDU 

1671 

1672 """ 

1673 if target_vdu.get("virtual-storages"): 

1674 for disk in target_vdu["virtual-storages"]: 

1675 if ( 

1676 disk.get("type-of-storage", "").endswith("persistent-storage") 

1677 and disk["id"] not in persistent_root_disk.keys() 

1678 ): 

1679 name, multiattach = Ns.is_shared_volume(disk, vnf_id) 

1680 persistent_ordinary_disk[disk["id"]] = { 

1681 "name": name, 

1682 "size": disk["size-of-storage"], 

1683 "keep": Ns.is_volume_keeping_required(disk), 

1684 "multiattach": multiattach, 

1685 } 

1686 disk_list.append(persistent_ordinary_disk[disk["id"]]) 

1687 if multiattach: # VDU creation has to wait for shared volumes 

1688 extra_dict["depends_on"].append( 

1689 f"nsrs:{nsr_id}:shared-volumes.{name}" 

1690 ) 

1691 

1692 @staticmethod 

1693 def _prepare_vdu_affinity_group_list( 

1694 target_vdu: dict, extra_dict: dict, ns_preffix: str 

1695 ) -> List[Dict[str, any]]: 

1696 """Process affinity group details to prepare affinity group list. 

1697 

1698 Args: 

1699 target_vdu (dict): Details of VDU to be created 

1700 extra_dict (dict): Dictionary to be filled 

1701 ns_preffix (str): Prefix as string 

1702 

1703 Returns: 

1704 

1705 affinity_group_list (list): Affinity group details 

1706 

1707 """ 

1708 affinity_group_list = [] 

1709 

1710 if target_vdu.get("affinity-or-anti-affinity-group-id"): 

1711 for affinity_group_id in target_vdu["affinity-or-anti-affinity-group-id"]: 

1712 affinity_group = {} 

1713 affinity_group_text = ( 

1714 ns_preffix + ":affinity-or-anti-affinity-group." + affinity_group_id 

1715 ) 

1716 

1717 if not isinstance(extra_dict.get("depends_on"), list): 

1718 raise NsException("Invalid extra_dict format.") 

1719 

1720 extra_dict["depends_on"].append(affinity_group_text) 

1721 affinity_group["affinity_group_id"] = "TASK-" + affinity_group_text 

1722 affinity_group_list.append(affinity_group) 

1723 

1724 return affinity_group_list 

1725 

1726 @staticmethod 

1727 def _process_vdu_params( 

1728 target_vdu: Dict[str, Any], 

1729 indata: Dict[str, Any], 

1730 vim_info: Dict[str, Any], 

1731 target_record_id: str, 

1732 **kwargs: Dict[str, Any], 

1733 ) -> Dict[str, Any]: 

1734 """Function to process VDU parameters. 

1735 

1736 Args: 

1737 target_vdu (Dict[str, Any]): [description] 

1738 indata (Dict[str, Any]): [description] 

1739 vim_info (Dict[str, Any]): [description] 

1740 target_record_id (str): [description] 

1741 

1742 Returns: 

1743 Dict[str, Any]: [description] 

1744 """ 

1745 vnfr_id = kwargs.get("vnfr_id") 

1746 nsr_id = kwargs.get("nsr_id") 

1747 vnfr = kwargs.get("vnfr") 

1748 vdu2cloud_init = kwargs.get("vdu2cloud_init") 

1749 tasks_by_target_record_id = kwargs.get("tasks_by_target_record_id") 

1750 logger = kwargs.get("logger") 

1751 db = kwargs.get("db") 

1752 fs = kwargs.get("fs") 

1753 ro_nsr_public_key = kwargs.get("ro_nsr_public_key") 

1754 

1755 vnf_preffix = "vnfrs:{}".format(vnfr_id) 

1756 ns_preffix = "nsrs:{}".format(nsr_id) 

1757 image_text = ns_preffix + ":image." + target_vdu["ns-image-id"] 

1758 flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"] 

1759 extra_dict = {"depends_on": [image_text, flavor_text]} 

1760 net_list = [] 

1761 persistent_root_disk = {} 

1762 persistent_ordinary_disk = {} 

1763 vdu_instantiation_volumes_list = [] 

1764 disk_list = [] 

1765 vnfd_id = vnfr["vnfd-id"] 

1766 vnfd = db.get_one("vnfds", {"_id": vnfd_id}) 

1767 # If the position info is provided for all the interfaces, it will be sorted 

1768 # according to position number ascendingly. 

1769 if all( 

1770 True if i.get("position") is not None else False 

1771 for i in target_vdu["interfaces"] 

1772 ): 

1773 Ns._sort_vdu_interfaces(target_vdu) 

1774 

1775 # If the position info is provided for some interfaces but not all of them, the interfaces 

1776 # which has specific position numbers will be placed and others' positions will not be taken care. 

1777 else: 

1778 Ns._partially_locate_vdu_interfaces(target_vdu) 

1779 

1780 # If the position info is not provided for the interfaces, interfaces will be attached 

1781 # according to the order in the VNFD. 

1782 Ns._prepare_vdu_interfaces( 

1783 target_vdu, 

1784 extra_dict, 

1785 ns_preffix, 

1786 vnf_preffix, 

1787 logger, 

1788 tasks_by_target_record_id, 

1789 net_list, 

1790 ) 

1791 

1792 # cloud config 

1793 cloud_config = Ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs) 

1794 

1795 # Prepare VDU ssh keys 

1796 Ns._prepare_vdu_ssh_keys(target_vdu, ro_nsr_public_key, cloud_config) 

1797 

1798 if target_vdu.get("additionalParams"): 

1799 vdu_instantiation_volumes_list = ( 

1800 target_vdu.get("additionalParams").get("OSM", {}).get("vdu_volumes") 

1801 ) 

1802 

1803 if vdu_instantiation_volumes_list: 

1804 # Find the root volumes and add to the disk_list 

1805 persistent_root_disk = Ns.find_persistent_root_volumes( 

1806 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list 

1807 ) 

1808 

1809 # Find the ordinary volumes which are not added to the persistent_root_disk 

1810 # and put them to the disk list 

1811 Ns.find_persistent_volumes( 

1812 persistent_root_disk, 

1813 target_vdu, 

1814 vdu_instantiation_volumes_list, 

1815 disk_list, 

1816 ) 

1817 

1818 else: 

1819 # Vdu_instantiation_volumes_list is empty 

1820 # First get add the persistent root disks to disk_list 

1821 Ns._add_persistent_root_disk_to_disk_list( 

1822 vnfd, target_vdu, persistent_root_disk, disk_list 

1823 ) 

1824 # Add the persistent non-root disks to disk_list 

1825 Ns._add_persistent_ordinary_disks_to_disk_list( 

1826 target_vdu, 

1827 persistent_root_disk, 

1828 persistent_ordinary_disk, 

1829 disk_list, 

1830 extra_dict, 

1831 vnfd["id"], 

1832 nsr_id, 

1833 ) 

1834 

1835 affinity_group_list = Ns._prepare_vdu_affinity_group_list( 

1836 target_vdu, extra_dict, ns_preffix 

1837 ) 

1838 

1839 instance_name = "{}-{}-{}-{}".format( 

1840 indata["name"], 

1841 vnfr["member-vnf-index-ref"], 

1842 target_vdu["vdu-name"], 

1843 target_vdu.get("count-index") or 0, 

1844 ) 

1845 security_group_name = None 

1846 if additional_params := target_vdu.get("additionalParams"): 

1847 if additional_params.get("OSM", {}).get("instance_name"): 

1848 instance_name = additional_params.get("OSM", {}).get("instance_name") 

1849 if count_index := target_vdu.get("count-index"): 

1850 if count_index >= 1: 

1851 instance_name = "{}-{}".format(instance_name, count_index) 

1852 if additional_params.get("OSM", {}).get("security-group-name"): 

1853 security_group_name = additional_params.get("OSM", {}).get( 

1854 "security-group-name" 

1855 ) 

1856 else: 

1857 security_group_name = None 

1858 

1859 extra_dict["params"] = { 

1860 "name": instance_name, 

1861 "description": target_vdu["vdu-name"], 

1862 "start": True, 

1863 "image_id": "TASK-" + image_text, 

1864 "flavor_id": "TASK-" + flavor_text, 

1865 "affinity_group_list": affinity_group_list, 

1866 "net_list": net_list, 

1867 "cloud_config": cloud_config or None, 

1868 "disk_list": disk_list, 

1869 "security_group_name": security_group_name, 

1870 "availability_zone_index": None, # TODO 

1871 "availability_zone_list": None, # TODO 

1872 } 

1873 return extra_dict 

1874 

1875 @staticmethod 

1876 def _process_shared_volumes_params( 

1877 target_shared_volume: Dict[str, Any], 

1878 indata: Dict[str, Any], 

1879 vim_info: Dict[str, Any], 

1880 target_record_id: str, 

1881 **kwargs: Dict[str, Any], 

1882 ) -> Dict[str, Any]: 

1883 extra_dict = {} 

1884 shared_volume_data = { 

1885 "size": target_shared_volume["size-of-storage"], 

1886 "name": target_shared_volume["id"], 

1887 "type": target_shared_volume["type-of-storage"], 

1888 "keep": Ns.is_volume_keeping_required(target_shared_volume), 

1889 } 

1890 extra_dict["params"] = shared_volume_data 

1891 return extra_dict 

1892 

1893 @staticmethod 

1894 def _process_affinity_group_params( 

1895 target_affinity_group: Dict[str, Any], 

1896 indata: Dict[str, Any], 

1897 vim_info: Dict[str, Any], 

1898 target_record_id: str, 

1899 **kwargs: Dict[str, Any], 

1900 ) -> Dict[str, Any]: 

1901 """Get affinity or anti-affinity group parameters. 

1902 

1903 Args: 

1904 target_affinity_group (Dict[str, Any]): [description] 

1905 indata (Dict[str, Any]): [description] 

1906 vim_info (Dict[str, Any]): [description] 

1907 target_record_id (str): [description] 

1908 

1909 Returns: 

1910 Dict[str, Any]: [description] 

1911 """ 

1912 

1913 extra_dict = {} 

1914 affinity_group_data = { 

1915 "name": target_affinity_group["name"], 

1916 "type": target_affinity_group["type"], 

1917 "scope": target_affinity_group["scope"], 

1918 } 

1919 

1920 if target_affinity_group.get("vim-affinity-group-id"): 

1921 affinity_group_data["vim-affinity-group-id"] = target_affinity_group[ 

1922 "vim-affinity-group-id" 

1923 ] 

1924 

1925 extra_dict["params"] = { 

1926 "affinity_group_data": affinity_group_data, 

1927 } 

1928 return extra_dict 

1929 

1930 @staticmethod 

1931 def _process_recreate_vdu_params( 

1932 existing_vdu: Dict[str, Any], 

1933 db_nsr: Dict[str, Any], 

1934 vim_info: Dict[str, Any], 

1935 target_record_id: str, 

1936 target_id: str, 

1937 **kwargs: Dict[str, Any], 

1938 ) -> Dict[str, Any]: 

1939 """Function to process VDU parameters to recreate. 

1940 

1941 Args: 

1942 existing_vdu (Dict[str, Any]): [description] 

1943 db_nsr (Dict[str, Any]): [description] 

1944 vim_info (Dict[str, Any]): [description] 

1945 target_record_id (str): [description] 

1946 target_id (str): [description] 

1947 

1948 Returns: 

1949 Dict[str, Any]: [description] 

1950 """ 

1951 vnfr = kwargs.get("vnfr") 

1952 vdu2cloud_init = kwargs.get("vdu2cloud_init") 

1953 # logger = kwargs.get("logger") 

1954 db = kwargs.get("db") 

1955 fs = kwargs.get("fs") 

1956 ro_nsr_public_key = kwargs.get("ro_nsr_public_key") 

1957 

1958 extra_dict = {} 

1959 net_list = [] 

1960 

1961 vim_details = {} 

1962 vim_details_text = existing_vdu["vim_info"][target_id].get("vim_details", None) 

1963 

1964 if vim_details_text: 

1965 vim_details = yaml.safe_load(f"{vim_details_text}") 

1966 

1967 for iface_index, interface in enumerate(existing_vdu["interfaces"]): 

1968 if "port-security-enabled" in interface: 

1969 interface["port_security"] = interface.pop("port-security-enabled") 

1970 

1971 if "port-security-disable-strategy" in interface: 

1972 interface["port_security_disable_strategy"] = interface.pop( 

1973 "port-security-disable-strategy" 

1974 ) 

1975 

1976 net_item = { 

1977 x: v 

1978 for x, v in interface.items() 

1979 if x 

1980 in ( 

1981 "name", 

1982 "vpci", 

1983 "port_security", 

1984 "port_security_disable_strategy", 

1985 "floating_ip", 

1986 ) 

1987 } 

1988 existing_ifaces = existing_vdu["vim_info"][target_id].get( 

1989 "interfaces_backup", [] 

1990 ) 

1991 net_id = next( 

1992 ( 

1993 i["vim_net_id"] 

1994 for i in existing_ifaces 

1995 if i["ip_address"] == interface["ip-address"] 

1996 ), 

1997 None, 

1998 ) 

1999 

2000 net_item["net_id"] = net_id 

2001 net_item["type"] = "virtual" 

2002 

2003 # TODO mac_address: used for SR-IOV ifaces #TODO for other types 

2004 # TODO floating_ip: True/False (or it can be None) 

2005 if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"): 

2006 net_item["use"] = "data" 

2007 net_item["model"] = interface["type"] 

2008 net_item["type"] = interface["type"] 

2009 elif ( 

2010 interface.get("type") == "OM-MGMT" 

2011 or interface.get("mgmt-interface") 

2012 or interface.get("mgmt-vnf") 

2013 ): 

2014 net_item["use"] = "mgmt" 

2015 else: 

2016 # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"): 

2017 net_item["use"] = "bridge" 

2018 net_item["model"] = interface.get("type") 

2019 

2020 if interface.get("ip-address"): 

2021 dual_ip = interface.get("ip-address").split(";") 

2022 if len(dual_ip) == 2: 

2023 net_item["ip_address"] = dual_ip 

2024 else: 

2025 net_item["ip_address"] = interface["ip-address"] 

2026 

2027 if interface.get("mac-address"): 

2028 net_item["mac_address"] = interface["mac-address"] 

2029 

2030 net_list.append(net_item) 

2031 

2032 if interface.get("mgmt-vnf"): 

2033 extra_dict["mgmt_vnf_interface"] = iface_index 

2034 elif interface.get("mgmt-interface"): 

2035 extra_dict["mgmt_vdu_interface"] = iface_index 

2036 

2037 # cloud config 

2038 cloud_config = {} 

2039 

2040 if existing_vdu.get("cloud-init"): 

2041 if existing_vdu["cloud-init"] not in vdu2cloud_init: 

2042 vdu2cloud_init[existing_vdu["cloud-init"]] = Ns._get_cloud_init( 

2043 db=db, 

2044 fs=fs, 

2045 location=existing_vdu["cloud-init"], 

2046 ) 

2047 

2048 cloud_content_ = vdu2cloud_init[existing_vdu["cloud-init"]] 

2049 cloud_config["user-data"] = Ns._parse_jinja2( 

2050 cloud_init_content=cloud_content_, 

2051 params=existing_vdu.get("additionalParams"), 

2052 context=existing_vdu["cloud-init"], 

2053 ) 

2054 

2055 if existing_vdu.get("boot-data-drive"): 

2056 cloud_config["boot-data-drive"] = existing_vdu.get("boot-data-drive") 

2057 

2058 ssh_keys = [] 

2059 

2060 if existing_vdu.get("ssh-keys"): 

2061 ssh_keys += existing_vdu.get("ssh-keys") 

2062 

2063 if existing_vdu.get("ssh-access-required"): 

2064 ssh_keys.append(ro_nsr_public_key) 

2065 

2066 if ssh_keys: 

2067 cloud_config["key-pairs"] = ssh_keys 

2068 

2069 disk_list = [] 

2070 for vol_id in vim_details.get("os-extended-volumes:volumes_attached", []): 

2071 disk_list.append({"vim_id": vol_id["id"]}) 

2072 

2073 affinity_group_list = [] 

2074 

2075 if existing_vdu.get("affinity-or-anti-affinity-group-id"): 

2076 affinity_group = {} 

2077 for affinity_group_id in existing_vdu["affinity-or-anti-affinity-group-id"]: 

2078 for group in db_nsr.get("affinity-or-anti-affinity-group"): 

2079 if ( 

2080 group["id"] == affinity_group_id 

2081 and group["vim_info"][target_id].get("vim_id", None) is not None 

2082 ): 

2083 affinity_group["affinity_group_id"] = group["vim_info"][ 

2084 target_id 

2085 ].get("vim_id", None) 

2086 affinity_group_list.append(affinity_group) 

2087 

2088 instance_name = "{}-{}-{}-{}".format( 

2089 db_nsr["name"], 

2090 vnfr["member-vnf-index-ref"], 

2091 existing_vdu["vdu-name"], 

2092 existing_vdu.get("count-index") or 0, 

2093 ) 

2094 if additional_params := existing_vdu.get("additionalParams"): 

2095 if additional_params.get("OSM", {}).get("instance_name"): 

2096 instance_name = additional_params.get("OSM", {}).get("instance_name") 

2097 if count_index := existing_vdu.get("count-index"): 

2098 if count_index >= 1: 

2099 instance_name = "{}-{}".format(instance_name, count_index) 

2100 if additional_params.get("OSM", {}).get("security-group-name"): 

2101 security_group_name = additional_params.get("OSM", {}).get( 

2102 "security-group-name" 

2103 ) 

2104 else: 

2105 security_group_name = None 

2106 

2107 extra_dict["params"] = { 

2108 "name": instance_name, 

2109 "description": existing_vdu["vdu-name"], 

2110 "start": True, 

2111 "image_id": vim_details["image"]["id"], 

2112 "flavor_id": vim_details["flavor"]["id"], 

2113 "affinity_group_list": affinity_group_list, 

2114 "net_list": net_list, 

2115 "cloud_config": cloud_config or None, 

2116 "disk_list": disk_list, 

2117 "security_group_name": security_group_name, 

2118 "availability_zone_index": None, # TODO 

2119 "availability_zone_list": None, # TODO 

2120 } 

2121 

2122 return extra_dict 

2123 

2124 def calculate_diff_items( 

2125 self, 

2126 indata, 

2127 db_nsr, 

2128 db_ro_nsr, 

2129 db_nsr_update, 

2130 item, 

2131 tasks_by_target_record_id, 

2132 action_id, 

2133 nsr_id, 

2134 task_index, 

2135 vnfr_id=None, 

2136 vnfr=None, 

2137 ): 

2138 """Function that returns the incremental changes (creation, deletion) 

2139 related to a specific item `item` to be done. This function should be 

2140 called for NS instantiation, NS termination, NS update to add a new VNF 

2141 or a new VLD, remove a VNF or VLD, etc. 

2142 Item can be `net`, `flavor`, `image` or `vdu`. 

2143 It takes a list of target items from indata (which came from the REST API) 

2144 and compares with the existing items from db_ro_nsr, identifying the 

2145 incremental changes to be done. During the comparison, it calls the method 

2146 `process_params` (which was passed as parameter, and is particular for each 

2147 `item`) 

2148 

2149 Args: 

2150 indata (Dict[str, Any]): deployment info 

2151 db_nsr: NSR record from DB 

2152 db_ro_nsr (Dict[str, Any]): record from "ro_nsrs" 

2153 db_nsr_update (Dict[str, Any]): NSR info to update in DB 

2154 item (str): element to process (net, vdu...) 

2155 tasks_by_target_record_id (Dict[str, Any]): 

2156 [<target_record_id>, <task>] 

2157 action_id (str): action id 

2158 nsr_id (str): NSR id 

2159 task_index (number): task index to add to task name 

2160 vnfr_id (str): VNFR id 

2161 vnfr (Dict[str, Any]): VNFR info 

2162 

2163 Returns: 

2164 List: list with the incremental changes (deletes, creates) for each item 

2165 number: current task index 

2166 """ 

2167 

2168 diff_items = [] 

2169 db_path = "" 

2170 db_record = "" 

2171 target_list = [] 

2172 existing_list = [] 

2173 process_params = None 

2174 vdu2cloud_init = indata.get("cloud_init_content") or {} 

2175 ro_nsr_public_key = db_ro_nsr["public_key"] 

2176 # According to the type of item, the path, the target_list, 

2177 # the existing_list and the method to process params are set 

2178 db_path = self.db_path_map[item] 

2179 process_params = self.process_params_function_map[item] 

2180 

2181 if item in ("sfp", "classification", "sf", "sfi"): 

2182 db_record = "nsrs:{}:{}".format(nsr_id, db_path) 

2183 target_vnffg = indata.get("vnffg", [])[0] 

2184 target_list = target_vnffg[item] 

2185 existing_list = db_nsr.get(item, []) 

2186 elif item in ("net", "vdu"): 

2187 # This case is specific for the NS VLD (not applied to VDU) 

2188 if vnfr is None: 

2189 db_record = "nsrs:{}:{}".format(nsr_id, db_path) 

2190 target_list = indata.get("ns", []).get(db_path, []) 

2191 existing_list = db_nsr.get(db_path, []) 

2192 # This case is common for VNF VLDs and VNF VDUs 

2193 else: 

2194 db_record = "vnfrs:{}:{}".format(vnfr_id, db_path) 

2195 target_vnf = next( 

2196 (vnf for vnf in indata.get("vnf", ()) if vnf["_id"] == vnfr_id), 

2197 None, 

2198 ) 

2199 target_list = target_vnf.get(db_path, []) if target_vnf else [] 

2200 existing_list = vnfr.get(db_path, []) 

2201 elif item in ( 

2202 "image", 

2203 "flavor", 

2204 "affinity-or-anti-affinity-group", 

2205 "shared-volumes", 

2206 ): 

2207 db_record = "nsrs:{}:{}".format(nsr_id, db_path) 

2208 target_list = indata.get(item, []) 

2209 existing_list = db_nsr.get(item, []) 

2210 else: 

2211 raise NsException("Item not supported: {}", item) 

2212 # ensure all the target_list elements has an "id". If not assign the index as id 

2213 if target_list is None: 

2214 target_list = [] 

2215 for target_index, tl in enumerate(target_list): 

2216 if tl and not tl.get("id"): 

2217 tl["id"] = str(target_index) 

2218 # step 1 items (networks,vdus,...) to be deleted/updated 

2219 for item_index, existing_item in enumerate(existing_list): 

2220 target_item = next( 

2221 (t for t in target_list if t["id"] == existing_item["id"]), 

2222 None, 

2223 ) 

2224 for target_vim, existing_viminfo in existing_item.get( 

2225 "vim_info", {} 

2226 ).items(): 

2227 if existing_viminfo is None: 

2228 continue 

2229 

2230 if target_item: 

2231 target_viminfo = target_item.get("vim_info", {}).get(target_vim) 

2232 else: 

2233 target_viminfo = None 

2234 

2235 if target_viminfo is None: 

2236 # must be deleted 

2237 self._assign_vim(target_vim) 

2238 target_record_id = "{}.{}".format(db_record, existing_item["id"]) 

2239 item_ = item 

2240 

2241 if target_vim.startswith("sdn") or target_vim.startswith("wim"): 

2242 # item must be sdn-net instead of net if target_vim is a sdn 

2243 item_ = "sdn_net" 

2244 target_record_id += ".sdn" 

2245 

2246 deployment_info = { 

2247 "action_id": action_id, 

2248 "nsr_id": nsr_id, 

2249 "task_index": task_index, 

2250 } 

2251 

2252 diff_items.append( 

2253 { 

2254 "deployment_info": deployment_info, 

2255 "target_id": target_vim, 

2256 "item": item_, 

2257 "action": "DELETE", 

2258 "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}", 

2259 "target_record_id": target_record_id, 

2260 } 

2261 ) 

2262 task_index += 1 

2263 

2264 # step 2 items (networks,vdus,...) to be created 

2265 for target_item in target_list: 

2266 item_index = -1 

2267 for item_index, existing_item in enumerate(existing_list): 

2268 if existing_item["id"] == target_item["id"]: 

2269 break 

2270 else: 

2271 item_index += 1 

2272 db_nsr_update[db_path + ".{}".format(item_index)] = target_item 

2273 existing_list.append(target_item) 

2274 existing_item = None 

2275 

2276 for target_vim, target_viminfo in target_item.get("vim_info", {}).items(): 

2277 existing_viminfo = None 

2278 

2279 if existing_item: 

2280 existing_viminfo = existing_item.get("vim_info", {}).get(target_vim) 

2281 

2282 if existing_viminfo is not None: 

2283 continue 

2284 

2285 target_record_id = "{}.{}".format(db_record, target_item["id"]) 

2286 item_ = item 

2287 

2288 if target_vim.startswith("sdn") or target_vim.startswith("wim"): 

2289 # item must be sdn-net instead of net if target_vim is a sdn 

2290 item_ = "sdn_net" 

2291 target_record_id += ".sdn" 

2292 

2293 kwargs = {} 

2294 self.logger.debug( 

2295 "ns.calculate_diff_items target_item={}".format(target_item) 

2296 ) 

2297 if process_params == Ns._process_flavor_params: 

2298 kwargs.update( 

2299 { 

2300 "db": self.db, 

2301 } 

2302 ) 

2303 self.logger.debug( 

2304 "calculate_diff_items for flavor kwargs={}".format(kwargs) 

2305 ) 

2306 

2307 if process_params == Ns._process_vdu_params: 

2308 self.logger.debug("calculate_diff_items self.fs={}".format(self.fs)) 

2309 kwargs.update( 

2310 { 

2311 "vnfr_id": vnfr_id, 

2312 "nsr_id": nsr_id, 

2313 "vnfr": vnfr, 

2314 "vdu2cloud_init": vdu2cloud_init, 

2315 "tasks_by_target_record_id": tasks_by_target_record_id, 

2316 "logger": self.logger, 

2317 "db": self.db, 

2318 "fs": self.fs, 

2319 "ro_nsr_public_key": ro_nsr_public_key, 

2320 } 

2321 ) 

2322 self.logger.debug("calculate_diff_items kwargs={}".format(kwargs)) 

2323 if ( 

2324 process_params == Ns._process_sfi_params 

2325 or Ns._process_sf_params 

2326 or Ns._process_classification_params 

2327 or Ns._process_sfp_params 

2328 ): 

2329 kwargs.update({"nsr_id": nsr_id, "db": self.db}) 

2330 

2331 self.logger.debug("calculate_diff_items kwargs={}".format(kwargs)) 

2332 

2333 extra_dict = process_params( 

2334 target_item, 

2335 indata, 

2336 target_viminfo, 

2337 target_record_id, 

2338 **kwargs, 

2339 ) 

2340 self._assign_vim(target_vim) 

2341 

2342 deployment_info = { 

2343 "action_id": action_id, 

2344 "nsr_id": nsr_id, 

2345 "task_index": task_index, 

2346 } 

2347 

2348 new_item = { 

2349 "deployment_info": deployment_info, 

2350 "target_id": target_vim, 

2351 "item": item_, 

2352 "action": "CREATE", 

2353 "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}", 

2354 "target_record_id": target_record_id, 

2355 "extra_dict": extra_dict, 

2356 "common_id": target_item.get("common_id", None), 

2357 } 

2358 diff_items.append(new_item) 

2359 tasks_by_target_record_id[target_record_id] = new_item 

2360 task_index += 1 

2361 

2362 db_nsr_update[db_path + ".{}".format(item_index)] = target_item 

2363 

2364 return diff_items, task_index 

2365 

2366 def _process_vnfgd_sfp(self, sfp): 

2367 processed_sfp = {} 

2368 # getting sfp name, sfs and classifications in sfp to store it in processed_sfp 

2369 processed_sfp["id"] = sfp["id"] 

2370 sfs_in_sfp = [ 

2371 sf["id"] for sf in sfp.get("position-desc-id", [])[0].get("cp-profile-id") 

2372 ] 

2373 classifications_in_sfp = [ 

2374 classi["id"] 

2375 for classi in sfp.get("position-desc-id", [])[0].get("match-attributes") 

2376 ] 

2377 

2378 # creating a list of sfp with sfs and classifications 

2379 processed_sfp["sfs"] = sfs_in_sfp 

2380 processed_sfp["classifications"] = classifications_in_sfp 

2381 

2382 return processed_sfp 

2383 

2384 def _process_vnfgd_sf(self, sf): 

2385 processed_sf = {} 

2386 # getting name of sf 

2387 processed_sf["id"] = sf["id"] 

2388 # getting sfis in sf 

2389 sfis_in_sf = sf.get("constituent-profile-elements") 

2390 sorted_sfis = sorted(sfis_in_sf, key=lambda i: i["order"]) 

2391 # getting sfis names 

2392 processed_sf["sfis"] = [sfi["id"] for sfi in sorted_sfis] 

2393 

2394 return processed_sf 

2395 

2396 def _process_vnfgd_sfi(self, sfi, db_vnfrs): 

2397 processed_sfi = {} 

2398 # getting name of sfi 

2399 processed_sfi["id"] = sfi["id"] 

2400 

2401 # getting ports in sfi 

2402 ingress_port = sfi["ingress-constituent-cpd-id"] 

2403 egress_port = sfi["egress-constituent-cpd-id"] 

2404 sfi_vnf_member_index = sfi["constituent-base-element-id"] 

2405 

2406 processed_sfi["ingress_port"] = ingress_port 

2407 processed_sfi["egress_port"] = egress_port 

2408 

2409 all_vnfrs = db_vnfrs.values() 

2410 

2411 sfi_vnfr = [ 

2412 element 

2413 for element in all_vnfrs 

2414 if element["member-vnf-index-ref"] == sfi_vnf_member_index 

2415 ] 

2416 processed_sfi["vnfr_id"] = sfi_vnfr[0]["id"] 

2417 

2418 sfi_vnfr_cp = sfi_vnfr[0]["connection-point"] 

2419 

2420 ingress_port_index = [ 

2421 c for c, element in enumerate(sfi_vnfr_cp) if element["id"] == ingress_port 

2422 ] 

2423 ingress_port_index = ingress_port_index[0] 

2424 

2425 processed_sfi["vdur_id"] = sfi_vnfr_cp[ingress_port_index][ 

2426 "connection-point-vdu-id" 

2427 ] 

2428 processed_sfi["ingress_port_index"] = ingress_port_index 

2429 processed_sfi["egress_port_index"] = ingress_port_index 

2430 

2431 if egress_port != ingress_port: 

2432 egress_port_index = [ 

2433 c 

2434 for c, element in enumerate(sfi_vnfr_cp) 

2435 if element["id"] == egress_port 

2436 ] 

2437 processed_sfi["egress_port_index"] = egress_port_index 

2438 

2439 return processed_sfi 

2440 

2441 def _process_vnfgd_classification(self, classification, db_vnfrs): 

2442 processed_classification = {} 

2443 

2444 processed_classification = deepcopy(classification) 

2445 classi_vnf_member_index = processed_classification[ 

2446 "constituent-base-element-id" 

2447 ] 

2448 logical_source_port = processed_classification["constituent-cpd-id"] 

2449 

2450 all_vnfrs = db_vnfrs.values() 

2451 

2452 classi_vnfr = [ 

2453 element 

2454 for element in all_vnfrs 

2455 if element["member-vnf-index-ref"] == classi_vnf_member_index 

2456 ] 

2457 processed_classification["vnfr_id"] = classi_vnfr[0]["id"] 

2458 

2459 classi_vnfr_cp = classi_vnfr[0]["connection-point"] 

2460 

2461 ingress_port_index = [ 

2462 c 

2463 for c, element in enumerate(classi_vnfr_cp) 

2464 if element["id"] == logical_source_port 

2465 ] 

2466 ingress_port_index = ingress_port_index[0] 

2467 

2468 processed_classification["ingress_port_index"] = ingress_port_index 

2469 processed_classification["vdur_id"] = classi_vnfr_cp[ingress_port_index][ 

2470 "connection-point-vdu-id" 

2471 ] 

2472 

2473 return processed_classification 

2474 

2475 def _update_db_nsr_with_vnffg(self, processed_vnffg, vim_info, nsr_id): 

2476 """This method used to add viminfo dict to sfi, sf sfp and classification in indata and count info in db_nsr. 

2477 

2478 Args: 

2479 processed_vnffg (Dict[str, Any]): deployment info 

2480 vim_info (Dict): dictionary to store VIM resource information 

2481 nsr_id (str): NSR id 

2482 

2483 Returns: None 

2484 """ 

2485 

2486 nsr_sfi = {} 

2487 nsr_sf = {} 

2488 nsr_sfp = {} 

2489 nsr_classification = {} 

2490 db_nsr_vnffg = deepcopy(processed_vnffg) 

2491 

2492 for count, sfi in enumerate(processed_vnffg["sfi"]): 

2493 sfi["vim_info"] = vim_info 

2494 sfi_count = "sfi.{}".format(count) 

2495 nsr_sfi[sfi_count] = db_nsr_vnffg["sfi"][count] 

2496 

2497 self.db.set_list("nsrs", {"_id": nsr_id}, nsr_sfi) 

2498 

2499 for count, sf in enumerate(processed_vnffg["sf"]): 

2500 sf["vim_info"] = vim_info 

2501 sf_count = "sf.{}".format(count) 

2502 nsr_sf[sf_count] = db_nsr_vnffg["sf"][count] 

2503 

2504 self.db.set_list("nsrs", {"_id": nsr_id}, nsr_sf) 

2505 

2506 for count, sfp in enumerate(processed_vnffg["sfp"]): 

2507 sfp["vim_info"] = vim_info 

2508 sfp_count = "sfp.{}".format(count) 

2509 nsr_sfp[sfp_count] = db_nsr_vnffg["sfp"][count] 

2510 

2511 self.db.set_list("nsrs", {"_id": nsr_id}, nsr_sfp) 

2512 

2513 for count, classi in enumerate(processed_vnffg["classification"]): 

2514 classi["vim_info"] = vim_info 

2515 classification_count = "classification.{}".format(count) 

2516 nsr_classification[classification_count] = db_nsr_vnffg["classification"][ 

2517 count 

2518 ] 

2519 

2520 self.db.set_list("nsrs", {"_id": nsr_id}, nsr_classification) 

2521 

2522 def process_vnffgd_descriptor( 

2523 self, 

2524 indata: dict, 

2525 nsr_id: str, 

2526 db_nsr: dict, 

2527 db_vnfrs: dict, 

2528 ) -> dict: 

2529 """This method used to process vnffgd parameters from descriptor. 

2530 

2531 Args: 

2532 indata (Dict[str, Any]): deployment info 

2533 nsr_id (str): NSR id 

2534 db_nsr: NSR record from DB 

2535 db_vnfrs: VNFRS record from DB 

2536 

2537 Returns: 

2538 Dict: Processed vnffg parameters. 

2539 """ 

2540 

2541 processed_vnffg = {} 

2542 vnffgd = db_nsr.get("nsd", {}).get("vnffgd") 

2543 vnf_list = indata.get("vnf", []) 

2544 vim_text = "" 

2545 

2546 if vnf_list: 

2547 vim_text = "vim:" + vnf_list[0].get("vim-account-id", "") 

2548 

2549 vim_info = {} 

2550 vim_info[vim_text] = {} 

2551 processed_sfps = [] 

2552 processed_classifications = [] 

2553 processed_sfs = [] 

2554 processed_sfis = [] 

2555 

2556 # setting up intial empty entries for vnffg items in mongodb. 

2557 self.db.set_list( 

2558 "nsrs", 

2559 {"_id": nsr_id}, 

2560 { 

2561 "sfi": [], 

2562 "sf": [], 

2563 "sfp": [], 

2564 "classification": [], 

2565 }, 

2566 ) 

2567 

2568 vnffg = vnffgd[0] 

2569 # getting sfps 

2570 sfps = vnffg.get("nfpd") 

2571 for sfp in sfps: 

2572 processed_sfp = self._process_vnfgd_sfp(sfp) 

2573 # appending the list of processed sfps 

2574 processed_sfps.append(processed_sfp) 

2575 

2576 # getting sfs in sfp 

2577 sfs = sfp.get("position-desc-id")[0].get("cp-profile-id") 

2578 for sf in sfs: 

2579 processed_sf = self._process_vnfgd_sf(sf) 

2580 

2581 # appending the list of processed sfs 

2582 processed_sfs.append(processed_sf) 

2583 

2584 # getting sfis in sf 

2585 sfis_in_sf = sf.get("constituent-profile-elements") 

2586 sorted_sfis = sorted(sfis_in_sf, key=lambda i: i["order"]) 

2587 

2588 for sfi in sorted_sfis: 

2589 processed_sfi = self._process_vnfgd_sfi(sfi, db_vnfrs) 

2590 

2591 processed_sfis.append(processed_sfi) 

2592 

2593 classifications = sfp.get("position-desc-id")[0].get("match-attributes") 

2594 # getting classifications from sfp 

2595 for classification in classifications: 

2596 processed_classification = self._process_vnfgd_classification( 

2597 classification, db_vnfrs 

2598 ) 

2599 

2600 processed_classifications.append(processed_classification) 

2601 

2602 processed_vnffg["sfi"] = processed_sfis 

2603 processed_vnffg["sf"] = processed_sfs 

2604 processed_vnffg["classification"] = processed_classifications 

2605 processed_vnffg["sfp"] = processed_sfps 

2606 

2607 # adding viminfo dict to sfi, sf sfp and classification 

2608 self._update_db_nsr_with_vnffg(processed_vnffg, vim_info, nsr_id) 

2609 

2610 # updating indata with vnffg porcessed parameters 

2611 indata["vnffg"].append(processed_vnffg) 

2612 

2613 def calculate_all_differences_to_deploy( 

2614 self, 

2615 indata, 

2616 nsr_id, 

2617 db_nsr, 

2618 db_vnfrs, 

2619 db_ro_nsr, 

2620 db_nsr_update, 

2621 db_vnfrs_update, 

2622 action_id, 

2623 tasks_by_target_record_id, 

2624 ): 

2625 """This method calculates the ordered list of items (`changes_list`) 

2626 to be created and deleted. 

2627 

2628 Args: 

2629 indata (Dict[str, Any]): deployment info 

2630 nsr_id (str): NSR id 

2631 db_nsr: NSR record from DB 

2632 db_vnfrs: VNFRS record from DB 

2633 db_ro_nsr (Dict[str, Any]): record from "ro_nsrs" 

2634 db_nsr_update (Dict[str, Any]): NSR info to update in DB 

2635 db_vnfrs_update (Dict[str, Any]): VNFRS info to update in DB 

2636 action_id (str): action id 

2637 tasks_by_target_record_id (Dict[str, Any]): 

2638 [<target_record_id>, <task>] 

2639 

2640 Returns: 

2641 List: ordered list of items to be created and deleted. 

2642 """ 

2643 

2644 task_index = 0 

2645 # set list with diffs: 

2646 changes_list = [] 

2647 

2648 # processing vnffg from descriptor parameter 

2649 vnffgd = db_nsr.get("nsd").get("vnffgd") 

2650 if vnffgd is not None: 

2651 indata["vnffg"] = [] 

2652 vnf_list = indata["vnf"] 

2653 processed_vnffg = {} 

2654 

2655 # in case of ns-delete 

2656 if not vnf_list: 

2657 processed_vnffg["sfi"] = [] 

2658 processed_vnffg["sf"] = [] 

2659 processed_vnffg["classification"] = [] 

2660 processed_vnffg["sfp"] = [] 

2661 

2662 indata["vnffg"].append(processed_vnffg) 

2663 

2664 else: 

2665 self.process_vnffgd_descriptor( 

2666 indata=indata, 

2667 nsr_id=nsr_id, 

2668 db_nsr=db_nsr, 

2669 db_vnfrs=db_vnfrs, 

2670 ) 

2671 

2672 # getting updated db_nsr having vnffg parameters 

2673 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) 

2674 

2675 self.logger.debug( 

2676 "After processing vnffd parameters indata={} nsr={}".format( 

2677 indata, db_nsr 

2678 ) 

2679 ) 

2680 

2681 for item in ["sfp", "classification", "sf", "sfi"]: 

2682 self.logger.debug("process NS={} {}".format(nsr_id, item)) 

2683 diff_items, task_index = self.calculate_diff_items( 

2684 indata=indata, 

2685 db_nsr=db_nsr, 

2686 db_ro_nsr=db_ro_nsr, 

2687 db_nsr_update=db_nsr_update, 

2688 item=item, 

2689 tasks_by_target_record_id=tasks_by_target_record_id, 

2690 action_id=action_id, 

2691 nsr_id=nsr_id, 

2692 task_index=task_index, 

2693 vnfr_id=None, 

2694 ) 

2695 changes_list += diff_items 

2696 

2697 # NS vld, image and flavor 

2698 for item in [ 

2699 "net", 

2700 "image", 

2701 "flavor", 

2702 "affinity-or-anti-affinity-group", 

2703 ]: 

2704 self.logger.debug("process NS={} {}".format(nsr_id, item)) 

2705 diff_items, task_index = self.calculate_diff_items( 

2706 indata=indata, 

2707 db_nsr=db_nsr, 

2708 db_ro_nsr=db_ro_nsr, 

2709 db_nsr_update=db_nsr_update, 

2710 item=item, 

2711 tasks_by_target_record_id=tasks_by_target_record_id, 

2712 action_id=action_id, 

2713 nsr_id=nsr_id, 

2714 task_index=task_index, 

2715 vnfr_id=None, 

2716 ) 

2717 changes_list += diff_items 

2718 

2719 # VNF vlds and vdus 

2720 for vnfr_id, vnfr in db_vnfrs.items(): 

2721 # vnfr_id need to be set as global variable for among others nested method _process_vdu_params 

2722 for item in ["net", "vdu", "shared-volumes"]: 

2723 self.logger.debug("process VNF={} {}".format(vnfr_id, item)) 

2724 diff_items, task_index = self.calculate_diff_items( 

2725 indata=indata, 

2726 db_nsr=db_nsr, 

2727 db_ro_nsr=db_ro_nsr, 

2728 db_nsr_update=db_vnfrs_update[vnfr["_id"]], 

2729 item=item, 

2730 tasks_by_target_record_id=tasks_by_target_record_id, 

2731 action_id=action_id, 

2732 nsr_id=nsr_id, 

2733 task_index=task_index, 

2734 vnfr_id=vnfr_id, 

2735 vnfr=vnfr, 

2736 ) 

2737 changes_list += diff_items 

2738 

2739 return changes_list 

2740 

2741 def define_all_tasks( 

2742 self, 

2743 changes_list, 

2744 db_new_tasks, 

2745 tasks_by_target_record_id, 

2746 ): 

2747 """Function to create all the task structures obtanied from 

2748 the method calculate_all_differences_to_deploy 

2749 

2750 Args: 

2751 changes_list (List): ordered list of items to be created or deleted 

2752 db_new_tasks (List): tasks list to be created 

2753 action_id (str): action id 

2754 tasks_by_target_record_id (Dict[str, Any]): 

2755 [<target_record_id>, <task>] 

2756 

2757 """ 

2758 

2759 task_id_list = [] 

2760 for change in changes_list: 

2761 task = Ns._create_task( 

2762 deployment_info=change["deployment_info"], 

2763 target_id=change["target_id"], 

2764 item=change["item"], 

2765 action=change["action"], 

2766 target_record=change["target_record"], 

2767 target_record_id=change["target_record_id"], 

2768 extra_dict=change.get("extra_dict", None), 

2769 ) 

2770 if task.get("item") == "vdu" and task.get("params", {}).get( 

2771 "affinity_group_list" 

2772 ): 

2773 task_id_list.append(task["task_id"]) 

2774 target_record_data = task["target_record"].split(":") 

2775 vdur_index = target_record_data[2].split(".")[1] 

2776 if vdur_index != "0": 

2777 self.insert_task_id(task_id_list, task) 

2778 del task_id_list[0] 

2779 

2780 self.logger.debug("ns.define_all_tasks task={}".format(task)) 

2781 tasks_by_target_record_id[change["target_record_id"]] = task 

2782 db_new_tasks.append(task) 

2783 

2784 if change.get("common_id"): 

2785 task["common_id"] = change["common_id"] 

2786 

2787 def insert_task_id(self, task_id_list, task): 

2788 for index, value in enumerate(task_id_list): 

2789 if index != 0: 

2790 task_id = task_id_list[index - 1] 

2791 vdur_text = "{}".format(task_id) 

2792 task.get("depends_on").append(vdur_text) 

2793 break 

2794 return task 

2795 

2796 def upload_all_tasks( 

2797 self, 

2798 db_new_tasks, 

2799 now, 

2800 ): 

2801 """Function to save all tasks in the common DB 

2802 

2803 Args: 

2804 db_new_tasks (List): tasks list to be created 

2805 now (time): current time 

2806 

2807 """ 

2808 

2809 nb_ro_tasks = 0 # for logging 

2810 

2811 for db_task in db_new_tasks: 

2812 target_id = db_task.pop("target_id") 

2813 common_id = db_task.get("common_id") 

2814 

2815 # Do not chek tasks with vim_status DELETED 

2816 # because in manual heealing there are two tasks for the same vdur: 

2817 # one with vim_status deleted and the other one with the actual VM status. 

2818 

2819 if common_id: 

2820 if self.db.set_one( 

2821 "ro_tasks", 

2822 q_filter={ 

2823 "target_id": target_id, 

2824 "tasks.common_id": common_id, 

2825 "vim_info.vim_status.ne": "DELETED", 

2826 }, 

2827 update_dict={"to_check_at": now, "modified_at": now}, 

2828 push={"tasks": db_task}, 

2829 fail_on_empty=False, 

2830 ): 

2831 continue 

2832 

2833 if not self.db.set_one( 

2834 "ro_tasks", 

2835 q_filter={ 

2836 "target_id": target_id, 

2837 "tasks.target_record": db_task["target_record"], 

2838 "vim_info.vim_status.ne": "DELETED", 

2839 }, 

2840 update_dict={"to_check_at": now, "modified_at": now}, 

2841 push={"tasks": db_task}, 

2842 fail_on_empty=False, 

2843 ): 

2844 # Create a ro_task 

2845 self.logger.debug("Updating database, Creating ro_tasks") 

2846 db_ro_task = Ns._create_ro_task(target_id, db_task) 

2847 nb_ro_tasks += 1 

2848 self.db.create("ro_tasks", db_ro_task) 

2849 

2850 self.logger.debug( 

2851 "Created {} ro_tasks; {} tasks - db_new_tasks={}".format( 

2852 nb_ro_tasks, len(db_new_tasks), db_new_tasks 

2853 ) 

2854 ) 

2855 

2856 def upload_recreate_tasks( 

2857 self, 

2858 db_new_tasks, 

2859 now, 

2860 ): 

2861 """Function to save recreate tasks in the common DB 

2862 

2863 Args: 

2864 db_new_tasks (List): tasks list to be created 

2865 now (time): current time 

2866 

2867 """ 

2868 

2869 nb_ro_tasks = 0 # for logging 

2870 

2871 for db_task in db_new_tasks: 

2872 target_id = db_task.pop("target_id") 

2873 self.logger.debug("target_id={} db_task={}".format(target_id, db_task)) 

2874 

2875 action = db_task.get("action", None) 

2876 

2877 # Create a ro_task 

2878 self.logger.debug("Updating database, Creating ro_tasks") 

2879 db_ro_task = Ns._create_ro_task(target_id, db_task) 

2880 

2881 # If DELETE task: the associated created items should be removed 

2882 # (except persistent volumes): 

2883 if action == "DELETE": 

2884 db_ro_task["vim_info"]["created"] = True 

2885 db_ro_task["vim_info"]["created_items"] = db_task.get( 

2886 "created_items", {} 

2887 ) 

2888 db_ro_task["vim_info"]["volumes_to_hold"] = db_task.get( 

2889 "volumes_to_hold", [] 

2890 ) 

2891 db_ro_task["vim_info"]["vim_id"] = db_task.get("vim_id", None) 

2892 

2893 nb_ro_tasks += 1 

2894 self.logger.debug("upload_all_tasks db_ro_task={}".format(db_ro_task)) 

2895 self.db.create("ro_tasks", db_ro_task) 

2896 

2897 self.logger.debug( 

2898 "Created {} ro_tasks; {} tasks - db_new_tasks={}".format( 

2899 nb_ro_tasks, len(db_new_tasks), db_new_tasks 

2900 ) 

2901 ) 

2902 

2903 def _prepare_created_items_for_healing( 

2904 self, 

2905 nsr_id, 

2906 target_record, 

2907 ): 

2908 created_items = {} 

2909 # Get created_items from ro_task 

2910 ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id}) 

2911 for ro_task in ro_tasks: 

2912 for task in ro_task["tasks"]: 

2913 if ( 

2914 task["target_record"] == target_record 

2915 and task["action"] == "CREATE" 

2916 and ro_task["vim_info"]["created_items"] 

2917 ): 

2918 created_items = ro_task["vim_info"]["created_items"] 

2919 break 

2920 

2921 return created_items 

2922 

2923 def _prepare_persistent_volumes_for_healing( 

2924 self, 

2925 target_id, 

2926 existing_vdu, 

2927 ): 

2928 # The associated volumes of the VM shouldn't be removed 

2929 volumes_list = [] 

2930 vim_details = {} 

2931 vim_details_text = existing_vdu["vim_info"][target_id].get("vim_details", None) 

2932 if vim_details_text: 

2933 vim_details = yaml.safe_load(f"{vim_details_text}") 

2934 

2935 for vol_id in vim_details.get("os-extended-volumes:volumes_attached", []): 

2936 volumes_list.append(vol_id["id"]) 

2937 

2938 return volumes_list 

2939 

2940 def prepare_changes_to_recreate( 

2941 self, 

2942 indata, 

2943 nsr_id, 

2944 db_nsr, 

2945 db_vnfrs, 

2946 db_ro_nsr, 

2947 action_id, 

2948 tasks_by_target_record_id, 

2949 ): 

2950 """This method will obtain an ordered list of items (`changes_list`) 

2951 to be created and deleted to meet the recreate request. 

2952 """ 

2953 

2954 self.logger.debug( 

2955 "ns.prepare_changes_to_recreate nsr_id={} indata={}".format(nsr_id, indata) 

2956 ) 

2957 

2958 task_index = 0 

2959 # set list with diffs: 

2960 changes_list = [] 

2961 db_path = self.db_path_map["vdu"] 

2962 target_list = indata.get("healVnfData", {}) 

2963 vdu2cloud_init = indata.get("cloud_init_content") or {} 

2964 ro_nsr_public_key = db_ro_nsr["public_key"] 

2965 

2966 # Check each VNF of the target 

2967 for target_vnf in target_list: 

2968 # Find this VNF in the list from DB, raise exception if vnfInstanceId is not found 

2969 vnfr_id = target_vnf["vnfInstanceId"] 

2970 existing_vnf = db_vnfrs.get(vnfr_id, {}) 

2971 db_record = "vnfrs:{}:{}".format(vnfr_id, db_path) 

2972 # vim_account_id = existing_vnf.get("vim-account-id", "") 

2973 

2974 target_vdus = target_vnf.get("additionalParams", {}).get("vdu", []) 

2975 # Check each VDU of this VNF 

2976 if not target_vdus: 

2977 # Create target_vdu_list from DB, if VDUs are not specified 

2978 target_vdus = [] 

2979 for existing_vdu in existing_vnf.get("vdur"): 

2980 vdu_name = existing_vdu.get("vdu-name", None) 

2981 vdu_index = existing_vdu.get("count-index", 0) 

2982 vdu_to_be_healed = {"vdu-id": vdu_name, "count-index": vdu_index} 

2983 target_vdus.append(vdu_to_be_healed) 

2984 for target_vdu in target_vdus: 

2985 vdu_name = target_vdu.get("vdu-id", None) 

2986 # For multi instance VDU count-index is mandatory 

2987 # For single session VDU count-indes is 0 

2988 count_index = target_vdu.get("count-index", 0) 

2989 item_index = 0 

2990 existing_instance = {} 

2991 if existing_vnf: 

2992 for instance in existing_vnf.get("vdur", {}): 

2993 if ( 

2994 instance["vdu-name"] == vdu_name 

2995 and instance["count-index"] == count_index 

2996 ): 

2997 existing_instance = instance 

2998 break 

2999 else: 

3000 item_index += 1 

3001 

3002 target_record_id = "{}.{}".format(db_record, existing_instance["id"]) 

3003 

3004 # The target VIM is the one already existing in DB to recreate 

3005 for target_vim, target_viminfo in existing_instance.get( 

3006 "vim_info", {} 

3007 ).items(): 

3008 # step 1 vdu to be deleted 

3009 self._assign_vim(target_vim) 

3010 deployment_info = { 

3011 "action_id": action_id, 

3012 "nsr_id": nsr_id, 

3013 "task_index": task_index, 

3014 } 

3015 

3016 target_record = f"{db_record}.{item_index}.vim_info.{target_vim}" 

3017 created_items = self._prepare_created_items_for_healing( 

3018 nsr_id, target_record 

3019 ) 

3020 

3021 volumes_to_hold = self._prepare_persistent_volumes_for_healing( 

3022 target_vim, existing_instance 

3023 ) 

3024 

3025 # Specific extra params for recreate tasks: 

3026 extra_dict = { 

3027 "created_items": created_items, 

3028 "vim_id": existing_instance["vim-id"], 

3029 "volumes_to_hold": volumes_to_hold, 

3030 } 

3031 

3032 changes_list.append( 

3033 { 

3034 "deployment_info": deployment_info, 

3035 "target_id": target_vim, 

3036 "item": "vdu", 

3037 "action": "DELETE", 

3038 "target_record": target_record, 

3039 "target_record_id": target_record_id, 

3040 "extra_dict": extra_dict, 

3041 } 

3042 ) 

3043 delete_task_id = f"{action_id}:{task_index}" 

3044 task_index += 1 

3045 

3046 # step 2 vdu to be created 

3047 kwargs = {} 

3048 kwargs.update( 

3049 { 

3050 "vnfr_id": vnfr_id, 

3051 "nsr_id": nsr_id, 

3052 "vnfr": existing_vnf, 

3053 "vdu2cloud_init": vdu2cloud_init, 

3054 "tasks_by_target_record_id": tasks_by_target_record_id, 

3055 "logger": self.logger, 

3056 "db": self.db, 

3057 "fs": self.fs, 

3058 "ro_nsr_public_key": ro_nsr_public_key, 

3059 } 

3060 ) 

3061 

3062 extra_dict = self._process_recreate_vdu_params( 

3063 existing_instance, 

3064 db_nsr, 

3065 target_viminfo, 

3066 target_record_id, 

3067 target_vim, 

3068 **kwargs, 

3069 ) 

3070 

3071 # The CREATE task depens on the DELETE task 

3072 extra_dict["depends_on"] = [delete_task_id] 

3073 

3074 # Add volumes created from created_items if any 

3075 # Ports should be deleted with delete task and automatically created with create task 

3076 volumes = {} 

3077 for k, v in created_items.items(): 

3078 try: 

3079 k_item, _, k_id = k.partition(":") 

3080 if k_item == "volume": 

3081 volumes[k] = v 

3082 except Exception as e: 

3083 self.logger.error( 

3084 "Error evaluating created item {}: {}".format(k, e) 

3085 ) 

3086 extra_dict["previous_created_volumes"] = volumes 

3087 

3088 deployment_info = { 

3089 "action_id": action_id, 

3090 "nsr_id": nsr_id, 

3091 "task_index": task_index, 

3092 } 

3093 self._assign_vim(target_vim) 

3094 

3095 new_item = { 

3096 "deployment_info": deployment_info, 

3097 "target_id": target_vim, 

3098 "item": "vdu", 

3099 "action": "CREATE", 

3100 "target_record": target_record, 

3101 "target_record_id": target_record_id, 

3102 "extra_dict": extra_dict, 

3103 } 

3104 changes_list.append(new_item) 

3105 tasks_by_target_record_id[target_record_id] = new_item 

3106 task_index += 1 

3107 

3108 return changes_list 

3109 

3110 def recreate(self, session, indata, version, nsr_id, *args, **kwargs): 

3111 self.logger.debug("ns.recreate nsr_id={} indata={}".format(nsr_id, indata)) 

3112 # TODO: validate_input(indata, recreate_schema) 

3113 action_id = indata.get("action_id", str(uuid4())) 

3114 # get current deployment 

3115 db_vnfrs = {} # vnf's info indexed by _id 

3116 step = "" 

3117 logging_text = "Recreate nsr_id={} action_id={} indata={}".format( 

3118 nsr_id, action_id, indata 

3119 ) 

3120 self.logger.debug(logging_text + "Enter") 

3121 

3122 try: 

3123 step = "Getting ns and vnfr record from db" 

3124 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) 

3125 db_new_tasks = [] 

3126 tasks_by_target_record_id = {} 

3127 # read from db: vnf's of this ns 

3128 step = "Getting vnfrs from db" 

3129 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) 

3130 self.logger.debug("ns.recreate: db_vnfrs_list={}".format(db_vnfrs_list)) 

3131 

3132 if not db_vnfrs_list: 

3133 raise NsException("Cannot obtain associated VNF for ns") 

3134 

3135 for vnfr in db_vnfrs_list: 

3136 db_vnfrs[vnfr["_id"]] = vnfr 

3137 

3138 now = time() 

3139 db_ro_nsr = self.db.get_one("ro_nsrs", {"_id": nsr_id}, fail_on_empty=False) 

3140 self.logger.debug("ns.recreate: db_ro_nsr={}".format(db_ro_nsr)) 

3141 

3142 if not db_ro_nsr: 

3143 db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now) 

3144 

3145 with self.write_lock: 

3146 # NS 

3147 step = "process NS elements" 

3148 changes_list = self.prepare_changes_to_recreate( 

3149 indata=indata, 

3150 nsr_id=nsr_id, 

3151 db_nsr=db_nsr, 

3152 db_vnfrs=db_vnfrs, 

3153 db_ro_nsr=db_ro_nsr, 

3154 action_id=action_id, 

3155 tasks_by_target_record_id=tasks_by_target_record_id, 

3156 ) 

3157 

3158 self.define_all_tasks( 

3159 changes_list=changes_list, 

3160 db_new_tasks=db_new_tasks, 

3161 tasks_by_target_record_id=tasks_by_target_record_id, 

3162 ) 

3163 

3164 # Delete all ro_tasks registered for the targets vdurs (target_record) 

3165 # If task of type CREATE exist then vim will try to get info form deleted VMs. 

3166 # So remove all task related to target record. 

3167 ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id}) 

3168 for change in changes_list: 

3169 for ro_task in ro_tasks: 

3170 for task in ro_task["tasks"]: 

3171 if task["target_record"] == change["target_record"]: 

3172 self.db.del_one( 

3173 "ro_tasks", 

3174 q_filter={ 

3175 "_id": ro_task["_id"], 

3176 "modified_at": ro_task["modified_at"], 

3177 }, 

3178 fail_on_empty=False, 

3179 ) 

3180 

3181 step = "Updating database, Appending tasks to ro_tasks" 

3182 self.upload_recreate_tasks( 

3183 db_new_tasks=db_new_tasks, 

3184 now=now, 

3185 ) 

3186 

3187 self.logger.debug( 

3188 logging_text + "Exit. Created {} tasks".format(len(db_new_tasks)) 

3189 ) 

3190 

3191 return ( 

3192 {"status": "ok", "nsr_id": nsr_id, "action_id": action_id}, 

3193 action_id, 

3194 True, 

3195 ) 

3196 except Exception as e: 

3197 if isinstance(e, (DbException, NsException)): 

3198 self.logger.error( 

3199 logging_text + "Exit Exception while '{}': {}".format(step, e) 

3200 ) 

3201 else: 

3202 e = traceback_format_exc() 

3203 self.logger.critical( 

3204 logging_text + "Exit Exception while '{}': {}".format(step, e), 

3205 exc_info=True, 

3206 ) 

3207 

3208 raise NsException(e) 

3209 

3210 def deploy(self, session, indata, version, nsr_id, *args, **kwargs): 

3211 self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata)) 

3212 validate_input(indata, deploy_schema) 

3213 action_id = indata.get("action_id", str(uuid4())) 

3214 task_index = 0 

3215 # get current deployment 

3216 db_nsr_update = {} # update operation on nsrs 

3217 db_vnfrs_update = {} 

3218 db_vnfrs = {} # vnf's info indexed by _id 

3219 step = "" 

3220 logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id) 

3221 self.logger.debug(logging_text + "Enter") 

3222 

3223 try: 

3224 step = "Getting ns and vnfr record from db" 

3225 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) 

3226 self.logger.debug("ns.deploy: db_nsr={}".format(db_nsr)) 

3227 db_new_tasks = [] 

3228 tasks_by_target_record_id = {} 

3229 # read from db: vnf's of this ns 

3230 step = "Getting vnfrs from db" 

3231 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) 

3232 

3233 if not db_vnfrs_list: 

3234 raise NsException("Cannot obtain associated VNF for ns") 

3235 

3236 for vnfr in db_vnfrs_list: 

3237 db_vnfrs[vnfr["_id"]] = vnfr 

3238 db_vnfrs_update[vnfr["_id"]] = {} 

3239 self.logger.debug("ns.deploy db_vnfrs={}".format(db_vnfrs)) 

3240 

3241 now = time() 

3242 db_ro_nsr = self.db.get_one("ro_nsrs", {"_id": nsr_id}, fail_on_empty=False) 

3243 

3244 if not db_ro_nsr: 

3245 db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now) 

3246 

3247 # check that action_id is not in the list of actions. Suffixed with :index 

3248 if action_id in db_ro_nsr["actions"]: 

3249 index = 1 

3250 

3251 while True: 

3252 new_action_id = "{}:{}".format(action_id, index) 

3253 

3254 if new_action_id not in db_ro_nsr["actions"]: 

3255 action_id = new_action_id 

3256 self.logger.debug( 

3257 logging_text 

3258 + "Changing action_id in use to {}".format(action_id) 

3259 ) 

3260 break 

3261 

3262 index += 1 

3263 

3264 def _process_action(indata): 

3265 nonlocal db_new_tasks 

3266 nonlocal action_id 

3267 nonlocal nsr_id 

3268 nonlocal task_index 

3269 nonlocal db_vnfrs 

3270 nonlocal db_ro_nsr 

3271 

3272 if indata["action"]["action"] == "inject_ssh_key": 

3273 key = indata["action"].get("key") 

3274 user = indata["action"].get("user") 

3275 password = indata["action"].get("password") 

3276 

3277 for vnf in indata.get("vnf", ()): 

3278 if vnf["_id"] not in db_vnfrs: 

3279 raise NsException("Invalid vnf={}".format(vnf["_id"])) 

3280 

3281 db_vnfr = db_vnfrs[vnf["_id"]] 

3282 

3283 for target_vdu in vnf.get("vdur", ()): 

3284 vdu_index, vdur = next( 

3285 ( 

3286 i_v 

3287 for i_v in enumerate(db_vnfr["vdur"]) 

3288 if i_v[1]["id"] == target_vdu["id"] 

3289 ), 

3290 (None, None), 

3291 ) 

3292 

3293 if not vdur: 

3294 raise NsException( 

3295 "Invalid vdu vnf={}.{}".format( 

3296 vnf["_id"], target_vdu["id"] 

3297 ) 

3298 ) 

3299 

3300 target_vim, vim_info = next( 

3301 k_v for k_v in vdur["vim_info"].items() 

3302 ) 

3303 self._assign_vim(target_vim) 

3304 target_record = "vnfrs:{}:vdur.{}.ssh_keys".format( 

3305 vnf["_id"], vdu_index 

3306 ) 

3307 extra_dict = { 

3308 "depends_on": [ 

3309 "vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"]) 

3310 ], 

3311 "params": { 

3312 "ip_address": vdur.get("ip-address"), 

3313 "user": user, 

3314 "key": key, 

3315 "password": password, 

3316 "private_key": db_ro_nsr["private_key"], 

3317 "salt": db_ro_nsr["_id"], 

3318 "schema_version": db_ro_nsr["_admin"][ 

3319 "schema_version" 

3320 ], 

3321 }, 

3322 } 

3323 

3324 deployment_info = { 

3325 "action_id": action_id, 

3326 "nsr_id": nsr_id, 

3327 "task_index": task_index, 

3328 } 

3329 

3330 task = Ns._create_task( 

3331 deployment_info=deployment_info, 

3332 target_id=target_vim, 

3333 item="vdu", 

3334 action="EXEC", 

3335 target_record=target_record, 

3336 target_record_id=None, 

3337 extra_dict=extra_dict, 

3338 ) 

3339 

3340 task_index = deployment_info.get("task_index") 

3341 

3342 db_new_tasks.append(task) 

3343 

3344 with self.write_lock: 

3345 if indata.get("action"): 

3346 _process_action(indata) 

3347 else: 

3348 # compute network differences 

3349 # NS 

3350 step = "process NS elements" 

3351 changes_list = self.calculate_all_differences_to_deploy( 

3352 indata=indata, 

3353 nsr_id=nsr_id, 

3354 db_nsr=db_nsr, 

3355 db_vnfrs=db_vnfrs, 

3356 db_ro_nsr=db_ro_nsr, 

3357 db_nsr_update=db_nsr_update, 

3358 db_vnfrs_update=db_vnfrs_update, 

3359 action_id=action_id, 

3360 tasks_by_target_record_id=tasks_by_target_record_id, 

3361 ) 

3362 self.define_all_tasks( 

3363 changes_list=changes_list, 

3364 db_new_tasks=db_new_tasks, 

3365 tasks_by_target_record_id=tasks_by_target_record_id, 

3366 ) 

3367 

3368 step = "Updating database, Appending tasks to ro_tasks" 

3369 self.upload_all_tasks( 

3370 db_new_tasks=db_new_tasks, 

3371 now=now, 

3372 ) 

3373 

3374 step = "Updating database, nsrs" 

3375 if db_nsr_update: 

3376 self.db.set_one("nsrs", {"_id": nsr_id}, db_nsr_update) 

3377 

3378 for vnfr_id, db_vnfr_update in db_vnfrs_update.items(): 

3379 if db_vnfr_update: 

3380 step = "Updating database, vnfrs={}".format(vnfr_id) 

3381 self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update) 

3382 

3383 self.logger.debug( 

3384 logging_text + "Exit. Created {} tasks".format(len(db_new_tasks)) 

3385 ) 

3386 

3387 return ( 

3388 {"status": "ok", "nsr_id": nsr_id, "action_id": action_id}, 

3389 action_id, 

3390 True, 

3391 ) 

3392 except Exception as e: 

3393 if isinstance(e, (DbException, NsException)): 

3394 self.logger.error( 

3395 logging_text + "Exit Exception while '{}': {}".format(step, e) 

3396 ) 

3397 else: 

3398 e = traceback_format_exc() 

3399 self.logger.critical( 

3400 logging_text + "Exit Exception while '{}': {}".format(step, e), 

3401 exc_info=True, 

3402 ) 

3403 

3404 raise NsException(e) 

3405 

3406 def delete(self, session, indata, version, nsr_id, *args, **kwargs): 

3407 self.logger.debug("ns.delete version={} nsr_id={}".format(version, nsr_id)) 

3408 # self.db.del_list({"_id": ro_task["_id"], "tasks.nsr_id.ne": nsr_id}) 

3409 

3410 with self.write_lock: 

3411 try: 

3412 NsWorker.delete_db_tasks(self.db, nsr_id, None) 

3413 except NsWorkerException as e: 

3414 raise NsException(e) 

3415 

3416 return None, None, True 

3417 

3418 def status(self, session, indata, version, nsr_id, action_id, *args, **kwargs): 

3419 self.logger.debug( 

3420 "ns.status version={} nsr_id={}, action_id={} indata={}".format( 

3421 version, nsr_id, action_id, indata 

3422 ) 

3423 ) 

3424 task_list = [] 

3425 done = 0 

3426 total = 0 

3427 ro_tasks = self.db.get_list("ro_tasks", {"tasks.action_id": action_id}) 

3428 global_status = "DONE" 

3429 details = [] 

3430 

3431 for ro_task in ro_tasks: 

3432 for task in ro_task["tasks"]: 

3433 if task and task["action_id"] == action_id: 

3434 task_list.append(task) 

3435 total += 1 

3436 

3437 if task["status"] == "FAILED": 

3438 global_status = "FAILED" 

3439 error_text = "Error at {} {}: {}".format( 

3440 task["action"].lower(), 

3441 task["item"], 

3442 ro_task["vim_info"].get("vim_message") or "unknown", 

3443 ) 

3444 details.append(error_text) 

3445 elif task["status"] in ("SCHEDULED", "BUILD"): 

3446 if global_status != "FAILED": 

3447 global_status = "BUILD" 

3448 else: 

3449 done += 1 

3450 

3451 return_data = { 

3452 "status": global_status, 

3453 "details": ( 

3454 ". ".join(details) if details else "progress {}/{}".format(done, total) 

3455 ), 

3456 "nsr_id": nsr_id, 

3457 "action_id": action_id, 

3458 "tasks": task_list, 

3459 } 

3460 

3461 return return_data, None, True 

3462 

3463 def recreate_status( 

3464 self, session, indata, version, nsr_id, action_id, *args, **kwargs 

3465 ): 

3466 return self.status(session, indata, version, nsr_id, action_id, *args, **kwargs) 

3467 

3468 def cancel(self, session, indata, version, nsr_id, action_id, *args, **kwargs): 

3469 print( 

3470 "ns.cancel session={} indata={} version={} nsr_id={}, action_id={}".format( 

3471 session, indata, version, nsr_id, action_id 

3472 ) 

3473 ) 

3474 

3475 return None, None, True 

3476 

3477 def rebuild_start_stop_task( 

3478 self, 

3479 vdu_id, 

3480 vnf_id, 

3481 vdu_index, 

3482 action_id, 

3483 nsr_id, 

3484 task_index, 

3485 target_vim, 

3486 extra_dict, 

3487 ): 

3488 self._assign_vim(target_vim) 

3489 target_record = "vnfrs:{}:vdur.{}.vim_info.{}".format( 

3490 vnf_id, vdu_index, target_vim 

3491 ) 

3492 target_record_id = "vnfrs:{}:vdur.{}".format(vnf_id, vdu_id) 

3493 deployment_info = { 

3494 "action_id": action_id, 

3495 "nsr_id": nsr_id, 

3496 "task_index": task_index, 

3497 } 

3498 

3499 task = Ns._create_task( 

3500 deployment_info=deployment_info, 

3501 target_id=target_vim, 

3502 item="update", 

3503 action="EXEC", 

3504 target_record=target_record, 

3505 target_record_id=target_record_id, 

3506 extra_dict=extra_dict, 

3507 ) 

3508 return task 

3509 

3510 def rebuild_start_stop( 

3511 self, session, action_dict, version, nsr_id, *args, **kwargs 

3512 ): 

3513 task_index = 0 

3514 extra_dict = {} 

3515 now = time() 

3516 action_id = action_dict.get("action_id", str(uuid4())) 

3517 step = "" 

3518 logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id) 

3519 self.logger.debug(logging_text + "Enter") 

3520 

3521 action = list(action_dict.keys())[0] 

3522 task_dict = action_dict.get(action) 

3523 vim_vm_id = action_dict.get(action).get("vim_vm_id") 

3524 

3525 if action_dict.get("stop"): 

3526 action = "shutoff" 

3527 db_new_tasks = [] 

3528 try: 

3529 step = "lock the operation & do task creation" 

3530 with self.write_lock: 

3531 extra_dict["params"] = { 

3532 "vim_vm_id": vim_vm_id, 

3533 "action": action, 

3534 } 

3535 task = self.rebuild_start_stop_task( 

3536 task_dict["vdu_id"], 

3537 task_dict["vnf_id"], 

3538 task_dict["vdu_index"], 

3539 action_id, 

3540 nsr_id, 

3541 task_index, 

3542 task_dict["target_vim"], 

3543 extra_dict, 

3544 ) 

3545 db_new_tasks.append(task) 

3546 step = "upload Task to db" 

3547 self.upload_all_tasks( 

3548 db_new_tasks=db_new_tasks, 

3549 now=now, 

3550 ) 

3551 self.logger.debug( 

3552 logging_text + "Exit. Created {} tasks".format(len(db_new_tasks)) 

3553 ) 

3554 return ( 

3555 {"status": "ok", "nsr_id": nsr_id, "action_id": action_id}, 

3556 action_id, 

3557 True, 

3558 ) 

3559 except Exception as e: 

3560 if isinstance(e, (DbException, NsException)): 

3561 self.logger.error( 

3562 logging_text + "Exit Exception while '{}': {}".format(step, e) 

3563 ) 

3564 else: 

3565 e = traceback_format_exc() 

3566 self.logger.critical( 

3567 logging_text + "Exit Exception while '{}': {}".format(step, e), 

3568 exc_info=True, 

3569 ) 

3570 raise NsException(e) 

3571 

3572 def get_deploy(self, session, indata, version, nsr_id, action_id, *args, **kwargs): 

3573 nsrs = self.db.get_list("nsrs", {}) 

3574 return_data = [] 

3575 

3576 for ns in nsrs: 

3577 return_data.append({"_id": ns["_id"], "name": ns["name"]}) 

3578 

3579 return return_data, None, True 

3580 

3581 def get_actions(self, session, indata, version, nsr_id, action_id, *args, **kwargs): 

3582 ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id}) 

3583 return_data = [] 

3584 

3585 for ro_task in ro_tasks: 

3586 for task in ro_task["tasks"]: 

3587 if task["action_id"] not in return_data: 

3588 return_data.append(task["action_id"]) 

3589 

3590 return return_data, None, True 

3591 

3592 def migrate_task( 

3593 self, vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict 

3594 ): 

3595 target_vim, vim_info = next(k_v for k_v in vdu["vim_info"].items()) 

3596 self._assign_vim(target_vim) 

3597 target_record = "vnfrs:{}:vdur.{}.vim_info.{}".format( 

3598 vnf["_id"], vdu_index, target_vim 

3599 ) 

3600 target_record_id = "vnfrs:{}:vdur.{}".format(vnf["_id"], vdu["id"]) 

3601 deployment_info = { 

3602 "action_id": action_id, 

3603 "nsr_id": nsr_id, 

3604 "task_index": task_index, 

3605 } 

3606 

3607 task = Ns._create_task( 

3608 deployment_info=deployment_info, 

3609 target_id=target_vim, 

3610 item="migrate", 

3611 action="EXEC", 

3612 target_record=target_record, 

3613 target_record_id=target_record_id, 

3614 extra_dict=extra_dict, 

3615 ) 

3616 

3617 return task 

3618 

3619 def migrate(self, session, indata, version, nsr_id, *args, **kwargs): 

3620 task_index = 0 

3621 extra_dict = {} 

3622 now = time() 

3623 action_id = indata.get("action_id", str(uuid4())) 

3624 step = "" 

3625 logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id) 

3626 self.logger.debug(logging_text + "Enter") 

3627 try: 

3628 vnf_instance_id = indata["vnfInstanceId"] 

3629 step = "Getting vnfrs from db" 

3630 db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id}) 

3631 vdu = indata.get("vdu") 

3632 migrateToHost = indata.get("migrateToHost") 

3633 db_new_tasks = [] 

3634 

3635 with self.write_lock: 

3636 if vdu is not None: 

3637 vdu_id = indata["vdu"]["vduId"] 

3638 vdu_count_index = indata["vdu"].get("vduCountIndex", 0) 

3639 for vdu_index, vdu in enumerate(db_vnfr["vdur"]): 

3640 if ( 

3641 vdu["vdu-id-ref"] == vdu_id 

3642 and vdu["count-index"] == vdu_count_index 

3643 ): 

3644 extra_dict["params"] = { 

3645 "vim_vm_id": vdu["vim-id"], 

3646 "migrate_host": migrateToHost, 

3647 "vdu_vim_info": vdu["vim_info"], 

3648 } 

3649 step = "Creating migration task for vdu:{}".format(vdu) 

3650 task = self.migrate_task( 

3651 vdu, 

3652 db_vnfr, 

3653 vdu_index, 

3654 action_id, 

3655 nsr_id, 

3656 task_index, 

3657 extra_dict, 

3658 ) 

3659 db_new_tasks.append(task) 

3660 task_index += 1 

3661 break 

3662 else: 

3663 for vdu_index, vdu in enumerate(db_vnfr["vdur"]): 

3664 extra_dict["params"] = { 

3665 "vim_vm_id": vdu["vim-id"], 

3666 "migrate_host": migrateToHost, 

3667 "vdu_vim_info": vdu["vim_info"], 

3668 } 

3669 step = "Creating migration task for vdu:{}".format(vdu) 

3670 task = self.migrate_task( 

3671 vdu, 

3672 db_vnfr, 

3673 vdu_index, 

3674 action_id, 

3675 nsr_id, 

3676 task_index, 

3677 extra_dict, 

3678 ) 

3679 db_new_tasks.append(task) 

3680 task_index += 1 

3681 

3682 self.upload_all_tasks( 

3683 db_new_tasks=db_new_tasks, 

3684 now=now, 

3685 ) 

3686 

3687 self.logger.debug( 

3688 logging_text + "Exit. Created {} tasks".format(len(db_new_tasks)) 

3689 ) 

3690 return ( 

3691 {"status": "ok", "nsr_id": nsr_id, "action_id": action_id}, 

3692 action_id, 

3693 True, 

3694 ) 

3695 except Exception as e: 

3696 if isinstance(e, (DbException, NsException)): 

3697 self.logger.error( 

3698 logging_text + "Exit Exception while '{}': {}".format(step, e) 

3699 ) 

3700 else: 

3701 e = traceback_format_exc() 

3702 self.logger.critical( 

3703 logging_text + "Exit Exception while '{}': {}".format(step, e), 

3704 exc_info=True, 

3705 ) 

3706 raise NsException(e) 

3707 

3708 def verticalscale_task( 

3709 self, vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict 

3710 ): 

3711 target_vim, vim_info = next(k_v for k_v in vdu["vim_info"].items()) 

3712 self._assign_vim(target_vim) 

3713 ns_preffix = "nsrs:{}".format(nsr_id) 

3714 flavor_text = ns_preffix + ":flavor." + vdu["ns-flavor-id"] 

3715 extra_dict["depends_on"] = [flavor_text] 

3716 extra_dict["params"].update({"flavor_id": "TASK-" + flavor_text}) 

3717 target_record = "vnfrs:{}:vdur.{}.vim_info.{}".format( 

3718 vnf["_id"], vdu_index, target_vim 

3719 ) 

3720 target_record_id = "vnfrs:{}:vdur.{}".format(vnf["_id"], vdu["id"]) 

3721 deployment_info = { 

3722 "action_id": action_id, 

3723 "nsr_id": nsr_id, 

3724 "task_index": task_index, 

3725 } 

3726 

3727 task = Ns._create_task( 

3728 deployment_info=deployment_info, 

3729 target_id=target_vim, 

3730 item="verticalscale", 

3731 action="EXEC", 

3732 target_record=target_record, 

3733 target_record_id=target_record_id, 

3734 extra_dict=extra_dict, 

3735 ) 

3736 return task 

3737 

3738 def verticalscale_flavor_task( 

3739 self, vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict 

3740 ): 

3741 target_vim, vim_info = next(k_v for k_v in vdu["vim_info"].items()) 

3742 self._assign_vim(target_vim) 

3743 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) 

3744 target_record = "nsrs:{}:flavor.{}.vim_info.{}".format( 

3745 nsr_id, len(db_nsr["flavor"]) - 1, target_vim 

3746 ) 

3747 target_record_id = "nsrs:{}:flavor.{}".format(nsr_id, len(db_nsr["flavor"]) - 1) 

3748 deployment_info = { 

3749 "action_id": action_id, 

3750 "nsr_id": nsr_id, 

3751 "task_index": task_index, 

3752 } 

3753 task = Ns._create_task( 

3754 deployment_info=deployment_info, 

3755 target_id=target_vim, 

3756 item="flavor", 

3757 action="CREATE", 

3758 target_record=target_record, 

3759 target_record_id=target_record_id, 

3760 extra_dict=extra_dict, 

3761 ) 

3762 return task 

3763 

3764 def verticalscale(self, session, indata, version, nsr_id, *args, **kwargs): 

3765 task_index = 0 

3766 extra_dict = {} 

3767 flavor_extra_dict = {} 

3768 now = time() 

3769 action_id = indata.get("action_id", str(uuid4())) 

3770 step = "" 

3771 logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id) 

3772 self.logger.debug(logging_text + "Enter") 

3773 try: 

3774 VnfFlavorData = indata.get("changeVnfFlavorData") 

3775 vnf_instance_id = VnfFlavorData["vnfInstanceId"] 

3776 step = "Getting vnfrs from db" 

3777 db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id}) 

3778 vduid = VnfFlavorData["additionalParams"]["vduid"] 

3779 vduCountIndex = VnfFlavorData["additionalParams"]["vduCountIndex"] 

3780 virtualMemory = VnfFlavorData["additionalParams"]["virtualMemory"] 

3781 numVirtualCpu = VnfFlavorData["additionalParams"]["numVirtualCpu"] 

3782 sizeOfStorage = VnfFlavorData["additionalParams"]["sizeOfStorage"] 

3783 flavor_dict = { 

3784 "name": vduid + "-flv", 

3785 "ram": virtualMemory, 

3786 "vcpus": numVirtualCpu, 

3787 "disk": sizeOfStorage, 

3788 } 

3789 flavor_data = { 

3790 "ram": virtualMemory, 

3791 "vcpus": numVirtualCpu, 

3792 "disk": sizeOfStorage, 

3793 } 

3794 flavor_extra_dict["find_params"] = {"flavor_data": flavor_data} 

3795 flavor_extra_dict["params"] = {"flavor_data": flavor_dict} 

3796 db_new_tasks = [] 

3797 step = "Creating Tasks for vertical scaling" 

3798 with self.write_lock: 

3799 for vdu_index, vdu in enumerate(db_vnfr["vdur"]): 

3800 if ( 

3801 vdu["vdu-id-ref"] == vduid 

3802 and vdu["count-index"] == vduCountIndex 

3803 ): 

3804 extra_dict["params"] = { 

3805 "vim_vm_id": vdu["vim-id"], 

3806 "flavor_dict": flavor_dict, 

3807 "vdu-id-ref": vdu["vdu-id-ref"], 

3808 "count-index": vdu["count-index"], 

3809 "vnf_instance_id": vnf_instance_id, 

3810 } 

3811 task = self.verticalscale_flavor_task( 

3812 vdu, 

3813 db_vnfr, 

3814 vdu_index, 

3815 action_id, 

3816 nsr_id, 

3817 task_index, 

3818 flavor_extra_dict, 

3819 ) 

3820 db_new_tasks.append(task) 

3821 task_index += 1 

3822 task = self.verticalscale_task( 

3823 vdu, 

3824 db_vnfr, 

3825 vdu_index, 

3826 action_id, 

3827 nsr_id, 

3828 task_index, 

3829 extra_dict, 

3830 ) 

3831 db_new_tasks.append(task) 

3832 task_index += 1 

3833 break 

3834 self.upload_all_tasks( 

3835 db_new_tasks=db_new_tasks, 

3836 now=now, 

3837 ) 

3838 self.logger.debug( 

3839 logging_text + "Exit. Created {} tasks".format(len(db_new_tasks)) 

3840 ) 

3841 return ( 

3842 {"status": "ok", "nsr_id": nsr_id, "action_id": action_id}, 

3843 action_id, 

3844 True, 

3845 ) 

3846 except Exception as e: 

3847 if isinstance(e, (DbException, NsException)): 

3848 self.logger.error( 

3849 logging_text + "Exit Exception while '{}': {}".format(step, e) 

3850 ) 

3851 else: 

3852 e = traceback_format_exc() 

3853 self.logger.critical( 

3854 logging_text + "Exit Exception while '{}': {}".format(step, e), 

3855 exc_info=True, 

3856 ) 

3857 raise NsException(e)