X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fns.py;h=d57b739b8f92c5d11e38f0581862f64012a70101;hb=1b782761acbcbbe4666d667a0f04a44f136ef1a3;hp=8ec39ade75e47dc3ada215ab6f7bca31e272fc0a;hpb=a0c6bafeb1998041e171aa87b2134f69d650a9c0;p=osm%2FLCM.git diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 8ec39ad..d57b739 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -17,6 +17,7 @@ ## import asyncio +import shutil from typing import Any, Dict, List import yaml import logging @@ -55,6 +56,8 @@ from osm_lcm.lcm_utils import ( deep_get, get_iterable, populate_dict, + check_juju_bundle_existence, + get_charm_artifact_path, ) from osm_lcm.data_utils.nsd import ( get_ns_configuration_relation_list, @@ -62,6 +65,8 @@ from osm_lcm.data_utils.nsd import ( get_vnf_profiles, ) from osm_lcm.data_utils.vnfd import ( + get_kdu, + get_kdu_services, get_relation_list, get_vdu_list, get_vdu_profile, @@ -76,9 +81,15 @@ from osm_lcm.data_utils.vnfd import ( get_number_of_instances, get_juju_ee_ref, get_kdu_resource_profile, + find_software_version, ) from osm_lcm.data_utils.list_utils import find_in_list -from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur +from osm_lcm.data_utils.vnfr import ( + get_osm_params, + get_vdur_index, + get_kdur, + get_volumes_from_instantiation_params, +) from osm_lcm.data_utils.dict_utils import parse_yaml_strings from osm_lcm.data_utils.database.vim_account import VimAccountDB from n2vc.definitions import RelationEndpoint @@ -96,6 +107,7 @@ from n2vc.n2vc_juju_conn import N2VCJujuConnector from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException from osm_lcm.lcm_helm_conn import LCMHelmConn +from osm_lcm.osm_config import OsmConfigBuilder from osm_lcm.prometheus import parse_job from copy import copy, deepcopy @@ -115,9 +127,11 @@ class NsLcm(LcmBase): timeout_ns_terminate = 1800 # default global timeout for un deployment a ns timeout_charm_delete = 10 * 60 timeout_primitive = 30 * 60 # timeout for primitive execution + timeout_ns_update = 30 * 60 # timeout for ns update timeout_progress_primitive = ( 10 * 60 ) # timeout for some progress in a primitive execution + timeout_migrate = 1800 # default global timeout for migrating vnfs SUBOPERATION_STATUS_NOT_FOUND = -1 SUBOPERATION_STATUS_NEW = -2 @@ -348,43 +362,49 @@ class NsLcm(LcmBase): self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e)) async def _on_update_k8s_db( - self, cluster_uuid, kdu_instance, filter=None, vca_id=None + self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju" ): """ Updating vca status in NSR record :param cluster_uuid: UUID of a k8s cluster :param kdu_instance: The unique name of the KDU instance :param filter: To get nsr_id + :cluster_type: The cluster type (juju, k8s) :return: none """ # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}" # .format(cluster_uuid, kdu_instance, filter)) + nsr_id = filter.get("_id") try: - nsr_id = filter.get("_id") - - # get vca status for NS - vca_status = await self.k8sclusterjuju.status_kdu( - cluster_uuid, - kdu_instance, - complete_status=True, + vca_status = await self.k8scluster_map[cluster_type].status_kdu( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, yaml_format=False, + complete_status=True, vca_id=vca_id, ) + # vcaStatus db_dict = dict() db_dict["vcaStatus"] = {nsr_id: vca_status} - await self.k8sclusterjuju.update_vca_status( - db_dict["vcaStatus"], - kdu_instance, - vca_id=vca_id, + if cluster_type in ("juju-bundle", "juju"): + # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA + # status in a similar way between Juju Bundles and Helm Charts on this side + await self.k8sclusterjuju.update_vca_status( + db_dict["vcaStatus"], + kdu_instance, + vca_id=vca_id, + ) + + self.logger.debug( + f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}" ) # write to database self.update_db_2("nsrs", nsr_id, db_dict) - except (asyncio.CancelledError, asyncio.TimeoutError): raise except Exception as e: @@ -440,7 +460,7 @@ class NsLcm(LcmBase): def _get_vdu_additional_params(self, db_vnfr, vdu_id): vdur = next( - vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"] + (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]), {} ) additional_params = vdur.get("additionalParams") return parse_yaml_strings(additional_params) @@ -517,6 +537,7 @@ class NsLcm(LcmBase): def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False): db_vdu_push_list = [] + template_vdur = [] db_update = {"_admin.modified": time()} if vdu_create: for vdu_id, vdu_count in vdu_create.items(): @@ -529,12 +550,25 @@ class NsLcm(LcmBase): None, ) if not vdur: - raise LcmException( - "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format( - vdu_id + # Read the template saved in the db: + self.logger.debug( + "No vdur in the database. Using the vdur-template to scale" + ) + vdur_template = db_vnfr.get("vdur-template") + if not vdur_template: + raise LcmException( + "Error scaling OUT VNFR for {}. No vnfr or template exists".format( + vdu_id + ) ) + vdur = vdur_template[0] + # Delete a template from the database after using it + self.db.set_one( + "vnfrs", + {"_id": db_vnfr["_id"]}, + None, + pull={"vdur-template": {"_id": vdur["_id"]}}, ) - for count in range(vdu_count): vdur_copy = deepcopy(vdur) vdur_copy["status"] = "BUILD" @@ -559,12 +593,19 @@ class NsLcm(LcmBase): ) else: iface.pop("mac-address", None) - iface.pop( - "mgmt_vnf", None - ) # only first vdu can be managment of vnf + if db_vnfr["vdur"]: + iface.pop( + "mgmt_vnf", None + ) # only first vdu can be managment of vnf db_vdu_push_list.append(vdur_copy) # self.logger.debug("scale out, adding vdu={}".format(vdur_copy)) if vdu_delete: + if len(db_vnfr["vdur"]) == 1: + # The scale will move to 0 instances + self.logger.debug( + "Scaling to 0 !, creating the template with the last vdur" + ) + template_vdur = [db_vnfr["vdur"][0]] for vdu_id, vdu_count in vdu_delete.items(): if mark_delete: indexes_to_delete = [ @@ -592,7 +633,14 @@ class NsLcm(LcmBase): None, pull={"vdur": {"_id": vdu["_id"]}}, ) - db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None + db_push = {} + if db_vdu_push_list: + db_push["vdur"] = db_vdu_push_list + if template_vdur: + db_push["vdur-template"] = template_vdur + if not db_push: + db_push = None + db_vnfr["vdur-template"] = template_vdur self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push) # modify passed dictionary db_vnfr db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]}) @@ -822,6 +870,37 @@ class NsLcm(LcmBase): if vld_params.get("common_id"): target_vld["common_id"] = vld_params.get("common_id") + # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account + def update_ns_vld_target(target, ns_params): + for vnf_params in ns_params.get("vnf", ()): + if vnf_params.get("vimAccountId"): + target_vnf = next( + ( + vnfr + for vnfr in db_vnfrs.values() + if vnf_params["member-vnf-index"] + == vnfr["member-vnf-index-ref"] + ), + None, + ) + vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None) + for a_index, a_vld in enumerate(target["ns"]["vld"]): + target_vld = find_in_list( + get_iterable(vdur, "interfaces"), + lambda iface: iface.get("ns-vld-id") == a_vld["name"], + ) + if target_vld: + if vnf_params.get("vimAccountId") not in a_vld.get( + "vim_info", {} + ): + target["ns"]["vld"][a_index].get("vim_info").update( + { + "vim:{}".format(vnf_params["vimAccountId"]): { + "vim_network_name": "" + } + } + ) + nslcmop_id = db_nslcmop["_id"] target = { "name": db_nsr["name"], @@ -836,6 +915,14 @@ class NsLcm(LcmBase): image["vim_info"] = {} for flavor in target["flavor"]: flavor["vim_info"] = {} + if db_nsr.get("affinity-or-anti-affinity-group"): + target["affinity-or-anti-affinity-group"] = deepcopy( + db_nsr["affinity-or-anti-affinity-group"] + ) + for affinity_or_anti_affinity_group in target[ + "affinity-or-anti-affinity-group" + ]: + affinity_or_anti_affinity_group["vim_info"] = {} if db_nslcmop.get("lcmOperationType") != "instantiate": # get parameters of instantiation: @@ -937,6 +1024,8 @@ class NsLcm(LcmBase): vld_params.update(vld_instantiation_params) parse_vld_instantiation_params(target_vim, target_vld, vld_params, None) target["ns"]["vld"].append(target_vld) + # Update the target ns_vld if vnf vim_account is overriden by instantiation params + update_ns_vld_target(target, ns_params) for vnfr in db_vnfrs.values(): vnfd = find_in_list( @@ -1126,11 +1215,26 @@ class NsLcm(LcmBase): if target_vim not in ns_image["vim_info"]: ns_image["vim_info"][target_vim] = {} + # Affinity groups + if vdur.get("affinity-or-anti-affinity-group-id"): + for ags_id in vdur["affinity-or-anti-affinity-group-id"]: + ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)] + if target_vim not in ns_ags["vim_info"]: + ns_ags["vim_info"][target_vim] = {} + vdur["vim_info"] = {target_vim: {}} # instantiation parameters - # if vnf_params: - # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] == - # vdud["id"]), None) + if vnf_params: + vdu_instantiation_params = find_in_list( + get_iterable(vnf_params, "vdu"), + lambda i_vdu: i_vdu["id"] == vdud["id"], + ) + if vdu_instantiation_params: + # Parse the vdu_volumes from the instantiation params + vdu_volumes = get_volumes_from_instantiation_params( + vdu_instantiation_params, vdud + ) + vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes vdur_list.append(vdur) target_vnf["vdur"] = vdur_list target["vnf"].append(target_vnf) @@ -1341,7 +1445,7 @@ class NsLcm(LcmBase): :param nsr_id: :param vnfr_id: :param kdu_name: - :return: IP address + :return: IP address, K8s services """ # self.logger.debug(logging_text + "Starting wait_kdu_up") @@ -1363,7 +1467,7 @@ class NsLcm(LcmBase): ) if kdur.get("status"): if kdur["status"] in ("READY", "ENABLED"): - return kdur.get("ip-address") + return kdur.get("ip-address"), kdur.get("services") else: raise LcmException( "target KDU={} is in error state".format(kdu_name) @@ -1665,7 +1769,8 @@ class NsLcm(LcmBase): base_folder["folder"], base_folder["pkg-dir"], "charms" - if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") + if vca_type + in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts", vca_name, ) @@ -1673,7 +1778,8 @@ class NsLcm(LcmBase): artifact_path = "{}/Scripts/{}/{}/".format( base_folder["folder"], "charms" - if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") + if vca_type + in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts", vca_name, ) @@ -1900,14 +2006,47 @@ class NsLcm(LcmBase): step = "Waiting to VM being up and getting IP address" self.logger.debug(logging_text + step) + # default rw_mgmt_ip to None, avoiding the non definition of the variable + rw_mgmt_ip = None + # n2vc_redesign STEP 5.1 # wait for RO (ip-address) Insert pub_key into VM if vnfr_id: if kdu_name: - rw_mgmt_ip = await self.wait_kdu_up( + rw_mgmt_ip, services = await self.wait_kdu_up( logging_text, nsr_id, vnfr_id, kdu_name ) - else: + vnfd = self.db.get_one( + "vnfds_revisions", + {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'}, + ) + kdu = get_kdu(vnfd, kdu_name) + kdu_services = [ + service["name"] for service in get_kdu_services(kdu) + ] + exposed_services = [] + for service in services: + if any(s in service["name"] for s in kdu_services): + exposed_services.append(service) + await self.vca_map[vca_type].exec_primitive( + ee_id=ee_id, + primitive_name="config", + params_dict={ + "osm-config": json.dumps( + OsmConfigBuilder( + k8s={"services": exposed_services} + ).build() + ) + }, + vca_id=vca_id, + ) + + # This verification is needed in order to avoid trying to add a public key + # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA + # for a KNF and not for its KDUs, the previous verification gives False, and the code + # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF + # or it is a KNF) + elif db_vnfr.get('vdur'): rw_mgmt_ip = await self.wait_vm_up_insert_key_ro( logging_text, nsr_id, @@ -1917,8 +2056,6 @@ class NsLcm(LcmBase): user=user, pub_key=pub_key, ) - else: - rw_mgmt_ip = None # This is for a NS configuration self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip)) @@ -2004,12 +2141,10 @@ class NsLcm(LcmBase): for job in prometheus_jobs: self.db.set_one( "prometheus_jobs", - { - "job_name": job["job_name"] - }, + {"job_name": job["job_name"]}, job, upsert=True, - fail_on_empty=False + fail_on_empty=False, ) step = "instantiated at VCA" @@ -2313,7 +2448,9 @@ class NsLcm(LcmBase): kdur_list = [] for kdur in vnfr["kdur"]: if kdur.get("additionalParams"): - kdur["additionalParams"] = json.loads(kdur["additionalParams"]) + kdur["additionalParams"] = json.loads( + kdur["additionalParams"] + ) kdur_list.append(kdur) vnfr["kdur"] = kdur_list @@ -2511,8 +2648,8 @@ class NsLcm(LcmBase): ) deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)} if kdur.get("additionalParams"): - deploy_params_kdu = parse_yaml_strings( - kdur["additionalParams"] + deploy_params_kdu.update( + parse_yaml_strings(kdur["additionalParams"].copy()) ) self._deploy_n2vc( @@ -2784,7 +2921,9 @@ class NsLcm(LcmBase): if requirer_id != nsd["id"]: requirer_dict["vnf-profile-id"] = requirer_id else: - raise Exception("provider/requirer or entities must be included in the relation.") + raise Exception( + "provider/requirer or entities must be included in the relation." + ) relation_provider = self._update_ee_relation_data_with_implicit_data( nsr_id, nsd, provider_dict, cached_vnfds ) @@ -2836,7 +2975,9 @@ class NsLcm(LcmBase): if requirer_id != vnfd_id: requirer_dict["vdu-profile-id"] = requirer_id else: - raise Exception("provider/requirer or entities must be included in the relation.") + raise Exception( + "provider/requirer or entities must be included in the relation." + ) relation_provider = self._update_ee_relation_data_with_implicit_data( nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id ) @@ -3083,9 +3224,45 @@ class NsLcm(LcmBase): kdu_model=k8s_instance_info["kdu-model"], kdu_name=k8s_instance_info["kdu-name"], ) + + # Update the nsrs table with the kdu-instance value self.update_db_2( - "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance} + item="nsrs", + _id=nsr_id, + _desc={nsr_db_path + ".kdu-instance": kdu_instance}, ) + + # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or + # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace + # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous + # namespace, this first verification could be removed, and the next step would be done for any kind + # of KNF. + # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based + # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027) + if k8sclustertype in ("juju", "juju-bundle"): + # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means + # that the user passed a namespace which he wants its KDU to be deployed in) + if ( + self.db.count( + table="nsrs", + q_filter={ + "_id": nsr_id, + "_admin.projects_write": k8s_instance_info["namespace"], + "_admin.projects_read": k8s_instance_info["namespace"], + }, + ) + > 0 + ): + self.logger.debug( + f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}" + ) + self.update_db_2( + item="nsrs", + _id=nsr_id, + _desc={f"{nsr_db_path}.namespace": kdu_instance}, + ) + k8s_instance_info["namespace"] = kdu_instance + await self.k8scluster_map[k8sclustertype].install( cluster_uuid=k8s_instance_info["k8scluster-uuid"], kdu_model=k8s_instance_info["kdu-model"], @@ -3098,9 +3275,6 @@ class NsLcm(LcmBase): kdu_instance=kdu_instance, vca_id=vca_id, ) - self.update_db_2( - "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance} - ) # Obtain services to obtain management service ip services = await self.k8scluster_map[k8sclustertype].get_services( @@ -3457,7 +3631,7 @@ class NsLcm(LcmBase): vnfd_with_id, k8s_instance_info, k8params=desc_params, - timeout=600, + timeout=1800, vca_id=vca_id, ) ) @@ -4369,6 +4543,7 @@ class NsLcm(LcmBase): cluster_uuid=kdu.get("k8scluster-uuid"), kdu_instance=kdu_instance, vca_id=vca_id, + namespace=kdu.get("namespace"), ) ) else: @@ -4794,10 +4969,18 @@ class NsLcm(LcmBase): db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) vca_id = self.get_vca_id({}, db_nsr) if db_nsr["_admin"]["deployed"]["K8s"]: - for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]): - cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"] + for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]): + cluster_uuid, kdu_instance, cluster_type = ( + k8s["k8scluster-uuid"], + k8s["kdu-instance"], + k8s["k8scluster-type"], + ) await self._on_update_k8s_db( - cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + filter={"_id": nsr_id}, + vca_id=vca_id, + cluster_type=cluster_type, ) else: for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]): @@ -4864,7 +5047,9 @@ class NsLcm(LcmBase): kdur_list = [] for kdur in db_vnfr["kdur"]: if kdur.get("additionalParams"): - kdur["additionalParams"] = json.loads(kdur["additionalParams"]) + kdur["additionalParams"] = json.loads( + kdur["additionalParams"] + ) kdur_list.append(kdur) db_vnfr["kdur"] = kdur_list step = "Getting vnfd from database" @@ -4943,7 +5128,17 @@ class NsLcm(LcmBase): actions.add(primitive["name"]) for primitive in kdu_configuration.get("config-primitive", []): actions.add(primitive["name"]) - kdu_action = True if primitive_name in actions else False + kdu = find_in_list( + nsr_deployed["K8s"], + lambda kdu: kdu_name == kdu["kdu-name"] + and kdu["member-vnf-index"] == vnf_index, + ) + kdu_action = ( + True + if primitive_name in actions + and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3") + else False + ) # TODO check if ns is in a proper status if kdu_name and ( @@ -5160,6 +5355,619 @@ class NsLcm(LcmBase): self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action") return nslcmop_operation_state, detailed_status + async def terminate_vdus( + self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text + ): + """This method terminates VDUs + + Args: + db_vnfr: VNF instance record + member_vnf_index: VNF index to identify the VDUs to be removed + db_nsr: NS instance record + update_db_nslcmops: Nslcmop update record + """ + vca_scaling_info = [] + scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []} + scaling_info["scaling_direction"] = "IN" + scaling_info["vdu-delete"] = {} + scaling_info["kdu-delete"] = {} + db_vdur = db_vnfr.get("vdur") + vdur_list = copy(db_vdur) + count_index = 0 + for index, vdu in enumerate(vdur_list): + vca_scaling_info.append( + { + "osm_vdu_id": vdu["vdu-id-ref"], + "member-vnf-index": member_vnf_index, + "type": "delete", + "vdu_index": count_index, + }) + scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index + scaling_info["vdu"].append( + { + "name": vdu.get("name") or vdu.get("vdu-name"), + "vdu_id": vdu["vdu-id-ref"], + "interface": [], + }) + for interface in vdu["interfaces"]: + scaling_info["vdu"][index]["interface"].append( + { + "name": interface["name"], + "ip_address": interface["ip-address"], + "mac_address": interface.get("mac-address"), + }) + self.logger.info("NS update scaling info{}".format(scaling_info)) + stage[2] = "Terminating VDUs" + if scaling_info.get("vdu-delete"): + # scale_process = "RO" + if self.ro_config.get("ng"): + await self._scale_ng_ro( + logging_text, db_nsr, update_db_nslcmops, db_vnfr, scaling_info, stage + ) + + async def remove_vnf( + self, nsr_id, nslcmop_id, vnf_instance_id + ): + """This method is to Remove VNF instances from NS. + + Args: + nsr_id: NS instance id + nslcmop_id: nslcmop id of update + vnf_instance_id: id of the VNF instance to be removed + + Returns: + result: (str, str) COMPLETED/FAILED, details + """ + try: + db_nsr_update = {} + logging_text = "Task ns={} update ".format(nsr_id) + check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})) + self.logger.info("check_vnfr_count {}".format(check_vnfr_count)) + if check_vnfr_count > 1: + stage = ["", "", ""] + step = "Getting nslcmop from database" + self.logger.debug(step + " after having waited for previous tasks to be completed") + # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id}) + member_vnf_index = db_vnfr["member-vnf-index-ref"] + """ db_vnfr = self.db.get_one( + "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """ + + update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text) + + constituent_vnfr = db_nsr.get("constituent-vnfr-ref") + constituent_vnfr.remove(db_vnfr.get("_id")) + db_nsr_update["constituent-vnfr-ref"] = db_nsr.get("constituent-vnfr-ref") + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")}) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + return "COMPLETED", "Done" + else: + step = "Terminate VNF Failed with" + raise LcmException("{} Cannot terminate the last VNF in this NS.".format( + vnf_instance_id)) + except (LcmException, asyncio.CancelledError): + raise + except Exception as e: + self.logger.debug("Error removing VNF {}".format(e)) + return "FAILED", "Error removing VNF {}".format(e) + + async def _ns_redeploy_vnf( + self, nsr_id, nslcmop_id, db_vnfd, db_vnfr, db_nsr, + ): + """This method updates and redeploys VNF instances + + Args: + nsr_id: NS instance id + nslcmop_id: nslcmop id + db_vnfd: VNF descriptor + db_vnfr: VNF instance record + db_nsr: NS instance record + + Returns: + result: (str, str) COMPLETED/FAILED, details + """ + try: + count_index = 0 + stage = ["", "", ""] + logging_text = "Task ns={} update ".format(nsr_id) + latest_vnfd_revision = db_vnfd["_admin"].get("revision") + member_vnf_index = db_vnfr["member-vnf-index-ref"] + + # Terminate old VNF resources + update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text) + + # old_vnfd_id = db_vnfr["vnfd-id"] + # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) + new_db_vnfd = db_vnfd + # new_vnfd_ref = new_db_vnfd["id"] + # new_vnfd_id = vnfd_id + + # Create VDUR + new_vnfr_cp = [] + for cp in new_db_vnfd.get("ext-cpd", ()): + vnf_cp = { + "name": cp.get("id"), + "connection-point-id": cp.get("int-cpd", {}).get("cpd"), + "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"), + "id": cp.get("id"), + } + new_vnfr_cp.append(vnf_cp) + new_vdur = update_db_nslcmops["operationParams"]["newVdur"] + # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index) + # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""} + new_vnfr_update = {"revision": latest_vnfd_revision, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""} + self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update) + updated_db_vnfr = self.db.get_one( + "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id} + ) + + # Instantiate new VNF resources + # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + vca_scaling_info = [] + scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []} + scaling_info["scaling_direction"] = "OUT" + scaling_info["vdu-create"] = {} + scaling_info["kdu-create"] = {} + vdud_instantiate_list = db_vnfd["vdu"] + for index, vdud in enumerate(vdud_instantiate_list): + cloud_init_text = self._get_vdu_cloud_init_content( + vdud, db_vnfd + ) + if cloud_init_text: + additional_params = ( + self._get_vdu_additional_params(updated_db_vnfr, vdud["id"]) + or {} + ) + cloud_init_list = [] + if cloud_init_text: + # TODO Information of its own ip is not available because db_vnfr is not updated. + additional_params["OSM"] = get_osm_params( + updated_db_vnfr, vdud["id"], 1 + ) + cloud_init_list.append( + self._parse_cloud_init( + cloud_init_text, + additional_params, + db_vnfd["id"], + vdud["id"], + ) + ) + vca_scaling_info.append( + { + "osm_vdu_id": vdud["id"], + "member-vnf-index": member_vnf_index, + "type": "create", + "vdu_index": count_index, + } + ) + scaling_info["vdu-create"][vdud["id"]] = count_index + if self.ro_config.get("ng"): + self.logger.debug( + "New Resources to be deployed: {}".format(scaling_info)) + await self._scale_ng_ro( + logging_text, db_nsr, update_db_nslcmops, updated_db_vnfr, scaling_info, stage + ) + return "COMPLETED", "Done" + except (LcmException, asyncio.CancelledError): + raise + except Exception as e: + self.logger.debug("Error updating VNF {}".format(e)) + return "FAILED", "Error updating VNF {}".format(e) + + async def _ns_charm_upgrade( + self, + ee_id, + charm_id, + charm_type, + path, + timeout: float = None, + ) -> (str, str): + """This method upgrade charms in VNF instances + + Args: + ee_id: Execution environment id + path: Local path to the charm + charm_id: charm-id + charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm + timeout: (Float) Timeout for the ns update operation + + Returns: + result: (str, str) COMPLETED/FAILED, details + """ + try: + charm_type = charm_type or "lxc_proxy_charm" + output = await self.vca_map[charm_type].upgrade_charm( + ee_id=ee_id, + path=path, + charm_id=charm_id, + charm_type=charm_type, + timeout=timeout or self.timeout_ns_update, + ) + + if output: + return "COMPLETED", output + + except (LcmException, asyncio.CancelledError): + raise + + except Exception as e: + + self.logger.debug("Error upgrading charm {}".format(path)) + + return "FAILED", "Error upgrading charm {}: {}".format(path, e) + + async def update(self, nsr_id, nslcmop_id): + """Update NS according to different update types + + This method performs upgrade of VNF instances then updates the revision + number in VNF record + + Args: + nsr_id: Network service will be updated + nslcmop_id: ns lcm operation id + + Returns: + It may raise DbException, LcmException, N2VCException, K8sException + + """ + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) + if not task_is_locked_by_me: + return + + logging_text = "Task ns={} update={} ".format(nsr_id, nslcmop_id) + self.logger.debug(logging_text + "Enter") + + # Set the required variables to be filled up later + db_nsr = None + db_nslcmop_update = {} + vnfr_update = {} + nslcmop_operation_state = None + db_nsr_update = {} + error_description_nslcmop = "" + exc = None + change_type = "updated" + detailed_status = "" + + try: + # wait for any previous tasks in process + step = "Waiting for previous operations to terminate" + await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id) + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="UPDATING", + current_operation_id=nslcmop_id, + ) + + step = "Getting nslcmop from database" + db_nslcmop = self.db.get_one( + "nslcmops", {"_id": nslcmop_id}, fail_on_empty=False + ) + update_type = db_nslcmop["operationParams"]["updateType"] + + step = "Getting nsr from database" + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + old_operational_status = db_nsr["operational-status"] + db_nsr_update["operational-status"] = "updating" + self.update_db_2("nsrs", nsr_id, db_nsr_update) + nsr_deployed = db_nsr["_admin"].get("deployed") + + if update_type == "CHANGE_VNFPKG": + + # Get the input parameters given through update request + vnf_instance_id = db_nslcmop["operationParams"][ + "changeVnfPackageData" + ].get("vnfInstanceId") + + vnfd_id = db_nslcmop["operationParams"]["changeVnfPackageData"].get( + "vnfdId" + ) + timeout_seconds = db_nslcmop["operationParams"].get("timeout_ns_update") + + step = "Getting vnfr from database" + db_vnfr = self.db.get_one( + "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False + ) + + step = "Getting vnfds from database" + # Latest VNFD + latest_vnfd = self.db.get_one( + "vnfds", {"_id": vnfd_id}, fail_on_empty=False + ) + latest_vnfd_revision = latest_vnfd["_admin"].get("revision") + + # Current VNFD + current_vnf_revision = db_vnfr.get("revision", 1) + current_vnfd = self.db.get_one( + "vnfds_revisions", + {"_id": vnfd_id + ":" + str(current_vnf_revision)}, + fail_on_empty=False, + ) + # Charm artifact paths will be filled up later + ( + current_charm_artifact_path, + target_charm_artifact_path, + charm_artifact_paths, + ) = ([], [], []) + + step = "Checking if revision has changed in VNFD" + if current_vnf_revision != latest_vnfd_revision: + + change_type = "policy_updated" + + # There is new revision of VNFD, update operation is required + current_vnfd_path = vnfd_id + ":" + str(current_vnf_revision) + latest_vnfd_path = vnfd_id + ":" + str(latest_vnfd_revision) + + step = "Removing the VNFD packages if they exist in the local path" + shutil.rmtree(self.fs.path + current_vnfd_path, ignore_errors=True) + shutil.rmtree(self.fs.path + latest_vnfd_path, ignore_errors=True) + + step = "Get the VNFD packages from FSMongo" + self.fs.sync(from_path=latest_vnfd_path) + self.fs.sync(from_path=current_vnfd_path) + + step = ( + "Get the charm-type, charm-id, ee-id if there is deployed VCA" + ) + base_folder = latest_vnfd["_admin"]["storage"] + + for charm_index, charm_deployed in enumerate( + get_iterable(nsr_deployed, "VCA") + ): + vnf_index = db_vnfr.get("member-vnf-index-ref") + + # Getting charm-id and charm-type + if charm_deployed.get("member-vnf-index") == vnf_index: + charm_id = self.get_vca_id(db_vnfr, db_nsr) + charm_type = charm_deployed.get("type") + + # Getting ee-id + ee_id = charm_deployed.get("ee_id") + + step = "Getting descriptor config" + descriptor_config = get_configuration( + current_vnfd, current_vnfd["id"] + ) + + if "execution-environment-list" in descriptor_config: + ee_list = descriptor_config.get( + "execution-environment-list", [] + ) + else: + ee_list = [] + + # There could be several charm used in the same VNF + for ee_item in ee_list: + if ee_item.get("juju"): + + step = "Getting charm name" + charm_name = ee_item["juju"].get("charm") + + step = "Setting Charm artifact paths" + current_charm_artifact_path.append( + get_charm_artifact_path( + base_folder, + charm_name, + charm_type, + current_vnf_revision, + ) + ) + target_charm_artifact_path.append( + get_charm_artifact_path( + base_folder, + charm_name, + charm_type, + latest_vnfd_revision, + ) + ) + + charm_artifact_paths = zip( + current_charm_artifact_path, target_charm_artifact_path + ) + + step = "Checking if software version has changed in VNFD" + if find_software_version(current_vnfd) != find_software_version( + latest_vnfd + ): + + step = "Checking if existing VNF has charm" + for current_charm_path, target_charm_path in list( + charm_artifact_paths + ): + if current_charm_path: + raise LcmException( + "Software version change is not supported as VNF instance {} has charm.".format( + vnf_instance_id + ) + ) + + # There is no change in the charm package, then redeploy the VNF + # based on new descriptor + step = "Redeploying VNF" + member_vnf_index = db_vnfr["member-vnf-index-ref"] + ( + result, + detailed_status + ) = await self._ns_redeploy_vnf( + nsr_id, + nslcmop_id, + latest_vnfd, + db_vnfr, + db_nsr + ) + if result == "FAILED": + nslcmop_operation_state = result + error_description_nslcmop = detailed_status + db_nslcmop_update["detailed-status"] = detailed_status + self.logger.debug( + logging_text + + " step {} Done with result {} {}".format( + step, nslcmop_operation_state, detailed_status + ) + ) + + else: + step = "Checking if any charm package has changed or not" + for current_charm_path, target_charm_path in list( + charm_artifact_paths + ): + if ( + current_charm_path + and target_charm_path + and self.check_charm_hash_changed( + current_charm_path, target_charm_path + ) + ): + + step = "Checking whether VNF uses juju bundle" + if check_juju_bundle_existence(current_vnfd): + + raise LcmException( + "Charm upgrade is not supported for the instance which" + " uses juju-bundle: {}".format( + check_juju_bundle_existence(current_vnfd) + ) + ) + + step = "Upgrading Charm" + ( + result, + detailed_status, + ) = await self._ns_charm_upgrade( + ee_id=ee_id, + charm_id=charm_id, + charm_type=charm_type, + path=self.fs.path + target_charm_path, + timeout=timeout_seconds, + ) + + if result == "FAILED": + nslcmop_operation_state = result + error_description_nslcmop = detailed_status + + db_nslcmop_update["detailed-status"] = detailed_status + self.logger.debug( + logging_text + + " step {} Done with result {} {}".format( + step, nslcmop_operation_state, detailed_status + ) + ) + + step = "Updating policies" + member_vnf_index = db_vnfr["member-vnf-index-ref"] + result = "COMPLETED" + detailed_status = "Done" + db_nslcmop_update["detailed-status"] = "Done" + + # If nslcmop_operation_state is None, so any operation is not failed. + if not nslcmop_operation_state: + nslcmop_operation_state = "COMPLETED" + + # If update CHANGE_VNFPKG nslcmop_operation is successful + # vnf revision need to be updated + vnfr_update["revision"] = latest_vnfd_revision + self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update) + + self.logger.debug( + logging_text + + " task Done with result {} {}".format( + nslcmop_operation_state, detailed_status + ) + ) + elif update_type == "REMOVE_VNF": + # This part is included in https://osm.etsi.org/gerrit/11876 + vnf_instance_id = db_nslcmop["operationParams"]["removeVnfInstanceId"] + db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id}) + member_vnf_index = db_vnfr["member-vnf-index-ref"] + step = "Removing VNF" + (result, detailed_status) = await self.remove_vnf(nsr_id, nslcmop_id, vnf_instance_id) + if result == "FAILED": + nslcmop_operation_state = result + error_description_nslcmop = detailed_status + db_nslcmop_update["detailed-status"] = detailed_status + change_type = "vnf_terminated" + if not nslcmop_operation_state: + nslcmop_operation_state = "COMPLETED" + self.logger.debug( + logging_text + + " task Done with result {} {}".format( + nslcmop_operation_state, detailed_status + ) + ) + + # If nslcmop_operation_state is None, so any operation is not failed. + # All operations are executed in overall. + if not nslcmop_operation_state: + nslcmop_operation_state = "COMPLETED" + db_nsr_update["operational-status"] = old_operational_status + + except (DbException, LcmException, N2VCException, K8sException) as e: + self.logger.error(logging_text + "Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error( + logging_text + "Cancelled Exception while '{}'".format(step) + ) + exc = "Operation was cancelled" + except asyncio.TimeoutError: + self.logger.error(logging_text + "Timeout while '{}'".format(step)) + exc = "Timeout" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical( + logging_text + "Exit Exception {} {}".format(type(e).__name__, e), + exc_info=True, + ) + finally: + if exc: + db_nslcmop_update[ + "detailed-status" + ] = ( + detailed_status + ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc) + nslcmop_operation_state = "FAILED" + db_nsr_update["operational-status"] = old_operational_status + if db_nsr: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=db_nsr["nsState"], + current_operation="IDLE", + current_operation_id=None, + other_update=db_nsr_update, + ) + + self._write_op_status( + op_id=nslcmop_id, + stage="", + error_message=error_description_nslcmop, + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) + + if nslcmop_operation_state: + try: + msg = { + "nsr_id": nsr_id, + "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state, + } + if change_type in ("vnf_terminated", "policy_updated"): + msg.update({"vnf_member_index": member_vnf_index}) + await self.msg.aiowrite("ns", change_type, msg, loop=self.loop) + except Exception as e: + self.logger.error( + logging_text + "kafka_write notification Exception {}".format(e) + ) + self.logger.debug(logging_text + "Exit") + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_update") + return nslcmop_operation_state, detailed_status + async def scale(self, nsr_id, nslcmop_id): # Try to lock HA task here task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) @@ -6342,13 +7150,7 @@ class NsLcm(LcmBase): ) async def extract_prometheus_scrape_jobs( - self, - ee_id, - artifact_path, - ee_config_descriptor, - vnfr_id, - nsr_id, - target_ip + self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip ): # look if exist a file called 'prometheus*.j2' and artifact_content = self.fs.dir_ls(artifact_path) @@ -6409,3 +7211,101 @@ class NsLcm(LcmBase): """ config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {}) return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential") + + async def migrate(self, nsr_id, nslcmop_id): + """ + Migrate VNFs and VDUs instances in a NS + + :param: nsr_id: NS Instance ID + :param: nslcmop_id: nslcmop ID of migrate + + """ + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} migrate ".format(nsr_id) + self.logger.debug(logging_text + "Enter") + # get all needed from database + db_nslcmop = None + db_nslcmop_update = {} + nslcmop_operation_state = None + db_nsr_update = {} + target = {} + exc = None + # in case of error, indicates what part of scale was failed to put nsr at error status + start_deploy = time() + + try: + # wait for any previous tasks in process + step = "Waiting for previous operations to terminate" + await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id) + + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="MIGRATING", + current_operation_id=nslcmop_id, + ) + step = "Getting nslcmop from database" + self.logger.debug( + step + " after having waited for previous tasks to be completed" + ) + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + migrate_params = db_nslcmop.get("operationParams") + + target = {} + target.update(migrate_params) + desc = await self.RO.migrate(nsr_id, target) + self.logger.debug("RO return > {}".format(desc)) + action_id = desc["action_id"] + await self._wait_ng_ro( + nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate + ) + except (ROclient.ROClientException, DbException, LcmException) as e: + self.logger.error("Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error("Cancelled Exception while '{}'".format(step)) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical( + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True + ) + finally: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="IDLE", + current_operation_id=None, + ) + if exc: + db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) + nslcmop_operation_state = "FAILED" + else: + nslcmop_operation_state = "COMPLETED" + db_nslcmop_update["detailed-status"] = "Done" + db_nsr_update["detailed-status"] = "Done" + + self._write_op_status( + op_id=nslcmop_id, + stage="", + error_message="", + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) + if nslcmop_operation_state: + try: + msg = { + "nsr_id": nsr_id, + "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state, + } + await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop) + except Exception as e: + self.logger.error( + logging_text + "kafka_write notification Exception {}".format(e) + ) + self.logger.debug(logging_text + "Exit") + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")