From 045305d441a4416c8cee6b6566b3a74e1a77421f Mon Sep 17 00:00:00 2001 From: yshah Date: Fri, 17 Jan 2025 04:46:03 +0000 Subject: [PATCH] Check dependency between NBI operations Change-Id: I77e1e5c78cba1deb3f8fe8ba5b4c2fed4ff36cff Signed-off-by: yshah Signed-off-by: garciadeblas --- osm_nbi/acm_topic.py | 50 +++++++++++++++++++++++++++++++++++++- osm_nbi/k8s_topics.py | 56 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 103 insertions(+), 3 deletions(-) diff --git a/osm_nbi/acm_topic.py b/osm_nbi/acm_topic.py index 8d529aa..eb335da 100644 --- a/osm_nbi/acm_topic.py +++ b/osm_nbi/acm_topic.py @@ -26,7 +26,8 @@ from osm_common.fsbase import FsException from osm_nbi.base_topic import BaseTopic, EngineException from osm_nbi.validation import ValidationError -# import logging +import logging + # import random # import string # from yaml import safe_load, YAMLError @@ -35,6 +36,11 @@ from osm_nbi.validation import ValidationError class ACMOperationTopic: def __init__(self, db, fs, msg, auth): self.multiproject = None # Declare the attribute here + self.db = db + self.fs = fs + self.msg = msg + self.logger = logging.getLogger("nbi.base") + self.auth = auth @staticmethod def format_on_operation(content, operation_type, operation_params=None): @@ -57,6 +63,48 @@ class ACMOperationTopic: content["operationHistory"].append(operation) return op_id + def check_dependency(self, check, operation_type=None): + topic_to_db_mapping = { + "cluster": "clusters", + "ksu": "ksus", + "infra_controller_profiles": "k8sinfra_controller", + "infra_config_profiles": "k8sinfra_config", + "resource_profiles": "k8sresource", + "app_profiles": "k8sapp", + "oka": "okas", + } + for topic, _id in check.items(): + filter_q = { + "_id": _id, + } + if topic == "okas": + for oka_element in check[topic]: + self.check_dependency({"oka": oka_element}) + if topic not in ("okas"): + element_content = self.db.get_one(topic_to_db_mapping[topic], filter_q) + element_name = element_content.get("name") + state = element_content["state"] + if ( + operation_type == "delete" + and state == "FAILED_CREATION" + and element_content["operatingState"] == "IDLE" + ): + self.logger.info(f"Delete operation is allowed in {state} state") + return + elif element_content["state"] != "CREATED": + raise EngineException( + f"State of the {element_name} {topic} is {state}", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + elif ( + state == "CREATED" + and element_content["operatingState"] == "PROCESSING" + ): + raise EngineException( + f"operatingState of the {element_name} {topic} is not IDLE", + HTTPStatus.UNPROCESSABLE_ENTITY, + ) + class ACMTopic(BaseTopic, ACMOperationTopic): def __init__(self, db, fs, msg, auth): diff --git a/osm_nbi/k8s_topics.py b/osm_nbi/k8s_topics.py index 3cf676b..b295739 100644 --- a/osm_nbi/k8s_topics.py +++ b/osm_nbi/k8s_topics.py @@ -71,6 +71,8 @@ class InfraContTopic(ProfileTopic): return self.default_profile(rollback, session, indata, kwargs, headers) def delete(self, session, _id, dry_run=False, not_send_msg=None): + check = {"infra_controller_profiles": _id} + self.check_dependency(check, operation_type="delete") self.delete_profile(session, _id, dry_run, not_send_msg) return _id @@ -93,6 +95,8 @@ class InfraConfTopic(ProfileTopic): return self.default_profile(rollback, session, indata, kwargs, headers) def delete(self, session, _id, dry_run=False, not_send_msg=None): + check = {"infra_config_profiles": _id} + self.check_dependency(check, operation_type="delete") self.delete_profile(session, _id, dry_run, not_send_msg) return _id @@ -115,6 +119,8 @@ class AppTopic(ProfileTopic): return self.default_profile(rollback, session, indata, kwargs, headers) def delete(self, session, _id, dry_run=False, not_send_msg=None): + check = {"app_profiles": _id} + self.check_dependency(check, operation_type="delete") self.delete_profile(session, _id, dry_run, not_send_msg) return _id @@ -137,6 +143,8 @@ class ResourceTopic(ProfileTopic): return self.default_profile(rollback, session, indata, kwargs, headers) def delete(self, session, _id, dry_run=False, not_send_msg=None): + check = {"resource_profiles": _id} + self.check_dependency(check, operation_type="delete") self.delete_profile(session, _id, dry_run, not_send_msg) return _id @@ -387,6 +395,8 @@ class ClusterTopic(ACMTopic): return True def add_profile(self, session, _id, item, indata=None): + check = {"cluster": _id, item: indata["add_profile"][0]["id"]} + self.check_dependency(check) indata = self._remove_envelop(indata) operation_params = indata profile_id = indata["add_profile"][0]["id"] @@ -430,6 +440,8 @@ class ClusterTopic(ACMTopic): return default_profiles def remove_profile(self, session, _id, item, indata): + check = {"cluster": _id, item: indata["remove_profile"][0]["id"]} + self.check_dependency(check) indata = self._remove_envelop(indata) operation_params = indata profile_id = indata["remove_profile"][0]["id"] @@ -553,6 +565,8 @@ class ClusterTopic(ACMTopic): return {"cluster_id": _id, "operation_id": op_id, "force": session["force"]} def delete(self, session, _id, dry_run=False, not_send_msg=None): + check = {"cluster": _id} + self.check_dependency(check, operation_type="delete") filter_q = self._get_project_filter(session) filter_q[self.id_field(self.topic, _id)] = _id check = self.db.get_one(self.topic, filter_q) @@ -802,17 +816,20 @@ class KsusTopic(ACMTopic): def new(self, rollback, session, indata=None, kwargs=None, headers=None): _id_list = [] - for ksus in indata["ksus"]: - content = ksus + for content in indata["ksus"]: + check = {content["profile"]["profile_type"]: content["profile"]["_id"]} oka = content["oka"][0] oka_flag = "" if oka["_id"]: + check["okas"] = [] oka_flag = "_id" oka["sw_catalog_path"] = "" elif oka["sw_catalog_path"]: oka_flag = "sw_catalog_path" for okas in content["oka"]: + if okas.get("_id") is not None: + check["okas"].append(okas["_id"]) if okas["_id"] and okas["sw_catalog_path"]: raise EngineException( "Cannot create ksu with both OKA and SW catalog path", @@ -827,6 +844,7 @@ class KsusTopic(ACMTopic): "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU", HTTPStatus.UNPROCESSABLE_ENTITY, ) + self.check_dependency(check) # Override descriptor with query string kwargs content = self._remove_envelop(content) @@ -864,6 +882,11 @@ class KsusTopic(ACMTopic): return _id_list, op_id def clone(self, rollback, session, _id, indata, kwargs, headers): + check = { + "ksu": _id, + indata["profile"]["profile_type"]: indata["profile"]["_id"], + } + self.check_dependency(check) filter_db = self._get_project_filter(session) filter_db[BaseTopic.id_field(self.topic, _id)] = _id data = self.db.get_one(self.topic, filter_db) @@ -892,6 +915,11 @@ class KsusTopic(ACMTopic): ) def move_ksu(self, session, _id, indata=None, kwargs=None, content=None): + check = { + "ksu": _id, + indata["profile"]["profile_type"]: indata["profile"]["_id"], + } + self.check_dependency(check) indata = self._remove_envelop(indata) # Override descriptor with query string kwargs @@ -969,6 +997,18 @@ class KsusTopic(ACMTopic): self._send_msg("edit", data) def edit_ksu(self, session, _id, indata, kwargs): + check = { + "ksu": _id, + } + if indata.get("profile"): + check[indata["profile"]["profile_type"]] = indata["profile"]["_id"] + if indata.get("oka"): + check["okas"] = [] + for oka in indata["oka"]: + if oka.get("_id") is not None: + check["okas"].append(oka["_id"]) + + self.check_dependency(check) content = None indata = self._remove_envelop(indata) @@ -1034,6 +1074,13 @@ class KsusTopic(ACMTopic): filter_q = self._get_project_filter(session) filter_q[self.id_field(self.topic, _id)] = _id item_content = self.db.get_one(self.topic, filter_q) + + check = { + "ksu": _id, + item_content["profile"]["profile_type"]: item_content["profile"]["_id"], + } + self.check_dependency(check, operation_type="delete") + item_content["state"] = "IN_DELETION" item_content["operatingState"] = "PROCESSING" item_content["resourceState"] = "IN_PROGRESS" @@ -1144,6 +1191,8 @@ class OkaTopic(DescriptorTopic, ACMOperationTopic): raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY) def delete(self, session, _id, dry_run=False, not_send_msg=None): + check = {"oka": _id} + self.check_dependency(check, operation_type="delete") if not self.multiproject: filter_q = {} else: @@ -1197,6 +1246,9 @@ class OkaTopic(DescriptorTopic, ACMOperationTopic): return _id, op_id def upload_content(self, session, _id, indata, kwargs, headers): + if headers["Method"] in ("PUT", "PATCH"): + check = {"oka": _id} + self.check_dependency(check) current_desc = self.show(session, _id) compressed = None -- 2.25.1