timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
timeout_charm_delete = 10 * 60
timeout_primitive = 10 * 60 # timeout for primitive execution
+ timeout_progress_primitive = 2 * 60 # timeout for some progress in a primitive execution
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
# await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
# self.logger.debug(logging_text + "Deployed at VIM")
except (ROclient.ROClientException, LcmException, DbException) as e:
- stage[2] = "ERROR deployig at VIM"
+ stage[2] = "ERROR deploying at VIM"
self.set_vnfr_at_error(db_vnfrs, str(e))
raise
step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
kdur["kdu-name"], k8s_cluster_id)
- k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
+ k8s_instace_info = {"kdu-instance": None,
+ "k8scluster-uuid": cluster_uuid,
"k8scluster-type": k8sclustertype,
- "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
+ "member-vnf-index": vnfr_data["member-vnf-index-ref"],
+ "kdu-name": kdur["kdu-name"],
+ "kdu-model": kdumodel}
db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
self.update_db_2("nsrs", nsr_id, db_nsr_update)
async def _delete_N2VC(self, nsr_id: str):
self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
namespace = "." + nsr_id
- await self.n2vc.delete_namespace(namespace=namespace)
+ await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
stage[1] = "Deleting all execution environments."
self.logger.debug(logging_text + stage[1])
- task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
+ task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_N2VC(nsr_id=nsr_id),
+ timeout=self.timeout_charm_delete))
# task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
tasks_dict_info[task_delete_ee] = "Terminating all VCA"
break
for task in done:
if task.cancelled():
- new_error = created_tasks_info[task] + ": Cancelled"
- self.logger.warn(logging_text + new_error)
- error_detail_list.append(new_error)
- error_list.append(new_error)
+ exc = "Cancelled"
else:
exc = task.exception()
- if exc:
- new_error = created_tasks_info[task] + ": {}".format(exc)
- error_list.append(created_tasks_info[task])
- error_detail_list.append(new_error)
- if isinstance(exc, (DbException, N2VCException, ROclient.ROClientException, LcmException)):
- self.logger.error(logging_text + new_error)
- else:
- exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
- self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
+ if exc:
+ if isinstance(exc, asyncio.TimeoutError):
+ exc = "Timeout"
+ new_error = created_tasks_info[task] + ": {}".format(exc)
+ error_list.append(created_tasks_info[task])
+ error_detail_list.append(new_error)
+ if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException)):
+ self.logger.error(logging_text + new_error)
else:
- self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
+ exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
+ self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
+ else:
+ self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
stage[1] = "{}/{}.".format(num_done, num_tasks)
if new_error:
stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
return calculated_params
- def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_name, vdu_count_index, kdu_name=None):
+ def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None):
# find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
for vca in deployed_vca:
if not vca:
continue
if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
continue
- if vdu_name and vdu_name != vca["vdu_name"]:
- continue
if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
continue
if kdu_name and kdu_name != vca["kdu_name"]:
break
else:
# vca_deployed not found
- raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
- "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
+ raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
+ "deployed".format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
# get ee_id
ee_id = vca.get("ee_id")
if not ee_id:
- raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
+ raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
"execution environment"
- .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
+ .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
return ee_id
async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
- retries_interval=30) -> (str, str):
+ retries_interval=30, timeout=None) -> (str, str):
try:
if primitive == "config":
primitive_params = {"params": primitive_params}
while retries >= 0:
try:
- output = await self.n2vc.exec_primitive(
- ee_id=ee_id,
- primitive_name=primitive,
- params_dict=primitive_params
- )
+ output = await asyncio.wait_for(
+ self.n2vc.exec_primitive(
+ ee_id=ee_id,
+ primitive_name=primitive,
+ params_dict=primitive_params,
+ progress_timeout=self.timeout_progress_primitive,
+ total_timeout=self.timeout_primitive),
+ timeout=timeout or self.timeout_primitive)
# execution was OK
break
- except Exception as e:
+ except asyncio.CancelledError:
+ raise
+ except Exception as e: # asyncio.TimeoutError
+ if isinstance(e, asyncio.TimeoutError):
+ e = "Timeout"
retries -= 1
if retries >= 0:
self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
# wait and retry
await asyncio.sleep(retries_interval, loop=self.loop)
else:
- return 'FAIL', 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e)
+ return 'FAILED', str(e)
return 'COMPLETED', output
- except LcmException:
+ except (LcmException, asyncio.CancelledError):
raise
except Exception as e:
return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
db_nsr_update = {}
db_nslcmop_update = {}
nslcmop_operation_state = None
- nslcmop_operation_state_detail = None
+ error_description_nslcmop = None
exc = None
try:
# wait for any previous tasks in process
vdu_id = db_nslcmop["operationParams"].get("vdu_id")
kdu_name = db_nslcmop["operationParams"].get("kdu_name")
vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
- vdu_name = db_nslcmop["operationParams"].get("vdu_name")
+ primitive = db_nslcmop["operationParams"]["primitive"]
+ primitive_params = db_nslcmop["operationParams"]["primitive_params"]
+ timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
if vnf_index:
step = "Getting vnfr from database"
step = "Getting vnfd from database"
db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
else:
- if db_nsr.get("nsd"):
- db_nsd = db_nsr.get("nsd") # TODO this will be removed
- else:
- step = "Getting nsd from database"
- db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ step = "Getting nsd from database"
+ db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
# for backward compatibility
if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
self.update_db_2("nsrs", nsr_id, db_nsr_update)
- primitive = db_nslcmop["operationParams"]["primitive"]
- primitive_params = db_nslcmop["operationParams"]["primitive_params"]
-
# look for primitive
config_primitive_desc = None
if vdu_id:
for vdu in get_iterable(db_vnfd, "vdu"):
if vdu_id == vdu["id"]:
- for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
+ for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
if config_primitive["name"] == primitive:
config_primitive_desc = config_primitive
break
+ break
elif kdu_name:
- self.logger.debug(logging_text + "Checking actions in KDUs")
- kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
- desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
- if primitive_params:
- desc_params.update(primitive_params)
- # TODO Check if we will need something at vnf level
- index = 0
- for kdu in get_iterable(nsr_deployed, "K8s"):
- if kdu_name == kdu["kdu-name"]:
- db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
- "path": "_admin.deployed.K8s.{}".format(index)}
- if primitive == "upgrade":
- if desc_params.get("kdu_model"):
- kdu_model = desc_params.get("kdu_model")
- del desc_params["kdu_model"]
- else:
- kdu_model = kdu.get("kdu-model")
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- kdu_model = parts[0]
-
- if kdu.get("k8scluster-type") in self.k8scluster_map:
- output = await self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
- cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu.get("kdu-instance"),
- atomic=True, kdu_model=kdu_model,
- params=desc_params, db_dict=db_dict,
- timeout=300)
-
- else:
- msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
- raise LcmException(msg)
-
- self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
- break
- elif primitive == "rollback":
- if kdu.get("k8scluster-type") in self.k8scluster_map:
- output = await self.k8scluster_map[kdu["k8scluster-type"]].rollback(
- cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu.get("kdu-instance"),
- db_dict=db_dict)
- else:
- msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
- raise LcmException(msg)
- break
- elif primitive == "status":
- if kdu.get("k8scluster-type") in self.k8scluster_map:
- output = await self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
- cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu.get("kdu-instance"))
- else:
- msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
- raise LcmException(msg)
- break
- index += 1
-
- else:
- raise LcmException("KDU '{}' not found".format(kdu_name))
- if output:
- db_nslcmop_update["detailed-status"] = output
- db_nslcmop_update["operationState"] = 'COMPLETED'
- db_nslcmop_update["statusEnteredTime"] = time()
- else:
- db_nslcmop_update["detailed-status"] = ''
- db_nslcmop_update["operationState"] = 'FAILED'
- db_nslcmop_update["statusEnteredTime"] = time()
- return
+ for kdu in get_iterable(db_vnfd, "kdu"):
+ if kdu_name == kdu["name"]:
+ for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
+ if config_primitive["name"] == primitive:
+ config_primitive_desc = config_primitive
+ break
+ break
elif vnf_index:
- for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
+ for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
if config_primitive["name"] == primitive:
config_primitive_desc = config_primitive
break
else:
- for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
+ for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
if config_primitive["name"] == primitive:
config_primitive_desc = config_primitive
break
- if not config_primitive_desc:
+ if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
format(primitive))
- desc_params = {}
if vnf_index:
- if db_vnfr.get("additionalParamsForVnf"):
- desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
if vdu_id:
vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
- if vdur.get("additionalParams"):
- desc_params = self._format_additional_params(vdur["additionalParams"])
+ desc_params = self._format_additional_params(vdur.get("additionalParams"))
+ elif kdu_name:
+ kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
+ desc_params = self._format_additional_params(kdur.get("additionalParams"))
+ else:
+ desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
else:
- if db_nsr.get("additionalParamsForNs"):
- desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
+ desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
# TODO check if ns is in a proper status
- result, detailed_status = await self._ns_execute_primitive(
- self._look_for_deployed_vca(nsr_deployed["VCA"],
- member_vnf_index=vnf_index,
- vdu_id=vdu_id,
- vdu_name=vdu_name,
- vdu_count_index=vdu_count_index),
- primitive=primitive,
- primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
-
- db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
- db_nslcmop_update["operationState"] = nslcmop_operation_state = result
- db_nslcmop_update["statusEnteredTime"] = time()
- self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
+ if kdu_name and primitive in ("upgrade", "rollback", "status"):
+ # kdur and desc_params already set from before
+ if primitive_params:
+ desc_params.update(primitive_params)
+ # TODO Check if we will need something at vnf level
+ for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
+ if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
+ break
+ else:
+ raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
+
+ if kdu.get("k8scluster-type") not in self.k8scluster_map:
+ msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
+ raise LcmException(msg)
+
+ db_dict = {"collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": "_admin.deployed.K8s.{}".format(index)}
+ self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
+ step = "Executing kdu {}".format(primitive)
+ if primitive == "upgrade":
+ if desc_params.get("kdu_model"):
+ kdu_model = desc_params.get("kdu_model")
+ del desc_params["kdu_model"]
+ else:
+ kdu_model = kdu.get("kdu-model")
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ kdu_model = parts[0]
+
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ atomic=True, kdu_model=kdu_model,
+ params=desc_params, db_dict=db_dict,
+ timeout=timeout_ns_action),
+ timeout=timeout_ns_action + 10)
+ self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
+ elif primitive == "rollback":
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].rollback(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ db_dict=db_dict),
+ timeout=timeout_ns_action)
+ elif primitive == "status":
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance")),
+ timeout=timeout_ns_action)
+
+ if detailed_status:
+ nslcmop_operation_state = 'COMPLETED'
+ else:
+ detailed_status = ''
+ nslcmop_operation_state = 'FAILED'
+
+ else:
+ nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
+ self._look_for_deployed_vca(nsr_deployed["VCA"],
+ member_vnf_index=vnf_index,
+ vdu_id=vdu_id,
+ vdu_count_index=vdu_count_index),
+ primitive=primitive,
+ primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
+ timeout=timeout_ns_action)
+
+ db_nslcmop_update["detailed-status"] = detailed_status
+ error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
+ self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
+ detailed_status))
return # database update is called inside finally
- except (DbException, LcmException) as e:
+ except (DbException, LcmException, N2VCException) as e:
self.logger.error(logging_text + "Exit Exception {}".format(e))
exc = e
except asyncio.CancelledError:
self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
exc = "Operation was cancelled"
+ except asyncio.TimeoutError:
+ self.logger.error(logging_text + "Timeout while '{}'".format(step))
+ exc = "Timeout"
except Exception as e:
exc = traceback.format_exc()
self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
finally:
- if exc and db_nslcmop:
- db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
+ if exc:
+ db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
"FAILED {}: {}".format(step, exc)
- db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
- db_nslcmop_update["statusEnteredTime"] = time()
- try:
- if db_nslcmop_update:
- self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
- if db_nsr:
- self._write_ns_status(
- nsr_id=nsr_id,
- ns_state=None,
- current_operation="IDLE",
- current_operation_id=None,
- other_update=db_nsr_update
- )
- if exc:
- self._write_op_status(
- op_id=nslcmop_id,
- error_message=nslcmop_operation_state_detail
- )
- except DbException as e:
- self.logger.error(logging_text + "Cannot update database: {}".format(e))
- self.logger.debug(logging_text + "Exit")
+ nslcmop_operation_state = "FAILED"
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
+ current_operation="IDLE",
+ current_operation_id=None,
+ # error_description=error_description_nsr,
+ # error_detail=error_detail,
+ other_update=db_nsr_update
+ )
+
+ if db_nslcmop:
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
if nslcmop_operation_state:
try:
await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
- return nslcmop_operation_state, nslcmop_operation_state_detail
+ return nslcmop_operation_state, detailed_status
async def scale(self, nsr_id, nslcmop_id):
self._look_for_deployed_vca(nsr_deployed["VCA"],
member_vnf_index=vnf_index,
vdu_id=None,
- vdu_name=None,
vdu_count_index=None),
vnf_config_primitive, primitive_params)
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
self._look_for_deployed_vca(nsr_deployed["VCA"],
member_vnf_index=vnf_index,
vdu_id=None,
- vdu_name=None,
vdu_count_index=None),
vnf_config_primitive, primitive_params)
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(