+ # Obtain management service info (if exists)
+ vnfr_update_dict = {}
+ kdu_config = get_configuration(vnfd, kdud["name"])
+ if kdu_config:
+ target_ee_list = kdu_config.get("execution-environment-list", [])
+ else:
+ target_ee_list = []
+
+ if services:
+ vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
+ mgmt_services = [
+ service
+ for service in kdud.get("service", [])
+ if service.get("mgmt-service")
+ ]
+ for mgmt_service in mgmt_services:
+ for service in services:
+ if service["name"].startswith(mgmt_service["name"]):
+ # Mgmt service found, Obtain service ip
+ ip = service.get("external_ip", service.get("cluster_ip"))
+ if isinstance(ip, list) and len(ip) == 1:
+ ip = ip[0]
+
+ vnfr_update_dict[
+ "kdur.{}.ip-address".format(kdu_index)
+ ] = ip
+
+ # Check if must update also mgmt ip at the vnf
+ service_external_cp = mgmt_service.get(
+ "external-connection-point-ref"
+ )
+ if service_external_cp:
+ if (
+ deep_get(vnfd, ("mgmt-interface", "cp"))
+ == service_external_cp
+ ):
+ vnfr_update_dict["ip-address"] = ip
+
+ if find_in_list(
+ target_ee_list,
+ lambda ee: ee.get(
+ "external-connection-point-ref", ""
+ )
+ == service_external_cp,
+ ):
+ vnfr_update_dict[
+ "kdur.{}.ip-address".format(kdu_index)
+ ] = ip
+ break
+ else:
+ self.logger.warn(
+ "Mgmt service name: {} not found".format(
+ mgmt_service["name"]
+ )
+ )
+
+ vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
+ self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
+
+ kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
+ if (
+ kdu_config
+ and kdu_config.get("initial-config-primitive")
+ and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
+ ):
+ initial_config_primitive_list = kdu_config.get(
+ "initial-config-primitive"
+ )
+ initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
+
+ for initial_config_primitive in initial_config_primitive_list:
+ primitive_params_ = self._map_primitive_params(
+ initial_config_primitive, {}, {}
+ )
+
+ await asyncio.wait_for(
+ self.k8scluster_map[k8sclustertype].exec_primitive(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_instance=kdu_instance,
+ primitive_name=initial_config_primitive["name"],
+ params=primitive_params_,
+ db_dict=db_dict_install,
+ vca_id=vca_id,
+ ),
+ timeout=timeout,
+ )
+
+ except Exception as e:
+ # Prepare update db with error and raise exception
+ try:
+ self.update_db_2(
+ "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
+ )
+ self.update_db_2(
+ "vnfrs",
+ vnfr_data.get("_id"),
+ {"kdur.{}.status".format(kdu_index): "ERROR"},
+ )
+ except Exception:
+ # ignore to keep original exception
+ pass
+ # reraise original error
+ raise
+
+ return kdu_instance
+
+ async def deploy_kdus(
+ self,
+ logging_text,
+ nsr_id,
+ nslcmop_id,
+ db_vnfrs,
+ db_vnfds,
+ task_instantiation_info,
+ ):
+ # Launch kdus if present in the descriptor
+
+ k8scluster_id_2_uuic = {
+ "helm-chart-v3": {},
+ "helm-chart": {},
+ "juju-bundle": {},
+ }
+
+ async def _get_cluster_id(cluster_id, cluster_type):
+ nonlocal k8scluster_id_2_uuic
+ if cluster_id in k8scluster_id_2_uuic[cluster_type]:
+ return k8scluster_id_2_uuic[cluster_type][cluster_id]
+
+ # check if K8scluster is creating and wait look if previous tasks in process
+ task_name, task_dependency = self.lcm_tasks.lookfor_related(
+ "k8scluster", cluster_id
+ )
+ if task_dependency:
+ text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
+ task_name, cluster_id
+ )
+ self.logger.debug(logging_text + text)
+ await asyncio.wait(task_dependency, timeout=3600)
+
+ db_k8scluster = self.db.get_one(
+ "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
+ )
+ if not db_k8scluster:
+ raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
+
+ k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
+ if not k8s_id:
+ if cluster_type == "helm-chart-v3":
+ try:
+ # backward compatibility for existing clusters that have not been initialized for helm v3
+ k8s_credentials = yaml.safe_dump(
+ db_k8scluster.get("credentials")
+ )
+ k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
+ k8s_credentials, reuse_cluster_uuid=cluster_id
+ )
+ db_k8scluster_update = {}
+ db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
+ db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
+ db_k8scluster_update[
+ "_admin.helm-chart-v3.created"
+ ] = uninstall_sw
+ db_k8scluster_update[
+ "_admin.helm-chart-v3.operationalState"
+ ] = "ENABLED"
+ self.update_db_2(
+ "k8sclusters", cluster_id, db_k8scluster_update
+ )
+ except Exception as e:
+ self.logger.error(
+ logging_text
+ + "error initializing helm-v3 cluster: {}".format(str(e))
+ )
+ raise LcmException(
+ "K8s cluster '{}' has not been initialized for '{}'".format(
+ cluster_id, cluster_type
+ )
+ )
+ else:
+ raise LcmException(
+ "K8s cluster '{}' has not been initialized for '{}'".format(
+ cluster_id, cluster_type
+ )
+ )
+ k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
+ return k8s_id
+
+ logging_text += "Deploy kdus: "
+ step = ""
+ try:
+ db_nsr_update = {"_admin.deployed.K8s": []}
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ index = 0
+ updated_cluster_list = []
+ updated_v3_cluster_list = []
+
+ for vnfr_data in db_vnfrs.values():
+ vca_id = self.get_vca_id(vnfr_data, {})
+ for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
+ # Step 0: Prepare and set parameters
+ desc_params = parse_yaml_strings(kdur.get("additionalParams"))
+ vnfd_id = vnfr_data.get("vnfd-id")
+ vnfd_with_id = find_in_list(
+ db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
+ )
+ kdud = next(
+ kdud
+ for kdud in vnfd_with_id["kdu"]
+ if kdud["name"] == kdur["kdu-name"]
+ )
+ namespace = kdur.get("k8s-namespace")
+ kdu_deployment_name = kdur.get("kdu-deployment-name")
+ if kdur.get("helm-chart"):
+ kdumodel = kdur["helm-chart"]
+ # Default version: helm3, if helm-version is v2 assign v2
+ k8sclustertype = "helm-chart-v3"
+ self.logger.debug("kdur: {}".format(kdur))
+ if (
+ kdur.get("helm-version")
+ and kdur.get("helm-version") == "v2"
+ ):
+ k8sclustertype = "helm-chart"
+ elif kdur.get("juju-bundle"):
+ kdumodel = kdur["juju-bundle"]
+ k8sclustertype = "juju-bundle"
+ else:
+ raise LcmException(
+ "kdu type for kdu='{}.{}' is neither helm-chart nor "
+ "juju-bundle. Maybe an old NBI version is running".format(
+ vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
+ )
+ )
+ # check if kdumodel is a file and exists
+ try:
+ vnfd_with_id = find_in_list(
+ db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
+ )
+ storage = deep_get(vnfd_with_id, ("_admin", "storage"))
+ if storage and storage.get(
+ "pkg-dir"
+ ): # may be not present if vnfd has not artifacts
+ # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
+ filename = "{}/{}/{}s/{}".format(
+ storage["folder"],
+ storage["pkg-dir"],
+ k8sclustertype,
+ kdumodel,
+ )
+ if self.fs.file_exists(
+ filename, mode="file"
+ ) or self.fs.file_exists(filename, mode="dir"):
+ kdumodel = self.fs.path + filename
+ except (asyncio.TimeoutError, asyncio.CancelledError):
+ raise
+ except Exception: # it is not a file
+ pass
+
+ k8s_cluster_id = kdur["k8s-cluster"]["id"]
+ step = "Synchronize repos for k8s cluster '{}'".format(
+ k8s_cluster_id
+ )
+ cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
+
+ # Synchronize repos
+ if (
+ k8sclustertype == "helm-chart"
+ and cluster_uuid not in updated_cluster_list
+ ) or (
+ k8sclustertype == "helm-chart-v3"
+ and cluster_uuid not in updated_v3_cluster_list
+ ):
+ del_repo_list, added_repo_dict = await asyncio.ensure_future(
+ self.k8scluster_map[k8sclustertype].synchronize_repos(
+ cluster_uuid=cluster_uuid
+ )
+ )
+ if del_repo_list or added_repo_dict:
+ if k8sclustertype == "helm-chart":
+ unset = {
+ "_admin.helm_charts_added." + item: None
+ for item in del_repo_list
+ }
+ updated = {
+ "_admin.helm_charts_added." + item: name
+ for item, name in added_repo_dict.items()
+ }
+ updated_cluster_list.append(cluster_uuid)
+ elif k8sclustertype == "helm-chart-v3":
+ unset = {
+ "_admin.helm_charts_v3_added." + item: None
+ for item in del_repo_list
+ }
+ updated = {
+ "_admin.helm_charts_v3_added." + item: name
+ for item, name in added_repo_dict.items()
+ }
+ updated_v3_cluster_list.append(cluster_uuid)
+ self.logger.debug(
+ logging_text + "repos synchronized on k8s cluster "
+ "'{}' to_delete: {}, to_add: {}".format(
+ k8s_cluster_id, del_repo_list, added_repo_dict
+ )
+ )
+ self.db.set_one(
+ "k8sclusters",
+ {"_id": k8s_cluster_id},
+ updated,
+ unset=unset,
+ )
+
+ # Instantiate kdu
+ step = "Instantiating KDU {}.{} in k8s cluster {}".format(
+ vnfr_data["member-vnf-index-ref"],
+ kdur["kdu-name"],
+ k8s_cluster_id,
+ )
+ k8s_instance_info = {
+ "kdu-instance": None,
+ "k8scluster-uuid": cluster_uuid,
+ "k8scluster-type": k8sclustertype,
+ "member-vnf-index": vnfr_data["member-vnf-index-ref"],
+ "kdu-name": kdur["kdu-name"],
+ "kdu-model": kdumodel,
+ "namespace": namespace,
+ "kdu-deployment-name": kdu_deployment_name,
+ }
+ db_path = "_admin.deployed.K8s.{}".format(index)
+ db_nsr_update[db_path] = k8s_instance_info
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ vnfd_with_id = find_in_list(
+ db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
+ )
+ task = asyncio.ensure_future(
+ self._install_kdu(
+ nsr_id,
+ db_path,
+ vnfr_data,
+ kdu_index,
+ kdud,
+ vnfd_with_id,
+ k8s_instance_info,
+ k8params=desc_params,
+ timeout=600,
+ vca_id=vca_id,
+ )
+ )
+ self.lcm_tasks.register(
+ "ns",
+ nsr_id,
+ nslcmop_id,
+ "instantiate_KDU-{}".format(index),
+ task,
+ )
+ task_instantiation_info[task] = "Deploying KDU {}".format(
+ kdur["kdu-name"]
+ )
+
+ index += 1
+
+ except (LcmException, asyncio.CancelledError):
+ raise
+ except Exception as e:
+ msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
+ if isinstance(e, (N2VCException, DbException)):
+ self.logger.error(logging_text + msg)
+ else:
+ self.logger.critical(logging_text + msg, exc_info=True)
+ raise LcmException(msg)
+ finally:
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ def _deploy_n2vc(
+ self,
+ logging_text,
+ db_nsr,
+ db_vnfr,
+ nslcmop_id,
+ nsr_id,
+ nsi_id,
+ vnfd_id,
+ vdu_id,
+ kdu_name,
+ member_vnf_index,
+ vdu_index,
+ vdu_name,
+ deploy_params,
+ descriptor_config,
+ base_folder,
+ task_instantiation_info,
+ stage,
+ ):
+ # launch instantiate_N2VC in a asyncio task and register task object
+ # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
+ # if not found, create one entry and update database
+ # fill db_nsr._admin.deployed.VCA.<index>
+
+ self.logger.debug(
+ logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
+ )
+ if "execution-environment-list" in descriptor_config:
+ ee_list = descriptor_config.get("execution-environment-list", [])
+ elif "juju" in descriptor_config:
+ ee_list = [descriptor_config] # ns charms
+ else: # other types as script are not supported
+ ee_list = []
+
+ for ee_item in ee_list:
+ self.logger.debug(
+ logging_text
+ + "_deploy_n2vc ee_item juju={}, helm={}".format(
+ ee_item.get("juju"), ee_item.get("helm-chart")
+ )
+ )
+ ee_descriptor_id = ee_item.get("id")
+ if ee_item.get("juju"):
+ vca_name = ee_item["juju"].get("charm")
+ vca_type = (
+ "lxc_proxy_charm"
+ if ee_item["juju"].get("charm") is not None
+ else "native_charm"
+ )
+ if ee_item["juju"].get("cloud") == "k8s":
+ vca_type = "k8s_proxy_charm"
+ elif ee_item["juju"].get("proxy") is False:
+ vca_type = "native_charm"
+ elif ee_item.get("helm-chart"):
+ vca_name = ee_item["helm-chart"]
+ if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
+ vca_type = "helm"
+ else:
+ vca_type = "helm-v3"
+ else:
+ self.logger.debug(
+ logging_text + "skipping non juju neither charm configuration"
+ )
+ continue
+
+ vca_index = -1
+ for vca_index, vca_deployed in enumerate(
+ db_nsr["_admin"]["deployed"]["VCA"]
+ ):
+ if not vca_deployed:
+ continue
+ if (
+ vca_deployed.get("member-vnf-index") == member_vnf_index
+ and vca_deployed.get("vdu_id") == vdu_id
+ and vca_deployed.get("kdu_name") == kdu_name
+ and vca_deployed.get("vdu_count_index", 0) == vdu_index
+ and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
+ ):
+ break
+ else:
+ # not found, create one.
+ target = (
+ "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
+ )
+ if vdu_id:
+ target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
+ elif kdu_name:
+ target += "/kdu/{}".format(kdu_name)
+ vca_deployed = {
+ "target_element": target,
+ # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
+ "member-vnf-index": member_vnf_index,
+ "vdu_id": vdu_id,
+ "kdu_name": kdu_name,
+ "vdu_count_index": vdu_index,
+ "operational-status": "init", # TODO revise
+ "detailed-status": "", # TODO revise
+ "step": "initial-deploy", # TODO revise
+ "vnfd_id": vnfd_id,
+ "vdu_name": vdu_name,
+ "type": vca_type,
+ "ee_descriptor_id": ee_descriptor_id,
+ }
+ vca_index += 1
+
+ # create VCA and configurationStatus in db
+ db_dict = {
+ "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
+ "configurationStatus.{}".format(vca_index): dict(),
+ }
+ self.update_db_2("nsrs", nsr_id, db_dict)
+
+ db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
+
+ self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
+ self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
+ self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
+
+ # Launch task
+ task_n2vc = asyncio.ensure_future(
+ self.instantiate_N2VC(
+ logging_text=logging_text,
+ vca_index=vca_index,
+ nsi_id=nsi_id,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ vdu_index=vdu_index,
+ deploy_params=deploy_params,
+ config_descriptor=descriptor_config,
+ base_folder=base_folder,
+ nslcmop_id=nslcmop_id,
+ stage=stage,
+ vca_type=vca_type,
+ vca_name=vca_name,
+ ee_config_descriptor=ee_item,
+ )
+ )
+ self.lcm_tasks.register(
+ "ns",
+ nsr_id,
+ nslcmop_id,
+ "instantiate_N2VC-{}".format(vca_index),
+ task_n2vc,
+ )
+ task_instantiation_info[
+ task_n2vc
+ ] = self.task_name_deploy_vca + " {}.{}".format(
+ member_vnf_index or "", vdu_id or ""
+ )