from http import HTTPStatus
from time import time
from uuid import uuid4
-from functools import partial
+
from random import randint
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
return False
- def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None):
- """
- callback for kdu install intended to store the returned kdu_instance at database
- :return: None
- """
- db_update = {}
+ async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdur: dict, kdud: dict,
+ vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
+
try:
- result = task.result()
- if on_done:
- db_update[on_done] = str(result)
+ k8sclustertype = k8s_instance_info["k8scluster-type"]
+ # Instantiate kdu
+ db_dict_install = {"collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": nsr_db_path}
+
+ kdu_instance = await self.k8scluster_map[k8sclustertype].install(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_model=k8s_instance_info["kdu-model"],
+ atomic=True,
+ params=k8params,
+ db_dict=db_dict_install,
+ timeout=timeout,
+ kdu_name=k8s_instance_info["kdu-name"],
+ namespace=k8s_instance_info["namespace"])
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
+
+ # Obtain services to obtain management service ip
+ services = await self.k8scluster_map[k8sclustertype].get_services(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_instance=kdu_instance,
+ namespace=k8s_instance_info["namespace"])
+
+ # Obtain management service info (if exists)
+ if services:
+ vnfr_update_dict = {"kdur.{}.services".format(kdu_index): services}
+ mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
+ for mgmt_service in mgmt_services:
+ for service in services:
+ if service["name"].startswith(mgmt_service["name"]):
+ # Mgmt service found, Obtain service ip
+ ip = service.get("external_ip", service.get("cluster_ip"))
+ if isinstance(ip, list) and len(ip) == 1:
+ ip = ip[0]
+
+ vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
+
+ # Check if must update also mgmt ip at the vnf
+ service_external_cp = mgmt_service.get("external-connection-point-ref")
+ if service_external_cp:
+ if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
+ vnfr_update_dict["ip-address"] = ip
+
+ break
+ else:
+ self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
+
+ self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
+
except Exception as e:
- if on_exc:
- db_update[on_exc] = str(e)
- if db_update:
+ # Prepare update db with error and raise exception
try:
- self.update_db_2(item, _id, db_update)
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
except Exception:
+ # ignore to keep original exception
pass
+ # reraise original error
+ raise
+
+ return kdu_instance
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
# Launch kdus if present in the descriptor
updated_cluster_list = []
for vnfr_data in db_vnfrs.values():
- for kdur in get_iterable(vnfr_data, "kdur"):
+ for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
+ # Step 0: Prepare and set parameters
desc_params = self._format_additional_params(kdur.get("additionalParams"))
vnfd_id = vnfr_data.get('vnfd-id')
+ kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"])
namespace = kdur.get("k8s-namespace")
if kdur.get("helm-chart"):
kdumodel = kdur["helm-chart"]
step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)
+ # Synchronize repos
if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
del_repo_list, added_repo_dict = await asyncio.ensure_future(
self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
updated_cluster_list.append(cluster_uuid)
+ # Instantiate kdu
step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
kdur["kdu-name"], k8s_cluster_id)
-
- k8s_instace_info = {"kdu-instance": None,
- "k8scluster-uuid": cluster_uuid,
- "k8scluster-type": k8sclustertype,
- "member-vnf-index": vnfr_data["member-vnf-index-ref"],
- "kdu-name": kdur["kdu-name"],
- "kdu-model": kdumodel,
- "namespace": namespace}
+ k8s_instance_info = {"kdu-instance": None,
+ "k8scluster-uuid": cluster_uuid,
+ "k8scluster-type": k8sclustertype,
+ "member-vnf-index": vnfr_data["member-vnf-index-ref"],
+ "kdu-name": kdur["kdu-name"],
+ "kdu-model": kdumodel,
+ "namespace": namespace}
db_path = "_admin.deployed.K8s.{}".format(index)
- db_nsr_update[db_path] = k8s_instace_info
+ db_nsr_update[db_path] = k8s_instance_info
self.update_db_2("nsrs", nsr_id, db_nsr_update)
- db_dict = {"collection": "nsrs",
- "filter": {"_id": nsr_id},
- "path": db_path}
-
task = asyncio.ensure_future(
- self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
- atomic=True, params=desc_params,
- db_dict=db_dict, timeout=600,
- kdu_name=kdur["kdu-name"], namespace=namespace))
-
- task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id,
- on_done=db_path + ".kdu-instance",
- on_exc=db_path + ".detailed-status"))
+ self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdur, kdud, db_vnfds[vnfd_id],
+ k8s_instance_info, k8params=desc_params, timeout=600))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])