X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fns.py;h=8a99e91de1b8cc7c37559decc175da45c80f54d6;hb=refs%2Fchanges%2F14%2F12514%2F2;hp=2e9c1bcc5fc79b0cea9e601833a3458200d7ddee;hpb=b827de931cbe2452bcca76b8b24371b4a8afe2aa;p=osm%2FLCM.git diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 2e9c1bc..8a99e91 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -30,6 +30,7 @@ from jinja2 import ( TemplateNotFound, StrictUndefined, UndefinedError, + select_autoescape, ) from osm_lcm import ROclient @@ -134,6 +135,7 @@ class NsLcm(LcmBase): ) # timeout for some progress in a primitive execution timeout_migrate = 1800 # default global timeout for migrating vnfs timeout_operate = 1800 # default global timeout for migrating vnfs + timeout_verticalscale = 1800 # default global timeout for Vertical Sclaing SUBOPERATION_STATUS_NOT_FOUND = -1 SUBOPERATION_STATUS_NEW = -2 SUBOPERATION_STATUS_SKIP = -3 @@ -224,6 +226,8 @@ class NsLcm(LcmBase): "termination": self.RO.status, "migrate": self.RO.status, "healing": self.RO.recreate_status, + "verticalscale": self.RO.status, + "start_stop_rebuild": self.RO.status, } @staticmethod @@ -290,7 +294,6 @@ class NsLcm(LcmBase): # vcaStatus db_dict = dict() db_dict["vcaStatus"] = status_dict - await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id) # update configurationStatus for this VCA try: @@ -398,15 +401,6 @@ class NsLcm(LcmBase): db_dict = dict() db_dict["vcaStatus"] = {nsr_id: vca_status} - if cluster_type in ("juju-bundle", "juju"): - # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA - # status in a similar way between Juju Bundles and Helm Charts on this side - await self.k8sclusterjuju.update_vca_status( - db_dict["vcaStatus"], - kdu_instance, - vca_id=vca_id, - ) - self.logger.debug( f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}" ) @@ -421,7 +415,10 @@ class NsLcm(LcmBase): @staticmethod def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id): try: - env = Environment(undefined=StrictUndefined) + env = Environment( + undefined=StrictUndefined, + autoescape=select_autoescape(default_for_string=True, default=True), + ) template = env.from_string(cloud_init_text) return template.render(additional_params or {}) except UndefinedError as e: @@ -897,18 +894,55 @@ class NsLcm(LcmBase): get_iterable(vdur, "interfaces"), lambda iface: iface.get("ns-vld-id") == a_vld["name"], ) + + vld_params = find_in_list( + get_iterable(ns_params, "vld"), + lambda v_vld: v_vld["name"] in (a_vld["name"], a_vld["id"]), + ) if target_vld: + if vnf_params.get("vimAccountId") not in a_vld.get( "vim_info", {} ): + target_vim_network_list = [ + v for _, v in a_vld.get("vim_info").items() + ] + target_vim_network_name = next( + ( + item.get("vim_network_name", "") + for item in target_vim_network_list + ), + "", + ) + target["ns"]["vld"][a_index].get("vim_info").update( { "vim:{}".format(vnf_params["vimAccountId"]): { - "vim_network_name": "" + "vim_network_name": target_vim_network_name, } } ) + if vld_params: + for param in ("vim-network-name", "vim-network-id"): + if vld_params.get(param) and isinstance( + vld_params[param], dict + ): + for vim, vim_net in vld_params[ + param + ].items(): + other_target_vim = "vim:" + vim + populate_dict( + target["ns"]["vld"][a_index].get( + "vim_info" + ), + ( + other_target_vim, + param.replace("-", "_"), + ), + vim_net, + ) + nslcmop_id = db_nslcmop["_id"] target = { "name": db_nsr["name"], @@ -3701,10 +3735,16 @@ class NsLcm(LcmBase): self.logger.debug( logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id) ) + + charm_name = "" + get_charm_name = False if "execution-environment-list" in descriptor_config: ee_list = descriptor_config.get("execution-environment-list", []) elif "juju" in descriptor_config: ee_list = [descriptor_config] # ns charms + if "execution-environment-list" not in descriptor_config: + # charm name is only required for ns charms + get_charm_name = True else: # other types as script are not supported ee_list = [] @@ -3718,6 +3758,8 @@ class NsLcm(LcmBase): ee_descriptor_id = ee_item.get("id") if ee_item.get("juju"): vca_name = ee_item["juju"].get("charm") + if get_charm_name: + charm_name = self.find_charm_name(db_nsr, str(vca_name)) vca_type = ( "lxc_proxy_charm" if ee_item["juju"].get("charm") is not None @@ -3776,6 +3818,7 @@ class NsLcm(LcmBase): "vdu_name": vdu_name, "type": vca_type, "ee_descriptor_id": ee_descriptor_id, + "charm_name": charm_name, } vca_index += 1 @@ -4947,9 +4990,7 @@ class NsLcm(LcmBase): break except asyncio.CancelledError: raise - except Exception as e: # asyncio.TimeoutError - if isinstance(e, asyncio.TimeoutError): - e = "Timeout" + except Exception as e: retries -= 1 if retries >= 0: self.logger.debug( @@ -4960,7 +5001,9 @@ class NsLcm(LcmBase): # wait and retry await asyncio.sleep(retries_interval, loop=self.loop) else: - return "FAILED", str(e) + if isinstance(e, asyncio.TimeoutError): + e = N2VCException(message="Timed out waiting for action to complete") + return "FAILED", getattr(e, 'message', repr(e)) return "COMPLETED", output @@ -5295,7 +5338,7 @@ class NsLcm(LcmBase): ) self.logger.debug( logging_text - + " task Done with result {} {}".format( + + "Done with result {} {}".format( nslcmop_operation_state, detailed_status ) ) @@ -7236,13 +7279,17 @@ class NsLcm(LcmBase): db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id}) vim_account_id = db_vnfr.get("vim-account-id") vim_info_key = "vim:" + vim_account_id + vdu_id = additional_param["vdu_id"] + vdurs = [item for item in db_vnfr["vdur"] if item["vdu-id-ref"] == vdu_id] vdur = find_in_list( - db_vnfr["vdur"], lambda vdu: vdu["count-index"] == additional_param["count-index"] + vdurs, lambda vdu: vdu["count-index"] == additional_param["count-index"] ) if vdur: vdu_vim_name = vdur["name"] vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"] target_vim, _ = next(k_v for k_v in vdur["vim_info"].items()) + else: + raise LcmException("Target vdu is not found") self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name)) # wait for any previous tasks in process stage[1] = "Waiting for previous operations to terminate" @@ -7281,7 +7328,8 @@ class NsLcm(LcmBase): self.logger.info("response from RO: {}".format(result_dict)) action_id = result_dict["action_id"] await self._wait_ng_ro( - nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_operate + nsr_id, action_id, nslcmop_id, start_deploy, + self.timeout_operate, None, "start_stop_rebuild", ) return "COMPLETED", "Done" except (ROclient.ROClientException, DbException, LcmException) as e: @@ -7517,7 +7565,17 @@ class NsLcm(LcmBase): member_vnf_index = db_vnfr.get("member-vnf-index-ref") # Check each target VDU and deploy N2VC - for target_vdu in target_vnf["additionalParams"].get("vdu", None): + target_vdu_list = target_vnf.get("additionalParams", {}).get("vdu", []) + if not target_vdu_list: + # Codigo nuevo para crear diccionario + target_vdu_list = [] + for existing_vdu in db_vnfr.get("vdur"): + vdu_name = existing_vdu.get("vdu-name", None) + vdu_index = existing_vdu.get("count-index", 0) + vdu_run_day1 = target_vnf.get("additionalParams", {}).get("run-day1", False) + vdu_to_be_healed = {"vdu-id": vdu_name, "count-index": vdu_index, "run-day1": vdu_run_day1} + target_vdu_list.append(vdu_to_be_healed) + for target_vdu in target_vdu_list: deploy_params_vdu = target_vdu # Set run-day1 vnf level value if not vdu level value exists if not deploy_params_vdu.get("run-day1") and target_vnf["additionalParams"].get("run-day1"): @@ -7793,10 +7851,16 @@ class NsLcm(LcmBase): self.logger.debug( logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id) ) + + charm_name = "" + get_charm_name = False if "execution-environment-list" in descriptor_config: ee_list = descriptor_config.get("execution-environment-list", []) elif "juju" in descriptor_config: ee_list = [descriptor_config] # ns charms + if "execution-environment-list" not in descriptor_config: + # charm name is only required for ns charms + get_charm_name = True else: # other types as script are not supported ee_list = [] @@ -7810,6 +7874,8 @@ class NsLcm(LcmBase): ee_descriptor_id = ee_item.get("id") if ee_item.get("juju"): vca_name = ee_item["juju"].get("charm") + if get_charm_name: + charm_name = self.find_charm_name(db_nsr, str(vca_name)) vca_type = ( "lxc_proxy_charm" if ee_item["juju"].get("charm") is not None @@ -7868,6 +7934,7 @@ class NsLcm(LcmBase): "vdu_name": vdu_name, "type": vca_type, "ee_descriptor_id": ee_descriptor_id, + "charm_name": charm_name, } vca_index += 1 @@ -8337,3 +8404,99 @@ class NsLcm(LcmBase): await asyncio.sleep(15, loop=self.loop) else: # timeout_ns_deploy raise NgRoException("Timeout waiting ns to deploy") + + async def vertical_scale(self, nsr_id, nslcmop_id): + """ + Vertical Scale the VDUs in a NS + + :param: nsr_id: NS Instance ID + :param: nslcmop_id: nslcmop ID of migrate + + """ + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} vertical scale ".format(nsr_id) + self.logger.debug(logging_text + "Enter") + # get all needed from database + db_nslcmop = None + db_nslcmop_update = {} + nslcmop_operation_state = None + db_nsr_update = {} + target = {} + exc = None + # in case of error, indicates what part of scale was failed to put nsr at error status + start_deploy = time() + + try: + # wait for any previous tasks in process + step = "Waiting for previous operations to terminate" + await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) + + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="VerticalScale", + current_operation_id=nslcmop_id + ) + step = "Getting nslcmop from database" + self.logger.debug(step + " after having waited for previous tasks to be completed") + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + operationParams = db_nslcmop.get("operationParams") + target = {} + target.update(operationParams) + desc = await self.RO.vertical_scale(nsr_id, target) + self.logger.debug("RO return > {}".format(desc)) + action_id = desc["action_id"] + await self._wait_ng_ro( + nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_verticalscale, + operation="verticalscale" + ) + except (ROclient.ROClientException, DbException, LcmException) as e: + self.logger.error("Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error("Cancelled Exception while '{}'".format(step)) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) + finally: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="IDLE", + current_operation_id=None, + ) + if exc: + db_nslcmop_update[ + "detailed-status" + ] = "FAILED {}: {}".format(step, exc) + nslcmop_operation_state = "FAILED" + else: + nslcmop_operation_state = "COMPLETED" + db_nslcmop_update["detailed-status"] = "Done" + db_nsr_update["detailed-status"] = "Done" + + self._write_op_status( + op_id=nslcmop_id, + stage="", + error_message="", + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) + if nslcmop_operation_state: + try: + msg = { + "nsr_id": nsr_id, + "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state, + } + await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop) + except Exception as e: + self.logger.error( + logging_text + "kafka_write notification Exception {}".format(e) + ) + self.logger.debug(logging_text + "Exit") + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")