X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=osm_lcm%2Fns.py;h=6aed3043cc704b2adb8807aa3b30fb273840b4ed;hb=refs%2Fchanges%2F84%2F11984%2F12;hp=5345d78b32a53620ada42f48a7d95e8362991319;hpb=07f4e4cc95d17c613f5557e65781b8d4599ed20f;p=osm%2FLCM.git diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 5345d78..6aed304 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -133,7 +133,8 @@ class NsLcm(LcmBase): 10 * 60 ) # timeout for some progress in a primitive execution timeout_migrate = 1800 # default global timeout for migrating vnfs - + timeout_operate = 1800 # default global timeout for migrating vnfs + timeout_verticalscale = 1800 # default global timeout for Vertical Sclaing SUBOPERATION_STATUS_NOT_FOUND = -1 SUBOPERATION_STATUS_NEW = -2 SUBOPERATION_STATUS_SKIP = -3 @@ -897,18 +898,55 @@ class NsLcm(LcmBase): get_iterable(vdur, "interfaces"), lambda iface: iface.get("ns-vld-id") == a_vld["name"], ) + + vld_params = find_in_list( + get_iterable(ns_params, "vld"), + lambda v_vld: v_vld["name"] in (a_vld["name"], a_vld["id"]), + ) if target_vld: + if vnf_params.get("vimAccountId") not in a_vld.get( "vim_info", {} ): + target_vim_network_list = [ + v for _, v in a_vld.get("vim_info").items() + ] + target_vim_network_name = next( + ( + item.get("vim_network_name", "") + for item in target_vim_network_list + ), + "", + ) + target["ns"]["vld"][a_index].get("vim_info").update( { "vim:{}".format(vnf_params["vimAccountId"]): { - "vim_network_name": "" + "vim_network_name": target_vim_network_name, } } ) + if vld_params: + for param in ("vim-network-name", "vim-network-id"): + if vld_params.get(param) and isinstance( + vld_params[param], dict + ): + for vim, vim_net in vld_params[ + param + ].items(): + other_target_vim = "vim:" + vim + populate_dict( + target["ns"]["vld"][a_index].get( + "vim_info" + ), + ( + other_target_vim, + param.replace("-", "_"), + ), + vim_net, + ) + nslcmop_id = db_nslcmop["_id"] target = { "name": db_nsr["name"], @@ -5913,6 +5951,26 @@ class NsLcm(LcmBase): ) ) + elif update_type == "OPERATE_VNF": + vnf_id = db_nslcmop["operationParams"]["operateVnfData"]["vnfInstanceId"] + operation_type = db_nslcmop["operationParams"]["operateVnfData"]["changeStateTo"] + additional_param = db_nslcmop["operationParams"]["operateVnfData"]["additionalParam"] + (result, detailed_status) = await self.rebuild_start_stop( + nsr_id, nslcmop_id, vnf_id, additional_param, operation_type + ) + if result == "FAILED": + nslcmop_operation_state = result + error_description_nslcmop = detailed_status + db_nslcmop_update["detailed-status"] = detailed_status + if not nslcmop_operation_state: + nslcmop_operation_state = "COMPLETED" + self.logger.debug( + logging_text + + " task Done with result {} {}".format( + nslcmop_operation_state, detailed_status + ) + ) + # If nslcmop_operation_state is None, so any operation is not failed. # All operations are executed in overall. if not nslcmop_operation_state: @@ -7202,6 +7260,79 @@ class NsLcm(LcmBase): job["vnfr_id"] = vnfr_id return job_list + async def rebuild_start_stop(self, nsr_id, nslcmop_id, vnf_id, additional_param, operation_type): + logging_text = "Task ns={} {}={} ".format(nsr_id, operation_type, nslcmop_id) + self.logger.info(logging_text + "Enter") + stage = ["Preparing the environment", ""] + # database nsrs record + db_nsr_update = {} + vdu_vim_name = None + vim_vm_id = None + # in case of error, indicates what part of scale was failed to put nsr at error status + start_deploy = time() + try: + db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id}) + vim_account_id = db_vnfr.get("vim-account-id") + vim_info_key = "vim:" + vim_account_id + vdur = find_in_list( + db_vnfr["vdur"], lambda vdu: vdu["count-index"] == additional_param["count-index"] + ) + if vdur: + vdu_vim_name = vdur["name"] + vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"] + target_vim, _ = next(k_v for k_v in vdur["vim_info"].items()) + self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name)) + # wait for any previous tasks in process + stage[1] = "Waiting for previous operations to terminate" + self.logger.info(stage[1]) + await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) + + stage[1] = "Reading from database." + self.logger.info(stage[1]) + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation=operation_type.upper(), + current_operation_id=nslcmop_id + ) + self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0) + + # read from db: ns + stage[1] = "Getting nsr={} from db.".format(nsr_id) + db_nsr_update["operational-status"] = operation_type + self.update_db_2("nsrs", nsr_id, db_nsr_update) + # Payload for RO + desc = { + operation_type: { + "vim_vm_id": vim_vm_id, + "vnf_id": vnf_id, + "vdu_index": additional_param["count-index"], + "vdu_id": vdur["id"], + "target_vim": target_vim, + "vim_account_id": vim_account_id + } + } + stage[1] = "Sending rebuild request to RO... {}".format(desc) + self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0) + self.logger.info("ro nsr id: {}".format(nsr_id)) + result_dict = await self.RO.operate(nsr_id, desc, operation_type) + self.logger.info("response from RO: {}".format(result_dict)) + action_id = result_dict["action_id"] + await self._wait_ng_ro( + nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_operate + ) + return "COMPLETED", "Done" + except (ROclient.ROClientException, DbException, LcmException) as e: + self.logger.error("Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error("Cancelled Exception while '{}'".format(stage)) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) + return "FAILED", "Error in operate VNF {}".format(exc) + def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str): """ Get VCA Cloud and VCA Cloud Credentials for the VIM account @@ -8244,3 +8375,98 @@ class NsLcm(LcmBase): await asyncio.sleep(15, loop=self.loop) else: # timeout_ns_deploy raise NgRoException("Timeout waiting ns to deploy") + + async def vertical_scale(self, nsr_id, nslcmop_id): + """ + Vertical Scale the VDUs in a NS + + :param: nsr_id: NS Instance ID + :param: nslcmop_id: nslcmop ID of migrate + + """ + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} vertical scale ".format(nsr_id) + self.logger.debug(logging_text + "Enter") + # get all needed from database + db_nslcmop = None + db_nslcmop_update = {} + nslcmop_operation_state = None + db_nsr_update = {} + target = {} + exc = None + # in case of error, indicates what part of scale was failed to put nsr at error status + start_deploy = time() + + try: + # wait for any previous tasks in process + step = "Waiting for previous operations to terminate" + await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) + + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="VerticalScale", + current_operation_id=nslcmop_id + ) + step = "Getting nslcmop from database" + self.logger.debug(step + " after having waited for previous tasks to be completed") + db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + operationParams = db_nslcmop.get("operationParams") + target = {} + target.update(operationParams) + desc = await self.RO.vertical_scale(nsr_id, target) + self.logger.debug("RO return > {}".format(desc)) + action_id = desc["action_id"] + await self._wait_ng_ro( + nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_verticalscale + ) + except (ROclient.ROClientException, DbException, LcmException) as e: + self.logger.error("Exit Exception {}".format(e)) + exc = e + except asyncio.CancelledError: + self.logger.error("Cancelled Exception while '{}'".format(step)) + exc = "Operation was cancelled" + except Exception as e: + exc = traceback.format_exc() + self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True) + finally: + self._write_ns_status( + nsr_id=nsr_id, + ns_state=None, + current_operation="IDLE", + current_operation_id=None, + ) + if exc: + db_nslcmop_update[ + "detailed-status" + ] = "FAILED {}: {}".format(step, exc) + nslcmop_operation_state = "FAILED" + else: + nslcmop_operation_state = "COMPLETED" + db_nslcmop_update["detailed-status"] = "Done" + db_nsr_update["detailed-status"] = "Done" + + self._write_op_status( + op_id=nslcmop_id, + stage="", + error_message="", + operation_state=nslcmop_operation_state, + other_update=db_nslcmop_update, + ) + if nslcmop_operation_state: + try: + msg = { + "nsr_id": nsr_id, + "nslcmop_id": nslcmop_id, + "operationState": nslcmop_operation_state, + } + await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop) + except Exception as e: + self.logger.error( + logging_text + "kafka_write notification Exception {}".format(e) + ) + self.logger.debug(logging_text + "Exit") + self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")