+ def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
+ if vnfd_id not in cached_vnfds:
+ cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
+ return cached_vnfds[vnfd_id]
+
+ def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
+ if vnf_profile_id not in cached_vnfrs:
+ cached_vnfrs[vnf_profile_id] = self.db.get_one(
+ "vnfrs",
+ {
+ "member-vnf-index-ref": vnf_profile_id,
+ "nsr-id-ref": nsr_id,
+ },
+ )
+ return cached_vnfrs[vnf_profile_id]
+
+ def _is_deployed_vca_in_relation(
+ self, vca: DeployedVCA, relation: Relation
+ ) -> bool:
+ found = False
+ for endpoint in (relation.provider, relation.requirer):
+ if endpoint["kdu-resource-profile-id"]:
+ continue
+ found = (
+ vca.vnf_profile_id == endpoint.vnf_profile_id
+ and vca.vdu_profile_id == endpoint.vdu_profile_id
+ and vca.execution_environment_ref == endpoint.execution_environment_ref
+ )
+ if found:
+ break
+ return found
+
+ def _update_ee_relation_data_with_implicit_data(
+ self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
+ ):
+ ee_relation_data = safe_get_ee_relation(
+ nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
+ )
+ ee_relation_level = EELevel.get_level(ee_relation_data)
+ if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
+ "execution-environment-ref"
+ ]:
+ vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
+ vnfd_id = vnf_profile["vnfd-id"]
+ db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ entity_id = (
+ vnfd_id
+ if ee_relation_level == EELevel.VNF
+ else ee_relation_data["vdu-profile-id"]
+ )
+ ee = get_juju_ee_ref(db_vnfd, entity_id)
+ if not ee:
+ raise Exception(
+ f"not execution environments found for ee_relation {ee_relation_data}"
+ )
+ ee_relation_data["execution-environment-ref"] = ee["id"]
+ return ee_relation_data
+
+ def _get_ns_relations(
+ self,
+ nsr_id: str,
+ nsd: Dict[str, Any],
+ vca: DeployedVCA,
+ cached_vnfds: Dict[str, Any],
+ ) -> List[Relation]:
+ relations = []
+ db_ns_relations = get_ns_configuration_relation_list(nsd)
+ for r in db_ns_relations:
+ provider_dict = None
+ requirer_dict = None
+ if all(key in r for key in ("provider", "requirer")):
+ provider_dict = r["provider"]
+ requirer_dict = r["requirer"]
+ elif "entities" in r:
+ provider_id = r["entities"][0]["id"]
+ provider_dict = {
+ "nsr-id": nsr_id,
+ "endpoint": r["entities"][0]["endpoint"],
+ }
+ if provider_id != nsd["id"]:
+ provider_dict["vnf-profile-id"] = provider_id
+ requirer_id = r["entities"][1]["id"]
+ requirer_dict = {
+ "nsr-id": nsr_id,
+ "endpoint": r["entities"][1]["endpoint"],
+ }
+ if requirer_id != nsd["id"]:
+ requirer_dict["vnf-profile-id"] = requirer_id
+ else:
+ raise Exception(
+ "provider/requirer or entities must be included in the relation."
+ )
+ relation_provider = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, provider_dict, cached_vnfds
+ )
+ relation_requirer = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, requirer_dict, cached_vnfds
+ )
+ provider = EERelation(relation_provider)
+ requirer = EERelation(relation_requirer)
+ relation = Relation(r["name"], provider, requirer)
+ vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
+ if vca_in_relation:
+ relations.append(relation)
+ return relations
+
+ def _get_vnf_relations(
+ self,
+ nsr_id: str,
+ nsd: Dict[str, Any],
+ vca: DeployedVCA,
+ cached_vnfds: Dict[str, Any],
+ ) -> List[Relation]:
+ relations = []
+ vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
+ vnf_profile_id = vnf_profile["id"]
+ vnfd_id = vnf_profile["vnfd-id"]
+ db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
+ for r in db_vnf_relations:
+ provider_dict = None
+ requirer_dict = None
+ if all(key in r for key in ("provider", "requirer")):
+ provider_dict = r["provider"]
+ requirer_dict = r["requirer"]
+ elif "entities" in r:
+ provider_id = r["entities"][0]["id"]
+ provider_dict = {
+ "nsr-id": nsr_id,
+ "vnf-profile-id": vnf_profile_id,
+ "endpoint": r["entities"][0]["endpoint"],
+ }
+ if provider_id != vnfd_id:
+ provider_dict["vdu-profile-id"] = provider_id
+ requirer_id = r["entities"][1]["id"]
+ requirer_dict = {
+ "nsr-id": nsr_id,
+ "vnf-profile-id": vnf_profile_id,
+ "endpoint": r["entities"][1]["endpoint"],
+ }
+ if requirer_id != vnfd_id:
+ requirer_dict["vdu-profile-id"] = requirer_id
+ else:
+ raise Exception(
+ "provider/requirer or entities must be included in the relation."
+ )
+ relation_provider = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
+ )
+ relation_requirer = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
+ )
+ provider = EERelation(relation_provider)
+ requirer = EERelation(relation_requirer)
+ relation = Relation(r["name"], provider, requirer)
+ vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
+ if vca_in_relation:
+ relations.append(relation)
+ return relations
+
+ def _get_kdu_resource_data(
+ self,
+ ee_relation: EERelation,
+ db_nsr: Dict[str, Any],
+ cached_vnfds: Dict[str, Any],
+ ) -> DeployedK8sResource:
+ nsd = get_nsd(db_nsr)
+ vnf_profiles = get_vnf_profiles(nsd)
+ vnfd_id = find_in_list(
+ vnf_profiles,
+ lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
+ )["vnfd-id"]
+ db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ kdu_resource_profile = get_kdu_resource_profile(
+ db_vnfd, ee_relation.kdu_resource_profile_id
+ )
+ kdu_name = kdu_resource_profile["kdu-name"]
+ deployed_kdu, _ = get_deployed_kdu(
+ db_nsr.get("_admin", ()).get("deployed", ()),
+ kdu_name,
+ ee_relation.vnf_profile_id,
+ )
+ deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
+ return deployed_kdu
+
+ def _get_deployed_component(
+ self,
+ ee_relation: EERelation,
+ db_nsr: Dict[str, Any],
+ cached_vnfds: Dict[str, Any],
+ ) -> DeployedComponent:
+ nsr_id = db_nsr["_id"]
+ deployed_component = None
+ ee_level = EELevel.get_level(ee_relation)
+ if ee_level == EELevel.NS:
+ vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
+ if vca:
+ deployed_component = DeployedVCA(nsr_id, vca)
+ elif ee_level == EELevel.VNF:
+ vca = get_deployed_vca(
+ db_nsr,
+ {
+ "vdu_id": None,
+ "member-vnf-index": ee_relation.vnf_profile_id,
+ "ee_descriptor_id": ee_relation.execution_environment_ref,
+ },
+ )
+ if vca:
+ deployed_component = DeployedVCA(nsr_id, vca)
+ elif ee_level == EELevel.VDU:
+ vca = get_deployed_vca(
+ db_nsr,
+ {
+ "vdu_id": ee_relation.vdu_profile_id,
+ "member-vnf-index": ee_relation.vnf_profile_id,
+ "ee_descriptor_id": ee_relation.execution_environment_ref,
+ },
+ )
+ if vca:
+ deployed_component = DeployedVCA(nsr_id, vca)
+ elif ee_level == EELevel.KDU:
+ kdu_resource_data = self._get_kdu_resource_data(
+ ee_relation, db_nsr, cached_vnfds
+ )
+ if kdu_resource_data:
+ deployed_component = DeployedK8sResource(kdu_resource_data)
+ return deployed_component
+
+ async def _add_relation(
+ self,
+ relation: Relation,
+ vca_type: str,
+ db_nsr: Dict[str, Any],
+ cached_vnfds: Dict[str, Any],
+ cached_vnfrs: Dict[str, Any],
+ ) -> bool:
+ deployed_provider = self._get_deployed_component(
+ relation.provider, db_nsr, cached_vnfds
+ )
+ deployed_requirer = self._get_deployed_component(
+ relation.requirer, db_nsr, cached_vnfds
+ )
+ if (
+ deployed_provider
+ and deployed_requirer
+ and deployed_provider.config_sw_installed
+ and deployed_requirer.config_sw_installed
+ ):
+ provider_db_vnfr = (
+ self._get_vnfr(
+ relation.provider.nsr_id,
+ relation.provider.vnf_profile_id,
+ cached_vnfrs,
+ )
+ if relation.provider.vnf_profile_id
+ else None
+ )
+ requirer_db_vnfr = (
+ self._get_vnfr(
+ relation.requirer.nsr_id,
+ relation.requirer.vnf_profile_id,
+ cached_vnfrs,
+ )
+ if relation.requirer.vnf_profile_id
+ else None
+ )
+ provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
+ requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
+ provider_relation_endpoint = RelationEndpoint(
+ deployed_provider.ee_id,
+ provider_vca_id,
+ relation.provider.endpoint,
+ )
+ requirer_relation_endpoint = RelationEndpoint(
+ deployed_requirer.ee_id,
+ requirer_vca_id,
+ relation.requirer.endpoint,
+ )
+ await self.vca_map[vca_type].add_relation(
+ provider=provider_relation_endpoint,
+ requirer=requirer_relation_endpoint,
+ )
+ # remove entry from relations list
+ return True
+ return False
+
+ async def _add_vca_relations(
+ self,
+ logging_text,
+ nsr_id,
+ vca_type: str,
+ vca_index: int,
+ timeout: int = 3600,
+ ) -> bool: