from time import time
from uuid import uuid4
-from random import randint
+from random import SystemRandom
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
- def __init__(self, msg, lcm_tasks, config: LcmCfg, loop):
+ def __init__(self, msg, lcm_tasks, config: LcmCfg):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.db = Database().instance.db
self.fs = Filesystem().instance.fs
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.timeout = config.timeout
self.ro_config = config.RO
# create N2VC connector
self.n2vc = N2VCJujuConnector(
log=self.logger,
- loop=self.loop,
on_update_db=self._on_update_n2vc_db,
fs=self.fs,
db=self.db,
self.conn_helm_ee = LCMHelmConn(
log=self.logger,
- loop=self.loop,
vca_config=self.vca_config,
on_update_db=self._on_update_n2vc_db,
)
kubectl_command=self.vca_config.kubectlpath,
juju_command=self.vca_config.jujupath,
log=self.logger,
- loop=self.loop,
on_update_db=self._on_update_k8s_db,
fs=self.fs,
db=self.db,
}
# create RO client
- self.RO = NgRoClient(self.loop, **self.ro_config.to_dict())
+ self.RO = NgRoClient(**self.ro_config.to_dict())
self.op_status_map = {
"instantiation": self.RO.status,
vdu_instantiation_params, vdud
)
vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
+ vdur["additionalParams"]["OSM"][
+ "vim_flavor_id"
+ ] = vdu_instantiation_params.get("vim-flavor-id")
vdur_list.append(vdur)
target_vnf["vdur"] = vdur_list
target["vnf"].append(target_vnf)
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
- await asyncio.sleep(15, loop=self.loop)
+ await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
"target KDU={} is in error state".format(kdu_name)
)
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
nb_tries += 1
raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
"Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
)
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
# get ip address
if not target_vdu_id:
self.logger.debug(
logging_text + "Invoke and wait for placement optimization"
)
- await self.msg.aiowrite(
- "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
- )
+ await self.msg.aiowrite("pla", "get_placement", {"nslcmopId": nslcmop_id})
db_poll_interval = 5
wait = db_poll_interval * 10
pla_result = None
vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
return modified
def _gather_vnfr_healing_alerts(self, vnfr, vnfd):
    """
    Build the auto-healing alert documents for one VNF record.
    Only the first deployment flavour ("df") of the descriptor is inspected. One alert is
    produced for every "healing-policy" whose "vdu-id" matches the "vdu-id-ref" of a vdur
    in the record; policies that match no vdur are skipped.
    :param vnfr: VNF record. Reads "nsr-id-ref", "vdur" and "member-vnf-index-ref"
    :param vnfd: VNF descriptor. May be None, or have a missing/empty "df"; no alert is
        produced in those cases
    :return: list of alert dictionaries (possibly empty). Each one carries a fresh uuid, the
        "vm_status" metric, identifying tags and the matched healing policy as "action"
    """
    alerts = []
    nsr_id = vnfr["nsr-id-ref"]
    # Tolerate a missing descriptor and an empty "df" list instead of raising
    # AttributeError / IndexError
    df_list = (vnfd or {}).get("df") or [{}]
    df = df_list[0]
    # Checking for auto-healing configuration
    for healing in df.get("healing-aspect") or ():
        for healing_policy in healing.get("healing-policy", ()):
            vdu_id = healing_policy["vdu-id"]
            vdur = next(
                (item for item in vnfr["vdur"] if vdu_id == item["vdu-id-ref"]),
                None,
            )
            if not vdur:
                continue
            uuid = str(uuid4())
            alerts.append(
                {
                    "uuid": uuid,
                    "name": f"healing_{uuid}",
                    "metric": "vm_status",
                    "tags": {
                        "ns_id": nsr_id,
                        "vnf_member_index": vnfr["member-vnf-index-ref"],
                        "vdu_name": vdur.get("name"),
                    },
                    "alarm_status": "ok",
                    "action_type": "healing",
                    # The whole policy is stored so the consumer can read its settings
                    "action": healing_policy,
                }
            )
    return alerts
+
def _gather_vnfr_scaling_alerts(self, vnfr, vnfd):
    """
    Build the auto-scaling alert documents for one VNF record.
    Scaling aspects and vdu profiles are taken from the first deployment flavour ("df") of
    the descriptor. For every automatic scaling policy, every scaling criterion and every
    vdur referenced by the aspect deltas, a "scale_in" and/or "scale_out" alert is created
    depending on which thresholds the criterion defines.
    :param vnfr: VNF record. Reads "nsr-id-ref", "vdur" and "member-vnf-index-ref"
    :param vnfd: VNF descriptor. May be None, or have a missing/empty "df"; no alert is
        produced in those cases
    :return: list of alert dictionaries (possibly empty), each including a
        "prometheus_config" entry with the alert expression
    """
    alerts = []
    nsr_id = vnfr["nsr-id-ref"]
    # Tolerate a missing descriptor and an empty "df" list
    vnfd = vnfd or {}
    flavours = vnfd.get("df") or [{}]
    first_df = flavours[0]
    # Checking for auto-scaling configuration
    scaling_aspects = first_df.get("scaling-aspect")
    if not scaling_aspects:
        return alerts

    rel_operation_types = {
        "GE": ">=",
        "LE": "<=",
        "GT": ">",
        "LT": "<",
        "EQ": "==",
        "NE": "!=",
    }

    # Index every monitoring parameter of the descriptor by its id
    all_vnfd_monitoring_params = {}
    for ivld in vnfd.get("int-virtual-link-desc", ()):
        for mp in ivld.get("monitoring-parameters", ()):
            all_vnfd_monitoring_params[mp.get("id")] = mp
    for vdu in vnfd.get("vdu", ()):
        for mp in vdu.get("monitoring-parameter", ()):
            all_vnfd_monitoring_params[mp.get("id")] = mp
    # A dedicated loop name is used so that first_df is not rebound: the vdu profiles
    # must come from the same flavour as the scaling aspects
    for flavour in flavours:
        for mp in flavour.get("monitoring-parameter", ()):
            all_vnfd_monitoring_params[mp.get("id")] = mp

    vdu_profiles = first_df.get("vdu-profile") or ()

    def instance_limits(vdu_id):
        # Looking for min/max-number-of-instances; both default to 1 when the vdu has
        # no profile or the profile does not set them
        profile = next(
            (item for item in vdu_profiles if item.get("id") == vdu_id), {}
        )
        return (
            profile.get("min-number-of-instances", 1),
            profile.get("max-number-of-instances", 1),
        )

    def build_alert(prefix, action_type, metric_name, tags, expression, for_time, action):
        # Assemble one alert document; tags, labels and action are copied so that the
        # generated alerts do not share mutable objects
        uuid = str(uuid4())
        name = f"{prefix}_{uuid}"
        return {
            "uuid": uuid,
            "name": name,
            "metric": metric_name,
            "tags": dict(tags),
            "alarm_status": "ok",
            "action_type": action_type,
            "action": dict(action),
            "prometheus_config": {
                "alert": name,
                "expr": expression,
                "for": for_time,
                "labels": dict(tags),
            },
        }

    for scaling_aspect in scaling_aspects:
        scaling_group_name = scaling_aspect.get("name", "")
        # Get monitored VDUs
        all_monitored_vdus = {
            vdu_delta.get("id")
            for delta in scaling_aspect.get("aspect-delta-details", {}).get(
                "deltas", ()
            )
            for vdu_delta in delta.get("vdu-delta", ())
        }
        monitored_vdurs = [
            vdur for vdur in vnfr["vdur"] if vdur["vdu-id-ref"] in all_monitored_vdus
        ]
        if not monitored_vdurs:
            self.logger.error(
                "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric"
            )
            continue
        for scaling_policy in scaling_aspect.get("scaling-policy", ()):
            if scaling_policy["scaling-type"] != "automatic":
                continue
            threshold_time = scaling_policy.get("threshold-time", "1")
            cooldown_time = scaling_policy.get("cooldown-time", "0")
            for_time = str(threshold_time) + "m"
            action = {
                "scaling-group": scaling_group_name,
                "cooldown-time": cooldown_time,
            }
            for scaling_criteria in scaling_policy["scaling-criteria"]:
                monitoring_param_ref = scaling_criteria.get(
                    "vnf-monitoring-param-ref"
                )
                vnf_monitoring_param = all_vnfd_monitoring_params.get(
                    monitoring_param_ref
                )
                if vnf_monitoring_param is None:
                    # Same policy as the "no monitored vdur" case above: report and skip
                    self.logger.error(
                        "Scaling criteria is referring to unknown vnf-monitoring-param '{}'".format(
                            monitoring_param_ref
                        )
                    )
                    continue
                performance_metric = vnf_monitoring_param.get("performance-metric")
                metric_name = f"osm_{performance_metric}"
                scalein_threshold = scaling_criteria.get("scale-in-threshold")
                scaleout_threshold = scaling_criteria.get("scale-out-threshold")
                for vdur in monitored_vdurs:
                    vdu_id = vdur["vdu-id-ref"]
                    vnf_member_index = vnfr["member-vnf-index-ref"]
                    instances_min_number, instances_max_number = instance_limits(
                        vdu_id
                    )
                    tags = {
                        "ns_id": nsr_id,
                        "vnf_member_index": vnf_member_index,
                        "vdu_id": vdu_id,
                    }
                    metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'

                    # "is not None" so that a threshold of 0 is not silently ignored
                    if scalein_threshold is not None:
                        operation = scaling_criteria["scale-in-relational-operation"]
                        rel_operator = rel_operation_types.get(operation, "<=")
                        expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
                        alerts.append(
                            build_alert(
                                "scalein",
                                "scale_in",
                                metric_name,
                                tags,
                                expression,
                                for_time,
                                action,
                            )
                        )

                    if scaleout_threshold is not None:
                        operation = scaling_criteria["scale-out-relational-operation"]
                        # NOTE(review): the fallback for an unknown scale-out operation is
                        # "<=", same as for scale-in; confirm this is intended
                        rel_operator = rel_operation_types.get(operation, "<=")
                        expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
                        alerts.append(
                            build_alert(
                                "scaleout",
                                "scale_out",
                                metric_name,
                                tags,
                                expression,
                                for_time,
                                action,
                            )
                        )
    return alerts
+
def update_nsrs_with_pla_result(self, params):
try:
nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
db_nsr_update["detailed-status"] = "Done"
db_nslcmop_update["detailed-status"] = "Done"
nslcmop_operation_state = "COMPLETED"
+ # Gather auto-healing and auto-scaling alerts for each vnfr
+ healing_alerts = []
+ scaling_alerts = []
+ for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
+ vnfd = next(
+ (sub for sub in db_vnfds if sub["_id"] == vnfr["vnfd-id"]), None
+ )
+ healing_alerts = self._gather_vnfr_healing_alerts(vnfr, vnfd)
+ for alert in healing_alerts:
+ self.logger.info(f"Storing healing alert in MongoDB: {alert}")
+ self.db.create("alerts", alert)
+
+ scaling_alerts = self._gather_vnfr_scaling_alerts(vnfr, vnfd)
+ for alert in scaling_alerts:
+ self.logger.info(f"Storing scaling alert in MongoDB: {alert}")
+ self.db.create("alerts", alert)
if db_nsr:
self._write_ns_status(
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
"operationState": nslcmop_operation_state,
"autoremove": autoremove,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
)
+ self.logger.debug(f"Deleting alerts: ns_id={nsr_id}")
+ self.db.del_list("alerts", {"tags.ns_id": nsr_id})
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
)
)
# wait and retry
- await asyncio.sleep(retries_interval, loop=self.loop)
+ await asyncio.sleep(retries_interval)
else:
if isinstance(e, asyncio.TimeoutError):
e = N2VCException(
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
and member_vnf_index
):
msg.update({"vnf_member_index": member_vnf_index})
- await self.msg.aiowrite("ns", change_type, msg, loop=self.loop)
+ await self.msg.aiowrite("ns", change_type, msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "scaled", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
kdur_name = kdur.get("name")
break
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
else:
if vdu_id and vdu_index is not None:
raise LcmException(
not isinstance(job.get("job_name"), str)
or vnfr_id not in job["job_name"]
):
- job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+ job["job_name"] = vnfr_id + "_" + str(SystemRandom().randint(1, 10000))
job["nsr_id"] = nsr_id
job["vnfr_id"] = vnfr_id
return job_list
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "migrated", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
for target_vdu in target_vdu_list:
deploy_params_vdu = target_vdu
# Set run-day1 vnf level value if not vdu level value exists
- if not deploy_params_vdu.get("run-day1") and target_vnf[
- "additionalParams"
- ].get("run-day1"):
+ if not deploy_params_vdu.get("run-day1") and target_vnf.get(
+ "additionalParams", {}
+ ).get("run-day1"):
deploy_params_vdu["run-day1"] = target_vnf[
"additionalParams"
].get("run-day1")
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "healed", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
if operational_status_ro != "healing":
break
- await asyncio.sleep(15, loop=self.loop)
+ await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "verticalscaled", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)