+ def _update_ee_relation_data_with_implicit_data(
+ self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
+ ):
+ ee_relation_data = safe_get_ee_relation(
+ nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
+ )
+ ee_relation_level = EELevel.get_level(ee_relation_data)
+ if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
+ "execution-environment-ref"
+ ]:
+ vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
+ vnfd_id = vnf_profile["vnfd-id"]
+ db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ entity_id = (
+ vnfd_id
+ if ee_relation_level == EELevel.VNF
+ else ee_relation_data["vdu-profile-id"]
+ )
+ ee = get_juju_ee_ref(db_vnfd, entity_id)
+ if not ee:
+ raise Exception(
+ f"not execution environments found for ee_relation {ee_relation_data}"
+ )
+ ee_relation_data["execution-environment-ref"] = ee["id"]
+ return ee_relation_data
+
+ def _get_ns_relations(
+ self,
+ nsr_id: str,
+ nsd: Dict[str, Any],
+ vca: DeployedVCA,
+ cached_vnfds: Dict[str, Any],
+ ):
+ relations = []
+ db_ns_relations = get_ns_configuration_relation_list(nsd)
+ for r in db_ns_relations:
+ relation_provider = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, r["provider"], cached_vnfds
+ )
+ relation_requirer = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, r["requirer"], cached_vnfds
+ )
+ provider = EERelation(relation_provider)
+ requirer = EERelation(relation_requirer)
+ relation = Relation(r["name"], provider, requirer)
+ vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
+ if vca_in_relation:
+ relations.append(relation)
+ return relations
+
+ def _get_vnf_relations(
+ self,
+ nsr_id: str,
+ nsd: Dict[str, Any],
+ vca: DeployedVCA,
+ cached_vnfds: Dict[str, Any],
+ ):
+ relations = []
+ vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
+ vnf_profile_id = vnf_profile["id"]
+ vnfd_id = vnf_profile["vnfd-id"]
+ db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
+ for r in db_vnf_relations:
+ relation_provider = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, r["provider"], cached_vnfds, vnf_profile_id=vnf_profile_id
+ )
+ relation_requirer = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, r["requirer"], cached_vnfds, vnf_profile_id=vnf_profile_id
+ )
+ provider = EERelation(relation_provider)
+ requirer = EERelation(relation_requirer)
+ relation = Relation(r["name"], provider, requirer)
+ vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
+ if vca_in_relation:
+ relations.append(relation)
+ return relations
+
+ def _get_kdu_resource_data(
+ self,
+ ee_relation: EERelation,
+ db_nsr: Dict[str, Any],
+ cached_vnfds: Dict[str, Any],
+ ) -> DeployedK8sResource:
+ nsd = get_nsd(db_nsr)
+ vnf_profiles = get_vnf_profiles(nsd)
+ vnfd_id = find_in_list(
+ vnf_profiles,
+ lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
+ )["vnfd-id"]
+ db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ kdu_resource_profile = get_kdu_resource_profile(
+ db_vnfd, ee_relation.kdu_resource_profile_id
+ )
+ kdu_name = kdu_resource_profile["kdu-name"]
+ deployed_kdu, _ = get_deployed_kdu(
+ db_nsr.get("_admin", ()).get("deployed", ()),
+ kdu_name,
+ ee_relation.vnf_profile_id,
+ )
+ deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
+ return deployed_kdu
+
+ def _get_deployed_component(
+ self,
+ ee_relation: EERelation,
+ db_nsr: Dict[str, Any],
+ cached_vnfds: Dict[str, Any],
+ ) -> DeployedComponent:
+ nsr_id = db_nsr["_id"]
+ deployed_component = None
+ ee_level = EELevel.get_level(ee_relation)
+ if ee_level == EELevel.NS:
+ vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
+ if vca:
+ deployed_component = DeployedVCA(nsr_id, vca)
+ elif ee_level == EELevel.VNF:
+ vca = get_deployed_vca(
+ db_nsr,
+ {
+ "vdu_id": None,
+ "member-vnf-index": ee_relation.vnf_profile_id,
+ "ee_descriptor_id": ee_relation.execution_environment_ref,
+ },
+ )
+ if vca:
+ deployed_component = DeployedVCA(nsr_id, vca)
+ elif ee_level == EELevel.VDU:
+ vca = get_deployed_vca(
+ db_nsr,
+ {
+ "vdu_id": ee_relation.vdu_profile_id,
+ "member-vnf-index": ee_relation.vnf_profile_id,
+ "ee_descriptor_id": ee_relation.execution_environment_ref,
+ },
+ )
+ if vca:
+ deployed_component = DeployedVCA(nsr_id, vca)
+ elif ee_level == EELevel.KDU:
+ kdu_resource_data = self._get_kdu_resource_data(
+ ee_relation, db_nsr, cached_vnfds
+ )
+ if kdu_resource_data:
+ deployed_component = DeployedK8sResource(kdu_resource_data)
+ return deployed_component
+
+ async def _add_relation(
+ self,
+ relation: Relation,
+ vca_type: str,
+ db_nsr: Dict[str, Any],
+ cached_vnfds: Dict[str, Any],
+ cached_vnfrs: Dict[str, Any],
+ ) -> bool:
+ deployed_provider = self._get_deployed_component(
+ relation.provider, db_nsr, cached_vnfds
+ )
+ deployed_requirer = self._get_deployed_component(
+ relation.requirer, db_nsr, cached_vnfds
+ )
+ if (
+ deployed_provider
+ and deployed_requirer
+ and deployed_provider.config_sw_installed
+ and deployed_requirer.config_sw_installed
+ ):
+ provider_db_vnfr = (
+ self._get_vnfr(
+ relation.provider.nsr_id,
+ relation.provider.vnf_profile_id,
+ cached_vnfrs,
+ )
+ if relation.provider.vnf_profile_id
+ else None
+ )
+ requirer_db_vnfr = (
+ self._get_vnfr(
+ relation.requirer.nsr_id,
+ relation.requirer.vnf_profile_id,
+ cached_vnfrs,
+ )
+ if relation.requirer.vnf_profile_id
+ else None
+ )
+ provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
+ requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
+ provider_relation_endpoint = RelationEndpoint(
+ deployed_provider.ee_id,
+ provider_vca_id,
+ relation.provider.endpoint,
+ )
+ requirer_relation_endpoint = RelationEndpoint(
+ deployed_requirer.ee_id,
+ requirer_vca_id,
+ relation.requirer.endpoint,
+ )
+ await self.vca_map[vca_type].add_relation(
+ provider=provider_relation_endpoint,
+ requirer=requirer_relation_endpoint,
+ )
+ # remove entry from relations list
+ return True
+ return False
+
+ async def _add_vca_relations(
+ self,
+ logging_text,
+ nsr_id,
+ vca_type: str,
+ vca_index: int,
+ timeout: int = 3600,
+ ) -> bool:
+
+ # steps:
+ # 1. find all relations for this VCA
+ # 2. wait for other peers related
+ # 3. add relations
+
+ try:
+ # STEP 1: find all relations for this VCA
+
+ # read nsr record
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsd = get_nsd(db_nsr)
+
+ # this VCA data
+ deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
+ my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
+
+ cached_vnfds = {}
+ cached_vnfrs = {}
+ relations = []
+ relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
+ relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
+
+ # if no relations, terminate
+ if not relations:
+ self.logger.debug(logging_text + " No relations")
+ return True
+
+ self.logger.debug(logging_text + " adding relations {}".format(relations))
+
+ # add all relations
+ start = time()
+ while True:
+ # check timeout
+ now = time()
+ if now - start >= timeout:
+ self.logger.error(logging_text + " : timeout adding relations")
+ return False
+
+ # reload nsr from database (we need to update record: _admin.deployed.VCA)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+
+ # for each relation, find the VCA's related
+ for relation in relations.copy():
+ added = await self._add_relation(
+ relation,
+ vca_type,
+ db_nsr,
+ cached_vnfds,
+ cached_vnfrs,
+ )
+ if added:
+ relations.remove(relation)
+
+ if not relations:
+ self.logger.debug("Relations added")
+ break
+ await asyncio.sleep(5.0)
+
+ return True
+
+ except Exception as e:
+ self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
+ return False
+
+ async def _install_kdu(
+ self,
+ nsr_id: str,
+ nsr_db_path: str,
+ vnfr_data: dict,
+ kdu_index: int,
+ kdud: dict,
+ vnfd: dict,
+ k8s_instance_info: dict,
+ k8params: dict = None,
+ timeout: int = 600,
+ vca_id: str = None,
+ ):
+
+ try:
+ k8sclustertype = k8s_instance_info["k8scluster-type"]
+ # Instantiate kdu
+ db_dict_install = {
+ "collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": nsr_db_path,
+ }
+
+ if k8s_instance_info.get("kdu-deployment-name"):
+ kdu_instance = k8s_instance_info.get("kdu-deployment-name")
+ else:
+ kdu_instance = self.k8scluster_map[
+ k8sclustertype
+ ].generate_kdu_instance_name(
+ db_dict=db_dict_install,
+ kdu_model=k8s_instance_info["kdu-model"],
+ kdu_name=k8s_instance_info["kdu-name"],
+ )
+ self.update_db_2(
+ "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
+ )
+ await self.k8scluster_map[k8sclustertype].install(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_model=k8s_instance_info["kdu-model"],
+ atomic=True,
+ params=k8params,
+ db_dict=db_dict_install,
+ timeout=timeout,
+ kdu_name=k8s_instance_info["kdu-name"],
+ namespace=k8s_instance_info["namespace"],
+ kdu_instance=kdu_instance,
+ vca_id=vca_id,
+ )
+ self.update_db_2(
+ "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
+ )
+
+ # Obtain services to obtain management service ip
+ services = await self.k8scluster_map[k8sclustertype].get_services(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_instance=kdu_instance,
+ namespace=k8s_instance_info["namespace"],
+ )
+
+ # Obtain management service info (if exists)
+ vnfr_update_dict = {}
+ kdu_config = get_configuration(vnfd, kdud["name"])
+ if kdu_config:
+ target_ee_list = kdu_config.get("execution-environment-list", [])
+ else:
+ target_ee_list = []
+
+ if services:
+ vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
+ mgmt_services = [
+ service
+ for service in kdud.get("service", [])
+ if service.get("mgmt-service")
+ ]
+ for mgmt_service in mgmt_services:
+ for service in services:
+ if service["name"].startswith(mgmt_service["name"]):
+ # Mgmt service found, Obtain service ip
+ ip = service.get("external_ip", service.get("cluster_ip"))
+ if isinstance(ip, list) and len(ip) == 1:
+ ip = ip[0]
+
+ vnfr_update_dict[
+ "kdur.{}.ip-address".format(kdu_index)
+ ] = ip
+
+ # Check if must update also mgmt ip at the vnf
+ service_external_cp = mgmt_service.get(
+ "external-connection-point-ref"
+ )
+ if service_external_cp:
+ if (
+ deep_get(vnfd, ("mgmt-interface", "cp"))
+ == service_external_cp
+ ):
+ vnfr_update_dict["ip-address"] = ip
+
+ if find_in_list(
+ target_ee_list,
+ lambda ee: ee.get(
+ "external-connection-point-ref", ""
+ )
+ == service_external_cp,
+ ):
+ vnfr_update_dict[
+ "kdur.{}.ip-address".format(kdu_index)
+ ] = ip
+ break
+ else:
+ self.logger.warn(
+ "Mgmt service name: {} not found".format(
+ mgmt_service["name"]
+ )
+ )
+
+ vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
+ self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
+
+ kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
+ if (
+ kdu_config
+ and kdu_config.get("initial-config-primitive")
+ and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
+ ):
+ initial_config_primitive_list = kdu_config.get(
+ "initial-config-primitive"
+ )
+ initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
+
+ for initial_config_primitive in initial_config_primitive_list:
+ primitive_params_ = self._map_primitive_params(
+ initial_config_primitive, {}, {}
+ )