from osm_nbi.validation import user_new_schema, user_edit_schema, project_new_schema, project_edit_schema, \
vim_account_new_schema, vim_account_edit_schema, sdn_new_schema, sdn_edit_schema, \
wim_account_new_schema, wim_account_edit_schema, roles_new_schema, roles_edit_schema, \
+ k8scluster_new_schema, k8scluster_edit_schema, k8srepo_new_schema, k8srepo_edit_schema, \
validate_input, ValidationError, is_valid_uuid # To check that User/Project Names don't look like UUIDs
from osm_nbi.base_topic import BaseTopic, EngineException
from osm_nbi.authconn import AuthconnNotFoundException, AuthconnConflictException
:param edit_content: user requested update content
:return: operation id
"""
+ super().format_on_edit(final_content, edit_content)
# encrypt passwords
schema_version = final_content.get("schema_version")
config_to_encrypt = {}
+class K8sClusterTopic(CommonVimWimSdn):
+ topic = "k8sclusters"
+ topic_msg = "k8scluster"
+ schema_new = k8scluster_new_schema
+ schema_edit = k8scluster_edit_schema
+ multiproject = True
+ password_to_encrypt = None
+ config_to_encrypt = {}
+
+ def format_on_new(self, content, project_id=None, make_public=False):
+ oid = super().format_on_new(content, project_id, make_public)
+ self.db.encrypt_decrypt_fields(content["credentials"], 'encrypt', ['password', 'secret'],
+ schema_version=content["schema_version"], salt=content["_id"])
+ return oid
+
+ def format_on_edit(self, final_content, edit_content):
+ if final_content.get("schema_version") and edit_content.get("credentials"):
+ self.db.encrypt_decrypt_fields(edit_content["credentials"], 'encrypt', ['password', 'secret'],
+ schema_version=final_content["schema_version"], salt=final_content["_id"])
+ deep_update_rfc7396(final_content["credentials"], edit_content["credentials"])
+ oid = super().format_on_edit(final_content, edit_content)
+ return oid
+
+
+class K8sRepoTopic(CommonVimWimSdn):
+ topic = "k8srepos"
+ topic_msg = "k8srepo"
+ schema_new = k8srepo_new_schema
+ schema_edit = k8srepo_edit_schema
+ multiproject = True
+ password_to_encrypt = None
+ config_to_encrypt = {}
+
+
class UserTopicAuth(UserTopic):
# topic = "users"
# topic_msg = "users"
rollback.append({"topic": self.topic, "_id": _id})
# del content["password"]
- # self._send_msg("create", content)
+ # self._send_msg("created", content)
return _id, None
except ValidationError as e:
raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
"""
# Allow _id to be a name or uuid
filter_q = {self.id_field(self.topic, _id): _id}
- users = self.auth.get_user_list(filter_q)
-
+ # users = self.auth.get_user_list(filter_q)
+ users = self.list(session, filter_q) # To allow default filtering (Bug 853)
if len(users) == 1:
return users[0]
elif len(users) > 1:
:param filter_q: filter of data to be applied
:return: The list, it can be empty if no one match the filter.
"""
- users = self.auth.get_user_list(filter_q)
-
- return users
+ user_list = self.auth.get_user_list(filter_q)
+ if not session["allow_show_user_project_role"]:
+ # Bug 853 - Default filtering
+ user_list = [usr for usr in user_list if usr["username"] == session["username"]]
+ return user_list
def delete(self, session, _id, dry_run=False):
"""
project_name = edit_content.get("name")
if project_name != final_content["name"]: # It is a true renaming
if is_valid_uuid(project_name):
- raise EngineException("project name '{}' cannot have an uuid format".format(project_name),
+ raise EngineException("project name '{}' cannot have an uuid format".format(project_name),
HTTPStatus.UNPROCESSABLE_ENTITY)
if final_content["name"] == "admin":
self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
_id = self.auth.create_project(content)
rollback.append({"topic": self.topic, "_id": _id})
- # self._send_msg("create", content)
+ # self._send_msg("created", content)
return _id, None
except ValidationError as e:
raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
"""
# Allow _id to be a name or uuid
filter_q = {self.id_field(self.topic, _id): _id}
- projects = self.auth.get_project_list(filter_q=filter_q)
-
+ # projects = self.auth.get_project_list(filter_q=filter_q)
+ projects = self.list(session, filter_q) # To allow default filtering (Bug 853)
if len(projects) == 1:
return projects[0]
elif len(projects) > 1:
:param filter_q: filter of data to be applied
:return: The list, it can be empty if no one match the filter.
"""
- return self.auth.get_project_list(filter_q)
+ project_list = self.auth.get_project_list(filter_q)
+ if not session["allow_show_user_project_role"]:
+ # Bug 853 - Default filtering
+ user = self.auth.get_user(session["username"])
+ projects = [prm["project"] for prm in user["project_role_mappings"]]
+ project_list = [proj for proj in project_list if proj["_id"] in projects]
+ return project_list
def delete(self, session, _id, dry_run=False):
"""
:param indata: data to be inserted
:return: None or raises EngineException
"""
+ # check name is not uuid
+ role_name = indata.get("name")
+ if is_valid_uuid(role_name):
+ raise EngineException("role name '{}' cannot have an uuid format".format(role_name),
+ HTTPStatus.UNPROCESSABLE_ENTITY)
# check name not exists
name = indata["name"]
# if self.db.get_one(self.topic, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
if "admin" not in final_content["permissions"]:
final_content["permissions"]["admin"] = False
+ # check name is not uuid
+ role_name = edit_content.get("name")
+ if is_valid_uuid(role_name):
+ raise EngineException("role name '{}' cannot have an uuid format".format(role_name),
+ HTTPStatus.UNPROCESSABLE_ENTITY)
+
+ # Check renaming of admin roles
+ role = self.auth.get_role(_id)
+ if role["name"] in ["system_admin", "project_admin"]:
+ raise EngineException("You cannot rename role '{}'".format(role["name"]), http_code=HTTPStatus.FORBIDDEN)
+
# check name not exists
if "name" in edit_content:
role_name = edit_content["name"]
:return: dictionary, raise exception if not found.
"""
filter_q = {BaseTopic.id_field(self.topic, _id): _id}
- roles = self.auth.get_role_list(filter_q)
+ # roles = self.auth.get_role_list(filter_q)
+ roles = self.list(session, filter_q) # To allow default filtering (Bug 853)
if not roles:
raise AuthconnNotFoundException("Not found any role with filter {}".format(filter_q))
elif len(roles) > 1:
:param filter_q: filter of data to be applied
:return: The list, it can be empty if no one match the filter.
"""
- return self.auth.get_role_list(filter_q)
+ role_list = self.auth.get_role_list(filter_q)
+ if not session["allow_show_user_project_role"]:
+ # Bug 853 - Default filtering
+ user = self.auth.get_user(session["username"])
+ roles = [prm["role"] for prm in user["project_role_mappings"]]
+ role_list = [role for role in role_list if role["_id"] in roles]
+ return role_list
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
"""
content["_id"] = rid
# _id = self.db.create(self.topic, content)
rollback.append({"topic": self.topic, "_id": rid})
- # self._send_msg("create", content)
+ # self._send_msg("created", content)
return rid, None
except ValidationError as e:
raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)