+ member_vnf_index = vnf_profile.get("id")
+ self._add_flavor_to_nsr(vdu, vnfd, nsr_descriptor, member_vnf_index)
+ sw_image_id = vdu.get("sw-image-desc")
+ if sw_image_id:
+ image_data = self._get_image_data_from_vnfd(vnfd, sw_image_id)
+ self._add_image_to_nsr(nsr_descriptor, image_data)
+
+ # also add alternative images to the list of images
+ for alt_image in vdu.get("alternative-sw-image-desc", ()):
+ image_data = self._get_image_data_from_vnfd(vnfd, alt_image)
+ self._add_image_to_nsr(nsr_descriptor, image_data)
+
+ # Add Affinity or Anti-affinity group information to NSR
+ vdu_profiles = vnfd.get("df", [[]])[0].get("vdu-profile", ())
+ affinity_group_prefix_name = "{}-{}".format(
+ nsr_descriptor["name"][:16], vnf_profile.get("id")[:16]
+ )
+
+ for vdu_profile in vdu_profiles:
+ affinity_group_data = {}
+ for affinity_group in vdu_profile.get(
+ "affinity-or-anti-affinity-group", ()
+ ):
+ affinity_group_data = (
+ self._get_affinity_or_anti_affinity_group_data_from_vnfd(
+ vnfd, affinity_group["id"]
+ )
+ )
+ affinity_group_data["member-vnf-index"] = vnf_profile.get("id")
+ self._add_affinity_or_anti_affinity_group_to_nsr(
+ nsr_descriptor,
+ affinity_group_data,
+ affinity_group_prefix_name,
+ )
+
+ for vld in nsr_vld:
+ vld["vnfd-connection-point-ref"] = all_vld_connection_point_data.get(
+ vld.get("id"), []
+ )
+ vld["name"] = vld["id"]
+ nsr_descriptor["vld"] = nsr_vld
+
+ return nsr_descriptor
+
+ def _get_affinity_or_anti_affinity_group_data_from_vnfd(
+ self, vnfd, affinity_group_id
+ ):
+ """
+ Gets affinity-or-anti-affinity-group info from df and returns the desired affinity group
+ """
+ affinity_group = utils.find_in_list(
+ vnfd.get("df", [[]])[0].get("affinity-or-anti-affinity-group", ()),
+ lambda ag: ag["id"] == affinity_group_id,
+ )
+ affinity_group_data = {}
+ if affinity_group:
+ if affinity_group.get("id"):
+ affinity_group_data["ag-id"] = affinity_group["id"]
+ if affinity_group.get("type"):
+ affinity_group_data["type"] = affinity_group["type"]
+ if affinity_group.get("scope"):
+ affinity_group_data["scope"] = affinity_group["scope"]
+ return affinity_group_data
+
+ def _add_affinity_or_anti_affinity_group_to_nsr(
+ self, nsr_descriptor, affinity_group_data, affinity_group_prefix_name
+ ):
+ """
+ Adds affinity-or-anti-affinity-group to nsr checking first it is not already added
+ """
+ affinity_group = next(
+ (
+ f
+ for f in nsr_descriptor["affinity-or-anti-affinity-group"]
+ if all(f.get(k) == affinity_group_data[k] for k in affinity_group_data)
+ ),
+ None,
+ )
+ if not affinity_group:
+ affinity_group_data["id"] = str(
+ len(nsr_descriptor["affinity-or-anti-affinity-group"])
+ )
+ affinity_group_data["name"] = "{}-{}".format(
+ affinity_group_prefix_name, affinity_group_data["ag-id"][:32]
+ )
+ nsr_descriptor["affinity-or-anti-affinity-group"].append(
+ affinity_group_data
+ )
+
+ def _get_image_data_from_vnfd(self, vnfd, sw_image_id):
+ sw_image_desc = utils.find_in_list(
+ vnfd.get("sw-image-desc", ()), lambda sw: sw["id"] == sw_image_id
+ )
+ image_data = {}
+ if sw_image_desc.get("image"):
+ image_data["image"] = sw_image_desc["image"]
+ if sw_image_desc.get("checksum"):
+ image_data["image_checksum"] = sw_image_desc["checksum"]["hash"]
+ if sw_image_desc.get("vim-type"):
+ image_data["vim-type"] = sw_image_desc["vim-type"]
+ return image_data
+
+ def _add_image_to_nsr(self, nsr_descriptor, image_data):
+ """
+ Adds image to nsr checking first it is not already added
+ """
+ img = next(
+ (
+ f
+ for f in nsr_descriptor["image"]
+ if all(f.get(k) == image_data[k] for k in image_data)
+ ),
+ None,
+ )
+ if not img:
+ image_data["id"] = str(len(nsr_descriptor["image"]))
+ nsr_descriptor["image"].append(image_data)
+
+ def _create_vnfr_descriptor_from_vnfd(
+ self,
+ nsd,
+ vnfd,
+ vnfd_id,
+ vnf_index,
+ nsr_descriptor,
+ ns_request,
+ ns_k8s_namespace,
+ revision=None,
+ ):
+ vnfr_id = str(uuid4())
+ nsr_id = nsr_descriptor["id"]
+ now = time()
+ additional_params, vnf_params = self._format_additional_params(
+ ns_request, vnf_index, descriptor=vnfd
+ )
+
+ vnfr_descriptor = {
+ "id": vnfr_id,
+ "_id": vnfr_id,
+ "nsr-id-ref": nsr_id,
+ "member-vnf-index-ref": vnf_index,
+ "additionalParamsForVnf": additional_params,
+ "created-time": now,
+ # "vnfd": vnfd, # at OSM model.but removed to avoid data duplication TODO: revise
+ "vnfd-ref": vnfd_id,
+ "vnfd-id": vnfd["_id"], # not at OSM model, but useful
+ "vim-account-id": None,
+ "vca-id": None,
+ "vdur": [],
+ "connection-point": [],
+ "ip-address": None, # mgmt-interface filled by LCM
+ }
+
+ # Revision backwards compatility. Only specify the revision in the record if
+ # the original VNFD has a revision.
+ if "revision" in vnfd:
+ vnfr_descriptor["revision"] = vnfd["revision"]
+
+ vnf_k8s_namespace = ns_k8s_namespace
+ if vnf_params:
+ if vnf_params.get("k8s-namespace"):
+ vnf_k8s_namespace = vnf_params["k8s-namespace"]
+ if vnf_params.get("config-units"):
+ vnfr_descriptor["config-units"] = vnf_params["config-units"]
+
+ # Create vld
+ if vnfd.get("int-virtual-link-desc"):
+ vnfr_descriptor["vld"] = []
+ for vnfd_vld in vnfd.get("int-virtual-link-desc"):
+ vnfr_descriptor["vld"].append({key: vnfd_vld[key] for key in vnfd_vld})
+
+ for cp in vnfd.get("ext-cpd", ()):
+ vnf_cp = {
+ "name": cp.get("id"),
+ "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
+ "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
+ "id": cp.get("id"),
+ # "ip-address", "mac-address" # filled by LCM
+ # vim-id # TODO it would be nice having a vim port id
+ }
+ vnfr_descriptor["connection-point"].append(vnf_cp)
+
+ # Create k8s-cluster information
+ # TODO: Validate if a k8s-cluster net can have more than one ext-cpd ?
+ if vnfd.get("k8s-cluster"):
+ vnfr_descriptor["k8s-cluster"] = vnfd["k8s-cluster"]
+ all_k8s_cluster_nets_cpds = {}
+ for cpd in get_iterable(vnfd.get("ext-cpd")):
+ if cpd.get("k8s-cluster-net"):
+ all_k8s_cluster_nets_cpds[cpd.get("k8s-cluster-net")] = cpd.get(
+ "id"
+ )
+ for net in get_iterable(vnfr_descriptor["k8s-cluster"].get("nets")):
+ if net.get("id") in all_k8s_cluster_nets_cpds:
+ net["external-connection-point-ref"] = all_k8s_cluster_nets_cpds[
+ net.get("id")
+ ]
+
+ # update kdus
+ for kdu in get_iterable(vnfd.get("kdu")):
+ additional_params, kdu_params = self._format_additional_params(
+ ns_request, vnf_index, kdu_name=kdu["name"], descriptor=vnfd
+ )
+ kdu_k8s_namespace = vnf_k8s_namespace
+ kdu_model = kdu_params.get("kdu_model") if kdu_params else None
+ if kdu_params and kdu_params.get("k8s-namespace"):
+ kdu_k8s_namespace = kdu_params["k8s-namespace"]
+
+ kdu_deployment_name = ""
+ if kdu_params and kdu_params.get("kdu-deployment-name"):
+ kdu_deployment_name = kdu_params.get("kdu-deployment-name")
+
+ kdur = {
+ "additionalParams": additional_params,
+ "k8s-namespace": kdu_k8s_namespace,
+ "kdu-deployment-name": kdu_deployment_name,
+ "kdu-name": kdu["name"],
+ # TODO "name": "" Name of the VDU in the VIM
+ "ip-address": None, # mgmt-interface filled by LCM
+ "k8s-cluster": {},
+ }
+ if kdu_params and kdu_params.get("config-units"):
+ kdur["config-units"] = kdu_params["config-units"]
+ if kdu.get("helm-version"):
+ kdur["helm-version"] = kdu["helm-version"]
+ for k8s_type in ("helm-chart", "juju-bundle"):
+ if kdu.get(k8s_type):
+ kdur[k8s_type] = kdu_model or kdu[k8s_type]
+ if not vnfr_descriptor.get("kdur"):
+ vnfr_descriptor["kdur"] = []
+ vnfr_descriptor["kdur"].append(kdur)
+
+ vnfd_mgmt_cp = vnfd.get("mgmt-cp")
+
+ for vdu in vnfd.get("vdu", ()):
+ vdu_mgmt_cp = []
+ try:
+ configs = vnfd.get("df")[0]["lcm-operations-configuration"][
+ "operate-vnf-op-config"
+ ]["day1-2"]
+ vdu_config = utils.find_in_list(
+ configs, lambda config: config["id"] == vdu["id"]
+ )
+ except Exception:
+ vdu_config = None
+
+ try:
+ vdu_instantiation_level = utils.find_in_list(
+ vnfd.get("df")[0]["instantiation-level"][0]["vdu-level"],
+ lambda a_vdu_profile: a_vdu_profile["vdu-id"] == vdu["id"],
+ )
+ except Exception:
+ vdu_instantiation_level = None
+
+ if vdu_config:
+ external_connection_ee = utils.filter_in_list(
+ vdu_config.get("execution-environment-list", []),
+ lambda ee: "external-connection-point-ref" in ee,
+ )
+ for ee in external_connection_ee:
+ vdu_mgmt_cp.append(ee["external-connection-point-ref"])
+
+ additional_params, vdu_params = self._format_additional_params(
+ ns_request, vnf_index, vdu_id=vdu["id"], descriptor=vnfd
+ )
+
+ try:
+ vdu_virtual_storage_descriptors = utils.filter_in_list(
+ vnfd.get("virtual-storage-desc", []),
+ lambda stg_desc: stg_desc["id"] in vdu["virtual-storage-desc"],
+ )
+ except Exception:
+ vdu_virtual_storage_descriptors = []
+ vdur = {
+ "vdu-id-ref": vdu["id"],
+ # TODO "name": "" Name of the VDU in the VIM
+ "ip-address": None, # mgmt-interface filled by LCM
+ # "vim-id", "flavor-id", "image-id", "management-ip" # filled by LCM
+ "internal-connection-point": [],
+ "interfaces": [],
+ "additionalParams": additional_params,
+ "vdu-name": vdu["name"],
+ "virtual-storages": vdu_virtual_storage_descriptors,
+ }
+ if vdu_params and vdu_params.get("config-units"):
+ vdur["config-units"] = vdu_params["config-units"]
+ if deep_get(vdu, ("supplemental-boot-data", "boot-data-drive")):
+ vdur["boot-data-drive"] = vdu["supplemental-boot-data"][
+ "boot-data-drive"
+ ]
+ if vdu.get("pdu-type"):
+ vdur["pdu-type"] = vdu["pdu-type"]
+ vdur["name"] = vdu["pdu-type"]
+ # TODO volumes: name, volume-id
+ for icp in vdu.get("int-cpd", ()):
+ vdu_icp = {
+ "id": icp["id"],
+ "connection-point-id": icp["id"],
+ "name": icp.get("id"),
+ }
+
+ vdur["internal-connection-point"].append(vdu_icp)
+
+ for iface in icp.get("virtual-network-interface-requirement", ()):
+ # Name, mac-address and interface position is taken from VNFD
+ # and included into VNFR. By this way RO can process this information
+ # while creating the VDU.
+ iface_fields = ("name", "mac-address", "position", "ip-address")
+ vdu_iface = {
+ x: iface[x] for x in iface_fields if iface.get(x) is not None