X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FLCM.git;a=blobdiff_plain;f=osm_lcm%2Fns.py;h=443a09e8d17e696225c05f45b237b37b06187051;hp=4077d01568c9c317a68468174629cc4f08dcf009;hb=6ec62b7495f91c1c8388f835696d581ef35c69d6;hpb=9a256dbc33676a12665c8a77e1e22154d34eab4b diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 4077d01..443a09e 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -33,7 +33,7 @@ from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \ get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \ get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref from osm_lcm.data_utils.list_utils import find_in_list -from osm_lcm.data_utils.vnfr import get_osm_params +from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index from osm_lcm.data_utils.dict_utils import parse_yaml_strings from osm_lcm.data_utils.database.vim_account import VimAccountDB from n2vc.k8s_helm_conn import K8sHelmConnector @@ -97,9 +97,6 @@ class NsLcm(LcmBase): self.n2vc = N2VCJujuConnector( log=self.logger, loop=self.loop, - url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']), - username=self.vca_config.get('user', None), - vca_config=self.vca_config, on_update_db=self._on_update_n2vc_db, fs=self.fs, db=self.db @@ -108,8 +105,6 @@ class NsLcm(LcmBase): self.conn_helm_ee = LCMHelmConn( log=self.logger, loop=self.loop, - url=None, - username=None, vca_config=self.vca_config, on_update_db=self._on_update_n2vc_db ) @@ -137,8 +132,7 @@ class NsLcm(LcmBase): juju_command=self.vca_config.get("jujupath"), log=self.logger, loop=self.loop, - on_update_db=None, - vca_config=self.vca_config, + on_update_db=self._on_update_k8s_db, fs=self.fs, db=self.db ) @@ -200,7 +194,7 @@ class NsLcm(LcmBase): except Exception as e: self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e)) - async def _on_update_n2vc_db(self, table, filter, path, updated_data): + async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None): # remove last dot from path (if exists) if path.endswith('.'): @@ -208,7 +202,6 @@ class NsLcm(LcmBase): # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}' # .format(table, filter, path, updated_data)) - try: nsr_id = filter.get('_id') @@ -218,11 +211,12 @@ class NsLcm(LcmBase): current_ns_status = nsr.get('nsState') # get vca status for NS - status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False) + status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False, vca_id=vca_id) # vcaStatus db_dict = dict() db_dict['vcaStatus'] = status_dict + await self.n2vc.update_vca_status(db_dict['vcaStatus'], vca_id=vca_id) # update configurationStatus for this VCA try: @@ -289,6 +283,47 @@ class NsLcm(LcmBase): except Exception as e: self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e)) + async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None, vca_id=None): + """ + Updating vca status in NSR record + :param cluster_uuid: UUID of a k8s cluster + :param kdu_instance: The unique name of the KDU instance + :param filter: To get nsr_id + :return: none + """ + + # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}" + # .format(cluster_uuid, kdu_instance, filter)) + + try: + nsr_id = filter.get('_id') + + # get vca status for NS + vca_status = await self.k8sclusterjuju.status_kdu( + cluster_uuid, + kdu_instance, + complete_status=True, + yaml_format=False, + vca_id=vca_id, + ) + # vcaStatus + db_dict = dict() + db_dict['vcaStatus'] = {nsr_id: vca_status} + + await self.k8sclusterjuju.update_vca_status( + db_dict['vcaStatus'], + kdu_instance, + vca_id=vca_id, + ) + + # write to database + self.update_db_2("nsrs", nsr_id, db_dict) + + except (asyncio.CancelledError, asyncio.TimeoutError): + raise + except Exception as e: + self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e)) + @staticmethod def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id): try: @@ -595,7 +630,7 @@ class NsLcm(LcmBase): for param in ("vim-network-name", "vim-network-id"): if vld_params.get(param): if isinstance(vld_params[param], dict): - for vim, vim_net in vld_params[param]: + for vim, vim_net in vld_params[param].items(): other_target_vim = "vim:" + vim populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net) else: # isinstance str @@ -664,12 +699,25 @@ class NsLcm(LcmBase): # check at nsd descriptor, if there is an ip-profile vld_params = {} - virtual_link_profiles = get_virtual_link_profiles(nsd) + nsd_vlp = find_in_list( + get_virtual_link_profiles(nsd), + lambda a_link_profile: a_link_profile["virtual-link-desc-id"] == vld["id"]) + if nsd_vlp and nsd_vlp.get("virtual-link-protocol-data") and \ + nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"): + ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"]["l3-protocol-data"] + ip_profile_dest_data = {} + if "ip-version" in ip_profile_source_data: + ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"] + if "cidr" in ip_profile_source_data: + ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"] + if "gateway-ip" in ip_profile_source_data: + ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"] + if "dhcp-enabled" in ip_profile_source_data: + ip_profile_dest_data["dhcp-params"] = { + "enabled": ip_profile_source_data["dhcp-enabled"] + } + vld_params["ip-profile"] = ip_profile_dest_data - for vlp in virtual_link_profiles: - ip_profile = find_in_list(nsd["ip-profiles"], - lambda profile: profile["name"] == vlp["ip-profile-ref"]) - vld_params["ip-profile"] = ip_profile["ip-profile-params"] # update vld_params with instantiation params vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"), lambda a_vld: a_vld["name"] in (vld["name"], vld["id"])) @@ -1132,6 +1180,12 @@ class NsLcm(LcmBase): raise LcmException("Configuration aborted because dependent charm/s timeout") + def get_vca_id(self, db_vnfr: dict, db_nsr: dict): + return ( + deep_get(db_vnfr, ("vca-id",)) or + deep_get(db_nsr, ("instantiate_params", "vcaId")) + ) + async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name, ee_config_descriptor): @@ -1163,14 +1217,14 @@ class NsLcm(LcmBase): if vnfr_id: element_type = 'VNF' element_under_configuration = vnfr_id - namespace += ".{}".format(vnfr_id) + namespace += ".{}-{}".format(vnfr_id, vdu_index or 0) if vdu_id: namespace += ".{}-{}".format(vdu_id, vdu_index or 0) element_type = 'VDU' element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0) osm_config["osm"]["vdu_id"] = vdu_id elif kdu_name: - namespace += ".{}".format(kdu_name) + namespace += ".{}.{}".format(kdu_name, vdu_index or 0) element_type = 'KDU' element_under_configuration = kdu_name osm_config["osm"]["kdu_name"] = kdu_name @@ -1201,12 +1255,7 @@ class NsLcm(LcmBase): # find old ee_id if exists ee_id = vca_deployed.get("ee_id") - vim_account_id = ( - deep_get(db_vnfr, ("vim-account-id",)) or - deep_get(deploy_params, ("OSM", "vim_account_id")) - ) - vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id) - vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id) + vca_id = self.get_vca_id(db_vnfr, db_nsr) # create or register execution environment in VCA if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"): @@ -1229,10 +1278,9 @@ class NsLcm(LcmBase): namespace=namespace, artifact_path=artifact_path, db_dict=db_dict, - cloud_name=vca_k8s_cloud, - credential_name=vca_k8s_cloud_credential, + vca_id=vca_id, ) - elif vca_type == "helm" or vca_type == "helm-v3": + elif vca_type == "helm" or vca_type == "helm-v3": ee_id, credentials = await self.vca_map[vca_type].create_execution_environment( namespace=namespace, reuse_ee_id=ee_id, @@ -1246,8 +1294,7 @@ class NsLcm(LcmBase): namespace=namespace, reuse_ee_id=ee_id, db_dict=db_dict, - cloud_name=vca_cloud, - credential_name=vca_cloud_credential, + vca_id=vca_id, ) elif vca_type == "native_charm": @@ -1286,8 +1333,7 @@ class NsLcm(LcmBase): credentials=credentials, namespace=namespace, db_dict=db_dict, - cloud_name=vca_cloud, - credential_name=vca_cloud_credential, + vca_id=vca_id, ) # for compatibility with MON/POL modules, the need model and application name at database @@ -1341,6 +1387,7 @@ class NsLcm(LcmBase): db_dict=db_dict, config=config, num_units=num_units, + vca_id=vca_id, ) # write in db flag of configuration_sw already installed @@ -1348,7 +1395,7 @@ class NsLcm(LcmBase): # add relations for this VCA (wait for other peers related with this VCA) await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, - vca_index=vca_index, vca_type=vca_type) + vca_index=vca_index, vca_id=vca_id, vca_type=vca_type) # if SSH access is required, then get execution environment SSH public # if native charm we have waited already to VM be UP @@ -1361,7 +1408,11 @@ class NsLcm(LcmBase): # Needed to inject a ssh key user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user")) step = "Install configuration Software, getting public ssh key" - pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict) + pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key( + ee_id=ee_id, + db_dict=db_dict, + vca_id=vca_id + ) step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key) else: @@ -1429,7 +1480,8 @@ class NsLcm(LcmBase): ee_id=ee_id, primitive_name=initial_config_primitive["name"], params_dict=primitive_params_, - db_dict=db_dict + db_dict=db_dict, + vca_id=vca_id, ) # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives if check_if_terminated_needed: @@ -2016,8 +2068,15 @@ class NsLcm(LcmBase): self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate") - async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, - timeout: int = 3600, vca_type: str = None) -> bool: + async def _add_vca_relations( + self, + logging_text, + nsr_id, + vca_index: int, + timeout: int = 3600, + vca_type: str = None, + vca_id: str = None, + ) -> bool: # steps: # 1. find all relations for this VCA @@ -2051,8 +2110,11 @@ class NsLcm(LcmBase): db_vnfd_list = db_nsr.get('vnfd-id') if db_vnfd_list: for vnfd in db_vnfd_list: + db_vnf_relations = None db_vnfd = self.db.get_one("vnfds", {"_id": vnfd}) - db_vnf_relations = get_configuration(db_vnfd, db_vnfd["id"]).get("relation", []) + db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"]) + if db_vnf_configuration: + db_vnf_relations = db_vnf_configuration.get("relation", []) if db_vnf_relations: for r in db_vnf_relations: # check if this VCA is in the relation @@ -2100,7 +2162,9 @@ class NsLcm(LcmBase): ee_id_1=from_vca_ee_id, ee_id_2=to_vca_ee_id, endpoint_1=from_vca_endpoint, - endpoint_2=to_vca_endpoint) + endpoint_2=to_vca_endpoint, + vca_id=vca_id, + ) # remove entry from relations list ns_relations.remove(r) else: @@ -2146,7 +2210,9 @@ class NsLcm(LcmBase): ee_id_1=from_vca_ee_id, ee_id_2=to_vca_ee_id, endpoint_1=from_vca_endpoint, - endpoint_2=to_vca_endpoint) + endpoint_2=to_vca_endpoint, + vca_id=vca_id, + ) # remove entry from relations list vnf_relations.remove(r) else: @@ -2183,7 +2249,8 @@ class NsLcm(LcmBase): return False async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict, - vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600): + vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600, + vca_id: str = None): try: k8sclustertype = k8s_instance_info["k8scluster-type"] @@ -2192,7 +2259,13 @@ class NsLcm(LcmBase): "filter": {"_id": nsr_id}, "path": nsr_db_path} - kdu_instance = await self.k8scluster_map[k8sclustertype].install( + kdu_instance = self.k8scluster_map[k8sclustertype].generate_kdu_instance_name( + db_dict=db_dict_install, + kdu_model=k8s_instance_info["kdu-model"], + kdu_name=k8s_instance_info["kdu-name"], + ) + self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}) + await self.k8scluster_map[k8sclustertype].install( cluster_uuid=k8s_instance_info["k8scluster-uuid"], kdu_model=k8s_instance_info["kdu-model"], atomic=True, @@ -2200,7 +2273,10 @@ class NsLcm(LcmBase): db_dict=db_dict_install, timeout=timeout, kdu_name=k8s_instance_info["kdu-name"], - namespace=k8s_instance_info["namespace"]) + namespace=k8s_instance_info["namespace"], + kdu_instance=kdu_instance, + vca_id=vca_id, + ) self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}) # Obtain services to obtain management service ip @@ -2211,6 +2287,12 @@ class NsLcm(LcmBase): # Obtain management service info (if exists) vnfr_update_dict = {} + kdu_config = get_configuration(vnfd, kdud["name"]) + if kdu_config: + target_ee_list = kdu_config.get("execution-environment-list", []) + else: + target_ee_list = [] + if services: vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")] @@ -2230,6 +2312,11 @@ class NsLcm(LcmBase): if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp: vnfr_update_dict["ip-address"] = ip + if find_in_list( + target_ee_list, + lambda ee: ee.get("external-connection-point-ref", "") == service_external_cp + ): + vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip break else: self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"])) @@ -2251,8 +2338,11 @@ class NsLcm(LcmBase): cluster_uuid=k8s_instance_info["k8scluster-uuid"], kdu_instance=kdu_instance, primitive_name=initial_config_primitive["name"], - params=primitive_params_, db_dict={}), - timeout=timeout) + params=primitive_params_, db_dict=db_dict_install, + vca_id=vca_id, + ), + timeout=timeout + ) except Exception as e: # Prepare update db with error and raise exception @@ -2323,6 +2413,7 @@ class NsLcm(LcmBase): updated_v3_cluster_list = [] for vnfr_data in db_vnfrs.values(): + vca_id = self.get_vca_id(vnfr_data, {}) for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")): # Step 0: Prepare and set parameters desc_params = parse_yaml_strings(kdur.get("additionalParams")) @@ -2400,7 +2491,7 @@ class NsLcm(LcmBase): vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id) task = asyncio.ensure_future( self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id, - k8s_instance_info, k8params=desc_params, timeout=600)) + k8s_instance_info, k8params=desc_params, timeout=600, vca_id=vca_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task) task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"]) @@ -2733,8 +2824,18 @@ class NsLcm(LcmBase): if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id: return vca["ee_id"] - async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, - vca_index, destroy_ee=True, exec_primitives=True): + async def destroy_N2VC( + self, + logging_text, + db_nslcmop, + vca_deployed, + config_descriptor, + vca_index, + destroy_ee=True, + exec_primitives=True, + scaling_in=False, + vca_id: str = None, + ): """ Execute the terminate primitives and destroy the execution environment (if destroy_ee=False :param logging_text: @@ -2745,6 +2846,7 @@ class NsLcm(LcmBase): :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has not executed properly + :param scaling_in: True destroys the application, False destroys the model :return: None or exception """ @@ -2784,9 +2886,12 @@ class NsLcm(LcmBase): mapped_primitive_params) # Sub-operations: Call _ns_execute_primitive() instead of action() try: - result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive, - mapped_primitive_params, - vca_type=vca_type) + result, result_detail = await self._ns_execute_primitive( + vca_deployed["ee_id"], primitive, + mapped_primitive_params, + vca_type=vca_type, + vca_id=vca_id, + ) except LcmException: # this happens when VCA is not deployed. In this case it is not needed to terminate continue @@ -2802,13 +2907,21 @@ class NsLcm(LcmBase): await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"]) if destroy_ee: - await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"]) + await self.vca_map[vca_type].delete_execution_environment( + vca_deployed["ee_id"], + scaling_in=scaling_in, + vca_id=vca_id, + ) - async def _delete_all_N2VC(self, db_nsr: dict): + async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None): self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING') namespace = "." + db_nsr["_id"] try: - await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete) + await self.n2vc.delete_namespace( + namespace=namespace, + total_timeout=self.timeout_charm_delete, + vca_id=vca_id, + ) except N2VCNotFound: # already deleted. Skip pass self._write_all_config_status(db_nsr=db_nsr, status='DELETED') @@ -3008,6 +3121,7 @@ class NsLcm(LcmBase): stage[1] = "Getting vnf descriptors from db." db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) + db_vnfrs_dict = {db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list} db_vnfds_from_id = {} db_vnfds_from_member_index = {} # Loop over VNFRs @@ -3030,6 +3144,8 @@ class NsLcm(LcmBase): for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")): config_descriptor = None + + vca_id = self.get_vca_id(db_vnfrs_dict[vca["member-vnf-index"]], db_nsr) if not vca or not vca.get("ee_id"): continue if not vca.get("member-vnf-index"): @@ -3043,7 +3159,7 @@ class NsLcm(LcmBase): config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name")) else: db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]] - config_descriptor = get_configuration(db_vnfd, db_vnfd["id"]) + config_descriptor = get_configuration(db_vnfd, db_vnfd["id"]) vca_type = vca.get("type") exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and vca.get("needed_terminate")) @@ -3053,8 +3169,17 @@ class NsLcm(LcmBase): # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format( # vca_index, vca.get("ee_id"), vca_type, destroy_ee)) task = asyncio.ensure_future( - self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index, - destroy_ee, exec_terminate_primitives)) + self.destroy_N2VC( + logging_text, + db_nslcmop, + vca, + config_descriptor, + vca_index, + destroy_ee, + exec_terminate_primitives, + vca_id=vca_id, + ) + ) tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id")) # wait for pending tasks of terminate primitives @@ -3073,8 +3198,13 @@ class NsLcm(LcmBase): if nsr_deployed.get("VCA"): stage[1] = "Deleting all execution environments." self.logger.debug(logging_text + stage[1]) - task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr), - timeout=self.timeout_charm_delete)) + vca_id = self.get_vca_id({}, db_nsr) + task_delete_ee = asyncio.ensure_future( + asyncio.wait_for( + self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id), + timeout=self.timeout_charm_delete + ) + ) # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id)) tasks_dict_info[task_delete_ee] = "Terminating all VCA" @@ -3087,10 +3217,15 @@ class NsLcm(LcmBase): continue kdu_instance = kdu.get("kdu-instance") if kdu.get("k8scluster-type") in self.k8scluster_map: + # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs + vca_id = self.get_vca_id({}, db_nsr) task_delete_kdu_instance = asyncio.ensure_future( self.k8scluster_map[kdu["k8scluster-type"]].uninstall( cluster_uuid=kdu.get("k8scluster-uuid"), - kdu_instance=kdu_instance)) + kdu_instance=kdu_instance, + vca_id=vca_id, + ) + ) else: self.logger.error(logging_text + "Unknown k8s deployment type {}". format(kdu.get("k8scluster-type"))) @@ -3322,8 +3457,18 @@ class NsLcm(LcmBase): .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index)) return ee_id, vca_type - async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30, - timeout=None, vca_type=None, db_dict=None) -> (str, str): + async def _ns_execute_primitive( + self, + ee_id, + primitive, + primitive_params, + retries=0, + retries_interval=30, + timeout=None, + vca_type=None, + db_dict=None, + vca_id: str = None, + ) -> (str, str): try: if primitive == "config": primitive_params = {"params": primitive_params} @@ -3339,7 +3484,9 @@ class NsLcm(LcmBase): params_dict=primitive_params, progress_timeout=self.timeout_progress_primitive, total_timeout=self.timeout_primitive, - db_dict=db_dict), + db_dict=db_dict, + vca_id=vca_id, + ), timeout=timeout or self.timeout_primitive) # execution was OK break @@ -3363,6 +3510,30 @@ class NsLcm(LcmBase): except Exception as e: return 'FAIL', 'Error executing action {}: {}'.format(primitive, e) + async def vca_status_refresh(self, nsr_id, nslcmop_id): + """ + Updating the vca_status with latest juju information in nsrs record + :param: nsr_id: Id of the nsr + :param: nslcmop_id: Id of the nslcmop + :return: None + """ + + self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id)) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + vca_id = self.get_vca_id({}, db_nsr) + if db_nsr['_admin']['deployed']['K8s']: + for k8s_index, k8s in enumerate(db_nsr['_admin']['deployed']['K8s']): + cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"] + await self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id}, vca_id=vca_id) + else: + for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']): + table, filter = "nsrs", {"_id": nsr_id} + path = "_admin.deployed.VCA.{}.".format(vca_index) + await self._on_update_n2vc_db(table, filter, path, {}) + + self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id)) + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh") + async def action(self, nsr_id, nslcmop_id): # Try to lock HA task here task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id) @@ -3413,6 +3584,7 @@ class NsLcm(LcmBase): step = "Getting nsd from database" db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]}) + vca_id = self.get_vca_id(db_vnfr, db_nsr) # for backward compatibility if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict): nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values()) @@ -3517,8 +3689,11 @@ class NsLcm(LcmBase): detailed_status = await asyncio.wait_for( self.k8scluster_map[kdu["k8scluster-type"]].status_kdu( cluster_uuid=kdu.get("k8scluster-uuid"), - kdu_instance=kdu.get("kdu-instance")), - timeout=timeout_ns_action) + kdu_instance=kdu.get("kdu-instance"), + vca_id=vca_id, + ), + timeout=timeout_ns_action + ) else: kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id) params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params) @@ -3529,8 +3704,11 @@ class NsLcm(LcmBase): kdu_instance=kdu_instance, primitive_name=primitive_name, params=params, db_dict=db_dict, - timeout=timeout_ns_action), - timeout=timeout_ns_action) + timeout=timeout_ns_action, + vca_id=vca_id, + ), + timeout=timeout_ns_action + ) if detailed_status: nslcmop_operation_state = 'COMPLETED' @@ -3541,16 +3719,21 @@ class NsLcm(LcmBase): ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index, vdu_id=vdu_id, vdu_count_index=vdu_count_index, ee_descriptor_id=ee_descriptor_id) - db_nslcmop_notif = {"collection": "nslcmops", - "filter": {"_id": nslcmop_id}, - "path": "admin.VCA"} + for vca_index, vca_deployed in enumerate(db_nsr['_admin']['deployed']['VCA']): + if vca_deployed.get("member-vnf-index") == vnf_index: + db_dict = {"collection": "nsrs", + "filter": {"_id": nsr_id}, + "path": "_admin.deployed.VCA.{}.".format(vca_index)} + break nslcmop_operation_state, detailed_status = await self._ns_execute_primitive( ee_id, primitive=primitive_name, primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params), timeout=timeout_ns_action, vca_type=vca_type, - db_dict=db_nslcmop_notif) + db_dict=db_dict, + vca_id=vca_id, + ) db_nslcmop_update["detailed-status"] = detailed_status error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else "" @@ -3608,6 +3791,7 @@ class NsLcm(LcmBase): logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id) stage = ['', '', ''] + tasks_dict_info = {} # ^ stage, step, VIM progress self.logger.debug(logging_text + "Enter") # get all needed from database @@ -3619,6 +3803,7 @@ class NsLcm(LcmBase): scale_process = None old_operational_status = "" old_config_status = "" + nsi_id = None try: # wait for any previous tasks in process step = "Waiting for previous operations to terminate" @@ -3660,9 +3845,13 @@ class NsLcm(LcmBase): step = "Getting vnfr from database" db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}) + vca_id = self.get_vca_id(db_vnfr, db_nsr) + step = "Getting vnfd from database" db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]}) + base_folder = db_vnfd["_admin"]["storage"] + step = "Getting scaling-group-descriptor" scaling_descriptor = find_in_list( get_scaling_aspect( @@ -3689,6 +3878,7 @@ class NsLcm(LcmBase): admin_scale_index += 1 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group RO_scaling_info = [] + VCA_scaling_info = [] vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []} if scaling_type == "SCALE_OUT": if "aspect-delta-details" not in scaling_descriptor: @@ -3705,7 +3895,7 @@ class NsLcm(LcmBase): for delta in deltas: for vdu_delta in delta["vdu-delta"]: vdud = get_vdu(db_vnfd, vdu_delta["id"]) - vdu_index = get_vdu_index(db_vnfr, vdu_delta["id"]) + vdu_index = get_vdur_index(db_vnfr, vdu_delta) cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd) if cloud_init_text: additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {} @@ -3716,11 +3906,11 @@ class NsLcm(LcmBase): if vdu_profile and "max-number-of-instances" in vdu_profile: max_instance_count = vdu_profile.get("max-number-of-instances", 10) - deafult_instance_num = get_number_of_instances(db_vnfd, vdud["id"]) + default_instance_num = get_number_of_instances(db_vnfd, vdud["id"]) nb_scale_op += vdu_delta.get("number-of-instances", 1) - if nb_scale_op + deafult_instance_num > max_instance_count: + if nb_scale_op + default_instance_num > max_instance_count: raise LcmException( "reached the limit of {} (max-instance-count) " "scaling-out operations for the " @@ -3742,6 +3932,14 @@ class NsLcm(LcmBase): vdud["id"] ) ) + VCA_scaling_info.append( + { + "osm_vdu_id": vdu_delta["id"], + "member-vnf-index": vnf_index, + "type": "create", + "vdu_index": vdu_index + x + } + ) RO_scaling_info.append( { "osm_vdu_id": vdu_delta["id"], @@ -3763,21 +3961,32 @@ class NsLcm(LcmBase): deltas = scaling_descriptor.get("aspect-delta-details")["deltas"] for delta in deltas: for vdu_delta in delta["vdu-delta"]: + vdu_index = get_vdur_index(db_vnfr, vdu_delta) min_instance_count = 0 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"]) if vdu_profile and "min-number-of-instances" in vdu_profile: min_instance_count = vdu_profile["min-number-of-instances"] - deafult_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"]) + default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"]) nb_scale_op -= vdu_delta.get("number-of-instances", 1) - if nb_scale_op + deafult_instance_num < min_instance_count: + if nb_scale_op + default_instance_num < min_instance_count: raise LcmException( "reached the limit of {} (min-instance-count) scaling-in operations for the " "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group) ) RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index, - "type": "delete", "count": vdu_delta.get("number-of-instances", 1)}) + "type": "delete", "count": vdu_delta.get("number-of-instances", 1), + "vdu_index": vdu_index - 1}) + for x in range(vdu_delta.get("number-of-instances", 1)): + VCA_scaling_info.append( + { + "osm_vdu_id": vdu_delta["id"], + "member-vnf-index": vnf_index, + "type": "delete", + "vdu_index": vdu_index - 1 - x + } + ) vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1) # update VDU_SCALING_INFO with the VDUs to delete ip_addresses @@ -3863,7 +4072,11 @@ class NsLcm(LcmBase): vdu_count_index=None, ee_descriptor_id=ee_descriptor_id) result, result_detail = await self._ns_execute_primitive( - ee_id, primitive_name, primitive_params, vca_type) + ee_id, primitive_name, + primitive_params, + vca_type=vca_type, + vca_id=vca_id, + ) self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format( vnf_config_primitive, result, result_detail)) # Update operationState = COMPLETED | FAILED @@ -3879,6 +4092,77 @@ class NsLcm(LcmBase): db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time() + # SCALE-IN VCA - BEGIN + if VCA_scaling_info: + step = db_nslcmop_update["detailed-status"] = \ + "Deleting the execution environments" + scale_process = "VCA" + for vdu_info in VCA_scaling_info: + if vdu_info["type"] == "delete": + member_vnf_index = str(vdu_info["member-vnf-index"]) + self.logger.debug(logging_text + "vdu info: {}".format(vdu_info)) + vdu_id = vdu_info["osm_vdu_id"] + vdu_index = int(vdu_info["vdu_index"]) + stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format( + member_vnf_index, vdu_id, vdu_index) + stage[2] = step = "Scaling in VCA" + self._write_op_status( + op_id=nslcmop_id, + stage=stage + ) + vca_update = db_nsr["_admin"]["deployed"]["VCA"] + config_update = db_nsr["configurationStatus"] + for vca_index, vca in enumerate(vca_update): + if (vca or vca.get("ee_id")) and vca["member-vnf-index"] == member_vnf_index and \ + vca["vdu_count_index"] == vdu_index: + if vca.get("vdu_id"): + config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id")) + elif vca.get("kdu_name"): + config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name")) + else: + config_descriptor = get_configuration(db_vnfd, db_vnfd["id"]) + operation_params = db_nslcmop.get("operationParams") or {} + exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and + vca.get("needed_terminate")) + task = asyncio.ensure_future( + asyncio.wait_for( + self.destroy_N2VC( + logging_text, + db_nslcmop, + vca, + config_descriptor, + vca_index, + destroy_ee=True, + exec_primitives=exec_terminate_primitives, + scaling_in=True, + vca_id=vca_id, + ), + timeout=self.timeout_charm_delete + ) + ) + tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id")) + del vca_update[vca_index] + del config_update[vca_index] + # wait for pending tasks of terminate primitives + if tasks_dict_info: + self.logger.debug(logging_text + + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys()))) + error_list = await self._wait_for_tasks(logging_text, tasks_dict_info, + min(self.timeout_charm_delete, + self.timeout_ns_terminate), + stage, nslcmop_id) + tasks_dict_info.clear() + if error_list: + raise LcmException("; ".join(error_list)) + + db_vca_and_config_update = { + "_admin.deployed.VCA": vca_update, + "configurationStatus": config_update + } + self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update) + scale_process = None + # SCALE-IN VCA - END + # SCALE RO - BEGIN if RO_scaling_info: scale_process = "RO" @@ -3890,6 +4174,86 @@ class NsLcm(LcmBase): scale_process = None if db_nsr_update: self.update_db_2("nsrs", nsr_id, db_nsr_update) + # SCALE RO - END + + # SCALE-UP VCA - BEGIN + if VCA_scaling_info: + step = db_nslcmop_update["detailed-status"] = \ + "Creating new execution environments" + scale_process = "VCA" + for vdu_info in VCA_scaling_info: + if vdu_info["type"] == "create": + member_vnf_index = str(vdu_info["member-vnf-index"]) + self.logger.debug(logging_text + "vdu info: {}".format(vdu_info)) + vnfd_id = db_vnfr["vnfd-ref"] + vdu_index = int(vdu_info["vdu_index"]) + deploy_params = {"OSM": get_osm_params(db_vnfr)} + if db_vnfr.get("additionalParamsForVnf"): + deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())) + descriptor_config = get_configuration(db_vnfd, db_vnfd["id"]) + if descriptor_config: + vdu_id = None + vdu_name = None + kdu_name = None + self._deploy_n2vc( + logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index), + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_id, + vdu_id=vdu_id, + kdu_name=kdu_name, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_info=tasks_dict_info, + stage=stage + ) + vdu_id = vdu_info["osm_vdu_id"] + vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id) + descriptor_config = get_configuration(db_vnfd, vdu_id) + if vdur.get("additionalParams"): + deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"]) + else: + deploy_params_vdu = deploy_params + deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index) + if descriptor_config: + vdu_name = None + kdu_name = None + stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format( + member_vnf_index, vdu_id, vdu_index) + stage[2] = step = "Scaling out VCA" + self._write_op_status( + op_id=nslcmop_id, + stage=stage + ) + self._deploy_n2vc( + logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format( + member_vnf_index, vdu_id, vdu_index), + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_id, + vdu_id=vdu_id, + kdu_name=kdu_name, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params_vdu, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_info=tasks_dict_info, + stage=stage + ) + # SCALE-UP VCA - END + scale_process = None # POST-SCALE BEGIN # execute primitive service POST-SCALING @@ -3955,7 +4319,12 @@ class NsLcm(LcmBase): vdu_count_index=None, ee_descriptor_id=ee_descriptor_id) result, result_detail = await self._ns_execute_primitive( - ee_id, primitive_name, primitive_params, vca_type) + ee_id, + primitive_name, + primitive_params, + vca_type=vca_type, + vca_id=vca_id, + ) self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format( vnf_config_primitive, result, result_detail)) # Update operationState = COMPLETED | FAILED @@ -3984,6 +4353,11 @@ class NsLcm(LcmBase): self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) finally: self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None) + if tasks_dict_info: + stage[1] = "Waiting for instantiate pending tasks." + self.logger.debug(logging_text + stage[1]) + exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy, + stage, nslcmop_id, nsr_id=nsr_id) if exc: db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc) nslcmop_operation_state = "FAILED"