X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fns.py;h=33e1d18b054483f9f45b8607a28b4dbe541028fa;hb=84bd9a7cee8c4686585f4ec53a95b163508c9d0b;hp=7bd6a92e1cd4a9c7be12fd4ba78f776c52278117;hpb=ae230237a438867e2c82a697c33e26838b849f80;p=osm%2FLCM.git diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 7bd6a92..33e1d18 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -17,6 +17,7 @@ ## import asyncio +import shutil from typing import Any, Dict, List import yaml import logging @@ -55,6 +56,8 @@ from osm_lcm.lcm_utils import ( deep_get, get_iterable, populate_dict, + check_juju_bundle_existence, + get_charm_artifact_path, ) from osm_lcm.data_utils.nsd import ( get_ns_configuration_relation_list, @@ -78,9 +81,15 @@ from osm_lcm.data_utils.vnfd import ( get_number_of_instances, get_juju_ee_ref, get_kdu_resource_profile, + find_software_version, ) from osm_lcm.data_utils.list_utils import find_in_list -from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur +from osm_lcm.data_utils.vnfr import ( + get_osm_params, + get_vdur_index, + get_kdur, + get_volumes_from_instantiation_params, +) from osm_lcm.data_utils.dict_utils import parse_yaml_strings from osm_lcm.data_utils.database.vim_account import VimAccountDB from n2vc.definitions import RelationEndpoint @@ -116,12 +125,15 @@ class NsLcm(LcmBase): ) # Time for charm from first time at blocked,error status to mark as failed timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns timeout_ns_terminate = 1800 # default global timeout for un deployment a ns + timeout_ns_heal = 1800 # default global timeout for un deployment a ns timeout_charm_delete = 10 * 60 timeout_primitive = 30 * 60 # timeout for primitive execution + timeout_ns_update = 30 * 60 # timeout for ns update timeout_progress_primitive = ( 10 * 60 ) # timeout for some progress in a primitive execution - + timeout_migrate = 1800 # default global timeout for migrating vnfs + timeout_operate = 1800 # default global timeout for migrating vnfs SUBOPERATION_STATUS_NOT_FOUND = -1 SUBOPERATION_STATUS_NEW = -2 SUBOPERATION_STATUS_SKIP = -3 @@ -207,6 +219,13 @@ class NsLcm(LcmBase): # create RO client self.RO = NgRoClient(self.loop, **self.ro_config) + self.op_status_map = { + "instantiation": self.RO.status, + "termination": self.RO.status, + "migrate": self.RO.status, + "healing": self.RO.recreate_status, + } + @staticmethod def increment_ip_mac(ip_mac, vm_index=1): if not isinstance(ip_mac, str): @@ -449,8 +468,7 @@ class NsLcm(LcmBase): def _get_vdu_additional_params(self, db_vnfr, vdu_id): vdur = next( - (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]), - {} + (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]), {} ) additional_params = vdur.get("additionalParams") return parse_yaml_strings(additional_params) @@ -541,21 +559,24 @@ class NsLcm(LcmBase): ) if not vdur: # Read the template saved in the db: - self.logger.debug(f"No vdur in the database. Using the vdur-template to scale") + self.logger.debug( + "No vdur in the database. Using the vdur-template to scale" + ) vdur_template = db_vnfr.get("vdur-template") if not vdur_template: raise LcmException( - "Error scaling OUT VNFR for {}. No vnfr or template exists".format( - vdu_id + "Error scaling OUT VNFR for {}. No vnfr or template exists".format( + vdu_id ) ) vdur = vdur_template[0] - #Delete a template from the database after using it - self.db.set_one("vnfrs", - {"_id": db_vnfr["_id"]}, - None, - pull={"vdur-template": {"_id": vdur['_id']}} - ) + # Delete a template from the database after using it + self.db.set_one( + "vnfrs", + {"_id": db_vnfr["_id"]}, + None, + pull={"vdur-template": {"_id": vdur["_id"]}}, + ) for count in range(vdu_count): vdur_copy = deepcopy(vdur) vdur_copy["status"] = "BUILD" @@ -589,7 +610,9 @@ class NsLcm(LcmBase): if vdu_delete: if len(db_vnfr["vdur"]) == 1: # The scale will move to 0 instances - self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur") + self.logger.debug( + "Scaling to 0 !, creating the template with the last vdur" + ) template_vdur = [db_vnfr["vdur"][0]] for vdu_id, vdu_count in vdu_delete.items(): if mark_delete: @@ -874,18 +897,55 @@ class NsLcm(LcmBase): get_iterable(vdur, "interfaces"), lambda iface: iface.get("ns-vld-id") == a_vld["name"], ) + + vld_params = find_in_list( + get_iterable(ns_params, "vld"), + lambda v_vld: v_vld["name"] in (a_vld["name"], a_vld["id"]), + ) if target_vld: + if vnf_params.get("vimAccountId") not in a_vld.get( "vim_info", {} ): + target_vim_network_list = [ + v for _, v in a_vld.get("vim_info").items() + ] + target_vim_network_name = next( + ( + item.get("vim_network_name", "") + for item in target_vim_network_list + ), + "", + ) + target["ns"]["vld"][a_index].get("vim_info").update( { "vim:{}".format(vnf_params["vimAccountId"]): { - "vim_network_name": "" + "vim_network_name": target_vim_network_name, } } ) + if vld_params: + for param in ("vim-network-name", "vim-network-id"): + if vld_params.get(param) and isinstance( + vld_params[param], dict + ): + for vim, vim_net in vld_params[ + param + ].items(): + other_target_vim = "vim:" + vim + populate_dict( + target["ns"]["vld"][a_index].get( + "vim_info" + ), + ( + other_target_vim, + param.replace("-", "_"), + ), + vim_net, + ) + nslcmop_id = db_nslcmop["_id"] target = { "name": db_nsr["name"], @@ -1209,18 +1269,28 @@ class NsLcm(LcmBase): vdur["vim_info"] = {target_vim: {}} # instantiation parameters - # if vnf_params: - # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] == - # vdud["id"]), None) + if vnf_params: + vdu_instantiation_params = find_in_list( + get_iterable(vnf_params, "vdu"), + lambda i_vdu: i_vdu["id"] == vdud["id"], + ) + if vdu_instantiation_params: + # Parse the vdu_volumes from the instantiation params + vdu_volumes = get_volumes_from_instantiation_params( + vdu_instantiation_params, vdud + ) + vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes vdur_list.append(vdur) target_vnf["vdur"] = vdur_list target["vnf"].append(target_vnf) + self.logger.debug("Send to RO > nsr_id={} target={}".format(nsr_id, target)) desc = await self.RO.deploy(nsr_id, target) self.logger.debug("RO return > {}".format(desc)) action_id = desc["action_id"] await self._wait_ng_ro( - nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage + nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage, + operation="instantiation" ) # Updating NSR @@ -1244,12 +1314,13 @@ class NsLcm(LcmBase): start_time=None, timeout=600, stage=None, + operation=None, ): detailed_status_old = None db_nsr_update = {} start_time = start_time or time() while time() <= start_time + timeout: - desc_status = await self.RO.status(nsr_id, action_id) + desc_status = await self.op_status_map[operation](nsr_id, action_id) self.logger.debug("Wait NG RO > {}".format(desc_status)) if desc_status["status"] == "FAILED": raise NgRoException(desc_status["details"]) @@ -1300,7 +1371,8 @@ class NsLcm(LcmBase): # wait until done delete_timeout = 20 * 60 # 20 minutes await self._wait_ng_ro( - nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage + nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage, + operation="termination" ) db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None @@ -1574,7 +1646,7 @@ class NsLcm(LcmBase): } desc = await self.RO.deploy(nsr_id, target) action_id = desc["action_id"] - await self._wait_ng_ro(nsr_id, action_id, timeout=600) + await self._wait_ng_ro(nsr_id, action_id, timeout=600, operation="instantiation") break else: # wait until NS is deployed at RO @@ -1983,6 +2055,9 @@ class NsLcm(LcmBase): step = "Waiting to VM being up and getting IP address" self.logger.debug(logging_text + step) + # default rw_mgmt_ip to None, avoiding the non definition of the variable + rw_mgmt_ip = None + # n2vc_redesign STEP 5.1 # wait for RO (ip-address) Insert pub_key into VM if vnfr_id: @@ -2014,7 +2089,13 @@ class NsLcm(LcmBase): }, vca_id=vca_id, ) - else: + + # This verification is needed in order to avoid trying to add a public key + # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA + # for a KNF and not for its KDUs, the previous verification gives False, and the code + # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF + # or it is a KNF) + elif db_vnfr.get('vdur'): rw_mgmt_ip = await self.wait_vm_up_insert_key_ro( logging_text, nsr_id, @@ -2025,9 +2106,6 @@ class NsLcm(LcmBase): pub_key=pub_key, ) - else: - rw_mgmt_ip = None # This is for a NS configuration - self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip)) # store rw_mgmt_ip in deploy params for later replacement @@ -3195,9 +3273,45 @@ class NsLcm(LcmBase): kdu_model=k8s_instance_info["kdu-model"], kdu_name=k8s_instance_info["kdu-name"], ) + + # Update the nsrs table with the kdu-instance value self.update_db_2( - "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance} + item="nsrs", + _id=nsr_id, + _desc={nsr_db_path + ".kdu-instance": kdu_instance}, ) + + # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or + # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace + # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous + # namespace, this first verification could be removed, and the next step would be done for any kind + # of KNF. + # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based + # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027) + if k8sclustertype in ("juju", "juju-bundle"): + # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means + # that the user passed a namespace which he wants its KDU to be deployed in) + if ( + self.db.count( + table="nsrs", + q_filter={ + "_id": nsr_id, + "_admin.projects_write": k8s_instance_info["namespace"], + "_admin.projects_read": k8s_instance_info["namespace"], + }, + ) + > 0 + ): + self.logger.debug( + f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}" + ) + self.update_db_2( + item="nsrs", + _id=nsr_id, + _desc={f"{nsr_db_path}.namespace": kdu_instance}, + ) + k8s_instance_info["namespace"] = kdu_instance + await self.k8scluster_map[k8sclustertype].install( cluster_uuid=k8s_instance_info["k8scluster-uuid"], kdu_model=k8s_instance_info["kdu-model"], @@ -3210,9 +3324,6 @@ class NsLcm(LcmBase): kdu_instance=kdu_instance, vca_id=vca_id, ) - self.update_db_2( - "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance} - ) # Obtain services to obtain management service ip services = await self.k8scluster_map[k8sclustertype].get_services( @@ -3569,7 +3680,7 @@ class NsLcm(LcmBase): vnfd_with_id, k8s_instance_info, k8params=desc_params, - timeout=600, + timeout=1800, vca_id=vca_id, ) ) @@ -4481,6 +4592,7 @@ class NsLcm(LcmBase): cluster_uuid=kdu.get("k8scluster-uuid"), kdu_instance=kdu_instance, vca_id=vca_id, + namespace=kdu.get("namespace"), ) ) else: @@ -5292,27 +5404,284 @@ class NsLcm(LcmBase): self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action") return nslcmop_operation_state, detailed_status - async def scale(self, nsr_id, nslcmop_id): + async def terminate_vdus( + self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text + ): + """This method terminates VDUs + + Args: + db_vnfr: VNF instance record + member_vnf_index: VNF index to identify the VDUs to be removed + db_nsr: NS instance record + update_db_nslcmops: Nslcmop update record + """ + vca_scaling_info = [] + scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []} + scaling_info["scaling_direction"] = "IN" + scaling_info["vdu-delete"] = {} + scaling_info["kdu-delete"] = {} + db_vdur = db_vnfr.get("vdur") + vdur_list = copy(db_vdur) + count_index = 0 + for index, vdu in enumerate(vdur_list): + vca_scaling_info.append( + { + "osm_vdu_id": vdu["vdu-id-ref"], + "member-vnf-index": member_vnf_index, + "type": "delete", + "vdu_index": count_index, + }) + scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index + scaling_info["vdu"].append( + { + "name": vdu.get("name") or vdu.get("vdu-name"), + "vdu_id": vdu["vdu-id-ref"], + "interface": [], + }) + for interface in vdu["interfaces"]: + scaling_info["vdu"][index]["interface"].append( + { + "name": interface["name"], + "ip_address": interface["ip-address"], + "mac_address": interface.get("mac-address"), + }) + self.logger.info("NS update scaling info{}".format(scaling_info)) + stage[2] = "Terminating VDUs" + if scaling_info.get("vdu-delete"): + # scale_process = "RO" + if self.ro_config.get("ng"): + await self._scale_ng_ro( + logging_text, db_nsr, update_db_nslcmops, db_vnfr, scaling_info, stage + ) + + async def remove_vnf( + self, nsr_id, nslcmop_id, vnf_instance_id + ): + """This method is to Remove VNF instances from NS. + + Args: + nsr_id: NS instance id + nslcmop_id: nslcmop id of update + vnf_instance_id: id of the VNF instance to be removed + + Returns: + result: (str, str) COMPLETED/FAILED, details + """ + try: + db_nsr_update = {} + logging_text = "Task ns={} update ".format(nsr_id) + check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})) + self.logger.info("check_vnfr_count {}".format(check_vnfr_count)) + if check_vnfr_count > 1: + stage = ["", "", ""] + step = "Getting nslcmop from database" + self.logger.debug(step + " after having waited for previous tasks to be completed") + # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id}) + member_vnf_index = db_vnfr["member-vnf-index-ref"] + """ db_vnfr = self.db.get_one( + "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """ + + update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text) + + constituent_vnfr = db_nsr.get("constituent-vnfr-ref") + constituent_vnfr.remove(db_vnfr.get("_id")) + db_nsr_update["constituent-vnfr-ref"] = db_nsr.get("constituent-vnfr-ref") + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")}) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + return "COMPLETED", "Done" + else: + step = "Terminate VNF Failed with" + raise LcmException("{} Cannot terminate the last VNF in this NS.".format( + vnf_instance_id)) + except (LcmException, asyncio.CancelledError): + raise + except Exception as e: + self.logger.debug("Error removing VNF {}".format(e)) + return "FAILED", "Error removing VNF {}".format(e) + + async def _ns_redeploy_vnf( + self, nsr_id, nslcmop_id, db_vnfd, db_vnfr, db_nsr, + ): + """This method updates and redeploys VNF instances + + Args: + nsr_id: NS instance id + nslcmop_id: nslcmop id + db_vnfd: VNF descriptor + db_vnfr: VNF instance record + db_nsr: NS instance record + + Returns: + result: (str, str) COMPLETED/FAILED, details + """ + try: + count_index = 0 + stage = ["", "", ""] + logging_text = "Task ns={} update ".format(nsr_id) + latest_vnfd_revision = db_vnfd["_admin"].get("revision") + member_vnf_index = db_vnfr["member-vnf-index-ref"] + + # Terminate old VNF resources + update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text) + + # old_vnfd_id = db_vnfr["vnfd-id"] + # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) + new_db_vnfd = db_vnfd + # new_vnfd_ref = new_db_vnfd["id"] + # new_vnfd_id = vnfd_id + + # Create VDUR + new_vnfr_cp = [] + for cp in new_db_vnfd.get("ext-cpd", ()): + vnf_cp = { + "name": cp.get("id"), + "connection-point-id": cp.get("int-cpd", {}).get("cpd"), + "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"), + "id": cp.get("id"), + } + new_vnfr_cp.append(vnf_cp) + new_vdur = update_db_nslcmops["operationParams"]["newVdur"] + # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index) + # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""} + new_vnfr_update = {"revision": latest_vnfd_revision, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""} + self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update) + updated_db_vnfr = self.db.get_one( + "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id} + ) + + # Instantiate new VNF resources + # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + vca_scaling_info = [] + scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []} + scaling_info["scaling_direction"] = "OUT" + scaling_info["vdu-create"] = {} + scaling_info["kdu-create"] = {} + vdud_instantiate_list = db_vnfd["vdu"] + for index, vdud in enumerate(vdud_instantiate_list): + cloud_init_text = self._get_vdu_cloud_init_content( + vdud, db_vnfd + ) + if cloud_init_text: + additional_params = ( + self._get_vdu_additional_params(updated_db_vnfr, vdud["id"]) + or {} + ) + cloud_init_list = [] + if cloud_init_text: + # TODO Information of its own ip is not available because db_vnfr is not updated. + additional_params["OSM"] = get_osm_params( + updated_db_vnfr, vdud["id"], 1 + ) + cloud_init_list.append( + self._parse_cloud_init( + cloud_init_text, + additional_params, + db_vnfd["id"], + vdud["id"], + ) + ) + vca_scaling_info.append( + { + "osm_vdu_id": vdud["id"], + "member-vnf-index": member_vnf_index, + "type": "create", + "vdu_index": count_index, + } + ) + scaling_info["vdu-create"][vdud["id"]] = count_index + if self.ro_config.get("ng"): + self.logger.debug( + "New Resources to be deployed: {}".format(scaling_info)) + await self._scale_ng_ro( + logging_text, db_nsr, update_db_nslcmops, updated_db_vnfr, scaling_info, stage + ) + return "COMPLETED", "Done" + except (LcmException, asyncio.CancelledError): + raise + except Exception as e: + self.logger.debug("Error updating VNF {}".format(e)) + return "FAILED", "Error updating VNF {}".format(e) + + async def _ns_charm_upgrade( + self, + ee_id, + charm_id, + charm_type, + path, + timeout: float = None, + ) -> (str, str): + """This method upgrade charms in VNF instances + + Args: + ee_id: Execution environment id + path: Local path to the charm + charm_id: charm-id + charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm + timeout: (Float) Timeout for the ns update operation + + Returns: + result: (str, str) COMPLETED/FAILED, details + """ + try: + charm_type = charm_type or "lxc_proxy_charm" + output = await self.vca_map[charm_type].upgrade_charm( + ee_id=ee_id, + path=path, + charm_id=charm_id, + charm_type=charm_type, + timeout=timeout or self.timeout_ns_update, + ) + + if output: + return "COMPLETED", output + + except (LcmException, asyncio.CancelledError): + raise + + except Exception as e: + + self.logger.debug("Error upgrading charm {}".format(path)) + + return "FAILED", "Error upgrading charm {}: {}".format(path, e) + + async def update(self, nsr_id, nslcmop_id): + """Update NS according to different update types + + This method performs upgrade of VNF instances then updates the revision + number in VNF record + + Args: + nsr_id: Network service will be updated + nslcmop_id: ns lcm operation id + + Returns: + It may raise DbException, LcmException, N2VCException, K8sException + + """ # Try to lock HA task here task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) if not task_is_locked_by_me: return - logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id) - stage = ["", "", ""] - tasks_dict_info = {} - # ^ stage, step, VIM progress + logging_text = "Task ns={} update={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") - # get all needed from database + + # Set the required variables to be filled up later db_nsr = None db_nslcmop_update = {} + vnfr_update = {} + nslcmop_operation_state = None db_nsr_update = {} + error_description_nslcmop = "" exc = None - # in case of error, indicates what part of scale was failed to put nsr at error status - scale_process = None - old_operational_status = "" - old_config_status = "" - nsi_id = None + change_type = "updated" + detailed_status = "" + try: # wait for any previous tasks in process step = "Waiting for previous operations to terminate" @@ -5320,1218 +5689,2688 @@ class NsLcm(LcmBase): self._write_ns_status( nsr_id=nsr_id, ns_state=None, - current_operation="SCALING", + current_operation="UPDATING", current_operation_id=nslcmop_id, ) step = "Getting nslcmop from database" - self.logger.debug( - step + " after having waited for previous tasks to be completed" + db_nslcmop = self.db.get_one( + "nslcmops", {"_id": nslcmop_id}, fail_on_empty=False ) - db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + update_type = db_nslcmop["operationParams"]["updateType"] step = "Getting nsr from database" db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) old_operational_status = db_nsr["operational-status"] - old_config_status = db_nsr["config-status"] - - step = "Parsing scaling parameters" - db_nsr_update["operational-status"] = "scaling" + db_nsr_update["operational-status"] = "updating" self.update_db_2("nsrs", nsr_id, db_nsr_update) nsr_deployed = db_nsr["_admin"].get("deployed") - vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][ - "scaleByStepData" - ]["member-vnf-index"] - scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][ - "scaleByStepData" - ]["scaling-group-descriptor"] - scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"] - # for backward compatibility - if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict): - nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values()) - db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"] - self.update_db_2("nsrs", nsr_id, db_nsr_update) - - step = "Getting vnfr from database" - db_vnfr = self.db.get_one( - "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id} - ) - - vca_id = self.get_vca_id(db_vnfr, db_nsr) + if update_type == "CHANGE_VNFPKG": - step = "Getting vnfd from database" - db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]}) + # Get the input parameters given through update request + vnf_instance_id = db_nslcmop["operationParams"][ + "changeVnfPackageData" + ].get("vnfInstanceId") - base_folder = db_vnfd["_admin"]["storage"] + vnfd_id = db_nslcmop["operationParams"]["changeVnfPackageData"].get( + "vnfdId" + ) + timeout_seconds = db_nslcmop["operationParams"].get("timeout_ns_update") - step = "Getting scaling-group-descriptor" - scaling_descriptor = find_in_list( - get_scaling_aspect(db_vnfd), - lambda scale_desc: scale_desc["name"] == scaling_group, - ) - if not scaling_descriptor: - raise LcmException( - "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present " - "at vnfd:scaling-group-descriptor".format(scaling_group) + step = "Getting vnfr from database" + db_vnfr = self.db.get_one( + "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False ) - step = "Sending scale order to VIM" - # TODO check if ns is in a proper status - nb_scale_op = 0 - if not db_nsr["_admin"].get("scaling-group"): - self.update_db_2( - "nsrs", - nsr_id, - { - "_admin.scaling-group": [ - {"name": scaling_group, "nb-scale-op": 0} - ] - }, + step = "Getting vnfds from database" + # Latest VNFD + latest_vnfd = self.db.get_one( + "vnfds", {"_id": vnfd_id}, fail_on_empty=False ) - admin_scale_index = 0 - else: - for admin_scale_index, admin_scale_info in enumerate( - db_nsr["_admin"]["scaling-group"] - ): - if admin_scale_info["name"] == scaling_group: - nb_scale_op = admin_scale_info.get("nb-scale-op", 0) - break - else: # not found, set index one plus last element and add new entry with the name - admin_scale_index += 1 - db_nsr_update[ - "_admin.scaling-group.{}.name".format(admin_scale_index) - ] = scaling_group + latest_vnfd_revision = latest_vnfd["_admin"].get("revision") + + # Current VNFD + current_vnf_revision = db_vnfr.get("revision", 1) + current_vnfd = self.db.get_one( + "vnfds_revisions", + {"_id": vnfd_id + ":" + str(current_vnf_revision)}, + fail_on_empty=False, + ) + # Charm artifact paths will be filled up later + ( + current_charm_artifact_path, + target_charm_artifact_path, + charm_artifact_paths, + ) = ([], [], []) - vca_scaling_info = [] - scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []} - if scaling_type == "SCALE_OUT": - if "aspect-delta-details" not in scaling_descriptor: - raise LcmException( - "Aspect delta details not fount in scaling descriptor {}".format( - scaling_descriptor["name"] - ) + step = "Checking if revision has changed in VNFD" + if current_vnf_revision != latest_vnfd_revision: + + change_type = "policy_updated" + + # There is new revision of VNFD, update operation is required + current_vnfd_path = vnfd_id + ":" + str(current_vnf_revision) + latest_vnfd_path = vnfd_id + ":" + str(latest_vnfd_revision) + + step = "Removing the VNFD packages if they exist in the local path" + shutil.rmtree(self.fs.path + current_vnfd_path, ignore_errors=True) + shutil.rmtree(self.fs.path + latest_vnfd_path, ignore_errors=True) + + step = "Get the VNFD packages from FSMongo" + self.fs.sync(from_path=latest_vnfd_path) + self.fs.sync(from_path=current_vnfd_path) + + step = ( + "Get the charm-type, charm-id, ee-id if there is deployed VCA" ) - # count if max-instance-count is reached - deltas = scaling_descriptor.get("aspect-delta-details")["deltas"] + base_folder = latest_vnfd["_admin"]["storage"] - scaling_info["scaling_direction"] = "OUT" - scaling_info["vdu-create"] = {} - scaling_info["kdu-create"] = {} - for delta in deltas: - for vdu_delta in delta.get("vdu-delta", {}): - vdud = get_vdu(db_vnfd, vdu_delta["id"]) - # vdu_index also provides the number of instance of the targeted vdu - vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta) - cloud_init_text = self._get_vdu_cloud_init_content( - vdud, db_vnfd - ) - if cloud_init_text: - additional_params = ( - self._get_vdu_additional_params(db_vnfr, vdud["id"]) - or {} - ) - cloud_init_list = [] + for charm_index, charm_deployed in enumerate( + get_iterable(nsr_deployed, "VCA") + ): + vnf_index = db_vnfr.get("member-vnf-index-ref") - vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"]) - max_instance_count = 10 - if vdu_profile and "max-number-of-instances" in vdu_profile: - max_instance_count = vdu_profile.get( - "max-number-of-instances", 10 - ) + # Getting charm-id and charm-type + if charm_deployed.get("member-vnf-index") == vnf_index: + charm_id = self.get_vca_id(db_vnfr, db_nsr) + charm_type = charm_deployed.get("type") - default_instance_num = get_number_of_instances( - db_vnfd, vdud["id"] - ) - instances_number = vdu_delta.get("number-of-instances", 1) - nb_scale_op += instances_number + # Getting ee-id + ee_id = charm_deployed.get("ee_id") - new_instance_count = nb_scale_op + default_instance_num - # Control if new count is over max and vdu count is less than max. - # Then assign new instance count - if new_instance_count > max_instance_count > vdu_count: - instances_number = new_instance_count - max_instance_count - else: - instances_number = instances_number + step = "Getting descriptor config" + descriptor_config = get_configuration( + current_vnfd, current_vnfd["id"] + ) - if new_instance_count > max_instance_count: - raise LcmException( - "reached the limit of {} (max-instance-count) " - "scaling-out operations for the " - "scaling-group-descriptor '{}'".format( - nb_scale_op, scaling_group + if "execution-environment-list" in descriptor_config: + ee_list = descriptor_config.get( + "execution-environment-list", [] ) - ) - for x in range(vdu_delta.get("number-of-instances", 1)): - if cloud_init_text: - # TODO Information of its own ip is not available because db_vnfr is not updated. - additional_params["OSM"] = get_osm_params( - db_vnfr, vdu_delta["id"], vdu_index + x - ) - cloud_init_list.append( - self._parse_cloud_init( - cloud_init_text, - additional_params, - db_vnfd["id"], - vdud["id"], + else: + ee_list = [] + + # There could be several charm used in the same VNF + for ee_item in ee_list: + if ee_item.get("juju"): + + step = "Getting charm name" + charm_name = ee_item["juju"].get("charm") + + step = "Setting Charm artifact paths" + current_charm_artifact_path.append( + get_charm_artifact_path( + base_folder, + charm_name, + charm_type, + current_vnf_revision, + ) + ) + target_charm_artifact_path.append( + get_charm_artifact_path( + base_folder, + charm_name, + charm_type, + latest_vnfd_revision, + ) ) - ) - vca_scaling_info.append( - { - "osm_vdu_id": vdu_delta["id"], - "member-vnf-index": vnf_index, - "type": "create", - "vdu_index": vdu_index + x, - } - ) - scaling_info["vdu-create"][vdu_delta["id"]] = instances_number - for kdu_delta in delta.get("kdu-resource-delta", {}): - kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"]) - kdu_name = kdu_profile["kdu-name"] - resource_name = kdu_profile.get("resource-name", "") - - # Might have different kdus in the same delta - # Should have list for each kdu - if not scaling_info["kdu-create"].get(kdu_name, None): - scaling_info["kdu-create"][kdu_name] = [] - - kdur = get_kdur(db_vnfr, kdu_name) - if kdur.get("helm-chart"): - k8s_cluster_type = "helm-chart-v3" - self.logger.debug("kdur: {}".format(kdur)) - if ( - kdur.get("helm-version") - and kdur.get("helm-version") == "v2" - ): - k8s_cluster_type = "helm-chart" - elif kdur.get("juju-bundle"): - k8s_cluster_type = "juju-bundle" - else: - raise LcmException( - "kdu type for kdu='{}.{}' is neither helm-chart nor " - "juju-bundle. Maybe an old NBI version is running".format( - db_vnfr["member-vnf-index-ref"], kdu_name - ) - ) - max_instance_count = 10 - if kdu_profile and "max-number-of-instances" in kdu_profile: - max_instance_count = kdu_profile.get( - "max-number-of-instances", 10 + charm_artifact_paths = zip( + current_charm_artifact_path, target_charm_artifact_path ) - nb_scale_op += kdu_delta.get("number-of-instances", 1) - deployed_kdu, _ = get_deployed_kdu( - nsr_deployed, kdu_name, vnf_index - ) - if deployed_kdu is None: - raise LcmException( - "KDU '{}' for vnf '{}' not deployed".format( - kdu_name, vnf_index - ) - ) - kdu_instance = deployed_kdu.get("kdu-instance") - instance_num = await self.k8scluster_map[ - k8s_cluster_type - ].get_scale_count( - resource_name, - kdu_instance, - vca_id=vca_id, - cluster_uuid=deployed_kdu.get("k8scluster-uuid"), - kdu_model=deployed_kdu.get("kdu-model"), - ) - kdu_replica_count = instance_num + kdu_delta.get( - "number-of-instances", 1 - ) + step = "Checking if software version has changed in VNFD" + if find_software_version(current_vnfd) != find_software_version( + latest_vnfd + ): - # Control if new count is over max and instance_num is less than max. - # Then assign max instance number to kdu replica count - if kdu_replica_count > max_instance_count > instance_num: - kdu_replica_count = max_instance_count - if kdu_replica_count > max_instance_count: - raise LcmException( - "reached the limit of {} (max-instance-count) " - "scaling-out operations for the " - "scaling-group-descriptor '{}'".format( - instance_num, scaling_group + step = "Checking if existing VNF has charm" + for current_charm_path, target_charm_path in list( + charm_artifact_paths + ): + if current_charm_path: + raise LcmException( + "Software version change is not supported as VNF instance {} has charm.".format( + vnf_instance_id + ) ) - ) - for x in range(kdu_delta.get("number-of-instances", 1)): - vca_scaling_info.append( - { - "osm_kdu_id": kdu_name, - "member-vnf-index": vnf_index, - "type": "create", - "kdu_index": instance_num + x - 1, - } + # There is no change in the charm package, then redeploy the VNF + # based on new descriptor + step = "Redeploying VNF" + member_vnf_index = db_vnfr["member-vnf-index-ref"] + ( + result, + detailed_status + ) = await self._ns_redeploy_vnf( + nsr_id, + nslcmop_id, + latest_vnfd, + db_vnfr, + db_nsr + ) + if result == "FAILED": + nslcmop_operation_state = result + error_description_nslcmop = detailed_status + db_nslcmop_update["detailed-status"] = detailed_status + self.logger.debug( + logging_text + + " step {} Done with result {} {}".format( + step, nslcmop_operation_state, detailed_status ) - scaling_info["kdu-create"][kdu_name].append( - { - "member-vnf-index": vnf_index, - "type": "create", - "k8s-cluster-type": k8s_cluster_type, - "resource-name": resource_name, - "scale": kdu_replica_count, - } ) - elif scaling_type == "SCALE_IN": - deltas = scaling_descriptor.get("aspect-delta-details")["deltas"] - scaling_info["scaling_direction"] = "IN" - scaling_info["vdu-delete"] = {} - scaling_info["kdu-delete"] = {} + else: + step = "Checking if any charm package has changed or not" + for current_charm_path, target_charm_path in list( + charm_artifact_paths + ): + if ( + current_charm_path + and target_charm_path + and self.check_charm_hash_changed( + current_charm_path, target_charm_path + ) + ): - for delta in deltas: - for vdu_delta in delta.get("vdu-delta", {}): - vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta) - min_instance_count = 0 - vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"]) - if vdu_profile and "min-number-of-instances" in vdu_profile: - min_instance_count = vdu_profile["min-number-of-instances"] + step = "Checking whether VNF uses juju bundle" + if check_juju_bundle_existence(current_vnfd): - default_instance_num = get_number_of_instances( - db_vnfd, vdu_delta["id"] - ) - instance_num = vdu_delta.get("number-of-instances", 1) - nb_scale_op -= instance_num + raise LcmException( + "Charm upgrade is not supported for the instance which" + " uses juju-bundle: {}".format( + check_juju_bundle_existence(current_vnfd) + ) + ) - new_instance_count = nb_scale_op + default_instance_num + step = "Upgrading Charm" + ( + result, + detailed_status, + ) = await self._ns_charm_upgrade( + ee_id=ee_id, + charm_id=charm_id, + charm_type=charm_type, + path=self.fs.path + target_charm_path, + timeout=timeout_seconds, + ) - if new_instance_count < min_instance_count < vdu_count: - instances_number = min_instance_count - new_instance_count - else: - instances_number = instance_num + if result == "FAILED": + nslcmop_operation_state = result + error_description_nslcmop = detailed_status - if new_instance_count < min_instance_count: - raise LcmException( - "reached the limit of {} (min-instance-count) scaling-in operations for the " - "scaling-group-descriptor '{}'".format( - nb_scale_op, scaling_group + db_nslcmop_update["detailed-status"] = detailed_status + self.logger.debug( + logging_text + + " step {} Done with result {} {}".format( + step, nslcmop_operation_state, detailed_status + ) ) - ) - for x in range(vdu_delta.get("number-of-instances", 1)): - vca_scaling_info.append( - { - "osm_vdu_id": vdu_delta["id"], - "member-vnf-index": vnf_index, - "type": "delete", - "vdu_index": vdu_index - 1 - x, - } - ) - scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number - for kdu_delta in delta.get("kdu-resource-delta", {}): - kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"]) - kdu_name = kdu_profile["kdu-name"] - resource_name = kdu_profile.get("resource-name", "") - if not scaling_info["kdu-delete"].get(kdu_name, None): - scaling_info["kdu-delete"][kdu_name] = [] + step = "Updating policies" + member_vnf_index = db_vnfr["member-vnf-index-ref"] + result = "COMPLETED" + detailed_status = "Done" + db_nslcmop_update["detailed-status"] = "Done" - kdur = get_kdur(db_vnfr, kdu_name) - if kdur.get("helm-chart"): - k8s_cluster_type = "helm-chart-v3" - self.logger.debug("kdur: {}".format(kdur)) - if ( - kdur.get("helm-version") - and kdur.get("helm-version") == "v2" - ): - k8s_cluster_type = "helm-chart" - elif kdur.get("juju-bundle"): - k8s_cluster_type = "juju-bundle" - else: - raise LcmException( - "kdu type for kdu='{}.{}' is neither helm-chart nor " - "juju-bundle. Maybe an old NBI version is running".format( - db_vnfr["member-vnf-index-ref"], kdur["kdu-name"] - ) - ) + # If nslcmop_operation_state is None, so any operation is not failed. + if not nslcmop_operation_state: + nslcmop_operation_state = "COMPLETED" - min_instance_count = 0 - if kdu_profile and "min-number-of-instances" in kdu_profile: - min_instance_count = kdu_profile["min-number-of-instances"] + # If update CHANGE_VNFPKG nslcmop_operation is successful + # vnf revision need to be updated + vnfr_update["revision"] = latest_vnfd_revision + self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update) - nb_scale_op -= kdu_delta.get("number-of-instances", 1) - deployed_kdu, _ = get_deployed_kdu( - nsr_deployed, kdu_name, vnf_index - ) - if deployed_kdu is None: - raise LcmException( - "KDU '{}' for vnf '{}' not deployed".format( - kdu_name, vnf_index - ) - ) - kdu_instance = deployed_kdu.get("kdu-instance") - instance_num = await self.k8scluster_map[ - k8s_cluster_type - ].get_scale_count( - resource_name, - kdu_instance, - vca_id=vca_id, - cluster_uuid=deployed_kdu.get("k8scluster-uuid"), - kdu_model=deployed_kdu.get("kdu-model"), - ) - kdu_replica_count = instance_num - kdu_delta.get( - "number-of-instances", 1 + self.logger.debug( + logging_text + + " task Done with result {} {}".format( + nslcmop_operation_state, detailed_status ) + ) + elif update_type == "REMOVE_VNF": + # This part is included in https://osm.etsi.org/gerrit/11876 + vnf_instance_id = db_nslcmop["operationParams"]["removeVnfInstanceId"] + db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id}) + member_vnf_index = db_vnfr["member-vnf-index-ref"] + step = "Removing VNF" + (result, detailed_status) = await self.remove_vnf(nsr_id, nslcmop_id, vnf_instance_id) + if result == "FAILED": + nslcmop_operation_state = result + error_description_nslcmop = detailed_status + db_nslcmop_update["detailed-status"] = detailed_status + change_type = "vnf_terminated" + if not nslcmop_operation_state: + nslcmop_operation_state = "COMPLETED" + self.logger.debug( + logging_text + + " task Done with result {} {}".format( + nslcmop_operation_state, detailed_status + ) + ) - if kdu_replica_count < min_instance_count < instance_num: - kdu_replica_count = min_instance_count - if kdu_replica_count < min_instance_count: - raise LcmException( - "reached the limit of {} (min-instance-count) scaling-in operations for the " - "scaling-group-descriptor '{}'".format( - instance_num, scaling_group - ) - ) + elif update_type == "OPERATE_VNF": + vnf_id = db_nslcmop["operationParams"]["operateVnfData"]["vnfInstanceId"] + operation_type = db_nslcmop["operationParams"]["operateVnfData"]["changeStateTo"] + additional_param = db_nslcmop["operationParams"]["operateVnfData"]["additionalParam"] + (result, detailed_status) = await self.rebuild_start_stop( + nsr_id, nslcmop_id, vnf_id, additional_param, operation_type + ) + if result == "FAILED": + nslcmop_operation_state = result + error_description_nslcmop = detailed_status + db_nslcmop_update["detailed-status"] = detailed_status + if not nslcmop_operation_state: + nslcmop_operation_state = "COMPLETED" + self.logger.debug( + logging_text + + " task Done with result {} {}".format( + nslcmop_operation_state, detailed_status + ) + ) - for x in range(kdu_delta.get("number-of-instances", 1)): - vca_scaling_info.append( - { - "osm_kdu_id": kdu_name, - "member-vnf-index": vnf_index, - "type": "delete", - "kdu_index": instance_num - x - 1, - } - ) - scaling_info["kdu-delete"][kdu_name].append( - { - "member-vnf-index": vnf_index, - "type": "delete", - "k8s-cluster-type": k8s_cluster_type, - "resource-name": resource_name, - "scale": kdu_replica_count, - } - ) + # If nslcmop_operation_state is None, so any operation is not failed. + # All operations are executed in overall. + if not nslcmop_operation_state: + nslcmop_operation_state = "COMPLETED" + db_nsr_update["operational-status"] = old_operational_status - # update VDU_SCALING_INFO with the VDUs to delete ip_addresses - vdu_delete = copy(scaling_info.get("vdu-delete")) - if scaling_info["scaling_direction"] == "IN": - for vdur in reversed(db_vnfr["vdur"]): - if vdu_delete.get(vdur["vdu-id-ref"]): - vdu_delete[vdur["vdu-id-ref"]] -= 1 - scaling_info["vdu"].append( - { - "name": vdur.get("name") or vdur.get("vdu-name"), - "vdu_id": vdur["vdu-id-ref"], - "interface": [], - } - ) - for interface in vdur["interfaces"]: - scaling_info["vdu"][-1]["interface"].append( - { - "name": interface["name"], - "ip_address": interface["ip-address"], - "mac_address": interface.get("mac-address"), - } - ) - # vdu_delete = vdu_scaling_info.pop("vdu-delete") + except (DbException, LcmException, N2VCException, K8sException) as e: + self.logger.error(logging_text + "Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error( + logging_text + "Cancelled Exception while '{}'".format(step) + ) + exc = "Operation was cancelled" + except asyncio.TimeoutError: + self.logger.error(logging_text + "Timeout while '{}'".format(step)) + exc = "Timeout" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical( + logging_text + "Exit Exception {} {}".format(type(e).__name__, e), + exc_info=True, + ) + finally: + if exc: + db_nslcmop_update[ + "detailed-status" + ] = ( + detailed_status + ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc) + nslcmop_operation_state = "FAILED" + db_nsr_update["operational-status"] = old_operational_status + if db_nsr: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=db_nsr["nsState"], + current_operation="IDLE", + current_operation_id=None, + other_update=db_nsr_update, + ) - # PRE-SCALE BEGIN - step = "Executing pre-scale vnf-config-primitive" - if scaling_descriptor.get("scaling-config-action"): - for scaling_config_action in scaling_descriptor[ - "scaling-config-action" - ]: - if ( - scaling_config_action.get("trigger") == "pre-scale-in" - and scaling_type == "SCALE_IN" - ) or ( - scaling_config_action.get("trigger") == "pre-scale-out" - and scaling_type == "SCALE_OUT" - ): - vnf_config_primitive = scaling_config_action[ - "vnf-config-primitive-name-ref" - ] - step = db_nslcmop_update[ - "detailed-status" - ] = "executing pre-scale scaling-config-action '{}'".format( - vnf_config_primitive - ) + self._write_op_status( + op_id=nslcmop_id, + stage="", + error_message=error_description_nslcmop, + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) - # look for primitive - for config_primitive in ( - get_configuration(db_vnfd, db_vnfd["id"]) or {} - ).get("config-primitive", ()): - if config_primitive["name"] == vnf_config_primitive: - break - else: - raise LcmException( - "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action" - "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-" - "primitive".format(scaling_group, vnf_config_primitive) - ) + if nslcmop_operation_state: + try: + msg = { + "nsr_id": nsr_id, + "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state, + } + if change_type in ("vnf_terminated", "policy_updated"): + msg.update({"vnf_member_index": member_vnf_index}) + await self.msg.aiowrite("ns", change_type, msg, loop=self.loop) + except Exception as e: + self.logger.error( + logging_text + "kafka_write notification Exception {}".format(e) + ) + self.logger.debug(logging_text + "Exit") + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_update") + return nslcmop_operation_state, detailed_status - vnfr_params = {"VDU_SCALE_INFO": scaling_info} - if db_vnfr.get("additionalParamsForVnf"): - vnfr_params.update(db_vnfr["additionalParamsForVnf"]) + async def scale(self, nsr_id, nslcmop_id): + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) + if not task_is_locked_by_me: + return - scale_process = "VCA" - db_nsr_update["config-status"] = "configuring pre-scaling" - primitive_params = self._map_primitive_params( - config_primitive, {}, vnfr_params - ) + logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id) + stage = ["", "", ""] + tasks_dict_info = {} + # ^ stage, step, VIM progress + self.logger.debug(logging_text + "Enter") + # get all needed from database + db_nsr = None + db_nslcmop_update = {} + db_nsr_update = {} + exc = None + # in case of error, indicates what part of scale was failed to put nsr at error status + scale_process = None + old_operational_status = "" + old_config_status = "" + nsi_id = None + try: + # wait for any previous tasks in process + step = "Waiting for previous operations to terminate" + await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id) + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="SCALING", + current_operation_id=nslcmop_id, + ) - # Pre-scale retry check: Check if this sub-operation has been executed before - op_index = self._check_or_add_scale_suboperation( - db_nslcmop, - vnf_index, - vnf_config_primitive, - primitive_params, - "PRE-SCALE", - ) - if op_index == self.SUBOPERATION_STATUS_SKIP: - # Skip sub-operation - result = "COMPLETED" - result_detail = "Done" - self.logger.debug( - logging_text - + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format( - vnf_config_primitive, result, result_detail - ) - ) - else: - if op_index == self.SUBOPERATION_STATUS_NEW: - # New sub-operation: Get index of this sub-operation - op_index = ( - len(db_nslcmop.get("_admin", {}).get("operations")) - - 1 - ) - self.logger.debug( - logging_text - + "vnf_config_primitive={} New sub-operation".format( - vnf_config_primitive - ) - ) - else: - # retry: Get registered params for this existing sub-operation - op = db_nslcmop.get("_admin", {}).get("operations", [])[ - op_index - ] - vnf_index = op.get("member_vnf_index") - vnf_config_primitive = op.get("primitive") - primitive_params = op.get("primitive_params") - self.logger.debug( - logging_text - + "vnf_config_primitive={} Sub-operation retry".format( - vnf_config_primitive - ) - ) - # Execute the primitive, either with new (first-time) or registered (reintent) args - ee_descriptor_id = config_primitive.get( - "execution-environment-ref" - ) - primitive_name = config_primitive.get( - "execution-environment-primitive", vnf_config_primitive - ) - ee_id, vca_type = self._look_for_deployed_vca( - nsr_deployed["VCA"], - member_vnf_index=vnf_index, - vdu_id=None, - vdu_count_index=None, - ee_descriptor_id=ee_descriptor_id, - ) - result, result_detail = await self._ns_execute_primitive( - ee_id, - primitive_name, - primitive_params, - vca_type=vca_type, - vca_id=vca_id, - ) - self.logger.debug( - logging_text - + "vnf_config_primitive={} Done with result {} {}".format( - vnf_config_primitive, result, result_detail - ) - ) - # Update operationState = COMPLETED | FAILED - self._update_suboperation_status( - db_nslcmop, op_index, result, result_detail - ) + step = "Getting nslcmop from database" + self.logger.debug( + step + " after having waited for previous tasks to be completed" + ) + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) - if result == "FAILED": - raise LcmException(result_detail) - db_nsr_update["config-status"] = old_config_status - scale_process = None - # PRE-SCALE END + step = "Getting nsr from database" + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + old_operational_status = db_nsr["operational-status"] + old_config_status = db_nsr["config-status"] - db_nsr_update[ - "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index) - ] = nb_scale_op - db_nsr_update[ - "_admin.scaling-group.{}.time".format(admin_scale_index) - ] = time() + step = "Parsing scaling parameters" + db_nsr_update["operational-status"] = "scaling" + self.update_db_2("nsrs", nsr_id, db_nsr_update) + nsr_deployed = db_nsr["_admin"].get("deployed") - # SCALE-IN VCA - BEGIN - if vca_scaling_info: - step = db_nslcmop_update[ - "detailed-status" - ] = "Deleting the execution environments" - scale_process = "VCA" - for vca_info in vca_scaling_info: - if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"): - member_vnf_index = str(vca_info["member-vnf-index"]) - self.logger.debug( - logging_text + "vdu info: {}".format(vca_info) - ) - if vca_info.get("osm_vdu_id"): - vdu_id = vca_info["osm_vdu_id"] - vdu_index = int(vca_info["vdu_index"]) - stage[ - 1 - ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format( - member_vnf_index, vdu_id, vdu_index - ) - stage[2] = step = "Scaling in VCA" - self._write_op_status(op_id=nslcmop_id, stage=stage) - vca_update = db_nsr["_admin"]["deployed"]["VCA"] - config_update = db_nsr["configurationStatus"] - for vca_index, vca in enumerate(vca_update): - if ( - (vca or vca.get("ee_id")) - and vca["member-vnf-index"] == member_vnf_index - and vca["vdu_count_index"] == vdu_index - ): - if vca.get("vdu_id"): - config_descriptor = get_configuration( - db_vnfd, vca.get("vdu_id") - ) - elif vca.get("kdu_name"): - config_descriptor = get_configuration( - db_vnfd, vca.get("kdu_name") - ) - else: - config_descriptor = get_configuration( - db_vnfd, db_vnfd["id"] - ) - operation_params = ( - db_nslcmop.get("operationParams") or {} - ) - exec_terminate_primitives = not operation_params.get( - "skip_terminate_primitives" - ) and vca.get("needed_terminate") - task = asyncio.ensure_future( - asyncio.wait_for( - self.destroy_N2VC( - logging_text, - db_nslcmop, - vca, - config_descriptor, - vca_index, - destroy_ee=True, - exec_primitives=exec_terminate_primitives, - scaling_in=True, - vca_id=vca_id, - ), - timeout=self.timeout_charm_delete, - ) - ) - tasks_dict_info[task] = "Terminating VCA {}".format( - vca.get("ee_id") - ) - del vca_update[vca_index] - del config_update[vca_index] - # wait for pending tasks of terminate primitives - if tasks_dict_info: - self.logger.debug( - logging_text - + "Waiting for tasks {}".format( - list(tasks_dict_info.keys()) - ) - ) - error_list = await self._wait_for_tasks( - logging_text, - tasks_dict_info, - min( - self.timeout_charm_delete, self.timeout_ns_terminate - ), - stage, - nslcmop_id, - ) - tasks_dict_info.clear() - if error_list: - raise LcmException("; ".join(error_list)) + vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][ + "scaleByStepData" + ]["member-vnf-index"] + scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][ + "scaleByStepData" + ]["scaling-group-descriptor"] + scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"] + # for backward compatibility + if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict): + nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values()) + db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"] + self.update_db_2("nsrs", nsr_id, db_nsr_update) - db_vca_and_config_update = { - "_admin.deployed.VCA": vca_update, - "configurationStatus": config_update, - } - self.update_db_2( - "nsrs", db_nsr["_id"], db_vca_and_config_update - ) - scale_process = None - # SCALE-IN VCA - END + step = "Getting vnfr from database" + db_vnfr = self.db.get_one( + "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id} + ) - # SCALE RO - BEGIN - if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"): - scale_process = "RO" - if self.ro_config.get("ng"): - await self._scale_ng_ro( - logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage - ) - scaling_info.pop("vdu-create", None) - scaling_info.pop("vdu-delete", None) + vca_id = self.get_vca_id(db_vnfr, db_nsr) - scale_process = None - # SCALE RO - END + step = "Getting vnfd from database" + db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]}) - # SCALE KDU - BEGIN - if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"): - scale_process = "KDU" - await self._scale_kdu( - logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info + base_folder = db_vnfd["_admin"]["storage"] + + step = "Getting scaling-group-descriptor" + scaling_descriptor = find_in_list( + get_scaling_aspect(db_vnfd), + lambda scale_desc: scale_desc["name"] == scaling_group, + ) + if not scaling_descriptor: + raise LcmException( + "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present " + "at vnfd:scaling-group-descriptor".format(scaling_group) ) - scaling_info.pop("kdu-create", None) - scaling_info.pop("kdu-delete", None) - scale_process = None - # SCALE KDU - END + step = "Sending scale order to VIM" + # TODO check if ns is in a proper status + nb_scale_op = 0 + if not db_nsr["_admin"].get("scaling-group"): + self.update_db_2( + "nsrs", + nsr_id, + { + "_admin.scaling-group": [ + {"name": scaling_group, "nb-scale-op": 0} + ] + }, + ) + admin_scale_index = 0 + else: + for admin_scale_index, admin_scale_info in enumerate( + db_nsr["_admin"]["scaling-group"] + ): + if admin_scale_info["name"] == scaling_group: + nb_scale_op = admin_scale_info.get("nb-scale-op", 0) + break + else: # not found, set index one plus last element and add new entry with the name + admin_scale_index += 1 + db_nsr_update[ + "_admin.scaling-group.{}.name".format(admin_scale_index) + ] = scaling_group - if db_nsr_update: - self.update_db_2("nsrs", nsr_id, db_nsr_update) + vca_scaling_info = [] + scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []} + if scaling_type == "SCALE_OUT": + if "aspect-delta-details" not in scaling_descriptor: + raise LcmException( + "Aspect delta details not fount in scaling descriptor {}".format( + scaling_descriptor["name"] + ) + ) + # count if max-instance-count is reached + deltas = scaling_descriptor.get("aspect-delta-details")["deltas"] - # SCALE-UP VCA - BEGIN - if vca_scaling_info: - step = db_nslcmop_update[ - "detailed-status" - ] = "Creating new execution environments" - scale_process = "VCA" - for vca_info in vca_scaling_info: - if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"): - member_vnf_index = str(vca_info["member-vnf-index"]) - self.logger.debug( - logging_text + "vdu info: {}".format(vca_info) + scaling_info["scaling_direction"] = "OUT" + scaling_info["vdu-create"] = {} + scaling_info["kdu-create"] = {} + for delta in deltas: + for vdu_delta in delta.get("vdu-delta", {}): + vdud = get_vdu(db_vnfd, vdu_delta["id"]) + # vdu_index also provides the number of instance of the targeted vdu + vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta) + cloud_init_text = self._get_vdu_cloud_init_content( + vdud, db_vnfd ) - vnfd_id = db_vnfr["vnfd-ref"] - if vca_info.get("osm_vdu_id"): - vdu_index = int(vca_info["vdu_index"]) - deploy_params = {"OSM": get_osm_params(db_vnfr)} - if db_vnfr.get("additionalParamsForVnf"): - deploy_params.update( - parse_yaml_strings( - db_vnfr["additionalParamsForVnf"].copy() - ) - ) - descriptor_config = get_configuration( - db_vnfd, db_vnfd["id"] - ) - if descriptor_config: - vdu_id = None - vdu_name = None - kdu_name = None - self._deploy_n2vc( - logging_text=logging_text - + "member_vnf_index={} ".format(member_vnf_index), - db_nsr=db_nsr, - db_vnfr=db_vnfr, - nslcmop_id=nslcmop_id, - nsr_id=nsr_id, - nsi_id=nsi_id, - vnfd_id=vnfd_id, - vdu_id=vdu_id, - kdu_name=kdu_name, - member_vnf_index=member_vnf_index, - vdu_index=vdu_index, - vdu_name=vdu_name, - deploy_params=deploy_params, - descriptor_config=descriptor_config, - base_folder=base_folder, - task_instantiation_info=tasks_dict_info, - stage=stage, - ) - vdu_id = vca_info["osm_vdu_id"] - vdur = find_in_list( - db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id + if cloud_init_text: + additional_params = ( + self._get_vdu_additional_params(db_vnfr, vdud["id"]) + or {} ) - descriptor_config = get_configuration(db_vnfd, vdu_id) - if vdur.get("additionalParams"): - deploy_params_vdu = parse_yaml_strings( - vdur["additionalParams"] - ) - else: - deploy_params_vdu = deploy_params - deploy_params_vdu["OSM"] = get_osm_params( - db_vnfr, vdu_id, vdu_count_index=vdu_index + cloud_init_list = [] + + vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"]) + max_instance_count = 10 + if vdu_profile and "max-number-of-instances" in vdu_profile: + max_instance_count = vdu_profile.get( + "max-number-of-instances", 10 ) - if descriptor_config: - vdu_name = None - kdu_name = None - stage[ - 1 - ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format( - member_vnf_index, vdu_id, vdu_index - ) - stage[2] = step = "Scaling out VCA" - self._write_op_status(op_id=nslcmop_id, stage=stage) - self._deploy_n2vc( - logging_text=logging_text - + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format( - member_vnf_index, vdu_id, vdu_index - ), - db_nsr=db_nsr, - db_vnfr=db_vnfr, - nslcmop_id=nslcmop_id, - nsr_id=nsr_id, - nsi_id=nsi_id, - vnfd_id=vnfd_id, - vdu_id=vdu_id, - kdu_name=kdu_name, - member_vnf_index=member_vnf_index, - vdu_index=vdu_index, - vdu_name=vdu_name, - deploy_params=deploy_params_vdu, - descriptor_config=descriptor_config, - base_folder=base_folder, - task_instantiation_info=tasks_dict_info, - stage=stage, - ) - # SCALE-UP VCA - END - scale_process = None - # POST-SCALE BEGIN - # execute primitive service POST-SCALING - step = "Executing post-scale vnf-config-primitive" - if scaling_descriptor.get("scaling-config-action"): - for scaling_config_action in scaling_descriptor[ - "scaling-config-action" - ]: - if ( - scaling_config_action.get("trigger") == "post-scale-in" - and scaling_type == "SCALE_IN" - ) or ( - scaling_config_action.get("trigger") == "post-scale-out" - and scaling_type == "SCALE_OUT" - ): - vnf_config_primitive = scaling_config_action[ - "vnf-config-primitive-name-ref" - ] - step = db_nslcmop_update[ - "detailed-status" - ] = "executing post-scale scaling-config-action '{}'".format( - vnf_config_primitive + default_instance_num = get_number_of_instances( + db_vnfd, vdud["id"] ) + instances_number = vdu_delta.get("number-of-instances", 1) + nb_scale_op += instances_number - vnfr_params = {"VDU_SCALE_INFO": scaling_info} - if db_vnfr.get("additionalParamsForVnf"): - vnfr_params.update(db_vnfr["additionalParamsForVnf"]) - - # look for primitive - for config_primitive in ( - get_configuration(db_vnfd, db_vnfd["id"]) or {} - ).get("config-primitive", ()): - if config_primitive["name"] == vnf_config_primitive: - break + new_instance_count = nb_scale_op + default_instance_num + # Control if new count is over max and vdu count is less than max. + # Then assign new instance count + if new_instance_count > max_instance_count > vdu_count: + instances_number = new_instance_count - max_instance_count else: - raise LcmException( - "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-" - "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:" - "config-primitive".format( - scaling_group, vnf_config_primitive - ) - ) - scale_process = "VCA" - db_nsr_update["config-status"] = "configuring post-scaling" - primitive_params = self._map_primitive_params( - config_primitive, {}, vnfr_params - ) + instances_number = instances_number - # Post-scale retry check: Check if this sub-operation has been executed before - op_index = self._check_or_add_scale_suboperation( - db_nslcmop, - vnf_index, - vnf_config_primitive, - primitive_params, - "POST-SCALE", - ) - if op_index == self.SUBOPERATION_STATUS_SKIP: - # Skip sub-operation - result = "COMPLETED" - result_detail = "Done" - self.logger.debug( - logging_text - + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format( - vnf_config_primitive, result, result_detail + if new_instance_count > max_instance_count: + raise LcmException( + "reached the limit of {} (max-instance-count) " + "scaling-out operations for the " + "scaling-group-descriptor '{}'".format( + nb_scale_op, scaling_group ) ) - else: - if op_index == self.SUBOPERATION_STATUS_NEW: - # New sub-operation: Get index of this sub-operation - op_index = ( - len(db_nslcmop.get("_admin", {}).get("operations")) - - 1 + for x in range(vdu_delta.get("number-of-instances", 1)): + if cloud_init_text: + # TODO Information of its own ip is not available because db_vnfr is not updated. + additional_params["OSM"] = get_osm_params( + db_vnfr, vdu_delta["id"], vdu_index + x ) - self.logger.debug( - logging_text - + "vnf_config_primitive={} New sub-operation".format( - vnf_config_primitive + cloud_init_list.append( + self._parse_cloud_init( + cloud_init_text, + additional_params, + db_vnfd["id"], + vdud["id"], ) ) - else: - # retry: Get registered params for this existing sub-operation - op = db_nslcmop.get("_admin", {}).get("operations", [])[ - op_index - ] - vnf_index = op.get("member_vnf_index") - vnf_config_primitive = op.get("primitive") - primitive_params = op.get("primitive_params") - self.logger.debug( - logging_text - + "vnf_config_primitive={} Sub-operation retry".format( - vnf_config_primitive - ) + vca_scaling_info.append( + { + "osm_vdu_id": vdu_delta["id"], + "member-vnf-index": vnf_index, + "type": "create", + "vdu_index": vdu_index + x, + } + ) + scaling_info["vdu-create"][vdu_delta["id"]] = instances_number + for kdu_delta in delta.get("kdu-resource-delta", {}): + kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"]) + kdu_name = kdu_profile["kdu-name"] + resource_name = kdu_profile.get("resource-name", "") + + # Might have different kdus in the same delta + # Should have list for each kdu + if not scaling_info["kdu-create"].get(kdu_name, None): + scaling_info["kdu-create"][kdu_name] = [] + + kdur = get_kdur(db_vnfr, kdu_name) + if kdur.get("helm-chart"): + k8s_cluster_type = "helm-chart-v3" + self.logger.debug("kdur: {}".format(kdur)) + if ( + kdur.get("helm-version") + and kdur.get("helm-version") == "v2" + ): + k8s_cluster_type = "helm-chart" + elif kdur.get("juju-bundle"): + k8s_cluster_type = "juju-bundle" + else: + raise LcmException( + "kdu type for kdu='{}.{}' is neither helm-chart nor " + "juju-bundle. Maybe an old NBI version is running".format( + db_vnfr["member-vnf-index-ref"], kdu_name ) - # Execute the primitive, either with new (first-time) or registered (reintent) args - ee_descriptor_id = config_primitive.get( - "execution-environment-ref" - ) - primitive_name = config_primitive.get( - "execution-environment-primitive", vnf_config_primitive - ) - ee_id, vca_type = self._look_for_deployed_vca( - nsr_deployed["VCA"], - member_vnf_index=vnf_index, - vdu_id=None, - vdu_count_index=None, - ee_descriptor_id=ee_descriptor_id, ) - result, result_detail = await self._ns_execute_primitive( - ee_id, - primitive_name, - primitive_params, - vca_type=vca_type, - vca_id=vca_id, + + max_instance_count = 10 + if kdu_profile and "max-number-of-instances" in kdu_profile: + max_instance_count = kdu_profile.get( + "max-number-of-instances", 10 ) - self.logger.debug( - logging_text - + "vnf_config_primitive={} Done with result {} {}".format( - vnf_config_primitive, result, result_detail + + nb_scale_op += kdu_delta.get("number-of-instances", 1) + deployed_kdu, _ = get_deployed_kdu( + nsr_deployed, kdu_name, vnf_index + ) + if deployed_kdu is None: + raise LcmException( + "KDU '{}' for vnf '{}' not deployed".format( + kdu_name, vnf_index ) ) - # Update operationState = COMPLETED | FAILED - self._update_suboperation_status( - db_nslcmop, op_index, result, result_detail - ) + kdu_instance = deployed_kdu.get("kdu-instance") + instance_num = await self.k8scluster_map[ + k8s_cluster_type + ].get_scale_count( + resource_name, + kdu_instance, + vca_id=vca_id, + cluster_uuid=deployed_kdu.get("k8scluster-uuid"), + kdu_model=deployed_kdu.get("kdu-model"), + ) + kdu_replica_count = instance_num + kdu_delta.get( + "number-of-instances", 1 + ) - if result == "FAILED": - raise LcmException(result_detail) - db_nsr_update["config-status"] = old_config_status - scale_process = None - # POST-SCALE END + # Control if new count is over max and instance_num is less than max. + # Then assign max instance number to kdu replica count + if kdu_replica_count > max_instance_count > instance_num: + kdu_replica_count = max_instance_count + if kdu_replica_count > max_instance_count: + raise LcmException( + "reached the limit of {} (max-instance-count) " + "scaling-out operations for the " + "scaling-group-descriptor '{}'".format( + instance_num, scaling_group + ) + ) - db_nsr_update[ - "detailed-status" - ] = "" # "scaled {} {}".format(scaling_group, scaling_type) - db_nsr_update["operational-status"] = ( - "running" - if old_operational_status == "failed" - else old_operational_status + for x in range(kdu_delta.get("number-of-instances", 1)): + vca_scaling_info.append( + { + "osm_kdu_id": kdu_name, + "member-vnf-index": vnf_index, + "type": "create", + "kdu_index": instance_num + x - 1, + } + ) + scaling_info["kdu-create"][kdu_name].append( + { + "member-vnf-index": vnf_index, + "type": "create", + "k8s-cluster-type": k8s_cluster_type, + "resource-name": resource_name, + "scale": kdu_replica_count, + } + ) + elif scaling_type == "SCALE_IN": + deltas = scaling_descriptor.get("aspect-delta-details")["deltas"] + + scaling_info["scaling_direction"] = "IN" + scaling_info["vdu-delete"] = {} + scaling_info["kdu-delete"] = {} + + for delta in deltas: + for vdu_delta in delta.get("vdu-delta", {}): + vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta) + min_instance_count = 0 + vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"]) + if vdu_profile and "min-number-of-instances" in vdu_profile: + min_instance_count = vdu_profile["min-number-of-instances"] + + default_instance_num = get_number_of_instances( + db_vnfd, vdu_delta["id"] + ) + instance_num = vdu_delta.get("number-of-instances", 1) + nb_scale_op -= instance_num + + new_instance_count = nb_scale_op + default_instance_num + + if new_instance_count < min_instance_count < vdu_count: + instances_number = min_instance_count - new_instance_count + else: + instances_number = instance_num + + if new_instance_count < min_instance_count: + raise LcmException( + "reached the limit of {} (min-instance-count) scaling-in operations for the " + "scaling-group-descriptor '{}'".format( + nb_scale_op, scaling_group + ) + ) + for x in range(vdu_delta.get("number-of-instances", 1)): + vca_scaling_info.append( + { + "osm_vdu_id": vdu_delta["id"], + "member-vnf-index": vnf_index, + "type": "delete", + "vdu_index": vdu_index - 1 - x, + } + ) + scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number + for kdu_delta in delta.get("kdu-resource-delta", {}): + kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"]) + kdu_name = kdu_profile["kdu-name"] + resource_name = kdu_profile.get("resource-name", "") + + if not scaling_info["kdu-delete"].get(kdu_name, None): + scaling_info["kdu-delete"][kdu_name] = [] + + kdur = get_kdur(db_vnfr, kdu_name) + if kdur.get("helm-chart"): + k8s_cluster_type = "helm-chart-v3" + self.logger.debug("kdur: {}".format(kdur)) + if ( + kdur.get("helm-version") + and kdur.get("helm-version") == "v2" + ): + k8s_cluster_type = "helm-chart" + elif kdur.get("juju-bundle"): + k8s_cluster_type = "juju-bundle" + else: + raise LcmException( + "kdu type for kdu='{}.{}' is neither helm-chart nor " + "juju-bundle. Maybe an old NBI version is running".format( + db_vnfr["member-vnf-index-ref"], kdur["kdu-name"] + ) + ) + + min_instance_count = 0 + if kdu_profile and "min-number-of-instances" in kdu_profile: + min_instance_count = kdu_profile["min-number-of-instances"] + + nb_scale_op -= kdu_delta.get("number-of-instances", 1) + deployed_kdu, _ = get_deployed_kdu( + nsr_deployed, kdu_name, vnf_index + ) + if deployed_kdu is None: + raise LcmException( + "KDU '{}' for vnf '{}' not deployed".format( + kdu_name, vnf_index + ) + ) + kdu_instance = deployed_kdu.get("kdu-instance") + instance_num = await self.k8scluster_map[ + k8s_cluster_type + ].get_scale_count( + resource_name, + kdu_instance, + vca_id=vca_id, + cluster_uuid=deployed_kdu.get("k8scluster-uuid"), + kdu_model=deployed_kdu.get("kdu-model"), + ) + kdu_replica_count = instance_num - kdu_delta.get( + "number-of-instances", 1 + ) + + if kdu_replica_count < min_instance_count < instance_num: + kdu_replica_count = min_instance_count + if kdu_replica_count < min_instance_count: + raise LcmException( + "reached the limit of {} (min-instance-count) scaling-in operations for the " + "scaling-group-descriptor '{}'".format( + instance_num, scaling_group + ) + ) + + for x in range(kdu_delta.get("number-of-instances", 1)): + vca_scaling_info.append( + { + "osm_kdu_id": kdu_name, + "member-vnf-index": vnf_index, + "type": "delete", + "kdu_index": instance_num - x - 1, + } + ) + scaling_info["kdu-delete"][kdu_name].append( + { + "member-vnf-index": vnf_index, + "type": "delete", + "k8s-cluster-type": k8s_cluster_type, + "resource-name": resource_name, + "scale": kdu_replica_count, + } + ) + + # update VDU_SCALING_INFO with the VDUs to delete ip_addresses + vdu_delete = copy(scaling_info.get("vdu-delete")) + if scaling_info["scaling_direction"] == "IN": + for vdur in reversed(db_vnfr["vdur"]): + if vdu_delete.get(vdur["vdu-id-ref"]): + vdu_delete[vdur["vdu-id-ref"]] -= 1 + scaling_info["vdu"].append( + { + "name": vdur.get("name") or vdur.get("vdu-name"), + "vdu_id": vdur["vdu-id-ref"], + "interface": [], + } + ) + for interface in vdur["interfaces"]: + scaling_info["vdu"][-1]["interface"].append( + { + "name": interface["name"], + "ip_address": interface["ip-address"], + "mac_address": interface.get("mac-address"), + } + ) + # vdu_delete = vdu_scaling_info.pop("vdu-delete") + + # PRE-SCALE BEGIN + step = "Executing pre-scale vnf-config-primitive" + if scaling_descriptor.get("scaling-config-action"): + for scaling_config_action in scaling_descriptor[ + "scaling-config-action" + ]: + if ( + scaling_config_action.get("trigger") == "pre-scale-in" + and scaling_type == "SCALE_IN" + ) or ( + scaling_config_action.get("trigger") == "pre-scale-out" + and scaling_type == "SCALE_OUT" + ): + vnf_config_primitive = scaling_config_action[ + "vnf-config-primitive-name-ref" + ] + step = db_nslcmop_update[ + "detailed-status" + ] = "executing pre-scale scaling-config-action '{}'".format( + vnf_config_primitive + ) + + # look for primitive + for config_primitive in ( + get_configuration(db_vnfd, db_vnfd["id"]) or {} + ).get("config-primitive", ()): + if config_primitive["name"] == vnf_config_primitive: + break + else: + raise LcmException( + "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action" + "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-" + "primitive".format(scaling_group, vnf_config_primitive) + ) + + vnfr_params = {"VDU_SCALE_INFO": scaling_info} + if db_vnfr.get("additionalParamsForVnf"): + vnfr_params.update(db_vnfr["additionalParamsForVnf"]) + + scale_process = "VCA" + db_nsr_update["config-status"] = "configuring pre-scaling" + primitive_params = self._map_primitive_params( + config_primitive, {}, vnfr_params + ) + + # Pre-scale retry check: Check if this sub-operation has been executed before + op_index = self._check_or_add_scale_suboperation( + db_nslcmop, + vnf_index, + vnf_config_primitive, + primitive_params, + "PRE-SCALE", + ) + if op_index == self.SUBOPERATION_STATUS_SKIP: + # Skip sub-operation + result = "COMPLETED" + result_detail = "Done" + self.logger.debug( + logging_text + + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format( + vnf_config_primitive, result, result_detail + ) + ) + else: + if op_index == self.SUBOPERATION_STATUS_NEW: + # New sub-operation: Get index of this sub-operation + op_index = ( + len(db_nslcmop.get("_admin", {}).get("operations")) + - 1 + ) + self.logger.debug( + logging_text + + "vnf_config_primitive={} New sub-operation".format( + vnf_config_primitive + ) + ) + else: + # retry: Get registered params for this existing sub-operation + op = db_nslcmop.get("_admin", {}).get("operations", [])[ + op_index + ] + vnf_index = op.get("member_vnf_index") + vnf_config_primitive = op.get("primitive") + primitive_params = op.get("primitive_params") + self.logger.debug( + logging_text + + "vnf_config_primitive={} Sub-operation retry".format( + vnf_config_primitive + ) + ) + # Execute the primitive, either with new (first-time) or registered (reintent) args + ee_descriptor_id = config_primitive.get( + "execution-environment-ref" + ) + primitive_name = config_primitive.get( + "execution-environment-primitive", vnf_config_primitive + ) + ee_id, vca_type = self._look_for_deployed_vca( + nsr_deployed["VCA"], + member_vnf_index=vnf_index, + vdu_id=None, + vdu_count_index=None, + ee_descriptor_id=ee_descriptor_id, + ) + result, result_detail = await self._ns_execute_primitive( + ee_id, + primitive_name, + primitive_params, + vca_type=vca_type, + vca_id=vca_id, + ) + self.logger.debug( + logging_text + + "vnf_config_primitive={} Done with result {} {}".format( + vnf_config_primitive, result, result_detail + ) + ) + # Update operationState = COMPLETED | FAILED + self._update_suboperation_status( + db_nslcmop, op_index, result, result_detail + ) + + if result == "FAILED": + raise LcmException(result_detail) + db_nsr_update["config-status"] = old_config_status + scale_process = None + # PRE-SCALE END + + db_nsr_update[ + "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index) + ] = nb_scale_op + db_nsr_update[ + "_admin.scaling-group.{}.time".format(admin_scale_index) + ] = time() + + # SCALE-IN VCA - BEGIN + if vca_scaling_info: + step = db_nslcmop_update[ + "detailed-status" + ] = "Deleting the execution environments" + scale_process = "VCA" + for vca_info in vca_scaling_info: + if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"): + member_vnf_index = str(vca_info["member-vnf-index"]) + self.logger.debug( + logging_text + "vdu info: {}".format(vca_info) + ) + if vca_info.get("osm_vdu_id"): + vdu_id = vca_info["osm_vdu_id"] + vdu_index = int(vca_info["vdu_index"]) + stage[ + 1 + ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format( + member_vnf_index, vdu_id, vdu_index + ) + stage[2] = step = "Scaling in VCA" + self._write_op_status(op_id=nslcmop_id, stage=stage) + vca_update = db_nsr["_admin"]["deployed"]["VCA"] + config_update = db_nsr["configurationStatus"] + for vca_index, vca in enumerate(vca_update): + if ( + (vca or vca.get("ee_id")) + and vca["member-vnf-index"] == member_vnf_index + and vca["vdu_count_index"] == vdu_index + ): + if vca.get("vdu_id"): + config_descriptor = get_configuration( + db_vnfd, vca.get("vdu_id") + ) + elif vca.get("kdu_name"): + config_descriptor = get_configuration( + db_vnfd, vca.get("kdu_name") + ) + else: + config_descriptor = get_configuration( + db_vnfd, db_vnfd["id"] + ) + operation_params = ( + db_nslcmop.get("operationParams") or {} + ) + exec_terminate_primitives = not operation_params.get( + "skip_terminate_primitives" + ) and vca.get("needed_terminate") + task = asyncio.ensure_future( + asyncio.wait_for( + self.destroy_N2VC( + logging_text, + db_nslcmop, + vca, + config_descriptor, + vca_index, + destroy_ee=True, + exec_primitives=exec_terminate_primitives, + scaling_in=True, + vca_id=vca_id, + ), + timeout=self.timeout_charm_delete, + ) + ) + tasks_dict_info[task] = "Terminating VCA {}".format( + vca.get("ee_id") + ) + del vca_update[vca_index] + del config_update[vca_index] + # wait for pending tasks of terminate primitives + if tasks_dict_info: + self.logger.debug( + logging_text + + "Waiting for tasks {}".format( + list(tasks_dict_info.keys()) + ) + ) + error_list = await self._wait_for_tasks( + logging_text, + tasks_dict_info, + min( + self.timeout_charm_delete, self.timeout_ns_terminate + ), + stage, + nslcmop_id, + ) + tasks_dict_info.clear() + if error_list: + raise LcmException("; ".join(error_list)) + + db_vca_and_config_update = { + "_admin.deployed.VCA": vca_update, + "configurationStatus": config_update, + } + self.update_db_2( + "nsrs", db_nsr["_id"], db_vca_and_config_update + ) + scale_process = None + # SCALE-IN VCA - END + + # SCALE RO - BEGIN + if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"): + scale_process = "RO" + if self.ro_config.get("ng"): + await self._scale_ng_ro( + logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage + ) + scaling_info.pop("vdu-create", None) + scaling_info.pop("vdu-delete", None) + + scale_process = None + # SCALE RO - END + + # SCALE KDU - BEGIN + if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"): + scale_process = "KDU" + await self._scale_kdu( + logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info + ) + scaling_info.pop("kdu-create", None) + scaling_info.pop("kdu-delete", None) + + scale_process = None + # SCALE KDU - END + + if db_nsr_update: + self.update_db_2("nsrs", nsr_id, db_nsr_update) + + # SCALE-UP VCA - BEGIN + if vca_scaling_info: + step = db_nslcmop_update[ + "detailed-status" + ] = "Creating new execution environments" + scale_process = "VCA" + for vca_info in vca_scaling_info: + if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"): + member_vnf_index = str(vca_info["member-vnf-index"]) + self.logger.debug( + logging_text + "vdu info: {}".format(vca_info) + ) + vnfd_id = db_vnfr["vnfd-ref"] + if vca_info.get("osm_vdu_id"): + vdu_index = int(vca_info["vdu_index"]) + deploy_params = {"OSM": get_osm_params(db_vnfr)} + if db_vnfr.get("additionalParamsForVnf"): + deploy_params.update( + parse_yaml_strings( + db_vnfr["additionalParamsForVnf"].copy() + ) + ) + descriptor_config = get_configuration( + db_vnfd, db_vnfd["id"] + ) + if descriptor_config: + vdu_id = None + vdu_name = None + kdu_name = None + self._deploy_n2vc( + logging_text=logging_text + + "member_vnf_index={} ".format(member_vnf_index), + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_id, + vdu_id=vdu_id, + kdu_name=kdu_name, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_info=tasks_dict_info, + stage=stage, + ) + vdu_id = vca_info["osm_vdu_id"] + vdur = find_in_list( + db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id + ) + descriptor_config = get_configuration(db_vnfd, vdu_id) + if vdur.get("additionalParams"): + deploy_params_vdu = parse_yaml_strings( + vdur["additionalParams"] + ) + else: + deploy_params_vdu = deploy_params + deploy_params_vdu["OSM"] = get_osm_params( + db_vnfr, vdu_id, vdu_count_index=vdu_index + ) + if descriptor_config: + vdu_name = None + kdu_name = None + stage[ + 1 + ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format( + member_vnf_index, vdu_id, vdu_index + ) + stage[2] = step = "Scaling out VCA" + self._write_op_status(op_id=nslcmop_id, stage=stage) + self._deploy_n2vc( + logging_text=logging_text + + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format( + member_vnf_index, vdu_id, vdu_index + ), + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_id, + vdu_id=vdu_id, + kdu_name=kdu_name, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params_vdu, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_info=tasks_dict_info, + stage=stage, + ) + # SCALE-UP VCA - END + scale_process = None + + # POST-SCALE BEGIN + # execute primitive service POST-SCALING + step = "Executing post-scale vnf-config-primitive" + if scaling_descriptor.get("scaling-config-action"): + for scaling_config_action in scaling_descriptor[ + "scaling-config-action" + ]: + if ( + scaling_config_action.get("trigger") == "post-scale-in" + and scaling_type == "SCALE_IN" + ) or ( + scaling_config_action.get("trigger") == "post-scale-out" + and scaling_type == "SCALE_OUT" + ): + vnf_config_primitive = scaling_config_action[ + "vnf-config-primitive-name-ref" + ] + step = db_nslcmop_update[ + "detailed-status" + ] = "executing post-scale scaling-config-action '{}'".format( + vnf_config_primitive + ) + + vnfr_params = {"VDU_SCALE_INFO": scaling_info} + if db_vnfr.get("additionalParamsForVnf"): + vnfr_params.update(db_vnfr["additionalParamsForVnf"]) + + # look for primitive + for config_primitive in ( + get_configuration(db_vnfd, db_vnfd["id"]) or {} + ).get("config-primitive", ()): + if config_primitive["name"] == vnf_config_primitive: + break + else: + raise LcmException( + "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-" + "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:" + "config-primitive".format( + scaling_group, vnf_config_primitive + ) + ) + scale_process = "VCA" + db_nsr_update["config-status"] = "configuring post-scaling" + primitive_params = self._map_primitive_params( + config_primitive, {}, vnfr_params + ) + + # Post-scale retry check: Check if this sub-operation has been executed before + op_index = self._check_or_add_scale_suboperation( + db_nslcmop, + vnf_index, + vnf_config_primitive, + primitive_params, + "POST-SCALE", + ) + if op_index == self.SUBOPERATION_STATUS_SKIP: + # Skip sub-operation + result = "COMPLETED" + result_detail = "Done" + self.logger.debug( + logging_text + + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format( + vnf_config_primitive, result, result_detail + ) + ) + else: + if op_index == self.SUBOPERATION_STATUS_NEW: + # New sub-operation: Get index of this sub-operation + op_index = ( + len(db_nslcmop.get("_admin", {}).get("operations")) + - 1 + ) + self.logger.debug( + logging_text + + "vnf_config_primitive={} New sub-operation".format( + vnf_config_primitive + ) + ) + else: + # retry: Get registered params for this existing sub-operation + op = db_nslcmop.get("_admin", {}).get("operations", [])[ + op_index + ] + vnf_index = op.get("member_vnf_index") + vnf_config_primitive = op.get("primitive") + primitive_params = op.get("primitive_params") + self.logger.debug( + logging_text + + "vnf_config_primitive={} Sub-operation retry".format( + vnf_config_primitive + ) + ) + # Execute the primitive, either with new (first-time) or registered (reintent) args + ee_descriptor_id = config_primitive.get( + "execution-environment-ref" + ) + primitive_name = config_primitive.get( + "execution-environment-primitive", vnf_config_primitive + ) + ee_id, vca_type = self._look_for_deployed_vca( + nsr_deployed["VCA"], + member_vnf_index=vnf_index, + vdu_id=None, + vdu_count_index=None, + ee_descriptor_id=ee_descriptor_id, + ) + result, result_detail = await self._ns_execute_primitive( + ee_id, + primitive_name, + primitive_params, + vca_type=vca_type, + vca_id=vca_id, + ) + self.logger.debug( + logging_text + + "vnf_config_primitive={} Done with result {} {}".format( + vnf_config_primitive, result, result_detail + ) + ) + # Update operationState = COMPLETED | FAILED + self._update_suboperation_status( + db_nslcmop, op_index, result, result_detail + ) + + if result == "FAILED": + raise LcmException(result_detail) + db_nsr_update["config-status"] = old_config_status + scale_process = None + # POST-SCALE END + + db_nsr_update[ + "detailed-status" + ] = "" # "scaled {} {}".format(scaling_group, scaling_type) + db_nsr_update["operational-status"] = ( + "running" + if old_operational_status == "failed" + else old_operational_status + ) + db_nsr_update["config-status"] = old_config_status + return + except ( + ROclient.ROClientException, + DbException, + LcmException, + NgRoException, + ) as e: + self.logger.error(logging_text + "Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error( + logging_text + "Cancelled Exception while '{}'".format(step) + ) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical( + logging_text + "Exit Exception {} {}".format(type(e).__name__, e), + exc_info=True, + ) + finally: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="IDLE", + current_operation_id=None, + ) + if tasks_dict_info: + stage[1] = "Waiting for instantiate pending tasks." + self.logger.debug(logging_text + stage[1]) + exc = await self._wait_for_tasks( + logging_text, + tasks_dict_info, + self.timeout_ns_deploy, + stage, + nslcmop_id, + nsr_id=nsr_id, + ) + if exc: + db_nslcmop_update[ + "detailed-status" + ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc) + nslcmop_operation_state = "FAILED" + if db_nsr: + db_nsr_update["operational-status"] = old_operational_status + db_nsr_update["config-status"] = old_config_status + db_nsr_update["detailed-status"] = "" + if scale_process: + if "VCA" in scale_process: + db_nsr_update["config-status"] = "failed" + if "RO" in scale_process: + db_nsr_update["operational-status"] = "failed" + db_nsr_update[ + "detailed-status" + ] = "FAILED scaling nslcmop={} {}: {}".format( + nslcmop_id, step, exc + ) + else: + error_description_nslcmop = None + nslcmop_operation_state = "COMPLETED" + db_nslcmop_update["detailed-status"] = "Done" + + self._write_op_status( + op_id=nslcmop_id, + stage="", + error_message=error_description_nslcmop, + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) + if db_nsr: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="IDLE", + current_operation_id=None, + other_update=db_nsr_update, + ) + + if nslcmop_operation_state: + try: + msg = { + "nsr_id": nsr_id, + "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state, + } + await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop) + except Exception as e: + self.logger.error( + logging_text + "kafka_write notification Exception {}".format(e) + ) + self.logger.debug(logging_text + "Exit") + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale") + + async def _scale_kdu( + self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info + ): + _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete") + for kdu_name in _scaling_info: + for kdu_scaling_info in _scaling_info[kdu_name]: + deployed_kdu, index = get_deployed_kdu( + nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"] + ) + cluster_uuid = deployed_kdu["k8scluster-uuid"] + kdu_instance = deployed_kdu["kdu-instance"] + kdu_model = deployed_kdu.get("kdu-model") + scale = int(kdu_scaling_info["scale"]) + k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"] + + db_dict = { + "collection": "nsrs", + "filter": {"_id": nsr_id}, + "path": "_admin.deployed.K8s.{}".format(index), + } + + step = "scaling application {}".format( + kdu_scaling_info["resource-name"] + ) + self.logger.debug(logging_text + step) + + if kdu_scaling_info["type"] == "delete": + kdu_config = get_configuration(db_vnfd, kdu_name) + if ( + kdu_config + and kdu_config.get("terminate-config-primitive") + and get_juju_ee_ref(db_vnfd, kdu_name) is None + ): + terminate_config_primitive_list = kdu_config.get( + "terminate-config-primitive" + ) + terminate_config_primitive_list.sort( + key=lambda val: int(val["seq"]) + ) + + for ( + terminate_config_primitive + ) in terminate_config_primitive_list: + primitive_params_ = self._map_primitive_params( + terminate_config_primitive, {}, {} + ) + step = "execute terminate config primitive" + self.logger.debug(logging_text + step) + await asyncio.wait_for( + self.k8scluster_map[k8s_cluster_type].exec_primitive( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + primitive_name=terminate_config_primitive["name"], + params=primitive_params_, + db_dict=db_dict, + vca_id=vca_id, + ), + timeout=600, + ) + + await asyncio.wait_for( + self.k8scluster_map[k8s_cluster_type].scale( + kdu_instance, + scale, + kdu_scaling_info["resource-name"], + vca_id=vca_id, + cluster_uuid=cluster_uuid, + kdu_model=kdu_model, + atomic=True, + db_dict=db_dict, + ), + timeout=self.timeout_vca_on_error, + ) + + if kdu_scaling_info["type"] == "create": + kdu_config = get_configuration(db_vnfd, kdu_name) + if ( + kdu_config + and kdu_config.get("initial-config-primitive") + and get_juju_ee_ref(db_vnfd, kdu_name) is None + ): + initial_config_primitive_list = kdu_config.get( + "initial-config-primitive" + ) + initial_config_primitive_list.sort( + key=lambda val: int(val["seq"]) + ) + + for initial_config_primitive in initial_config_primitive_list: + primitive_params_ = self._map_primitive_params( + initial_config_primitive, {}, {} + ) + step = "execute initial config primitive" + self.logger.debug(logging_text + step) + await asyncio.wait_for( + self.k8scluster_map[k8s_cluster_type].exec_primitive( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + primitive_name=initial_config_primitive["name"], + params=primitive_params_, + db_dict=db_dict, + vca_id=vca_id, + ), + timeout=600, + ) + + async def _scale_ng_ro( + self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage + ): + nsr_id = db_nslcmop["nsInstanceId"] + db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]}) + db_vnfrs = {} + + # read from db: vnfd's for every vnf + db_vnfds = [] + + # for each vnf in ns, read vnfd + for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}): + db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr + vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf + # if we haven't this vnfd, read it from db + if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id): + # read from db + vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) + db_vnfds.append(vnfd) + n2vc_key = self.n2vc.get_public_key() + n2vc_key_list = [n2vc_key] + self.scale_vnfr( + db_vnfr, + vdu_scaling_info.get("vdu-create"), + vdu_scaling_info.get("vdu-delete"), + mark_delete=True, + ) + # db_vnfr has been updated, update db_vnfrs to use it + db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr + await self._instantiate_ng_ro( + logging_text, + nsr_id, + db_nsd, + db_nsr, + db_nslcmop, + db_vnfrs, + db_vnfds, + n2vc_key_list, + stage=stage, + start_deploy=time(), + timeout_ns_deploy=self.timeout_ns_deploy, + ) + if vdu_scaling_info.get("vdu-delete"): + self.scale_vnfr( + db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False + ) + + async def extract_prometheus_scrape_jobs( + self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip + ): + # look if exist a file called 'prometheus*.j2' and + artifact_content = self.fs.dir_ls(artifact_path) + job_file = next( + ( + f + for f in artifact_content + if f.startswith("prometheus") and f.endswith(".j2") + ), + None, + ) + if not job_file: + return + with self.fs.file_open((artifact_path, job_file), "r") as f: + job_data = f.read() + + # TODO get_service + _, _, service = ee_id.partition(".") # remove prefix "namespace." + host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"]) + host_port = "80" + vnfr_id = vnfr_id.replace("-", "") + variables = { + "JOB_NAME": vnfr_id, + "TARGET_IP": target_ip, + "EXPORTER_POD_IP": host_name, + "EXPORTER_POD_PORT": host_port, + } + job_list = parse_job(job_data, variables) + # ensure job_name is using the vnfr_id. Adding the metadata nsr_id + for job in job_list: + if ( + not isinstance(job.get("job_name"), str) + or vnfr_id not in job["job_name"] + ): + job["job_name"] = vnfr_id + "_" + str(randint(1, 10000)) + job["nsr_id"] = nsr_id + job["vnfr_id"] = vnfr_id + return job_list + + async def rebuild_start_stop(self, nsr_id, nslcmop_id, vnf_id, additional_param, operation_type): + logging_text = "Task ns={} {}={} ".format(nsr_id, operation_type, nslcmop_id) + self.logger.info(logging_text + "Enter") + stage = ["Preparing the environment", ""] + # database nsrs record + db_nsr_update = {} + vdu_vim_name = None + vim_vm_id = None + # in case of error, indicates what part of scale was failed to put nsr at error status + start_deploy = time() + try: + db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id}) + vim_account_id = db_vnfr.get("vim-account-id") + vim_info_key = "vim:" + vim_account_id + vdur = find_in_list( + db_vnfr["vdur"], lambda vdu: vdu["count-index"] == additional_param["count-index"] + ) + if vdur: + vdu_vim_name = vdur["name"] + vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"] + target_vim, _ = next(k_v for k_v in vdur["vim_info"].items()) + self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name)) + # wait for any previous tasks in process + stage[1] = "Waiting for previous operations to terminate" + self.logger.info(stage[1]) + await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) + + stage[1] = "Reading from database." + self.logger.info(stage[1]) + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation=operation_type.upper(), + current_operation_id=nslcmop_id + ) + self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0) + + # read from db: ns + stage[1] = "Getting nsr={} from db.".format(nsr_id) + db_nsr_update["operational-status"] = operation_type + self.update_db_2("nsrs", nsr_id, db_nsr_update) + # Payload for RO + desc = { + operation_type: { + "vim_vm_id": vim_vm_id, + "vnf_id": vnf_id, + "vdu_index": additional_param["count-index"], + "vdu_id": vdur["id"], + "target_vim": target_vim, + "vim_account_id": vim_account_id + } + } + stage[1] = "Sending rebuild request to RO... {}".format(desc) + self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0) + self.logger.info("ro nsr id: {}".format(nsr_id)) + result_dict = await self.RO.operate(nsr_id, desc, operation_type) + self.logger.info("response from RO: {}".format(result_dict)) + action_id = result_dict["action_id"] + await self._wait_ng_ro( + nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_operate + ) + return "COMPLETED", "Done" + except (ROclient.ROClientException, DbException, LcmException) as e: + self.logger.error("Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error("Cancelled Exception while '{}'".format(stage)) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) + return "FAILED", "Error in operate VNF {}".format(exc) + + def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str): + """ + Get VCA Cloud and VCA Cloud Credentials for the VIM account + + :param: vim_account_id: VIM Account ID + + :return: (cloud_name, cloud_credential) + """ + config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {}) + return config.get("vca_cloud"), config.get("vca_cloud_credential") + + def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str): + """ + Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account + + :param: vim_account_id: VIM Account ID + + :return: (cloud_name, cloud_credential) + """ + config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {}) + return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential") + + async def migrate(self, nsr_id, nslcmop_id): + """ + Migrate VNFs and VDUs instances in a NS + + :param: nsr_id: NS Instance ID + :param: nslcmop_id: nslcmop ID of migrate + + """ + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} migrate ".format(nsr_id) + self.logger.debug(logging_text + "Enter") + # get all needed from database + db_nslcmop = None + db_nslcmop_update = {} + nslcmop_operation_state = None + db_nsr_update = {} + target = {} + exc = None + # in case of error, indicates what part of scale was failed to put nsr at error status + start_deploy = time() + + try: + # wait for any previous tasks in process + step = "Waiting for previous operations to terminate" + await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id) + + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="MIGRATING", + current_operation_id=nslcmop_id, + ) + step = "Getting nslcmop from database" + self.logger.debug( + step + " after having waited for previous tasks to be completed" + ) + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + migrate_params = db_nslcmop.get("operationParams") + + target = {} + target.update(migrate_params) + desc = await self.RO.migrate(nsr_id, target) + self.logger.debug("RO return > {}".format(desc)) + action_id = desc["action_id"] + await self._wait_ng_ro( + nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate, + operation="migrate" + ) + except (ROclient.ROClientException, DbException, LcmException) as e: + self.logger.error("Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error("Cancelled Exception while '{}'".format(step)) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical( + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True + ) + finally: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="IDLE", + current_operation_id=None, + ) + if exc: + db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc) + nslcmop_operation_state = "FAILED" + else: + nslcmop_operation_state = "COMPLETED" + db_nslcmop_update["detailed-status"] = "Done" + db_nsr_update["detailed-status"] = "Done" + + self._write_op_status( + op_id=nslcmop_id, + stage="", + error_message="", + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) + if nslcmop_operation_state: + try: + msg = { + "nsr_id": nsr_id, + "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state, + } + await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop) + except Exception as e: + self.logger.error( + logging_text + "kafka_write notification Exception {}".format(e) + ) + self.logger.debug(logging_text + "Exit") + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate") + + + async def heal(self, nsr_id, nslcmop_id): + """ + Heal NS + + :param nsr_id: ns instance to heal + :param nslcmop_id: operation to run + :return: + """ + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) + if not task_is_locked_by_me: + return + + logging_text = "Task ns={} heal={} ".format(nsr_id, nslcmop_id) + stage = ["", "", ""] + tasks_dict_info = {} + # ^ stage, step, VIM progress + self.logger.debug(logging_text + "Enter") + # get all needed from database + db_nsr = None + db_nslcmop_update = {} + db_nsr_update = {} + db_vnfrs = {} # vnf's info indexed by _id + exc = None + old_operational_status = "" + old_config_status = "" + nsi_id = None + try: + # wait for any previous tasks in process + step = "Waiting for previous operations to terminate" + await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id) + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="HEALING", + current_operation_id=nslcmop_id, + ) + + step = "Getting nslcmop from database" + self.logger.debug( + step + " after having waited for previous tasks to be completed" + ) + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + + step = "Getting nsr from database" + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + old_operational_status = db_nsr["operational-status"] + old_config_status = db_nsr["config-status"] + + db_nsr_update = { + "_admin.deployed.RO.operational-status": "healing", + } + self.update_db_2("nsrs", nsr_id, db_nsr_update) + + step = "Sending heal order to VIM" + task_ro = asyncio.ensure_future( + self.heal_RO( + logging_text=logging_text, + nsr_id=nsr_id, + db_nslcmop=db_nslcmop, + stage=stage, + ) + ) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "heal_RO", task_ro) + tasks_dict_info[task_ro] = "Healing at VIM" + + # VCA tasks + # read from db: nsd + stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"]) + self.logger.debug(logging_text + stage[1]) + nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]}) + self.fs.sync(db_nsr["nsd-id"]) + db_nsr["nsd"] = nsd + # read from db: vnfr's of this ns + step = "Getting vnfrs from db" + db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) + for vnfr in db_vnfrs_list: + db_vnfrs[vnfr["_id"]] = vnfr + self.logger.debug("ns.heal db_vnfrs={}".format(db_vnfrs)) + + # Check for each target VNF + target_list = db_nslcmop.get("operationParams", {}).get("healVnfData", {}) + for target_vnf in target_list: + # Find this VNF in the list from DB + vnfr_id = target_vnf.get("vnfInstanceId", None) + if vnfr_id: + db_vnfr = db_vnfrs[vnfr_id] + vnfd_id = db_vnfr.get("vnfd-id") + vnfd_ref = db_vnfr.get("vnfd-ref") + vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) + base_folder = vnfd["_admin"]["storage"] + vdu_id = None + vdu_index = 0 + vdu_name = None + kdu_name = None + nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI + member_vnf_index = db_vnfr.get("member-vnf-index-ref") + + # Check each target VDU and deploy N2VC + for target_vdu in target_vnf["additionalParams"].get("vdu", None): + deploy_params_vdu = target_vdu + # Set run-day1 vnf level value if not vdu level value exists + if not deploy_params_vdu.get("run-day1") and target_vnf["additionalParams"].get("run-day1"): + deploy_params_vdu["run-day1"] = target_vnf["additionalParams"].get("run-day1") + vdu_name = target_vdu.get("vdu-id", None) + # TODO: Get vdu_id from vdud. + vdu_id = vdu_name + # For multi instance VDU count-index is mandatory + # For single session VDU count-indes is 0 + vdu_index = target_vdu.get("count-index",0) + + # n2vc_redesign STEP 3 to 6 Deploy N2VC + stage[1] = "Deploying Execution Environments." + self.logger.debug(logging_text + stage[1]) + + # VNF Level charm. Normal case when proxy charms. + # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms. + descriptor_config = get_configuration(vnfd, vnfd_ref) + if descriptor_config: + # Continue if healed machine is management machine + vnf_ip_address = db_vnfr.get("ip-address") + target_instance = None + for instance in db_vnfr.get("vdur", None): + if ( instance["vdu-name"] == vdu_name and instance["count-index"] == vdu_index ): + target_instance = instance + break + if vnf_ip_address == target_instance.get("ip-address"): + self._heal_n2vc( + logging_text=logging_text + + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format( + member_vnf_index, vdu_name, vdu_index + ), + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_ref, + vdu_id=None, + kdu_name=None, + member_vnf_index=member_vnf_index, + vdu_index=0, + vdu_name=None, + deploy_params=deploy_params_vdu, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_info=tasks_dict_info, + stage=stage, + ) + + # VDU Level charm. Normal case with native charms. + descriptor_config = get_configuration(vnfd, vdu_name) + if descriptor_config: + self._heal_n2vc( + logging_text=logging_text + + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format( + member_vnf_index, vdu_name, vdu_index + ), + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_ref, + vdu_id=vdu_id, + kdu_name=kdu_name, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params_vdu, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_info=tasks_dict_info, + stage=stage, + ) + + except ( + ROclient.ROClientException, + DbException, + LcmException, + NgRoException, + ) as e: + self.logger.error(logging_text + "Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error( + logging_text + "Cancelled Exception while '{}'".format(step) + ) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical( + logging_text + "Exit Exception {} {}".format(type(e).__name__, e), + exc_info=True, + ) + finally: + if tasks_dict_info: + stage[1] = "Waiting for healing pending tasks." + self.logger.debug(logging_text + stage[1]) + exc = await self._wait_for_tasks( + logging_text, + tasks_dict_info, + self.timeout_ns_deploy, + stage, + nslcmop_id, + nsr_id=nsr_id, + ) + if exc: + db_nslcmop_update[ + "detailed-status" + ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc) + nslcmop_operation_state = "FAILED" + if db_nsr: + db_nsr_update["operational-status"] = old_operational_status + db_nsr_update["config-status"] = old_config_status + db_nsr_update[ + "detailed-status" + ] = "FAILED healing nslcmop={} {}: {}".format( + nslcmop_id, step, exc + ) + for task, task_name in tasks_dict_info.items(): + if not task.done() or task.cancelled() or task.exception(): + if task_name.startswith(self.task_name_deploy_vca): + # A N2VC task is pending + db_nsr_update["config-status"] = "failed" + else: + # RO task is pending + db_nsr_update["operational-status"] = "failed" + else: + error_description_nslcmop = None + nslcmop_operation_state = "COMPLETED" + db_nslcmop_update["detailed-status"] = "Done" + db_nsr_update["detailed-status"] = "Done" + db_nsr_update["operational-status"] = "running" + db_nsr_update["config-status"] = "configured" + + self._write_op_status( + op_id=nslcmop_id, + stage="", + error_message=error_description_nslcmop, + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) + if db_nsr: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="IDLE", + current_operation_id=None, + other_update=db_nsr_update, + ) + + if nslcmop_operation_state: + try: + msg = { + "nsr_id": nsr_id, + "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state, + } + await self.msg.aiowrite("ns", "healed", msg, loop=self.loop) + except Exception as e: + self.logger.error( + logging_text + "kafka_write notification Exception {}".format(e) + ) + self.logger.debug(logging_text + "Exit") + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_heal") + + async def heal_RO( + self, + logging_text, + nsr_id, + db_nslcmop, + stage, + ): + """ + Heal at RO + :param logging_text: preffix text to use at logging + :param nsr_id: nsr identity + :param db_nslcmop: database content of ns operation, in this case, 'instantiate' + :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific + :return: None or exception + """ + def get_vim_account(vim_account_id): + nonlocal db_vims + if vim_account_id in db_vims: + return db_vims[vim_account_id] + db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id}) + db_vims[vim_account_id] = db_vim + return db_vim + + try: + start_heal = time() + ns_params = db_nslcmop.get("operationParams") + if ns_params and ns_params.get("timeout_ns_heal"): + timeout_ns_heal = ns_params["timeout_ns_heal"] + else: + timeout_ns_heal = self.timeout.get( + "ns_heal", self.timeout_ns_heal + ) + + db_vims = {} + + nslcmop_id = db_nslcmop["_id"] + target = { + "action_id": nslcmop_id, + } + self.logger.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop,timeout_ns_heal)) + target.update(db_nslcmop.get("operationParams", {})) + + self.logger.debug("Send to RO > nsr_id={} target={}".format(nsr_id, target)) + desc = await self.RO.recreate(nsr_id, target) + self.logger.debug("RO return > {}".format(desc)) + action_id = desc["action_id"] + # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted + await self._wait_ng_ro( + nsr_id, action_id, nslcmop_id, start_heal, timeout_ns_heal, stage, + operation="healing" + ) + + # Updating NSR + db_nsr_update = { + "_admin.deployed.RO.operational-status": "running", + "detailed-status": " ".join(stage), + } + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + self.logger.debug( + logging_text + "ns healed at RO. RO_id={}".format(action_id) + ) + + except Exception as e: + stage[2] = "ERROR healing at VIM" + #self.set_vnfr_at_error(db_vnfrs, str(e)) + self.logger.error( + "Error healing at VIM {}".format(e), + exc_info=not isinstance( + e, + ( + ROclient.ROClientException, + LcmException, + DbException, + NgRoException, + ), + ), + ) + raise + + def _heal_n2vc( + self, + logging_text, + db_nsr, + db_vnfr, + nslcmop_id, + nsr_id, + nsi_id, + vnfd_id, + vdu_id, + kdu_name, + member_vnf_index, + vdu_index, + vdu_name, + deploy_params, + descriptor_config, + base_folder, + task_instantiation_info, + stage, + ): + # launch instantiate_N2VC in a asyncio task and register task object + # Look where information of this charm is at database ._admin.deployed.VCA + # if not found, create one entry and update database + # fill db_nsr._admin.deployed.VCA. + + self.logger.debug( + logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id) + ) + if "execution-environment-list" in descriptor_config: + ee_list = descriptor_config.get("execution-environment-list", []) + elif "juju" in descriptor_config: + ee_list = [descriptor_config] # ns charms + else: # other types as script are not supported + ee_list = [] + + for ee_item in ee_list: + self.logger.debug( + logging_text + + "_deploy_n2vc ee_item juju={}, helm={}".format( + ee_item.get("juju"), ee_item.get("helm-chart") + ) + ) + ee_descriptor_id = ee_item.get("id") + if ee_item.get("juju"): + vca_name = ee_item["juju"].get("charm") + vca_type = ( + "lxc_proxy_charm" + if ee_item["juju"].get("charm") is not None + else "native_charm" + ) + if ee_item["juju"].get("cloud") == "k8s": + vca_type = "k8s_proxy_charm" + elif ee_item["juju"].get("proxy") is False: + vca_type = "native_charm" + elif ee_item.get("helm-chart"): + vca_name = ee_item["helm-chart"] + if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2": + vca_type = "helm" + else: + vca_type = "helm-v3" + else: + self.logger.debug( + logging_text + "skipping non juju neither charm configuration" + ) + continue + + vca_index = -1 + for vca_index, vca_deployed in enumerate( + db_nsr["_admin"]["deployed"]["VCA"] + ): + if not vca_deployed: + continue + if ( + vca_deployed.get("member-vnf-index") == member_vnf_index + and vca_deployed.get("vdu_id") == vdu_id + and vca_deployed.get("kdu_name") == kdu_name + and vca_deployed.get("vdu_count_index", 0) == vdu_index + and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id + ): + break + else: + # not found, create one. + target = ( + "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index) + ) + if vdu_id: + target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0) + elif kdu_name: + target += "/kdu/{}".format(kdu_name) + vca_deployed = { + "target_element": target, + # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string + "member-vnf-index": member_vnf_index, + "vdu_id": vdu_id, + "kdu_name": kdu_name, + "vdu_count_index": vdu_index, + "operational-status": "init", # TODO revise + "detailed-status": "", # TODO revise + "step": "initial-deploy", # TODO revise + "vnfd_id": vnfd_id, + "vdu_name": vdu_name, + "type": vca_type, + "ee_descriptor_id": ee_descriptor_id, + } + vca_index += 1 + + # create VCA and configurationStatus in db + db_dict = { + "_admin.deployed.VCA.{}".format(vca_index): vca_deployed, + "configurationStatus.{}".format(vca_index): dict(), + } + self.update_db_2("nsrs", nsr_id, db_dict) + + db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed) + + self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id)) + self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr)) + self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed)) + + # Launch task + task_n2vc = asyncio.ensure_future( + self.heal_N2VC( + logging_text=logging_text, + vca_index=vca_index, + nsi_id=nsi_id, + db_nsr=db_nsr, + db_vnfr=db_vnfr, + vdu_id=vdu_id, + kdu_name=kdu_name, + vdu_index=vdu_index, + deploy_params=deploy_params, + config_descriptor=descriptor_config, + base_folder=base_folder, + nslcmop_id=nslcmop_id, + stage=stage, + vca_type=vca_type, + vca_name=vca_name, + ee_config_descriptor=ee_item, + ) + ) + self.lcm_tasks.register( + "ns", + nsr_id, + nslcmop_id, + "instantiate_N2VC-{}".format(vca_index), + task_n2vc, + ) + task_instantiation_info[ + task_n2vc + ] = self.task_name_deploy_vca + " {}.{}".format( + member_vnf_index or "", vdu_id or "" + ) + + async def heal_N2VC( + self, + logging_text, + vca_index, + nsi_id, + db_nsr, + db_vnfr, + vdu_id, + kdu_name, + vdu_index, + config_descriptor, + deploy_params, + base_folder, + nslcmop_id, + stage, + vca_type, + vca_name, + ee_config_descriptor, + ): + nsr_id = db_nsr["_id"] + db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index) + vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"] + vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index] + osm_config = {"osm": {"ns_id": db_nsr["_id"]}} + db_dict = { + "collection": "nsrs", + "filter": {"_id": nsr_id}, + "path": db_update_entry, + } + step = "" + try: + + element_type = "NS" + element_under_configuration = nsr_id + + vnfr_id = None + if db_vnfr: + vnfr_id = db_vnfr["_id"] + osm_config["osm"]["vnf_id"] = vnfr_id + + namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id) + + if vca_type == "native_charm": + index_number = 0 + else: + index_number = vdu_index or 0 + + if vnfr_id: + element_type = "VNF" + element_under_configuration = vnfr_id + namespace += ".{}-{}".format(vnfr_id, index_number) + if vdu_id: + namespace += ".{}-{}".format(vdu_id, index_number) + element_type = "VDU" + element_under_configuration = "{}-{}".format(vdu_id, index_number) + osm_config["osm"]["vdu_id"] = vdu_id + elif kdu_name: + namespace += ".{}".format(kdu_name) + element_type = "KDU" + element_under_configuration = kdu_name + osm_config["osm"]["kdu_name"] = kdu_name + + # Get artifact path + if base_folder["pkg-dir"]: + artifact_path = "{}/{}/{}/{}".format( + base_folder["folder"], + base_folder["pkg-dir"], + "charms" + if vca_type + in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") + else "helm-charts", + vca_name, + ) + else: + artifact_path = "{}/Scripts/{}/{}/".format( + base_folder["folder"], + "charms" + if vca_type + in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") + else "helm-charts", + vca_name, + ) + + self.logger.debug("Artifact path > {}".format(artifact_path)) + + # get initial_config_primitive_list that applies to this element + initial_config_primitive_list = config_descriptor.get( + "initial-config-primitive" ) - db_nsr_update["config-status"] = old_config_status - return - except ( - ROclient.ROClientException, - DbException, - LcmException, - NgRoException, - ) as e: - self.logger.error(logging_text + "Exit Exception {}".format(e)) - exc = e - except asyncio.CancelledError: - self.logger.error( - logging_text + "Cancelled Exception while '{}'".format(step) + + self.logger.debug( + "Initial config primitive list > {}".format( + initial_config_primitive_list + ) ) - exc = "Operation was cancelled" - except Exception as e: - exc = traceback.format_exc() - self.logger.critical( - logging_text + "Exit Exception {} {}".format(type(e).__name__, e), - exc_info=True, + + # add config if not present for NS charm + ee_descriptor_id = ee_config_descriptor.get("id") + self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id)) + initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list( + initial_config_primitive_list, vca_deployed, ee_descriptor_id ) - finally: - self._write_ns_status( - nsr_id=nsr_id, - ns_state=None, - current_operation="IDLE", - current_operation_id=None, + + self.logger.debug( + "Initial config primitive list #2 > {}".format( + initial_config_primitive_list + ) ) - if tasks_dict_info: - stage[1] = "Waiting for instantiate pending tasks." - self.logger.debug(logging_text + stage[1]) - exc = await self._wait_for_tasks( + # n2vc_redesign STEP 3.1 + # find old ee_id if exists + ee_id = vca_deployed.get("ee_id") + + vca_id = self.get_vca_id(db_vnfr, db_nsr) + # create or register execution environment in VCA. Only for native charms when healing + if vca_type == "native_charm": + step = "Waiting to VM being up and getting IP address" + self.logger.debug(logging_text + step) + rw_mgmt_ip = await self.wait_vm_up_insert_key_ro( logging_text, - tasks_dict_info, - self.timeout_ns_deploy, - stage, - nslcmop_id, + nsr_id, + vnfr_id, + vdu_id, + vdu_index, + user=None, + pub_key=None, + ) + credentials = {"hostname": rw_mgmt_ip} + # get username + username = deep_get( + config_descriptor, ("config-access", "ssh-access", "default-user") + ) + # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were + # merged. Meanwhile let's get username from initial-config-primitive + if not username and initial_config_primitive_list: + for config_primitive in initial_config_primitive_list: + for param in config_primitive.get("parameter", ()): + if param["name"] == "ssh-username": + username = param["value"] + break + if not username: + raise LcmException( + "Cannot determine the username neither with 'initial-config-primitive' nor with " + "'config-access.ssh-access.default-user'" + ) + credentials["username"] = username + + # n2vc_redesign STEP 3.2 + # TODO: Before healing at RO it is needed to destroy native charm units to be deleted. + self._write_configuration_status( nsr_id=nsr_id, + vca_index=vca_index, + status="REGISTERING", + element_under_configuration=element_under_configuration, + element_type=element_type, ) - if exc: - db_nslcmop_update[ - "detailed-status" - ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc) - nslcmop_operation_state = "FAILED" - if db_nsr: - db_nsr_update["operational-status"] = old_operational_status - db_nsr_update["config-status"] = old_config_status - db_nsr_update["detailed-status"] = "" - if scale_process: - if "VCA" in scale_process: - db_nsr_update["config-status"] = "failed" - if "RO" in scale_process: - db_nsr_update["operational-status"] = "failed" - db_nsr_update[ - "detailed-status" - ] = "FAILED scaling nslcmop={} {}: {}".format( - nslcmop_id, step, exc - ) - else: - error_description_nslcmop = None - nslcmop_operation_state = "COMPLETED" - db_nslcmop_update["detailed-status"] = "Done" - self._write_op_status( - op_id=nslcmop_id, - stage="", - error_message=error_description_nslcmop, - operation_state=nslcmop_operation_state, - other_update=db_nslcmop_update, + step = "register execution environment {}".format(credentials) + self.logger.debug(logging_text + step) + ee_id = await self.vca_map[vca_type].register_execution_environment( + credentials=credentials, + namespace=namespace, + db_dict=db_dict, + vca_id=vca_id, + ) + + # update ee_id en db + db_dict_ee_id = { + "_admin.deployed.VCA.{}.ee_id".format(vca_index): ee_id, + } + self.update_db_2("nsrs", nsr_id, db_dict_ee_id) + + # for compatibility with MON/POL modules, the need model and application name at database + # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name" + # Not sure if this need to be done when healing + """ + ee_id_parts = ee_id.split(".") + db_nsr_update = {db_update_entry + "ee_id": ee_id} + if len(ee_id_parts) >= 2: + model_name = ee_id_parts[0] + application_name = ee_id_parts[1] + db_nsr_update[db_update_entry + "model"] = model_name + db_nsr_update[db_update_entry + "application"] = application_name + """ + + # n2vc_redesign STEP 3.3 + # Install configuration software. Only for native charms. + step = "Install configuration Software" + + self._write_configuration_status( + nsr_id=nsr_id, + vca_index=vca_index, + status="INSTALLING SW", + element_under_configuration=element_under_configuration, + element_type=element_type, + #other_update=db_nsr_update, + other_update=None, ) - if db_nsr: - self._write_ns_status( - nsr_id=nsr_id, - ns_state=None, - current_operation="IDLE", - current_operation_id=None, - other_update=db_nsr_update, + + # TODO check if already done + self.logger.debug(logging_text + step) + config = None + if vca_type == "native_charm": + config_primitive = next( + (p for p in initial_config_primitive_list if p["name"] == "config"), + None, + ) + if config_primitive: + config = self._map_primitive_params( + config_primitive, {}, deploy_params + ) + await self.vca_map[vca_type].install_configuration_sw( + ee_id=ee_id, + artifact_path=artifact_path, + db_dict=db_dict, + config=config, + num_units=1, + vca_id=vca_id, + vca_type=vca_type, ) - if nslcmop_operation_state: - try: - msg = { - "nsr_id": nsr_id, - "nslcmop_id": nslcmop_id, - "operationState": nslcmop_operation_state, - } - await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop) - except Exception as e: - self.logger.error( - logging_text + "kafka_write notification Exception {}".format(e) + # write in db flag of configuration_sw already installed + self.update_db_2( + "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True} + ) + + # Not sure if this need to be done when healing + """ + # add relations for this VCA (wait for other peers related with this VCA) + await self._add_vca_relations( + logging_text=logging_text, + nsr_id=nsr_id, + vca_type=vca_type, + vca_index=vca_index, + ) + """ + + # if SSH access is required, then get execution environment SSH public + # if native charm we have waited already to VM be UP + if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"): + pub_key = None + user = None + # self.logger.debug("get ssh key block") + if deep_get( + config_descriptor, ("config-access", "ssh-access", "required") + ): + # self.logger.debug("ssh key needed") + # Needed to inject a ssh key + user = deep_get( + config_descriptor, + ("config-access", "ssh-access", "default-user"), + ) + step = "Install configuration Software, getting public ssh key" + pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key( + ee_id=ee_id, db_dict=db_dict, vca_id=vca_id ) - self.logger.debug(logging_text + "Exit") - self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale") - async def _scale_kdu( - self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info - ): - _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete") - for kdu_name in _scaling_info: - for kdu_scaling_info in _scaling_info[kdu_name]: - deployed_kdu, index = get_deployed_kdu( - nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"] - ) - cluster_uuid = deployed_kdu["k8scluster-uuid"] - kdu_instance = deployed_kdu["kdu-instance"] - kdu_model = deployed_kdu.get("kdu-model") - scale = int(kdu_scaling_info["scale"]) - k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"] + step = "Insert public key into VM user={} ssh_key={}".format( + user, pub_key + ) + else: + # self.logger.debug("no need to get ssh key") + step = "Waiting to VM being up and getting IP address" + self.logger.debug(logging_text + step) + + # n2vc_redesign STEP 5.1 + # wait for RO (ip-address) Insert pub_key into VM + # IMPORTANT: We need do wait for RO to complete healing operation. + await self._wait_heal_ro(nsr_id,self.timeout_ns_heal) + if vnfr_id: + if kdu_name: + rw_mgmt_ip = await self.wait_kdu_up( + logging_text, nsr_id, vnfr_id, kdu_name + ) + else: + rw_mgmt_ip = await self.wait_vm_up_insert_key_ro( + logging_text, + nsr_id, + vnfr_id, + vdu_id, + vdu_index, + user=user, + pub_key=pub_key, + ) + else: + rw_mgmt_ip = None # This is for a NS configuration - db_dict = { - "collection": "nsrs", - "filter": {"_id": nsr_id}, - "path": "_admin.deployed.K8s.{}".format(index), - } + self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip)) - step = "scaling application {}".format( - kdu_scaling_info["resource-name"] + # store rw_mgmt_ip in deploy params for later replacement + deploy_params["rw_mgmt_ip"] = rw_mgmt_ip + + # Day1 operations. + # get run-day1 operation parameter + runDay1 = deploy_params.get("run-day1",False) + self.logger.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id,vdu_id,runDay1)) + if runDay1: + # n2vc_redesign STEP 6 Execute initial config primitive + step = "execute initial config primitive" + + # wait for dependent primitives execution (NS -> VNF -> VDU) + if initial_config_primitive_list: + await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index) + + # stage, in function of element type: vdu, kdu, vnf or ns + my_vca = vca_deployed_list[vca_index] + if my_vca.get("vdu_id") or my_vca.get("kdu_name"): + # VDU or KDU + stage[0] = "Stage 3/5: running Day-1 primitives for VDU." + elif my_vca.get("member-vnf-index"): + # VNF + stage[0] = "Stage 4/5: running Day-1 primitives for VNF." + else: + # NS + stage[0] = "Stage 5/5: running Day-1 primitives for NS." + + self._write_configuration_status( + nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE" ) - self.logger.debug(logging_text + step) - if kdu_scaling_info["type"] == "delete": - kdu_config = get_configuration(db_vnfd, kdu_name) - if ( - kdu_config - and kdu_config.get("terminate-config-primitive") - and get_juju_ee_ref(db_vnfd, kdu_name) is None - ): - terminate_config_primitive_list = kdu_config.get( - "terminate-config-primitive" - ) - terminate_config_primitive_list.sort( - key=lambda val: int(val["seq"]) + self._write_op_status(op_id=nslcmop_id, stage=stage) + + check_if_terminated_needed = True + for initial_config_primitive in initial_config_primitive_list: + # adding information on the vca_deployed if it is a NS execution environment + if not vca_deployed["member-vnf-index"]: + deploy_params["ns_config_info"] = json.dumps( + self._get_ns_config_info(nsr_id) ) + # TODO check if already done + primitive_params_ = self._map_primitive_params( + initial_config_primitive, {}, deploy_params + ) - for ( - terminate_config_primitive - ) in terminate_config_primitive_list: - primitive_params_ = self._map_primitive_params( - terminate_config_primitive, {}, {} - ) - step = "execute terminate config primitive" - self.logger.debug(logging_text + step) - await asyncio.wait_for( - self.k8scluster_map[k8s_cluster_type].exec_primitive( - cluster_uuid=cluster_uuid, - kdu_instance=kdu_instance, - primitive_name=terminate_config_primitive["name"], - params=primitive_params_, - db_dict=db_dict, - vca_id=vca_id, - ), - timeout=600, + step = "execute primitive '{}' params '{}'".format( + initial_config_primitive["name"], primitive_params_ + ) + self.logger.debug(logging_text + step) + await self.vca_map[vca_type].exec_primitive( + ee_id=ee_id, + primitive_name=initial_config_primitive["name"], + params_dict=primitive_params_, + db_dict=db_dict, + vca_id=vca_id, + vca_type=vca_type, + ) + # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives + if check_if_terminated_needed: + if config_descriptor.get("terminate-config-primitive"): + self.update_db_2( + "nsrs", nsr_id, {db_update_entry + "needed_terminate": True} ) + check_if_terminated_needed = False - await asyncio.wait_for( - self.k8scluster_map[k8s_cluster_type].scale( - kdu_instance, - scale, - kdu_scaling_info["resource-name"], - vca_id=vca_id, - cluster_uuid=cluster_uuid, - kdu_model=kdu_model, - atomic=True, - db_dict=db_dict, - ), - timeout=self.timeout_vca_on_error, + # TODO register in database that primitive is done + + # STEP 7 Configure metrics + # Not sure if this need to be done when healing + """ + if vca_type == "helm" or vca_type == "helm-v3": + prometheus_jobs = await self.extract_prometheus_scrape_jobs( + ee_id=ee_id, + artifact_path=artifact_path, + ee_config_descriptor=ee_config_descriptor, + vnfr_id=vnfr_id, + nsr_id=nsr_id, + target_ip=rw_mgmt_ip, ) + if prometheus_jobs: + self.update_db_2( + "nsrs", + nsr_id, + {db_update_entry + "prometheus_jobs": prometheus_jobs}, + ) - if kdu_scaling_info["type"] == "create": - kdu_config = get_configuration(db_vnfd, kdu_name) - if ( - kdu_config - and kdu_config.get("initial-config-primitive") - and get_juju_ee_ref(db_vnfd, kdu_name) is None - ): - initial_config_primitive_list = kdu_config.get( - "initial-config-primitive" - ) - initial_config_primitive_list.sort( - key=lambda val: int(val["seq"]) + for job in prometheus_jobs: + self.db.set_one( + "prometheus_jobs", + {"job_name": job["job_name"]}, + job, + upsert=True, + fail_on_empty=False, ) - for initial_config_primitive in initial_config_primitive_list: - primitive_params_ = self._map_primitive_params( - initial_config_primitive, {}, {} - ) - step = "execute initial config primitive" - self.logger.debug(logging_text + step) - await asyncio.wait_for( - self.k8scluster_map[k8s_cluster_type].exec_primitive( - cluster_uuid=cluster_uuid, - kdu_instance=kdu_instance, - primitive_name=initial_config_primitive["name"], - params=primitive_params_, - db_dict=db_dict, - vca_id=vca_id, - ), - timeout=600, - ) - - async def _scale_ng_ro( - self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage - ): - nsr_id = db_nslcmop["nsInstanceId"] - db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]}) - db_vnfrs = {} - - # read from db: vnfd's for every vnf - db_vnfds = [] + """ + step = "instantiated at VCA" + self.logger.debug(logging_text + step) - # for each vnf in ns, read vnfd - for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}): - db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr - vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf - # if we haven't this vnfd, read it from db - if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id): - # read from db - vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) - db_vnfds.append(vnfd) - n2vc_key = self.n2vc.get_public_key() - n2vc_key_list = [n2vc_key] - self.scale_vnfr( - db_vnfr, - vdu_scaling_info.get("vdu-create"), - vdu_scaling_info.get("vdu-delete"), - mark_delete=True, - ) - # db_vnfr has been updated, update db_vnfrs to use it - db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr - await self._instantiate_ng_ro( - logging_text, - nsr_id, - db_nsd, - db_nsr, - db_nslcmop, - db_vnfrs, - db_vnfds, - n2vc_key_list, - stage=stage, - start_deploy=time(), - timeout_ns_deploy=self.timeout_ns_deploy, - ) - if vdu_scaling_info.get("vdu-delete"): - self.scale_vnfr( - db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False + self._write_configuration_status( + nsr_id=nsr_id, vca_index=vca_index, status="READY" ) - async def extract_prometheus_scrape_jobs( - self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip - ): - # look if exist a file called 'prometheus*.j2' and - artifact_content = self.fs.dir_ls(artifact_path) - job_file = next( - ( - f - for f in artifact_content - if f.startswith("prometheus") and f.endswith(".j2") - ), - None, - ) - if not job_file: - return - with self.fs.file_open((artifact_path, job_file), "r") as f: - job_data = f.read() - - # TODO get_service - _, _, service = ee_id.partition(".") # remove prefix "namespace." - host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"]) - host_port = "80" - vnfr_id = vnfr_id.replace("-", "") - variables = { - "JOB_NAME": vnfr_id, - "TARGET_IP": target_ip, - "EXPORTER_POD_IP": host_name, - "EXPORTER_POD_PORT": host_port, - } - job_list = parse_job(job_data, variables) - # ensure job_name is using the vnfr_id. Adding the metadata nsr_id - for job in job_list: - if ( - not isinstance(job.get("job_name"), str) - or vnfr_id not in job["job_name"] + except Exception as e: # TODO not use Exception but N2VC exception + # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"}) + if not isinstance( + e, (DbException, N2VCException, LcmException, asyncio.CancelledError) ): - job["job_name"] = vnfr_id + "_" + str(randint(1, 10000)) - job["nsr_id"] = nsr_id - job["vnfr_id"] = vnfr_id - return job_list - - def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str): - """ - Get VCA Cloud and VCA Cloud Credentials for the VIM account - - :param: vim_account_id: VIM Account ID - - :return: (cloud_name, cloud_credential) - """ - config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {}) - return config.get("vca_cloud"), config.get("vca_cloud_credential") - - def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str): - """ - Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account - - :param: vim_account_id: VIM Account ID + self.logger.error( + "Exception while {} : {}".format(step, e), exc_info=True + ) + self._write_configuration_status( + nsr_id=nsr_id, vca_index=vca_index, status="BROKEN" + ) + raise LcmException("{} {}".format(step, e)) from e - :return: (cloud_name, cloud_credential) - """ - config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {}) - return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential") + async def _wait_heal_ro( + self, + nsr_id, + timeout=600, + ): + start_time = time() + while time() <= start_time + timeout: + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + operational_status_ro = db_nsr["_admin"]["deployed"]["RO"]["operational-status"] + self.logger.debug("Wait Heal RO > {}".format(operational_status_ro)) + if operational_status_ro != "healing": + break + await asyncio.sleep(15, loop=self.loop) + else: # timeout_ns_deploy + raise NgRoException("Timeout waiting ns to deploy")