From f0f83a3ca9d8a94e464721ca0d99c5ff5d47f405 Mon Sep 17 00:00:00 2001 From: aktas Date: Fri, 12 Feb 2021 22:19:10 +0300 Subject: [PATCH 1/1] Bug 585 Fix for scaling This fix should be merged with this https://osm.etsi.org/gerrit/c/osm/N2VC/+/10364 Change-Id: I43fb4e5c81dbbaed07f01ba1a3ba399f7425b347 Signed-off-by: aktas Signed-off-by: garciadeblas --- osm_lcm/data_utils/vnfd.py | 4 +- osm_lcm/data_utils/vnfr.py | 9 ++ osm_lcm/ns.py | 227 +++++++++++++++++++++++++++++++++++-- 3 files changed, 226 insertions(+), 14 deletions(-) diff --git a/osm_lcm/data_utils/vnfd.py b/osm_lcm/data_utils/vnfd.py index f816a8d..1b45b53 100644 --- a/osm_lcm/data_utils/vnfd.py +++ b/osm_lcm/data_utils/vnfd.py @@ -96,7 +96,7 @@ def get_configuration(vnfd, entity_id): if not ops_vnf: return None day12ops = ops_vnf.get("day1-2", []) - list_utils.find_in_list( + return list_utils.find_in_list( day12ops, lambda configuration: configuration["id"] == entity_id) @@ -138,7 +138,7 @@ def get_number_of_instances(vnfd, vdu_id): () ), lambda a_vdu: a_vdu["vdu-id"] == vdu_id - )["number-of-instances"] + ).get("number-of-instances", 1) def get_juju_ee_ref(vnfd, entity_id): diff --git a/osm_lcm/data_utils/vnfr.py b/osm_lcm/data_utils/vnfr.py index 042788e..9c0b148 100644 --- a/osm_lcm/data_utils/vnfr.py +++ b/osm_lcm/data_utils/vnfr.py @@ -23,6 +23,7 @@ ## from osm_lcm.data_utils import list_utils +from osm_lcm.lcm_utils import get_iterable def find_VNFR_by_VDU_ID(vnfr, vdu_id): @@ -57,3 +58,11 @@ def get_osm_params(db_vnfr, vdu_id=None, vdu_count_index=0): osm_params["vdu_id"] = vdu_id osm_params["count_index"] = vdu_count_index return osm_params + + +def get_vdur_index(db_vnfr, vdu_delta): + vdur_list = get_iterable(db_vnfr, "vdur") + if vdur_list: + return len([x for x in vdur_list if x.get("vdu-id-ref") == vdu_delta["id"]]) + else: + return 0 diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 1e40fee..75db8e5 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -33,7 +33,7 @@ from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \ get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \ get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref from osm_lcm.data_utils.list_utils import find_in_list -from osm_lcm.data_utils.vnfr import get_osm_params +from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index from osm_lcm.data_utils.dict_utils import parse_yaml_strings from osm_lcm.data_utils.database.vim_account import VimAccountDB from n2vc.k8s_helm_conn import K8sHelmConnector @@ -1163,14 +1163,14 @@ class NsLcm(LcmBase): if vnfr_id: element_type = 'VNF' element_under_configuration = vnfr_id - namespace += ".{}".format(vnfr_id) + namespace += ".{}-{}".format(vnfr_id, vdu_index or 0) if vdu_id: namespace += ".{}-{}".format(vdu_id, vdu_index or 0) element_type = 'VDU' element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0) osm_config["osm"]["vdu_id"] = vdu_id elif kdu_name: - namespace += ".{}".format(kdu_name) + namespace += ".{}.{}".format(kdu_name, vdu_index or 0) element_type = 'KDU' element_under_configuration = kdu_name osm_config["osm"]["kdu_name"] = kdu_name @@ -2741,7 +2741,7 @@ class NsLcm(LcmBase): return vca["ee_id"] async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, - vca_index, destroy_ee=True, exec_primitives=True): + vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False): """ Execute the terminate primitives and destroy the execution environment (if destroy_ee=False :param logging_text: @@ -2752,6 +2752,7 @@ class NsLcm(LcmBase): :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has not executed properly + :param scaling_in: True destroys the application, False destroys the model :return: None or exception """ @@ -2809,7 +2810,7 @@ class NsLcm(LcmBase): await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"]) if destroy_ee: - await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"]) + await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in) async def _delete_all_N2VC(self, db_nsr: dict): self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING') @@ -3050,7 +3051,7 @@ class NsLcm(LcmBase): config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name")) else: db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]] - config_descriptor = get_configuration(db_vnfd, db_vnfd["id"]) + config_descriptor = get_configuration(db_vnfd, db_vnfd["id"]) vca_type = vca.get("type") exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and vca.get("needed_terminate")) @@ -3615,6 +3616,7 @@ class NsLcm(LcmBase): logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id) stage = ['', '', ''] + tasks_dict_info = {} # ^ stage, step, VIM progress self.logger.debug(logging_text + "Enter") # get all needed from database @@ -3626,6 +3628,7 @@ class NsLcm(LcmBase): scale_process = None old_operational_status = "" old_config_status = "" + nsi_id = None try: # wait for any previous tasks in process step = "Waiting for previous operations to terminate" @@ -3670,6 +3673,8 @@ class NsLcm(LcmBase): step = "Getting vnfd from database" db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]}) + base_folder = db_vnfd["_admin"]["storage"] + step = "Getting scaling-group-descriptor" scaling_descriptor = find_in_list( get_scaling_aspect( @@ -3696,6 +3701,7 @@ class NsLcm(LcmBase): admin_scale_index += 1 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group RO_scaling_info = [] + VCA_scaling_info = [] vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []} if scaling_type == "SCALE_OUT": if "aspect-delta-details" not in scaling_descriptor: @@ -3712,7 +3718,7 @@ class NsLcm(LcmBase): for delta in deltas: for vdu_delta in delta["vdu-delta"]: vdud = get_vdu(db_vnfd, vdu_delta["id"]) - vdu_index = get_vdu_index(db_vnfr, vdu_delta["id"]) + vdu_index = get_vdur_index(db_vnfr, vdu_delta) cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd) if cloud_init_text: additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {} @@ -3723,11 +3729,11 @@ class NsLcm(LcmBase): if vdu_profile and "max-number-of-instances" in vdu_profile: max_instance_count = vdu_profile.get("max-number-of-instances", 10) - deafult_instance_num = get_number_of_instances(db_vnfd, vdud["id"]) + default_instance_num = get_number_of_instances(db_vnfd, vdud["id"]) nb_scale_op += vdu_delta.get("number-of-instances", 1) - if nb_scale_op + deafult_instance_num > max_instance_count: + if nb_scale_op + default_instance_num > max_instance_count: raise LcmException( "reached the limit of {} (max-instance-count) " "scaling-out operations for the " @@ -3749,6 +3755,14 @@ class NsLcm(LcmBase): vdud["id"] ) ) + VCA_scaling_info.append( + { + "osm_vdu_id": vdu_delta["id"], + "member-vnf-index": vnf_index, + "type": "create", + "vdu_index": vdu_index + x + } + ) RO_scaling_info.append( { "osm_vdu_id": vdu_delta["id"], @@ -3770,21 +3784,32 @@ class NsLcm(LcmBase): deltas = scaling_descriptor.get("aspect-delta-details")["deltas"] for delta in deltas: for vdu_delta in delta["vdu-delta"]: + vdu_index = get_vdur_index(db_vnfr, vdu_delta) min_instance_count = 0 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"]) if vdu_profile and "min-number-of-instances" in vdu_profile: min_instance_count = vdu_profile["min-number-of-instances"] - deafult_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"]) + default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"]) nb_scale_op -= vdu_delta.get("number-of-instances", 1) - if nb_scale_op + deafult_instance_num < min_instance_count: + if nb_scale_op + default_instance_num < min_instance_count: raise LcmException( "reached the limit of {} (min-instance-count) scaling-in operations for the " "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group) ) RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index, - "type": "delete", "count": vdu_delta.get("number-of-instances", 1)}) + "type": "delete", "count": vdu_delta.get("number-of-instances", 1), + "vdu_index": vdu_index - 1}) + for x in range(vdu_delta.get("number-of-instances", 1)): + VCA_scaling_info.append( + { + "osm_vdu_id": vdu_delta["id"], + "member-vnf-index": vnf_index, + "type": "delete", + "vdu_index": vdu_index - 1 - x + } + ) vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1) # update VDU_SCALING_INFO with the VDUs to delete ip_addresses @@ -3886,6 +3911,68 @@ class NsLcm(LcmBase): db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time() + # SCALE-IN VCA - BEGIN + if VCA_scaling_info: + step = db_nslcmop_update["detailed-status"] = \ + "Deleting the execution environments" + scale_process = "VCA" + for vdu_info in VCA_scaling_info: + if vdu_info["type"] == "delete": + member_vnf_index = str(vdu_info["member-vnf-index"]) + self.logger.debug(logging_text + "vdu info: {}".format(vdu_info)) + vdu_id = vdu_info["osm_vdu_id"] + vdu_index = int(vdu_info["vdu_index"]) + stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format( + member_vnf_index, vdu_id, vdu_index) + stage[2] = step = "Scaling in VCA" + self._write_op_status( + op_id=nslcmop_id, + stage=stage + ) + vca_update = db_nsr["_admin"]["deployed"]["VCA"] + config_update = db_nsr["configurationStatus"] + for vca_index, vca in enumerate(vca_update): + if (vca or vca.get("ee_id")) and vca["member-vnf-index"] == member_vnf_index and \ + vca["vdu_count_index"] == vdu_index: + if vca.get("vdu_id"): + config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id")) + elif vca.get("kdu_name"): + config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name")) + else: + config_descriptor = get_configuration(db_vnfd, db_vnfd["id"]) + operation_params = db_nslcmop.get("operationParams") or {} + exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and + vca.get("needed_terminate")) + task = asyncio.ensure_future(asyncio.wait_for( + self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, + vca_index, destroy_ee=True, + exec_primitives=exec_terminate_primitives, + scaling_in=True), timeout=self.timeout_charm_delete)) + # wait before next removal + await asyncio.sleep(30) + tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id")) + del vca_update[vca_index] + del config_update[vca_index] + # wait for pending tasks of terminate primitives + if tasks_dict_info: + self.logger.debug(logging_text + + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys()))) + error_list = await self._wait_for_tasks(logging_text, tasks_dict_info, + min(self.timeout_charm_delete, + self.timeout_ns_terminate), + stage, nslcmop_id) + tasks_dict_info.clear() + if error_list: + raise LcmException("; ".join(error_list)) + + db_vca_and_config_update = { + "_admin.deployed.VCA": vca_update, + "configurationStatus": config_update + } + self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update) + scale_process = None + # SCALE-IN VCA - END + # SCALE RO - BEGIN if RO_scaling_info: scale_process = "RO" @@ -3897,6 +3984,117 @@ class NsLcm(LcmBase): scale_process = None if db_nsr_update: self.update_db_2("nsrs", nsr_id, db_nsr_update) + # SCALE RO - END + + # SCALE-UP VCA - BEGIN + if VCA_scaling_info: + step = db_nslcmop_update["detailed-status"] = \ + "Creating new execution environments" + scale_process = "VCA" + for vdu_info in VCA_scaling_info: + if vdu_info["type"] == "create": + member_vnf_index = str(vdu_info["member-vnf-index"]) + self.logger.debug(logging_text + "vdu info: {}".format(vdu_info)) + vnfd_id = db_vnfr["vnfd-ref"] + vdu_index = int(vdu_info["vdu_index"]) + deploy_params = {"OSM": get_osm_params(db_vnfr)} + if db_vnfr.get("additionalParamsForVnf"): + deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())) + descriptor_config = get_configuration(db_vnfd, db_vnfd["id"]) + if descriptor_config: + vdu_id = None + vdu_name = None + kdu_name = None + self._deploy_n2vc( + logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index), + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_id, + vdu_id=vdu_id, + kdu_name=kdu_name, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_info=tasks_dict_info, + stage=stage + ) + vdu_id = vdu_info["osm_vdu_id"] + vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id) + descriptor_config = get_configuration(db_vnfd, vdu_id) + if vdur.get("additionalParams"): + deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"]) + else: + deploy_params_vdu = deploy_params + deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index) + if descriptor_config: + vdu_name = None + kdu_name = None + stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format( + member_vnf_index, vdu_id, vdu_index) + stage[2] = step = "Scaling out VCA" + self._write_op_status( + op_id=nslcmop_id, + stage=stage + ) + self._deploy_n2vc( + logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format( + member_vnf_index, vdu_id, vdu_index), + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_id, + vdu_id=vdu_id, + kdu_name=kdu_name, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params_vdu, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_info=tasks_dict_info, + stage=stage + ) + # TODO: scaling for kdu is not implemented yet. + kdu_name = vdu_info["osm_vdu_id"] + descriptor_config = get_configuration(db_vnfd, kdu_name) + if descriptor_config: + vdu_id = None + vdu_index = vdu_index + vdu_name = None + kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name) + deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)} + if kdur.get("additionalParams"): + deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"]) + + self._deploy_n2vc( + logging_text=logging_text, + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_id, + vdu_id=vdu_id, + kdu_name=kdu_name, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params_kdu, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_info=tasks_dict_info, + stage=stage + ) + # SCALE-UP VCA - END + scale_process = None # POST-SCALE BEGIN # execute primitive service POST-SCALING @@ -3991,6 +4189,11 @@ class NsLcm(LcmBase): self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) finally: self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None) + if tasks_dict_info: + stage[1] = "Waiting for instantiate pending tasks." + self.logger.debug(logging_text + stage[1]) + exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy, + stage, nslcmop_id, nsr_id=nsr_id) if exc: db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc) nslcmop_operation_state = "FAILED" -- 2.25.1