##
import asyncio
-from typing import Any, Dict
+from typing import Any, Dict, List
import yaml
import logging
import logging.handlers
from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
from osm_lcm.lcm_helm_conn import LCMHelmConn
+from osm_lcm.prometheus import parse_job
from copy import copy, deepcopy
from time import time
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
- def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
+ def __init__(self, msg, lcm_tasks, config, loop):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"helm-v3": self.conn_helm_ee,
}
- self.prometheus = prometheus
-
# create RO client
self.RO = NgRoClient(self.loop, **self.ro_config)
try:
if vdu.get("cloud-init-file"):
base_folder = vnfd["_admin"]["storage"]
- cloud_init_file = "{}/{}/cloud_init/{}".format(
- base_folder["folder"],
- base_folder["pkg-dir"],
- vdu["cloud-init-file"],
- )
+ if base_folder["pkg-dir"]:
+ cloud_init_file = "{}/{}/cloud_init/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ vdu["cloud-init-file"],
+ )
+ else:
+ cloud_init_file = "{}/Scripts/cloud_init/{}".format(
+ base_folder["folder"],
+ vdu["cloud-init-file"],
+ )
with self.fs.file_open(cloud_init_file, "r") as ci_file:
cloud_init_content = ci_file.read()
elif vdu.get("cloud-init"):
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
vdur = next(
- vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
+ (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
+ {}
)
additional_params = vdur.get("additionalParams")
return parse_yaml_strings(additional_params)
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
db_vdu_push_list = []
+ template_vdur = []
db_update = {"_admin.modified": time()}
if vdu_create:
for vdu_id, vdu_count in vdu_create.items():
None,
)
if not vdur:
- raise LcmException(
- "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
+ # Read the template saved in the db:
+ self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
+ vdur_template = db_vnfr.get("vdur-template")
+ if not vdur_template:
+ raise LcmException(
+ "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
vdu_id
+ )
)
- )
-
+ vdur = vdur_template[0]
+ #Delete a template from the database after using it
+ self.db.set_one("vnfrs",
+ {"_id": db_vnfr["_id"]},
+ None,
+ pull={"vdur-template": {"_id": vdur['_id']}}
+ )
for count in range(vdu_count):
vdur_copy = deepcopy(vdur)
vdur_copy["status"] = "BUILD"
vdur_copy["status-detailed"] = None
- vdur_copy["ip-address"]: None
+ vdur_copy["ip-address"] = None
vdur_copy["_id"] = str(uuid4())
vdur_copy["count-index"] += count + 1
vdur_copy["id"] = "{}-{}".format(
)
else:
iface.pop("mac-address", None)
- iface.pop(
- "mgmt_vnf", None
- ) # only first vdu can be managment of vnf
+ if db_vnfr["vdur"]:
+ iface.pop(
+ "mgmt_vnf", None
+ ) # only first vdu can be managment of vnf
db_vdu_push_list.append(vdur_copy)
# self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
if vdu_delete:
+ if len(db_vnfr["vdur"]) == 1:
+ # The scale will move to 0 instances
+ self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
+ template_vdur = [db_vnfr["vdur"][0]]
for vdu_id, vdu_count in vdu_delete.items():
if mark_delete:
indexes_to_delete = [
None,
pull={"vdur": {"_id": vdu["_id"]}},
)
- db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
+ db_push = {}
+ if db_vdu_push_list:
+ db_push["vdur"] = db_vdu_push_list
+ if template_vdur:
+ db_push["vdur-template"] = template_vdur
+ if not db_push:
+ db_push = None
+ db_vnfr["vdur-template"] = template_vdur
self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
# modify passed dictionary db_vnfr
db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
if vld_params.get("common_id"):
target_vld["common_id"] = vld_params.get("common_id")
+ # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
+ def update_ns_vld_target(target, ns_params):
+ for vnf_params in ns_params.get("vnf", ()):
+ if vnf_params.get("vimAccountId"):
+ target_vnf = next(
+ (
+ vnfr
+ for vnfr in db_vnfrs.values()
+ if vnf_params["member-vnf-index"]
+ == vnfr["member-vnf-index-ref"]
+ ),
+ None,
+ )
+ vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
+ for a_index, a_vld in enumerate(target["ns"]["vld"]):
+ target_vld = find_in_list(
+ get_iterable(vdur, "interfaces"),
+ lambda iface: iface.get("ns-vld-id") == a_vld["name"],
+ )
+ if target_vld:
+ if vnf_params.get("vimAccountId") not in a_vld.get(
+ "vim_info", {}
+ ):
+ target["ns"]["vld"][a_index].get("vim_info").update(
+ {
+ "vim:{}".format(vnf_params["vimAccountId"]): {
+ "vim_network_name": ""
+ }
+ }
+ )
+
nslcmop_id = db_nslcmop["_id"]
target = {
"name": db_nsr["name"],
image["vim_info"] = {}
for flavor in target["flavor"]:
flavor["vim_info"] = {}
+ if db_nsr.get("affinity-or-anti-affinity-group"):
+ target["affinity-or-anti-affinity-group"] = deepcopy(db_nsr["affinity-or-anti-affinity-group"])
+ for affinity_or_anti_affinity_group in target["affinity-or-anti-affinity-group"]:
+ affinity_or_anti_affinity_group["vim_info"] = {}
if db_nslcmop.get("lcmOperationType") != "instantiate":
# get parameters of instantiation:
vld_params.update(vld_instantiation_params)
parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
target["ns"]["vld"].append(target_vld)
+ # Update the target ns_vld if vnf vim_account is overriden by instantiation params
+ update_ns_vld_target(target, ns_params)
for vnfr in db_vnfrs.values():
vnfd = find_in_list(
# read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
if vdur["cloud-init"] not in target["cloud_init_content"]:
base_folder = vnfd["_admin"]["storage"]
- cloud_init_file = "{}/{}/cloud_init/{}".format(
- base_folder["folder"],
- base_folder["pkg-dir"],
- vdud.get("cloud-init-file"),
- )
+ if base_folder["pkg-dir"]:
+ cloud_init_file = "{}/{}/cloud_init/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ vdud.get("cloud-init-file"),
+ )
+ else:
+ cloud_init_file = "{}/Scripts/cloud_init/{}".format(
+ base_folder["folder"],
+ vdud.get("cloud-init-file"),
+ )
with self.fs.file_open(cloud_init_file, "r") as ci_file:
target["cloud_init_content"][
vdur["cloud-init"]
if target_vim not in ns_image["vim_info"]:
ns_image["vim_info"][target_vim] = {}
+ # Affinity groups
+ if vdur.get("affinity-or-anti-affinity-group-id"):
+ for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
+ ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
+ if target_vim not in ns_ags["vim_info"]:
+ ns_ags["vim_info"][target_vim] = {}
+
vdur["vim_info"] = {target_vim: {}}
# instantiation parameters
# if vnf_params:
osm_config["osm"]["kdu_name"] = kdu_name
# Get artifact path
- artifact_path = "{}/{}/{}/{}".format(
- base_folder["folder"],
- base_folder["pkg-dir"],
- "charms"
- if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
- else "helm-charts",
- vca_name,
- )
+ if base_folder["pkg-dir"]:
+ artifact_path = "{}/{}/{}/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ "charms"
+ if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
+ else "helm-charts",
+ vca_name,
+ )
+ else:
+ artifact_path = "{}/Scripts/{}/{}/".format(
+ base_folder["folder"],
+ "charms"
+ if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
+ else "helm-charts",
+ vca_name,
+ )
self.logger.debug("Artifact path > {}".format(artifact_path))
# STEP 7 Configure metrics
if vca_type == "helm" or vca_type == "helm-v3":
- prometheus_jobs = await self.add_prometheus_metrics(
+ prometheus_jobs = await self.extract_prometheus_scrape_jobs(
ee_id=ee_id,
artifact_path=artifact_path,
ee_config_descriptor=ee_config_descriptor,
{db_update_entry + "prometheus_jobs": prometheus_jobs},
)
+ for job in prometheus_jobs:
+ self.db.set_one(
+ "prometheus_jobs",
+ {
+ "job_name": job["job_name"]
+ },
+ job,
+ upsert=True,
+ fail_on_empty=False
+ )
+
step = "instantiated at VCA"
self.logger.debug(logging_text + step)
# read from db: operation
stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
+ db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
+ db_nslcmop["operationParams"]["additionalParamsForVnf"]
+ )
ns_params = db_nslcmop.get("operationParams")
if ns_params and ns_params.get("timeout_ns_deploy"):
timeout_ns_deploy = ns_params["timeout_ns_deploy"]
# for each vnf in ns, read vnfd
for vnfr in db_vnfrs_list:
+ if vnfr.get("kdur"):
+ kdur_list = []
+ for kdur in vnfr["kdur"]:
+ if kdur.get("additionalParams"):
+ kdur["additionalParams"] = json.loads(
+ kdur["additionalParams"]
+ )
+ kdur_list.append(kdur)
+ vnfr["kdur"] = kdur_list
+
db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
vnfd_id = vnfr["vnfd-id"]
vnfd_ref = vnfr["vnfd-ref"]
)
deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
if kdur.get("additionalParams"):
- deploy_params_kdu = parse_yaml_strings(
- kdur["additionalParams"]
+ deploy_params_kdu.update(
+ parse_yaml_strings(kdur["additionalParams"].copy())
)
self._deploy_n2vc(
nsd: Dict[str, Any],
vca: DeployedVCA,
cached_vnfds: Dict[str, Any],
- ):
+ ) -> List[Relation]:
relations = []
db_ns_relations = get_ns_configuration_relation_list(nsd)
for r in db_ns_relations:
+ provider_dict = None
+ requirer_dict = None
+ if all(key in r for key in ("provider", "requirer")):
+ provider_dict = r["provider"]
+ requirer_dict = r["requirer"]
+ elif "entities" in r:
+ provider_id = r["entities"][0]["id"]
+ provider_dict = {
+ "nsr-id": nsr_id,
+ "endpoint": r["entities"][0]["endpoint"],
+ }
+ if provider_id != nsd["id"]:
+ provider_dict["vnf-profile-id"] = provider_id
+ requirer_id = r["entities"][1]["id"]
+ requirer_dict = {
+ "nsr-id": nsr_id,
+ "endpoint": r["entities"][1]["endpoint"],
+ }
+ if requirer_id != nsd["id"]:
+ requirer_dict["vnf-profile-id"] = requirer_id
+ else:
+ raise Exception("provider/requirer or entities must be included in the relation.")
relation_provider = self._update_ee_relation_data_with_implicit_data(
- nsr_id, nsd, r["provider"], cached_vnfds
+ nsr_id, nsd, provider_dict, cached_vnfds
)
relation_requirer = self._update_ee_relation_data_with_implicit_data(
- nsr_id, nsd, r["requirer"], cached_vnfds
+ nsr_id, nsd, requirer_dict, cached_vnfds
)
provider = EERelation(relation_provider)
requirer = EERelation(relation_requirer)
nsd: Dict[str, Any],
vca: DeployedVCA,
cached_vnfds: Dict[str, Any],
- ):
+ ) -> List[Relation]:
relations = []
vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
vnf_profile_id = vnf_profile["id"]
db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
for r in db_vnf_relations:
+ provider_dict = None
+ requirer_dict = None
+ if all(key in r for key in ("provider", "requirer")):
+ provider_dict = r["provider"]
+ requirer_dict = r["requirer"]
+ elif "entities" in r:
+ provider_id = r["entities"][0]["id"]
+ provider_dict = {
+ "nsr-id": nsr_id,
+ "vnf-profile-id": vnf_profile_id,
+ "endpoint": r["entities"][0]["endpoint"],
+ }
+ if provider_id != vnfd_id:
+ provider_dict["vdu-profile-id"] = provider_id
+ requirer_id = r["entities"][1]["id"]
+ requirer_dict = {
+ "nsr-id": nsr_id,
+ "vnf-profile-id": vnf_profile_id,
+ "endpoint": r["entities"][1]["endpoint"],
+ }
+ if requirer_id != vnfd_id:
+ requirer_dict["vdu-profile-id"] = requirer_id
+ else:
+ raise Exception("provider/requirer or entities must be included in the relation.")
relation_provider = self._update_ee_relation_data_with_implicit_data(
- nsr_id, nsd, r["provider"], cached_vnfds, vnf_profile_id=vnf_profile_id
+ nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
)
relation_requirer = self._update_ee_relation_data_with_implicit_data(
- nsr_id, nsd, r["requirer"], cached_vnfds, vnf_profile_id=vnf_profile_id
+ nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
)
provider = EERelation(relation_provider)
requirer = EERelation(relation_requirer)
db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
)
storage = deep_get(vnfd_with_id, ("_admin", "storage"))
- if storage and storage.get(
- "pkg-dir"
- ): # may be not present if vnfd has not artifacts
+ if storage: # may be not present if vnfd has not artifacts
# path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
- filename = "{}/{}/{}s/{}".format(
- storage["folder"],
- storage["pkg-dir"],
- k8sclustertype,
- kdumodel,
- )
+ if storage["pkg-dir"]:
+ filename = "{}/{}/{}s/{}".format(
+ storage["folder"],
+ storage["pkg-dir"],
+ k8sclustertype,
+ kdumodel,
+ )
+ else:
+ filename = "{}/Scripts/{}s/{}".format(
+ storage["folder"],
+ k8sclustertype,
+ kdumodel,
+ )
if self.fs.file_exists(
filename, mode="file"
) or self.fs.file_exists(filename, mode="dir"):
"nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
)
- if vca_deployed.get("prometheus_jobs") and self.prometheus:
- await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
+ # Delete Prometheus Jobs if any
+ # This uses NSR_ID, so it will destroy any jobs under this index
+ self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
if destroy_ee:
await self.vca_map[vca_type].delete_execution_environment(
step = "Getting information from database"
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ if db_nslcmop["operationParams"].get("primitive_params"):
+ db_nslcmop["operationParams"]["primitive_params"] = json.loads(
+ db_nslcmop["operationParams"]["primitive_params"]
+ )
nsr_deployed = db_nsr["_admin"].get("deployed")
vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
db_vnfr = self.db.get_one(
"vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
)
+ if db_vnfr.get("kdur"):
+ kdur_list = []
+ for kdur in db_vnfr["kdur"]:
+ if kdur.get("additionalParams"):
+ kdur["additionalParams"] = json.loads(
+ kdur["additionalParams"]
+ )
+ kdur_list.append(kdur)
+ db_vnfr["kdur"] = kdur_list
step = "Getting vnfd from database"
db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
else:
db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
)
- async def add_prometheus_metrics(
- self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
+ async def extract_prometheus_scrape_jobs(
+ self,
+ ee_id,
+ artifact_path,
+ ee_config_descriptor,
+ vnfr_id,
+ nsr_id,
+ target_ip
):
- if not self.prometheus:
- return
# look if exist a file called 'prometheus*.j2' and
artifact_content = self.fs.dir_ls(artifact_path)
job_file = next(
"EXPORTER_POD_IP": host_name,
"EXPORTER_POD_PORT": host_port,
}
- job_list = self.prometheus.parse_job(job_data, variables)
+ job_list = parse_job(job_data, variables)
# ensure job_name is using the vnfr_id. Adding the metadata nsr_id
for job in job_list:
if (
):
job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
job["nsr_id"] = nsr_id
- job_dict = {jl["job_name"]: jl for jl in job_list}
- if await self.prometheus.update(job_dict):
- return list(job_dict.keys())
+ job["vnfr_id"] = vnfr_id
+ return job_list
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
"""