from time import time
from os import path
-from osm_nbi.authconn import AuthException, AuthExceptionUnauthorized
+from osm_nbi.authconn import AuthException, AuthconnException, AuthExceptionUnauthorized
from osm_nbi.authconn_keystone import AuthconnKeystone
from osm_nbi.authconn_internal import AuthconnInternal
from osm_common import dbmemory, dbmongo, msglocal, msgkafka
self.role_permissions = []
self.valid_methods = valid_methods
self.valid_query_string = valid_query_string
+ self.system_admin_role_id = None # system_role id
+ self.test_project_id = None # test_project_id
def start(self, config):
"""
self.role_permissions.append(permission)
elif k in ("TODO", "METHODS"):
continue
- else:
+ elif method_dict[k]:
load_role_permissions(method_dict[k])
load_role_permissions(self.valid_methods)
if permission not in self.role_permissions:
self.role_permissions.append(permission)
+ # get ids of role system_admin and test project
+ role_system_admin = self.db.get_one("roles", {"name": "system_admin"}, fail_on_empty=False)
+ if role_system_admin:
+ self.system_admin_role_id = role_system_admin["_id"]
+ test_project_name = self.config["authentication"].get("project_not_authorized", "admin")
+ test_project = self.db.get_one("projects", {"name": test_project_name}, fail_on_empty=False)
+ if test_project:
+ self.test_project_id = test_project["_id"]
+
except Exception as e:
raise AuthException(str(e))
records = self.backend.get_role_list()
- # Loading permissions to MongoDB if there is not any permission.
- if not records or (len(records) == 1 and records[0]["name"] == "admin"):
+ # Loading permissions to AUTH. At lease system_admin must be present.
+ if not records or not next((r for r in records if r["name"] == "system_admin"), None):
with open(self.roles_to_operations_file, "r") as stream:
roles_to_operations_yaml = yaml.load(stream, Loader=yaml.Loader)
.format(permission, role_with_operations["name"],
self.roles_to_operations_file))
- # TODO chek permission is ok
+ # TODO check permission is ok
if permission[-1] == ":":
raise AuthException("Invalid permission '{}' terminated in ':' for role '{}'; at file {}"
.format(permission, role_with_operations["name"],
}
# self.db.create(self.roles_to_operations_table, role_with_operations)
- self.backend.create_role(role_with_operations)
- self.logger.info("Role '{}' created at database".format(role_with_operations["name"]))
+ try:
+ self.backend.create_role(role_with_operations)
+ self.logger.info("Role '{}' created".format(role_with_operations["name"]))
+ except (AuthException, AuthconnException) as e:
+ if role_with_operations["name"] == "system_admin":
+ raise
+ self.logger.error("Role '{}' cannot be created: {}".format(role_with_operations["name"], e))
# Create admin project&user if required
pid = self.create_admin_project()
if cherrypy.session.get('Authorization'):
del cherrypy.session['Authorization']
cherrypy.response.headers["WWW-Authenticate"] = 'Bearer realm="{}"'.format(e)
- elif self.config.get("user_not_authorized"):
- # TODO provide user_id, roles id (not name), project_id
- return {"id": "fake-token-id-for-test",
- "project_id": self.config.get("project_not_authorized", "admin"),
- "username": self.config["user_not_authorized"],
- "roles": ["system_admin"]}
+ if self.config["authentication"].get("user_not_authorized"):
+ return {"id": "testing-token", "_id": "testing-token",
+ "project_id": self.test_project_id,
+ "username": self.config["authentication"]["user_not_authorized"],
+ "roles": [self.system_admin_role_id],
+ "admin": True, "allow_show_user_project_role": True}
raise
def new_token(self, token_info, indata, remote):