+ # read nsr record
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+
+ # this VCA data
+ my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]
+
+ # read all ns-configuration relations
+ ns_relations = list()
+ db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
+ if db_ns_relations:
+ for r in db_ns_relations:
+ # check if this VCA is in the relation
+ if my_vca.get("member-vnf-index") in (
+ r.get("entities")[0].get("id"),
+ r.get("entities")[1].get("id"),
+ ):
+ ns_relations.append(r)
+
+ # read all vnf-configuration relations
+ vnf_relations = list()
+ db_vnfd_list = db_nsr.get("vnfd-id")
+ if db_vnfd_list:
+ for vnfd in db_vnfd_list:
+ db_vnf_relations = None
+ db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
+ db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
+ if db_vnf_configuration:
+ db_vnf_relations = db_vnf_configuration.get("relation", [])
+ if db_vnf_relations:
+ for r in db_vnf_relations:
+ # check if this VCA is in the relation
+ if my_vca.get("vdu_id") in (
+ r.get("entities")[0].get("id"),
+ r.get("entities")[1].get("id"),
+ ):
+ vnf_relations.append(r)
+
+ # if no relations, terminate
+ if not ns_relations and not vnf_relations:
+ self.logger.debug(logging_text + " No relations")
+ return True
+
+ self.logger.debug(
+ logging_text
+ + " adding relations\n {}\n {}".format(
+ ns_relations, vnf_relations
+ )
+ )
+
+ # add all relations
+ start = time()
+ while True:
+ # check timeout
+ now = time()
+ if now - start >= timeout:
+ self.logger.error(logging_text + " : timeout adding relations")
+ return False
+
+ # reload nsr from database (we need to update record: _admin.deloyed.VCA)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+
+ # for each defined NS relation, find the VCA's related
+ for r in ns_relations.copy():
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
+ for vca in vca_list:
+ if vca.get("member-vnf-index") == r.get("entities")[0].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ from_vca_ee_id = vca.get("ee_id")
+ from_vca_endpoint = r.get("entities")[0].get("endpoint")
+ if vca.get("member-vnf-index") == r.get("entities")[1].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ to_vca_ee_id = vca.get("ee_id")
+ to_vca_endpoint = r.get("entities")[1].get("endpoint")
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.vca_map[vca_type].add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint,
+ vca_id=vca_id,
+ )
+ # remove entry from relations list
+ ns_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get("configurationStatus")
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get("member-vnf-index") == r.get("entities")[
+ 0
+ ].get("id"):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ if vca.get("member-vnf-index") == r.get("entities")[
+ 1
+ ].get("id"):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # for each defined VNF relation, find the VCA's related
+ for r in vnf_relations.copy():
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
+ for vca in vca_list:
+ key_to_check = "vdu_id"
+ if vca.get("vdu_id") is None:
+ key_to_check = "vnfd_id"
+ if vca.get(key_to_check) == r.get("entities")[0].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ from_vca_ee_id = vca.get("ee_id")
+ from_vca_endpoint = r.get("entities")[0].get("endpoint")
+ if vca.get(key_to_check) == r.get("entities")[1].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ to_vca_ee_id = vca.get("ee_id")
+ to_vca_endpoint = r.get("entities")[1].get("endpoint")
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.vca_map[vca_type].add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint,
+ vca_id=vca_id,
+ )
+ # remove entry from relations list
+ vnf_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get("configurationStatus")
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get("vdu_id") == r.get("entities")[0].get(
+ "id"
+ ):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ vnf_relations.remove(r)
+ if vca.get("vdu_id") == r.get("entities")[1].get(
+ "id"
+ ):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ vnf_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # wait for next try
+ await asyncio.sleep(5.0)
+
+ if not ns_relations and not vnf_relations:
+ self.logger.debug("Relations added")
+ break
+
+ return True
+
+ except Exception as e:
+ self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
+ return False
+
async def _install_kdu(
    self,
    nsr_id: str,
    nsr_db_path: str,
    vnfr_data: dict,
    kdu_index: int,
    kdud: dict,
    vnfd: dict,
    k8s_instance_info: dict,
    k8params: dict = None,
    timeout: int = 600,
    vca_id: str = None,
):
    """Install a single KDU on its target K8s cluster and record progress in the DB.

    Steps (all visible in this block):
      1. Resolve or generate the kdu instance name and persist it at
         ``nsr_db_path + ".kdu-instance"`` in the nsrs collection.
      2. Install the kdu (helm chart / juju bundle, per ``k8scluster-type``).
      3. Query the deployed services and, for each descriptor-declared
         mgmt-service, store the service ip (and possibly the vnf mgmt ip)
         in the vnfr record.
      4. Mark the kdur status READY and run any initial-config-primitives
         (only when no juju execution-environment handles them).

    :param nsr_id: _id of the nsrs record being instantiated.
    :param nsr_db_path: dot-path inside the nsr record for this kdu's status.
    :param vnfr_data: vnfr record (only its "_id" is read here).
    :param kdu_index: index of this kdu inside the vnfr "kdur" list.
    :param kdud: kdu descriptor (keys read: "name", "service").
    :param vnfd: vnf descriptor (used for configuration / mgmt-interface lookups).
    :param k8s_instance_info: deployment parameters; keys read:
        "k8scluster-type", "k8scluster-uuid", "kdu-model", "kdu-name",
        "namespace", and optional "kdu-deployment-name".
    :param k8params: optional params passed to the installer.
    :param timeout: seconds allowed for the install and for each primitive.
    :param vca_id: optional VCA id forwarded to the connector.
    :return: the kdu instance name.
    :raises: re-raises any exception from the install/primitive steps after
        best-effort recording of the error in the nsrs/vnfrs records.
    """
    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        # Status-reporting target used by the k8s connector while installing.
        db_dict_install = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": nsr_db_path,
        }

        # Use the caller-provided deployment name when present; otherwise let
        # the connector generate one from the model/name/db location.
        if k8s_instance_info.get("kdu-deployment-name"):
            kdu_instance = k8s_instance_info.get("kdu-deployment-name")
        else:
            kdu_instance = self.k8scluster_map[
                k8sclustertype
            ].generate_kdu_instance_name(
                db_dict=db_dict_install,
                kdu_model=k8s_instance_info["kdu-model"],
                kdu_name=k8s_instance_info["kdu-name"],
            )
        # Persist the instance name before the (long) install so the record
        # exists while installation is in progress; written again below once
        # the install returns.
        self.update_db_2(
            "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
        )
        await self.k8scluster_map[k8sclustertype].install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"],
            kdu_instance=kdu_instance,
            vca_id=vca_id,
        )
        self.update_db_2(
            "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
        )

        # Obtain services to obtain management service ip
        services = await self.k8scluster_map[k8sclustertype].get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"],
        )

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        kdu_config = get_configuration(vnfd, kdud["name"])
        if kdu_config:
            target_ee_list = kdu_config.get("execution-environment-list", [])
        else:
            target_ee_list = []

        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            # Descriptor services flagged as management services.
            mgmt_services = [
                service
                for service in kdud.get("service", [])
                if service.get("mgmt-service")
            ]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, obtain service ip (prefer the
                        # external ip, fall back to the cluster ip).
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict[
                            "kdur.{}.ip-address".format(kdu_index)
                        ] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get(
                            "external-connection-point-ref"
                        )
                        if service_external_cp:
                            if (
                                deep_get(vnfd, ("mgmt-interface", "cp"))
                                == service_external_cp
                            ):
                                vnfr_update_dict["ip-address"] = ip

                            if find_in_list(
                                target_ee_list,
                                lambda ee: ee.get(
                                    "external-connection-point-ref", ""
                                )
                                == service_external_cp,
                            ):
                                vnfr_update_dict[
                                    "kdur.{}.ip-address".format(kdu_index)
                                ] = ip
                        break
                else:
                    # for/else: no deployed service matched this mgmt-service.
                    self.logger.warn(
                        "Mgmt service name: {} not found".format(
                            mgmt_service["name"]
                        )
                    )

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        # Run initial-config-primitives directly on the kdu only when no juju
        # execution environment is declared to handle them.
        kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
        if (
            kdu_config
            and kdu_config.get("initial-config-primitive")
            and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
        ):
            initial_config_primitive_list = kdu_config.get(
                "initial-config-primitive"
            )
            # Execute in ascending "seq" order, as mandated by the descriptor.
            initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(
                    initial_config_primitive, {}, {}
                )

                await asyncio.wait_for(
                    self.k8scluster_map[k8sclustertype].exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_,
                        db_dict=db_dict_install,
                        vca_id=vca_id,
                    ),
                    timeout=timeout,
                )

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
            )
            self.update_db_2(
                "vnfrs",
                vnfr_data.get("_id"),
                {"kdur.{}.status".format(kdu_index): "ERROR"},
            )
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
+
+ async def deploy_kdus(
+ self,
+ logging_text,
+ nsr_id,
+ nslcmop_id,
+ db_vnfrs,
+ db_vnfds,
+ task_instantiation_info,
+ ):
+ # Launch kdus if present in the descriptor
+
+ k8scluster_id_2_uuic = {
+ "helm-chart-v3": {},
+ "helm-chart": {},
+ "juju-bundle": {},
+ }
+
+ async def _get_cluster_id(cluster_id, cluster_type):
+ nonlocal k8scluster_id_2_uuic
+ if cluster_id in k8scluster_id_2_uuic[cluster_type]:
+ return k8scluster_id_2_uuic[cluster_type][cluster_id]
+
+ # check if K8scluster is creating and wait look if previous tasks in process
+ task_name, task_dependency = self.lcm_tasks.lookfor_related(
+ "k8scluster", cluster_id
+ )
+ if task_dependency:
+ text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
+ task_name, cluster_id
+ )
+ self.logger.debug(logging_text + text)
+ await asyncio.wait(task_dependency, timeout=3600)
+
+ db_k8scluster = self.db.get_one(
+ "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
+ )
+ if not db_k8scluster:
+ raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
+
+ k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
+ if not k8s_id:
+ if cluster_type == "helm-chart-v3":
+ try:
+ # backward compatibility for existing clusters that have not been initialized for helm v3
+ k8s_credentials = yaml.safe_dump(
+ db_k8scluster.get("credentials")
+ )
+ k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
+ k8s_credentials, reuse_cluster_uuid=cluster_id
+ )
+ db_k8scluster_update = {}
+ db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
+ db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
+ db_k8scluster_update[
+ "_admin.helm-chart-v3.created"
+ ] = uninstall_sw
+ db_k8scluster_update[
+ "_admin.helm-chart-v3.operationalState"
+ ] = "ENABLED"
+ self.update_db_2(
+ "k8sclusters", cluster_id, db_k8scluster_update
+ )
+ except Exception as e:
+ self.logger.error(
+ logging_text
+ + "error initializing helm-v3 cluster: {}".format(str(e))
+ )
+ raise LcmException(
+ "K8s cluster '{}' has not been initialized for '{}'".format(
+ cluster_id, cluster_type
+ )
+ )
+ else:
+ raise LcmException(
+ "K8s cluster '{}' has not been initialized for '{}'".format(
+ cluster_id, cluster_type
+ )
+ )
+ k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
+ return k8s_id