task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task)
return
+ elif command == "migrate":
+ nslcmop = params
+ nslcmop_id = nslcmop["_id"]
+ nsr_id = nslcmop["nsInstanceId"]
+ task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id))
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task)
+ return
elif command == "show":
nsr_id = params
try:
"scaled",
"actioned",
"updated",
+ "migrated",
): # "scaled-cooldown-time"
return
elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
except asyncio.TimeoutError:
raise NgRoException("Timeout", http_code=504)
+ async def migrate(self, nsr_id, target):
+ """
+ Performs migration of VNFs
+ :param nsr_id: NS Instance Id
+ :param target: payload data for migrate operation
+ :return: dictionary with the information or raises NgRoException on Error
+ """
+ try:
+ if isinstance(target, str):
+ target = self._parse_yaml(target)
+ payload_req = yaml.safe_dump(target)
+
+ url = "{}/ns/v1/migrate/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id)
+ async with aiohttp.ClientSession(loop=self.loop) as session:
+ self.logger.debug("NG-RO POST %s %s", url, payload_req)
+ # timeout = aiohttp.ClientTimeout(total=self.timeout_large)
+ async with session.post(
+ url, headers=self.headers_req, data=payload_req
+ ) as response:
+ response_text = await response.read()
+ self.logger.debug(
+ "POST {} [{}] {}".format(
+ url, response.status, response_text[:100]
+ )
+ )
+ if response.status >= 300:
+ raise NgRoException(response_text, http_code=response.status)
+ return self._parse_yaml(response_text, response=True)
+ except (aiohttp.ClientOSError, aiohttp.ClientError) as e:
+ raise NgRoException(e, http_code=504)
+ except asyncio.TimeoutError:
+ raise NgRoException("Timeout", http_code=504)
+
async def status(self, nsr_id, action_id):
try:
url = "{}/ns/v1/deploy/{nsr_id}/{action_id}".format(
timeout_progress_primitive = (
10 * 60
) # timeout for some progress in a primitive execution
+ timeout_migrate = 1800 # default global timeout for migrating vnfs
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
"""
config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
+
+ async def migrate(self, nsr_id, nslcmop_id):
+ """
+ Migrate VNFs and VDUs instances in a NS
+
+ :param: nsr_id: NS Instance ID
+ :param: nslcmop_id: nslcmop ID of migrate
+
+ """
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+ logging_text = "Task ns={} migrate ".format(nsr_id)
+ self.logger.debug(logging_text + "Enter")
+ # get all needed from database
+ db_nslcmop = None
+ db_nslcmop_update = {}
+ nslcmop_operation_state = None
+ db_nsr_update = {}
+ target = {}
+ exc = None
+ # in case of error, indicates what part of scale was failed to put nsr at error status
+ start_deploy = time()
+
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="MIGRATING",
+ current_operation_id=nslcmop_id
+ )
+ step = "Getting nslcmop from database"
+ self.logger.debug(step + " after having waited for previous tasks to be completed")
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ migrate_params = db_nslcmop.get("operationParams")
+
+ target = {}
+ target.update(migrate_params)
+ desc = await self.RO.migrate(nsr_id, target)
+ self.logger.debug("RO return > {}".format(desc))
+ action_id = desc["action_id"]
+ await self._wait_ng_ro(
+ nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate
+ )
+ except (ROclient.ROClientException, DbException, LcmException) as e:
+ self.logger.error("Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error("Cancelled Exception while '{}'".format(step))
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
+ finally:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None,
+ )
+ if exc:
+ db_nslcmop_update[
+ "detailed-status"
+ ] = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
+ else:
+ nslcmop_operation_state = "COMPLETED"
+ db_nslcmop_update["detailed-status"] = "Done"
+ db_nsr_update["detailed-status"] = "Done"
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message="",
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+ if nslcmop_operation_state:
+ try:
+ msg = {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ }
+ await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")