from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
from osm_lcm import ROclient
+from osm_lcm.ng_ro import NgRoClient, NgRoException
from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
from n2vc.k8s_helm_conn import K8sHelmConnector
from n2vc.k8s_juju_conn import K8sJujuConnector
from osm_common.fsbase import FsException
from n2vc.n2vc_juju_conn import N2VCJujuConnector
-from n2vc.exceptions import N2VCException
+from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
from copy import copy, deepcopy
from http import HTTPStatus
from time import time
from uuid import uuid4
+from functools import partial
-__author__ = "Alfonso Tierno"
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class NsLcm(LcmBase):
timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
timeout_charm_delete = 10 * 60
- timeout_primitive = 10 * 60 # timeout for primitive execution
+ timeout_primitive = 30 * 60 # timeout for primitive execution
+ timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
- task_name_deploy_vca = "Deploy VCA"
+ task_name_deploy_vca = "Deploying VCA"
def __init__(self, db, msg, fs, lcm_tasks, config, loop):
"""
self.lcm_tasks = lcm_tasks
self.timeout = config["timeout"]
self.ro_config = config["ro_config"]
+ self.ng_ro = config["ro_config"].get("ng")
self.vca_config = config["VCA"].copy()
# create N2VC connector
on_update_db=None,
)
+ self.k8scluster_map = {
+ "helm-chart": self.k8sclusterhelm,
+ "chart": self.k8sclusterhelm,
+ "juju-bundle": self.k8sclusterjuju,
+ "juju": self.k8sclusterjuju,
+ }
# create RO client
- self.RO = ROclient.ROClient(self.loop, **self.ro_config)
+ if self.ng_ro:
+ self.RO = NgRoClient(self.loop, **self.ro_config)
+ else:
+ self.RO = ROclient.ROClient(self.loop, **self.ro_config)
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
# write to database
self.update_db_2("nsrs", nsr_id, db_dict)
+ except (asyncio.CancelledError, asyncio.TimeoutError):
+ raise
except Exception as e:
self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
- return
-
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
"""
Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
format(vnfd["id"], vdu["id"], e))
- def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
+ def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
"""
Creates a RO ns descriptor from OSM ns_instantiate params
:param ns_params: OSM instantiate params
+ :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
+ :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
:return: The RO ns descriptor
"""
vim_2_RO = {}
"wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
# "scenario": ns_params["nsdId"],
}
+ # set vim_account of each vnf if different from general vim_account.
+ # Get this information from <vnfr> database content, key vim-account-id
+ # Vim account can be set by placement_engine and it may be different from
+ # the instantiate parameters (vnfs.member-vnf-index.datacenter).
+ for vnf_index, vnfr in db_vnfrs.items():
+ if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
+ populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))
n2vc_key_list = n2vc_key_list or []
for vnfd_ref, vnfd in vnfd_dict.items():
else:
raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
"constituent-vnfd".format(vnf_params["member-vnf-index"]))
- if vnf_params.get("vimAccountId"):
- populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
- vim_account_2_RO(vnf_params["vimAccountId"]))
for vdu_params in get_iterable(vnf_params, "vdu"):
# TODO feature 1417: check that this VDU exist and it is not a PDU
primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
return primitive_list
+ async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
+ n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
+ nslcmop_id = db_nslcmop["_id"]
+ target = {
+ "name": db_nsr["name"],
+ "ns": {"vld": []},
+ "vnf": [],
+ "image": deepcopy(db_nsr["image"]),
+ "flavor": deepcopy(db_nsr["flavor"]),
+ "action_id": nslcmop_id,
+ }
+ for image in target["image"]:
+ image["vim_info"] = []
+ for flavor in target["flavor"]:
+ flavor["vim_info"] = []
+
+ ns_params = db_nslcmop.get("operationParams")
+ ssh_keys = []
+ if ns_params.get("ssh_keys"):
+ ssh_keys += ns_params.get("ssh_keys")
+ if n2vc_key_list:
+ ssh_keys += n2vc_key_list
+
+ cp2target = {}
+ for vld_index, vld in enumerate(nsd.get("vld")):
+ target_vld = {"id": vld["id"],
+ "name": vld["name"],
+ "mgmt-network": vld.get("mgmt-network", False),
+ "type": vld.get("type"),
+ "vim_info": [{"vim-network-name": vld.get("vim-network-name"),
+ "vim_account_id": ns_params["vimAccountId"]}],
+ }
+ for cp in vld["vnfd-connection-point-ref"]:
+ cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
+ "nsrs:{}:vld.{}".format(nsr_id, vld_index)
+ target["ns"]["vld"].append(target_vld)
+ for vnfr in db_vnfrs.values():
+ vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
+ target_vnf = deepcopy(vnfr)
+ for vld in target_vnf.get("vld", ()):
+ # check if connected to a ns.vld
+ vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if
+ cp.get("internal-vld-ref") == vld["id"]), None)
+ if vnf_cp:
+ ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
+ if cp2target.get(ns_cp):
+ vld["target"] = cp2target[ns_cp]
+ vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"),
+ "vim_account_id": vnfr["vim-account-id"]}]
+
+ for vdur in target_vnf.get("vdur", ()):
+ vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
+ vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])
+ # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
+
+ if ssh_keys:
+ if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
+ vdur["ssh-keys"] = ssh_keys
+ vdur["ssh-access-required"] = True
+ elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
+ any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
+ vdur["ssh-keys"] = ssh_keys
+ vdur["ssh-access-required"] = True
+
+ # cloud-init
+ if vdud.get("cloud-init-file"):
+ vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
+ elif vdud.get("cloud-init"):
+ vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)
+
+ # flavor
+ ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
+ if not next((vi for vi in ns_flavor["vim_info"] if
+ vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
+ ns_flavor["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
+ # image
+ ns_image = target["image"][int(vdur["ns-image-id"])]
+ if not next((vi for vi in ns_image["vim_info"] if
+ vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
+ ns_image["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
+
+ vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
+ target["vnf"].append(target_vnf)
+
+ desc = await self.RO.deploy(nsr_id, target)
+ action_id = desc["action_id"]
+ await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
+
+ # Updating NSR
+ db_nsr_update = {
+ "_admin.deployed.RO.operational-status": "running",
+ "detailed-status": " ".join(stage)
+ }
+ # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+ self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
+ return
+
+ async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage):
+ detailed_status_old = None
+ db_nsr_update = {}
+ while time() <= start_time + timeout:
+ desc_status = await self.RO.status(nsr_id, action_id)
+ if desc_status["status"] == "FAILED":
+ raise NgRoException(desc_status["details"])
+ elif desc_status["status"] == "BUILD":
+ stage[2] = "VIM: ({})".format(desc_status["details"])
+ elif desc_status["status"] == "DONE":
+ stage[2] = "Deployed at VIM"
+ break
+ else:
+ assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
+ if stage[2] != detailed_status_old:
+ detailed_status_old = stage[2]
+ db_nsr_update["detailed-status"] = " ".join(stage)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+ await asyncio.sleep(5, loop=self.loop)
+ else: # timeout_ns_deploy
+ raise NgRoException("Timeout waiting ns to deploy")
+
+ async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
+ db_nsr_update = {}
+ failed_detail = []
+ action_id = None
+ start_deploy = time()
+ try:
+ target = {
+ "ns": {"vld": []},
+ "vnf": [],
+ "image": [],
+ "flavor": [],
+ }
+ desc = await self.RO.deploy(nsr_id, target)
+ action_id = desc["action_id"]
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
+ self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
+
+ # wait until done
+ delete_timeout = 20 * 60 # 20 minutes
+ await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
+
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ # delete all nsr
+ await self.RO.delete(nsr_id)
+ except Exception as e:
+ if isinstance(e, NgRoException) and e.http_code == 404: # not found
+ db_nsr_update["_admin.deployed.RO.nsr_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
+ self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
+ elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
+ failed_detail.append("delete conflict: {}".format(e))
+ self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
+ else:
+ failed_detail.append("delete error: {}".format(e))
+ self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
+
+ if failed_detail:
+ stage[2] = "Error deleting from VIM"
+ else:
+ stage[2] = "Deleted from VIM"
+ db_nsr_update["detailed-status"] = " ".join(stage)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+
+ if failed_detail:
+ raise LcmException("; ".join(failed_detail))
+ return
+
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
n2vc_key_list, stage):
+ """
+ Instantiate at RO
+ :param logging_text: preffix text to use at logging
+ :param nsr_id: nsr identity
+ :param nsd: database content of ns descriptor
+ :param db_nsr: database content of ns record
+ :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
+ :param db_vnfrs:
+ :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
+ :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
+ :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
+ :return: None or exception
+ """
try:
db_nsr_update = {}
RO_descriptor_number = 0 # number of descriptors created at RO
# Check for and optionally request placement optimization. Database will be updated if placement activated
stage[2] = "Waiting for Placement."
- await self.do_placement(logging_text, db_nslcmop, db_vnfrs)
+ if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
+ # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
+ for vnfr in db_vnfrs.values():
+ if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
+ break
+ else:
+ ns_params["vimAccountId"] == vnfr["vim-account-id"]
+ if self.ng_ro:
+ return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
+ db_vnfds_ref, n2vc_key_list, stage, start_deploy,
+ timeout_ns_deploy)
# deploy RO
-
# get vnfds, instantiate at RO
for c_vnf in nsd.get("constituent-vnfd", ()):
member_vnf_index = c_vnf["member-vnf-index"]
await asyncio.wait(task_dependency, timeout=3600)
stage[2] = "Checking instantiation parameters."
- RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
+ RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
stage[2] = "Deploying ns at VIM."
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
# await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
# self.logger.debug(logging_text + "Deployed at VIM")
- except (ROclient.ROClientException, LcmException, DbException) as e:
+ except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e:
+ stage[2] = "ERROR deploying at VIM"
self.set_vnfr_at_error(db_vnfrs, str(e))
raise
return ip_address
try:
ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
- result_dict = await self.RO.create_action(
- item="ns",
- item_id_name=ro_nsr_id,
- descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
- )
- # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
- if not result_dict or not isinstance(result_dict, dict):
- raise LcmException("Unknown response from RO when injecting key")
- for result in result_dict.values():
- if result.get("vim_result") == 200:
- break
- else:
- raise ROclient.ROClientException("error injecting key: {}".format(
- result.get("description")))
- break
+ if self.ng_ro:
+ target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
+ "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}],
+ }
+ await self.RO.deploy(nsr_id, target)
+ else:
+ result_dict = await self.RO.create_action(
+ item="ns",
+ item_id_name=ro_nsr_id,
+ descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
+ )
+ # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
+ if not result_dict or not isinstance(result_dict, dict):
+ raise LcmException("Unknown response from RO when injecting key")
+ for result in result_dict.values():
+ if result.get("vim_result") == 200:
+ break
+ else:
+ raise ROclient.ROClientException("error injecting key: {}".format(
+ result.get("description")))
+ break
+ except NgRoException as e:
+ raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
except ROclient.ROClientException as e:
if not nb_tries:
self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
element_type = 'VDU'
element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+ elif kdu_name:
+ namespace += ".{}".format(kdu_name)
+ element_type = 'KDU'
+ element_under_configuration = kdu_name
# Get artifact path
- self.fs.sync() # Sync from FSMongo
artifact_path = "{}/{}/charms/{}".format(
base_folder["folder"],
base_folder["pkg-dir"],
)
is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
+ is_k8s_proxy_charm = False
+
if deep_get(config_descriptor, ('juju', 'proxy')) is False:
is_proxy_charm = False
- # n2vc_redesign STEP 3.1
+ if deep_get(config_descriptor, ('juju', 'k8s')) is True and is_proxy_charm:
+ is_k8s_proxy_charm = True
- # find old ee_id if exists
- ee_id = vca_deployed.get("ee_id")
+ if not is_k8s_proxy_charm:
+ # n2vc_redesign STEP 3.1
- # create or register execution environment in VCA
- if is_proxy_charm:
+ # find old ee_id if exists
+ ee_id = vca_deployed.get("ee_id")
- self._write_configuration_status(
- nsr_id=nsr_id,
- vca_index=vca_index,
- status='CREATING',
- element_under_configuration=element_under_configuration,
- element_type=element_type
- )
+ # create or register execution environment in VCA
+ if is_proxy_charm:
- step = "create execution environment"
- self.logger.debug(logging_text + step)
- ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
- reuse_ee_id=ee_id,
- db_dict=db_dict)
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='CREATING',
+ element_under_configuration=element_under_configuration,
+ element_type=element_type
+ )
- else:
- step = "Waiting to VM being up and getting IP address"
- self.logger.debug(logging_text + step)
- rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
- user=None, pub_key=None)
- credentials = {"hostname": rw_mgmt_ip}
- # get username
- username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
- # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
- # merged. Meanwhile let's get username from initial-config-primitive
- if not username and config_descriptor.get("initial-config-primitive"):
- for config_primitive in config_descriptor["initial-config-primitive"]:
- for param in config_primitive.get("parameter", ()):
- if param["name"] == "ssh-username":
- username = param["value"]
- break
- if not username:
- raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
- "'config-access.ssh-access.default-user'")
- credentials["username"] = username
- # n2vc_redesign STEP 3.2
+ step = "create execution environment"
+ self.logger.debug(logging_text + step)
+ ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
+ reuse_ee_id=ee_id,
+ db_dict=db_dict)
- self._write_configuration_status(
- nsr_id=nsr_id,
- vca_index=vca_index,
- status='REGISTERING',
- element_under_configuration=element_under_configuration,
- element_type=element_type
- )
+ else:
+ step = "Waiting to VM being up and getting IP address"
+ self.logger.debug(logging_text + step)
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
+ user=None, pub_key=None)
+ credentials = {"hostname": rw_mgmt_ip}
+ # get username
+ username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
+ # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
+ # merged. Meanwhile let's get username from initial-config-primitive
+ if not username and config_descriptor.get("initial-config-primitive"):
+ for config_primitive in config_descriptor["initial-config-primitive"]:
+ for param in config_primitive.get("parameter", ()):
+ if param["name"] == "ssh-username":
+ username = param["value"]
+ break
+ if not username:
+ raise LcmException("Cannot determine the username neither with"
+ "'initial-config-promitive' nor with "
+ "'config-access.ssh-access.default-user'")
+ credentials["username"] = username
+ # n2vc_redesign STEP 3.2
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='REGISTERING',
+ element_under_configuration=element_under_configuration,
+ element_type=element_type
+ )
- step = "register execution environment {}".format(credentials)
- self.logger.debug(logging_text + step)
- ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
- db_dict=db_dict)
-
- # for compatibility with MON/POL modules, the need model and application name at database
- # TODO ask to N2VC instead of assuming the format "model_name.application_name"
- ee_id_parts = ee_id.split('.')
- model_name = ee_id_parts[0]
- application_name = ee_id_parts[1]
- self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
- db_update_entry + "application": application_name,
- db_update_entry + "ee_id": ee_id})
+ step = "register execution environment {}".format(credentials)
+ self.logger.debug(logging_text + step)
+ ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
+ db_dict=db_dict)
# n2vc_redesign STEP 3.3
vca_index=vca_index,
status='INSTALLING SW',
element_under_configuration=element_under_configuration,
- element_type=element_type
+ element_type=element_type,
)
# TODO check if already done
self.logger.debug(logging_text + step)
- await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
+ config = None
+ if not is_proxy_charm:
+ initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
+ if initial_config_primitive_list:
+ for primitive in initial_config_primitive_list:
+ if primitive["name"] == "config":
+ config = self._map_primitive_params(
+ primitive,
+ {},
+ deploy_params
+ )
+ break
+ if is_k8s_proxy_charm:
+ charm_name = deep_get(config_descriptor, ('juju', 'charm'))
+ self.logger.debug("Installing K8s Proxy Charm: {}".format(charm_name))
+
+ ee_id = await self.n2vc.install_k8s_proxy_charm(
+ charm_name=charm_name,
+ namespace=namespace,
+ artifact_path=artifact_path,
+ db_dict=db_dict
+ )
+ else:
+ num_units = 1
+ if is_proxy_charm:
+ if element_type == "NS":
+ num_units = db_nsr.get("config-units") or 1
+ elif element_type == "VNF":
+ num_units = db_vnfr.get("config-units") or 1
+ elif element_type == "VDU":
+ for v in db_vnfr["vdur"]:
+ if vdu_id == v["vdu-id-ref"]:
+ num_units = v.get("config-units") or 1
+ break
+
+ await self.n2vc.install_configuration_sw(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ db_dict=db_dict,
+ config=config,
+ num_units=num_units
+ )
# write in db flag of configuration_sw already installed
self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
raise LcmException("{} {}".format(step, e)) from e
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
- error_description: str = None, other_update: dict = None):
+ error_description: str = None, error_detail: str = None, other_update: dict = None):
"""
Update db_nsr fields.
:param nsr_id:
:param current_operation:
:param current_operation_id:
:param error_description:
+ :param error_detail:
:param other_update: Other required changes at database if provided, will be cleared
:return:
"""
db_dict["currentOperation"] = current_operation
db_dict["currentOperationID"] = current_operation_id
db_dict["errorDescription"] = error_description
+ db_dict["errorDetail"] = error_detail
if ns_state:
db_dict["nsState"] = ns_state
except DbException as e:
self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
- def _write_all_config_status(self, nsr_id: str, status: str):
+ def _write_all_config_status(self, db_nsr: dict, status: str):
try:
- # nsrs record
- db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsr_id = db_nsr["_id"]
# configurationStatus
config_status = db_nsr.get('configurationStatus')
if config_status:
+ db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
+ enumerate(config_status) if v}
# update status
- db_dict = dict()
- db_dict['configurationStatus'] = list()
- for c in config_status:
- c['status'] = status
- db_dict['configurationStatus'].append(c)
- self.update_db_2("nsrs", nsr_id, db_dict)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
except DbException as e:
self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
- element_under_configuration: str = None, element_type: str = None):
+ element_under_configuration: str = None, element_type: str = None,
+ other_update: dict = None):
# self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
# .format(vca_index, status))
try:
db_path = 'configurationStatus.{}.'.format(vca_index)
- db_dict = dict()
+ db_dict = other_update or {}
if status:
db_dict[db_path + 'status'] = status
if element_under_configuration:
self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
.format(status, nsr_id, vca_index, e))
- async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+ async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+ """
+ Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
+ sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
+ Database is used because the result can be obtained from a different LCM worker in case of HA.
+ :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
+ :param db_nslcmop: database content of nslcmop
+ :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
+ :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
+ computed 'vim-account-id'
+ """
+ modified = False
+ nslcmop_id = db_nslcmop['_id']
placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
if placement_engine == "PLA":
- self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
- await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)
+ self.logger.debug(logging_text + "Invoke and wait for placement optimization")
+ await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
db_poll_interval = 5
- wait = db_poll_interval * 4
+ wait = db_poll_interval * 10
pla_result = None
while not pla_result and wait >= 0:
await asyncio.sleep(db_poll_interval)
wait -= db_poll_interval
- db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
if not pla_result:
- raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))
+ raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
for pla_vnf in pla_result['vnf']:
vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
if not pla_vnf.get('vimAccountId') or not vnfr:
continue
+ modified = True
self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
- return
+ # Modifies db_vnfrs
+ vnfr["vim-account-id"] = pla_vnf['vimAccountId']
+ return modified
def update_nsrs_with_pla_result(self, params):
try:
logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
+ # Sync from FSMongo
+ self.fs.sync()
+
# get all needed from database
# database nsrs record
# read from db: ns
stage[1] = "Getting nsr={} from db".format(nsr_id)
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
- # nsd is replicated into ns (no db read)
- nsd = db_nsr["nsd"]
+ stage[1] = "Getting nsd={} from db".format(db_nsr["nsd-id"])
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ db_nsr["nsd"] = nsd
# nsr_name = db_nsr["name"] # TODO short-name??
# read from db: vnf's of this ns
)
)
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
- tasks_dict_info[task_ro] = "Deploy at VIM"
+ tasks_dict_info[task_ro] = "Deploying at VIM"
# n2vc_redesign STEP 3 to 6 Deploy N2VC
stage[1] = "Deploying Execution Environments."
deploy_params=deploy_params_vdu,
descriptor_config=descriptor_config,
base_folder=base_folder,
- task_instantiation_info=tasks_dict_info
+ task_instantiation_info=tasks_dict_info,
+ stage=stage
)
for kdud in get_iterable(vnfd, 'kdu'):
kdu_name = kdud["name"]
deploy_params=deploy_params,
descriptor_config=descriptor_config,
base_folder=base_folder,
- task_instantiation_info=tasks_dict_info
+ task_instantiation_info=tasks_dict_info,
+ stage=stage
)
# Check if this NS has a charm configuration
deploy_params=deploy_params,
descriptor_config=descriptor_config,
base_folder=base_folder,
- task_instantiation_info=tasks_dict_info
+ task_instantiation_info=tasks_dict_info,
+ stage=stage
)
# rest of staff will be done at finally
# update status at database
if error_list:
- error_detail = "; ".join(error_list)
+ error_detail = ". ".join(error_list)
self.logger.error(logging_text + error_detail)
- error_description_nslcmop = 'Stage: {} {} Detail: {}'.format(stage[0], stage[1], error_detail)
- error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, error_description_nslcmop)
+ error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
+ error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
- db_nsr_update["detailed-status"] = error_description_nsr
+ db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
db_nslcmop_update["detailed-status"] = error_detail
nslcmop_operation_state = "FAILED"
ns_state = "BROKEN"
else:
+ error_detail = None
error_description_nsr = error_description_nslcmop = None
ns_state = "READY"
db_nsr_update["detailed-status"] = "Done"
current_operation="IDLE",
current_operation_id=None,
error_description=error_description_nsr,
+ error_detail=error_detail,
other_update=db_nsr_update
)
- if db_nslcmop:
- self._write_op_status(
- op_id=nslcmop_id,
- stage="",
- error_message=error_description_nslcmop,
- operation_state=nslcmop_operation_state,
- other_update=db_nslcmop_update,
- )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
if nslcmop_operation_state:
try:
# read nsr record
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
# this VCA data
my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
# read all ns-configuration relations
ns_relations = list()
- db_ns_relations = deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation'))
+ db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
if db_ns_relations:
for r in db_ns_relations:
# check if this VCA is in the relation
self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
return False
+ def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None):
+ """
+ callback for kdu install intended to store the returned kdu_instance at database
+ :return: None
+ """
+ db_update = {}
+ try:
+ result = task.result()
+ if on_done:
+ db_update[on_done] = str(result)
+ except Exception as e:
+ if on_exc:
+ db_update[on_exc] = str(e)
+ if db_update:
+ try:
+ self.update_db_2(item, _id, db_update)
+ except Exception:
+ pass
+
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
# Launch kdus if present in the descriptor
for kdur in get_iterable(vnfr_data, "kdur"):
desc_params = self._format_additional_params(kdur.get("additionalParams"))
vnfd_id = vnfr_data.get('vnfd-id')
- pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))
+ namespace = kdur.get("k8s-namespace")
if kdur.get("helm-chart"):
kdumodel = kdur["helm-chart"]
k8sclustertype = "helm-chart"
format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
# check if kdumodel is a file and exists
try:
- # path format: /vnfdid/pkkdir/kdumodel
- filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype, kdumodel)
- if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
- kdumodel = self.fs.path + filename
- except asyncio.CancelledError:
+ storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
+ if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
+ # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
+ filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
+ kdumodel)
+ if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
+ kdumodel = self.fs.path + filename
+ except (asyncio.TimeoutError, asyncio.CancelledError):
raise
except Exception: # it is not a file
pass
step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
kdur["kdu-name"], k8s_cluster_id)
- k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
+ k8s_instace_info = {"kdu-instance": None,
+ "k8scluster-uuid": cluster_uuid,
"k8scluster-type": k8sclustertype,
- "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
- db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
+ "member-vnf-index": vnfr_data["member-vnf-index-ref"],
+ "kdu-name": kdur["kdu-name"],
+ "kdu-model": kdumodel,
+ "namespace": namespace}
+ db_path = "_admin.deployed.K8s.{}".format(index)
+ db_nsr_update[db_path] = k8s_instace_info
self.update_db_2("nsrs", nsr_id, db_nsr_update)
db_dict = {"collection": "nsrs",
"filter": {"_id": nsr_id},
- "path": "_admin.deployed.K8s.{}".format(index)}
+ "path": db_path}
- if k8sclustertype == "helm-chart":
- task = asyncio.ensure_future(
- self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
- params=desc_params, db_dict=db_dict, timeout=3600)
- )
- else:
- task = asyncio.ensure_future(
- self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
- atomic=True, params=desc_params,
- db_dict=db_dict, timeout=600,
- kdu_name=kdur["kdu-name"])
- )
+ task = asyncio.ensure_future(
+ self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
+ atomic=True, params=desc_params,
+ db_dict=db_dict, timeout=600,
+ kdu_name=kdur["kdu-name"], namespace=namespace))
+ task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id,
+ on_done=db_path + ".kdu-instance",
+ on_exc=db_path + ".detailed-status"))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
- task_instantiation_info[task] = "Deploy KDU {}".format(kdur["kdu-name"])
+ task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
index += 1
# sub-operations
- def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
- op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
- if (op.get('operationState') == 'COMPLETED'):
+ def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
+ op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
+ if op.get('operationState') == 'COMPLETED':
# b. Skip sub-operation
# _ns_execute_primitive() or RO.create_action() will NOT be executed
return self.SUBOPERATION_STATUS_SKIP
else:
- # c. Reintent executing sub-operation
+ # c. retry executing sub-operation
# The sub-operation exists, and operationState != 'COMPLETED'
- # Update operationState = 'PROCESSING' to indicate a reintent.
+ # Update operationState = 'PROCESSING' to indicate a retry.
operationState = 'PROCESSING'
detailed_status = 'In progress'
self._update_suboperation_status(
# Find a sub-operation where all keys in a matching dictionary must match
# Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
def _find_suboperation(self, db_nslcmop, match):
- if (db_nslcmop and match):
+ if db_nslcmop and match:
op_list = db_nslcmop.get('_admin', {}).get('operations', [])
for i, op in enumerate(op_list):
if all(op.get(k) == match[k] for k in match):
# Check for 3 different cases:
# a. New: First time execution, return SUBOPERATION_STATUS_NEW
# b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
- # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
+ # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
operationType, RO_nsr_id=None, RO_scaling_info=None):
# Find this sub-operation
- if (RO_nsr_id and RO_scaling_info):
+ if RO_nsr_id and RO_scaling_info:
operationType = 'SCALE-RO'
match = {
'member_vnf_index': vnf_index,
'lcmOperationType': operationType
}
op_index = self._find_suboperation(db_nslcmop, match)
- if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
+ if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
# a. New sub-operation
# The sub-operation does not exist, add it.
# _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
vdu_id = None
vdu_count_index = None
vdu_name = None
- if (RO_nsr_id and RO_scaling_info):
+ if RO_nsr_id and RO_scaling_info:
vnf_config_primitive = None
primitive_params = None
else:
else:
# Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
# or op_index (operationState != 'COMPLETED')
- return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
+ return self._retry_or_skip_suboperation(db_nslcmop, op_index)
# Function to return execution_environment id
if destroy_ee:
await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
- async def _delete_N2VC(self, nsr_id: str):
- self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
- namespace = "." + nsr_id
- await self.n2vc.delete_namespace(namespace=namespace)
- self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
+ async def _delete_all_N2VC(self, db_nsr: dict):
+ self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
+ namespace = "." + db_nsr["_id"]
+ try:
+ await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
+ except N2VCNotFound: # already deleted. Skip
+ pass
+ self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
"""
db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
- failed_detail.append("RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
- self.logger.debug(logging_text + failed_detail[-1])
+ failed_detail.append("delete conflict: {}".format(e))
+ self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
else:
- failed_detail.append("RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
- self.logger.error(logging_text + failed_detail[-1])
+ failed_detail.append("delete error: {}".format(e))
+ self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
# Delete nsd
if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
self.logger.error(logging_text + failed_detail[-1])
- stage[2] = "Deleted from VIM"
+ if failed_detail:
+ stage[2] = "Error deleting from VIM"
+ else:
+ stage[2] = "Deleted from VIM"
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
if failed_detail:
- raise LcmException("Error while {}: {}".format(stage[2], "; ".join(failed_detail)))
+ raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
# Try to lock HA task here
timeout_ns_terminate = self.timeout_ns_terminate
db_nsr = None
db_nslcmop = None
+ operation_params = None
exc = None
error_list = [] # annotates all failed error messages
db_nslcmop_update = {}
# remove All execution environments at once
stage[0] = "Stage 3/3 delete all."
- stage[1] = "Deleting all execution environments."
- self.logger.debug(logging_text + stage[1])
- task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
- # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
- tasks_dict_info[task_delete_ee] = "Terminating all VCA"
+ if nsr_deployed.get("VCA"):
+ stage[1] = "Deleting all execution environments."
+ self.logger.debug(logging_text + stage[1])
+ task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
+ timeout=self.timeout_charm_delete))
+ # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
+ tasks_dict_info[task_delete_ee] = "Terminating all VCA"
# Delete from k8scluster
stage[1] = "Deleting KDUs."
if not kdu or not kdu.get("kdu-instance"):
continue
kdu_instance = kdu.get("kdu-instance")
- if kdu.get("k8scluster-type") in ("helm-chart", "chart"):
+ if kdu.get("k8scluster-type") in self.k8scluster_map:
task_delete_kdu_instance = asyncio.ensure_future(
- self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu_instance))
- elif kdu.get("k8scluster-type") in ("juju-bundle", "juju"):
- task_delete_kdu_instance = asyncio.ensure_future(
- self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu_instance))
+ self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu_instance))
else:
self.logger.error(logging_text + "Unknown k8s deployment type {}".
format(kdu.get("k8scluster-type")))
# remove from RO
stage[1] = "Deleting ns from VIM."
- task_delete_ro = asyncio.ensure_future(
- self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
+ if self.ng_ro:
+ task_delete_ro = asyncio.ensure_future(
+ self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
+ else:
+ task_delete_ro = asyncio.ensure_future(
+ self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
# rest of staff will be done at finally
if error_list:
error_detail = "; ".join(error_list)
# self.logger.error(logging_text + error_detail)
- error_description_nslcmop = 'Stage: {} {} Detail: {}'.format(stage[0], stage[1], error_detail)
- error_description_nsr = 'Operation: TERMINATING.{}, {}'.format(nslcmop_id, error_description_nslcmop)
+ error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
+ error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0])
db_nsr_update["operational-status"] = "failed"
- db_nsr_update["detailed-status"] = error_description_nsr
+ db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
db_nslcmop_update["detailed-status"] = error_detail
nslcmop_operation_state = "FAILED"
ns_state = "BROKEN"
else:
+ error_detail = None
error_description_nsr = error_description_nslcmop = None
ns_state = "NOT_INSTANTIATED"
db_nsr_update["operational-status"] = "terminated"
current_operation="IDLE",
current_operation_id=None,
error_description=error_description_nsr,
+ error_detail=error_detail,
other_update=db_nsr_update
)
- if db_nslcmop:
- self._write_op_status(
- op_id=nslcmop_id,
- stage="",
- error_message=error_description_nslcmop,
- operation_state=nslcmop_operation_state,
- other_update=db_nslcmop_update,
- )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+ if operation_params:
autoremove = operation_params.get("autoremove", False)
if nslcmop_operation_state:
try:
async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
time_start = time()
+ error_detail_list = []
error_list = []
pending_tasks = list(created_tasks_info.keys())
num_tasks = len(pending_tasks)
num_done = 0
stage[1] = "{}/{}.".format(num_done, num_tasks)
self._write_op_status(nslcmop_id, stage)
- new_error = False
while pending_tasks:
+ new_error = None
_timeout = timeout + time_start - time()
done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
return_when=asyncio.FIRST_COMPLETED)
num_done += len(done)
if not done: # Timeout
for task in pending_tasks:
- error_list.append(created_tasks_info[task] + ": Timeout")
+ new_error = created_tasks_info[task] + ": Timeout"
+ error_detail_list.append(new_error)
+ error_list.append(new_error)
break
for task in done:
if task.cancelled():
- self.logger.warn(logging_text + created_tasks_info[task] + ": Cancelled")
- error_list.append(created_tasks_info[task] + ": Cancelled")
- new_error = True
+ exc = "Cancelled"
else:
exc = task.exception()
- if exc:
- error_list.append(created_tasks_info[task] + ": {}".format(exc))
- new_error = True
- if isinstance(exc, (DbException, N2VCException, ROclient.ROClientException, LcmException)):
- self.logger.error(logging_text + created_tasks_info[task] + ": " + str(exc))
- else:
- exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
- self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
+ if exc:
+ if isinstance(exc, asyncio.TimeoutError):
+ exc = "Timeout"
+ new_error = created_tasks_info[task] + ": {}".format(exc)
+ error_list.append(created_tasks_info[task])
+ error_detail_list.append(new_error)
+ if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
+ K8sException)):
+ self.logger.error(logging_text + new_error)
else:
- self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
+ exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
+ self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
+ else:
+ self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
stage[1] = "{}/{}.".format(num_done, num_tasks)
if new_error:
- stage[1] += "Errors: " + ". ".join(error_list) + "."
- new_error = False
+ stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
if nsr_id: # update also nsr
- self.update_db_2("nsrs", nsr_id, {"errorDescription": ". ".join(error_list)})
+ self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
+ "errorDetail": ". ".join(error_detail_list)})
self._write_op_status(nslcmop_id, stage)
- return error_list
+ return error_detail_list
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
return calculated_params
- def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_name, vdu_count_index, kdu_name=None):
+ def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None):
# find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
for vca in deployed_vca:
if not vca:
continue
if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
continue
- if vdu_name and vdu_name != vca["vdu_name"]:
- continue
if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
continue
if kdu_name and kdu_name != vca["kdu_name"]:
break
else:
# vca_deployed not found
- raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
- "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
+ raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
+ "deployed".format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
# get ee_id
ee_id = vca.get("ee_id")
if not ee_id:
- raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
+ raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
"execution environment"
- .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
+ .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
return ee_id
async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
- retries_interval=30) -> (str, str):
+ retries_interval=30, timeout=None) -> (str, str):
try:
if primitive == "config":
primitive_params = {"params": primitive_params}
while retries >= 0:
try:
- output = await self.n2vc.exec_primitive(
- ee_id=ee_id,
- primitive_name=primitive,
- params_dict=primitive_params
- )
+ output = await asyncio.wait_for(
+ self.n2vc.exec_primitive(
+ ee_id=ee_id,
+ primitive_name=primitive,
+ params_dict=primitive_params,
+ progress_timeout=self.timeout_progress_primitive,
+ total_timeout=self.timeout_primitive),
+ timeout=timeout or self.timeout_primitive)
# execution was OK
break
- except Exception as e:
+ except asyncio.CancelledError:
+ raise
+ except Exception as e: # asyncio.TimeoutError
+ if isinstance(e, asyncio.TimeoutError):
+ e = "Timeout"
retries -= 1
if retries >= 0:
self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
# wait and retry
await asyncio.sleep(retries_interval, loop=self.loop)
else:
- return 'FAIL', 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e)
+ return 'FAILED', str(e)
return 'COMPLETED', output
- except LcmException:
+ except (LcmException, asyncio.CancelledError):
raise
except Exception as e:
return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
db_nsr_update = {}
db_nslcmop_update = {}
nslcmop_operation_state = None
- nslcmop_operation_state_detail = None
+ error_description_nslcmop = None
exc = None
try:
# wait for any previous tasks in process
vdu_id = db_nslcmop["operationParams"].get("vdu_id")
kdu_name = db_nslcmop["operationParams"].get("kdu_name")
vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
- vdu_name = db_nslcmop["operationParams"].get("vdu_name")
+ primitive = db_nslcmop["operationParams"]["primitive"]
+ primitive_params = db_nslcmop["operationParams"]["primitive_params"]
+ timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
if vnf_index:
step = "Getting vnfr from database"
step = "Getting vnfd from database"
db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
else:
- if db_nsr.get("nsd"):
- db_nsd = db_nsr.get("nsd") # TODO this will be removed
- else:
- step = "Getting nsd from database"
- db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ step = "Getting nsd from database"
+ db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
# for backward compatibility
if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
self.update_db_2("nsrs", nsr_id, db_nsr_update)
- primitive = db_nslcmop["operationParams"]["primitive"]
- primitive_params = db_nslcmop["operationParams"]["primitive_params"]
-
# look for primitive
config_primitive_desc = None
if vdu_id:
for vdu in get_iterable(db_vnfd, "vdu"):
if vdu_id == vdu["id"]:
- for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
+ for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
if config_primitive["name"] == primitive:
config_primitive_desc = config_primitive
break
+ break
elif kdu_name:
- self.logger.debug(logging_text + "Checking actions in KDUs")
- kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
- desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
- if primitive_params:
- desc_params.update(primitive_params)
- # TODO Check if we will need something at vnf level
- index = 0
- for kdu in get_iterable(nsr_deployed, "K8s"):
- if kdu_name == kdu["kdu-name"]:
- db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
- "path": "_admin.deployed.K8s.{}".format(index)}
- if primitive == "upgrade":
- if desc_params.get("kdu_model"):
- kdu_model = desc_params.get("kdu_model")
- del desc_params["kdu_model"]
- else:
- kdu_model = kdu.get("kdu-model")
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- kdu_model = parts[0]
-
- if kdu.get("k8scluster-type") in ("helm-chart", "chart"):
- output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu.get("kdu-instance"),
- atomic=True, kdu_model=kdu_model,
- params=desc_params, db_dict=db_dict,
- timeout=300)
- elif kdu.get("k8scluster-type")in ("juju-bundle", "juju"):
- output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu.get("kdu-instance"),
- atomic=True, kdu_model=kdu_model,
- params=desc_params, db_dict=db_dict,
- timeout=300)
-
- else:
- msg = "k8scluster-type not defined"
- raise LcmException(msg)
-
- self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
- break
- elif primitive == "rollback":
- if kdu.get("k8scluster-type") in ("helm-chart", "chart"):
- output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu.get("kdu-instance"),
- db_dict=db_dict)
- elif kdu.get("k8scluster-type") in ("juju-bundle", "juju"):
- output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu.get("kdu-instance"),
- db_dict=db_dict)
- else:
- msg = "k8scluster-type not defined"
- raise LcmException(msg)
- break
- elif primitive == "status":
- if kdu.get("k8scluster-type") in ("helm-chart", "chart"):
- output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu.get("kdu-instance"))
- elif kdu.get("k8scluster-type") in ("juju-bundle", "juju"):
- output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
- kdu_instance=kdu.get("kdu-instance"))
- else:
- msg = "k8scluster-type not defined"
- raise LcmException(msg)
- break
- index += 1
-
- else:
- raise LcmException("KDU '{}' not found".format(kdu_name))
- if output:
- db_nslcmop_update["detailed-status"] = output
- db_nslcmop_update["operationState"] = 'COMPLETED'
- db_nslcmop_update["statusEnteredTime"] = time()
- else:
- db_nslcmop_update["detailed-status"] = ''
- db_nslcmop_update["operationState"] = 'FAILED'
- db_nslcmop_update["statusEnteredTime"] = time()
- return
+ for kdu in get_iterable(db_vnfd, "kdu"):
+ if kdu_name == kdu["name"]:
+ for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
+ if config_primitive["name"] == primitive:
+ config_primitive_desc = config_primitive
+ break
+ break
elif vnf_index:
- for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
+ for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
if config_primitive["name"] == primitive:
config_primitive_desc = config_primitive
break
else:
- for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
+ for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
if config_primitive["name"] == primitive:
config_primitive_desc = config_primitive
break
- if not config_primitive_desc:
+ if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
format(primitive))
- desc_params = {}
if vnf_index:
- if db_vnfr.get("additionalParamsForVnf"):
- desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
if vdu_id:
vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
- if vdur.get("additionalParams"):
- desc_params = self._format_additional_params(vdur["additionalParams"])
+ desc_params = self._format_additional_params(vdur.get("additionalParams"))
+ elif kdu_name:
+ kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
+ desc_params = self._format_additional_params(kdur.get("additionalParams"))
+ else:
+ desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
else:
- if db_nsr.get("additionalParamsForNs"):
- desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
+ desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
+
+ if kdu_name:
+ kdu_action = True if not deep_get(kdu, ("kdu-configuration", "juju")) else False
# TODO check if ns is in a proper status
- result, detailed_status = await self._ns_execute_primitive(
- self._look_for_deployed_vca(nsr_deployed["VCA"],
- member_vnf_index=vnf_index,
- vdu_id=vdu_id,
- vdu_name=vdu_name,
- vdu_count_index=vdu_count_index),
- primitive=primitive,
- primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
-
- db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
- db_nslcmop_update["operationState"] = nslcmop_operation_state = result
- db_nslcmop_update["statusEnteredTime"] = time()
- self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
+ if kdu_name and (primitive in ("upgrade", "rollback", "status") or kdu_action):
+ # kdur and desc_params already set from before
+ if primitive_params:
+ desc_params.update(primitive_params)
+ # TODO Check if we will need something at vnf level
+ for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
+ if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
+ break
+ else:
+ raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
+
+ if kdu.get("k8scluster-type") not in self.k8scluster_map:
+ msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
+ raise LcmException(msg)
+
+ db_dict = {"collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": "_admin.deployed.K8s.{}".format(index)}
+ self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
+ step = "Executing kdu {}".format(primitive)
+ if primitive == "upgrade":
+ if desc_params.get("kdu_model"):
+ kdu_model = desc_params.get("kdu_model")
+ del desc_params["kdu_model"]
+ else:
+ kdu_model = kdu.get("kdu-model")
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ kdu_model = parts[0]
+
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ atomic=True, kdu_model=kdu_model,
+ params=desc_params, db_dict=db_dict,
+ timeout=timeout_ns_action),
+ timeout=timeout_ns_action + 10)
+ self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
+ elif primitive == "rollback":
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].rollback(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance"),
+ db_dict=db_dict),
+ timeout=timeout_ns_action)
+ elif primitive == "status":
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu.get("kdu-instance")),
+ timeout=timeout_ns_action)
+ else:
+ kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
+ params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
+
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu_instance,
+ primitive_name=primitive,
+ params=params, db_dict=db_dict,
+ timeout=timeout_ns_action),
+ timeout=timeout_ns_action)
+
+ if detailed_status:
+ nslcmop_operation_state = 'COMPLETED'
+ else:
+ detailed_status = ''
+ nslcmop_operation_state = 'FAILED'
+ else:
+ nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
+ self._look_for_deployed_vca(nsr_deployed["VCA"],
+ member_vnf_index=vnf_index,
+ vdu_id=vdu_id,
+ vdu_count_index=vdu_count_index),
+ primitive=primitive,
+ primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
+ timeout=timeout_ns_action)
+
+ db_nslcmop_update["detailed-status"] = detailed_status
+ error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
+ self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
+ detailed_status))
return # database update is called inside finally
- except (DbException, LcmException) as e:
+ except (DbException, LcmException, N2VCException, K8sException) as e:
self.logger.error(logging_text + "Exit Exception {}".format(e))
exc = e
except asyncio.CancelledError:
self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
exc = "Operation was cancelled"
+ except asyncio.TimeoutError:
+ self.logger.error(logging_text + "Timeout while '{}'".format(step))
+ exc = "Timeout"
except Exception as e:
exc = traceback.format_exc()
self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
finally:
- if exc and db_nslcmop:
- db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
+ if exc:
+ db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
"FAILED {}: {}".format(step, exc)
- db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
- db_nslcmop_update["statusEnteredTime"] = time()
- try:
- if db_nslcmop_update:
- self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
- if db_nsr:
- self._write_ns_status(
- nsr_id=nsr_id,
- ns_state=None,
- current_operation="IDLE",
- current_operation_id=None,
- other_update=db_nsr_update
- )
- if exc:
- self._write_op_status(
- op_id=nslcmop_id,
- error_message=nslcmop_operation_state_detail
- )
- except DbException as e:
- self.logger.error(logging_text + "Cannot update database: {}".format(e))
- self.logger.debug(logging_text + "Exit")
+ nslcmop_operation_state = "FAILED"
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
+ current_operation="IDLE",
+ current_operation_id=None,
+ # error_description=error_description_nsr,
+ # error_detail=error_detail,
+ other_update=db_nsr_update
+ )
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
if nslcmop_operation_state:
try:
await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
- return nslcmop_operation_state, nslcmop_operation_state_detail
+ return nslcmop_operation_state, detailed_status
async def scale(self, nsr_id, nslcmop_id):
db_nsr_update["config-status"] = "configuring pre-scaling"
primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
- # Pre-scale reintent check: Check if this sub-operation has been executed before
+ # Pre-scale retry check: Check if this sub-operation has been executed before
op_index = self._check_or_add_scale_suboperation(
db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
- if (op_index == self.SUBOPERATION_STATUS_SKIP):
+ if op_index == self.SUBOPERATION_STATUS_SKIP:
# Skip sub-operation
result = 'COMPLETED'
result_detail = 'Done'
"vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
vnf_config_primitive, result, result_detail))
else:
- if (op_index == self.SUBOPERATION_STATUS_NEW):
+ if op_index == self.SUBOPERATION_STATUS_NEW:
# New sub-operation: Get index of this sub-operation
op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
format(vnf_config_primitive))
else:
- # Reintent: Get registered params for this existing sub-operation
+ # Retry: Get registered params for this existing sub-operation
op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
vnf_index = op.get('member_vnf_index')
vnf_config_primitive = op.get('primitive')
primitive_params = op.get('primitive_params')
- self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
+ self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
format(vnf_config_primitive))
- # Execute the primitive, either with new (first-time) or registered (reintent) args
+ # Execute the primitive, either with new (first-time) or registered (retry) args
result, result_detail = await self._ns_execute_primitive(
self._look_for_deployed_vca(nsr_deployed["VCA"],
member_vnf_index=vnf_index,
vdu_id=None,
- vdu_name=None,
vdu_count_index=None),
vnf_config_primitive, primitive_params)
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
# if (RO_nsr_id and RO_scaling_info):
if RO_scaling_info:
scale_process = "RO"
- # Scale RO reintent check: Check if this sub-operation has been executed before
+ # Scale RO retry check: Check if this sub-operation has been executed before
op_index = self._check_or_add_scale_suboperation(
db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
- if (op_index == self.SUBOPERATION_STATUS_SKIP):
+ if op_index == self.SUBOPERATION_STATUS_SKIP:
# Skip sub-operation
result = 'COMPLETED'
result_detail = 'Done'
self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
result, result_detail))
else:
- if (op_index == self.SUBOPERATION_STATUS_NEW):
+ if op_index == self.SUBOPERATION_STATUS_NEW:
# New sub-operation: Get index of this sub-operation
op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
self.logger.debug(logging_text + "New sub-operation RO")
else:
- # Reintent: Get registered params for this existing sub-operation
+ # Retry: Get registered params for this existing sub-operation
op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
RO_nsr_id = op.get('RO_nsr_id')
RO_scaling_info = op.get('RO_scaling_info')
- self.logger.debug(logging_text + "Sub-operation RO reintent".format(
+ self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format(
vnf_config_primitive))
RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
db_nsr_update["config-status"] = "configuring post-scaling"
primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
- # Post-scale reintent check: Check if this sub-operation has been executed before
+ # Post-scale retry check: Check if this sub-operation has been executed before
op_index = self._check_or_add_scale_suboperation(
db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
if op_index == self.SUBOPERATION_STATUS_SKIP:
self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
format(vnf_config_primitive))
else:
- # Reintent: Get registered params for this existing sub-operation
+ # Retry: Get registered params for this existing sub-operation
op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
vnf_index = op.get('member_vnf_index')
vnf_config_primitive = op.get('primitive')
primitive_params = op.get('primitive_params')
- self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
+ self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
format(vnf_config_primitive))
- # Execute the primitive, either with new (first-time) or registered (reintent) args
+ # Execute the primitive, either with new (first-time) or registered (retry) args
result, result_detail = await self._ns_execute_primitive(
self._look_for_deployed_vca(nsr_deployed["VCA"],
member_vnf_index=vnf_index,
vdu_id=None,
- vdu_name=None,
vdu_count_index=None),
vnf_config_primitive, primitive_params)
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
scale_process = None
# POST-SCALE END
- db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
- db_nslcmop_update["statusEnteredTime"] = time()
- db_nslcmop_update["detailed-status"] = "done"
db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
else old_operational_status
current_operation_id=None
)
if exc:
- if db_nslcmop:
- db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
- db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
- db_nslcmop_update["statusEnteredTime"] = time()
+ db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
if db_nsr:
db_nsr_update["operational-status"] = old_operational_status
db_nsr_update["config-status"] = old_config_status
db_nsr_update["operational-status"] = "failed"
db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
exc)
- try:
- if db_nslcmop and db_nslcmop_update:
- self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
- if db_nsr:
- self._write_ns_status(
- nsr_id=nsr_id,
- ns_state=None,
- current_operation="IDLE",
- current_operation_id=None,
- other_update=db_nsr_update
- )
+ else:
+ error_description_nslcmop = None
+ nslcmop_operation_state = "COMPLETED"
+ db_nslcmop_update["detailed-status"] = "Done"
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None,
+ other_update=db_nsr_update
+ )
- except DbException as e:
- self.logger.error(logging_text + "Cannot update database: {}".format(e))
if nslcmop_operation_state:
try:
await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,