oid = super().format_on_new(content, project_id, make_public)
self.db.encrypt_decrypt_fields(content["credentials"], 'encrypt', ['password', 'secret'],
schema_version=content["schema_version"], salt=content["_id"])
+ # Add Helm/Juju Repo lists
+ repos = {"helm-chart": [], "juju-bundle": []}
+ for proj in content["_admin"]["projects_read"]:
+ if proj != 'ANY':
+ for repo in self.db.get_list("k8srepos", {"_admin.projects_read": proj}):
+ if repo["_id"] not in repos[repo["type"]]:
+ repos[repo["type"]].append(repo["_id"])
+ for k in repos:
+ content["_admin"][k.replace('-', '_')+"_repos"] = repos[k]
return oid
def format_on_edit(self, final_content, edit_content):
oid = super().format_on_edit(final_content, edit_content)
return oid
+ def check_conflict_on_edit(self, session, final_content, edit_content, _id):
+ super(CommonVimWimSdn, self).check_conflict_on_edit(session, final_content, edit_content, _id)
+ super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ # Update Helm/Juju Repo lists
+ repos = {"helm-chart": [], "juju-bundle": []}
+ for proj in session.get("set_project", []):
+ if proj != 'ANY':
+ for repo in self.db.get_list("k8srepos", {"_admin.projects_read": proj}):
+ if repo["_id"] not in repos[repo["type"]]:
+ repos[repo["type"]].append(repo["_id"])
+ for k in repos:
+ rlist = k.replace('-', '_') + "_repos"
+ if rlist not in final_content["_admin"]:
+ final_content["_admin"][rlist] = []
+ final_content["_admin"][rlist] += repos[k]
+
class K8sRepoTopic(CommonVimWimSdn):
topic = "k8srepos"
password_to_encrypt = None
config_to_encrypt = {}
+ def format_on_new(self, content, project_id=None, make_public=False):
+ oid = super().format_on_new(content, project_id, make_public)
+ # Update Helm/Juju Repo lists
+ repo_list = content["type"].replace('-', '_')+"_repos"
+ for proj in content["_admin"]["projects_read"]:
+ if proj != 'ANY':
+ self.db.set_list("k8sclusters",
+ {"_admin.projects_read": proj, "_admin."+repo_list+".ne": content["_id"]}, {},
+ push={"_admin."+repo_list: content["_id"]})
+ return oid
+
+ def delete(self, session, _id, dry_run=False, not_send_msg=None):
+ type = self.db.get_one("k8srepos", {"_id": _id})["type"]
+ oid = super().delete(session, _id, dry_run, not_send_msg)
+ if oid:
+ # Remove from Helm/Juju Repo lists
+ repo_list = type.replace('-', '_') + "_repos"
+ self.db.set_list("k8sclusters", {"_admin."+repo_list: _id}, {}, pull={"_admin."+repo_list: _id})
+ return oid
+
class UserTopicAuth(UserTopic):
# topic = "users"
:return: dictionary, raise exception if not found.
"""
# Allow _id to be a name or uuid
- filter_q = {self.id_field(self.topic, _id): _id}
+ filter_q = {"username": _id}
# users = self.auth.get_user_list(filter_q)
users = self.list(session, filter_q) # To allow default filtering (Bug 853)
if len(users) == 1:
raise EngineException("You cannot delete role '{}'".format(role["name"]), http_code=HTTPStatus.FORBIDDEN)
# If any user is using this role, raise CONFLICT exception
- for user in self.auth.get_user_list():
- for prm in user.get("project_role_mappings"):
- if prm["role"] == _id:
- raise EngineException("Role '{}' ({}) is being used by user '{}'"
- .format(role["name"], _id, user["username"]), HTTPStatus.CONFLICT)
+ if not session["force"]:
+ for user in self.auth.get_user_list():
+ for prm in user.get("project_role_mappings"):
+ if prm["role"] == _id:
+ raise EngineException("Role '{}' ({}) is being used by user '{}'"
+ .format(role["name"], _id, user["username"]), HTTPStatus.CONFLICT)
@staticmethod
def format_on_new(content, project_id=None, make_public=False): # TO BE REMOVED ?