from osm_nbi.authconn import AuthException, AuthconnException, AuthExceptionUnauthorized
from osm_nbi.authconn_keystone import AuthconnKeystone
from osm_nbi.authconn_internal import AuthconnInternal
+from osm_nbi.authconn_tacacs import AuthconnTacacs
from osm_common import dbmemory, dbmongo, msglocal, msgkafka
from osm_common.dbbase import DbException
from osm_nbi.validation import is_valid_uuid
self.backend = AuthconnKeystone(self.config["authentication"], self.db, self.role_permissions)
elif config["authentication"]["backend"] == "internal":
self.backend = AuthconnInternal(self.config["authentication"], self.db, self.role_permissions)
- self._internal_tokens_prune()
+ self._internal_tokens_prune("tokens")
+ elif config["authentication"]["backend"] == "tacacs":
+ self.backend = AuthconnTacacs(self.config["authentication"], self.db, self.role_permissions)
+ self._internal_tokens_prune("tokens_tacacs")
else:
raise AuthException("Unknown authentication backend: {}"
.format(config["authentication"]["backend"]))
raise AuthException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
return token_value
- def _internal_tokens_prune(self, now=None):
+ def _internal_tokens_prune(self, token_collection, now=None):
now = now or time()
if not self.next_db_prune_time or self.next_db_prune_time >= now:
- self.db.del_list("tokens", {"expires.lt": now})
+ self.db.del_list(token_collection, {"expires.lt": now})
self.next_db_prune_time = self.periodin_db_pruning + now
# self.tokens_cache.clear() # not required any more