+ # rest of staff will be done at finally
+
+ except (
+ ROclient.ROClientException,
+ DbException,
+ LcmException,
+ N2VCException,
+ ) as e:
+ self.logger.error(
+ logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
+ )
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(
+ logging_text + "Cancelled Exception while '{}'".format(stage[1])
+ )
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
+ exc_info=True,
+ )
+ finally:
+ if exc:
+ error_list.append(str(exc))
+ try:
+ # wait for pending tasks
+ if tasks_dict_info:
+ stage[1] = "Waiting for instantiate pending tasks."
+ self.logger.debug(logging_text + stage[1])
+ error_list += await self._wait_for_tasks(
+ logging_text,
+ tasks_dict_info,
+ timeout_ns_deploy,
+ stage,
+ nslcmop_id,
+ nsr_id=nsr_id,
+ )
+ stage[1] = stage[2] = ""
+ except asyncio.CancelledError:
+ error_list.append("Cancelled")
+ # TODO cancel all tasks
+ except Exception as exc:
+ error_list.append(str(exc))
+
+ # update operation-status
+ db_nsr_update["operational-status"] = "running"
+ # let's begin with VCA 'configured' status (later we can change it)
+ db_nsr_update["config-status"] = "configured"
+ for task, task_name in tasks_dict_info.items():
+ if not task.done() or task.cancelled() or task.exception():
+ if task_name.startswith(self.task_name_deploy_vca):
+ # A N2VC task is pending
+ db_nsr_update["config-status"] = "failed"
+ else:
+ # RO or KDU task is pending
+ db_nsr_update["operational-status"] = "failed"
+
+ # update status at database
+ if error_list:
+ error_detail = ". ".join(error_list)
+ self.logger.error(logging_text + error_detail)
+ error_description_nslcmop = "{} Detail: {}".format(
+ stage[0], error_detail
+ )
+ error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
+ nslcmop_id, stage[0]
+ )
+
+ db_nsr_update["detailed-status"] = (
+ error_description_nsr + " Detail: " + error_detail
+ )
+ db_nslcmop_update["detailed-status"] = error_detail
+ nslcmop_operation_state = "FAILED"
+ ns_state = "BROKEN"
+ else:
+ error_detail = None
+ error_description_nsr = error_description_nslcmop = None
+ ns_state = "READY"
+ db_nsr_update["detailed-status"] = "Done"
+ db_nslcmop_update["detailed-status"] = "Done"
+ nslcmop_operation_state = "COMPLETED"
+
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=ns_state,
+ current_operation="IDLE",
+ current_operation_id=None,
+ error_description=error_description_nsr,
+ error_detail=error_detail,
+ other_update=db_nsr_update,
+ )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ await self.msg.aiowrite(
+ "ns",
+ "instantiated",
+ {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ },
+ loop=self.loop,
+ )
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
+
+ def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
+ if vnfd_id not in cached_vnfds:
+ cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
+ return cached_vnfds[vnfd_id]
+
+ def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
+ if vnf_profile_id not in cached_vnfrs:
+ cached_vnfrs[vnf_profile_id] = self.db.get_one(
+ "vnfrs",
+ {
+ "member-vnf-index-ref": vnf_profile_id,
+ "nsr-id-ref": nsr_id,
+ },
+ )
+ return cached_vnfrs[vnf_profile_id]
+
+ def _is_deployed_vca_in_relation(
+ self, vca: DeployedVCA, relation: Relation
+ ) -> bool:
+ found = False
+ for endpoint in (relation.provider, relation.requirer):
+ if endpoint["kdu-resource-profile-id"]:
+ continue
+ found = (
+ vca.vnf_profile_id == endpoint.vnf_profile_id
+ and vca.vdu_profile_id == endpoint.vdu_profile_id
+ and vca.execution_environment_ref == endpoint.execution_environment_ref
+ )
+ if found:
+ break
+ return found
+
+ def _update_ee_relation_data_with_implicit_data(
+ self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
+ ):
+ ee_relation_data = safe_get_ee_relation(
+ nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
+ )
+ ee_relation_level = EELevel.get_level(ee_relation_data)
+ if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
+ "execution-environment-ref"
+ ]:
+ vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
+ vnfd_id = vnf_profile["vnfd-id"]
+ db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ entity_id = (
+ vnfd_id
+ if ee_relation_level == EELevel.VNF
+ else ee_relation_data["vdu-profile-id"]
+ )
+ ee = get_juju_ee_ref(db_vnfd, entity_id)
+ if not ee:
+ raise Exception(
+ f"not execution environments found for ee_relation {ee_relation_data}"
+ )
+ ee_relation_data["execution-environment-ref"] = ee["id"]
+ return ee_relation_data
+
+ def _get_ns_relations(
+ self,
+ nsr_id: str,
+ nsd: Dict[str, Any],
+ vca: DeployedVCA,
+ cached_vnfds: Dict[str, Any],
+ ) -> List[Relation]:
+ relations = []
+ db_ns_relations = get_ns_configuration_relation_list(nsd)
+ for r in db_ns_relations:
+ provider_dict = None
+ requirer_dict = None
+ if all(key in r for key in ("provider", "requirer")):
+ provider_dict = r["provider"]
+ requirer_dict = r["requirer"]
+ elif "entities" in r:
+ provider_id = r["entities"][0]["id"]
+ provider_dict = {
+ "nsr-id": nsr_id,
+ "endpoint": r["entities"][0]["endpoint"],
+ }
+ if provider_id != nsd["id"]:
+ provider_dict["vnf-profile-id"] = provider_id
+ requirer_id = r["entities"][1]["id"]
+ requirer_dict = {
+ "nsr-id": nsr_id,
+ "endpoint": r["entities"][1]["endpoint"],
+ }
+ if requirer_id != nsd["id"]:
+ requirer_dict["vnf-profile-id"] = requirer_id
+ else:
+ raise Exception("provider/requirer or entities must be included in the relation.")
+ relation_provider = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, provider_dict, cached_vnfds
+ )
+ relation_requirer = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, requirer_dict, cached_vnfds
+ )
+ provider = EERelation(relation_provider)
+ requirer = EERelation(relation_requirer)
+ relation = Relation(r["name"], provider, requirer)
+ vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
+ if vca_in_relation:
+ relations.append(relation)
+ return relations
+
+ def _get_vnf_relations(
+ self,
+ nsr_id: str,
+ nsd: Dict[str, Any],
+ vca: DeployedVCA,
+ cached_vnfds: Dict[str, Any],
+ ) -> List[Relation]:
+ relations = []
+ vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
+ vnf_profile_id = vnf_profile["id"]
+ vnfd_id = vnf_profile["vnfd-id"]
+ db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
+ for r in db_vnf_relations:
+ provider_dict = None
+ requirer_dict = None
+ if all(key in r for key in ("provider", "requirer")):
+ provider_dict = r["provider"]
+ requirer_dict = r["requirer"]
+ elif "entities" in r:
+ provider_id = r["entities"][0]["id"]
+ provider_dict = {
+ "nsr-id": nsr_id,
+ "vnf-profile-id": vnf_profile_id,
+ "endpoint": r["entities"][0]["endpoint"],
+ }
+ if provider_id != vnfd_id:
+ provider_dict["vdu-profile-id"] = provider_id
+ requirer_id = r["entities"][1]["id"]
+ requirer_dict = {
+ "nsr-id": nsr_id,
+ "vnf-profile-id": vnf_profile_id,
+ "endpoint": r["entities"][1]["endpoint"],
+ }
+ if requirer_id != vnfd_id:
+ requirer_dict["vdu-profile-id"] = requirer_id
+ else:
+ raise Exception("provider/requirer or entities must be included in the relation.")
+ relation_provider = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
+ )
+ relation_requirer = self._update_ee_relation_data_with_implicit_data(
+ nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
+ )
+ provider = EERelation(relation_provider)
+ requirer = EERelation(relation_requirer)
+ relation = Relation(r["name"], provider, requirer)
+ vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
+ if vca_in_relation:
+ relations.append(relation)
+ return relations
+
+ def _get_kdu_resource_data(
+ self,
+ ee_relation: EERelation,
+ db_nsr: Dict[str, Any],
+ cached_vnfds: Dict[str, Any],
+ ) -> DeployedK8sResource:
+ nsd = get_nsd(db_nsr)
+ vnf_profiles = get_vnf_profiles(nsd)
+ vnfd_id = find_in_list(
+ vnf_profiles,
+ lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
+ )["vnfd-id"]
+ db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ kdu_resource_profile = get_kdu_resource_profile(
+ db_vnfd, ee_relation.kdu_resource_profile_id
+ )
+ kdu_name = kdu_resource_profile["kdu-name"]
+ deployed_kdu, _ = get_deployed_kdu(
+ db_nsr.get("_admin", ()).get("deployed", ()),
+ kdu_name,
+ ee_relation.vnf_profile_id,
+ )
+ deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
+ return deployed_kdu
+
+ def _get_deployed_component(
+ self,
+ ee_relation: EERelation,
+ db_nsr: Dict[str, Any],
+ cached_vnfds: Dict[str, Any],
+ ) -> DeployedComponent:
+ nsr_id = db_nsr["_id"]
+ deployed_component = None
+ ee_level = EELevel.get_level(ee_relation)
+ if ee_level == EELevel.NS:
+ vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
+ if vca:
+ deployed_component = DeployedVCA(nsr_id, vca)
+ elif ee_level == EELevel.VNF:
+ vca = get_deployed_vca(
+ db_nsr,
+ {
+ "vdu_id": None,
+ "member-vnf-index": ee_relation.vnf_profile_id,
+ "ee_descriptor_id": ee_relation.execution_environment_ref,
+ },
+ )
+ if vca:
+ deployed_component = DeployedVCA(nsr_id, vca)
+ elif ee_level == EELevel.VDU:
+ vca = get_deployed_vca(
+ db_nsr,
+ {
+ "vdu_id": ee_relation.vdu_profile_id,
+ "member-vnf-index": ee_relation.vnf_profile_id,
+ "ee_descriptor_id": ee_relation.execution_environment_ref,
+ },
+ )
+ if vca:
+ deployed_component = DeployedVCA(nsr_id, vca)
+ elif ee_level == EELevel.KDU:
+ kdu_resource_data = self._get_kdu_resource_data(
+ ee_relation, db_nsr, cached_vnfds
+ )
+ if kdu_resource_data:
+ deployed_component = DeployedK8sResource(kdu_resource_data)
+ return deployed_component
+
+ async def _add_relation(
+ self,
+ relation: Relation,
+ vca_type: str,
+ db_nsr: Dict[str, Any],
+ cached_vnfds: Dict[str, Any],
+ cached_vnfrs: Dict[str, Any],
+ ) -> bool:
+ deployed_provider = self._get_deployed_component(
+ relation.provider, db_nsr, cached_vnfds
+ )
+ deployed_requirer = self._get_deployed_component(
+ relation.requirer, db_nsr, cached_vnfds
+ )
+ if (
+ deployed_provider
+ and deployed_requirer
+ and deployed_provider.config_sw_installed
+ and deployed_requirer.config_sw_installed
+ ):
+ provider_db_vnfr = (
+ self._get_vnfr(
+ relation.provider.nsr_id,
+ relation.provider.vnf_profile_id,
+ cached_vnfrs,
+ )
+ if relation.provider.vnf_profile_id
+ else None
+ )
+ requirer_db_vnfr = (
+ self._get_vnfr(
+ relation.requirer.nsr_id,
+ relation.requirer.vnf_profile_id,
+ cached_vnfrs,
+ )
+ if relation.requirer.vnf_profile_id
+ else None
+ )
+ provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
+ requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
+ provider_relation_endpoint = RelationEndpoint(
+ deployed_provider.ee_id,
+ provider_vca_id,
+ relation.provider.endpoint,
+ )
+ requirer_relation_endpoint = RelationEndpoint(
+ deployed_requirer.ee_id,
+ requirer_vca_id,
+ relation.requirer.endpoint,
+ )
+ await self.vca_map[vca_type].add_relation(
+ provider=provider_relation_endpoint,
+ requirer=requirer_relation_endpoint,
+ )
+ # remove entry from relations list
+ return True
+ return False
+
+ async def _add_vca_relations(
+ self,
+ logging_text,
+ nsr_id,
+ vca_type: str,
+ vca_index: int,
+ timeout: int = 3600,
+ ) -> bool:
+
+ # steps:
+ # 1. find all relations for this VCA
+ # 2. wait for other peers related
+ # 3. add relations
+
+ try:
+ # STEP 1: find all relations for this VCA
+
+ # read nsr record
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsd = get_nsd(db_nsr)
+
+ # this VCA data
+ deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
+ my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
+
+ cached_vnfds = {}
+ cached_vnfrs = {}
+ relations = []
+ relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
+ relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
+
+ # if no relations, terminate
+ if not relations:
+ self.logger.debug(logging_text + " No relations")
+ return True
+
+ self.logger.debug(logging_text + " adding relations {}".format(relations))
+
+ # add all relations
+ start = time()
+ while True:
+ # check timeout
+ now = time()
+ if now - start >= timeout:
+ self.logger.error(logging_text + " : timeout adding relations")
+ return False
+
+ # reload nsr from database (we need to update record: _admin.deployed.VCA)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+
+ # for each relation, find the VCA's related
+ for relation in relations.copy():
+ added = await self._add_relation(
+ relation,
+ vca_type,
+ db_nsr,
+ cached_vnfds,
+ cached_vnfrs,
+ )
+ if added:
+ relations.remove(relation)
+
+ if not relations:
+ self.logger.debug("Relations added")
+ break
+ await asyncio.sleep(5.0)
+
+ return True
+
+ except Exception as e:
+ self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
+ return False