from time import time
from os import path
-from osm_nbi.authconn import AuthException, AuthExceptionUnauthorized
+from osm_nbi.authconn import AuthException, AuthconnException, AuthExceptionUnauthorized
from osm_nbi.authconn_keystone import AuthconnKeystone
from osm_nbi.authconn_internal import AuthconnInternal
+from osm_nbi.authconn_tacacs import AuthconnTacacs
from osm_common import dbmemory, dbmongo, msglocal, msgkafka
from osm_common.dbbase import DbException
from osm_nbi.validation import is_valid_uuid
self.backend = AuthconnKeystone(self.config["authentication"], self.db, self.role_permissions)
elif config["authentication"]["backend"] == "internal":
self.backend = AuthconnInternal(self.config["authentication"], self.db, self.role_permissions)
- self._internal_tokens_prune()
+ self._internal_tokens_prune("tokens")
+ elif config["authentication"]["backend"] == "tacacs":
+ self.backend = AuthconnTacacs(self.config["authentication"], self.db, self.role_permissions)
+ self._internal_tokens_prune("tokens_tacacs")
else:
raise AuthException("Unknown authentication backend: {}"
.format(config["authentication"]["backend"]))
records = self.backend.get_role_list()
- # Loading permissions to MongoDB if there is not any permission.
- if not records or (len(records) == 1 and records[0]["name"] == "admin"):
+ # Loading permissions to AUTH. At lease system_admin must be present.
+ if not records or not next((r for r in records if r["name"] == "system_admin"), None):
with open(self.roles_to_operations_file, "r") as stream:
roles_to_operations_yaml = yaml.load(stream, Loader=yaml.Loader)
.format(permission, role_with_operations["name"],
self.roles_to_operations_file))
- # TODO chek permission is ok
+ # TODO check permission is ok
if permission[-1] == ":":
raise AuthException("Invalid permission '{}' terminated in ':' for role '{}'; at file {}"
.format(permission, role_with_operations["name"],
}
# self.db.create(self.roles_to_operations_table, role_with_operations)
- self.backend.create_role(role_with_operations)
- self.logger.info("Role '{}' created at database".format(role_with_operations["name"]))
+ try:
+ self.backend.create_role(role_with_operations)
+ self.logger.info("Role '{}' created".format(role_with_operations["name"]))
+ except (AuthException, AuthconnException) as e:
+ if role_with_operations["name"] == "system_admin":
+ raise
+ self.logger.error("Role '{}' cannot be created: {}".format(role_with_operations["name"], e))
# Create admin project&user if required
pid = self.create_admin_project()
:return: True if access granted by permission rules, False if access granted by default rules (Bug 853)
:raises: AuthExceptionUnauthorized if access denied
"""
+ self.load_operation_to_allowed_roles()
roles_required = self.operation_to_allowed_roles[role_permission]
roles_allowed = [role["name"] for role in token_info["roles"]]
raise AuthException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
return token_value
- def _internal_tokens_prune(self, now=None):
+ def _internal_tokens_prune(self, token_collection, now=None):
now = now or time()
if not self.next_db_prune_time or self.next_db_prune_time >= now:
- self.db.del_list("tokens", {"expires.lt": now})
+ self.db.del_list(token_collection, {"expires.lt": now})
self.next_db_prune_time = self.periodin_db_pruning + now
# self.tokens_cache.clear() # not required any more