def load_operation_to_allowed_roles(self):
"""
Fills the internal self.operation_to_allowed_roles based on database role content and self.operations
+ It works in a shadow copy and replace at the end to allow other threads working with the old copy
:return: None
"""
permissions = {oper: [] for oper in self.operations}
records = self.db.get_list("roles_operations")
- ignore_fields = ["_id", "_admin", "name", "default", "admin"]
+ ignore_fields = ["_id", "_admin", "name", "default"]
for record in records:
record_permissions = {oper: record["permissions"].get("default", False) for oper in self.operations}
operations_joined = [(oper, value) for oper, value in record["permissions"].items()
for allowed_op in allowed_operations:
permissions[allowed_op].append(record["name"])
- for oper, role_list in permissions.items():
- self.operation_to_allowed_roles[oper] = role_list
+ self.operation_to_allowed_roles = permissions
def authorize(self):
token = None
raise AuthException("Needed a token or Authorization http header",
http_code=HTTPStatus.UNAUTHORIZED)
try:
- self.backend.validate_token(token)
- self.check_permissions(self.tokens_cache[token], cherrypy.request.path_info,
+ token_info = self.backend.validate_token(token)
+ # TODO add to token info remote host, port
+
+ self.check_permissions(token_info, cherrypy.request.path_info,
cherrypy.request.method)
- # TODO: check if this can be avoided. Backend may provide enough information
- return deepcopy(self.tokens_cache[token])
+ return token_info
except AuthException:
self.del_token(token)
raise
operation = self.resources_to_operations_mapping[key]
roles_required = self.operation_to_allowed_roles[operation]
- roles_allowed = self.backend.get_user_role_list(session["_id"])
+ roles_allowed = [role["name"] for role in session["roles"]]
+
+ # fills session["admin"] if some roles allows it
+ session["admin"] = False
+ for role in roles_allowed:
+ if role in self.operation_to_allowed_roles["admin"]:
+ session["admin"] = True
+ break
if "anonymous" in roles_required:
return