From 07f4e4cc95d17c613f5557e65781b8d4599ed20f Mon Sep 17 00:00:00 2001 From: garciadeblas Date: Thu, 9 Jun 2022 09:42:58 +0200 Subject: [PATCH] Feature 10909: Heal operation for VDU Change-Id: I609dc47a2253fc40bc712479eafac18b7d653785 Signed-off-by: garciadeblas Signed-off-by: gallardo Signed-off-by: palaciosj --- osm_lcm/lcm.py | 12 + osm_lcm/ng_ro.py | 59 +++ osm_lcm/ns.py | 945 ++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 1011 insertions(+), 5 deletions(-) diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 9eb37fa..9378b1d 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -455,6 +455,16 @@ class Lcm: task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task) return + elif command == "heal": + # self.logger.debug("Healing NS {}".format(nsr_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id)) + self.lcm_tasks.register( + "ns", nsr_id, nslcmop_id, "ns_heal", task + ) + return elif command == "migrate": nslcmop = params nslcmop_id = nslcmop["_id"] @@ -490,6 +500,7 @@ class Lcm: "terminated", "instantiated", "scaled", + "healed", "actioned", "updated", "migrated", @@ -548,6 +559,7 @@ class Lcm: "terminated", "instantiated", "scaled", + "healed", "actioned", ): # "scaled-cooldown-time" return diff --git a/osm_lcm/ng_ro.py b/osm_lcm/ng_ro.py index edbaf88..0acccc4 100644 --- a/osm_lcm/ng_ro.py +++ b/osm_lcm/ng_ro.py @@ -240,6 +240,65 @@ class NgRoClient: http_code=500, ) + async def recreate(self, nsr_id, target): + """ + Performs an action over an item + :param item: can be 'tenant', 'vnfd', 'nsd', 'ns', 'vim', 'vim_account', 'sdn' + :param item_id_name: RO id or name of the item. Raise and exception if more than one found + :param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided + :param descriptor_format: Can be 'json' or 'yaml' + :param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type + keys can be a dot separated list to specify elements inside dict + :return: dictionary with the information or raises NgRoException on Error + """ + try: + if isinstance(target, str): + target = self._parse_yaml(target) + payload_req = yaml.safe_dump(target) + + url = "{}/ns/v1/recreate/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id) + async with aiohttp.ClientSession(loop=self.loop) as session: + self.logger.debug("NG-RO POST %s %s", url, payload_req) + async with session.post( + url, headers=self.headers_req, data=payload_req + ) as response: + response_text = await response.read() + self.logger.debug( + "POST {} [{}] {}".format( + url, response.status, response_text[:100] + ) + ) + if response.status >= 300: + raise NgRoException(response_text, http_code=response.status) + return self._parse_yaml(response_text, response=True) + except (aiohttp.ClientOSError, aiohttp.ClientError) as e: + raise NgRoException(e, http_code=504) + except asyncio.TimeoutError: + raise NgRoException("Timeout", http_code=504) + + async def recreate_status(self, nsr_id, action_id): + try: + url = "{}/ns/v1/recreate/{nsr_id}/{action_id}".format( + self.endpoint_url, nsr_id=nsr_id, action_id=action_id + ) + async with aiohttp.ClientSession(loop=self.loop) as session: + self.logger.debug("GET %s", url) + async with session.get(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug( + "GET {} [{}] {}".format( + url, response.status, response_text[:100] + ) + ) + if response.status >= 300: + raise NgRoException(response_text, http_code=response.status) + return self._parse_yaml(response_text, response=True) + + except (aiohttp.ClientOSError, aiohttp.ClientError) as e: + raise NgRoException(e, http_code=504) + except asyncio.TimeoutError: + raise NgRoException("Timeout", http_code=504) + @staticmethod def _parse_yaml(descriptor, response=False): try: diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index d57b739..5345d78 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -125,6 +125,7 @@ class NsLcm(LcmBase): ) # Time for charm from first time at blocked,error status to mark as failed timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns timeout_ns_terminate = 1800 # default global timeout for un deployment a ns + timeout_ns_heal = 1800 # default global timeout for un deployment a ns timeout_charm_delete = 10 * 60 timeout_primitive = 30 * 60 # timeout for primitive execution timeout_ns_update = 30 * 60 # timeout for ns update @@ -218,6 +219,13 @@ class NsLcm(LcmBase): # create RO client self.RO = NgRoClient(self.loop, **self.ro_config) + self.op_status_map = { + "instantiation": self.RO.status, + "termination": self.RO.status, + "migrate": self.RO.status, + "healing": self.RO.recreate_status, + } + @staticmethod def increment_ip_mac(ip_mac, vm_index=1): if not isinstance(ip_mac, str): @@ -1239,11 +1247,13 @@ class NsLcm(LcmBase): target_vnf["vdur"] = vdur_list target["vnf"].append(target_vnf) + self.logger.debug("Send to RO > nsr_id={} target={}".format(nsr_id, target)) desc = await self.RO.deploy(nsr_id, target) self.logger.debug("RO return > {}".format(desc)) action_id = desc["action_id"] await self._wait_ng_ro( - nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage + nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage, + operation="instantiation" ) # Updating NSR @@ -1267,12 +1277,13 @@ class NsLcm(LcmBase): start_time=None, timeout=600, stage=None, + operation=None, ): detailed_status_old = None db_nsr_update = {} start_time = start_time or time() while time() <= start_time + timeout: - desc_status = await self.RO.status(nsr_id, action_id) + desc_status = await self.op_status_map[operation](nsr_id, action_id) self.logger.debug("Wait NG RO > {}".format(desc_status)) if desc_status["status"] == "FAILED": raise NgRoException(desc_status["details"]) @@ -1323,7 +1334,8 @@ class NsLcm(LcmBase): # wait until done delete_timeout = 20 * 60 # 20 minutes await self._wait_ng_ro( - nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage + nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage, + operation="termination" ) db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None @@ -1597,7 +1609,7 @@ class NsLcm(LcmBase): } desc = await self.RO.deploy(nsr_id, target) action_id = desc["action_id"] - await self._wait_ng_ro(nsr_id, action_id, timeout=600) + await self._wait_ng_ro(nsr_id, action_id, timeout=600, operation="instantiation") break else: # wait until NS is deployed at RO @@ -7260,7 +7272,8 @@ class NsLcm(LcmBase): self.logger.debug("RO return > {}".format(desc)) action_id = desc["action_id"] await self._wait_ng_ro( - nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate + nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate, + operation="migrate" ) except (ROclient.ROClientException, DbException, LcmException) as e: self.logger.error("Exit Exception {}".format(e)) @@ -7309,3 +7322,925 @@ class NsLcm(LcmBase): ) self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate") + + + async def heal(self, nsr_id, nslcmop_id): + """ + Heal NS + + :param nsr_id: ns instance to heal + :param nslcmop_id: operation to run + :return: + """ + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) + if not task_is_locked_by_me: + return + + logging_text = "Task ns={} heal={} ".format(nsr_id, nslcmop_id) + stage = ["", "", ""] + tasks_dict_info = {} + # ^ stage, step, VIM progress + self.logger.debug(logging_text + "Enter") + # get all needed from database + db_nsr = None + db_nslcmop_update = {} + db_nsr_update = {} + db_vnfrs = {} # vnf's info indexed by _id + exc = None + old_operational_status = "" + old_config_status = "" + nsi_id = None + try: + # wait for any previous tasks in process + step = "Waiting for previous operations to terminate" + await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id) + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="HEALING", + current_operation_id=nslcmop_id, + ) + + step = "Getting nslcmop from database" + self.logger.debug( + step + " after having waited for previous tasks to be completed" + ) + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + + step = "Getting nsr from database" + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + old_operational_status = db_nsr["operational-status"] + old_config_status = db_nsr["config-status"] + + db_nsr_update = { + "_admin.deployed.RO.operational-status": "healing", + } + self.update_db_2("nsrs", nsr_id, db_nsr_update) + + step = "Sending heal order to VIM" + task_ro = asyncio.ensure_future( + self.heal_RO( + logging_text=logging_text, + nsr_id=nsr_id, + db_nslcmop=db_nslcmop, + stage=stage, + ) + ) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "heal_RO", task_ro) + tasks_dict_info[task_ro] = "Healing at VIM" + + # VCA tasks + # read from db: nsd + stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"]) + self.logger.debug(logging_text + stage[1]) + nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]}) + self.fs.sync(db_nsr["nsd-id"]) + db_nsr["nsd"] = nsd + # read from db: vnfr's of this ns + step = "Getting vnfrs from db" + db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) + for vnfr in db_vnfrs_list: + db_vnfrs[vnfr["_id"]] = vnfr + self.logger.debug("ns.heal db_vnfrs={}".format(db_vnfrs)) + + # Check for each target VNF + target_list = db_nslcmop.get("operationParams", {}).get("healVnfData", {}) + for target_vnf in target_list: + # Find this VNF in the list from DB + vnfr_id = target_vnf.get("vnfInstanceId", None) + if vnfr_id: + db_vnfr = db_vnfrs[vnfr_id] + vnfd_id = db_vnfr.get("vnfd-id") + vnfd_ref = db_vnfr.get("vnfd-ref") + vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) + base_folder = vnfd["_admin"]["storage"] + vdu_id = None + vdu_index = 0 + vdu_name = None + kdu_name = None + nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI + member_vnf_index = db_vnfr.get("member-vnf-index-ref") + + # Check each target VDU and deploy N2VC + for target_vdu in target_vnf["additionalParams"].get("vdu", None): + deploy_params_vdu = target_vdu + # Set run-day1 vnf level value if not vdu level value exists + if not deploy_params_vdu.get("run-day1") and target_vnf["additionalParams"].get("run-day1"): + deploy_params_vdu["run-day1"] = target_vnf["additionalParams"].get("run-day1") + vdu_name = target_vdu.get("vdu-id", None) + # TODO: Get vdu_id from vdud. + vdu_id = vdu_name + # For multi instance VDU count-index is mandatory + # For single session VDU count-indes is 0 + vdu_index = target_vdu.get("count-index",0) + + # n2vc_redesign STEP 3 to 6 Deploy N2VC + stage[1] = "Deploying Execution Environments." + self.logger.debug(logging_text + stage[1]) + + # VNF Level charm. Normal case when proxy charms. + # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms. + descriptor_config = get_configuration(vnfd, vnfd_ref) + if descriptor_config: + # Continue if healed machine is management machine + vnf_ip_address = db_vnfr.get("ip-address") + target_instance = None + for instance in db_vnfr.get("vdur", None): + if ( instance["vdu-name"] == vdu_name and instance["count-index"] == vdu_index ): + target_instance = instance + break + if vnf_ip_address == target_instance.get("ip-address"): + self._heal_n2vc( + logging_text=logging_text + + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format( + member_vnf_index, vdu_name, vdu_index + ), + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_ref, + vdu_id=None, + kdu_name=None, + member_vnf_index=member_vnf_index, + vdu_index=0, + vdu_name=None, + deploy_params=deploy_params_vdu, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_info=tasks_dict_info, + stage=stage, + ) + + # VDU Level charm. Normal case with native charms. + descriptor_config = get_configuration(vnfd, vdu_name) + if descriptor_config: + self._heal_n2vc( + logging_text=logging_text + + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format( + member_vnf_index, vdu_name, vdu_index + ), + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_ref, + vdu_id=vdu_id, + kdu_name=kdu_name, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params_vdu, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_info=tasks_dict_info, + stage=stage, + ) + + except ( + ROclient.ROClientException, + DbException, + LcmException, + NgRoException, + ) as e: + self.logger.error(logging_text + "Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error( + logging_text + "Cancelled Exception while '{}'".format(step) + ) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical( + logging_text + "Exit Exception {} {}".format(type(e).__name__, e), + exc_info=True, + ) + finally: + if tasks_dict_info: + stage[1] = "Waiting for healing pending tasks." + self.logger.debug(logging_text + stage[1]) + exc = await self._wait_for_tasks( + logging_text, + tasks_dict_info, + self.timeout_ns_deploy, + stage, + nslcmop_id, + nsr_id=nsr_id, + ) + if exc: + db_nslcmop_update[ + "detailed-status" + ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc) + nslcmop_operation_state = "FAILED" + if db_nsr: + db_nsr_update["operational-status"] = old_operational_status + db_nsr_update["config-status"] = old_config_status + db_nsr_update[ + "detailed-status" + ] = "FAILED healing nslcmop={} {}: {}".format( + nslcmop_id, step, exc + ) + for task, task_name in tasks_dict_info.items(): + if not task.done() or task.cancelled() or task.exception(): + if task_name.startswith(self.task_name_deploy_vca): + # A N2VC task is pending + db_nsr_update["config-status"] = "failed" + else: + # RO task is pending + db_nsr_update["operational-status"] = "failed" + else: + error_description_nslcmop = None + nslcmop_operation_state = "COMPLETED" + db_nslcmop_update["detailed-status"] = "Done" + db_nsr_update["detailed-status"] = "Done" + db_nsr_update["operational-status"] = "running" + db_nsr_update["config-status"] = "configured" + + self._write_op_status( + op_id=nslcmop_id, + stage="", + error_message=error_description_nslcmop, + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) + if db_nsr: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="IDLE", + current_operation_id=None, + other_update=db_nsr_update, + ) + + if nslcmop_operation_state: + try: + msg = { + "nsr_id": nsr_id, + "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state, + } + await self.msg.aiowrite("ns", "healed", msg, loop=self.loop) + except Exception as e: + self.logger.error( + logging_text + "kafka_write notification Exception {}".format(e) + ) + self.logger.debug(logging_text + "Exit") + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_heal") + + async def heal_RO( + self, + logging_text, + nsr_id, + db_nslcmop, + stage, + ): + """ + Heal at RO + :param logging_text: preffix text to use at logging + :param nsr_id: nsr identity + :param db_nslcmop: database content of ns operation, in this case, 'instantiate' + :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific + :return: None or exception + """ + def get_vim_account(vim_account_id): + nonlocal db_vims + if vim_account_id in db_vims: + return db_vims[vim_account_id] + db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id}) + db_vims[vim_account_id] = db_vim + return db_vim + + try: + start_heal = time() + ns_params = db_nslcmop.get("operationParams") + if ns_params and ns_params.get("timeout_ns_heal"): + timeout_ns_heal = ns_params["timeout_ns_heal"] + else: + timeout_ns_heal = self.timeout.get( + "ns_heal", self.timeout_ns_heal + ) + + db_vims = {} + + nslcmop_id = db_nslcmop["_id"] + target = { + "action_id": nslcmop_id, + } + self.logger.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop,timeout_ns_heal)) + target.update(db_nslcmop.get("operationParams", {})) + + self.logger.debug("Send to RO > nsr_id={} target={}".format(nsr_id, target)) + desc = await self.RO.recreate(nsr_id, target) + self.logger.debug("RO return > {}".format(desc)) + action_id = desc["action_id"] + # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted + await self._wait_ng_ro( + nsr_id, action_id, nslcmop_id, start_heal, timeout_ns_heal, stage, + operation="healing" + ) + + # Updating NSR + db_nsr_update = { + "_admin.deployed.RO.operational-status": "running", + "detailed-status": " ".join(stage), + } + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + self.logger.debug( + logging_text + "ns healed at RO. RO_id={}".format(action_id) + ) + + except Exception as e: + stage[2] = "ERROR healing at VIM" + #self.set_vnfr_at_error(db_vnfrs, str(e)) + self.logger.error( + "Error healing at VIM {}".format(e), + exc_info=not isinstance( + e, + ( + ROclient.ROClientException, + LcmException, + DbException, + NgRoException, + ), + ), + ) + raise + + def _heal_n2vc( + self, + logging_text, + db_nsr, + db_vnfr, + nslcmop_id, + nsr_id, + nsi_id, + vnfd_id, + vdu_id, + kdu_name, + member_vnf_index, + vdu_index, + vdu_name, + deploy_params, + descriptor_config, + base_folder, + task_instantiation_info, + stage, + ): + # launch instantiate_N2VC in a asyncio task and register task object + # Look where information of this charm is at database ._admin.deployed.VCA + # if not found, create one entry and update database + # fill db_nsr._admin.deployed.VCA. + + self.logger.debug( + logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id) + ) + if "execution-environment-list" in descriptor_config: + ee_list = descriptor_config.get("execution-environment-list", []) + elif "juju" in descriptor_config: + ee_list = [descriptor_config] # ns charms + else: # other types as script are not supported + ee_list = [] + + for ee_item in ee_list: + self.logger.debug( + logging_text + + "_deploy_n2vc ee_item juju={}, helm={}".format( + ee_item.get("juju"), ee_item.get("helm-chart") + ) + ) + ee_descriptor_id = ee_item.get("id") + if ee_item.get("juju"): + vca_name = ee_item["juju"].get("charm") + vca_type = ( + "lxc_proxy_charm" + if ee_item["juju"].get("charm") is not None + else "native_charm" + ) + if ee_item["juju"].get("cloud") == "k8s": + vca_type = "k8s_proxy_charm" + elif ee_item["juju"].get("proxy") is False: + vca_type = "native_charm" + elif ee_item.get("helm-chart"): + vca_name = ee_item["helm-chart"] + if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2": + vca_type = "helm" + else: + vca_type = "helm-v3" + else: + self.logger.debug( + logging_text + "skipping non juju neither charm configuration" + ) + continue + + vca_index = -1 + for vca_index, vca_deployed in enumerate( + db_nsr["_admin"]["deployed"]["VCA"] + ): + if not vca_deployed: + continue + if ( + vca_deployed.get("member-vnf-index") == member_vnf_index + and vca_deployed.get("vdu_id") == vdu_id + and vca_deployed.get("kdu_name") == kdu_name + and vca_deployed.get("vdu_count_index", 0) == vdu_index + and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id + ): + break + else: + # not found, create one. + target = ( + "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index) + ) + if vdu_id: + target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0) + elif kdu_name: + target += "/kdu/{}".format(kdu_name) + vca_deployed = { + "target_element": target, + # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string + "member-vnf-index": member_vnf_index, + "vdu_id": vdu_id, + "kdu_name": kdu_name, + "vdu_count_index": vdu_index, + "operational-status": "init", # TODO revise + "detailed-status": "", # TODO revise + "step": "initial-deploy", # TODO revise + "vnfd_id": vnfd_id, + "vdu_name": vdu_name, + "type": vca_type, + "ee_descriptor_id": ee_descriptor_id, + } + vca_index += 1 + + # create VCA and configurationStatus in db + db_dict = { + "_admin.deployed.VCA.{}".format(vca_index): vca_deployed, + "configurationStatus.{}".format(vca_index): dict(), + } + self.update_db_2("nsrs", nsr_id, db_dict) + + db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed) + + self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id)) + self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr)) + self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed)) + + # Launch task + task_n2vc = asyncio.ensure_future( + self.heal_N2VC( + logging_text=logging_text, + vca_index=vca_index, + nsi_id=nsi_id, + db_nsr=db_nsr, + db_vnfr=db_vnfr, + vdu_id=vdu_id, + kdu_name=kdu_name, + vdu_index=vdu_index, + deploy_params=deploy_params, + config_descriptor=descriptor_config, + base_folder=base_folder, + nslcmop_id=nslcmop_id, + stage=stage, + vca_type=vca_type, + vca_name=vca_name, + ee_config_descriptor=ee_item, + ) + ) + self.lcm_tasks.register( + "ns", + nsr_id, + nslcmop_id, + "instantiate_N2VC-{}".format(vca_index), + task_n2vc, + ) + task_instantiation_info[ + task_n2vc + ] = self.task_name_deploy_vca + " {}.{}".format( + member_vnf_index or "", vdu_id or "" + ) + + async def heal_N2VC( + self, + logging_text, + vca_index, + nsi_id, + db_nsr, + db_vnfr, + vdu_id, + kdu_name, + vdu_index, + config_descriptor, + deploy_params, + base_folder, + nslcmop_id, + stage, + vca_type, + vca_name, + ee_config_descriptor, + ): + nsr_id = db_nsr["_id"] + db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index) + vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"] + vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index] + osm_config = {"osm": {"ns_id": db_nsr["_id"]}} + db_dict = { + "collection": "nsrs", + "filter": {"_id": nsr_id}, + "path": db_update_entry, + } + step = "" + try: + + element_type = "NS" + element_under_configuration = nsr_id + + vnfr_id = None + if db_vnfr: + vnfr_id = db_vnfr["_id"] + osm_config["osm"]["vnf_id"] = vnfr_id + + namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id) + + if vca_type == "native_charm": + index_number = 0 + else: + index_number = vdu_index or 0 + + if vnfr_id: + element_type = "VNF" + element_under_configuration = vnfr_id + namespace += ".{}-{}".format(vnfr_id, index_number) + if vdu_id: + namespace += ".{}-{}".format(vdu_id, index_number) + element_type = "VDU" + element_under_configuration = "{}-{}".format(vdu_id, index_number) + osm_config["osm"]["vdu_id"] = vdu_id + elif kdu_name: + namespace += ".{}".format(kdu_name) + element_type = "KDU" + element_under_configuration = kdu_name + osm_config["osm"]["kdu_name"] = kdu_name + + # Get artifact path + if base_folder["pkg-dir"]: + artifact_path = "{}/{}/{}/{}".format( + base_folder["folder"], + base_folder["pkg-dir"], + "charms" + if vca_type + in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") + else "helm-charts", + vca_name, + ) + else: + artifact_path = "{}/Scripts/{}/{}/".format( + base_folder["folder"], + "charms" + if vca_type + in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") + else "helm-charts", + vca_name, + ) + + self.logger.debug("Artifact path > {}".format(artifact_path)) + + # get initial_config_primitive_list that applies to this element + initial_config_primitive_list = config_descriptor.get( + "initial-config-primitive" + ) + + self.logger.debug( + "Initial config primitive list > {}".format( + initial_config_primitive_list + ) + ) + + # add config if not present for NS charm + ee_descriptor_id = ee_config_descriptor.get("id") + self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id)) + initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list( + initial_config_primitive_list, vca_deployed, ee_descriptor_id + ) + + self.logger.debug( + "Initial config primitive list #2 > {}".format( + initial_config_primitive_list + ) + ) + # n2vc_redesign STEP 3.1 + # find old ee_id if exists + ee_id = vca_deployed.get("ee_id") + + vca_id = self.get_vca_id(db_vnfr, db_nsr) + # create or register execution environment in VCA. Only for native charms when healing + if vca_type == "native_charm": + step = "Waiting to VM being up and getting IP address" + self.logger.debug(logging_text + step) + rw_mgmt_ip = await self.wait_vm_up_insert_key_ro( + logging_text, + nsr_id, + vnfr_id, + vdu_id, + vdu_index, + user=None, + pub_key=None, + ) + credentials = {"hostname": rw_mgmt_ip} + # get username + username = deep_get( + config_descriptor, ("config-access", "ssh-access", "default-user") + ) + # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were + # merged. Meanwhile let's get username from initial-config-primitive + if not username and initial_config_primitive_list: + for config_primitive in initial_config_primitive_list: + for param in config_primitive.get("parameter", ()): + if param["name"] == "ssh-username": + username = param["value"] + break + if not username: + raise LcmException( + "Cannot determine the username neither with 'initial-config-primitive' nor with " + "'config-access.ssh-access.default-user'" + ) + credentials["username"] = username + + # n2vc_redesign STEP 3.2 + # TODO: Before healing at RO it is needed to destroy native charm units to be deleted. + self._write_configuration_status( + nsr_id=nsr_id, + vca_index=vca_index, + status="REGISTERING", + element_under_configuration=element_under_configuration, + element_type=element_type, + ) + + step = "register execution environment {}".format(credentials) + self.logger.debug(logging_text + step) + ee_id = await self.vca_map[vca_type].register_execution_environment( + credentials=credentials, + namespace=namespace, + db_dict=db_dict, + vca_id=vca_id, + ) + + # update ee_id en db + db_dict_ee_id = { + "_admin.deployed.VCA.{}.ee_id".format(vca_index): ee_id, + } + self.update_db_2("nsrs", nsr_id, db_dict_ee_id) + + # for compatibility with MON/POL modules, the need model and application name at database + # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name" + # Not sure if this need to be done when healing + """ + ee_id_parts = ee_id.split(".") + db_nsr_update = {db_update_entry + "ee_id": ee_id} + if len(ee_id_parts) >= 2: + model_name = ee_id_parts[0] + application_name = ee_id_parts[1] + db_nsr_update[db_update_entry + "model"] = model_name + db_nsr_update[db_update_entry + "application"] = application_name + """ + + # n2vc_redesign STEP 3.3 + # Install configuration software. Only for native charms. + step = "Install configuration Software" + + self._write_configuration_status( + nsr_id=nsr_id, + vca_index=vca_index, + status="INSTALLING SW", + element_under_configuration=element_under_configuration, + element_type=element_type, + #other_update=db_nsr_update, + other_update=None, + ) + + # TODO check if already done + self.logger.debug(logging_text + step) + config = None + if vca_type == "native_charm": + config_primitive = next( + (p for p in initial_config_primitive_list if p["name"] == "config"), + None, + ) + if config_primitive: + config = self._map_primitive_params( + config_primitive, {}, deploy_params + ) + await self.vca_map[vca_type].install_configuration_sw( + ee_id=ee_id, + artifact_path=artifact_path, + db_dict=db_dict, + config=config, + num_units=1, + vca_id=vca_id, + vca_type=vca_type, + ) + + # write in db flag of configuration_sw already installed + self.update_db_2( + "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True} + ) + + # Not sure if this need to be done when healing + """ + # add relations for this VCA (wait for other peers related with this VCA) + await self._add_vca_relations( + logging_text=logging_text, + nsr_id=nsr_id, + vca_type=vca_type, + vca_index=vca_index, + ) + """ + + # if SSH access is required, then get execution environment SSH public + # if native charm we have waited already to VM be UP + if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"): + pub_key = None + user = None + # self.logger.debug("get ssh key block") + if deep_get( + config_descriptor, ("config-access", "ssh-access", "required") + ): + # self.logger.debug("ssh key needed") + # Needed to inject a ssh key + user = deep_get( + config_descriptor, + ("config-access", "ssh-access", "default-user"), + ) + step = "Install configuration Software, getting public ssh key" + pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key( + ee_id=ee_id, db_dict=db_dict, vca_id=vca_id + ) + + step = "Insert public key into VM user={} ssh_key={}".format( + user, pub_key + ) + else: + # self.logger.debug("no need to get ssh key") + step = "Waiting to VM being up and getting IP address" + self.logger.debug(logging_text + step) + + # n2vc_redesign STEP 5.1 + # wait for RO (ip-address) Insert pub_key into VM + # IMPORTANT: We need do wait for RO to complete healing operation. + await self._wait_heal_ro(nsr_id,self.timeout_ns_heal) + if vnfr_id: + if kdu_name: + rw_mgmt_ip = await self.wait_kdu_up( + logging_text, nsr_id, vnfr_id, kdu_name + ) + else: + rw_mgmt_ip = await self.wait_vm_up_insert_key_ro( + logging_text, + nsr_id, + vnfr_id, + vdu_id, + vdu_index, + user=user, + pub_key=pub_key, + ) + else: + rw_mgmt_ip = None # This is for a NS configuration + + self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip)) + + # store rw_mgmt_ip in deploy params for later replacement + deploy_params["rw_mgmt_ip"] = rw_mgmt_ip + + # Day1 operations. + # get run-day1 operation parameter + runDay1 = deploy_params.get("run-day1",False) + self.logger.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id,vdu_id,runDay1)) + if runDay1: + # n2vc_redesign STEP 6 Execute initial config primitive + step = "execute initial config primitive" + + # wait for dependent primitives execution (NS -> VNF -> VDU) + if initial_config_primitive_list: + await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index) + + # stage, in function of element type: vdu, kdu, vnf or ns + my_vca = vca_deployed_list[vca_index] + if my_vca.get("vdu_id") or my_vca.get("kdu_name"): + # VDU or KDU + stage[0] = "Stage 3/5: running Day-1 primitives for VDU." + elif my_vca.get("member-vnf-index"): + # VNF + stage[0] = "Stage 4/5: running Day-1 primitives for VNF." + else: + # NS + stage[0] = "Stage 5/5: running Day-1 primitives for NS." + + self._write_configuration_status( + nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE" + ) + + self._write_op_status(op_id=nslcmop_id, stage=stage) + + check_if_terminated_needed = True + for initial_config_primitive in initial_config_primitive_list: + # adding information on the vca_deployed if it is a NS execution environment + if not vca_deployed["member-vnf-index"]: + deploy_params["ns_config_info"] = json.dumps( + self._get_ns_config_info(nsr_id) + ) + # TODO check if already done + primitive_params_ = self._map_primitive_params( + initial_config_primitive, {}, deploy_params + ) + + step = "execute primitive '{}' params '{}'".format( + initial_config_primitive["name"], primitive_params_ + ) + self.logger.debug(logging_text + step) + await self.vca_map[vca_type].exec_primitive( + ee_id=ee_id, + primitive_name=initial_config_primitive["name"], + params_dict=primitive_params_, + db_dict=db_dict, + vca_id=vca_id, + vca_type=vca_type, + ) + # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives + if check_if_terminated_needed: + if config_descriptor.get("terminate-config-primitive"): + self.update_db_2( + "nsrs", nsr_id, {db_update_entry + "needed_terminate": True} + ) + check_if_terminated_needed = False + + # TODO register in database that primitive is done + + # STEP 7 Configure metrics + # Not sure if this need to be done when healing + """ + if vca_type == "helm" or vca_type == "helm-v3": + prometheus_jobs = await self.extract_prometheus_scrape_jobs( + ee_id=ee_id, + artifact_path=artifact_path, + ee_config_descriptor=ee_config_descriptor, + vnfr_id=vnfr_id, + nsr_id=nsr_id, + target_ip=rw_mgmt_ip, + ) + if prometheus_jobs: + self.update_db_2( + "nsrs", + nsr_id, + {db_update_entry + "prometheus_jobs": prometheus_jobs}, + ) + + for job in prometheus_jobs: + self.db.set_one( + "prometheus_jobs", + {"job_name": job["job_name"]}, + job, + upsert=True, + fail_on_empty=False, + ) + + """ + step = "instantiated at VCA" + self.logger.debug(logging_text + step) + + self._write_configuration_status( + nsr_id=nsr_id, vca_index=vca_index, status="READY" + ) + + except Exception as e: # TODO not use Exception but N2VC exception + # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"}) + if not isinstance( + e, (DbException, N2VCException, LcmException, asyncio.CancelledError) + ): + self.logger.error( + "Exception while {} : {}".format(step, e), exc_info=True + ) + self._write_configuration_status( + nsr_id=nsr_id, vca_index=vca_index, status="BROKEN" + ) + raise LcmException("{} {}".format(step, e)) from e + + async def _wait_heal_ro( + self, + nsr_id, + timeout=600, + ): + start_time = time() + while time() <= start_time + timeout: + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + operational_status_ro = db_nsr["_admin"]["deployed"]["RO"]["operational-status"] + self.logger.debug("Wait Heal RO > {}".format(operational_status_ro)) + if operational_status_ro != "healing": + break + await asyncio.sleep(15, loop=self.loop) + else: # timeout_ns_deploy + raise NgRoException("Timeout waiting ns to deploy") -- 2.17.1