+ @staticmethod
+ def _process_flavor_params(
+ target_flavor: Dict[str, Any],
+ indata: Dict[str, Any],
+ vim_info: Dict[str, Any],
+ target_record_id: str,
+ ) -> Dict[str, Any]:
+ """[summary]
+
+ Args:
+ target_flavor (Dict[str, Any]): [description]
+ indata (Dict[str, Any]): [description]
+ vim_info (Dict[str, Any]): [description]
+ target_record_id (str): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ flavor_data = {
+ "disk": int(target_flavor["storage-gb"]),
+ "ram": int(target_flavor["memory-mb"]),
+ "vcpus": int(target_flavor["vcpu-count"]),
+ }
+
+ target_vdur = {}
+ for vnf in indata.get("vnf", []):
+ for vdur in vnf.get("vdur", []):
+ if vdur.get("ns-flavor-id") == target_flavor["id"]:
+ target_vdur = vdur
+
+ for storage in target_vdur.get("virtual-storages", []):
+ if (
+ storage.get("type-of-storage")
+ == "etsi-nfv-descriptors:ephemeral-storage"
+ ):
+ flavor_data["ephemeral"] = int(storage.get("size-of-storage", 0))
+ elif storage.get("type-of-storage") == "etsi-nfv-descriptors:swap-storage":
+ flavor_data["swap"] = int(storage.get("size-of-storage", 0))
+
+ extended = Ns._process_epa_params(target_flavor)
+ if extended:
+ flavor_data["extended"] = extended
+
+ extra_dict = {"find_params": {"flavor_data": flavor_data}}
+ flavor_data_name = flavor_data.copy()
+ flavor_data_name["name"] = target_flavor["name"]
+ extra_dict["params"] = {"flavor_data": flavor_data_name}
+
+ return extra_dict
+
+ @staticmethod
+ def _ip_profile_to_ro(
+ ip_profile: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """[summary]
+
+ Args:
+ ip_profile (Dict[str, Any]): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ if not ip_profile:
+ return None
+
+ ro_ip_profile = {
+ "ip_version": "IPv4"
+ if "v4" in ip_profile.get("ip-version", "ipv4")
+ else "IPv6",
+ "subnet_address": ip_profile.get("subnet-address"),
+ "gateway_address": ip_profile.get("gateway-address"),
+ "dhcp_enabled": ip_profile.get("dhcp-params", {}).get("enabled", False),
+ "dhcp_start_address": ip_profile.get("dhcp-params", {}).get(
+ "start-address", None
+ ),
+ "dhcp_count": ip_profile.get("dhcp-params", {}).get("count", None),
+ }
+
+ if ip_profile.get("dns-server"):
+ ro_ip_profile["dns_address"] = ";".join(
+ [v["address"] for v in ip_profile["dns-server"] if v.get("address")]
+ )
+
+ if ip_profile.get("security-group"):
+ ro_ip_profile["security_group"] = ip_profile["security-group"]
+
+ return ro_ip_profile
+
+ @staticmethod
+ def _process_net_params(
+ target_vld: Dict[str, Any],
+ indata: Dict[str, Any],
+ vim_info: Dict[str, Any],
+ target_record_id: str,
+ ) -> Dict[str, Any]:
+ """Function to process network parameters.
+
+ Args:
+ target_vld (Dict[str, Any]): [description]
+ indata (Dict[str, Any]): [description]
+ vim_info (Dict[str, Any]): [description]
+ target_record_id (str): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ extra_dict = {}
+
+ if vim_info.get("sdn"):
+ # vnf_preffix = "vnfrs:{}".format(vnfr_id)
+ # ns_preffix = "nsrs:{}".format(nsr_id)
+ # remove the ending ".sdn
+ vld_target_record_id, _, _ = target_record_id.rpartition(".")
+ extra_dict["params"] = {
+ k: vim_info[k]
+ for k in ("sdn-ports", "target_vim", "vlds", "type")
+ if vim_info.get(k)
+ }
+
+ # TODO needed to add target_id in the dependency.
+ if vim_info.get("target_vim"):
+ extra_dict["depends_on"] = [
+ f"{vim_info.get('target_vim')} {vld_target_record_id}"
+ ]
+
+ return extra_dict
+
+ if vim_info.get("vim_network_name"):
+ extra_dict["find_params"] = {
+ "filter_dict": {
+ "name": vim_info.get("vim_network_name"),
+ },
+ }
+ elif vim_info.get("vim_network_id"):
+ extra_dict["find_params"] = {
+ "filter_dict": {
+ "id": vim_info.get("vim_network_id"),
+ },
+ }
+ elif target_vld.get("mgmt-network"):
+ extra_dict["find_params"] = {
+ "mgmt": True,
+ "name": target_vld["id"],
+ }
+ else:
+ # create
+ extra_dict["params"] = {
+ "net_name": (
+ f"{indata.get('name')[:16]}-{target_vld.get('name', target_vld.get('id'))[:16]}"
+ ),
+ "ip_profile": Ns._ip_profile_to_ro(vim_info.get("ip_profile")),
+ "provider_network_profile": vim_info.get("provider_network"),
+ }
+
+ if not target_vld.get("underlay"):
+ extra_dict["params"]["net_type"] = "bridge"
+ else:
+ extra_dict["params"]["net_type"] = (
+ "ptp" if target_vld.get("type") == "ELINE" else "data"
+ )
+
+ return extra_dict
+
+ def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
+ self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
+ validate_input(indata, deploy_schema)
+ action_id = indata.get("action_id", str(uuid4()))
+ task_index = 0
+ # get current deployment
+ db_nsr_update = {} # update operation on nsrs
+ db_vnfrs_update = {}
+ db_vnfrs = {} # vnf's info indexed by _id
+ nb_ro_tasks = 0 # for logging
+ vdu2cloud_init = indata.get("cloud_init_content") or {}
+ step = ""
+ logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
+ self.logger.debug(logging_text + "Enter")
+
+ try:
+ step = "Getting ns and vnfr record from db"
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ db_new_tasks = []
+ tasks_by_target_record_id = {}
+ # read from db: vnf's of this ns
+ step = "Getting vnfrs from db"
+ db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+
+ if not db_vnfrs_list:
+ raise NsException("Cannot obtain associated VNF for ns")
+
+ for vnfr in db_vnfrs_list:
+ db_vnfrs[vnfr["_id"]] = vnfr
+ db_vnfrs_update[vnfr["_id"]] = {}
+
+ now = time()
+ db_ro_nsr = self.db.get_one("ro_nsrs", {"_id": nsr_id}, fail_on_empty=False)
+
+ if not db_ro_nsr:
+ db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now)
+
+ ro_nsr_public_key = db_ro_nsr["public_key"]
+
+ # check that action_id is not in the list of actions. Suffixed with :index
+ if action_id in db_ro_nsr["actions"]:
+ index = 1
+
+ while True:
+ new_action_id = "{}:{}".format(action_id, index)
+
+ if new_action_id not in db_ro_nsr["actions"]:
+ action_id = new_action_id
+ self.logger.debug(
+ logging_text
+ + "Changing action_id in use to {}".format(action_id)