from time import time
from uuid import uuid4
from functools import partial
+from random import randint
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
progress_timeout: float = None, total_timeout: float = None,
- artifact_path: str = None, vca_type: str = None) -> (str, dict):
+ config: dict = None, artifact_path: str = None,
+ vca_type: str = None) -> (str, dict):
# admit two new parameters, artifact_path and vca_type
if vca_type == "k8s_proxy_charm":
ee_id = await self.n2vc.install_k8s_proxy_charm(
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
- def __init__(self, db, msg, fs, lcm_tasks, config, loop):
+ def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"helm": self.conn_helm_ee
}
+ self.prometheus = prometheus
+
# create RO client
if self.ng_ro:
self.RO = NgRoClient(self.loop, **self.ro_config)
raise LcmException("Configuration aborted because dependent charm/s timeout")
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
- config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name):
+ config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
+ ee_config_descriptor):
nsr_id = db_nsr["_id"]
db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
+ osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
db_dict = {
'collection': 'nsrs',
'filter': {'_id': nsr_id},
vnfr_id = None
if db_vnfr:
vnfr_id = db_vnfr["_id"]
+ osm_config["osm"]["vnf_id"] = vnfr_id
namespace = "{nsi}.{ns}".format(
nsi=nsi_id if nsi_id else "",
namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
element_type = 'VDU'
element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+ osm_config["osm"]["vdu_id"] = vdu_id
elif kdu_name:
namespace += ".{}".format(kdu_name)
element_type = 'KDU'
element_under_configuration = kdu_name
+ osm_config["osm"]["kdu_name"] = kdu_name
# Get artifact path
artifact_path = "{}/{}/{}/{}".format(
namespace=namespace,
reuse_ee_id=ee_id,
db_dict=db_dict,
+ config=osm_config,
artifact_path=artifact_path,
vca_type=vca_type)
# TODO register in database that primitive is done
+ # STEP 7 Configure metrics
+ if vca_type == "helm":
+ prometheus_jobs = await self.add_prometheus_metrics(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ ee_config_descriptor=ee_config_descriptor,
+ vnfr_id=vnfr_id,
+ nsr_id=nsr_id,
+ target_ip=rw_mgmt_ip,
+ )
+ if prometheus_jobs:
+ self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
+
step = "instantiated at VCA"
self.logger.debug(logging_text + step)
nslcmop_id=nslcmop_id,
stage=stage,
vca_type=vca_type,
- vca_name=vca_name
+ vca_name=vca_name,
+ ee_config_descriptor=ee_item
)
)
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
+ if vca_deployed.get("prometheus_jobs") and self.prometheus:
+ await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
+
if destroy_ee:
await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
self.logger.debug(logging_text + stage[0])
stage[1] = "Looking execution environment that needs terminate."
self.logger.debug(logging_text + stage[1])
- self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
+ # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca))
config_descriptor = None
config_descriptor = kdud.get("kdu-configuration")
else:
config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
- # For helm we must destroy_ee
vca_type = vca.get("type")
exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
vca.get("needed_terminate"))
- self.logger.debug("vca type: {}".format(vca_type))
- if not vca_type == "helm":
- task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
- vca_index, False, exec_terminate_primitives))
- else:
- task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
- vca_index, True, exec_terminate_primitives))
+ # For helm we must destroy_ee
+ destroy_ee = "True" if vca_type == "helm" else "False"
+ task = asyncio.ensure_future(
+ self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
+ destroy_ee, exec_terminate_primitives))
tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
# wait for pending tasks of terminate primitives
self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
+
+ async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
+ if not self.prometheus:
+ return
+ # look if exist a file called 'prometheus*.j2' and
+ artifact_content = self.fs.dir_ls(artifact_path)
+ job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
+ if not job_file:
+ return
+ with self.fs.file_open((artifact_path, job_file), "r") as f:
+ job_data = f.read()
+
+ # TODO get_service
+ _, _, service = ee_id.partition(".") # remove prefix "namespace."
+ host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
+ host_port = "80"
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "EXPORTER_POD_IP": host_name,
+ "EXPORTER_POD_PORT": host_port,
+ }
+ job_list = self.prometheus.parse_job(job_data, variables)
+ # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
+ for job in job_list:
+ if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
+ job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+ job["nsr_id"] = nsr_id
+ job_dict = {jl["job_name"]: jl for jl in job_list}
+ if await self.prometheus.update(job_dict):
+ return list(job_dict.keys())