check_juju_bundle_existence,
get_charm_artifact_path,
get_ee_id_parts,
+ vld_to_ro_ip_profile,
)
from osm_lcm.data_utils.nsd import (
get_ns_configuration_relation_list,
return None
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
-
# self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
try:
)
async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
-
# remove last dot from path (if exists)
if path.endswith("."):
path = path[:-1]
# self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
# .format(table, filter, path, updated_data))
try:
-
nsr_id = filter.get("_id")
# read ns record from database
return wim_account
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
-
db_vdu_push_list = []
template_vdur = []
db_update = {"_admin.modified": time()}
start_deploy,
timeout_ns_deploy,
):
-
db_vims = {}
def get_vim_account(vim_account_id):
target_vim, target_vld, vld_params, target_sdn
):
if vld_params.get("ip-profile"):
- target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
- "ip-profile"
- ]
+ target_vld["vim_info"][target_vim]["ip_profile"] = vld_to_ro_ip_profile(
+ vld_params["ip-profile"]
+ )
if vld_params.get("provider-network"):
target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
"provider-network"
lambda v_vld: v_vld["name"] in (a_vld["name"], a_vld["id"]),
)
if target_vld:
-
if vnf_params.get("vimAccountId") not in a_vld.get(
"vim_info", {}
):
and nsd_vlp.get("virtual-link-protocol-data")
and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
):
- ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
+ vld_params["ip-profile"] = nsd_vlp["virtual-link-protocol-data"][
"l3-protocol-data"
]
- ip_profile_dest_data = {}
- if "ip-version" in ip_profile_source_data:
- ip_profile_dest_data["ip-version"] = ip_profile_source_data[
- "ip-version"
- ]
- if "cidr" in ip_profile_source_data:
- ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
- "cidr"
- ]
- if "gateway-ip" in ip_profile_source_data:
- ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
- "gateway-ip"
- ]
- if "dhcp-enabled" in ip_profile_source_data:
- ip_profile_dest_data["dhcp-params"] = {
- "enabled": ip_profile_source_data["dhcp-enabled"]
- }
- vld_params["ip-profile"] = ip_profile_dest_data
# update vld_params with instantiation params
vld_instantiation_params = find_in_list(
and vnfd_vlp.get("virtual-link-protocol-data")
and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
):
- ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
+ vld_params["ip-profile"] = vnfd_vlp["virtual-link-protocol-data"][
"l3-protocol-data"
]
- ip_profile_dest_data = {}
- if "ip-version" in ip_profile_source_data:
- ip_profile_dest_data["ip-version"] = ip_profile_source_data[
- "ip-version"
- ]
- if "cidr" in ip_profile_source_data:
- ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
- "cidr"
- ]
- if "gateway-ip" in ip_profile_source_data:
- ip_profile_dest_data[
- "gateway-address"
- ] = ip_profile_source_data["gateway-ip"]
- if "dhcp-enabled" in ip_profile_source_data:
- ip_profile_dest_data["dhcp-params"] = {
- "enabled": ip_profile_source_data["dhcp-enabled"]
- }
-
- vld_params["ip-profile"] = ip_profile_dest_data
# update vld_params with instantiation params
if vnf_params:
vld_instantiation_params = find_in_list(
vdu_instantiation_params, vdud
)
vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
+ vdur["additionalParams"]["OSM"][
+ "vim_flavor_id"
+ ] = vdu_instantiation_params.get("vim-flavor-id")
vdur_list.append(vdur)
target_vnf["vdur"] = vdur_list
target["vnf"].append(target_vnf)
ro_retries = 0
while True:
-
ro_retries += 1
if ro_retries >= 360: # 1 hour
raise LcmException(
}
step = ""
try:
-
element_type = "NS"
element_under_configuration = nsr_id
vca_id = self.get_vca_id(db_vnfr, db_nsr)
# create or register execution environment in VCA
if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
-
self._write_configuration_status(
nsr_id=nsr_id,
vca_index=vca_index,
element_type: str = None,
other_update: dict = None,
):
-
# self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
# .format(vca_index, status))
vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
return modified
+ def _gather_vnfr_healing_alerts(self, vnfr, vnfd):
+ alerts = []
+ nsr_id = vnfr["nsr-id-ref"]
+ df = vnfd.get("df", [{}])[0]
+ # Checking for auto-healing configuration
+ if "healing-aspect" in df:
+ healing_aspects = df["healing-aspect"]
+ for healing in healing_aspects:
+ for healing_policy in healing.get("healing-policy", ()):
+ vdu_id = healing_policy["vdu-id"]
+ vdur = next(
+ (vdur for vdur in vnfr["vdur"] if vdu_id == vdur["vdu-id-ref"]),
+ {},
+ )
+ if not vdur:
+ continue
+ metric_name = "vm_status"
+ vdu_name = vdur.get("name")
+ vnf_member_index = vnfr["member-vnf-index-ref"]
+ uuid = str(uuid4())
+ name = f"healing_{uuid}"
+ action = healing_policy
+ # action_on_recovery = healing.get("action-on-recovery")
+ # cooldown_time = healing.get("cooldown-time")
+ # day1 = healing.get("day1")
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_name": vdu_name,
+ },
+ "alarm_status": "ok",
+ "action_type": "healing",
+ "action": action,
+ }
+ alerts.append(alert)
+ return alerts
+
+ def _gather_vnfr_scaling_alerts(self, vnfr, vnfd):
+ alerts = []
+ nsr_id = vnfr["nsr-id-ref"]
+ df = vnfd.get("df", [{}])[0]
+ # Checking for auto-scaling configuration
+ if "scaling-aspect" in df:
+ rel_operation_types = {
+ "GE": ">=",
+ "LE": "<=",
+ "GT": ">",
+ "LT": "<",
+ "EQ": "==",
+ "NE": "!=",
+ }
+ scaling_aspects = df["scaling-aspect"]
+ all_vnfd_monitoring_params = {}
+ for ivld in vnfd.get("int-virtual-link-desc", ()):
+ for mp in ivld.get("monitoring-parameters", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for vdu in vnfd.get("vdu", ()):
+ for mp in vdu.get("monitoring-parameter", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for df in vnfd.get("df", ()):
+ for mp in df.get("monitoring-parameter", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for scaling_aspect in scaling_aspects:
+ scaling_group_name = scaling_aspect.get("name", "")
+ # Get monitored VDUs
+ all_monitored_vdus = set()
+ for delta in scaling_aspect.get("aspect-delta-details", {}).get(
+ "deltas", ()
+ ):
+ for vdu_delta in delta.get("vdu-delta", ()):
+ all_monitored_vdus.add(vdu_delta.get("id"))
+ monitored_vdurs = list(
+ filter(
+ lambda vdur: vdur["vdu-id-ref"] in all_monitored_vdus,
+ vnfr["vdur"],
+ )
+ )
+ if not monitored_vdurs:
+ self.logger.error(
+ "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric"
+ )
+ continue
+ for scaling_policy in scaling_aspect.get("scaling-policy", ()):
+ if scaling_policy["scaling-type"] != "automatic":
+ continue
+ threshold_time = scaling_policy.get("threshold-time", "1")
+ cooldown_time = scaling_policy.get("cooldown-time", "0")
+ for scaling_criteria in scaling_policy["scaling-criteria"]:
+ monitoring_param_ref = scaling_criteria.get(
+ "vnf-monitoring-param-ref"
+ )
+ vnf_monitoring_param = all_vnfd_monitoring_params[
+ monitoring_param_ref
+ ]
+ for vdur in monitored_vdurs:
+ vdu_id = vdur["vdu-id-ref"]
+ metric_name = vnf_monitoring_param.get("performance-metric")
+ metric_name = f"osm_{metric_name}"
+ vnf_member_index = vnfr["member-vnf-index-ref"]
+ scalein_threshold = scaling_criteria.get(
+ "scale-in-threshold"
+ )
+ scaleout_threshold = scaling_criteria.get(
+ "scale-out-threshold"
+ )
+ # Looking for min/max-number-of-instances
+ instances_min_number = 1
+ instances_max_number = 1
+ vdu_profile = df["vdu-profile"]
+ if vdu_profile:
+ profile = next(
+ item for item in vdu_profile if item["id"] == vdu_id
+ )
+ instances_min_number = profile.get(
+ "min-number-of-instances", 1
+ )
+ instances_max_number = profile.get(
+ "max-number-of-instances", 1
+ )
+
+ if scalein_threshold:
+ uuid = str(uuid4())
+ name = f"scalein_{uuid}"
+ operation = scaling_criteria[
+ "scale-in-relational-operation"
+ ]
+ rel_operator = rel_operation_types.get(operation, "<=")
+ metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
+ expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
+ labels = {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ }
+ prom_cfg = {
+ "alert": name,
+ "expr": expression,
+ "for": str(threshold_time) + "m",
+ "labels": labels,
+ }
+ action = scaling_policy
+ action = {
+ "scaling-group": scaling_group_name,
+ "cooldown-time": cooldown_time,
+ }
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ },
+ "alarm_status": "ok",
+ "action_type": "scale_in",
+ "action": action,
+ "prometheus_config": prom_cfg,
+ }
+ alerts.append(alert)
+
+ if scaleout_threshold:
+ uuid = str(uuid4())
+ name = f"scaleout_{uuid}"
+ operation = scaling_criteria[
+ "scale-out-relational-operation"
+ ]
+ rel_operator = rel_operation_types.get(operation, "<=")
+ metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
+ expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
+ labels = {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ }
+ prom_cfg = {
+ "alert": name,
+ "expr": expression,
+ "for": str(threshold_time) + "m",
+ "labels": labels,
+ }
+ action = scaling_policy
+ action = {
+ "scaling-group": scaling_group_name,
+ "cooldown-time": cooldown_time,
+ }
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ },
+ "alarm_status": "ok",
+ "action_type": "scale_out",
+ "action": action,
+ "prometheus_config": prom_cfg,
+ }
+ alerts.append(alert)
+ return alerts
+
def update_nsrs_with_pla_result(self, params):
try:
nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
db_nsr_update["detailed-status"] = "Done"
db_nslcmop_update["detailed-status"] = "Done"
nslcmop_operation_state = "COMPLETED"
+ # Gather auto-healing and auto-scaling alerts for each vnfr
+ healing_alerts = []
+ scaling_alerts = []
+ for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
+ vnfd = next(
+ (sub for sub in db_vnfds if sub["_id"] == vnfr["vnfd-id"]), None
+ )
+ healing_alerts = self._gather_vnfr_healing_alerts(vnfr, vnfd)
+ for alert in healing_alerts:
+ self.logger.info(f"Storing healing alert in MongoDB: {alert}")
+ self.db.create("alerts", alert)
+
+ scaling_alerts = self._gather_vnfr_scaling_alerts(vnfr, vnfd)
+ for alert in scaling_alerts:
+ self.logger.info(f"Storing scaling alert in MongoDB: {alert}")
+ self.db.create("alerts", alert)
if db_nsr:
self._write_ns_status(
vca_index: int,
timeout: int = 3600,
) -> bool:
-
# steps:
# 1. find all relations for this VCA
# 2. wait for other peers related
timeout: int = 600,
vca_id: str = None,
):
-
try:
k8sclustertype = k8s_instance_info["k8scluster-type"]
# Instantiate kdu
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
)
+ self.logger.debug(f"Deleting alerts: ns_id={nsr_id}")
+ self.db.del_list("alerts", {"tags.ns_id": nsr_id})
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
del desc_params["kdu_model"]
else:
kdu_model = kdu.get("kdu-model")
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- kdu_model = parts[0]
+ if kdu_model.count("/") < 2: # helm chart is not embedded
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ kdu_model = parts[0]
if desc_params.get("kdu_atomic_upgrade"):
atomic_upgrade = desc_params.get(
"kdu_atomic_upgrade"
raise
except Exception as e:
-
self.logger.debug("Error upgrading charm {}".format(path))
return "FAILED", "Error upgrading charm {}: {}".format(path, e)
nsr_deployed = db_nsr["_admin"].get("deployed")
if update_type == "CHANGE_VNFPKG":
-
# Get the input parameters given through update request
vnf_instance_id = db_nslcmop["operationParams"][
"changeVnfPackageData"
step = "Checking if revision has changed in VNFD"
if current_vnf_revision != latest_vnfd_revision:
-
change_type = "policy_updated"
# There is new revision of VNFD, update operation is required
step = "Getting descriptor config"
if current_vnfd.get("kdu"):
-
search_key = "kdu_name"
else:
search_key = "vnfd_id"
# There could be several charm used in the same VNF
for ee_item in ee_list:
if ee_item.get("juju"):
-
step = "Getting charm name"
charm_name = ee_item["juju"].get("charm")
if find_software_version(current_vnfd) != find_software_version(
latest_vnfd
):
-
step = "Checking if existing VNF has charm"
for current_charm_path, target_charm_path in list(
charm_artifact_paths
current_charm_path, target_charm_path
)
):
-
step = "Checking whether VNF uses juju bundle"
if check_juju_bundle_existence(current_vnfd):
-
raise LcmException(
"Charm upgrade is not supported for the instance which"
" uses juju-bundle: {}".format(
}
step = ""
try:
-
element_type = "NS"
element_under_configuration = nsr_id