From 986c308d5d89b8dbb3361ce2f1638b3285859d8e Mon Sep 17 00:00:00 2001 From: garciadeblas Date: Thu, 4 May 2023 11:54:30 +0200 Subject: [PATCH] Refactor MongoUpgrade1214 in db_upgrade.py Change-Id: I1da050701c20d1a04c34457743b43a89a2786929 Signed-off-by: garciadeblas --- .../osm-update-db-operator/src/db_upgrade.py | 425 ++++++++++-------- 1 file changed, 226 insertions(+), 199 deletions(-) diff --git a/installers/charm/osm-update-db-operator/src/db_upgrade.py b/installers/charm/osm-update-db-operator/src/db_upgrade.py index d0b3e9f6..72eaa81f 100644 --- a/installers/charm/osm-update-db-operator/src/db_upgrade.py +++ b/installers/charm/osm-update-db-operator/src/db_upgrade.py @@ -26,6 +26,212 @@ logger = logging.getLogger(__name__) class MongoUpgrade1214: """Upgrade MongoDB Database from OSM v12 to v14.""" + @staticmethod + def gather_vnfr_healing_alerts(vnfr, vnfd): + alerts = [] + nsr_id = vnfr["nsr-id-ref"] + df = vnfd.get("df", [{}])[0] + # Checking for auto-healing configuration + if "healing-aspect" in df: + healing_aspects = df["healing-aspect"] + for healing in healing_aspects: + for healing_policy in healing.get("healing-policy", ()): + vdu_id = healing_policy["vdu-id"] + vdur = next( + ( + vdur + for vdur in vnfr["vdur"] + if vdu_id == vdur["vdu-id-ref"] + ), + {}, + ) + if not vdur: + continue + metric_name = "vm_status" + vdu_name = vdur.get("name") + vnf_member_index = vnfr["member-vnf-index-ref"] + uuid = str(uuid4()) + name = f"healing_{uuid}" + action = healing_policy + # action_on_recovery = healing.get("action-on-recovery") + # cooldown_time = healing.get("cooldown-time") + # day1 = healing.get("day1") + alert = { + "uuid": uuid, + "name": name, + "metric": metric_name, + "tags": { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_name": vdu_name, + }, + "alarm_status": "ok", + "action_type": "healing", + "action": action, + } + alerts.append(alert) + return alerts + + @staticmethod + def gather_vnfr_scaling_alerts(vnfr, vnfd): + alerts = [] + nsr_id = vnfr["nsr-id-ref"] + df = vnfd.get("df", [{}])[0] + # Checking for auto-scaling configuration + if "scaling-aspect" in df: + rel_operation_types = { + "GE": ">=", + "LE": "<=", + "GT": ">", + "LT": "<", + "EQ": "==", + "NE": "!=", + } + scaling_aspects = df["scaling-aspect"] + all_vnfd_monitoring_params = {} + for ivld in vnfd.get("int-virtual-link-desc", ()): + for mp in ivld.get("monitoring-parameters", ()): + all_vnfd_monitoring_params[mp.get("id")] = mp + for vdu in vnfd.get("vdu", ()): + for mp in vdu.get("monitoring-parameter", ()): + all_vnfd_monitoring_params[mp.get("id")] = mp + for df in vnfd.get("df", ()): + for mp in df.get("monitoring-parameter", ()): + all_vnfd_monitoring_params[mp.get("id")] = mp + for scaling_aspect in scaling_aspects: + scaling_group_name = scaling_aspect.get("name", "") + # Get monitored VDUs + all_monitored_vdus = set() + for delta in scaling_aspect.get( + "aspect-delta-details", {} + ).get("deltas", ()): + for vdu_delta in delta.get("vdu-delta", ()): + all_monitored_vdus.add(vdu_delta.get("id")) + monitored_vdurs = list( + filter( + lambda vdur: vdur["vdu-id-ref"] + in all_monitored_vdus, + vnfr["vdur"], + ) + ) + if not monitored_vdurs: + logger.error("Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric") + continue + for scaling_policy in scaling_aspect.get( + "scaling-policy", () + ): + if scaling_policy["scaling-type"] != "automatic": + continue + threshold_time = scaling_policy.get( + "threshold-time", "1" + ) + cooldown_time = scaling_policy.get("cooldown-time", "0") + for scaling_criteria in scaling_policy["scaling-criteria"]: + monitoring_param_ref = scaling_criteria.get( + "vnf-monitoring-param-ref" + ) + vnf_monitoring_param = all_vnfd_monitoring_params[ + monitoring_param_ref + ] + for vdur in monitored_vdurs: + vdu_id = vdur["vdu-id-ref"] + metric_name = vnf_monitoring_param.get("performance-metric") + vdu_name = vdur["name"] + vnf_member_index = vnfr["member-vnf-index-ref"] + scalein_threshold = scaling_criteria.get("scale-in-threshold") + # Looking for min/max-number-of-instances + instances_min_number = 1 + instances_max_number = 1 + vdu_profile = df["vdu-profile"] + if vdu_profile: + profile = next( + item + for item in vdu_profile + if item["id"] == vdu_id + ) + instances_min_number = profile.get("min-number-of-instances", 1) + instances_max_number = profile.get("max-number-of-instances", 1) + + if scalein_threshold: + uuid = str(uuid4()) + name = f"scalein_{uuid}" + operation = scaling_criteria["scale-in-relational-operation"] + rel_operator = rel_operation_types.get(operation, "<=") + metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}' + expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})" + labels = { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_id": vdu_id, + } + prom_cfg = { + "alert": name, + "expr": expression, + "for": str(threshold_time) + "m", + "labels": labels, + } + action = scaling_policy + action = { + "scaling-group": scaling_group_name, + "cooldown-time": cooldown_time, + } + alert = { + "uuid": uuid, + "name": name, + "metric": metric_name, + "tags": { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_id": vdu_id, + }, + "alarm_status": "ok", + "action_type": "scale_in", + "action": action, + "prometheus_config": prom_cfg, + } + alerts.append(alert) + + scaleout_threshold = scaling_criteria.get("scale-out-threshold") + if scaleout_threshold: + uuid = str(uuid4()) + name = f"scaleout_{uuid}" + operation = scaling_criteria["scale-out-relational-operation"] + rel_operator = rel_operation_types.get(operation, "<=") + metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}' + expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})" + labels = { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_id": vdu_id, + } + prom_cfg = { + "alert": name, + "expr": expression, + "for": str(threshold_time) + "m", + "labels": labels, + } + action = scaling_policy + action = { + "scaling-group": scaling_group_name, + "cooldown-time": cooldown_time, + } + alert = { + "uuid": uuid, + "name": name, + "metric": metric_name, + "tags": { + "ns_id": nsr_id, + "vnf_member_index": vnf_member_index, + "vdu_id": vdu_id, + }, + "alarm_status": "ok", + "action_type": "scale_out", + "action": action, + "prometheus_config": prom_cfg, + } + alerts.append(alert) + return alerts + @staticmethod def _migrate_alerts(osm_db): """Create new alerts collection. @@ -33,216 +239,37 @@ class MongoUpgrade1214: if "alerts" in osm_db.list_collection_names(): return logger.info("Entering in MongoUpgrade1214._migrate_alerts function") + + # Get vnfds from MongoDB logger.info("Reading VNF descriptors:") - alerts = osm_db["alerts"] vnfds = osm_db["vnfds"] db_vnfds = [] for vnfd in vnfds.find(): logger.info(f' {vnfd["_id"]}: {vnfd["description"]}') db_vnfds.append(vnfd) + + # Get vnfrs from MongoDB logger.info("Reading VNFRs") vnfrs = osm_db["vnfrs"] + + # Gather healing and scaling alerts for each vnfr + healing_alerts = [] + scaling_alerts = [] for vnfr in vnfrs.find(): logger.info(f' vnfr {vnfr["_id"]}') vnfd = next((sub for sub in db_vnfds if sub["_id"] == vnfr["vnfd-id"]), None) - nsr_id = vnfr["nsr-id-ref"] - # Checking for auto-healing configuration - df = vnfd.get("df", [{}])[0] - if "healing-aspect" in df: - healing_aspects = df["healing-aspect"] - for healing in healing_aspects: - for healing_policy in healing.get("healing-policy", ()): - vdu_id = healing_policy["vdu-id"] - vdur = next( - ( - vdur - for vdur in vnfr["vdur"] - if vdu_id == vdur["vdu-id-ref"] - ), - {}, - ) - if not vdur: - continue - metric_name = "vm_status" - vdu_name = vdur.get("name") - vnf_member_index = vnfr["member-vnf-index-ref"] - uuid = str(uuid4()) - name = f"healing_{uuid}" - action = healing_policy - # action_on_recovery = healing.get("action-on-recovery") - # cooldown_time = healing.get("cooldown-time") - # day1 = healing.get("day1") - alert = { - "uuid": uuid, - "name": name, - "metric": metric_name, - "tags": { - "ns_id": nsr_id, - "vnf_member_index": vnf_member_index, - "vdu_name": vdu_name, - }, - "alarm_status": "ok", - "action_type": "healing", - "action": action, - } - logger.info(f"Storing in MongoDB alert {alert}") - alerts.insert_one(alert) - - # Checking for auto-scaling configuration - if "scaling-aspect" in df: - rel_operation_types = { - "GE": ">=", - "LE": "<=", - "GT": ">", - "LT": "<", - "EQ": "==", - "NE": "!=", - } - scaling_aspects = df["scaling-aspect"] - all_vnfd_monitoring_params = {} - for ivld in vnfd.get("int-virtual-link-desc", ()): - for mp in ivld.get("monitoring-parameters", ()): - all_vnfd_monitoring_params[mp.get("id")] = mp - for vdu in vnfd.get("vdu", ()): - for mp in vdu.get("monitoring-parameter", ()): - all_vnfd_monitoring_params[mp.get("id")] = mp - for df in vnfd.get("df", ()): - for mp in df.get("monitoring-parameter", ()): - all_vnfd_monitoring_params[mp.get("id")] = mp - for scaling_aspect in scaling_aspects: - scaling_group_name = scaling_aspect.get("name", "") - # Get monitored VDUs - all_monitored_vdus = set() - for delta in scaling_aspect.get( - "aspect-delta-details", {} - ).get("deltas", ()): - for vdu_delta in delta.get("vdu-delta", ()): - all_monitored_vdus.add(vdu_delta.get("id")) - monitored_vdurs = list( - filter( - lambda vdur: vdur["vdu-id-ref"] - in all_monitored_vdus, - vnfr["vdur"], - ) - ) - if not monitored_vdurs: - logger.error("Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric") - continue - for scaling_policy in scaling_aspect.get( - "scaling-policy", () - ): - if scaling_policy["scaling-type"] != "automatic": - continue - threshold_time = scaling_policy.get( - "threshold-time", "1" - ) - cooldown_time = scaling_policy.get("cooldown-time", "0") - for scaling_criteria in scaling_policy["scaling-criteria"]: - monitoring_param_ref = scaling_criteria.get( - "vnf-monitoring-param-ref" - ) - vnf_monitoring_param = all_vnfd_monitoring_params[ - monitoring_param_ref - ] - for vdur in monitored_vdurs: - vdu_id = vdur["vdu-id-ref"] - metric_name = vnf_monitoring_param.get("performance-metric") - vdu_name = vdur["name"] - vnf_member_index = vnfr["member-vnf-index-ref"] - scalein_threshold = scaling_criteria["scale-in-threshold"] - # Looking for min/max-number-of-instances - instances_min_number = 1 - instances_max_number = 1 - vdu_profile = df["vdu-profile"] - if vdu_profile: - profile = next( - item - for item in vdu_profile - if item["id"] == vdu_id - ) - instances_min_number = profile.get("min-number-of-instances", 1) - instances_max_number = profile.get("max-number-of-instances", 1) - - if scalein_threshold: - uuid = str(uuid4()) - name = f"scalein_{uuid}" - operation = scaling_criteria["scale-in-relational-operation"] - rel_operator = rel_operation_types.get(operation, "<=") - metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}' - expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})" - labels = { - "ns_id": nsr_id, - "vnf_member_index": vnf_member_index, - "vdu_id": vdu_id, - } - prom_cfg = { - "alert": name, - "expr": expression, - "for": str(threshold_time) + "m", - "labels": labels, - } - action = scaling_policy - action = { - "scaling-group": scaling_group_name, - "cooldown-time": cooldown_time, - } - alert = { - "uuid": uuid, - "name": name, - "metric": metric_name, - "tags": { - "ns_id": nsr_id, - "vnf_member_index": vnf_member_index, - "vdu_id": vdu_id, - }, - "alarm_status": "ok", - "action_type": "scale_in", - "action": action, - "prometheus_config": prom_cfg, - } - logger.info(f"Storing in MongoDB alert {alert}") - alerts.insert_one(alert) - - scaleout_threshold = scaling_criteria["scale-out-threshold"] - if scaleout_threshold: - uuid = str(uuid4()) - name = f"scaleout_{uuid}" - operation = scaling_criteria["scale-out-relational-operation"] - rel_operator = rel_operation_types.get(operation, "<=") - metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}' - expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})" - labels = { - "ns_id": nsr_id, - "vnf_member_index": vnf_member_index, - "vdu_id": vdu_id, - } - prom_cfg = { - "alert": name, - "expr": expression, - "for": str(threshold_time) + "m", - "labels": labels, - } - action = scaling_policy - action = { - "scaling-group": scaling_group_name, - "cooldown-time": cooldown_time, - } - alert = { - "uuid": uuid, - "name": name, - "metric": metric_name, - "tags": { - "ns_id": nsr_id, - "vnf_member_index": vnf_member_index, - "vdu_id": vdu_id, - }, - "alarm_status": "ok", - "action_type": "scale_out", - "action": action, - "prometheus_config": prom_cfg, - } - logger.info(f"Storing in MongoDB alert {alert}") - alerts.insert_one(alert) + healing_alerts.extend(MongoUpgrade1214.gather_vnfr_healing_alerts(vnfr, vnfd)) + scaling_alerts.extend(MongoUpgrade1214.gather_vnfr_scaling_alerts(vnfr, vnfd)) + + # Add new alerts in MongoDB + alerts = osm_db["alerts"] + for alert in healing_alerts: + logger.info(f"Storing healing alert in MongoDB: {alert}") + alerts.insert_one(alert) + for alert in scaling_alerts: + logger.info(f"Storing scaling alert in MongoDB: {alert}") + alerts.insert_one(alert) + # Delete old alarms collections logger.info("Deleting alarms and alarms_action collections") alarms = osm_db["alarms"] -- 2.25.1