from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
from osm_lcm import ROclient
+from osm_lcm.ng_ro import NgRoClient, NgRoException
from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
from n2vc.k8s_helm_conn import K8sHelmConnector
from n2vc.k8s_juju_conn import K8sJujuConnector
from n2vc.n2vc_juju_conn import N2VCJujuConnector
from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
+from osm_lcm.lcm_helm_conn import LCMHelmConn
+
from copy import copy, deepcopy
from http import HTTPStatus
from time import time
from uuid import uuid4
-__author__ = "Alfonso Tierno"
+from random import randint
+
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+
+
+class N2VCJujuConnectorLCM(N2VCJujuConnector):
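+ """N2VCJujuConnector specialization used by the LCM.
+ create_execution_environment and install_configuration_sw admit the extra parameters artifact_path and
+ vca_type: "k8s_proxy_charm" environments are created via install_k8s_proxy_charm and need no extra
+ configuration software."""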
+
+ async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
+ progress_timeout: float = None, total_timeout: float = None,
+ config: dict = None, artifact_path: str = None,
+ vca_type: str = None) -> (str, dict):
+ # accepts the extra parameters artifact_path and vca_type
+ if vca_type == "k8s_proxy_charm":
+ ee_id = await self.install_k8s_proxy_charm(
+ charm_name=artifact_path[artifact_path.rfind("/") + 1:],
+ namespace=namespace,
+ artifact_path=artifact_path,
+ db_dict=db_dict)
+ return ee_id, None
+ else:
+ return await super().create_execution_environment(
+ namespace=namespace, db_dict=db_dict, reuse_ee_id=reuse_ee_id,
+ progress_timeout=progress_timeout, total_timeout=total_timeout)
+
+ async def install_configuration_sw(self, ee_id: str, artifact_path: str, db_dict: dict,
+ progress_timeout: float = None, total_timeout: float = None,
+ config: dict = None, num_units: int = 1, vca_type: str = "lxc_proxy_charm"):
+ if vca_type == "k8s_proxy_charm":
+ return
+ return await super().install_configuration_sw(
+ ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict, progress_timeout=progress_timeout,
+ total_timeout=total_timeout, config=config, num_units=num_units)
class NsLcm(LcmBase):
timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
timeout_charm_delete = 10 * 60
- timeout_primitive = 10 * 60 # timeout for primitive execution
- timeout_progress_primitive = 2 * 60 # timeout for some progress in a primitive execution
+ timeout_primitive = 30 * 60 # timeout for primitive execution
+ timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
- def __init__(self, db, msg, fs, lcm_tasks, config, loop):
+ def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.lcm_tasks = lcm_tasks
self.timeout = config["timeout"]
self.ro_config = config["ro_config"]
+ self.ng_ro = config["ro_config"].get("ng")
self.vca_config = config["VCA"].copy()
# create N2VC connector
- self.n2vc = N2VCJujuConnector(
+ self.n2vc = N2VCJujuConnectorLCM(
db=self.db,
fs=self.fs,
log=self.logger,
on_update_db=self._on_update_n2vc_db
)
+ self.conn_helm_ee = LCMHelmConn(
+ db=self.db,
+ fs=self.fs,
+ log=self.logger,
+ loop=self.loop,
+ url=None,
+ username=None,
+ vca_config=self.vca_config,
+ on_update_db=self._on_update_n2vc_db
+ )
+
self.k8sclusterhelm = K8sHelmConnector(
kubectl_command=self.vca_config.get("kubectlpath"),
helm_command=self.vca_config.get("helmpath"),
"juju-bundle": self.k8sclusterjuju,
"juju": self.k8sclusterjuju,
}
+
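+ # dispatch table: select the connector (juju N2VC or helm) that implements each VCA type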
+ self.vca_map = {
+ "lxc_proxy_charm": self.n2vc,
+ "native_charm": self.n2vc,
+ "k8s_proxy_charm": self.n2vc,
+ "helm": self.conn_helm_ee
+ }
+
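+ # optional Prometheus connector; when available, metric jobs created for helm execution
+ # environments are registered at instantiation and removed at terminate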
+ self.prometheus = prometheus
+
# create RO client
- self.RO = ROclient.ROClient(self.loop, **self.ro_config)
+ if self.ng_ro:
+ self.RO = NgRoClient(self.loop, **self.ro_config)
+ else:
+ self.RO = ROclient.ROClient(self.loop, **self.ro_config)
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
format(vnfd["id"], vdu["id"], e))
- def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
+ def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
"""
Creates a RO ns descriptor from OSM ns_instantiate params
:param ns_params: OSM instantiate params
+ :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
+ :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
:return: The RO ns descriptor
"""
vim_2_RO = {}
"wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
# "scenario": ns_params["nsdId"],
}
+ # set the vim_account of each vnf when it differs from the general vim_account.
+ # The information comes from the <vnfr> database content, key vim-account-id.
+ # The vim account can be set by the placement engine, so it may differ from
+ # the instantiate parameters (vnfs.member-vnf-index.datacenter).
+ for vnf_index, vnfr in db_vnfrs.items():
+ if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
+ populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))
n2vc_key_list = n2vc_key_list or []
for vnfd_ref, vnfd in vnfd_dict.items():
else:
raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
"constituent-vnfd".format(vnf_params["member-vnf-index"]))
- if vnf_params.get("vimAccountId"):
- populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
- vim_account_2_RO(vnf_params["vimAccountId"]))
for vdu_params in get_iterable(vnf_params, "vdu"):
# TODO feature 1417: check that this VDU exist and it is not a PDU
return ns_config_info
@staticmethod
- def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
+ def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed, ee_descriptor_id):
"""
Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
primitives such as verify-ssh-credentials, or config when needed
:param desc_primitive_list: information of the descriptor
:param vca_deployed: information of the deployed VCA, needed to know whether it relates to an NS, VNF or VDU and
whether this element contains a ssh public key
+ :param ee_descriptor_id: execution environment descriptor id. It is the value of
+ XXX_configuration.execution-environment-list.INDEX.id; it can be None
:return: The modified list. Can be an empty list, but always a list
"""
- if desc_primitive_list:
- primitive_list = desc_primitive_list.copy()
- else:
- primitive_list = []
+
+ primitive_list = desc_primitive_list or []
+
+ # filter primitives by the execution environment descriptor id they reference
+ primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
+
+ # sort by 'seq'
+ if primitive_list:
+ primitive_list.sort(key=lambda val: int(val['seq']))
+
# look for primitive config, and get the position. None if not present
config_position = None
for index, primitive in enumerate(primitive_list):
if not vca_deployed["member-vnf-index"] and config_position is None:
primitive_list.insert(0, {"name": "config", "parameter": []})
config_position = 0
- # for VNF/VDU add verify-ssh-credentials after config
+ # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
return primitive_list
+ async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
+ n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
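+ # build the "target" deployment request consumed by NG-RO: the ns-level vlds, each vnf's
+ # vdur/vld data, and the flavors/images that must exist at every involved VIM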
+ nslcmop_id = db_nslcmop["_id"]
+ target = {
+ "name": db_nsr["name"],
+ "ns": {"vld": []},
+ "vnf": [],
+ "image": deepcopy(db_nsr["image"]),
+ "flavor": deepcopy(db_nsr["flavor"]),
+ "action_id": nslcmop_id,
+ }
+ for image in target["image"]:
+ image["vim_info"] = []
+ for flavor in target["flavor"]:
+ flavor["vim_info"] = []
+
+ ns_params = db_nslcmop.get("operationParams")
+ ssh_keys = []
+ if ns_params.get("ssh_keys"):
+ ssh_keys += ns_params.get("ssh_keys")
+ if n2vc_key_list:
+ ssh_keys += n2vc_key_list
+
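+ # map each NS-level connection point ("member_vnf:INDEX.CP") to the ns vld it attaches to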
+ cp2target = {}
+ for vld_index, vld in enumerate(nsd.get("vld", ())):
+ target_vld = {"id": vld["id"],
+ "name": vld["name"],
+ "mgmt-network": vld.get("mgmt-network", False),
+ "type": vld.get("type"),
+ "vim_info": [{"vim-network-name": vld.get("vim-network-name"),
+ "vim_account_id": ns_params["vimAccountId"]}],
+ }
+ for cp in vld["vnfd-connection-point-ref"]:
+ cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
+ "nsrs:{}:vld.{}".format(nsr_id, vld_index)
+ target["ns"]["vld"].append(target_vld)
+ for vnfr in db_vnfrs.values():
+ vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
+ target_vnf = deepcopy(vnfr)
+ for vld in target_vnf.get("vld", ()):
+ # check if connected to a ns.vld
+ vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if
+ cp.get("internal-vld-ref") == vld["id"]), None)
+ if vnf_cp:
+ ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
+ if cp2target.get(ns_cp):
+ vld["target"] = cp2target[ns_cp]
+ vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"),
+ "vim_account_id": vnfr["vim-account-id"]}]
+
+ for vdur in target_vnf.get("vdur", ()):
+ vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
+ vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])
+ # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
+
+ if ssh_keys:
+ if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
+ vdur["ssh-keys"] = ssh_keys
+ vdur["ssh-access-required"] = True
+ elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
+ any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
+ vdur["ssh-keys"] = ssh_keys
+ vdur["ssh-access-required"] = True
+
+ # cloud-init
+ if vdud.get("cloud-init-file"):
+ vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
+ elif vdud.get("cloud-init"):
+ vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)
+
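+ # register this vnf's vim account at the ns-level flavor/image entries so that
+ # NG-RO creates them at that VIM when missing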
+ # flavor
+ ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
+ if not next((vi for vi in ns_flavor["vim_info"] if
+ vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
+ ns_flavor["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
+ # image
+ ns_image = target["image"][int(vdur["ns-image-id"])]
+ if not next((vi for vi in ns_image["vim_info"] if
+ vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
+ ns_image["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
+
+ vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
+ target["vnf"].append(target_vnf)
+
+ desc = await self.RO.deploy(nsr_id, target)
+ action_id = desc["action_id"]
+ await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
+
+ # Updating NSR
+ db_nsr_update = {
+ "_admin.deployed.RO.operational-status": "running",
+ "detailed-status": " ".join(stage)
+ }
+ # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+ self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
+ return
+
+ async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage):
+ detailed_status_old = None
+ db_nsr_update = {}
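+ # poll NG-RO until the action reaches DONE or FAILED; the while-else below raises on timeout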
+ while time() <= start_time + timeout:
+ desc_status = await self.RO.status(nsr_id, action_id)
+ if desc_status["status"] == "FAILED":
+ raise NgRoException(desc_status["details"])
+ elif desc_status["status"] == "BUILD":
+ stage[2] = "VIM: ({})".format(desc_status["details"])
+ elif desc_status["status"] == "DONE":
+ stage[2] = "Deployed at VIM"
+ break
+ else:
+ assert False, "NG-RO status returned unknown value {}".format(desc_status["status"])
+ if stage[2] != detailed_status_old:
+ detailed_status_old = stage[2]
+ db_nsr_update["detailed-status"] = " ".join(stage)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+ await asyncio.sleep(5, loop=self.loop)
+ else: # the while loop ended without break: the timeout expired
+ raise NgRoException("Timeout waiting for ns to deploy")
+
+ async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
+ db_nsr_update = {}
+ failed_detail = []
+ action_id = None
+ start_deploy = time()
+ try:
+ target = {
+ "ns": {"vld": []},
+ "vnf": [],
+ "image": [],
+ "flavor": [],
+ }
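+ # deploying an empty target instructs NG-RO to undeploy everything previously created for this ns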
+ desc = await self.RO.deploy(nsr_id, target)
+ action_id = desc["action_id"]
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
+ self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
+
+ # wait until done
+ delete_timeout = 20 * 60 # 20 minutes
+ await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
+
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ # delete all nsr
+ await self.RO.delete(nsr_id)
+ except Exception as e:
+ if isinstance(e, NgRoException) and e.http_code == 404: # not found
+ db_nsr_update["_admin.deployed.RO.nsr_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
+ self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
+ elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
+ failed_detail.append("delete conflict: {}".format(e))
+ self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
+ else:
+ failed_detail.append("delete error: {}".format(e))
+ self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
+
+ if failed_detail:
+ stage[2] = "Error deleting from VIM"
+ else:
+ stage[2] = "Deleted from VIM"
+ db_nsr_update["detailed-status"] = " ".join(stage)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+
+ if failed_detail:
+ raise LcmException("; ".join(failed_detail))
+ return
+
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
n2vc_key_list, stage):
+ """
+ Instantiate at RO
+ :param logging_text: prefix text to use when logging
+ :param nsr_id: nsr identity
+ :param nsd: database content of ns descriptor
+ :param db_nsr: database content of ns record
+ :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
+ :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index
+ :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
+ :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
+ :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
+ :return: None or exception
+ """
try:
db_nsr_update = {}
RO_descriptor_number = 0 # number of descriptors created at RO
# Check for and optionally request placement optimization. Database will be updated if placement activated
stage[2] = "Waiting for Placement."
- await self._do_placement(logging_text, db_nslcmop, db_vnfrs)
+ if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
+ # placement may have changed vim accounts; if ns_params["vimAccountId"] is no longer
+ # used by any vnfr, replace it with the vim account of the (last) vnfr
+ for vnfr in db_vnfrs.values():
+ if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
+ break
+ else:
+ ns_params["vimAccountId"] = vnfr["vim-account-id"]
+ if self.ng_ro:
+ return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
+ db_vnfds_ref, n2vc_key_list, stage, start_deploy,
+ timeout_ns_deploy)
# deploy RO
-
# get vnfds, instantiate at RO
for c_vnf in nsd.get("constituent-vnfd", ()):
member_vnf_index = c_vnf["member-vnf-index"]
await asyncio.wait(task_dependency, timeout=3600)
stage[2] = "Checking instantiation parameters."
- RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
+ RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
stage[2] = "Deploying ns at VIM."
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
# await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
# self.logger.debug(logging_text + "Deployed at VIM")
- except (ROclient.ROClientException, LcmException, DbException) as e:
+ except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e:
stage[2] = "ERROR deploying at VIM"
self.set_vnfr_at_error(db_vnfrs, str(e))
raise
return ip_address
try:
ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
- result_dict = await self.RO.create_action(
- item="ns",
- item_id_name=ro_nsr_id,
- descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
- )
- # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
- if not result_dict or not isinstance(result_dict, dict):
- raise LcmException("Unknown response from RO when injecting key")
- for result in result_dict.values():
- if result.get("vim_result") == 200:
- break
- else:
- raise ROclient.ROClientException("error injecting key: {}".format(
- result.get("description")))
- break
+ if self.ng_ro:
+ target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
+ "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}],
+ }
+ await self.RO.deploy(nsr_id, target)
+ else:
+ result_dict = await self.RO.create_action(
+ item="ns",
+ item_id_name=ro_nsr_id,
+ descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
+ )
+ # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
+ if not result_dict or not isinstance(result_dict, dict):
+ raise LcmException("Unknown response from RO when injecting key")
+ for result in result_dict.values():
+ if result.get("vim_result") == 200:
+ break
+ else:
+ raise ROclient.ROClientException("error injecting key: {}".format(
+ result.get("description")))
+ break
+ except NgRoException as e:
+ raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
except ROclient.ROClientException as e:
if not nb_tries:
self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
raise LcmException("Configuration aborted because dependent charm/s timeout")
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
- config_descriptor, deploy_params, base_folder, nslcmop_id, stage):
+ config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
+ ee_config_descriptor):
nsr_id = db_nsr["_id"]
db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
+ osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
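+ # osm_config identifies the element under configuration (ns/vnf/vdu/kdu ids); it is passed as
+ # "config" when the execution environment is created, mainly consumed by the helm connector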
db_dict = {
'collection': 'nsrs',
'filter': {'_id': nsr_id},
vnfr_id = None
if db_vnfr:
vnfr_id = db_vnfr["_id"]
+ osm_config["osm"]["vnf_id"] = vnfr_id
namespace = "{nsi}.{ns}".format(
nsi=nsi_id if nsi_id else "",
namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
element_type = 'VDU'
element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+ osm_config["osm"]["vdu_id"] = vdu_id
elif kdu_name:
namespace += ".{}".format(kdu_name)
element_type = 'KDU'
element_under_configuration = kdu_name
+ osm_config["osm"]["kdu_name"] = kdu_name
# Get artifact path
- self.fs.sync() # Sync from FSMongo
- artifact_path = "{}/{}/charms/{}".format(
+ artifact_path = "{}/{}/{}/{}".format(
base_folder["folder"],
base_folder["pkg-dir"],
- config_descriptor["juju"]["charm"]
+ "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
+ vca_name
)
+ # get initial_config_primitive_list that applies to this element
+ initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
- is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
- if deep_get(config_descriptor, ('juju', 'proxy')) is False:
- is_proxy_charm = False
+ # add config if not present for NS charm
+ ee_descriptor_id = ee_config_descriptor.get("id")
+ initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
+ vca_deployed, ee_descriptor_id)
# n2vc_redesign STEP 3.1
-
# find old ee_id if exists
ee_id = vca_deployed.get("ee_id")
# create or register execution environment in VCA
- if is_proxy_charm:
+ if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
self._write_configuration_status(
nsr_id=nsr_id,
step = "create execution environment"
self.logger.debug(logging_text + step)
- ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
- reuse_ee_id=ee_id,
- db_dict=db_dict)
-
- else:
+ ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
+ namespace=namespace,
+ reuse_ee_id=ee_id,
+ db_dict=db_dict,
+ config=osm_config,
+ artifact_path=artifact_path,
+ vca_type=vca_type)
+
+ elif vca_type == "native_charm":
step = "Waiting to VM being up and getting IP address"
self.logger.debug(logging_text + step)
rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
# TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
# merged. Meanwhile let's get username from initial-config-primitive
- if not username and config_descriptor.get("initial-config-primitive"):
- for config_primitive in config_descriptor["initial-config-primitive"]:
+ if not username and initial_config_primitive_list:
+ for config_primitive in initial_config_primitive_list:
for param in config_primitive.get("parameter", ()):
if param["name"] == "ssh-username":
username = param["value"]
break
if not username:
- raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
+ raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
"'config-access.ssh-access.default-user'")
credentials["username"] = username
# n2vc_redesign STEP 3.2
step = "register execution environment {}".format(credentials)
self.logger.debug(logging_text + step)
- ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
- db_dict=db_dict)
+ ee_id = await self.vca_map[vca_type].register_execution_environment(
+ credentials=credentials, namespace=namespace, db_dict=db_dict)
# for compatibility with MON/POL modules, they need the model and application name in the database
- # TODO ask to N2VC instead of assuming the format "model_name.application_name"
+ # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
ee_id_parts = ee_id.split('.')
- model_name = ee_id_parts[0]
- application_name = ee_id_parts[1]
- db_nsr_update = {db_update_entry + "model": model_name,
- db_update_entry + "application": application_name,
- db_update_entry + "ee_id": ee_id}
+ db_nsr_update = {db_update_entry + "ee_id": ee_id}
+ if len(ee_id_parts) >= 2:
+ model_name = ee_id_parts[0]
+ application_name = ee_id_parts[1]
+ db_nsr_update[db_update_entry + "model"] = model_name
+ db_nsr_update[db_update_entry + "application"] = application_name
# n2vc_redesign STEP 3.3
-
step = "Install configuration Software"
self._write_configuration_status(
# TODO check if already done
self.logger.debug(logging_text + step)
config = None
- if not is_proxy_charm:
- initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
- if initial_config_primitive_list:
- for primitive in initial_config_primitive_list:
- if primitive["name"] == "config":
- config = self._map_primitive_params(
- primitive,
- {},
- deploy_params
- )
+ if vca_type == "native_charm":
+ config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
+ if config_primitive:
+ config = self._map_primitive_params(
+ config_primitive,
+ {},
+ deploy_params
+ )
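+ # for lxc proxy charms, deploy as many juju units as the element's "config-units" value (default 1)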
+ num_units = 1
+ if vca_type == "lxc_proxy_charm":
+ if element_type == "NS":
+ num_units = db_nsr.get("config-units") or 1
+ elif element_type == "VNF":
+ num_units = db_vnfr.get("config-units") or 1
+ elif element_type == "VDU":
+ for v in db_vnfr["vdur"]:
+ if vdu_id == v["vdu-id-ref"]:
+ num_units = v.get("config-units") or 1
break
- await self.n2vc.install_configuration_sw(
+
+ await self.vca_map[vca_type].install_configuration_sw(
ee_id=ee_id,
artifact_path=artifact_path,
db_dict=db_dict,
- config=config
+ config=config,
+ num_units=num_units,
+ vca_type=vca_type
)
# write in db flag of configuration_sw already installed
self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
# add relations for this VCA (wait for other peers related with this VCA)
- await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)
+ await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
+ vca_index=vca_index, vca_type=vca_type)
# if SSH access is required, then get execution environment SSH public
- if is_proxy_charm: # if native charm we have waited already to VM be UP
+ # if native charm we have waited already to VM be UP
+ if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm"):
pub_key = None
user = None
+ # self.logger.debug("get ssh key block")
if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
+ # self.logger.debug("ssh key needed")
# Needed to inject a ssh key
user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
step = "Install configuration Software, getting public ssh key"
- pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
+ pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
else:
+ # self.logger.debug("no need to get ssh key")
step = "Waiting to VM being up and getting IP address"
self.logger.debug(logging_text + step)
# n2vc_redesign STEP 6 Execute initial config primitive
step = 'execute initial config primitive'
- initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
-
- # sort initial config primitives by 'seq'
- if initial_config_primitive_list:
- try:
- initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
- except Exception as e:
- self.logger.error(logging_text + step + ": " + str(e))
- else:
- self.logger.debug(logging_text + step + ": No initial-config-primitive")
-
- # add config if not present for NS charm
- initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
- vca_deployed)
# wait for dependent primitives execution (NS -> VNF -> VDU)
if initial_config_primitive_list:
step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
self.logger.debug(logging_text + step)
- await self.n2vc.exec_primitive(
+ await self.vca_map[vca_type].exec_primitive(
ee_id=ee_id,
primitive_name=initial_config_primitive["name"],
params_dict=primitive_params_,
# TODO register in database that primitive is done
+ # STEP 7 Configure metrics
+ if vca_type == "helm":
+ prometheus_jobs = await self.add_prometheus_metrics(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ ee_config_descriptor=ee_config_descriptor,
+ vnfr_id=vnfr_id,
+ nsr_id=nsr_id,
+ target_ip=rw_mgmt_ip,
+ )
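+ # keep the returned job list in the VCA record so that terminate can remove the jobs from Prometheus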
+ if prometheus_jobs:
+ self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
+
step = "instantiated at VCA"
self.logger.debug(logging_text + step)
:param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
:param db_nslcmop: database content of nslcmop
:param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
- :return: None. Modifies database vnfrs and parameter db_vnfr with the computed 'vim-account-id'
+ :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
+ computed 'vim-account-id'
"""
+ modified = False
nslcmop_id = db_nslcmop['_id']
placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
if placement_engine == "PLA":
vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
if not pla_vnf.get('vimAccountId') or not vnfr:
continue
+ modified = True
self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
# Modifies db_vnfrs
vnfr["vim-account-id"] = pla_vnf['vimAccountId']
- return
+ return modified
def update_nsrs_with_pla_result(self, params):
try:
# wait for any previous tasks in process
await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+ stage[1] = "Sync filesystem from database"
+ self.fs.sync() # TODO, make use of partial sync, only for the needed packages
+
# STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
- stage[1] = "Reading from database,"
+ stage[1] = "Reading from database"
# nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
db_nsr_update["detailed-status"] = "creating"
db_nsr_update["operational-status"] = "init"
# read from db: ns
stage[1] = "Getting nsr={} from db".format(nsr_id)
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
- # nsd is replicated into ns (no db read)
- nsd = db_nsr["nsd"]
+ stage[1] = "Getting nsd={} from db".format(db_nsr["nsd-id"])
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ db_nsr["nsd"] = nsd
# nsr_name = db_nsr["name"] # TODO short-name??
# read from db: vnf's of this ns
db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
+
# if we haven't this vnfd, read it from db
if vnfd_id not in db_vnfds:
# read from db
# set state to INSTANTIATED. When instantiated NBI will not delete directly
db_nsr_update["_admin.nsState"] = "INSTANTIATED"
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
# n2vc_redesign STEP 2 Deploy Network Scenario
stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
descriptor_config = vnfd.get("vnf-configuration")
- if descriptor_config and descriptor_config.get("juju"):
+ if descriptor_config:
self._deploy_n2vc(
logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
db_nsr=db_nsr,
deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
else:
deploy_params_vdu = deploy_params
- if descriptor_config and descriptor_config.get("juju"):
+ if descriptor_config:
# look for vdu index in the db_vnfr["vdu"] section
# for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
# if vdur["vdu-id-ref"] == vdu_id:
for kdud in get_iterable(vnfd, 'kdu'):
kdu_name = kdud["name"]
descriptor_config = kdud.get('kdu-configuration')
- if descriptor_config and descriptor_config.get("juju"):
+ if descriptor_config:
vdu_id = None
vdu_index = 0
vdu_name = None
error_detail=error_detail,
other_update=db_nsr_update
)
- if db_nslcmop:
- self._write_op_status(
- op_id=nslcmop_id,
- stage="",
- error_message=error_description_nslcmop,
- operation_state=nslcmop_operation_state,
- other_update=db_nslcmop_update,
- )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
if nslcmop_operation_state:
try:
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
- async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
+ async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
+ timeout: int = 3600, vca_type: str = None) -> bool:
# steps:
# 1. find all relations for this VCA
# 3. add relations
try:
+ vca_type = vca_type or "lxc_proxy_charm"
# STEP 1: find all relations for this VCA
# read nsr record
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
# this VCA data
my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
# read all ns-configuration relations
ns_relations = list()
- db_ns_relations = deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation'))
+ db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
if db_ns_relations:
for r in db_ns_relations:
# check if this VCA is in the relation
to_vca_endpoint = r.get('entities')[1].get('endpoint')
if from_vca_ee_id and to_vca_ee_id:
# add relation
- await self.n2vc.add_relation(
+ await self.vca_map[vca_type].add_relation(
ee_id_1=from_vca_ee_id,
ee_id_2=to_vca_ee_id,
endpoint_1=from_vca_endpoint,
to_vca_endpoint = r.get('entities')[1].get('endpoint')
if from_vca_ee_id and to_vca_ee_id:
# add relation
- await self.n2vc.add_relation(
+ await self.vca_map[vca_type].add_relation(
ee_id_1=from_vca_ee_id,
ee_id_2=to_vca_ee_id,
endpoint_1=from_vca_endpoint,
self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
return False
+ async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdur: dict, kdud: dict,
+ vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
+
+ try:
+ k8sclustertype = k8s_instance_info["k8scluster-type"]
+ # Instantiate kdu
+ db_dict_install = {"collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": nsr_db_path}
+
+ kdu_instance = await self.k8scluster_map[k8sclustertype].install(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_model=k8s_instance_info["kdu-model"],
+ atomic=True,
+ params=k8params,
+ db_dict=db_dict_install,
+ timeout=timeout,
+ kdu_name=k8s_instance_info["kdu-name"],
+ namespace=k8s_instance_info["namespace"])
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
+
+ # Obtain services to obtain management service ip
+ services = await self.k8scluster_map[k8sclustertype].get_services(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_instance=kdu_instance,
+ namespace=k8s_instance_info["namespace"])
+
+ # Obtain management service info (if exists)
+ if services:
+ vnfr_update_dict = {"kdur.{}.services".format(kdu_index): services}
+ mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
+ for mgmt_service in mgmt_services:
+ for service in services:
+ if service["name"].startswith(mgmt_service["name"]):
+ # Mgmt service found, Obtain service ip
+ ip = service.get("external_ip", service.get("cluster_ip"))
+ if isinstance(ip, list) and len(ip) == 1:
+ ip = ip[0]
+
+ vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
+
+ # Check if must update also mgmt ip at the vnf
+ service_external_cp = mgmt_service.get("external-connection-point-ref")
+ if service_external_cp:
+ if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
+ vnfr_update_dict["ip-address"] = ip
+
+ break
+ else:
+ self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
+
+ self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
+
+ except Exception as e:
+ # Prepare update db with error and raise exception
+ try:
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
+ except Exception:
+ # ignore to keep original exception
+ pass
+ # reraise original error
+ raise
+
+ return kdu_instance
+
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
# Launch kdus if present in the descriptor
k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
- def _get_cluster_id(cluster_id, cluster_type):
+ async def _get_cluster_id(cluster_id, cluster_type):
nonlocal k8scluster_id_2_uuic
if cluster_id in k8scluster_id_2_uuic[cluster_type]:
return k8scluster_id_2_uuic[cluster_type][cluster_id]
+ # if the K8s cluster is still being created, wait for the previous tasks in process to finish
+ task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
+ if task_dependency:
+ text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
+ self.logger.debug(logging_text + text)
+ await asyncio.wait(task_dependency, timeout=3600)
+
db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
if not db_k8scluster:
raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
+
k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
if not k8s_id:
- raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
+ raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
+ cluster_type))
k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
return k8s_id
updated_cluster_list = []
for vnfr_data in db_vnfrs.values():
- for kdur in get_iterable(vnfr_data, "kdur"):
+ for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
+ # Step 0: Prepare and set parameters
desc_params = self._format_additional_params(kdur.get("additionalParams"))
vnfd_id = vnfr_data.get('vnfd-id')
+ kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"])
namespace = kdur.get("k8s-namespace")
if kdur.get("helm-chart"):
kdumodel = kdur["helm-chart"]
storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
# path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
- filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["'pkg-dir"], k8sclustertype,
+ filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
kdumodel)
if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
kdumodel = self.fs.path + filename
k8s_cluster_id = kdur["k8s-cluster"]["id"]
step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
- cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)
+ cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
+ # Synchronize repos
if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
del_repo_list, added_repo_dict = await asyncio.ensure_future(
self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
updated_cluster_list.append(cluster_uuid)
+ # Instantiate kdu
step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
kdur["kdu-name"], k8s_cluster_id)
-
- k8s_instace_info = {"kdu-instance": None,
- "k8scluster-uuid": cluster_uuid,
- "k8scluster-type": k8sclustertype,
- "member-vnf-index": vnfr_data["member-vnf-index-ref"],
- "kdu-name": kdur["kdu-name"],
- "kdu-model": kdumodel,
- "namespace": namespace}
- db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
+ k8s_instance_info = {"kdu-instance": None,
+ "k8scluster-uuid": cluster_uuid,
+ "k8scluster-type": k8sclustertype,
+ "member-vnf-index": vnfr_data["member-vnf-index-ref"],
+ "kdu-name": kdur["kdu-name"],
+ "kdu-model": kdumodel,
+ "namespace": namespace}
+ db_path = "_admin.deployed.K8s.{}".format(index)
+ db_nsr_update[db_path] = k8s_instance_info
self.update_db_2("nsrs", nsr_id, db_nsr_update)
- db_dict = {"collection": "nsrs",
- "filter": {"_id": nsr_id},
- "path": "_admin.deployed.K8s.{}".format(index)}
-
task = asyncio.ensure_future(
- self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
- atomic=True, params=desc_params,
- db_dict=db_dict, timeout=600,
- kdu_name=kdur["kdu-name"], namespace=namespace))
-
+ self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdur, kdud, db_vnfds[vnfd_id],
+ k8s_instance_info, k8params=desc_params, timeout=600))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
# launch instantiate_N2VC in a asyncio task and register task object
# Look where information of this charm is at database <nsrs>._admin.deployed.VCA
# if not found, create one entry and update database
-
# fill db_nsr._admin.deployed.VCA.<index>
- vca_index = -1
- for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
- if not vca_deployed:
- continue
- if vca_deployed.get("member-vnf-index") == member_vnf_index and \
- vca_deployed.get("vdu_id") == vdu_id and \
- vca_deployed.get("kdu_name") == kdu_name and \
- vca_deployed.get("vdu_count_index", 0) == vdu_index:
- break
- else:
- # not found, create one.
- vca_deployed = {
- "member-vnf-index": member_vnf_index,
- "vdu_id": vdu_id,
- "kdu_name": kdu_name,
- "vdu_count_index": vdu_index,
- "operational-status": "init", # TODO revise
- "detailed-status": "", # TODO revise
- "step": "initial-deploy", # TODO revise
- "vnfd_id": vnfd_id,
- "vdu_name": vdu_name,
- }
- vca_index += 1
-
- # create VCA and configurationStatus in db
- db_dict = {
- "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
- "configurationStatus.{}".format(vca_index): dict()
- }
- self.update_db_2("nsrs", nsr_id, db_dict)
- db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
+ self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
+ if descriptor_config.get("juju"): # There is one execution envioronment of type juju
+ ee_list = [descriptor_config]
+ elif descriptor_config.get("execution-environment-list"):
+ ee_list = descriptor_config.get("execution-environment-list")
+ else: # other types, such as scripts, are not supported
+ ee_list = []
+
+ for ee_item in ee_list:
+ self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
+ ee_item.get("helm-chart")))
+ ee_descriptor_id = ee_item.get("id")
+ if ee_item.get("juju"):
+ vca_name = ee_item['juju'].get('charm')
+ vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
+ if ee_item['juju'].get('cloud') == "k8s":
+ vca_type = "k8s_proxy_charm"
+ elif ee_item['juju'].get('proxy') is False:
+ vca_type = "native_charm"
+ elif ee_item.get("helm-chart"):
+ vca_name = ee_item['helm-chart']
+ vca_type = "helm"
+ else:
+ self.logger.debug(logging_text + "skipping non juju neither charm configuration")
+ continue
- # Launch task
- task_n2vc = asyncio.ensure_future(
- self.instantiate_N2VC(
- logging_text=logging_text,
- vca_index=vca_index,
- nsi_id=nsi_id,
- db_nsr=db_nsr,
- db_vnfr=db_vnfr,
- vdu_id=vdu_id,
- kdu_name=kdu_name,
- vdu_index=vdu_index,
- deploy_params=deploy_params,
- config_descriptor=descriptor_config,
- base_folder=base_folder,
- nslcmop_id=nslcmop_id,
- stage=stage
+ vca_index = -1
+ for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
+ if not vca_deployed:
+ continue
+ if vca_deployed.get("member-vnf-index") == member_vnf_index and \
+ vca_deployed.get("vdu_id") == vdu_id and \
+ vca_deployed.get("kdu_name") == kdu_name and \
+ vca_deployed.get("vdu_count_index", 0) == vdu_index and \
+ vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
+ break
+ else:
+ # not found, create one.
+ target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
+ if vdu_id:
+ target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
+ elif kdu_name:
+ target += "/kdu/{}".format(kdu_name)
+ vca_deployed = {
+ "target_element": target,
+ # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
+ "member-vnf-index": member_vnf_index,
+ "vdu_id": vdu_id,
+ "kdu_name": kdu_name,
+ "vdu_count_index": vdu_index,
+ "operational-status": "init", # TODO revise
+ "detailed-status": "", # TODO revise
+ "step": "initial-deploy", # TODO revise
+ "vnfd_id": vnfd_id,
+ "vdu_name": vdu_name,
+ "type": vca_type,
+ "ee_descriptor_id": ee_descriptor_id
+ }
+ vca_index += 1
+
+ # create VCA and configurationStatus in db
+ db_dict = {
+ "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
+ "configurationStatus.{}".format(vca_index): dict()
+ }
+ self.update_db_2("nsrs", nsr_id, db_dict)
+
+ db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
+
+ # Launch task
+ task_n2vc = asyncio.ensure_future(
+ self.instantiate_N2VC(
+ logging_text=logging_text,
+ vca_index=vca_index,
+ nsi_id=nsi_id,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ vdu_index=vdu_index,
+ deploy_params=deploy_params,
+ config_descriptor=descriptor_config,
+ base_folder=base_folder,
+ nslcmop_id=nslcmop_id,
+ stage=stage,
+ vca_type=vca_type,
+ vca_name=vca_name,
+ ee_config_descriptor=ee_item
+ )
)
- )
- self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
- task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
- member_vnf_index or "", vdu_id or "")
-
- # Check if this VNFD has a configured terminate action
- def _has_terminate_config_primitive(self, vnfd):
- vnf_config = vnfd.get("vnf-configuration")
- if vnf_config and vnf_config.get("terminate-config-primitive"):
- return True
- else:
- return False
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
+ task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
+ member_vnf_index or "", vdu_id or "")
@staticmethod
- def _get_terminate_config_primitive_seq_list(vnfd):
- """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
- # No need to check for existing primitive twice, already done before
- vnf_config = vnfd.get("vnf-configuration")
- seq_list = vnf_config.get("terminate-config-primitive")
- # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
- seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
- return seq_list_sorted
+ def _get_terminate_config_primitive(primitive_list, vca_deployed):
+ """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
+ it get only those primitives for this execution envirom"""
+
+ primitive_list = primitive_list or []
+ # filter primitives by ee_descriptor_id
+ ee_descriptor_id = vca_deployed.get("ee_descriptor_id")
+ primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
+
+ if primitive_list:
+ primitive_list.sort(key=lambda val: int(val['seq']))
+
+ return primitive_list
@staticmethod
def _create_nslcmop(nsr_id, operation, params):
# _ns_execute_primitive() or RO.create_action() will NOT be executed
return self.SUBOPERATION_STATUS_SKIP
else:
- # c. Reintent executing sub-operation
+ # c. Retry executing sub-operation
# The sub-operation exists, and operationState != 'COMPLETED'
- # Update operationState = 'PROCESSING' to indicate a reintent.
+ # Update operationState = 'PROCESSING' to indicate a retry.
operationState = 'PROCESSING'
detailed_status = 'In progress'
self._update_suboperation_status(
# Find a sub-operation where all keys in a matching dictionary must match
# Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
def _find_suboperation(self, db_nslcmop, match):
- if (db_nslcmop and match):
+ if db_nslcmop and match:
op_list = db_nslcmop.get('_admin', {}).get('operations', [])
for i, op in enumerate(op_list):
if all(op.get(k) == match[k] for k in match):
# Check for 3 different cases:
# a. New: First time execution, return SUBOPERATION_STATUS_NEW
# b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
- # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
+ # c. Retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
operationType, RO_nsr_id=None, RO_scaling_info=None):
# Find this sub-operation
- if (RO_nsr_id and RO_scaling_info):
+ if RO_nsr_id and RO_scaling_info:
operationType = 'SCALE-RO'
match = {
'member_vnf_index': vnf_index,
if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
return vca["ee_id"]
- async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, vca_index, destroy_ee=True):
+ async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
+ vca_index, destroy_ee=True, exec_primitives=True):
"""
Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
:param logging_text:
:param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
:param vca_index: index in the database _admin.deployed.VCA
:param destroy_ee: False to not destroy it now, because all of them will be destroyed at once later
+ :param exec_primitives: False to skip the terminate primitives, e.g. because the configuration was never
+ completed or did not execute properly
:return: None or exception
"""
+
+ self.logger.debug(
+ logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
+ vca_index, vca_deployed, config_descriptor, destroy_ee
+ )
+ )
+
+ vca_type = vca_deployed.get("type", "lxc_proxy_charm")
+
# execute terminate_primitives
- terminate_primitives = config_descriptor.get("terminate-config-primitive")
- vdu_id = vca_deployed.get("vdu_id")
- vdu_count_index = vca_deployed.get("vdu_count_index")
- vdu_name = vca_deployed.get("vdu_name")
- vnf_index = vca_deployed.get("member-vnf-index")
- if terminate_primitives and vca_deployed.get("needed_terminate"):
- # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
- terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq']))
- for seq in terminate_primitives:
- # For each sequence in list, get primitive and call _ns_execute_primitive()
- step = "Calling terminate action for vnf_member_index={} primitive={}".format(
- vnf_index, seq.get("name"))
- self.logger.debug(logging_text + step)
- # Create the primitive for each sequence, i.e. "primitive": "touch"
- primitive = seq.get('name')
- mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
- # The following 3 parameters are currently set to None for 'terminate':
- # vdu_id, vdu_count_index, vdu_name
-
- # Add sub-operation
- self._add_suboperation(db_nslcmop,
- vnf_index,
- vdu_id,
- vdu_count_index,
- vdu_name,
- primitive,
- mapped_primitive_params)
- # Sub-operations: Call _ns_execute_primitive() instead of action()
- try:
- result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
- mapped_primitive_params)
- except LcmException:
- # this happens when VCA is not deployed. In this case it is not needed to terminate
- continue
- result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
- if result not in result_ok:
- raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
- "error {}".format(seq.get("name"), vnf_index, result_detail))
- # set that this VCA do not need terminated
- db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
- self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
+ if exec_primitives:
+ terminate_primitives = self._get_terminate_config_primitive(
+ config_descriptor.get("terminate-config-primitive"), vca_deployed)
+ vdu_id = vca_deployed.get("vdu_id")
+ vdu_count_index = vca_deployed.get("vdu_count_index")
+ vdu_name = vca_deployed.get("vdu_name")
+ vnf_index = vca_deployed.get("member-vnf-index")
+ if terminate_primitives and vca_deployed.get("needed_terminate"):
+ for seq in terminate_primitives:
+ # For each sequence in list, get primitive and call _ns_execute_primitive()
+ step = "Calling terminate action for vnf_member_index={} primitive={}".format(
+ vnf_index, seq.get("name"))
+ self.logger.debug(logging_text + step)
+ # Create the primitive for each sequence, i.e. "primitive": "touch"
+ primitive = seq.get('name')
+ mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
+
+ # Add sub-operation
+ self._add_suboperation(db_nslcmop,
+ vnf_index,
+ vdu_id,
+ vdu_count_index,
+ vdu_name,
+ primitive,
+ mapped_primitive_params)
+ # Sub-operations: Call _ns_execute_primitive() instead of action()
+ try:
+ result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
+ mapped_primitive_params,
+ vca_type=vca_type)
+ except LcmException:
+ # this happens when VCA is not deployed. In this case it is not needed to terminate
+ continue
+ result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
+ if result not in result_ok:
+ raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
+ "error {}".format(seq.get("name"), vnf_index, result_detail))
+ # record that this VCA no longer needs its terminate primitives executed
+ db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
+ self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
+
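+ # remove the Prometheus scrape jobs that this VCA registered at instantiation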
+ if vca_deployed.get("prometheus_jobs") and self.prometheus:
+ await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
if destroy_ee:
- await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
+ await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
async def _delete_all_N2VC(self, db_nsr: dict):
self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
timeout_ns_terminate = self.timeout_ns_terminate
db_nsr = None
db_nslcmop = None
+ operation_params = None
exc = None
error_list = [] # annotates all failed error messages
db_nslcmop_update = {}
# Destroy individual execution environments when there are terminating primitives.
# Rest of EE will be deleted at once
- if not operation_params.get("skip_terminate_primitives"):
- stage[0] = "Stage 2/3 execute terminating primitives."
- stage[1] = "Looking execution environment that needs terminate."
- self.logger.debug(logging_text + stage[1])
- for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
- config_descriptor = None
- if not vca or not vca.get("ee_id") or not vca.get("needed_terminate"):
- continue
- if not vca.get("member-vnf-index"):
- # ns
- config_descriptor = db_nsr.get("ns-configuration")
- elif vca.get("vdu_id"):
- db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
- vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
- if vdud:
- config_descriptor = vdud.get("vdu-configuration")
- elif vca.get("kdu_name"):
- db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
- kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
- if kdud:
- config_descriptor = kdud.get("kdu-configuration")
- else:
- config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
- task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
- vca_index, False))
- tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
-
- # wait for pending tasks of terminate primitives
- if tasks_dict_info:
- self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
- error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
- min(self.timeout_charm_delete, timeout_ns_terminate),
- stage, nslcmop_id)
- if error_list:
- return # raise LcmException("; ".join(error_list))
- tasks_dict_info.clear()
+ # TODO - check before calling _destroy_N2VC
+ # if not operation_params.get("skip_terminate_primitives")
+ # or not vca.get("needed_terminate"):
+ stage[0] = "Stage 2/3 execute terminating primitives."
+ self.logger.debug(logging_text + stage[0])
+ stage[1] = "Looking execution environment that needs terminate."
+ self.logger.debug(logging_text + stage[1])
+ # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
+ for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
+ self.logger.debug(logging_text + "vca_index: {}, vca: {}".format(vca_index, vca))
+ config_descriptor = None
+ if not vca or not vca.get("ee_id"):
+ continue
+ if not vca.get("member-vnf-index"):
+ # ns
+ config_descriptor = db_nsr.get("ns-configuration")
+ elif vca.get("vdu_id"):
+ db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
+ vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
+ if vdud:
+ config_descriptor = vdud.get("vdu-configuration")
+ elif vca.get("kdu_name"):
+ db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
+ kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
+ if kdud:
+ config_descriptor = kdud.get("kdu-configuration")
+ else:
+ config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
+ vca_type = vca.get("type")
+ exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
+ vca.get("needed_terminate"))
+ # For helm the EE must be destroyed; likewise for native_charm, as the juju model cannot be
+ # deleted while native charms are pending
+ destroy_ee = vca_type in ("helm", "native_charm")
+ task = asyncio.ensure_future(
+ self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
+ destroy_ee, exec_terminate_primitives))
+ tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
+
+ # wait for pending tasks of terminate primitives
+ if tasks_dict_info:
+ self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
+ error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
+ min(self.timeout_charm_delete, timeout_ns_terminate),
+ stage, nslcmop_id)
+ if error_list:
+ return # raise LcmException("; ".join(error_list))
+ tasks_dict_info.clear()
# remove All execution environments at once
stage[0] = "Stage 3/3 delete all."
# remove from RO
stage[1] = "Deleting ns from VIM."
- task_delete_ro = asyncio.ensure_future(
- self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
+ if self.ng_ro:
+ task_delete_ro = asyncio.ensure_future(
+ self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
+ else:
+ task_delete_ro = asyncio.ensure_future(
+ self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
- # rest of staff will be done at finally
+ # the rest of the cleanup is done in the finally block
error_detail=error_detail,
other_update=db_nsr_update
)
- if db_nslcmop:
- self._write_op_status(
- op_id=nslcmop_id,
- stage="",
- error_message=error_description_nslcmop,
- operation_state=nslcmop_operation_state,
- other_update=db_nslcmop_update,
- )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+ if ns_state == "NOT_INSTANTIATED":
+ try:
+ self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
+ except DbException as e:
+ self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
+ format(nsr_id, e))
+ if operation_params:
autoremove = operation_params.get("autoremove", False)
if nslcmop_operation_state:
try:
new_error = created_tasks_info[task] + ": {}".format(exc)
error_list.append(created_tasks_info[task])
error_detail_list.append(new_error)
- if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException)):
+ if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
+ K8sException)):
self.logger.error(logging_text + new_error)
else:
exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
return calculated_params
- def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None):
+ def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
+ ee_descriptor_id=None):
# find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
for vca in deployed_vca:
if not vca:
continue
if kdu_name and kdu_name != vca["kdu_name"]:
continue
+ if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
+ continue
break
else:
# vca_deployed not found
- raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
- "deployed".format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
+ raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
+ " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
+ ee_descriptor_id))
# get ee_id
ee_id = vca.get("ee_id")
+ vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
if not ee_id:
raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
"execution environment"
.format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
- return ee_id
+ return ee_id, vca_type
async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
- retries_interval=30, timeout=None) -> (str, str):
+ retries_interval=30, timeout=None,
+ vca_type=None, db_dict=None) -> (str, str):
try:
if primitive == "config":
primitive_params = {"params": primitive_params}
+ vca_type = vca_type or "lxc_proxy_charm"
+
while retries >= 0:
try:
output = await asyncio.wait_for(
- self.n2vc.exec_primitive(
+ self.vca_map[vca_type].exec_primitive(
ee_id=ee_id,
primitive_name=primitive,
params_dict=primitive_params,
progress_timeout=self.timeout_progress_primitive,
- total_timeout=self.timeout_primitive),
+ total_timeout=self.timeout_primitive,
+ db_dict=db_dict),
timeout=timeout or self.timeout_primitive)
# execution was OK
break
self.update_db_2("nsrs", nsr_id, db_nsr_update)
# look for primitive
- config_primitive_desc = None
+ config_primitive_desc = descriptor_configuration = None
if vdu_id:
for vdu in get_iterable(db_vnfd, "vdu"):
if vdu_id == vdu["id"]:
- for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
- if config_primitive["name"] == primitive:
- config_primitive_desc = config_primitive
- break
+ descriptor_configuration = vdu.get("vdu-configuration")
break
elif kdu_name:
for kdu in get_iterable(db_vnfd, "kdu"):
if kdu_name == kdu["name"]:
- for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
- if config_primitive["name"] == primitive:
- config_primitive_desc = config_primitive
- break
+ descriptor_configuration = kdu.get("kdu-configuration")
break
elif vnf_index:
- for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
- if config_primitive["name"] == primitive:
- config_primitive_desc = config_primitive
- break
+ descriptor_configuration = db_vnfd.get("vnf-configuration")
else:
- for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
+ descriptor_configuration = db_nsd.get("ns-configuration")
+
+ if descriptor_configuration and descriptor_configuration.get("config-primitive"):
+ for config_primitive in descriptor_configuration["config-primitive"]:
if config_primitive["name"] == primitive:
config_primitive_desc = config_primitive
break
- if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
- raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
- format(primitive))
+ if not config_primitive_desc:
+ if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
+ raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
+ format(primitive))
+ primitive_name = primitive
+ ee_descriptor_id = None
+ else:
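+ # the descriptor may map this primitive to a different EE-level name and pin a specific execution environment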
+ primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
+ ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
if vnf_index:
if vdu_id:
else:
desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
+ if kdu_name:
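+ # a KDU primitive runs through the K8s connector unless the KDU is juju-managed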
+ kdu_action = not deep_get(kdu, ("kdu-configuration", "juju"))
+
# TODO check if ns is in a proper status
- if kdu_name and primitive in ("upgrade", "rollback", "status"):
+ if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
# kdur and desc_params already set from before
if primitive_params:
desc_params.update(primitive_params)
db_dict = {"collection": "nsrs",
"filter": {"_id": nsr_id},
"path": "_admin.deployed.K8s.{}".format(index)}
- self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
- step = "Executing kdu {}".format(primitive)
- if primitive == "upgrade":
+ self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
+ step = "Executing kdu {}".format(primitive_name)
+ if primitive_name == "upgrade":
if desc_params.get("kdu_model"):
kdu_model = desc_params.get("kdu_model")
del desc_params["kdu_model"]
timeout=timeout_ns_action),
timeout=timeout_ns_action + 10)
self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
- elif primitive == "rollback":
+ elif primitive_name == "rollback":
detailed_status = await asyncio.wait_for(
self.k8scluster_map[kdu["k8scluster-type"]].rollback(
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu.get("kdu-instance"),
db_dict=db_dict),
timeout=timeout_ns_action)
- elif primitive == "status":
+ elif primitive_name == "status":
detailed_status = await asyncio.wait_for(
self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu.get("kdu-instance")),
timeout=timeout_ns_action)
+ else:
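+ # any other KDU primitive: execute it through the matching K8s connector,
+ # deriving the instance name as "<kdu-name>-<nsr-id>" when none is recorded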
+ kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
+ params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
+
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu_instance,
+ primitive_name=primitive_name,
+ params=params, db_dict=db_dict,
+ timeout=timeout_ns_action),
+ timeout=timeout_ns_action)
if detailed_status:
nslcmop_operation_state = 'COMPLETED'
else:
detailed_status = ''
nslcmop_operation_state = 'FAILED'
-
else:
+ ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
+ member_vnf_index=vnf_index,
+ vdu_id=vdu_id,
+ vdu_count_index=vdu_count_index,
+ ee_descriptor_id=ee_descriptor_id)
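+ # db document the connector uses to report primitive progress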
+ db_nslcmop_notif = {"collection": "nslcmops",
+ "filter": {"_id": nslcmop_id},
+ "path": "admin.VCA"}
nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
- self._look_for_deployed_vca(nsr_deployed["VCA"],
- member_vnf_index=vnf_index,
- vdu_id=vdu_id,
- vdu_count_index=vdu_count_index),
- primitive=primitive,
+ ee_id,
+ primitive=primitive_name,
primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
- timeout=timeout_ns_action)
+ timeout=timeout_ns_action,
+ vca_type=vca_type,
+ db_dict=db_nslcmop_notif)
db_nslcmop_update["detailed-status"] = detailed_status
error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
other_update=db_nsr_update
)
- if db_nslcmop:
- self._write_op_status(
- op_id=nslcmop_id,
- stage="",
- error_message=error_description_nslcmop,
- operation_state=nslcmop_operation_state,
- other_update=db_nslcmop_update,
- )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
if nslcmop_operation_state:
try:
raise LcmException(
"Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
"[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
- "primitive".format(scaling_group, config_primitive))
+ "primitive".format(scaling_group, vnf_config_primitive))
vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
if db_vnfr.get("additionalParamsForVnf"):
db_nsr_update["config-status"] = "configuring pre-scaling"
primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
- # Pre-scale reintent check: Check if this sub-operation has been executed before
+ # Pre-scale retry check: Check if this sub-operation has been executed before
op_index = self._check_or_add_scale_suboperation(
db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
- if (op_index == self.SUBOPERATION_STATUS_SKIP):
+ if op_index == self.SUBOPERATION_STATUS_SKIP:
# Skip sub-operation
result = 'COMPLETED'
result_detail = 'Done'
"vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
vnf_config_primitive, result, result_detail))
else:
- if (op_index == self.SUBOPERATION_STATUS_NEW):
+ if op_index == self.SUBOPERATION_STATUS_NEW:
# New sub-operation: Get index of this sub-operation
op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
format(vnf_config_primitive))
else:
- # Reintent: Get registered params for this existing sub-operation
+ # retry: Get registered params for this existing sub-operation
op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
vnf_index = op.get('member_vnf_index')
vnf_config_primitive = op.get('primitive')
primitive_params = op.get('primitive_params')
- self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
+ self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
format(vnf_config_primitive))
- # Execute the primitive, either with new (first-time) or registered (reintent) args
+ # Execute the primitive, either with new (first-time) or registered (retry) args
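+ # resolve which deployed EE implements this primitive; the descriptor may reference a specific one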
+ ee_descriptor_id = config_primitive.get("execution-environment-ref")
+ primitive_name = config_primitive.get("execution-environment-primitive",
+ vnf_config_primitive)
+ ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
+ member_vnf_index=vnf_index,
+ vdu_id=None,
+ vdu_count_index=None,
+ ee_descriptor_id=ee_descriptor_id)
result, result_detail = await self._ns_execute_primitive(
- self._look_for_deployed_vca(nsr_deployed["VCA"],
- member_vnf_index=vnf_index,
- vdu_id=None,
- vdu_count_index=None),
- vnf_config_primitive, primitive_params)
+ ee_id, primitive_name, primitive_params, vca_type=vca_type)
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
vnf_config_primitive, result, result_detail))
# Update operationState = COMPLETED | FAILED
# if (RO_nsr_id and RO_scaling_info):
if RO_scaling_info:
scale_process = "RO"
- # Scale RO reintent check: Check if this sub-operation has been executed before
+ # Scale RO retry check: Check if this sub-operation has been executed before
op_index = self._check_or_add_scale_suboperation(
db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
- if (op_index == self.SUBOPERATION_STATUS_SKIP):
+ if op_index == self.SUBOPERATION_STATUS_SKIP:
# Skip sub-operation
result = 'COMPLETED'
result_detail = 'Done'
self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
result, result_detail))
else:
- if (op_index == self.SUBOPERATION_STATUS_NEW):
+ if op_index == self.SUBOPERATION_STATUS_NEW:
# New sub-operation: Get index of this sub-operation
op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
self.logger.debug(logging_text + "New sub-operation RO")
else:
- # Reintent: Get registered params for this existing sub-operation
+ # retry: Get registered params for this existing sub-operation
op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
RO_nsr_id = op.get('RO_nsr_id')
RO_scaling_info = op.get('RO_scaling_info')
- self.logger.debug(logging_text + "Sub-operation RO reintent".format(
+ self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format(
vnf_config_primitive))
RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
if config_primitive["name"] == vnf_config_primitive:
break
else:
- raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
- "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
- "match any vnf-configuration:config-primitive".format(scaling_group,
- config_primitive))
+ raise LcmException(
+ "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
+ "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
+ "config-primitive".format(scaling_group, vnf_config_primitive))
scale_process = "VCA"
db_nsr_update["config-status"] = "configuring post-scaling"
primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
- # Post-scale reintent check: Check if this sub-operation has been executed before
+ # Post-scale retry check: Check if this sub-operation has been executed before
op_index = self._check_or_add_scale_suboperation(
db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
if op_index == self.SUBOPERATION_STATUS_SKIP:
self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
format(vnf_config_primitive))
else:
- # Reintent: Get registered params for this existing sub-operation
+ # retry: Get registered params for this existing sub-operation
op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
vnf_index = op.get('member_vnf_index')
vnf_config_primitive = op.get('primitive')
primitive_params = op.get('primitive_params')
- self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
+ self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
format(vnf_config_primitive))
- # Execute the primitive, either with new (first-time) or registered (reintent) args
+ # Execute the primitive, either with new (first-time) or registered (retry) args
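+ # resolve the deployed EE that implements this primitive, as in the pre-scale case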
+ ee_descriptor_id = config_primitive.get("execution-environment-ref")
+ primitive_name = config_primitive.get("execution-environment-primitive",
+ vnf_config_primitive)
+ ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
+ member_vnf_index=vnf_index,
+ vdu_id=None,
+ vdu_count_index=None,
+ ee_descriptor_id=ee_descriptor_id)
result, result_detail = await self._ns_execute_primitive(
- self._look_for_deployed_vca(nsr_deployed["VCA"],
- member_vnf_index=vnf_index,
- vdu_id=None,
- vdu_count_index=None),
- vnf_config_primitive, primitive_params)
+ ee_id, primitive_name, primitive_params, vca_type=vca_type)
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
vnf_config_primitive, result, result_detail))
# Update operationState = COMPLETED | FAILED
scale_process = None
# POST-SCALE END
- db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
- db_nslcmop_update["statusEnteredTime"] = time()
- db_nslcmop_update["detailed-status"] = "done"
db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
else old_operational_status
current_operation_id=None
)
if exc:
- if db_nslcmop:
- db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
- db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
- db_nslcmop_update["statusEnteredTime"] = time()
+ db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
if db_nsr:
db_nsr_update["operational-status"] = old_operational_status
db_nsr_update["config-status"] = old_config_status
db_nsr_update["operational-status"] = "failed"
db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
exc)
- try:
- if db_nslcmop and db_nslcmop_update:
- self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
- if db_nsr:
- self._write_ns_status(
- nsr_id=nsr_id,
- ns_state=None,
- current_operation="IDLE",
- current_operation_id=None,
- other_update=db_nsr_update
- )
+ else:
+ error_description_nslcmop = None
+ nslcmop_operation_state = "COMPLETED"
+ db_nslcmop_update["detailed-status"] = "Done"
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None,
+ other_update=db_nsr_update
+ )
- except DbException as e:
- self.logger.error(logging_text + "Cannot update database: {}".format(e))
if nslcmop_operation_state:
try:
await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
+
+ async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
+ if not self.prometheus:
+ return
+ # look for a file named 'prometheus*.j2' and use it as the Prometheus job template
+ artifact_content = self.fs.dir_ls(artifact_path)
+ job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
+ if not job_file:
+ return
+ with self.fs.file_open((artifact_path, job_file), "r") as f:
+ job_data = f.read()
+
+ # TODO get_service
+ _, _, service = ee_id.partition(".") # remove prefix "namespace."
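+ # the metrics exporter is assumed reachable as a service named "<ee-service>-<metric-service>" on port 80 (see TODO above)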
+ host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
+ host_port = "80"
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "EXPORTER_POD_IP": host_name,
+ "EXPORTER_POD_PORT": host_port,
+ }
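+ # render the job template with the collected variables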
+ job_list = self.prometheus.parse_job(job_data, variables)
+ # ensure each job_name contains the vnfr_id; add nsr_id as metadata
+ for job in job_list:
+ if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
+ job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+ job["nsr_id"] = nsr_id
+ job_dict = {jl["job_name"]: jl for jl in job_list}
+ if await self.prometheus.update(job_dict):
+ return list(job_dict.keys())