+ async def deploy_kdus(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref, db_k8scluster):
+ # Launch kdus if present in the descriptor
+ logging_text = "Deploy kdus: "
+ db_nsr_update = {}
+ db_nsr_update["_admin.deployed.K8s"] = []
+ try:
+ # Look for all vnfds
+ # db_nsr_update["_admin.deployed.K8s"] = []
+ vnf_update = []
+ task_list = []
+ for c_vnf in nsd.get("constituent-vnfd", ()):
+ vnfr = db_vnfrs[c_vnf["member-vnf-index"]]
+ member_vnf_index = c_vnf["member-vnf-index"]
+ vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
+ vnfd_ref = vnfd["id"]
+ desc_params = {}
+
+ step = "Checking kdu from vnf: {} - member-vnf-index: {}".format(vnfd_ref, member_vnf_index)
+ self.logger.debug(logging_text + step)
+ if vnfd.get("kdu"):
+ step = "vnf: {} has kdus".format(vnfd_ref)
+ self.logger.debug(logging_text + step)
+ for vnfr_name, vnfr_data in db_vnfrs.items():
+ if vnfr_data["vnfd-ref"] == vnfd["id"]:
+ if vnfr_data.get("additionalParamsForVnf"):
+ desc_params = self._format_additional_params(vnfr_data["additionalParamsForVnf"])
+ break
+ else:
+ raise LcmException("VNF descriptor not found with id: {}".format(vnfr_data["vnfd-ref"]))
+ self.logger.debug(logging_text + step)
+
+ for kdur in vnfr.get("kdur"):
+ index = 0
+ for k8scluster in db_k8scluster:
+ if kdur["k8s-cluster"]["id"] == k8scluster["_id"]:
+ cluster_uuid = k8scluster["cluster-uuid"]
+ break
+ else:
+ raise LcmException("K8scluster not found with id: {}".format(kdur["k8s-cluster"]["id"]))
+ self.logger.debug(logging_text + step)
+
+ step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
+ self.logger.debug(logging_text + step)
+ for kdu in vnfd.get("kdu"):
+ if kdu.get("name") == kdur["kdu-name"]:
+ break
+ else:
+ raise LcmException("KDU not found with name: {} in VNFD {}".format(kdur["kdu-name"],
+ vnfd["name"]))
+ self.logger.debug(logging_text + step)
+ kdumodel = None
+ k8sclustertype = None
+ if kdu.get("helm-chart"):
+ kdumodel = kdu["helm-chart"]
+ k8sclustertype = "chart"
+ elif kdu.get("juju-bundle"):
+ kdumodel = kdu["juju-bundle"]
+ k8sclustertype = "juju"
+ k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
+ "vnfr-id": vnfr["id"], "k8scluster-type": k8sclustertype,
+ "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
+ db_nsr_update["_admin.deployed.K8s"].append(k8s_instace_info)
+ db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
+ "{}".format(index)}
+ if k8sclustertype == "chart":
+ task = self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
+ atomic=True, params=desc_params,
+ db_dict=db_dict, timeout=300)
+ else:
+ # TODO I need the juju connector in place
+ pass
+ task_list.append(task)
+ index += 1
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ done = None
+ pending = None
+ if len(task_list) > 0:
+ self.logger.debug('Waiting for terminate pending tasks...')
+ done, pending = await asyncio.wait(task_list, timeout=3600)
+ if not pending:
+ for fut in done:
+ k8s_instance = fut.result()
+ k8s_instace_info = {"kdu-instance": k8s_instance, "k8scluster-uuid": cluster_uuid,
+ "vnfr-id": vnfr["id"], "k8scluster-type": k8sclustertype,
+ "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
+ vnf_update.append(k8s_instace_info)
+ self.logger.debug('All tasks finished...')
+ else:
+ self.logger.info('There are pending tasks: {}'.format(pending))
+
+ db_nsr_update["_admin.deployed.K8s"] = vnf_update
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
+ raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
+ finally:
+ # TODO Write in data base
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+