class RoleTopicAuth(BaseTopic):
topic = "roles_operations"
- topic_msg = "roles"
+ topic_msg = None # "roles"
schema_new = roles_new_schema
schema_edit = roles_edit_schema
multiproject = False
:param role_definitions: role definition to test
:return: None if ok, raises ValidationError exception on error
"""
- ignore_fields = ["_id", "_admin", "name"]
- for role_def in role_definitions.keys():
+ if not role_definitions.get("permissions"):
+ return
+ ignore_fields = ["admin", "default"]
+ for role_def in role_definitions["permissions"].keys():
if role_def in ignore_fields:
continue
- if role_def == "root":
- if isinstance(role_definitions[role_def], bool):
- continue
- else:
- raise ValidationError("Operation authorization \".\" should be True/False.")
if role_def[-1] == ":":
- raise ValidationError("Operation cannot end with \".\"")
+ raise ValidationError("Operation cannot end with ':'")
role_def_matches = [op for op in operations if op.startswith(role_def)]
if len(role_def_matches) == 0:
- raise ValidationError("No matching operation found.")
-
- if not isinstance(role_definitions[role_def], bool):
- raise ValidationError("Operation authorization {} should be True/False.".format(role_def))
+ raise ValidationError("Invalid permission '{}'".format(role_def))
def _validate_input_new(self, input, force=False):
"""
:param indata: data to be inserted
:return: None or raises EngineException
"""
- role = indata.get("name")
- role_list = list(map(lambda x: x["name"], self.auth.get_role_list()))
-
- if role in role_list:
- raise EngineException("role '{}' exists".format(role), HTTPStatus.CONFLICT)
+ # check name not exists
+ if self.db.get_one(self.topic, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
+ raise EngineException("role name '{}' exists".format(indata["name"]), HTTPStatus.CONFLICT)
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
"""
:param _id: internal _id
:return: None or raises EngineException
"""
- roles = self.auth.get_role_list()
- system_admin_role = [role for role in roles
- if role["name"] == "system_admin"][0]
+ if "default" not in final_content["permissions"]:
+ final_content["permissions"]["default"] = False
+ if "admin" not in final_content["permissions"]:
+ final_content["permissions"]["admin"] = False
- if _id == system_admin_role["_id"]:
- raise EngineException("You cannot edit system_admin role", http_code=HTTPStatus.FORBIDDEN)
+ # check name not exists
+ if "name" in edit_content:
+ role_name = edit_content["name"]
+ if self.db.get_one(self.topic, {"name": role_name, "_id.ne": _id}, fail_on_empty=False, fail_on_more=False):
+ raise EngineException("role name '{}' exists".format(role_name), HTTPStatus.CONFLICT)
def check_conflict_on_del(self, session, _id, db_content):
"""
content["_admin"]["created"] = now
content["_admin"]["modified"] = now
- if ":" in content.keys():
- content["root"] = content[":"]
- del content[":"]
+ if "permissions" not in content:
+ content["permissions"] = {}
- if "root" not in content.keys():
- content["root"] = False
+ if "default" not in content["permissions"]:
+ content["permissions"]["default"] = False
+ if "admin" not in content["permissions"]:
+ content["permissions"]["admin"] = False
@staticmethod
def format_on_edit(final_content, edit_content):
"""
final_content["_admin"]["modified"] = time()
- ignore_fields = ["_id", "name", "_admin"]
- delete_keys = [key for key in final_content.keys() if key not in ignore_fields]
-
- for key in delete_keys:
- del final_content[key]
-
- # Saving the role definition
- for role_def, value in edit_content.items():
- final_content[role_def] = value
-
- if ":" in final_content.keys():
- final_content["root"] = final_content[":"]
- del final_content[":"]
-
- if "root" not in final_content.keys():
- final_content["root"] = False
-
- @staticmethod
- def format_on_show(content):
- """
- Modifies the content of the role information to separate the role
- metadata from the role definition. Eases the reading process of the
- role definition.
-
- :param definition: role definition to be processed
- """
- content["_id"] = str(content["_id"])
-
- def show(self, session, _id):
- """
- Get complete information on an topic
-
- :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
- :param _id: server internal id
- :return: dictionary, raise exception if not found.
- """
- filter_db = {"_id": _id}
-
- role = self.db.get_one(self.topic, filter_db)
- new_role = dict(role)
- self.format_on_show(new_role)
-
- return new_role
-
- def list(self, session, filter_q=None):
- """
- Get a list of the topic that matches a filter
-
- :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
- :param filter_q: filter of data to be applied
- :return: The list, it can be empty if no one match the filter.
- """
- if not filter_q:
- filter_q = {}
-
- if ":" in filter_q:
- filter_q["root"] = filter_q[":"]
-
- for key in filter_q.keys():
- if key == "name":
- continue
- filter_q[key] = filter_q[key] in ["True", "true"]
+ if "permissions" not in final_content:
+ final_content["permissions"] = {}
- roles = self.db.get_list(self.topic, filter_q)
- new_roles = []
+ if "default" not in final_content["permissions"]:
+ final_content["permissions"]["default"] = False
+ if "admin" not in final_content["permissions"]:
+ final_content["permissions"]["admin"] = False
- for role in roles:
- new_role = dict(role)
- self.format_on_show(new_role)
- new_roles.append(new_role)
+ # @staticmethod
+ # def format_on_show(content):
+ # """
+ # Modifies the content of the role information to separate the role
+ # metadata from the role definition. Eases the reading process of the
+ # role definition.
+ #
+ # :param definition: role definition to be processed
+ # """
+ # content["_id"] = str(content["_id"])
+ #
+ # def show(self, session, _id):
+ # """
+ # Get complete information on an topic
+ #
+ # :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ # :param _id: server internal id
+ # :return: dictionary, raise exception if not found.
+ # """
+ # filter_db = {"_id": _id}
+ #
+ # role = self.db.get_one(self.topic, filter_db)
+ # new_role = dict(role)
+ # self.format_on_show(new_role)
+ #
+ # return new_role
- return new_roles
+ # def list(self, session, filter_q=None):
+ # """
+ # Get a list of the topic that matches a filter
+ #
+ # :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ # :param filter_q: filter of data to be applied
+ # :return: The list, it can be empty if no one match the filter.
+ # """
+ # if not filter_q:
+ # filter_q = {}
+ #
+ # if ":" in filter_q:
+ # filter_q["root"] = filter_q[":"]
+ #
+ # for key in filter_q.keys():
+ # if key == "name":
+ # continue
+ # filter_q[key] = filter_q[key] in ["True", "true"]
+ #
+ # roles = self.db.get_list(self.topic, filter_q)
+ # new_roles = []
+ #
+ # for role in roles:
+ # new_role = dict(role)
+ # self.format_on_show(new_role)
+ # new_roles.append(new_role)
+ #
+ # return new_roles
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
"""
:return: _id: identity of the inserted data.
"""
try:
- content = BaseTopic._remove_envelop(indata)
+ content = self._remove_envelop(indata)
# Override descriptor with query string kwargs
- BaseTopic._update_input_with_kwargs(content, kwargs)
+ self._update_input_with_kwargs(content, kwargs)
content = self._validate_input_new(content, session["force"])
self.check_conflict_on_new(session, content)
self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
role_name = content["name"]
- role = self.auth.create_role(role_name)
- content["_id"] = role["_id"]
+ role_id = self.auth.create_role(role_name)
+ content["_id"] = role_id
_id = self.db.create(self.topic, content)
rollback.append({"topic": self.topic, "_id": _id})
# self._send_msg("create", content)
:param content:
:return: _id: identity of the inserted data.
"""
- indata = self._remove_envelop(indata)
-
- # Override descriptor with query string kwargs
- if kwargs:
- self._update_input_with_kwargs(indata, kwargs)
- try:
- indata = self._validate_input_edit(indata, force=session["force"])
-
- if not content:
- content = self.show(session, _id)
- self.check_conflict_on_edit(session, content, indata, _id=_id)
- self.format_on_edit(content, indata)
- self.db.replace(self.topic, _id, content)
- return id
- except ValidationError as e:
- raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
+ _id = super().edit(session, _id, indata, kwargs, content)
+ if indata.get("name"):
+ self.auth.update_role(_id, name=indata.get("name"))