- if vifquota:
- extended["vif-quota"] = vifquota
-
- if numa:
- extended["numas"] = [numa]
-
- if extended:
- flavor_data["extended"] = extended
-
- extra_dict = {"find_params": {"flavor_data": flavor_data}}
- flavor_data_name = flavor_data.copy()
- flavor_data_name["name"] = target_flavor["name"]
- extra_dict["params"] = {"flavor_data": flavor_data_name}
-
- return extra_dict
-
- def _ip_profile_2_ro(ip_profile):
- if not ip_profile:
- return None
-
- ro_ip_profile = {
- "ip_version": "IPv4"
- if "v4" in ip_profile.get("ip-version", "ipv4")
- else "IPv6",
- "subnet_address": ip_profile.get("subnet-address"),
- "gateway_address": ip_profile.get("gateway-address"),
- "dhcp_enabled": ip_profile.get("dhcp-params", {}).get(
- "enabled", False
- ),
- "dhcp_start_address": ip_profile.get("dhcp-params", {}).get(
- "start-address", None
- ),
- "dhcp_count": ip_profile.get("dhcp-params", {}).get(
- "count", None
- ),
- }
+ return numa, epa_vcpu_set
+
+ @staticmethod
+ def _process_guest_epa_cpu_pinning_params(
+ guest_epa_quota: Dict[str, Any],
+ vcpu_count: int,
+ epa_vcpu_set: bool,
+ ) -> Tuple[Dict[str, Any], bool]:
+ """[summary]
+
+ Args:
+ guest_epa_quota (Dict[str, Any]): [description]
+ vcpu_count (int): [description]
+ epa_vcpu_set (bool): [description]
+
+ Returns:
+ Tuple[Dict[str, Any], bool]: [description]
+ """
+ numa = {}
+ local_epa_vcpu_set = epa_vcpu_set
+
+ if (
+ guest_epa_quota.get("cpu-pinning-policy") == "DEDICATED"
+ and not epa_vcpu_set
+ ):
+ numa[
+ "cores"
+ if guest_epa_quota.get("cpu-thread-pinning-policy") != "PREFER"
+ else "threads"
+ ] = max(vcpu_count, 1)
+ local_epa_vcpu_set = True
+
+ return numa, local_epa_vcpu_set
+
+ @staticmethod
+ def _process_epa_params(
+ target_flavor: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """[summary]
+
+ Args:
+ target_flavor (Dict[str, Any]): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ extended = {}
+ numa = {}
+
+ if target_flavor.get("guest-epa"):
+ guest_epa = target_flavor["guest-epa"]
+
+ numa, epa_vcpu_set = Ns._process_guest_epa_numa_params(
+ guest_epa_quota=guest_epa
+ )
+
+ if guest_epa.get("mempage-size"):
+ extended["mempage-size"] = guest_epa.get("mempage-size")
+
+ tmp_numa, epa_vcpu_set = Ns._process_guest_epa_cpu_pinning_params(
+ guest_epa_quota=guest_epa,
+ vcpu_count=int(target_flavor.get("vcpu-count", 1)),
+ epa_vcpu_set=epa_vcpu_set,
+ )
+ numa.update(tmp_numa)
+
+ extended.update(
+ Ns._process_guest_epa_quota_params(
+ guest_epa_quota=guest_epa,
+ epa_vcpu_set=epa_vcpu_set,
+ )
+ )
+
+ if numa:
+ extended["numas"] = [numa]
+
+ return extended
+
+ @staticmethod
+ def _process_flavor_params(
+ target_flavor: Dict[str, Any],
+ indata: Dict[str, Any],
+ vim_info: Dict[str, Any],
+ target_record_id: str,
+ **kwargs: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """[summary]
+
+ Args:
+ target_flavor (Dict[str, Any]): [description]
+ indata (Dict[str, Any]): [description]
+ vim_info (Dict[str, Any]): [description]
+ target_record_id (str): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ flavor_data = {
+ "disk": int(target_flavor["storage-gb"]),
+ "ram": int(target_flavor["memory-mb"]),
+ "vcpus": int(target_flavor["vcpu-count"]),
+ }
+
+ target_vdur = {}
+ for vnf in indata.get("vnf", []):
+ for vdur in vnf.get("vdur", []):
+ if vdur.get("ns-flavor-id") == target_flavor["id"]:
+ target_vdur = vdur
+
+ for storage in target_vdur.get("virtual-storages", []):
+ if (
+ storage.get("type-of-storage")
+ == "etsi-nfv-descriptors:ephemeral-storage"
+ ):
+ flavor_data["ephemeral"] = int(storage.get("size-of-storage", 0))
+ elif storage.get("type-of-storage") == "etsi-nfv-descriptors:swap-storage":
+ flavor_data["swap"] = int(storage.get("size-of-storage", 0))
+
+ extended = Ns._process_epa_params(target_flavor)
+ if extended:
+ flavor_data["extended"] = extended
+
+ extra_dict = {"find_params": {"flavor_data": flavor_data}}
+ flavor_data_name = flavor_data.copy()
+ flavor_data_name["name"] = target_flavor["name"]
+ extra_dict["params"] = {"flavor_data": flavor_data_name}
+
+ return extra_dict
+
+ @staticmethod
+ def _ip_profile_to_ro(
+ ip_profile: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """[summary]
+
+ Args:
+ ip_profile (Dict[str, Any]): [description]
+
+ Returns:
+ Dict[str, Any]: [description]
+ """
+ if not ip_profile:
+ return None
+
+ ro_ip_profile = {
+ "ip_version": "IPv4"
+ if "v4" in ip_profile.get("ip-version", "ipv4")
+ else "IPv6",
+ "subnet_address": ip_profile.get("subnet-address"),
+ "gateway_address": ip_profile.get("gateway-address"),
+ "dhcp_enabled": ip_profile.get("dhcp-params", {}).get("enabled", False),
+ "dhcp_start_address": ip_profile.get("dhcp-params", {}).get(
+ "start-address", None
+ ),
+ "dhcp_count": ip_profile.get("dhcp-params", {}).get("count", None),
+ }