from http import HTTPStatus
from time import time
from uuid import uuid4
-from functools import partial
+
from random import randint
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
+
# if we haven't this vnfd, read it from db
if vnfd_id not in db_vnfds:
# read from db
# set state to INSTANTIATED. When instantiated NBI will not delete directly
db_nsr_update["_admin.nsState"] = "INSTANTIATED"
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
# n2vc_redesign STEP 2 Deploy Network Scenario
stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
return False
- def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None):
- """
- callback for kdu install intended to store the returned kdu_instance at database
- :return: None
- """
- db_update = {}
+ async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdur: dict, kdud: dict,
+ vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
+
try:
- result = task.result()
- if on_done:
- db_update[on_done] = str(result)
+ k8sclustertype = k8s_instance_info["k8scluster-type"]
+ # Instantiate kdu
+ db_dict_install = {"collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": nsr_db_path}
+
+ kdu_instance = await self.k8scluster_map[k8sclustertype].install(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_model=k8s_instance_info["kdu-model"],
+ atomic=True,
+ params=k8params,
+ db_dict=db_dict_install,
+ timeout=timeout,
+ kdu_name=k8s_instance_info["kdu-name"],
+ namespace=k8s_instance_info["namespace"])
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
+
+ # Obtain services to obtain management service ip
+ services = await self.k8scluster_map[k8sclustertype].get_services(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_instance=kdu_instance,
+ namespace=k8s_instance_info["namespace"])
+
+ # Obtain management service info (if exists)
+ if services:
+ vnfr_update_dict = {"kdur.{}.services".format(kdu_index): services}
+ mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
+ for mgmt_service in mgmt_services:
+ for service in services:
+ if service["name"].startswith(mgmt_service["name"]):
+ # Mgmt service found, Obtain service ip
+ ip = service.get("external_ip", service.get("cluster_ip"))
+ if isinstance(ip, list) and len(ip) == 1:
+ ip = ip[0]
+
+ vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
+
+ # Check if must update also mgmt ip at the vnf
+ service_external_cp = mgmt_service.get("external-connection-point-ref")
+ if service_external_cp:
+ if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
+ vnfr_update_dict["ip-address"] = ip
+
+ break
+ else:
+ self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
+
+ self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
+
except Exception as e:
- if on_exc:
- db_update[on_exc] = str(e)
- if db_update:
+ # Prepare update db with error and raise exception
try:
- self.update_db_2(item, _id, db_update)
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
except Exception:
+ # ignore to keep original exception
pass
+ # reraise original error
+ raise
+
+ return kdu_instance
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
# Launch kdus if present in the descriptor
k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
- def _get_cluster_id(cluster_id, cluster_type):
+ async def _get_cluster_id(cluster_id, cluster_type):
nonlocal k8scluster_id_2_uuic
if cluster_id in k8scluster_id_2_uuic[cluster_type]:
return k8scluster_id_2_uuic[cluster_type][cluster_id]
+ # check if K8scluster is creating and wait look if previous tasks in process
+ task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
+ if task_dependency:
+ text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
+ self.logger.debug(logging_text + text)
+ await asyncio.wait(task_dependency, timeout=3600)
+
db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
if not db_k8scluster:
raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
+
k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
if not k8s_id:
- raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
+ raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
+ cluster_type))
k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
return k8s_id
updated_cluster_list = []
for vnfr_data in db_vnfrs.values():
- for kdur in get_iterable(vnfr_data, "kdur"):
+ for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
+ # Step 0: Prepare and set parameters
desc_params = self._format_additional_params(kdur.get("additionalParams"))
vnfd_id = vnfr_data.get('vnfd-id')
+ kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"])
namespace = kdur.get("k8s-namespace")
if kdur.get("helm-chart"):
kdumodel = kdur["helm-chart"]
k8s_cluster_id = kdur["k8s-cluster"]["id"]
step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
- cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)
+ cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
+ # Synchronize repos
if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
del_repo_list, added_repo_dict = await asyncio.ensure_future(
self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
updated_cluster_list.append(cluster_uuid)
+ # Instantiate kdu
step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
kdur["kdu-name"], k8s_cluster_id)
-
- k8s_instace_info = {"kdu-instance": None,
- "k8scluster-uuid": cluster_uuid,
- "k8scluster-type": k8sclustertype,
- "member-vnf-index": vnfr_data["member-vnf-index-ref"],
- "kdu-name": kdur["kdu-name"],
- "kdu-model": kdumodel,
- "namespace": namespace}
+ k8s_instance_info = {"kdu-instance": None,
+ "k8scluster-uuid": cluster_uuid,
+ "k8scluster-type": k8sclustertype,
+ "member-vnf-index": vnfr_data["member-vnf-index-ref"],
+ "kdu-name": kdur["kdu-name"],
+ "kdu-model": kdumodel,
+ "namespace": namespace}
db_path = "_admin.deployed.K8s.{}".format(index)
- db_nsr_update[db_path] = k8s_instace_info
+ db_nsr_update[db_path] = k8s_instance_info
self.update_db_2("nsrs", nsr_id, db_nsr_update)
- db_dict = {"collection": "nsrs",
- "filter": {"_id": nsr_id},
- "path": db_path}
-
task = asyncio.ensure_future(
- self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
- atomic=True, params=desc_params,
- db_dict=db_dict, timeout=600,
- kdu_name=kdur["kdu-name"], namespace=namespace))
-
- task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id,
- on_done=db_path + ".kdu-instance",
- on_exc=db_path + ".detailed-status"))
+ self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdur, kdud, db_vnfds[vnfd_id],
+ k8s_instance_info, k8params=desc_params, timeout=600))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
vca_type = vca.get("type")
exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
vca.get("needed_terminate"))
- # For helm we must destroy_ee
- destroy_ee = "True" if vca_type == "helm" else "False"
+ # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
+ # pending native charms
+ destroy_ee = "True" if vca_type in ("helm", "native_charm") else "False"
task = asyncio.ensure_future(
self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
destroy_ee, exec_terminate_primitives))
operation_state=nslcmop_operation_state,
other_update=db_nslcmop_update,
)
+ if ns_state == "NOT_INSTANTIATED":
+ try:
+ self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
+ except DbException as e:
+ self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
+ format(nsr_id, e))
if operation_params:
autoremove = operation_params.get("autoremove", False)
if nslcmop_operation_state:
config_primitive_desc = config_primitive
break
- if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
- raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
- format(primitive))
- primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
- ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
+ if not config_primitive_desc:
+ if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
+ raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
+ format(primitive))
+ primitive_name = primitive
+ ee_descriptor_id = None
+ else:
+ primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
+ ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
if vnf_index:
if vdu_id: