timeout_progress_primitive = (
10 * 60
) # timeout for some progress in a primitive execution
+ timeout_migrate = 1800 # default global timeout for migrating vnfs
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu_instance,
vca_id=vca_id,
+ namespace=kdu.get("namespace"),
)
)
else:
"""
config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
+
+ async def migrate(self, nsr_id, nslcmop_id):
+ """
+ Migrate VNFs and VDUs instances in a NS
+
+ :param: nsr_id: NS Instance ID
+ :param: nslcmop_id: nslcmop ID of migrate
+
+ """
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+ logging_text = "Task ns={} migrate ".format(nsr_id)
+ self.logger.debug(logging_text + "Enter")
+ # get all needed from database
+ db_nslcmop = None
+ db_nslcmop_update = {}
+ nslcmop_operation_state = None
+ db_nsr_update = {}
+ target = {}
+ exc = None
+ # in case of error, indicates what part of scale was failed to put nsr at error status
+ start_deploy = time()
+
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="MIGRATING",
+ current_operation_id=nslcmop_id
+ )
+ step = "Getting nslcmop from database"
+ self.logger.debug(step + " after having waited for previous tasks to be completed")
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ migrate_params = db_nslcmop.get("operationParams")
+
+ target = {}
+ target.update(migrate_params)
+ desc = await self.RO.migrate(nsr_id, target)
+ self.logger.debug("RO return > {}".format(desc))
+ action_id = desc["action_id"]
+ await self._wait_ng_ro(
+ nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate
+ )
+ except (ROclient.ROClientException, DbException, LcmException) as e:
+ self.logger.error("Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error("Cancelled Exception while '{}'".format(step))
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
+ finally:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None,
+ )
+ if exc:
+ db_nslcmop_update[
+ "detailed-status"
+ ] = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
+ else:
+ nslcmop_operation_state = "COMPLETED"
+ db_nslcmop_update["detailed-status"] = "Done"
+ db_nsr_update["detailed-status"] = "Done"
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message="",
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+ if nslcmop_operation_state:
+ try:
+ msg = {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ }
+ await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")