from osm_nbi.authconn import AuthException, AuthconnException, AuthExceptionUnauthorized
from osm_nbi.authconn_keystone import AuthconnKeystone
from osm_nbi.authconn_internal import AuthconnInternal
+from osm_nbi.authconn_tacacs import AuthconnTacacs
from osm_common import dbmemory, dbmongo, msglocal, msgkafka
from osm_common.dbbase import DbException
from osm_nbi.validation import is_valid_uuid
self.backend = AuthconnKeystone(self.config["authentication"], self.db, self.role_permissions)
elif config["authentication"]["backend"] == "internal":
self.backend = AuthconnInternal(self.config["authentication"], self.db, self.role_permissions)
- self._internal_tokens_prune()
+ self._internal_tokens_prune("tokens")
+ elif config["authentication"]["backend"] == "tacacs":
+ self.backend = AuthconnTacacs(self.config["authentication"], self.db, self.role_permissions)
+ self._internal_tokens_prune("tokens_tacacs")
else:
raise AuthException("Unknown authentication backend: {}"
.format(config["authentication"]["backend"]))
:return: True if access granted by permission rules, False if access granted by default rules (Bug 853)
:raises: AuthExceptionUnauthorized if access denied
"""
+ self.load_operation_to_allowed_roles()
roles_required = self.operation_to_allowed_roles[role_permission]
roles_allowed = [role["name"] for role in token_info["roles"]]
raise AuthException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
return token_value
- def _internal_tokens_prune(self, now=None):
+ def _internal_tokens_prune(self, token_collection, now=None):
now = now or time()
if not self.next_db_prune_time or self.next_db_prune_time >= now:
- self.db.del_list("tokens", {"expires.lt": now})
+ self.db.del_list(token_collection, {"expires.lt": now})
self.next_db_prune_time = self.periodin_db_pruning + now
# self.tokens_cache.clear() # not required any more