from http import HTTPStatus
from time import time
from uuid import uuid4
-from functools import partial
+
+from random import randint
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
progress_timeout: float = None, total_timeout: float = None,
- artifact_path: str = None, vca_type: str = None) -> (str, dict):
+ config: dict = None, artifact_path: str = None,
+ vca_type: str = None) -> (str, dict):
# admit new parameters: config, artifact_path and vca_type
if vca_type == "k8s_proxy_charm":
- ee_id = await self.n2vc.install_k8s_proxy_charm(
+ ee_id = await self.install_k8s_proxy_charm(
charm_name=artifact_path[artifact_path.rfind("/") + 1:],
namespace=namespace,
artifact_path=artifact_path,
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
- def __init__(self, db, msg, fs, lcm_tasks, config, loop):
+ def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"helm": self.conn_helm_ee
}
+ self.prometheus = prometheus
+
# create RO client
if self.ng_ro:
self.RO = NgRoClient(self.loop, **self.ro_config)
return ns_config_info
@staticmethod
- def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
+ def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed, ee_descriptor_id):
"""
Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
primitives as verify-ssh-credentials, or config when needed
:param desc_primitive_list: information of the descriptor
:param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
this element contains a ssh public key
+ :param ee_descriptor_id: execution environment descriptor id. It is the value of
+ XXX_configuration.execution-environment-list.INDEX.id; it can be None
:return: The modified list. Can be an empty list, but always a list
"""
- if desc_primitive_list:
- primitive_list = desc_primitive_list.copy()
- else:
- primitive_list = []
+
+ primitive_list = desc_primitive_list or []
+
+    # filter primitives by the execution-environment descriptor id
+ primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
+
+ # sort by 'seq'
+ if primitive_list:
+ primitive_list.sort(key=lambda val: int(val['seq']))
+
# look for primitive config, and get the position. None if not present
config_position = None
for index, primitive in enumerate(primitive_list):
if not vca_deployed["member-vnf-index"] and config_position is None:
primitive_list.insert(0, {"name": "config", "parameter": []})
config_position = 0
- # for VNF/VDU add verify-ssh-credentials after config
+ # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
return primitive_list
raise LcmException("Configuration aborted because dependent charm/s timeout")
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
- config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name):
+ config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
+ ee_config_descriptor):
nsr_id = db_nsr["_id"]
db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
+ osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
db_dict = {
'collection': 'nsrs',
'filter': {'_id': nsr_id},
vnfr_id = None
if db_vnfr:
vnfr_id = db_vnfr["_id"]
+ osm_config["osm"]["vnf_id"] = vnfr_id
namespace = "{nsi}.{ns}".format(
nsi=nsi_id if nsi_id else "",
namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
element_type = 'VDU'
element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+ osm_config["osm"]["vdu_id"] = vdu_id
elif kdu_name:
namespace += ".{}".format(kdu_name)
element_type = 'KDU'
element_under_configuration = kdu_name
+ osm_config["osm"]["kdu_name"] = kdu_name
# Get artifact path
artifact_path = "{}/{}/{}/{}".format(
"charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
vca_name
)
+ # get initial_config_primitive_list that applies to this element
+ initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
- # n2vc_redesign STEP 3.1
+ # add config if not present for NS charm
+ ee_descriptor_id = ee_config_descriptor.get("id")
+ initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
+ vca_deployed, ee_descriptor_id)
+ # n2vc_redesign STEP 3.1
# find old ee_id if exists
ee_id = vca_deployed.get("ee_id")
namespace=namespace,
reuse_ee_id=ee_id,
db_dict=db_dict,
+ config=osm_config,
artifact_path=artifact_path,
vca_type=vca_type)
username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
# TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
# merged. Meanwhile let's get username from initial-config-primitive
- if not username and config_descriptor.get("initial-config-primitive"):
- for config_primitive in config_descriptor["initial-config-primitive"]:
+ if not username and initial_config_primitive_list:
+ for config_primitive in initial_config_primitive_list:
for param in config_primitive.get("parameter", ()):
if param["name"] == "ssh-username":
username = param["value"]
break
if not username:
- raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
+ raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
"'config-access.ssh-access.default-user'")
credentials["username"] = username
# n2vc_redesign STEP 3.2
self.logger.debug(logging_text + step)
config = None
if vca_type == "native_charm":
- initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
- if initial_config_primitive_list:
- for primitive in initial_config_primitive_list:
- if primitive["name"] == "config":
- config = self._map_primitive_params(
- primitive,
- {},
- deploy_params
- )
- break
+ config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
+ if config_primitive:
+ config = self._map_primitive_params(
+ config_primitive,
+ {},
+ deploy_params
+ )
num_units = 1
if vca_type == "lxc_proxy_charm":
if element_type == "NS":
vca_index=vca_index, vca_type=vca_type)
# if SSH access is required, then get execution environment SSH public
- if vca_type in ("lxc_proxy_charm", "helm"): # if native charm we have waited already to VM be UP
+ # if native charm we have waited already to VM be UP
+ if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm"):
pub_key = None
user = None
# self.logger.debug("get ssh key block")
# n2vc_redesign STEP 6 Execute initial config primitive
step = 'execute initial config primitive'
- initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
-
- # sort initial config primitives by 'seq'
- if initial_config_primitive_list:
- try:
- initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
- except Exception as e:
- self.logger.error(logging_text + step + ": " + str(e))
- else:
- self.logger.debug(logging_text + step + ": No initial-config-primitive")
-
- # add config if not present for NS charm
- initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
- vca_deployed)
# wait for dependent primitives execution (NS -> VNF -> VDU)
if initial_config_primitive_list:
# TODO register in database that primitive is done
+ # STEP 7 Configure metrics
+ if vca_type == "helm":
+ prometheus_jobs = await self.add_prometheus_metrics(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ ee_config_descriptor=ee_config_descriptor,
+ vnfr_id=vnfr_id,
+ nsr_id=nsr_id,
+ target_ip=rw_mgmt_ip,
+ )
+ if prometheus_jobs:
+ self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
+
step = "instantiated at VCA"
self.logger.debug(logging_text + step)
logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
- # Sync from FSMongo
- self.fs.sync()
-
# get all needed from database
# database nsrs record
# wait for any previous tasks in process
await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+ stage[1] = "Sync filesystem from database"
+ self.fs.sync() # TODO, make use of partial sync, only for the needed packages
+
# STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
- stage[1] = "Reading from database,"
+ stage[1] = "Reading from database"
# nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
db_nsr_update["detailed-status"] = "creating"
db_nsr_update["operational-status"] = "init"
self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
return False
- def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None):
- """
- callback for kdu install intended to store the returned kdu_instance at database
- :return: None
- """
- db_update = {}
+ async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdur: dict, kdud: dict,
+ vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
+
try:
- result = task.result()
- if on_done:
- db_update[on_done] = str(result)
+ k8sclustertype = k8s_instance_info["k8scluster-type"]
+ # Instantiate kdu
+ db_dict_install = {"collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": nsr_db_path}
+
+ kdu_instance = await self.k8scluster_map[k8sclustertype].install(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_model=k8s_instance_info["kdu-model"],
+ atomic=True,
+ params=k8params,
+ db_dict=db_dict_install,
+ timeout=timeout,
+ kdu_name=k8s_instance_info["kdu-name"],
+ namespace=k8s_instance_info["namespace"])
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
+
+ # Obtain services to obtain management service ip
+ services = await self.k8scluster_map[k8sclustertype].get_services(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_instance=kdu_instance,
+ namespace=k8s_instance_info["namespace"])
+
+ # Obtain management service info (if exists)
+ if services:
+ vnfr_update_dict = {"kdur.{}.services".format(kdu_index): services}
+ mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
+ for mgmt_service in mgmt_services:
+ for service in services:
+ if service["name"].startswith(mgmt_service["name"]):
+ # Mgmt service found, Obtain service ip
+ ip = service.get("external_ip", service.get("cluster_ip"))
+ if isinstance(ip, list) and len(ip) == 1:
+ ip = ip[0]
+
+ vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
+
+ # Check if must update also mgmt ip at the vnf
+ service_external_cp = mgmt_service.get("external-connection-point-ref")
+ if service_external_cp:
+ if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
+ vnfr_update_dict["ip-address"] = ip
+
+ break
+ else:
+ self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
+
+ self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
+
except Exception as e:
- if on_exc:
- db_update[on_exc] = str(e)
- if db_update:
+ # Prepare update db with error and raise exception
try:
- self.update_db_2(item, _id, db_update)
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
except Exception:
+ # ignore to keep original exception
pass
+ # reraise original error
+ raise
+
+ return kdu_instance
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
# Launch kdus if present in the descriptor
k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
- def _get_cluster_id(cluster_id, cluster_type):
+ async def _get_cluster_id(cluster_id, cluster_type):
nonlocal k8scluster_id_2_uuic
if cluster_id in k8scluster_id_2_uuic[cluster_type]:
return k8scluster_id_2_uuic[cluster_type][cluster_id]
+            # if the K8s cluster is still being created, wait for its related pending tasks to complete
+ task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
+ if task_dependency:
+ text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
+ self.logger.debug(logging_text + text)
+ await asyncio.wait(task_dependency, timeout=3600)
+
db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
if not db_k8scluster:
raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
+
k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
if not k8s_id:
- raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
+ raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
+ cluster_type))
k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
return k8s_id
updated_cluster_list = []
for vnfr_data in db_vnfrs.values():
- for kdur in get_iterable(vnfr_data, "kdur"):
+ for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
+ # Step 0: Prepare and set parameters
desc_params = self._format_additional_params(kdur.get("additionalParams"))
vnfd_id = vnfr_data.get('vnfd-id')
+ kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"])
namespace = kdur.get("k8s-namespace")
if kdur.get("helm-chart"):
kdumodel = kdur["helm-chart"]
k8s_cluster_id = kdur["k8s-cluster"]["id"]
step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
- cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)
+ cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
+ # Synchronize repos
if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
del_repo_list, added_repo_dict = await asyncio.ensure_future(
self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
updated_cluster_list.append(cluster_uuid)
+ # Instantiate kdu
step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
kdur["kdu-name"], k8s_cluster_id)
-
- k8s_instace_info = {"kdu-instance": None,
- "k8scluster-uuid": cluster_uuid,
- "k8scluster-type": k8sclustertype,
- "member-vnf-index": vnfr_data["member-vnf-index-ref"],
- "kdu-name": kdur["kdu-name"],
- "kdu-model": kdumodel,
- "namespace": namespace}
+ k8s_instance_info = {"kdu-instance": None,
+ "k8scluster-uuid": cluster_uuid,
+ "k8scluster-type": k8sclustertype,
+ "member-vnf-index": vnfr_data["member-vnf-index-ref"],
+ "kdu-name": kdur["kdu-name"],
+ "kdu-model": kdumodel,
+ "namespace": namespace}
db_path = "_admin.deployed.K8s.{}".format(index)
- db_nsr_update[db_path] = k8s_instace_info
+ db_nsr_update[db_path] = k8s_instance_info
self.update_db_2("nsrs", nsr_id, db_nsr_update)
- db_dict = {"collection": "nsrs",
- "filter": {"_id": nsr_id},
- "path": db_path}
-
task = asyncio.ensure_future(
- self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
- atomic=True, params=desc_params,
- db_dict=db_dict, timeout=600,
- kdu_name=kdur["kdu-name"], namespace=namespace))
-
- task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id,
- on_done=db_path + ".kdu-instance",
- on_exc=db_path + ".detailed-status"))
+ self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdur, kdud, db_vnfds[vnfd_id],
+ k8s_instance_info, k8params=desc_params, timeout=600))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
for ee_item in ee_list:
self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
ee_item.get("helm-chart")))
+ ee_descriptor_id = ee_item.get("id")
if ee_item.get("juju"):
vca_name = ee_item['juju'].get('charm')
vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
if vca_deployed.get("member-vnf-index") == member_vnf_index and \
vca_deployed.get("vdu_id") == vdu_id and \
vca_deployed.get("kdu_name") == kdu_name and \
- vca_deployed.get("vdu_count_index", 0) == vdu_index:
+ vca_deployed.get("vdu_count_index", 0) == vdu_index and \
+ vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
break
else:
# not found, create one.
+ target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
+ if vdu_id:
+ target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
+ elif kdu_name:
+ target += "/kdu/{}".format(kdu_name)
vca_deployed = {
+ "target_element": target,
+ # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
"member-vnf-index": member_vnf_index,
"vdu_id": vdu_id,
"kdu_name": kdu_name,
"step": "initial-deploy", # TODO revise
"vnfd_id": vnfd_id,
"vdu_name": vdu_name,
- "type": vca_type
+ "type": vca_type,
+ "ee_descriptor_id": ee_descriptor_id
}
vca_index += 1
nslcmop_id=nslcmop_id,
stage=stage,
vca_type=vca_type,
- vca_name=vca_name
+ vca_name=vca_name,
+ ee_config_descriptor=ee_item
)
)
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
member_vnf_index or "", vdu_id or "")
- # Check if this VNFD has a configured terminate action
- def _has_terminate_config_primitive(self, vnfd):
- vnf_config = vnfd.get("vnf-configuration")
- if vnf_config and vnf_config.get("terminate-config-primitive"):
- return True
- else:
- return False
-
@staticmethod
- def _get_terminate_config_primitive_seq_list(vnfd):
- """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
- # No need to check for existing primitive twice, already done before
- vnf_config = vnfd.get("vnf-configuration")
- seq_list = vnf_config.get("terminate-config-primitive")
- # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
- seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
- return seq_list_sorted
+ def _get_terminate_config_primitive(primitive_list, vca_deployed):
+ """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
+ it get only those primitives for this execution envirom"""
+
+ primitive_list = primitive_list or []
+ # filter primitives by ee_descriptor_id
+ ee_descriptor_id = vca_deployed.get("ee_descriptor_id")
+ primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
+
+ if primitive_list:
+ primitive_list.sort(key=lambda val: int(val['seq']))
+
+ return primitive_list
@staticmethod
def _create_nslcmop(nsr_id, operation, params):
# execute terminate_primitives
if exec_primitives:
- terminate_primitives = config_descriptor.get("terminate-config-primitive")
+ terminate_primitives = self._get_terminate_config_primitive(
+ config_descriptor.get("terminate-config-primitive"), vca_deployed)
vdu_id = vca_deployed.get("vdu_id")
vdu_count_index = vca_deployed.get("vdu_count_index")
vdu_name = vca_deployed.get("vdu_name")
vnf_index = vca_deployed.get("member-vnf-index")
if terminate_primitives and vca_deployed.get("needed_terminate"):
- # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
- terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq']))
for seq in terminate_primitives:
# For each sequence in list, get primitive and call _ns_execute_primitive()
step = "Calling terminate action for vnf_member_index={} primitive={}".format(
# Create the primitive for each sequence, i.e. "primitive": "touch"
primitive = seq.get('name')
mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
- # The following 3 parameters are currently set to None for 'terminate':
- # vdu_id, vdu_count_index, vdu_name
# Add sub-operation
self._add_suboperation(db_nslcmop,
db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
+ if vca_deployed.get("prometheus_jobs") and self.prometheus:
+ await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
+
if destroy_ee:
await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
self.logger.debug(logging_text + stage[0])
stage[1] = "Looking execution environment that needs terminate."
self.logger.debug(logging_text + stage[1])
- self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
+ # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca))
config_descriptor = None
config_descriptor = kdud.get("kdu-configuration")
else:
config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
- # For helm we must destroy_ee
vca_type = vca.get("type")
exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
vca.get("needed_terminate"))
- self.logger.debug("vca type: {}".format(vca_type))
- if not vca_type == "helm":
- task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
- vca_index, False, exec_terminate_primitives))
- else:
- task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
- vca_index, True, exec_terminate_primitives))
+ # For helm we must destroy_ee
+ destroy_ee = "True" if vca_type == "helm" else "False"
+ task = asyncio.ensure_future(
+ self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
+ destroy_ee, exec_terminate_primitives))
tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
# wait for pending tasks of terminate primitives
calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
return calculated_params
- def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None):
+ def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
+ ee_descriptor_id=None):
# find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
for vca in deployed_vca:
if not vca:
continue
if kdu_name and kdu_name != vca["kdu_name"]:
continue
+ if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
+ continue
break
else:
# vca_deployed not found
- raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
- "deployed".format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
+ raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
+ " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
+ ee_descriptor_id))
# get ee_id
ee_id = vca.get("ee_id")
self.update_db_2("nsrs", nsr_id, db_nsr_update)
# look for primitive
- config_primitive_desc = None
+ config_primitive_desc = descriptor_configuration = None
if vdu_id:
for vdu in get_iterable(db_vnfd, "vdu"):
if vdu_id == vdu["id"]:
- for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
- if config_primitive["name"] == primitive:
- config_primitive_desc = config_primitive
- break
+ descriptor_configuration = vdu.get("vdu-configuration")
break
elif kdu_name:
for kdu in get_iterable(db_vnfd, "kdu"):
if kdu_name == kdu["name"]:
- for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
- if config_primitive["name"] == primitive:
- config_primitive_desc = config_primitive
- break
+ descriptor_configuration = kdu.get("kdu-configuration")
break
elif vnf_index:
- for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
- if config_primitive["name"] == primitive:
- config_primitive_desc = config_primitive
- break
+ descriptor_configuration = db_vnfd.get("vnf-configuration")
else:
- for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
+ descriptor_configuration = db_nsd.get("ns-configuration")
+
+ if descriptor_configuration and descriptor_configuration.get("config-primitive"):
+ for config_primitive in descriptor_configuration["config-primitive"]:
if config_primitive["name"] == primitive:
config_primitive_desc = config_primitive
break
- if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
- raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
- format(primitive))
+ if not config_primitive_desc:
+ if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
+ raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
+ format(primitive))
+ primitive_name = primitive
+ ee_descriptor_id = None
+ else:
+ primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
+ ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
if vnf_index:
if vdu_id:
kdu_action = True if not deep_get(kdu, ("kdu-configuration", "juju")) else False
# TODO check if ns is in a proper status
- if kdu_name and (primitive in ("upgrade", "rollback", "status") or kdu_action):
+ if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
# kdur and desc_params already set from before
if primitive_params:
desc_params.update(primitive_params)
db_dict = {"collection": "nsrs",
"filter": {"_id": nsr_id},
"path": "_admin.deployed.K8s.{}".format(index)}
- self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
- step = "Executing kdu {}".format(primitive)
- if primitive == "upgrade":
+ self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
+ step = "Executing kdu {}".format(primitive_name)
+ if primitive_name == "upgrade":
if desc_params.get("kdu_model"):
kdu_model = desc_params.get("kdu_model")
del desc_params["kdu_model"]
timeout=timeout_ns_action),
timeout=timeout_ns_action + 10)
self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
- elif primitive == "rollback":
+ elif primitive_name == "rollback":
detailed_status = await asyncio.wait_for(
self.k8scluster_map[kdu["k8scluster-type"]].rollback(
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu.get("kdu-instance"),
db_dict=db_dict),
timeout=timeout_ns_action)
- elif primitive == "status":
+ elif primitive_name == "status":
detailed_status = await asyncio.wait_for(
self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
cluster_uuid=kdu.get("k8scluster-uuid"),
self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu_instance,
- primitive_name=primitive,
+ primitive_name=primitive_name,
params=params, db_dict=db_dict,
timeout=timeout_ns_action),
timeout=timeout_ns_action)
ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
member_vnf_index=vnf_index,
vdu_id=vdu_id,
- vdu_count_index=vdu_count_index)
+ vdu_count_index=vdu_count_index,
+ ee_descriptor_id=ee_descriptor_id)
db_nslcmop_notif = {"collection": "nslcmops",
"filter": {"_id": nslcmop_id},
"path": "admin.VCA"}
nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
ee_id,
- primitive=primitive,
+ primitive=primitive_name,
primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
timeout=timeout_ns_action,
vca_type=vca_type,
raise LcmException(
"Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
"[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
- "primitive".format(scaling_group, config_primitive))
+ "primitive".format(scaling_group, vnf_config_primitive))
vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
if db_vnfr.get("additionalParamsForVnf"):
self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
format(vnf_config_primitive))
# Execute the primitive, either with new (first-time) or registered (reintent) args
+ ee_descriptor_id = config_primitive.get("execution-environment-ref")
+ primitive_name = config_primitive.get("execution-environment-primitive",
+ vnf_config_primitive)
ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
member_vnf_index=vnf_index,
vdu_id=None,
- vdu_count_index=None)
+ vdu_count_index=None,
+ ee_descriptor_id=ee_descriptor_id)
result, result_detail = await self._ns_execute_primitive(
- ee_id, vnf_config_primitive, primitive_params, vca_type)
+ ee_id, primitive_name, primitive_params, vca_type)
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
vnf_config_primitive, result, result_detail))
# Update operationState = COMPLETED | FAILED
if config_primitive["name"] == vnf_config_primitive:
break
else:
- raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
- "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
- "match any vnf-configuration:config-primitive".format(scaling_group,
- config_primitive))
+ raise LcmException(
+ "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
+ "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
+ "config-primitive".format(scaling_group, vnf_config_primitive))
scale_process = "VCA"
db_nsr_update["config-status"] = "configuring post-scaling"
primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
format(vnf_config_primitive))
# Execute the primitive, either with new (first-time) or registered (reintent) args
+ ee_descriptor_id = config_primitive.get("execution-environment-ref")
+ primitive_name = config_primitive.get("execution-environment-primitive",
+ vnf_config_primitive)
ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
member_vnf_index=vnf_index,
vdu_id=None,
- vdu_count_index=None)
+ vdu_count_index=None,
+ ee_descriptor_id=ee_descriptor_id)
result, result_detail = await self._ns_execute_primitive(
- ee_id, vnf_config_primitive, primitive_params, vca_type)
+ ee_id, primitive_name, primitive_params, vca_type)
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
vnf_config_primitive, result, result_detail))
# Update operationState = COMPLETED | FAILED
self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
+
+ async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
+ if not self.prometheus:
+ return
+ # look if exist a file called 'prometheus*.j2' and
+ artifact_content = self.fs.dir_ls(artifact_path)
+ job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
+ if not job_file:
+ return
+ with self.fs.file_open((artifact_path, job_file), "r") as f:
+ job_data = f.read()
+
+ # TODO get_service
+ _, _, service = ee_id.partition(".") # remove prefix "namespace."
+ host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
+ host_port = "80"
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "EXPORTER_POD_IP": host_name,
+ "EXPORTER_POD_PORT": host_port,
+ }
+ job_list = self.prometheus.parse_job(job_data, variables)
+ # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
+ for job in job_list:
+ if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
+ job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+ job["nsr_id"] = nsr_id
+ job_dict = {jl["job_name"]: jl for jl in job_list}
+ if await self.prometheus.update(job_dict):
+ return list(job_dict.keys())