Fix bug 2417: KSU creation from SW catalog path 63/15563/1
authorgarciadeblas <gerardo.garciadeblas@telefonica.com>
Wed, 3 Dec 2025 07:58:42 +0000 (08:58 +0100)
committergarciadeblas <gerardo.garciadeblas@telefonica.com>
Wed, 3 Dec 2025 07:59:08 +0000 (08:59 +0100)
Change-Id: If131a045ffea526e4e0712e6f151b7879cfe2281
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
osm_nbi/k8s_topics.py

index a1d21b7..333969a 100644 (file)
@@ -429,8 +429,8 @@ class ClusterTopic(ACMTopic):
                 raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)
 
     def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
-        check = self.db.get_one(self.topic, {"_id": _id})
-        if "name" in indata and check["name"] != indata["name"]:
+        check_dict = self.db.get_one(self.topic, {"_id": _id})
+        if "name" in indata and check_dict["name"] != indata["name"]:
             self.check_unique_name(session, indata["name"])
             _filter = {"name": indata["name"]}
             topic_list = [
@@ -451,7 +451,7 @@ class ClusterTopic(ACMTopic):
                     )
             # Replace name in k8scluster and profiles
             for topic in topic_list:
-                data = self.db.get_one(topic, {"name": check["name"]})
+                data = self.db.get_one(topic, {"name": check_dict["name"]})
                 data["name"] = indata["name"]
                 self.db.replace(topic, data["_id"], data)
         return True
@@ -627,13 +627,13 @@ class ClusterTopic(ACMTopic):
         return {"cluster_id": _id, "operation_id": op_id, "force": session["force"]}
 
     def delete(self, session, _id, dry_run=False, not_send_msg=None):
-        check = {"cluster": _id}
-        self.check_dependency(check, operation_type="delete")
+        check_dict = {"cluster": _id}
+        self.check_dependency(check_dict, operation_type="delete")
         filter_q = self._get_project_filter(session)
         filter_q[self.id_field(self.topic, _id)] = _id
-        check = self.db.get_one(self.topic, filter_q)
-        op_id = check["current_operation"]
-        if check["created"] == "false":
+        check_dict = self.db.get_one(self.topic, filter_q)
+        op_id = check_dict["current_operation"]
+        if check_dict["created"] == "false":
             raise EngineException(
                 "Cannot delete registered cluster. Please deregister.",
                 HTTPStatus.UNPROCESSABLE_ENTITY,
@@ -1063,34 +1063,23 @@ class KsusTopic(ACMTopic):
     def new(self, rollback, session, indata=None, kwargs=None, headers=None):
         _id_list = []
         for content in indata["ksus"]:
-            check = {content["profile"]["profile_type"]: content["profile"]["_id"]}
-            oka = content["oka"][0]
-            oka_flag = ""
-            if oka["_id"]:
-                check["okas"] = []
-                oka_flag = "_id"
-                oka["sw_catalog_path"] = ""
-            elif oka["sw_catalog_path"]:
-                oka_flag = "sw_catalog_path"
+            check_dict = {content["profile"]["profile_type"]: content["profile"]["_id"]}
+            check_dict["okas"] = []
 
             for okas in content["oka"]:
-                if okas.get("_id") is not None:
-                    check["okas"].append(okas["_id"])
-                if okas["_id"] and okas["sw_catalog_path"]:
+                if "_id" in okas and "sw_catalog_path" in okas:
                     raise EngineException(
                         "Cannot create ksu with both OKA and SW catalog path",
                         HTTPStatus.UNPROCESSABLE_ENTITY,
                     )
-                if not okas["sw_catalog_path"]:
-                    okas.pop("sw_catalog_path")
-                elif not okas["_id"]:
-                    okas.pop("_id")
-                if oka_flag not in okas.keys():
+                elif "_id" not in okas and "sw_catalog_path" not in okas:
                     raise EngineException(
-                        "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
+                        "Cannot create ksu. Either oka id or SW catalog path is required for all OKA in a KSU",
                         HTTPStatus.UNPROCESSABLE_ENTITY,
                     )
-            self.check_dependency(check)
+                elif "_id" in okas:
+                    check_dict["okas"].append(okas["_id"])
+            self.check_dependency(check_dict)
 
             # Override descriptor with query string kwargs
             content = self._remove_envelop(content)
@@ -1143,11 +1132,11 @@ class KsusTopic(ACMTopic):
         return _id_list, op_id
 
     def clone(self, rollback, session, _id, indata, kwargs, headers):
-        check = {
+        check_dict = {
             "ksu": _id,
             indata["profile"]["profile_type"]: indata["profile"]["_id"],
         }
-        self.check_dependency(check)
+        self.check_dependency(check_dict)
         filter_db = self._get_project_filter(session)
         filter_db[BaseTopic.id_field(self.topic, _id)] = _id
         data = self.db.get_one(self.topic, filter_db)
@@ -1176,11 +1165,11 @@ class KsusTopic(ACMTopic):
             )
 
     def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
-        check = {
+        check_dict = {
             "ksu": _id,
             indata["profile"]["profile_type"]: indata["profile"]["_id"],
         }
-        self.check_dependency(check)
+        self.check_dependency(check_dict)
         indata = self._remove_envelop(indata)
 
         # Override descriptor with query string kwargs
@@ -1304,18 +1293,18 @@ class KsusTopic(ACMTopic):
         return outdata
 
     def edit_ksu(self, session, _id, indata, kwargs):
-        check = {
+        check_dict = {
             "ksu": _id,
         }
         if indata.get("profile"):
-            check[indata["profile"]["profile_type"]] = indata["profile"]["_id"]
+            check_dict[indata["profile"]["profile_type"]] = indata["profile"]["_id"]
         if indata.get("oka"):
-            check["okas"] = []
+            check_dict["okas"] = []
             for oka in indata["oka"]:
                 if oka.get("_id") is not None:
-                    check["okas"].append(oka["_id"])
+                    check_dict["okas"].append(oka["_id"])
 
-        self.check_dependency(check)
+        self.check_dependency(check_dict)
         content = None
         indata = self._remove_envelop(indata)