class MongoUpgrade1214:
"""Upgrade MongoDB Database from OSM v12 to v14."""
+ @staticmethod
+ def gather_vnfr_healing_alerts(vnfr, vnfd):
+ alerts = []
+ nsr_id = vnfr["nsr-id-ref"]
+ df = vnfd.get("df", [{}])[0]
+ # Checking for auto-healing configuration
+ if "healing-aspect" in df:
+ healing_aspects = df["healing-aspect"]
+ for healing in healing_aspects:
+ for healing_policy in healing.get("healing-policy", ()):
+ vdu_id = healing_policy["vdu-id"]
+ vdur = next(
+ (
+ vdur
+ for vdur in vnfr["vdur"]
+ if vdu_id == vdur["vdu-id-ref"]
+ ),
+ {},
+ )
+ if not vdur:
+ continue
+ metric_name = "vm_status"
+ vdu_name = vdur.get("name")
+ vnf_member_index = vnfr["member-vnf-index-ref"]
+ uuid = str(uuid4())
+ name = f"healing_{uuid}"
+ action = healing_policy
+ # action_on_recovery = healing.get("action-on-recovery")
+ # cooldown_time = healing.get("cooldown-time")
+ # day1 = healing.get("day1")
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_name": vdu_name,
+ },
+ "alarm_status": "ok",
+ "action_type": "healing",
+ "action": action,
+ }
+ alerts.append(alert)
+ return alerts
+
+ @staticmethod
+ def gather_vnfr_scaling_alerts(vnfr, vnfd):
+ alerts = []
+ nsr_id = vnfr["nsr-id-ref"]
+ df = vnfd.get("df", [{}])[0]
+ # Checking for auto-scaling configuration
+ if "scaling-aspect" in df:
+ rel_operation_types = {
+ "GE": ">=",
+ "LE": "<=",
+ "GT": ">",
+ "LT": "<",
+ "EQ": "==",
+ "NE": "!=",
+ }
+ scaling_aspects = df["scaling-aspect"]
+ all_vnfd_monitoring_params = {}
+ for ivld in vnfd.get("int-virtual-link-desc", ()):
+ for mp in ivld.get("monitoring-parameters", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for vdu in vnfd.get("vdu", ()):
+ for mp in vdu.get("monitoring-parameter", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for df in vnfd.get("df", ()):
+ for mp in df.get("monitoring-parameter", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for scaling_aspect in scaling_aspects:
+ scaling_group_name = scaling_aspect.get("name", "")
+ # Get monitored VDUs
+ all_monitored_vdus = set()
+ for delta in scaling_aspect.get(
+ "aspect-delta-details", {}
+ ).get("deltas", ()):
+ for vdu_delta in delta.get("vdu-delta", ()):
+ all_monitored_vdus.add(vdu_delta.get("id"))
+ monitored_vdurs = list(
+ filter(
+ lambda vdur: vdur["vdu-id-ref"]
+ in all_monitored_vdus,
+ vnfr["vdur"],
+ )
+ )
+ if not monitored_vdurs:
+ logger.error("Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric")
+ continue
+ for scaling_policy in scaling_aspect.get(
+ "scaling-policy", ()
+ ):
+ if scaling_policy["scaling-type"] != "automatic":
+ continue
+ threshold_time = scaling_policy.get(
+ "threshold-time", "1"
+ )
+ cooldown_time = scaling_policy.get("cooldown-time", "0")
+ for scaling_criteria in scaling_policy["scaling-criteria"]:
+ monitoring_param_ref = scaling_criteria.get(
+ "vnf-monitoring-param-ref"
+ )
+ vnf_monitoring_param = all_vnfd_monitoring_params[
+ monitoring_param_ref
+ ]
+ for vdur in monitored_vdurs:
+ vdu_id = vdur["vdu-id-ref"]
+ metric_name = vnf_monitoring_param.get("performance-metric")
+ vdu_name = vdur["name"]
+ vnf_member_index = vnfr["member-vnf-index-ref"]
+ scalein_threshold = scaling_criteria.get("scale-in-threshold")
+ # Looking for min/max-number-of-instances
+ instances_min_number = 1
+ instances_max_number = 1
+ vdu_profile = df["vdu-profile"]
+ if vdu_profile:
+ profile = next(
+ item
+ for item in vdu_profile
+ if item["id"] == vdu_id
+ )
+ instances_min_number = profile.get("min-number-of-instances", 1)
+ instances_max_number = profile.get("max-number-of-instances", 1)
+
+ if scalein_threshold:
+ uuid = str(uuid4())
+ name = f"scalein_{uuid}"
+ operation = scaling_criteria["scale-in-relational-operation"]
+ rel_operator = rel_operation_types.get(operation, "<=")
+ metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
+ expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
+ labels = {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ }
+ prom_cfg = {
+ "alert": name,
+ "expr": expression,
+ "for": str(threshold_time) + "m",
+ "labels": labels,
+ }
+ action = scaling_policy
+ action = {
+ "scaling-group": scaling_group_name,
+ "cooldown-time": cooldown_time,
+ }
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ },
+ "alarm_status": "ok",
+ "action_type": "scale_in",
+ "action": action,
+ "prometheus_config": prom_cfg,
+ }
+ alerts.append(alert)
+
+ scaleout_threshold = scaling_criteria.get("scale-out-threshold")
+ if scaleout_threshold:
+ uuid = str(uuid4())
+ name = f"scaleout_{uuid}"
+ operation = scaling_criteria["scale-out-relational-operation"]
+ rel_operator = rel_operation_types.get(operation, "<=")
+ metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
+ expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
+ labels = {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ }
+ prom_cfg = {
+ "alert": name,
+ "expr": expression,
+ "for": str(threshold_time) + "m",
+ "labels": labels,
+ }
+ action = scaling_policy
+ action = {
+ "scaling-group": scaling_group_name,
+ "cooldown-time": cooldown_time,
+ }
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ },
+ "alarm_status": "ok",
+ "action_type": "scale_out",
+ "action": action,
+ "prometheus_config": prom_cfg,
+ }
+ alerts.append(alert)
+ return alerts
+
@staticmethod
def _migrate_alerts(osm_db):
"""Create new alerts collection.
if "alerts" in osm_db.list_collection_names():
return
logger.info("Entering in MongoUpgrade1214._migrate_alerts function")
+
+ # Get vnfds from MongoDB
logger.info("Reading VNF descriptors:")
- alerts = osm_db["alerts"]
vnfds = osm_db["vnfds"]
db_vnfds = []
for vnfd in vnfds.find():
logger.info(f' {vnfd["_id"]}: {vnfd["description"]}')
db_vnfds.append(vnfd)
+
+ # Get vnfrs from MongoDB
logger.info("Reading VNFRs")
vnfrs = osm_db["vnfrs"]
+
+ # Gather healing and scaling alerts for each vnfr
+ healing_alerts = []
+ scaling_alerts = []
for vnfr in vnfrs.find():
logger.info(f' vnfr {vnfr["_id"]}')
vnfd = next((sub for sub in db_vnfds if sub["_id"] == vnfr["vnfd-id"]), None)
- nsr_id = vnfr["nsr-id-ref"]
- # Checking for auto-healing configuration
- df = vnfd.get("df", [{}])[0]
- if "healing-aspect" in df:
- healing_aspects = df["healing-aspect"]
- for healing in healing_aspects:
- for healing_policy in healing.get("healing-policy", ()):
- vdu_id = healing_policy["vdu-id"]
- vdur = next(
- (
- vdur
- for vdur in vnfr["vdur"]
- if vdu_id == vdur["vdu-id-ref"]
- ),
- {},
- )
- if not vdur:
- continue
- metric_name = "vm_status"
- vdu_name = vdur.get("name")
- vnf_member_index = vnfr["member-vnf-index-ref"]
- uuid = str(uuid4())
- name = f"healing_{uuid}"
- action = healing_policy
- # action_on_recovery = healing.get("action-on-recovery")
- # cooldown_time = healing.get("cooldown-time")
- # day1 = healing.get("day1")
- alert = {
- "uuid": uuid,
- "name": name,
- "metric": metric_name,
- "tags": {
- "ns_id": nsr_id,
- "vnf_member_index": vnf_member_index,
- "vdu_name": vdu_name,
- },
- "alarm_status": "ok",
- "action_type": "healing",
- "action": action,
- }
- logger.info(f"Storing in MongoDB alert {alert}")
- alerts.insert_one(alert)
-
- # Checking for auto-scaling configuration
- if "scaling-aspect" in df:
- rel_operation_types = {
- "GE": ">=",
- "LE": "<=",
- "GT": ">",
- "LT": "<",
- "EQ": "==",
- "NE": "!=",
- }
- scaling_aspects = df["scaling-aspect"]
- all_vnfd_monitoring_params = {}
- for ivld in vnfd.get("int-virtual-link-desc", ()):
- for mp in ivld.get("monitoring-parameters", ()):
- all_vnfd_monitoring_params[mp.get("id")] = mp
- for vdu in vnfd.get("vdu", ()):
- for mp in vdu.get("monitoring-parameter", ()):
- all_vnfd_monitoring_params[mp.get("id")] = mp
- for df in vnfd.get("df", ()):
- for mp in df.get("monitoring-parameter", ()):
- all_vnfd_monitoring_params[mp.get("id")] = mp
- for scaling_aspect in scaling_aspects:
- scaling_group_name = scaling_aspect.get("name", "")
- # Get monitored VDUs
- all_monitored_vdus = set()
- for delta in scaling_aspect.get(
- "aspect-delta-details", {}
- ).get("deltas", ()):
- for vdu_delta in delta.get("vdu-delta", ()):
- all_monitored_vdus.add(vdu_delta.get("id"))
- monitored_vdurs = list(
- filter(
- lambda vdur: vdur["vdu-id-ref"]
- in all_monitored_vdus,
- vnfr["vdur"],
- )
- )
- if not monitored_vdurs:
- logger.error("Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric")
- continue
- for scaling_policy in scaling_aspect.get(
- "scaling-policy", ()
- ):
- if scaling_policy["scaling-type"] != "automatic":
- continue
- threshold_time = scaling_policy.get(
- "threshold-time", "1"
- )
- cooldown_time = scaling_policy.get("cooldown-time", "0")
- for scaling_criteria in scaling_policy["scaling-criteria"]:
- monitoring_param_ref = scaling_criteria.get(
- "vnf-monitoring-param-ref"
- )
- vnf_monitoring_param = all_vnfd_monitoring_params[
- monitoring_param_ref
- ]
- for vdur in monitored_vdurs:
- vdu_id = vdur["vdu-id-ref"]
- metric_name = vnf_monitoring_param.get("performance-metric")
- vdu_name = vdur["name"]
- vnf_member_index = vnfr["member-vnf-index-ref"]
- scalein_threshold = scaling_criteria["scale-in-threshold"]
- # Looking for min/max-number-of-instances
- instances_min_number = 1
- instances_max_number = 1
- vdu_profile = df["vdu-profile"]
- if vdu_profile:
- profile = next(
- item
- for item in vdu_profile
- if item["id"] == vdu_id
- )
- instances_min_number = profile.get("min-number-of-instances", 1)
- instances_max_number = profile.get("max-number-of-instances", 1)
-
- if scalein_threshold:
- uuid = str(uuid4())
- name = f"scalein_{uuid}"
- operation = scaling_criteria["scale-in-relational-operation"]
- rel_operator = rel_operation_types.get(operation, "<=")
- metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
- expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
- labels = {
- "ns_id": nsr_id,
- "vnf_member_index": vnf_member_index,
- "vdu_id": vdu_id,
- }
- prom_cfg = {
- "alert": name,
- "expr": expression,
- "for": str(threshold_time) + "m",
- "labels": labels,
- }
- action = scaling_policy
- action = {
- "scaling-group": scaling_group_name,
- "cooldown-time": cooldown_time,
- }
- alert = {
- "uuid": uuid,
- "name": name,
- "metric": metric_name,
- "tags": {
- "ns_id": nsr_id,
- "vnf_member_index": vnf_member_index,
- "vdu_id": vdu_id,
- },
- "alarm_status": "ok",
- "action_type": "scale_in",
- "action": action,
- "prometheus_config": prom_cfg,
- }
- logger.info(f"Storing in MongoDB alert {alert}")
- alerts.insert_one(alert)
-
- scaleout_threshold = scaling_criteria["scale-out-threshold"]
- if scaleout_threshold:
- uuid = str(uuid4())
- name = f"scaleout_{uuid}"
- operation = scaling_criteria["scale-out-relational-operation"]
- rel_operator = rel_operation_types.get(operation, "<=")
- metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
- expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
- labels = {
- "ns_id": nsr_id,
- "vnf_member_index": vnf_member_index,
- "vdu_id": vdu_id,
- }
- prom_cfg = {
- "alert": name,
- "expr": expression,
- "for": str(threshold_time) + "m",
- "labels": labels,
- }
- action = scaling_policy
- action = {
- "scaling-group": scaling_group_name,
- "cooldown-time": cooldown_time,
- }
- alert = {
- "uuid": uuid,
- "name": name,
- "metric": metric_name,
- "tags": {
- "ns_id": nsr_id,
- "vnf_member_index": vnf_member_index,
- "vdu_id": vdu_id,
- },
- "alarm_status": "ok",
- "action_type": "scale_out",
- "action": action,
- "prometheus_config": prom_cfg,
- }
- logger.info(f"Storing in MongoDB alert {alert}")
- alerts.insert_one(alert)
+ healing_alerts.extend(MongoUpgrade1214.gather_vnfr_healing_alerts(vnfr, vnfd))
+ scaling_alerts.extend(MongoUpgrade1214.gather_vnfr_scaling_alerts(vnfr, vnfd))
+
+ # Add new alerts in MongoDB
+ alerts = osm_db["alerts"]
+ for alert in healing_alerts:
+ logger.info(f"Storing healing alert in MongoDB: {alert}")
+ alerts.insert_one(alert)
+ for alert in scaling_alerts:
+ logger.info(f"Storing scaling alert in MongoDB: {alert}")
+ alerts.insert_one(alert)
+
# Delete old alarms collections
logger.info("Deleting alarms and alarms_action collections")
alarms = osm_db["alarms"]