check_juju_bundle_existence,
get_charm_artifact_path,
get_ee_id_parts,
+ vld_to_ro_ip_profile,
)
from osm_lcm.data_utils.nsd import (
get_ns_configuration_relation_list,
return None
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
-
# self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
try:
)
async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
-
# remove last dot from path (if exists)
if path.endswith("."):
path = path[:-1]
# self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
# .format(table, filter, path, updated_data))
try:
-
nsr_id = filter.get("_id")
# read ns record from database
return wim_account
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
-
db_vdu_push_list = []
template_vdur = []
db_update = {"_admin.modified": time()}
start_deploy,
timeout_ns_deploy,
):
-
db_vims = {}
def get_vim_account(vim_account_id):
target_vim, target_vld, vld_params, target_sdn
):
if vld_params.get("ip-profile"):
- target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
- "ip-profile"
- ]
+ target_vld["vim_info"][target_vim]["ip_profile"] = vld_to_ro_ip_profile(
+ vld_params["ip-profile"]
+ )
if vld_params.get("provider-network"):
target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
"provider-network"
lambda v_vld: v_vld["name"] in (a_vld["name"], a_vld["id"]),
)
if target_vld:
-
if vnf_params.get("vimAccountId") not in a_vld.get(
"vim_info", {}
):
# check if this network needs SDN assist
if vld.get("pci-interfaces"):
db_vim = get_vim_account(ns_params["vimAccountId"])
- sdnc_id = db_vim["config"].get("sdn-controller")
- if sdnc_id:
- sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
- target_sdn = "sdn:{}".format(sdnc_id)
- target_vld["vim_info"][target_sdn] = {
- "sdn": True,
- "target_vim": target_vim,
- "vlds": [sdn_vld],
- "type": vld.get("type"),
- }
+ if vim_config := db_vim.get("config"):
+ if sdnc_id := vim_config.get("sdn-controller"):
+ sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
+ target_sdn = "sdn:{}".format(sdnc_id)
+ target_vld["vim_info"][target_sdn] = {
+ "sdn": True,
+ "target_vim": target_vim,
+ "vlds": [sdn_vld],
+ "type": vld.get("type"),
+ }
nsd_vnf_profiles = get_vnf_profiles(nsd)
for nsd_vnf_profile in nsd_vnf_profiles:
and nsd_vlp.get("virtual-link-protocol-data")
and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
):
- ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
+ vld_params["ip-profile"] = nsd_vlp["virtual-link-protocol-data"][
"l3-protocol-data"
]
- ip_profile_dest_data = {}
- if "ip-version" in ip_profile_source_data:
- ip_profile_dest_data["ip-version"] = ip_profile_source_data[
- "ip-version"
- ]
- if "cidr" in ip_profile_source_data:
- ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
- "cidr"
- ]
- if "gateway-ip" in ip_profile_source_data:
- ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
- "gateway-ip"
- ]
- if "dhcp-enabled" in ip_profile_source_data:
- ip_profile_dest_data["dhcp-params"] = {
- "enabled": ip_profile_source_data["dhcp-enabled"]
- }
- vld_params["ip-profile"] = ip_profile_dest_data
# update vld_params with instantiation params
vld_instantiation_params = find_in_list(
and vnfd_vlp.get("virtual-link-protocol-data")
and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
):
- ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
+ vld_params["ip-profile"] = vnfd_vlp["virtual-link-protocol-data"][
"l3-protocol-data"
]
- ip_profile_dest_data = {}
- if "ip-version" in ip_profile_source_data:
- ip_profile_dest_data["ip-version"] = ip_profile_source_data[
- "ip-version"
- ]
- if "cidr" in ip_profile_source_data:
- ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
- "cidr"
- ]
- if "gateway-ip" in ip_profile_source_data:
- ip_profile_dest_data[
- "gateway-address"
- ] = ip_profile_source_data["gateway-ip"]
- if "dhcp-enabled" in ip_profile_source_data:
- ip_profile_dest_data["dhcp-params"] = {
- "enabled": ip_profile_source_data["dhcp-enabled"]
- }
-
- vld_params["ip-profile"] = ip_profile_dest_data
# update vld_params with instantiation params
if vnf_params:
vld_instantiation_params = find_in_list(
vdu_instantiation_params, vdud
)
vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
+ vdur["additionalParams"]["OSM"][
+ "vim_flavor_id"
+ ] = vdu_instantiation_params.get("vim-flavor-id")
vdur_list.append(vdur)
target_vnf["vdur"] = vdur_list
target["vnf"].append(target_vnf)
ro_retries = 0
while True:
-
ro_retries += 1
if ro_retries >= 360: # 1 hour
raise LcmException(
}
step = ""
try:
-
element_type = "NS"
element_under_configuration = nsr_id
vca_id = self.get_vca_id(db_vnfr, db_nsr)
# create or register execution environment in VCA
if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
-
self._write_configuration_status(
nsr_id=nsr_id,
vca_index=vca_index,
vnfr_id=vnfr_id,
nsr_id=nsr_id,
target_ip=rw_mgmt_ip,
+ element_type=element_type,
vnf_member_index=db_vnfr.get("member-vnf-index-ref", ""),
vdu_id=vdu_id,
vdu_index=vdu_index,
element_type: str = None,
other_update: dict = None,
):
-
# self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
# .format(vca_index, status))
vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
return modified
+ def _gather_vnfr_healing_alerts(self, vnfr, vnfd):
+ alerts = []
+ nsr_id = vnfr["nsr-id-ref"]
+ df = vnfd.get("df", [{}])[0]
+ # Checking for auto-healing configuration
+ if "healing-aspect" in df:
+ healing_aspects = df["healing-aspect"]
+ for healing in healing_aspects:
+ for healing_policy in healing.get("healing-policy", ()):
+ vdu_id = healing_policy["vdu-id"]
+ vdur = next(
+ (vdur for vdur in vnfr["vdur"] if vdu_id == vdur["vdu-id-ref"]),
+ {},
+ )
+ if not vdur:
+ continue
+ metric_name = "vm_status"
+ vdu_name = vdur.get("name")
+ vnf_member_index = vnfr["member-vnf-index-ref"]
+ uuid = str(uuid4())
+ name = f"healing_{uuid}"
+ action = healing_policy
+ # action_on_recovery = healing.get("action-on-recovery")
+ # cooldown_time = healing.get("cooldown-time")
+ # day1 = healing.get("day1")
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_name": vdu_name,
+ },
+ "alarm_status": "ok",
+ "action_type": "healing",
+ "action": action,
+ }
+ alerts.append(alert)
+ return alerts
+
+ def _gather_vnfr_scaling_alerts(self, vnfr, vnfd):
+ alerts = []
+ nsr_id = vnfr["nsr-id-ref"]
+ df = vnfd.get("df", [{}])[0]
+ # Checking for auto-scaling configuration
+ if "scaling-aspect" in df:
+ rel_operation_types = {
+ "GE": ">=",
+ "LE": "<=",
+ "GT": ">",
+ "LT": "<",
+ "EQ": "==",
+ "NE": "!=",
+ }
+ scaling_aspects = df["scaling-aspect"]
+ all_vnfd_monitoring_params = {}
+ for ivld in vnfd.get("int-virtual-link-desc", ()):
+ for mp in ivld.get("monitoring-parameters", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for vdu in vnfd.get("vdu", ()):
+ for mp in vdu.get("monitoring-parameter", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for df in vnfd.get("df", ()):
+ for mp in df.get("monitoring-parameter", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for scaling_aspect in scaling_aspects:
+ scaling_group_name = scaling_aspect.get("name", "")
+ # Get monitored VDUs
+ all_monitored_vdus = set()
+ for delta in scaling_aspect.get("aspect-delta-details", {}).get(
+ "deltas", ()
+ ):
+ for vdu_delta in delta.get("vdu-delta", ()):
+ all_monitored_vdus.add(vdu_delta.get("id"))
+ monitored_vdurs = list(
+ filter(
+ lambda vdur: vdur["vdu-id-ref"] in all_monitored_vdus,
+ vnfr["vdur"],
+ )
+ )
+ if not monitored_vdurs:
+ self.logger.error(
+ "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric"
+ )
+ continue
+ for scaling_policy in scaling_aspect.get("scaling-policy", ()):
+ if scaling_policy["scaling-type"] != "automatic":
+ continue
+ threshold_time = scaling_policy.get("threshold-time", "1")
+ cooldown_time = scaling_policy.get("cooldown-time", "0")
+ for scaling_criteria in scaling_policy["scaling-criteria"]:
+ monitoring_param_ref = scaling_criteria.get(
+ "vnf-monitoring-param-ref"
+ )
+ vnf_monitoring_param = all_vnfd_monitoring_params[
+ monitoring_param_ref
+ ]
+ for vdur in monitored_vdurs:
+ vdu_id = vdur["vdu-id-ref"]
+ metric_name = vnf_monitoring_param.get("performance-metric")
+ metric_name = f"osm_{metric_name}"
+ vnf_member_index = vnfr["member-vnf-index-ref"]
+ scalein_threshold = scaling_criteria.get(
+ "scale-in-threshold"
+ )
+ scaleout_threshold = scaling_criteria.get(
+ "scale-out-threshold"
+ )
+ # Looking for min/max-number-of-instances
+ instances_min_number = 1
+ instances_max_number = 1
+ vdu_profile = df["vdu-profile"]
+ if vdu_profile:
+ profile = next(
+ item for item in vdu_profile if item["id"] == vdu_id
+ )
+ instances_min_number = profile.get(
+ "min-number-of-instances", 1
+ )
+ instances_max_number = profile.get(
+ "max-number-of-instances", 1
+ )
+
+ if scalein_threshold:
+ uuid = str(uuid4())
+ name = f"scalein_{uuid}"
+ operation = scaling_criteria[
+ "scale-in-relational-operation"
+ ]
+ rel_operator = rel_operation_types.get(operation, "<=")
+ metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
+ expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
+ labels = {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ }
+ prom_cfg = {
+ "alert": name,
+ "expr": expression,
+ "for": str(threshold_time) + "m",
+ "labels": labels,
+ }
+ action = scaling_policy
+ action = {
+ "scaling-group": scaling_group_name,
+ "cooldown-time": cooldown_time,
+ }
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ },
+ "alarm_status": "ok",
+ "action_type": "scale_in",
+ "action": action,
+ "prometheus_config": prom_cfg,
+ }
+ alerts.append(alert)
+
+ if scaleout_threshold:
+ uuid = str(uuid4())
+ name = f"scaleout_{uuid}"
+ operation = scaling_criteria[
+ "scale-out-relational-operation"
+ ]
+ rel_operator = rel_operation_types.get(operation, "<=")
+ metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
+ expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
+ labels = {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ }
+ prom_cfg = {
+ "alert": name,
+ "expr": expression,
+ "for": str(threshold_time) + "m",
+ "labels": labels,
+ }
+ action = scaling_policy
+ action = {
+ "scaling-group": scaling_group_name,
+ "cooldown-time": cooldown_time,
+ }
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ },
+ "alarm_status": "ok",
+ "action_type": "scale_out",
+ "action": action,
+ "prometheus_config": prom_cfg,
+ }
+ alerts.append(alert)
+ return alerts
+
def update_nsrs_with_pla_result(self, params):
try:
nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
db_nsr_update["detailed-status"] = "Done"
db_nslcmop_update["detailed-status"] = "Done"
nslcmop_operation_state = "COMPLETED"
+ # Gather auto-healing and auto-scaling alerts for each vnfr
+ healing_alerts = []
+ scaling_alerts = []
+ for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
+ vnfd = next(
+ (sub for sub in db_vnfds if sub["_id"] == vnfr["vnfd-id"]), None
+ )
+ healing_alerts = self._gather_vnfr_healing_alerts(vnfr, vnfd)
+ for alert in healing_alerts:
+ self.logger.info(f"Storing healing alert in MongoDB: {alert}")
+ self.db.create("alerts", alert)
+
+ scaling_alerts = self._gather_vnfr_scaling_alerts(vnfr, vnfd)
+ for alert in scaling_alerts:
+ self.logger.info(f"Storing scaling alert in MongoDB: {alert}")
+ self.db.create("alerts", alert)
if db_nsr:
self._write_ns_status(
cached_vnfds: Dict[str, Any],
) -> List[Relation]:
relations = []
+ if vca.target_element == "ns":
+ self.logger.debug("VCA is a NS charm, not a VNF.")
+ return relations
vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
vnf_profile_id = vnf_profile["id"]
vnfd_id = vnf_profile["vnfd-id"]
vca_index: int,
timeout: int = 3600,
) -> bool:
-
# steps:
# 1. find all relations for this VCA
# 2. wait for other peers related
timeout: int = 600,
vca_id: str = None,
):
-
try:
k8sclustertype = k8s_instance_info["k8scluster-type"]
# Instantiate kdu
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
)
+ self.logger.debug(f"Deleting alerts: ns_id={nsr_id}")
+ self.db.del_list("alerts", {"tags.ns_id": nsr_id})
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
del desc_params["kdu_model"]
else:
kdu_model = kdu.get("kdu-model")
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- kdu_model = parts[0]
+ if kdu_model.count("/") < 2: # helm chart is not embedded
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ kdu_model = parts[0]
if desc_params.get("kdu_atomic_upgrade"):
atomic_upgrade = desc_params.get(
"kdu_atomic_upgrade"
raise
except Exception as e:
-
self.logger.debug("Error upgrading charm {}".format(path))
return "FAILED", "Error upgrading charm {}: {}".format(path, e)
nsr_deployed = db_nsr["_admin"].get("deployed")
if update_type == "CHANGE_VNFPKG":
-
# Get the input parameters given through update request
vnf_instance_id = db_nslcmop["operationParams"][
"changeVnfPackageData"
step = "Checking if revision has changed in VNFD"
if current_vnf_revision != latest_vnfd_revision:
-
change_type = "policy_updated"
# There is new revision of VNFD, update operation is required
step = "Getting descriptor config"
if current_vnfd.get("kdu"):
-
search_key = "kdu_name"
else:
search_key = "vnfd_id"
# There could be several charm used in the same VNF
for ee_item in ee_list:
if ee_item.get("juju"):
-
step = "Getting charm name"
charm_name = ee_item["juju"].get("charm")
if find_software_version(current_vnfd) != find_software_version(
latest_vnfd
):
-
step = "Checking if existing VNF has charm"
for current_charm_path, target_charm_path in list(
charm_artifact_paths
current_charm_path, target_charm_path
)
):
-
step = "Checking whether VNF uses juju bundle"
if check_juju_bundle_existence(current_vnfd):
-
raise LcmException(
"Charm upgrade is not supported for the instance which"
" uses juju-bundle: {}".format(
vnfr_id: str,
nsr_id: str,
target_ip: str,
+ element_type: str,
vnf_member_index: str = "",
vdu_id: str = "",
vdu_index: int = None,
vnfr_id (str): VNFR ID where this EE applies
nsr_id (str): NSR ID where this EE applies
target_ip (str): VDU/KDU instance IP address
+ element_type (str): NS or VNF or VDU or KDU
vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
Returns:
_type_: Prometheus jobs
"""
- self.logger.debug(f"KDU: {kdu_name}; KDU INDEX: {kdu_index}")
+ # default the vdur and kdur names to an empty string, to avoid any later
+ # problem with Prometheus when the element type is not VDU or KDU
+ vdur_name = ""
+ kdur_name = ""
+
# look if exist a file called 'prometheus*.j2' and
artifact_content = self.fs.dir_ls(artifact_path)
job_file = next(
with self.fs.file_open((artifact_path, job_file), "r") as f:
job_data = f.read()
- vdur_name = ""
- kdur_name = ""
- for r in range(360):
- db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
- if vdu_id and vdu_index is not None:
- vdur = next(
- (
- x
- for x in get_iterable(db_vnfr, "vdur")
- if (
- x.get("vdu-id-ref") == vdu_id
- and x.get("count-index") == vdu_index
- )
- ),
- {},
- )
- if vdur.get("name"):
- vdur_name = vdur.get("name")
- break
- if kdu_name and kdu_index is not None:
- kdur = next(
- (
- x
- for x in get_iterable(db_vnfr, "kdur")
- if (
- x.get("kdu-name") == kdu_name
- and x.get("count-index") == kdu_index
- )
- ),
- {},
- )
- if kdur.get("name"):
- kdur_name = kdur.get("name")
- break
+ # obtain the VDUR or KDUR, if the element type is VDU or KDU
+ if element_type in ("VDU", "KDU"):
+ for _ in range(360):
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
+ if vdu_id and vdu_index is not None:
+ vdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "vdur")
+ if (
+ x.get("vdu-id-ref") == vdu_id
+ and x.get("count-index") == vdu_index
+ )
+ ),
+ {},
+ )
+ if vdur.get("name"):
+ vdur_name = vdur.get("name")
+ break
+ if kdu_name and kdu_index is not None:
+ kdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "kdur")
+ if (
+ x.get("kdu-name") == kdu_name
+ and x.get("count-index") == kdu_index
+ )
+ ),
+ {},
+ )
+ if kdur.get("name"):
+ kdur_name = kdur.get("name")
+ break
- await asyncio.sleep(10, loop=self.loop)
- else:
- if vdu_id and vdu_index is not None:
- raise LcmException(
- f"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
- )
- if kdu_name and kdu_index is not None:
- raise LcmException(
- f"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
- )
+ await asyncio.sleep(10, loop=self.loop)
+ else:
+ if vdu_id and vdu_index is not None:
+ raise LcmException(
+ f"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
+ )
+ if kdu_name and kdu_index is not None:
+ raise LcmException(
+ f"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
+ )
# TODO get_service
_, _, service = ee_id.partition(".") # remove prefix "namespace."
"VNF_MEMBER_INDEX": vnf_member_index,
"VDUR_NAME": vdur_name,
"KDUR_NAME": kdur_name,
+ "ELEMENT_TYPE": element_type,
}
job_list = parse_job(job_data, variables)
# ensure job_name is using the vnfr_id. Adding the metadata nsr_id
}
step = ""
try:
-
element_type = "NS"
element_under_configuration = nsr_id