+ # Update status for a sub-operation given its index
+ def _update_suboperation_status(
+ self, db_nslcmop, op_index, operationState, detailed_status
+ ):
+ # Update DB for HA tasks
+ q_filter = {"_id": db_nslcmop["_id"]}
+ update_dict = {
+ "_admin.operations.{}.operationState".format(op_index): operationState,
+ "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
+ }
+ self.db.set_one(
+ "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
+ )
+
+ # Add sub-operation, return the index of the added sub-operation
+ # Optionally, set operationState, detailed-status, and operationType
+ # Status and type are currently set for 'scale' sub-operations:
+ # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
+ # 'detailed-status' : status message
+ # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
+ # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
+ def _add_suboperation(
+ self,
+ db_nslcmop,
+ vnf_index,
+ vdu_id,
+ vdu_count_index,
+ vdu_name,
+ primitive,
+ mapped_primitive_params,
+ operationState=None,
+ detailed_status=None,
+ operationType=None,
+ RO_nsr_id=None,
+ RO_scaling_info=None,
+ ):
+ if not db_nslcmop:
+ return self.SUBOPERATION_STATUS_NOT_FOUND
+ # Get the "_admin.operations" list, if it exists
+ db_nslcmop_admin = db_nslcmop.get("_admin", {})
+ op_list = db_nslcmop_admin.get("operations")
+ # Create or append to the "_admin.operations" list
+ new_op = {
+ "member_vnf_index": vnf_index,
+ "vdu_id": vdu_id,
+ "vdu_count_index": vdu_count_index,
+ "primitive": primitive,
+ "primitive_params": mapped_primitive_params,
+ }
+ if operationState:
+ new_op["operationState"] = operationState
+ if detailed_status:
+ new_op["detailed-status"] = detailed_status
+ if operationType:
+ new_op["lcmOperationType"] = operationType
+ if RO_nsr_id:
+ new_op["RO_nsr_id"] = RO_nsr_id
+ if RO_scaling_info:
+ new_op["RO_scaling_info"] = RO_scaling_info
+ if not op_list:
+ # No existing operations, create key 'operations' with current operation as first list element
+ db_nslcmop_admin.update({"operations": [new_op]})
+ op_list = db_nslcmop_admin.get("operations")
+ else:
+ # Existing operations, append operation to list
+ op_list.append(new_op)
+
+ db_nslcmop_update = {"_admin.operations": op_list}
+ self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
+ op_index = len(op_list) - 1
+ return op_index
+
+ # Helper methods for scale() sub-operations
+
+ # pre-scale/post-scale:
+ # Check for 3 different cases:
+ # a. New: First time execution, return SUBOPERATION_STATUS_NEW
+ # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
+ # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
+ def _check_or_add_scale_suboperation(
+ self,
+ db_nslcmop,
+ vnf_index,
+ vnf_config_primitive,
+ primitive_params,
+ operationType,
+ RO_nsr_id=None,
+ RO_scaling_info=None,
+ ):
+ # Find this sub-operation
+ if RO_nsr_id and RO_scaling_info:
+ operationType = "SCALE-RO"
+ match = {
+ "member_vnf_index": vnf_index,
+ "RO_nsr_id": RO_nsr_id,
+ "RO_scaling_info": RO_scaling_info,
+ }
+ else:
+ match = {
+ "member_vnf_index": vnf_index,
+ "primitive": vnf_config_primitive,
+ "primitive_params": primitive_params,
+ "lcmOperationType": operationType,
+ }
+ op_index = self._find_suboperation(db_nslcmop, match)
+ if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
+ # a. New sub-operation
+ # The sub-operation does not exist, add it.
+ # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
+ # The following parameters are set to None for all kind of scaling:
+ vdu_id = None
+ vdu_count_index = None
+ vdu_name = None
+ if RO_nsr_id and RO_scaling_info:
+ vnf_config_primitive = None
+ primitive_params = None
+ else:
+ RO_nsr_id = None
+ RO_scaling_info = None
+ # Initial status for sub-operation
+ operationState = "PROCESSING"
+ detailed_status = "In progress"
+ # Add sub-operation for pre/post-scaling (zero or more operations)
+ self._add_suboperation(
+ db_nslcmop,
+ vnf_index,
+ vdu_id,
+ vdu_count_index,
+ vdu_name,
+ vnf_config_primitive,
+ primitive_params,
+ operationState,
+ detailed_status,
+ operationType,
+ RO_nsr_id,
+ RO_scaling_info,
+ )
+ return self.SUBOPERATION_STATUS_NEW
+ else:
+ # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
+ # or op_index (operationState != 'COMPLETED')
+ return self._retry_or_skip_suboperation(db_nslcmop, op_index)
+
+ # Function to return execution_environment id
+
+ def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
+ # TODO vdu_index_count
+ for vca in vca_deployed_list:
+ if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
+ return vca["ee_id"]
+
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True).
    :param logging_text: prefix for log messages
    :param db_nslcmop: nslcmop database record (also used to get nsInstanceId)
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not
        completed or has not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: VCA identifier forwarded to the VCA connector calls
    :return: None or exception
    """

    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    # default value for backward compatibility with records without "type"
    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        # only run the primitives if this VCA is still flagged as needing them
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name")
                )
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation (terminate sub-operations carry no state/type)
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                if result not in result_ok:
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(seq.get("name"), vnf_index, result_detail)
                    )
            # mark that this VCA no longer needs its terminate primitives executed
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # Delete Prometheus Jobs if any
    # This uses NSR_ID, so it will destroy any jobs under this index
    self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(
            vca_deployed["ee_id"],
            scaling_in=scaling_in,
            vca_type=vca_type,
            vca_id=vca_id,
        )
+
+ async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
+ self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
+ namespace = "." + db_nsr["_id"]
+ try:
+ await self.n2vc.delete_namespace(
+ namespace=namespace,
+ total_timeout=self.timeout_charm_delete,
+ vca_id=vca_id,
+ )
+ except N2VCNotFound: # already deleted. Skip
+ pass
+ self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
+
async def _terminate_RO(
    self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
):
    """
    Terminates a deployment from RO
    :param logging_text: prefix for log messages
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: NS record id
    :param nslcmop_id: NS LCM operation id
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the
        concatenated content of the list
    :return: None; raises LcmException if any deletion failed
    """
    db_nsr_update = {}
    failed_detail = []  # accumulated error strings; raised joined at the end
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            # request ns deletion from RO; keep the returned async action id
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update[
                "_admin.deployed.RO.nsr_delete_action_id"
            ] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(
                logging_text
                + stage[2]
                + " RO_id={} ro_delete_action={}".format(
                    ro_nsr_id, ro_delete_action
                )
            )
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                # poll RO for the delete-action status
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action,
                )

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    # still deleting; reflect progress in the status text
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    # deletion finished
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    assert (
                        False
                    ), "ROclient.check_action_status returns unknown {}".format(
                        ns_status
                    )
                # persist detailed-status only when it changed
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                await asyncio.sleep(5, loop=self.loop)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException(
                    "Timeout waiting ns deleted from VIM"
                )

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if (
            isinstance(e, ROclient.ROClientException) and e.http_code == 404
        ):  # not found
            # already deleted at RO: treat as success
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(
                logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
            )
        elif (
            isinstance(e, ROclient.ROClientException) and e.http_code == 409
        ):  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(
                logging_text
                + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
            )
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
            )

    # Delete nsd (only when the ns deletion above succeeded)
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(
                logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
            )
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(
                    logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append(
                    "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                )
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append(
                    "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                )
                self.logger.error(logging_text + failed_detail[-1])

    # Delete the per-member vnfd records from RO, if any
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[
                    2
                ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id
                )
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(
                    logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                )
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update[
                        "_admin.deployed.RO.vnfd.{}.id".format(index)
                    ] = None
                    self.logger.debug(
                        logging_text
                        + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

    # final VIM status: reflect success/failure in stage[2] and persist
    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
+
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate an NS instance: execute terminate primitives, delete execution
    environments, uninstall KDUs and remove the deployment from RO/VIM.
    Progress and the final result are written to the nsrs/nslcmops records and
    a "terminated" message is published on the message bus.
    :param nsr_id: NS record id
    :param nslcmop_id: NS LCM operation id
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        # could not acquire the HA lock; do not process this operation here
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}  # task -> human-readable description, for _wait_for_tasks
    db_nsr_update = {}
    stage = [
        "Stage 1/3: Preparing task.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            # the operation may override the default terminate timeout
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
        # work on a copy so in-memory edits do not leak into db_nsr
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing deployed; the finally block still updates final status
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfrs_dict = {
            db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
        }
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs, fetching each distinct vnfd only once
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[
                vnfr["member-vnf-index-ref"]
            ] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            config_descriptor = None
            vca_member_vnf_index = vca.get("member-vnf-index")
            vca_id = self.get_vca_id(
                db_vnfrs_dict.get(vca_member_vnf_index)
                if vca_member_vnf_index
                else None,
                db_nsr,
            )
            if not vca or not vca.get("ee_id"):
                continue
            # pick the configuration descriptor matching this VCA's scope
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
            else:
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
            vca_type = vca.get("type")
            exec_terminate_primitives = not operation_params.get(
                "skip_terminate_primitives"
            ) and vca.get("needed_terminate")
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = (
                True if vca_type in ("helm", "helm-v3", "native_charm") else False
            )
            # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
            #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
            task = asyncio.ensure_future(
                self.destroy_N2VC(
                    logging_text,
                    db_nslcmop,
                    vca,
                    config_descriptor,
                    vca_index,
                    destroy_ee,
                    exec_terminate_primitives,
                    vca_id=vca_id,
                )
            )
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(
                logging_text
                + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
            )
            error_list = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                min(self.timeout_charm_delete, timeout_ns_terminate),
                stage,
                nslcmop_id,
            )
            tasks_dict_info.clear()
            if error_list:
                return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            vca_id = self.get_vca_id({}, db_nsr)
            task_delete_ee = asyncio.ensure_future(
                asyncio.wait_for(
                    self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                    timeout=self.timeout_charm_delete,
                )
            )
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        vca_id=vca_id,
                        namespace=kdu.get("namespace"),
                    )
                )
            else:
                self.logger.error(
                    logging_text
                    + "Unknown k8s deployment type {}".format(
                        kdu.get("k8scluster-type")
                    )
                )
                continue
            tasks_dict_info[
                task_delete_kdu_instance
            ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO (NG-RO or classic RO depending on configuration)
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        # unexpected error: keep the full traceback as the error detail
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_terminate,
                    stage,
                    nslcmop_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as exc:
            error_list.append(str(exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                nslcmop_id, stage[0]
            )

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                # propagate the final state to all VNFRs of this NS
                self.db.set_list(
                    "vnfrs",
                    {"nsr-id-ref": nsr_id},
                    {"_admin.nsState": "NOT_INSTANTIATED"},
                )
            except DbException as e:
                self.logger.warn(
                    logging_text
                    + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                        nsr_id, e
                    )
                )
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                # notify the result on the message bus
                await self.msg.aiowrite(
                    "ns",
                    "terminated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                        "autoremove": autoremove,
                    },
                    loop=self.loop,
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
+
async def _wait_for_tasks(
    self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
):
    """
    Wait for a set of asyncio tasks, accumulating errors and reporting progress.
    :param logging_text: prefix for log messages
    :param created_tasks_info: dict mapping each task to a human-readable description
    :param timeout: overall time budget (seconds) for all tasks together
    :param stage: status list; index 1 is updated with "<done>/<total>." progress
    :param nslcmop_id: operation id used to persist progress via _write_op_status
    :param nsr_id: if provided, errorDescription/errorDetail are also written to the nsr record
    :return: list of error-detail strings (empty if all tasks succeeded)
    """
    time_start = time()
    error_detail_list = []  # "<description>: <error>" strings
    error_list = []  # descriptions of the failed tasks
    pending_tasks = list(created_tasks_info.keys())
    num_tasks = len(pending_tasks)
    num_done = 0
    stage[1] = "{}/{}.".format(num_done, num_tasks)
    self._write_op_status(nslcmop_id, stage)
    while pending_tasks:
        new_error = None
        # remaining time of the overall budget
        _timeout = timeout + time_start - time()
        done, pending_tasks = await asyncio.wait(
            pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
        )
        num_done += len(done)
        if not done:  # Timeout
            # budget exhausted: mark every still-pending task as timed out
            for task in pending_tasks:
                new_error = created_tasks_info[task] + ": Timeout"
                error_detail_list.append(new_error)
                error_list.append(new_error)
            break
        for task in done:
            if task.cancelled():
                exc = "Cancelled"
            else:
                exc = task.exception()
            if exc:
                if isinstance(exc, asyncio.TimeoutError):
                    exc = "Timeout"
                new_error = created_tasks_info[task] + ": {}".format(exc)
                error_list.append(created_tasks_info[task])
                error_detail_list.append(new_error)
                if isinstance(
                    exc,
                    (
                        str,
                        DbException,
                        N2VCException,
                        ROclient.ROClientException,
                        LcmException,
                        K8sException,
                        NgRoException,
                    ),
                ):
                    # known/expected error types: log the message only
                    self.logger.error(logging_text + new_error)
                else:
                    # unexpected exception: log with full traceback
                    exc_traceback = "".join(
                        traceback.format_exception(None, exc, exc.__traceback__)
                    )
                    self.logger.error(
                        logging_text
                        + created_tasks_info[task]
                        + " "
                        + exc_traceback
                    )
            else:
                self.logger.debug(
                    logging_text + created_tasks_info[task] + ": Done"
                )
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        if new_error:
            stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
            if nsr_id:  # update also nsr
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {
                        "errorDescription": "Error at: " + ", ".join(error_list),
                        "errorDetail": ". ".join(error_detail_list),
                    },
                )
        self._write_op_status(nslcmop_id, stage)
    return error_detail_list
+
+ @staticmethod
+ def _map_primitive_params(primitive_desc, params, instantiation_params):
+ """
+ Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
+ The default-value is used. If it is between < > it look for a value at instantiation_params
+ :param primitive_desc: portion of VNFD/NSD that describes primitive
+ :param params: Params provided by user
+ :param instantiation_params: Instantiation params provided by user
+ :return: a dictionary with the calculated params
+ """
+ calculated_params = {}
+ for parameter in primitive_desc.get("parameter", ()):
+ param_name = parameter["name"]
+ if param_name in params:
+ calculated_params[param_name] = params[param_name]
+ elif "default-value" in parameter or "value" in parameter:
+ if "value" in parameter:
+ calculated_params[param_name] = parameter["value"]
+ else:
+ calculated_params[param_name] = parameter["default-value"]
+ if (
+ isinstance(calculated_params[param_name], str)
+ and calculated_params[param_name].startswith("<")
+ and calculated_params[param_name].endswith(">")
+ ):
+ if calculated_params[param_name][1:-1] in instantiation_params:
+ calculated_params[param_name] = instantiation_params[
+ calculated_params[param_name][1:-1]
+ ]
+ else:
+ raise LcmException(
+ "Parameter {} needed to execute primitive {} not provided".format(
+ calculated_params[param_name], primitive_desc["name"]
+ )
+ )
+ else:
+ raise LcmException(
+ "Parameter {} needed to execute primitive {} not provided".format(
+ param_name, primitive_desc["name"]
+ )
+ )
+
+ if isinstance(calculated_params[param_name], (dict, list, tuple)):
+ calculated_params[param_name] = yaml.safe_dump(
+ calculated_params[param_name], default_flow_style=True, width=256
+ )
+ elif isinstance(calculated_params[param_name], str) and calculated_params[
+ param_name
+ ].startswith("!!yaml "):
+ calculated_params[param_name] = calculated_params[param_name][7:]
+ if parameter.get("data-type") == "INTEGER":
+ try:
+ calculated_params[param_name] = int(calculated_params[param_name])
+ except ValueError: # error converting string to int
+ raise LcmException(
+ "Parameter {} of primitive {} must be integer".format(
+ param_name, primitive_desc["name"]
+ )
+ )
+ elif parameter.get("data-type") == "BOOLEAN":
+ calculated_params[param_name] = not (
+ (str(calculated_params[param_name])).lower() == "false"
+ )
+
+ # add always ns_config_info if primitive name is config
+ if primitive_desc["name"] == "config":
+ if "ns_config_info" in instantiation_params:
+ calculated_params["ns_config_info"] = instantiation_params[
+ "ns_config_info"
+ ]
+ return calculated_params
+
+ def _look_for_deployed_vca(
+ self,
+ deployed_vca,
+ member_vnf_index,
+ vdu_id,
+ vdu_count_index,
+ kdu_name=None,
+ ee_descriptor_id=None,
+ ):
+ # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
+ for vca in deployed_vca:
+ if not vca:
+ continue
+ if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
+ continue
+ if (
+ vdu_count_index is not None
+ and vdu_count_index != vca["vdu_count_index"]
+ ):
+ continue
+ if kdu_name and kdu_name != vca["kdu_name"]:
+ continue
+ if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
+ continue
+ break
+ else:
+ # vca_deployed not found
+ raise LcmException(
+ "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
+ " is not deployed".format(
+ member_vnf_index,
+ vdu_id,
+ vdu_count_index,
+ kdu_name,
+ ee_descriptor_id,
+ )
+ )
+ # get ee_id
+ ee_id = vca.get("ee_id")
+ vca_type = vca.get(
+ "type", "lxc_proxy_charm"
+ ) # default value for backward compatibility - proxy charm
+ if not ee_id:
+ raise LcmException(
+ "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
+ "execution environment".format(
+ member_vnf_index, vdu_id, kdu_name, vdu_count_index
+ )
+ )
+ return ee_id, vca_type
+
async def _ns_execute_primitive(
    self,
    ee_id,
    primitive,
    primitive_params,
    retries=0,
    retries_interval=30,
    timeout=None,
    vca_type=None,
    db_dict=None,
    vca_id: str = None,
) -> (str, str):
    """
    Execute a primitive in the given execution environment, with optional retries.
    :param ee_id: execution environment id
    :param primitive: primitive name; "config" wraps the params as {"params": ...}
    :param primitive_params: parameters for the primitive
    :param retries: number of additional attempts after a failed one
    :param retries_interval: seconds to wait between attempts
    :param timeout: per-attempt timeout; defaults to self.timeout_primitive
    :param vca_type: key into self.vca_map; defaults to "lxc_proxy_charm"
    :param db_dict: database info forwarded to the VCA connector
    :param vca_id: VCA identifier forwarded to the VCA connector
    :return: tuple (state, detail): ("COMPLETED", output) on success,
        ("FAILED", error) after exhausting retries, or ("FAIL", message) on
        unexpected errors outside the retry loop
    """
    try:
        if primitive == "config":
            # the "config" primitive expects its params nested under "params"
            primitive_params = {"params": primitive_params}

        vca_type = vca_type or "lxc_proxy_charm"

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout_progress_primitive,
                        total_timeout=self.timeout_primitive,
                        db_dict=db_dict,
                        vca_id=vca_id,
                        vca_type=vca_type,
                    ),
                    timeout=timeout or self.timeout_primitive,
                )
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:  # asyncio.TimeoutError
                if isinstance(e, asyncio.TimeoutError):
                    e = "Timeout"
                retries -= 1
                if retries >= 0:
                    self.logger.debug(
                        "Error executing action {} on {} -> {}".format(
                            primitive, ee_id, e
                        )
                    )
                    # wait and retry
                    await asyncio.sleep(retries_interval, loop=self.loop)
                else:
                    # retries exhausted: report failure with the last error
                    return "FAILED", str(e)

        return "COMPLETED", output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        return "FAIL", "Error executing action {}: {}".format(primitive, e)
+
+ async def vca_status_refresh(self, nsr_id, nslcmop_id):
+ """
+ Updating the vca_status with latest juju information in nsrs record
+ :param: nsr_id: Id of the nsr
+ :param: nslcmop_id: Id of the nslcmop
+ :return: None
+ """
+
+ self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ vca_id = self.get_vca_id({}, db_nsr)
+ if db_nsr["_admin"]["deployed"]["K8s"]:
+ for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
+ cluster_uuid, kdu_instance, cluster_type = (
+ k8s["k8scluster-uuid"],
+ k8s["kdu-instance"],
+ k8s["k8scluster-type"],
+ )
+ await self._on_update_k8s_db(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ filter={"_id": nsr_id},
+ vca_id=vca_id,
+ cluster_type=cluster_type,
+ )
+ else:
+ for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
+ table, filter = "nsrs", {"_id": nsr_id}
+ path = "_admin.deployed.VCA.{}.".format(vca_index)
+ await self._on_update_n2vc_db(table, filter, path, {})
+
+ self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
+
+ async def action(self, nsr_id, nslcmop_id):
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
+ logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
+ self.logger.debug(logging_text + "Enter")
+ # get all needed from database
+ db_nsr = None
+ db_nslcmop = None
+ db_nsr_update = {}
+ db_nslcmop_update = {}
+ nslcmop_operation_state = None
+ error_description_nslcmop = None
+ exc = None
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="RUNNING ACTION",
+ current_operation_id=nslcmop_id,
+ )
+
+ step = "Getting information from database"
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ if db_nslcmop["operationParams"].get("primitive_params"):
+ db_nslcmop["operationParams"]["primitive_params"] = json.loads(
+ db_nslcmop["operationParams"]["primitive_params"]
+ )
+
+ nsr_deployed = db_nsr["_admin"].get("deployed")
+ vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
+ vdu_id = db_nslcmop["operationParams"].get("vdu_id")
+ kdu_name = db_nslcmop["operationParams"].get("kdu_name")
+ vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
+ primitive = db_nslcmop["operationParams"]["primitive"]
+ primitive_params = db_nslcmop["operationParams"]["primitive_params"]
+ timeout_ns_action = db_nslcmop["operationParams"].get(
+ "timeout_ns_action", self.timeout_primitive
+ )
+
+ if vnf_index:
+ step = "Getting vnfr from database"
+ db_vnfr = self.db.get_one(
+ "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
+ )
+ if db_vnfr.get("kdur"):
+ kdur_list = []
+ for kdur in db_vnfr["kdur"]:
+ if kdur.get("additionalParams"):
+ kdur["additionalParams"] = json.loads(
+ kdur["additionalParams"]
+ )
+ kdur_list.append(kdur)
+ db_vnfr["kdur"] = kdur_list
+ step = "Getting vnfd from database"
+ db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
+
+ # Sync filesystem before running a primitive
+ self.fs.sync(db_vnfr["vnfd-id"])
+ else:
+ step = "Getting nsd from database"
+ db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+
+ vca_id = self.get_vca_id(db_vnfr, db_nsr)
+ # for backward compatibility
+ if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
+ nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
+ db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ # look for primitive
+ config_primitive_desc = descriptor_configuration = None
+ if vdu_id:
+ descriptor_configuration = get_configuration(db_vnfd, vdu_id)
+ elif kdu_name:
+ descriptor_configuration = get_configuration(db_vnfd, kdu_name)
+ elif vnf_index:
+ descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
+ else:
+ descriptor_configuration = db_nsd.get("ns-configuration")
+
+ if descriptor_configuration and descriptor_configuration.get(
+ "config-primitive"
+ ):
+ for config_primitive in descriptor_configuration["config-primitive"]:
+ if config_primitive["name"] == primitive:
+ config_primitive_desc = config_primitive
+ break
+
+ if not config_primitive_desc:
+ if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
+ raise LcmException(
+ "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
+ primitive
+ )
+ )
+ primitive_name = primitive
+ ee_descriptor_id = None
+ else:
+ primitive_name = config_primitive_desc.get(
+ "execution-environment-primitive", primitive
+ )
+ ee_descriptor_id = config_primitive_desc.get(
+ "execution-environment-ref"
+ )
+
+ if vnf_index:
+ if vdu_id:
+ vdur = next(
+ (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
+ )
+ desc_params = parse_yaml_strings(vdur.get("additionalParams"))
+ elif kdu_name:
+ kdur = next(
+ (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
+ )
+ desc_params = parse_yaml_strings(kdur.get("additionalParams"))
+ else:
+ desc_params = parse_yaml_strings(
+ db_vnfr.get("additionalParamsForVnf")
+ )
+ else:
+ desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
+ if kdu_name and get_configuration(db_vnfd, kdu_name):
+ kdu_configuration = get_configuration(db_vnfd, kdu_name)
+ actions = set()
+ for primitive in kdu_configuration.get("initial-config-primitive", []):
+ actions.add(primitive["name"])
+ for primitive in kdu_configuration.get("config-primitive", []):
+ actions.add(primitive["name"])
+ kdu = find_in_list(
+ nsr_deployed["K8s"],
+ lambda kdu: kdu_name == kdu["kdu-name"]
+ and kdu["member-vnf-index"] == vnf_index,
+ )
+ kdu_action = (
+ True
+ if primitive_name in actions
+ and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
+ else False
+ )
+
+ # TODO check if ns is in a proper status
+ if kdu_name and (
+ primitive_name in ("upgrade", "rollback", "status") or kdu_action
+ ):
+ # kdur and desc_params already set from before
+ if primitive_params:
+ desc_params.update(primitive_params)
+ # TODO Check if we will need something at vnf level
+ for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
+ if (
+ kdu_name == kdu["kdu-name"]
+ and kdu["member-vnf-index"] == vnf_index
+ ):
+ break
+ else:
+ raise LcmException(
+ "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
+ )
+
+ if kdu.get("k8scluster-type") not in self.k8scluster_map:
+ msg = "unknown k8scluster-type '{}'".format(
+ kdu.get("k8scluster-type")
+ )
+ raise LcmException(msg)
+
+ db_dict = {
+ "collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": "_admin.deployed.K8s.{}".format(index),
+ }
+ self.logger.debug(
+ logging_text
+ + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
+ )
+ step = "Executing kdu {}".format(primitive_name)
+ if primitive_name == "upgrade":
+ if desc_params.get("kdu_model"):
+ kdu_model = desc_params.get("kdu_model")
+ del desc_params["kdu_model"]
+ else:
+ kdu_model = kdu.get("kdu-model")
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ kdu_model = parts[0]
+ if desc_params.get("kdu_atomic_upgrade"):
+ atomic_upgrade = desc_params.get("kdu_atomic_upgrade").lower() in ("yes", "true", "1")
+ del desc_params["kdu_atomic_upgrade"]
+ else:
+ atomic_upgrade = True
+
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ atomic=atomic_upgrade,
+ kdu_model=kdu_model,
+ params=desc_params,
+ db_dict=db_dict,
+ timeout=timeout_ns_action,
+ ),
+ timeout=timeout_ns_action + 10,
+ )
+ self.logger.debug(
+ logging_text + " Upgrade of kdu {} done".format(detailed_status)
+ )
+ elif primitive_name == "rollback":
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].rollback(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ db_dict=db_dict,
+ ),
+ timeout=timeout_ns_action,
+ )
+ elif primitive_name == "status":
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ vca_id=vca_id,
+ ),
+ timeout=timeout_ns_action,
+ )
+ else:
+ kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
+ kdu["kdu-name"], nsr_id
+ )
+ params = self._map_primitive_params(
+ config_primitive_desc, primitive_params, desc_params
+ )
+
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu_instance,
+ primitive_name=primitive_name,
+ params=params,
+ db_dict=db_dict,
+ timeout=timeout_ns_action,
+ vca_id=vca_id,
+ ),
+ timeout=timeout_ns_action,
+ )
+
+ if detailed_status:
+ nslcmop_operation_state = "COMPLETED"
+ else:
+ detailed_status = ""
+ nslcmop_operation_state = "FAILED"
+ else:
+ ee_id, vca_type = self._look_for_deployed_vca(
+ nsr_deployed["VCA"],
+ member_vnf_index=vnf_index,
+ vdu_id=vdu_id,
+ vdu_count_index=vdu_count_index,
+ ee_descriptor_id=ee_descriptor_id,
+ )
+ for vca_index, vca_deployed in enumerate(
+ db_nsr["_admin"]["deployed"]["VCA"]
+ ):
+ if vca_deployed.get("member-vnf-index") == vnf_index:
+ db_dict = {
+ "collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": "_admin.deployed.VCA.{}.".format(vca_index),
+ }
+ break
+ (
+ nslcmop_operation_state,
+ detailed_status,
+ ) = await self._ns_execute_primitive(
+ ee_id,
+ primitive=primitive_name,
+ primitive_params=self._map_primitive_params(
+ config_primitive_desc, primitive_params, desc_params
+ ),
+ timeout=timeout_ns_action,
+ vca_type=vca_type,
+ db_dict=db_dict,
+ vca_id=vca_id,
+ )
+
+ db_nslcmop_update["detailed-status"] = detailed_status
+ error_description_nslcmop = (
+ detailed_status if nslcmop_operation_state == "FAILED" else ""
+ )
+ self.logger.debug(
+ logging_text
+ + " task Done with result {} {}".format(
+ nslcmop_operation_state, detailed_status
+ )
+ )
+ return # database update is called inside finally
+
+ except (DbException, LcmException, N2VCException, K8sException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(
+ logging_text + "Cancelled Exception while '{}'".format(step)
+ )
+ exc = "Operation was cancelled"
+ except asyncio.TimeoutError:
+ self.logger.error(logging_text + "Timeout while '{}'".format(step))
+ exc = "Timeout"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
+ exc_info=True,
+ )
+ finally:
+ if exc:
+ db_nslcmop_update[
+ "detailed-status"
+ ] = (
+ detailed_status
+ ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=db_nsr[
+ "nsState"
+ ], # TODO check if degraded. For the moment use previous status
+ current_operation="IDLE",
+ current_operation_id=None,
+ # error_description=error_description_nsr,
+ # error_detail=error_detail,
+ other_update=db_nsr_update,
+ )
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ await self.msg.aiowrite(
+ "ns",
+ "actioned",
+ {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ },
+ loop=self.loop,
+ )
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
+ return nslcmop_operation_state, detailed_status
+
+ async def terminate_vdus(
+ self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
+ ):
+ """This method terminates VDUs
+
+ Args:
+ db_vnfr: VNF instance record
+ member_vnf_index: VNF index to identify the VDUs to be removed
+ db_nsr: NS instance record
+ update_db_nslcmops: Nslcmop update record
+ """
+ vca_scaling_info = []
+ scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
+ scaling_info["scaling_direction"] = "IN"
+ scaling_info["vdu-delete"] = {}
+ scaling_info["kdu-delete"] = {}
+ db_vdur = db_vnfr.get("vdur")
+ vdur_list = copy(db_vdur)
+ count_index = 0
+ for index, vdu in enumerate(vdur_list):
+ vca_scaling_info.append(
+ {
+ "osm_vdu_id": vdu["vdu-id-ref"],
+ "member-vnf-index": member_vnf_index,
+ "type": "delete",
+ "vdu_index": count_index,
+ })
+ scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
+ scaling_info["vdu"].append(
+ {
+ "name": vdu.get("name") or vdu.get("vdu-name"),
+ "vdu_id": vdu["vdu-id-ref"],
+ "interface": [],
+ })
+ for interface in vdu["interfaces"]:
+ scaling_info["vdu"][index]["interface"].append(
+ {
+ "name": interface["name"],
+ "ip_address": interface["ip-address"],
+ "mac_address": interface.get("mac-address"),
+ })
+ self.logger.info("NS update scaling info{}".format(scaling_info))
+ stage[2] = "Terminating VDUs"
+ if scaling_info.get("vdu-delete"):
+ # scale_process = "RO"
+ if self.ro_config.get("ng"):
+ await self._scale_ng_ro(
+ logging_text, db_nsr, update_db_nslcmops, db_vnfr, scaling_info, stage
+ )
+
+ async def remove_vnf(
+ self, nsr_id, nslcmop_id, vnf_instance_id
+ ):
+ """This method is to Remove VNF instances from NS.
+
+ Args:
+ nsr_id: NS instance id
+ nslcmop_id: nslcmop id of update
+ vnf_instance_id: id of the VNF instance to be removed
+
+ Returns:
+ result: (str, str) COMPLETED/FAILED, details
+ """
+ try:
+ db_nsr_update = {}
+ logging_text = "Task ns={} update ".format(nsr_id)
+ check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
+ self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
+ if check_vnfr_count > 1:
+ stage = ["", "", ""]
+ step = "Getting nslcmop from database"
+ self.logger.debug(step + " after having waited for previous tasks to be completed")
+ # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ """ db_vnfr = self.db.get_one(
+ "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
+
+ update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)
+
+ constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
+ constituent_vnfr.remove(db_vnfr.get("_id"))
+ db_nsr_update["constituent-vnfr-ref"] = db_nsr.get("constituent-vnfr-ref")
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ return "COMPLETED", "Done"
+ else:
+ step = "Terminate VNF Failed with"
+ raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
+ vnf_instance_id))
+ except (LcmException, asyncio.CancelledError):
+ raise
+ except Exception as e:
+ self.logger.debug("Error removing VNF {}".format(e))
+ return "FAILED", "Error removing VNF {}".format(e)
+
    async def _ns_redeploy_vnf(
        self, nsr_id, nslcmop_id, db_vnfd, db_vnfr, db_nsr,
    ):
        """This method updates and redeploys VNF instances

        Terminates the VNF's current VDUs, rewrites the vnfr (connection
        points, vdur, revision) from the new descriptor, then instantiates the
        new resources through RO.

        Args:
            nsr_id: NS instance id
            nslcmop_id: nslcmop id
            db_vnfd: VNF descriptor
            db_vnfr: VNF instance record
            db_nsr: NS instance record

        Returns:
            result: (str, str) COMPLETED/FAILED, details
        """
        try:
            count_index = 0
            stage = ["", "", ""]
            logging_text = "Task ns={} update ".format(nsr_id)
            latest_vnfd_revision = db_vnfd["_admin"].get("revision")
            member_vnf_index = db_vnfr["member-vnf-index-ref"]

            # Terminate old VNF resources
            update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)

            # old_vnfd_id = db_vnfr["vnfd-id"]
            # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            new_db_vnfd = db_vnfd
            # new_vnfd_ref = new_db_vnfd["id"]
            # new_vnfd_id = vnfd_id

            # Create VDUR
            # rebuild vnfr connection points from the new descriptor's ext-cpd
            new_vnfr_cp = []
            for cp in new_db_vnfd.get("ext-cpd", ()):
                vnf_cp = {
                    "name": cp.get("id"),
                    "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
                    "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
                    "id": cp.get("id"),
                }
                new_vnfr_cp.append(vnf_cp)
            # NOTE(review): new vdur records come from the operation params —
            # assumes the caller filled "newVdur"; confirm against callers
            new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
            # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
            # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
            new_vnfr_update = {"revision": latest_vnfd_revision, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
            self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
            # re-read the vnfr so the instantiation below sees the new state
            updated_db_vnfr = self.db.get_one(
                "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}
            )

            # Instantiate new VNF resources
            # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            # NOTE(review): vca_scaling_info is built but never consumed here
            vca_scaling_info = []
            # scale-out request covering every VDU of the new descriptor
            scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
            scaling_info["scaling_direction"] = "OUT"
            scaling_info["vdu-create"] = {}
            scaling_info["kdu-create"] = {}
            vdud_instantiate_list = db_vnfd["vdu"]
            for index, vdud in enumerate(vdud_instantiate_list):
                cloud_init_text = self._get_vdu_cloud_init_content(
                    vdud, db_vnfd
                )
                if cloud_init_text:
                    additional_params = (
                        self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
                        or {}
                    )
                cloud_init_list = []
                if cloud_init_text:
                    # TODO Information of its own ip is not available because db_vnfr is not updated.
                    additional_params["OSM"] = get_osm_params(
                        updated_db_vnfr, vdud["id"], 1
                    )
                    cloud_init_list.append(
                        self._parse_cloud_init(
                            cloud_init_text,
                            additional_params,
                            db_vnfd["id"],
                            vdud["id"],
                        )
                    )
                vca_scaling_info.append(
                    {
                        "osm_vdu_id": vdud["id"],
                        "member-vnf-index": member_vnf_index,
                        "type": "create",
                        "vdu_index": count_index,
                    }
                )
                scaling_info["vdu-create"][vdud["id"]] = count_index
            if self.ro_config.get("ng"):
                self.logger.debug(
                    "New Resources to be deployed: {}".format(scaling_info))
                await self._scale_ng_ro(
                    logging_text, db_nsr, update_db_nslcmops, updated_db_vnfr, scaling_info, stage
                )
            return "COMPLETED", "Done"
        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            self.logger.debug("Error updating VNF {}".format(e))
            return "FAILED", "Error updating VNF {}".format(e)
+
+ async def _ns_charm_upgrade(
+ self,
+ ee_id,
+ charm_id,
+ charm_type,
+ path,
+ timeout: float = None,
+ ) -> (str, str):
+ """This method upgrade charms in VNF instances
+
+ Args:
+ ee_id: Execution environment id
+ path: Local path to the charm
+ charm_id: charm-id
+ charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
+ timeout: (Float) Timeout for the ns update operation
+
+ Returns:
+ result: (str, str) COMPLETED/FAILED, details
+ """
+ try:
+ charm_type = charm_type or "lxc_proxy_charm"
+ output = await self.vca_map[charm_type].upgrade_charm(
+ ee_id=ee_id,
+ path=path,
+ charm_id=charm_id,
+ charm_type=charm_type,
+ timeout=timeout or self.timeout_ns_update,
+ )
+
+ if output:
+ return "COMPLETED", output
+
+ except (LcmException, asyncio.CancelledError):
+ raise
+
+ except Exception as e:
+
+ self.logger.debug("Error upgrading charm {}".format(path))
+
+ return "FAILED", "Error upgrading charm {}: {}".format(path, e)
+
+ async def update(self, nsr_id, nslcmop_id):
+ """Update NS according to different update types
+
+ This method performs upgrade of VNF instances then updates the revision
+ number in VNF record
+
+ Args:
+ nsr_id: Network service will be updated
+ nslcmop_id: ns lcm operation id
+
+ Returns:
+ It may raise DbException, LcmException, N2VCException, K8sException
+
+ """
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
+ logging_text = "Task ns={} update={} ".format(nsr_id, nslcmop_id)
+ self.logger.debug(logging_text + "Enter")
+
+ # Set the required variables to be filled up later
+ db_nsr = None
+ db_nslcmop_update = {}
+ vnfr_update = {}
+ nslcmop_operation_state = None
+ db_nsr_update = {}
+ error_description_nslcmop = ""
+ exc = None
+ change_type = "updated"
+ detailed_status = ""
+
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="UPDATING",
+ current_operation_id=nslcmop_id,
+ )
+
+ step = "Getting nslcmop from database"
+ db_nslcmop = self.db.get_one(
+ "nslcmops", {"_id": nslcmop_id}, fail_on_empty=False
+ )
+ update_type = db_nslcmop["operationParams"]["updateType"]
+
+ step = "Getting nsr from database"
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ old_operational_status = db_nsr["operational-status"]
+ db_nsr_update["operational-status"] = "updating"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ nsr_deployed = db_nsr["_admin"].get("deployed")
+
+ if update_type == "CHANGE_VNFPKG":
+
+ # Get the input parameters given through update request
+ vnf_instance_id = db_nslcmop["operationParams"][
+ "changeVnfPackageData"
+ ].get("vnfInstanceId")
+
+ vnfd_id = db_nslcmop["operationParams"]["changeVnfPackageData"].get(
+ "vnfdId"
+ )
+ timeout_seconds = db_nslcmop["operationParams"].get("timeout_ns_update")
+
+ step = "Getting vnfr from database"
+ db_vnfr = self.db.get_one(
+ "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False
+ )
+
+ step = "Getting vnfds from database"
+ # Latest VNFD
+ latest_vnfd = self.db.get_one(
+ "vnfds", {"_id": vnfd_id}, fail_on_empty=False
+ )
+ latest_vnfd_revision = latest_vnfd["_admin"].get("revision")
+
+ # Current VNFD
+ current_vnf_revision = db_vnfr.get("revision", 1)
+ current_vnfd = self.db.get_one(
+ "vnfds_revisions",
+ {"_id": vnfd_id + ":" + str(current_vnf_revision)},
+ fail_on_empty=False,
+ )
+ # Charm artifact paths will be filled up later
+ (
+ current_charm_artifact_path,
+ target_charm_artifact_path,
+ charm_artifact_paths,
+ ) = ([], [], [])
+
+ step = "Checking if revision has changed in VNFD"
+ if current_vnf_revision != latest_vnfd_revision:
+
+ change_type = "policy_updated"
+
+ # There is new revision of VNFD, update operation is required
+ current_vnfd_path = vnfd_id + ":" + str(current_vnf_revision)
+ latest_vnfd_path = vnfd_id + ":" + str(latest_vnfd_revision)
+
+ step = "Removing the VNFD packages if they exist in the local path"
+ shutil.rmtree(self.fs.path + current_vnfd_path, ignore_errors=True)
+ shutil.rmtree(self.fs.path + latest_vnfd_path, ignore_errors=True)
+
+ step = "Get the VNFD packages from FSMongo"
+ self.fs.sync(from_path=latest_vnfd_path)
+ self.fs.sync(from_path=current_vnfd_path)
+
+ step = (
+ "Get the charm-type, charm-id, ee-id if there is deployed VCA"
+ )
+ base_folder = latest_vnfd["_admin"]["storage"]
+
+ for charm_index, charm_deployed in enumerate(
+ get_iterable(nsr_deployed, "VCA")
+ ):
+ vnf_index = db_vnfr.get("member-vnf-index-ref")
+
+ # Getting charm-id and charm-type
+ if charm_deployed.get("member-vnf-index") == vnf_index:
+ charm_id = self.get_vca_id(db_vnfr, db_nsr)
+ charm_type = charm_deployed.get("type")
+
+ # Getting ee-id
+ ee_id = charm_deployed.get("ee_id")
+
+ step = "Getting descriptor config"
+ descriptor_config = get_configuration(
+ current_vnfd, current_vnfd["id"]
+ )
+
+ if "execution-environment-list" in descriptor_config:
+ ee_list = descriptor_config.get(
+ "execution-environment-list", []
+ )
+ else:
+ ee_list = []
+
+ # There could be several charm used in the same VNF
+ for ee_item in ee_list:
+ if ee_item.get("juju"):
+
+ step = "Getting charm name"
+ charm_name = ee_item["juju"].get("charm")
+
+ step = "Setting Charm artifact paths"
+ current_charm_artifact_path.append(
+ get_charm_artifact_path(
+ base_folder,
+ charm_name,
+ charm_type,
+ current_vnf_revision,
+ )
+ )
+ target_charm_artifact_path.append(
+ get_charm_artifact_path(
+ base_folder,
+ charm_name,
+ charm_type,
+ latest_vnfd_revision,
+ )
+ )
+
+ charm_artifact_paths = zip(
+ current_charm_artifact_path, target_charm_artifact_path
+ )
+
+ step = "Checking if software version has changed in VNFD"
+ if find_software_version(current_vnfd) != find_software_version(
+ latest_vnfd
+ ):
+
+ step = "Checking if existing VNF has charm"
+ for current_charm_path, target_charm_path in list(
+ charm_artifact_paths
+ ):
+ if current_charm_path:
+ raise LcmException(
+ "Software version change is not supported as VNF instance {} has charm.".format(
+ vnf_instance_id
+ )
+ )
+
+ # There is no change in the charm package, then redeploy the VNF
+ # based on new descriptor
+ step = "Redeploying VNF"
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ (
+ result,
+ detailed_status
+ ) = await self._ns_redeploy_vnf(
+ nsr_id,
+ nslcmop_id,
+ latest_vnfd,
+ db_vnfr,
+ db_nsr
+ )
+ if result == "FAILED":
+ nslcmop_operation_state = result
+ error_description_nslcmop = detailed_status
+ db_nslcmop_update["detailed-status"] = detailed_status
+ self.logger.debug(
+ logging_text
+ + " step {} Done with result {} {}".format(
+ step, nslcmop_operation_state, detailed_status
+ )
+ )
+
+ else:
+ step = "Checking if any charm package has changed or not"
+ for current_charm_path, target_charm_path in list(
+ charm_artifact_paths
+ ):
+ if (
+ current_charm_path
+ and target_charm_path
+ and self.check_charm_hash_changed(
+ current_charm_path, target_charm_path
+ )
+ ):
+
+ step = "Checking whether VNF uses juju bundle"
+ if check_juju_bundle_existence(current_vnfd):
+
+ raise LcmException(
+ "Charm upgrade is not supported for the instance which"
+ " uses juju-bundle: {}".format(
+ check_juju_bundle_existence(current_vnfd)
+ )
+ )