+ vnfd = db_vnfds_ref[vnfd_id]
+ member_vnf_index = str(c_vnf["member-vnf-index"])
+ db_vnfr = db_vnfrs[member_vnf_index]
+ base_folder = vnfd["_admin"]["storage"]
+ vdu_id = None
+ vdu_index = 0
+ vdu_name = None
+ kdu_name = None
+
+ # Get additional parameters
+ deploy_params = {}
+ if db_vnfr.get("additionalParamsForVnf"):
+ deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
+
+ descriptor_config = vnfd.get("vnf-configuration")
+ if descriptor_config and descriptor_config.get("juju"):
+ self._deploy_n2vc(
+ logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage
+ )
+
+ # Deploy charms for each VDU that supports one.
+ for vdud in get_iterable(vnfd, 'vdu'):
+ vdu_id = vdud["id"]
+ descriptor_config = vdud.get('vdu-configuration')
+ vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
+ if vdur.get("additionalParams"):
+ deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
+ else:
+ deploy_params_vdu = deploy_params
+ if descriptor_config and descriptor_config.get("juju"):
+ # look for vdu index in the db_vnfr["vdu"] section
+ # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
+ # if vdur["vdu-id-ref"] == vdu_id:
+ # break
+ # else:
+ # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
+ # "member_vnf_index={}".format(vdu_id, member_vnf_index))
+ # vdu_name = vdur.get("name")
+ vdu_name = None
+ kdu_name = None
+ for vdu_index in range(int(vdud.get("count", 1))):
+ # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
+ self._deploy_n2vc(
+ logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+ member_vnf_index, vdu_id, vdu_index),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params_vdu,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage
+ )
+ for kdud in get_iterable(vnfd, 'kdu'):
+ kdu_name = kdud["name"]
+ descriptor_config = kdud.get('kdu-configuration')
+ if descriptor_config and descriptor_config.get("juju"):
+ vdu_id = None
+ vdu_index = 0
+ vdu_name = None
+ # look for vdu index in the db_vnfr["vdu"] section
+ # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
+ # if vdur["vdu-id-ref"] == vdu_id:
+ # break
+ # else:
+ # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
+ # "member_vnf_index={}".format(vdu_id, member_vnf_index))
+ # vdu_name = vdur.get("name")
+ # vdu_name = None
+
+ self._deploy_n2vc(
+ logging_text=logging_text,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage
+ )
+
+ # Check if this NS has a charm configuration
+ descriptor_config = nsd.get("ns-configuration")
+ if descriptor_config and descriptor_config.get("juju"):
+ vnfd_id = None
+ db_vnfr = None
+ member_vnf_index = None
+ vdu_id = None
+ kdu_name = None
+ vdu_index = 0
+ vdu_name = None
+
+ # Get additional parameters
+ deploy_params = {}
+ if db_nsr.get("additionalParamsForNs"):
+ deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
+ base_folder = nsd["_admin"]["storage"]
+ self._deploy_n2vc(
+ logging_text=logging_text,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage
+ )
+
+ # rest of staff will be done at finally
+
+ except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
+ self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
+ finally:
+ if exc:
+ error_list.append(str(exc))
+ try:
+ # wait for pending tasks
+ if tasks_dict_info:
+ stage[1] = "Waiting for instantiate pending tasks."
+ self.logger.debug(logging_text + stage[1])
+ error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
+ stage, nslcmop_id, nsr_id=nsr_id)
+ stage[1] = stage[2] = ""
+ except asyncio.CancelledError:
+ error_list.append("Cancelled")
+ # TODO cancel all tasks
+ except Exception as exc:
+ error_list.append(str(exc))
+
+ # update operation-status
+ db_nsr_update["operational-status"] = "running"
+ # let's begin with VCA 'configured' status (later we can change it)
+ db_nsr_update["config-status"] = "configured"
+ for task, task_name in tasks_dict_info.items():
+ if not task.done() or task.cancelled() or task.exception():
+ if task_name.startswith(self.task_name_deploy_vca):
+ # A N2VC task is pending
+ db_nsr_update["config-status"] = "failed"
+ else:
+ # RO or KDU task is pending
+ db_nsr_update["operational-status"] = "failed"
+
+ # update status at database
+ if error_list:
+ error_detail = ". ".join(error_list)
+ self.logger.error(logging_text + error_detail)
+ error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
+ error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
+
+ db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
+ db_nslcmop_update["detailed-status"] = error_detail
+ nslcmop_operation_state = "FAILED"
+ ns_state = "BROKEN"
+ else:
+ error_detail = None
+ error_description_nsr = error_description_nslcmop = None
+ ns_state = "READY"
+ db_nsr_update["detailed-status"] = "Done"
+ db_nslcmop_update["detailed-status"] = "Done"
+ nslcmop_operation_state = "COMPLETED"
+
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=ns_state,
+ current_operation="IDLE",
+ current_operation_id=None,
+ error_description=error_description_nsr,
+ error_detail=error_detail,
+ other_update=db_nsr_update
+ )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state},
+ loop=self.loop)
+ except Exception as e:
+ self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
+
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
+
+ async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
+
+ # steps:
+ # 1. find all relations for this VCA
+ # 2. wait for other peers related
+ # 3. add relations
+
+ try:
+
+ # STEP 1: find all relations for this VCA
+
+ # read nsr record
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+
+ # this VCA data
+ my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
+
+ # read all ns-configuration relations
+ ns_relations = list()
+ db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
+ if db_ns_relations:
+ for r in db_ns_relations:
+ # check if this VCA is in the relation
+ if my_vca.get('member-vnf-index') in\
+ (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
+ ns_relations.append(r)
+
+ # read all vnf-configuration relations
+ vnf_relations = list()
+ db_vnfd_list = db_nsr.get('vnfd-id')
+ if db_vnfd_list:
+ for vnfd in db_vnfd_list:
+ db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
+ db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
+ if db_vnf_relations:
+ for r in db_vnf_relations:
+ # check if this VCA is in the relation
+ if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
+ vnf_relations.append(r)
+
+ # if no relations, terminate
+ if not ns_relations and not vnf_relations:
+ self.logger.debug(logging_text + ' No relations')
+ return True
+
+ self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
+
+ # add all relations
+ start = time()
+ while True:
+ # check timeout
+ now = time()
+ if now - start >= timeout:
+ self.logger.error(logging_text + ' : timeout adding relations')
+ return False
+
+ # reload nsr from database (we need to update record: _admin.deloyed.VCA)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+
+ # for each defined NS relation, find the VCA's related
+ for r in ns_relations:
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
+ for vca in vca_list:
+ if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
+ and vca.get('config_sw_installed'):
+ from_vca_ee_id = vca.get('ee_id')
+ from_vca_endpoint = r.get('entities')[0].get('endpoint')
+ if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
+ and vca.get('config_sw_installed'):
+ to_vca_ee_id = vca.get('ee_id')
+ to_vca_endpoint = r.get('entities')[1].get('endpoint')
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.n2vc.add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint)
+ # remove entry from relations list
+ ns_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get('configurationStatus')
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # for each defined VNF relation, find the VCA's related
+ for r in vnf_relations:
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
+ for vca in vca_list:
+ if vca.get('vdu_id') == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
+ from_vca_ee_id = vca.get('ee_id')
+ from_vca_endpoint = r.get('entities')[0].get('endpoint')
+ if vca.get('vdu_id') == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
+ to_vca_ee_id = vca.get('ee_id')
+ to_vca_endpoint = r.get('entities')[1].get('endpoint')
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.n2vc.add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint)
+ # remove entry from relations list
+ vnf_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get('configurationStatus')
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get('vdu_id') == r.get('entities')[0].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ if vca.get('vdu_id') == r.get('entities')[1].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # wait for next try
+ await asyncio.sleep(5.0)
+
+ if not ns_relations and not vnf_relations:
+ self.logger.debug('Relations added')
+ break
+
+ return True
+
+ except Exception as e:
+ self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
+ return False
+
+ def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None):
+ """
+ callback for kdu install intended to store the returned kdu_instance at database
+ :return: None
+ """
+ db_update = {}
+ try:
+ result = task.result()
+ if on_done:
+ db_update[on_done] = str(result)
+ except Exception as e:
+ if on_exc:
+ db_update[on_exc] = str(e)
+ if db_update:
+ try:
+ self.update_db_2(item, _id, db_update)
+ except Exception:
+ pass
+
+ async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
+ # Launch kdus if present in the descriptor
+
+ k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
+
+ def _get_cluster_id(cluster_id, cluster_type):
+ nonlocal k8scluster_id_2_uuic
+ if cluster_id in k8scluster_id_2_uuic[cluster_type]:
+ return k8scluster_id_2_uuic[cluster_type][cluster_id]
+
+ db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
+ if not db_k8scluster:
+ raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
+ k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
+ if not k8s_id:
+ raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
+ k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
+ return k8s_id
+
+ logging_text += "Deploy kdus: "
+ step = ""
+ try:
+ db_nsr_update = {"_admin.deployed.K8s": []}
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ index = 0
+ updated_cluster_list = []
+
+ for vnfr_data in db_vnfrs.values():
+ for kdur in get_iterable(vnfr_data, "kdur"):
+ desc_params = self._format_additional_params(kdur.get("additionalParams"))
+ vnfd_id = vnfr_data.get('vnfd-id')
+ namespace = kdur.get("k8s-namespace")
+ if kdur.get("helm-chart"):
+ kdumodel = kdur["helm-chart"]
+ k8sclustertype = "helm-chart"
+ elif kdur.get("juju-bundle"):
+ kdumodel = kdur["juju-bundle"]
+ k8sclustertype = "juju-bundle"
+ else:
+ raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
+ "juju-bundle. Maybe an old NBI version is running".
+ format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
+ # check if kdumodel is a file and exists
+ try:
+ storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
+ if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
+ # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
+ filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
+ kdumodel)
+ if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
+ kdumodel = self.fs.path + filename
+ except (asyncio.TimeoutError, asyncio.CancelledError):
+ raise
+ except Exception: # it is not a file
+ pass
+
+ k8s_cluster_id = kdur["k8s-cluster"]["id"]
+ step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
+ cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)
+
+ if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
+ del_repo_list, added_repo_dict = await asyncio.ensure_future(
+ self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
+ if del_repo_list or added_repo_dict:
+ unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
+ updated = {'_admin.helm_charts_added.' +
+ item: name for item, name in added_repo_dict.items()}
+ self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
+ "to_add: {}".format(k8s_cluster_id, del_repo_list,
+ added_repo_dict))
+ self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
+ updated_cluster_list.append(cluster_uuid)
+
+ step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
+ kdur["kdu-name"], k8s_cluster_id)
+
+ k8s_instace_info = {"kdu-instance": None,
+ "k8scluster-uuid": cluster_uuid,
+ "k8scluster-type": k8sclustertype,
+ "member-vnf-index": vnfr_data["member-vnf-index-ref"],
+ "kdu-name": kdur["kdu-name"],
+ "kdu-model": kdumodel,
+ "namespace": namespace}
+ db_path = "_admin.deployed.K8s.{}".format(index)
+ db_nsr_update[db_path] = k8s_instace_info
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ db_dict = {"collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": db_path}
+
+ task = asyncio.ensure_future(
+ self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
+ atomic=True, params=desc_params,
+ db_dict=db_dict, timeout=600,
+ kdu_name=kdur["kdu-name"], namespace=namespace))
+
+ task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id,
+ on_done=db_path + ".kdu-instance",
+ on_exc=db_path + ".detailed-status"))
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
+ task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])