+ async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
+ timeout: int = 3600, vca_type: str = None) -> bool:
+
+ # steps:
+ # 1. find all relations for this VCA
+ # 2. wait for other peers related
+ # 3. add relations
+
+ try:
+ vca_type = vca_type or "lxc_proxy_charm"
+
+ # STEP 1: find all relations for this VCA
+
+ # read nsr record
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+
+ # this VCA data
+ my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
+
+ # read all ns-configuration relations
+ ns_relations = list()
+ db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
+ if db_ns_relations:
+ for r in db_ns_relations:
+ # check if this VCA is in the relation
+ if my_vca.get('member-vnf-index') in\
+ (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
+ ns_relations.append(r)
+
+ # read all vnf-configuration relations
+ vnf_relations = list()
+ db_vnfd_list = db_nsr.get('vnfd-id')
+ if db_vnfd_list:
+ for vnfd in db_vnfd_list:
+ db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
+ db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
+ if db_vnf_relations:
+ for r in db_vnf_relations:
+ # check if this VCA is in the relation
+ if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
+ vnf_relations.append(r)
+
+ # if no relations, terminate
+ if not ns_relations and not vnf_relations:
+ self.logger.debug(logging_text + ' No relations')
+ return True
+
+ self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
+
+ # add all relations
+ start = time()
+ while True:
+ # check timeout
+ now = time()
+ if now - start >= timeout:
+ self.logger.error(logging_text + ' : timeout adding relations')
+ return False
+
+ # reload nsr from database (we need to update record: _admin.deloyed.VCA)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+
+ # for each defined NS relation, find the VCA's related
+ for r in ns_relations.copy():
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
+ for vca in vca_list:
+ if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
+ and vca.get('config_sw_installed'):
+ from_vca_ee_id = vca.get('ee_id')
+ from_vca_endpoint = r.get('entities')[0].get('endpoint')
+ if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
+ and vca.get('config_sw_installed'):
+ to_vca_ee_id = vca.get('ee_id')
+ to_vca_endpoint = r.get('entities')[1].get('endpoint')
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.vca_map[vca_type].add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint)
+ # remove entry from relations list
+ ns_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get('configurationStatus')
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # for each defined VNF relation, find the VCA's related
+ for r in vnf_relations.copy():
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
+ for vca in vca_list:
+ key_to_check = "vdu_id"
+ if vca.get("vdu_id") is None:
+ key_to_check = "vnfd_id"
+ if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
+ from_vca_ee_id = vca.get('ee_id')
+ from_vca_endpoint = r.get('entities')[0].get('endpoint')
+ if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
+ to_vca_ee_id = vca.get('ee_id')
+ to_vca_endpoint = r.get('entities')[1].get('endpoint')
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.vca_map[vca_type].add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint)
+ # remove entry from relations list
+ vnf_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get('configurationStatus')
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get('vdu_id') == r.get('entities')[0].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ vnf_relations.remove(r)
+ if vca.get('vdu_id') == r.get('entities')[1].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ vnf_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # wait for next try
+ await asyncio.sleep(5.0)
+
+ if not ns_relations and not vnf_relations:
+ self.logger.debug('Relations added')
+ break
+
+ return True
+
+ except Exception as e:
+ self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
+ return False
+
+ async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
+ vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
+
+ try:
+ k8sclustertype = k8s_instance_info["k8scluster-type"]
+ # Instantiate kdu
+ db_dict_install = {"collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": nsr_db_path}
+
+ kdu_instance = await self.k8scluster_map[k8sclustertype].install(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_model=k8s_instance_info["kdu-model"],
+ atomic=True,
+ params=k8params,
+ db_dict=db_dict_install,
+ timeout=timeout,
+ kdu_name=k8s_instance_info["kdu-name"],
+ namespace=k8s_instance_info["namespace"])
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
+
+ # Obtain services to obtain management service ip
+ services = await self.k8scluster_map[k8sclustertype].get_services(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_instance=kdu_instance,
+ namespace=k8s_instance_info["namespace"])
+
+ # Obtain management service info (if exists)
+ vnfr_update_dict = {}
+ if services:
+ vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
+ mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
+ for mgmt_service in mgmt_services:
+ for service in services:
+ if service["name"].startswith(mgmt_service["name"]):
+ # Mgmt service found, Obtain service ip
+ ip = service.get("external_ip", service.get("cluster_ip"))
+ if isinstance(ip, list) and len(ip) == 1:
+ ip = ip[0]
+
+ vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
+
+ # Check if must update also mgmt ip at the vnf
+ service_external_cp = mgmt_service.get("external-connection-point-ref")
+ if service_external_cp:
+ if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
+ vnfr_update_dict["ip-address"] = ip
+
+ break
+ else:
+ self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
+
+ vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
+ self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
+
+ kdu_config = kdud.get("kdu-configuration")
+ if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
+ initial_config_primitive_list = kdu_config.get("initial-config-primitive")
+ initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
+
+ for initial_config_primitive in initial_config_primitive_list:
+ primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
+
+ await asyncio.wait_for(
+ self.k8scluster_map[k8sclustertype].exec_primitive(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_instance=kdu_instance,
+ primitive_name=initial_config_primitive["name"],
+ params=primitive_params_, db_dict={}),
+ timeout=timeout)
+
+ except Exception as e:
+ # Prepare update db with error and raise exception
+ try:
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
+ self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
+ except Exception:
+ # ignore to keep original exception
+ pass
+ # reraise original error
+ raise
+
+ return kdu_instance
+
+ async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):