raise EngineException(error_msg, HTTPStatus.UNPROCESSABLE_ENTITY)
def edit_extra_before(self, session, _id, indata=None, kwargs=None, content=None):
- check = self.db.get_one(self.topic, {"_id": _id})
- if "name" in indata and check["name"] != indata["name"]:
+ check_dict = self.db.get_one(self.topic, {"_id": _id})
+ if "name" in indata and check_dict["name"] != indata["name"]:
self.check_unique_name(session, indata["name"])
_filter = {"name": indata["name"]}
topic_list = [
)
# Replace name in k8scluster and profiles
for topic in topic_list:
- data = self.db.get_one(topic, {"name": check["name"]})
+ data = self.db.get_one(topic, {"name": check_dict["name"]})
data["name"] = indata["name"]
self.db.replace(topic, data["_id"], data)
return True
return {"cluster_id": _id, "operation_id": op_id, "force": session["force"]}
def delete(self, session, _id, dry_run=False, not_send_msg=None):
- check = {"cluster": _id}
- self.check_dependency(check, operation_type="delete")
+ check_dict = {"cluster": _id}
+ self.check_dependency(check_dict, operation_type="delete")
filter_q = self._get_project_filter(session)
filter_q[self.id_field(self.topic, _id)] = _id
- check = self.db.get_one(self.topic, filter_q)
- op_id = check["current_operation"]
- if check["created"] == "false":
+ check_dict = self.db.get_one(self.topic, filter_q)
+ op_id = check_dict["current_operation"]
+ if check_dict["created"] == "false":
raise EngineException(
"Cannot delete registered cluster. Please deregister.",
HTTPStatus.UNPROCESSABLE_ENTITY,
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
_id_list = []
for content in indata["ksus"]:
- check = {content["profile"]["profile_type"]: content["profile"]["_id"]}
- oka = content["oka"][0]
- oka_flag = ""
- if oka["_id"]:
- check["okas"] = []
- oka_flag = "_id"
- oka["sw_catalog_path"] = ""
- elif oka["sw_catalog_path"]:
- oka_flag = "sw_catalog_path"
+ check_dict = {content["profile"]["profile_type"]: content["profile"]["_id"]}
+ check_dict["okas"] = []
for okas in content["oka"]:
- if okas.get("_id") is not None:
- check["okas"].append(okas["_id"])
- if okas["_id"] and okas["sw_catalog_path"]:
+ if "_id" in okas and "sw_catalog_path" in okas:
raise EngineException(
"Cannot create ksu with both OKA and SW catalog path",
HTTPStatus.UNPROCESSABLE_ENTITY,
)
- if not okas["sw_catalog_path"]:
- okas.pop("sw_catalog_path")
- elif not okas["_id"]:
- okas.pop("_id")
- if oka_flag not in okas.keys():
+ elif "_id" not in okas and "sw_catalog_path" not in okas:
raise EngineException(
- "Cannot create ksu. Give either OKA or SW catalog path for all oka in a KSU",
+ "Cannot create ksu. Either oka id or SW catalog path is required for all OKA in a KSU",
HTTPStatus.UNPROCESSABLE_ENTITY,
)
- self.check_dependency(check)
+ elif "_id" in okas:
+ check_dict["okas"].append(okas["_id"])
+ self.check_dependency(check_dict)
# Override descriptor with query string kwargs
content = self._remove_envelop(content)
return _id_list, op_id
def clone(self, rollback, session, _id, indata, kwargs, headers):
- check = {
+ check_dict = {
"ksu": _id,
indata["profile"]["profile_type"]: indata["profile"]["_id"],
}
- self.check_dependency(check)
+ self.check_dependency(check_dict)
filter_db = self._get_project_filter(session)
filter_db[BaseTopic.id_field(self.topic, _id)] = _id
data = self.db.get_one(self.topic, filter_db)
)
def move_ksu(self, session, _id, indata=None, kwargs=None, content=None):
- check = {
+ check_dict = {
"ksu": _id,
indata["profile"]["profile_type"]: indata["profile"]["_id"],
}
- self.check_dependency(check)
+ self.check_dependency(check_dict)
indata = self._remove_envelop(indata)
# Override descriptor with query string kwargs
return outdata
def edit_ksu(self, session, _id, indata, kwargs):
- check = {
+ check_dict = {
"ksu": _id,
}
if indata.get("profile"):
- check[indata["profile"]["profile_type"]] = indata["profile"]["_id"]
+ check_dict[indata["profile"]["profile_type"]] = indata["profile"]["_id"]
if indata.get("oka"):
- check["okas"] = []
+ check_dict["okas"] = []
for oka in indata["oka"]:
if oka.get("_id") is not None:
- check["okas"].append(oka["_id"])
+ check_dict["okas"].append(oka["_id"])
- self.check_dependency(check)
+ self.check_dependency(check_dict)
content = None
indata = self._remove_envelop(indata)