From ad682a52ef94fa2662e2a0f6e3f81fb7c8f5e0fe Mon Sep 17 00:00:00 2001 From: delacruzramo Date: Tue, 10 Dec 2019 16:26:34 +0100 Subject: [PATCH] Token Cache Management Change-Id: I09ea3a8c0c537a5eeba0ac5bc1426167133db998 Signed-off-by: delacruzramo --- osm_nbi/admin_topics.py | 11 ++--- osm_nbi/auth.py | 52 +++++++++++++++++----- osm_nbi/authconn.py | 2 +- osm_nbi/authconn_internal.py | 59 ++++++++++++++----------- osm_nbi/authconn_keystone.py | 4 +- osm_nbi/engine.py | 19 ++++---- osm_nbi/nbi.py | 7 ++- osm_nbi/subscriptions.py | 44 ++++++++++++++---- osm_nbi/tests/test_admin_topics.py | 8 ++-- osm_nbi/tests/test_descriptor_topics.py | 4 +- 10 files changed, 141 insertions(+), 69 deletions(-) diff --git a/osm_nbi/admin_topics.py b/osm_nbi/admin_topics.py index 7a7ace6..90c5d08 100644 --- a/osm_nbi/admin_topics.py +++ b/osm_nbi/admin_topics.py @@ -1107,11 +1107,12 @@ class RoleTopicAuth(BaseTopic): raise EngineException("You cannot delete role '{}'".format(role["name"]), http_code=HTTPStatus.FORBIDDEN) # If any user is using this role, raise CONFLICT exception - for user in self.auth.get_user_list(): - for prm in user.get("project_role_mappings"): - if prm["role"] == _id: - raise EngineException("Role '{}' ({}) is being used by user '{}'" - .format(role["name"], _id, user["username"]), HTTPStatus.CONFLICT) + if not session["force"]: + for user in self.auth.get_user_list(): + for prm in user.get("project_role_mappings"): + if prm["role"] == _id: + raise EngineException("Role '{}' ({}) is being used by user '{}'" + .format(role["name"], _id, user["username"]), HTTPStatus.CONFLICT) @staticmethod def format_on_new(content, project_id=None, make_public=False): # TO BE REMOVED ? diff --git a/osm_nbi/auth.py b/osm_nbi/auth.py index 023d286..1b8fa2b 100644 --- a/osm_nbi/auth.py +++ b/osm_nbi/auth.py @@ -41,9 +41,8 @@ from os import path from osm_nbi.authconn import AuthException, AuthExceptionUnauthorized from osm_nbi.authconn_keystone import AuthconnKeystone -from osm_nbi.authconn_internal import AuthconnInternal # Comment out for testing&debugging, uncomment when ready -from osm_common import dbmongo -from osm_common import dbmemory +from osm_nbi.authconn_internal import AuthconnInternal +from osm_common import dbmemory, dbmongo, msglocal, msgkafka from osm_common.dbbase import DbException from osm_nbi.validation import is_valid_uuid from itertools import chain @@ -60,6 +59,7 @@ class Authenticator: """ periodin_db_pruning = 60 * 30 # for the internal backend only. every 30 minutes expired tokens will be pruned + token_limit = 500 # when reached, the token cache will be cleared def __init__(self, valid_methods, valid_query_string): """ @@ -69,6 +69,7 @@ class Authenticator: self.backend = None self.config = None self.db = None + self.msg = None self.tokens_cache = dict() self.next_db_prune_time = 0 # time when next cleaning of expired tokens must be done self.roles_to_operations_file = None @@ -101,11 +102,21 @@ class Authenticator: else: raise AuthException("Invalid configuration param '{}' at '[database]':'driver'" .format(config["database"]["driver"])) + if not self.msg: + if config["message"]["driver"] == "local": + self.msg = msglocal.MsgLocal() + self.msg.connect(config["message"]) + elif config["message"]["driver"] == "kafka": + self.msg = msgkafka.MsgKafka() + self.msg.connect(config["message"]) + else: + raise AuthException("Invalid configuration param '{}' at '[message]':'driver'" + .format(config["message"]["driver"])) if not self.backend: if config["authentication"]["backend"] == "keystone": - self.backend = AuthconnKeystone(self.config["authentication"], self.db, self.tokens_cache) + self.backend = AuthconnKeystone(self.config["authentication"], self.db) elif config["authentication"]["backend"] == "internal": - self.backend = AuthconnInternal(self.config["authentication"], self.db, self.tokens_cache) + self.backend = AuthconnInternal(self.config["authentication"], self.db) self._internal_tokens_prune() else: raise AuthException("Unknown authentication backend: {}" @@ -354,7 +365,22 @@ class Authenticator: if not token: raise AuthException("Needed a token or Authorization http header", http_code=HTTPStatus.UNAUTHORIZED) - token_info = self.backend.validate_token(token) + + # try to get from cache first + now = time() + token_info = self.tokens_cache.get(token) + if token_info and token_info["expires"] < now: + # delete token. MUST be done with care, as another thread maybe already delete it. Do not use del + self.tokens_cache.pop(token, None) + token_info = None + + # get from database if not in cache + if not token_info: + token_info = self.backend.validate_token(token) + # Clear cache if token limit reached + if len(self.tokens_cache) > self.token_limit: + self.tokens_cache.clear() + self.tokens_cache[token] = token_info # TODO add to token info remote host, port if role_permission: @@ -396,8 +422,6 @@ class Authenticator: elif remote.ip: new_token_info["remote_host"] = remote.ip - self.tokens_cache[new_token_info["_id"]] = new_token_info - # TODO call self._internal_tokens_prune(now) ? return deepcopy(new_token_info) @@ -424,7 +448,8 @@ class Authenticator: def del_token(self, token): try: self.backend.revoke_token(token) - self.tokens_cache.pop(token, None) + # self.tokens_cache.pop(token, None) + self.remove_token_from_cache(token) return "token '{}' deleted".format(token) except KeyError: raise AuthException("Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND) @@ -557,4 +582,11 @@ class Authenticator: if not self.next_db_prune_time or self.next_db_prune_time >= now: self.db.del_list("tokens", {"expires.lt": now}) self.next_db_prune_time = self.periodin_db_pruning + now - self.tokens_cache.clear() # force to reload tokens from database + # self.tokens_cache.clear() # not required any more + + def remove_token_from_cache(self, token=None): + if token: + self.tokens_cache.pop(token, None) + else: + self.tokens_cache.clear() + self.msg.write("admin", "revoke_token", {"_id": token} if token else None) diff --git a/osm_nbi/authconn.py b/osm_nbi/authconn.py index ef4d75c..2ebbb47 100644 --- a/osm_nbi/authconn.py +++ b/osm_nbi/authconn.py @@ -110,7 +110,7 @@ class Authconn: Each Auth backend connector plugin must be a subclass of Authconn class. """ - def __init__(self, config, db, token_cache): + def __init__(self, config, db): """ Constructor of the Authconn class. diff --git a/osm_nbi/authconn_internal.py b/osm_nbi/authconn_internal.py index 50a2123..672892d 100644 --- a/osm_nbi/authconn_internal.py +++ b/osm_nbi/authconn_internal.py @@ -34,7 +34,7 @@ from osm_nbi.base_topic import BaseTopic import logging import re -from time import time +from time import time, sleep from http import HTTPStatus from uuid import uuid4 from hashlib import sha256 @@ -43,16 +43,18 @@ from random import choice as random_choice class AuthconnInternal(Authconn): - def __init__(self, config, db, token_cache): - Authconn.__init__(self, config, db, token_cache) + token_time_window = 2 # seconds + token_delay = 1 # seconds to wait upon second request within time window + def __init__(self, config, db): + Authconn.__init__(self, config, db) self.logger = logging.getLogger("nbi.authenticator.internal") self.db = db - self.token_cache = token_cache + # self.msg = msg + # self.token_cache = token_cache # To be Confirmed - self.auth = None self.sess = None def validate_token(self, token): @@ -75,19 +77,13 @@ class AuthconnInternal(Authconn): if not token: raise AuthException("Needed a token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED) - # try to get from cache first now = time() - token_info = self.token_cache.get(token) - if token_info and token_info["expires"] < now: - # delete token. MUST be done with care, as another thread maybe already delete it. Do not use del - self.token_cache.pop(token, None) - token_info = None # get from database if not in cache - if not token_info: - token_info = self.db.get_one("tokens", {"_id": token}) - if token_info["expires"] < now: - raise AuthException("Expired Token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED) + # if not token_info: + token_info = self.db.get_one("tokens", {"_id": token}) + if token_info["expires"] < now: + raise AuthException("Expired Token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED) return token_info @@ -110,7 +106,7 @@ class AuthconnInternal(Authconn): :param token: token to be revoked """ try: - self.token_cache.pop(token, None) + # self.token_cache.pop(token, None) self.db.del_one("tokens", {"_id": token}) return True except DbException as e: @@ -118,9 +114,9 @@ class AuthconnInternal(Authconn): raise AuthException("Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND) else: # raise - msg = "Error during token revocation using internal backend" - self.logger.exception(msg) - raise AuthException(msg, http_code=HTTPStatus.UNAUTHORIZED) + exmsg = "Error during token revocation using internal backend" + self.logger.exception(exmsg) + raise AuthException(exmsg, http_code=HTTPStatus.UNAUTHORIZED) def authenticate(self, user, password, project=None, token_info=None): """ @@ -163,6 +159,13 @@ class AuthconnInternal(Authconn): raise AuthException("Provide credentials: username/password or Authorization Bearer token", http_code=HTTPStatus.UNAUTHORIZED) + # Delay upon second request within time window + if now - user_content["_admin"].get("last_token_time", 0) < self.token_time_window: + sleep(self.token_delay) + # user_content["_admin"]["last_token_time"] = now + # self.db.replace("users", user_content["_id"], user_content) # might cause race conditions + self.db.set_one("users", {"_id": user_content["_id"]}, {"_admin.last_token_time": now}) + token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(0, 32)) @@ -216,7 +219,6 @@ class AuthconnInternal(Authconn): "roles": roles_list, } - self.token_cache[token_id] = new_token self.db.create("tokens", new_token) return deepcopy(new_token) @@ -249,7 +251,9 @@ class AuthconnInternal(Authconn): :param role_id: role identifier. :raises AuthconnOperationException: if role deletion failed. """ - return self.db.del_one("roles", {"_id": role_id}) + rc = self.db.del_one("roles", {"_id": role_id}) + self.db.del_list("tokens", {"roles.id": role_id}) + return rc def update_role(self, role_info): """ @@ -260,7 +264,7 @@ class AuthconnInternal(Authconn): :raises AuthconnOperationException: if user creation failed. """ rid = role_info["_id"] - self.db.set_one("roles", {"_id": rid}, role_info) # CONFIRM + self.db.set_one("roles", {"_id": rid}, role_info) return {"_id": rid, "name": role_info["name"]} def create_user(self, user_info): @@ -320,8 +324,8 @@ class AuthconnInternal(Authconn): idf = BaseTopic.id_field("users", uid) self.db.set_one("users", {idf: uid}, user_data) if user_info.get("remove_project_role_mappings"): - self.db.del_list("tokens", {"user_id" if idf == "_id" else idf: uid}) - self.token_cache.clear() + idf = "user_id" if idf == "_id" else idf + self.db.del_list("tokens", {idf: uid}) def delete_user(self, user_id): """ @@ -332,7 +336,6 @@ class AuthconnInternal(Authconn): """ self.db.del_one("users", {"_id": user_id}) self.db.del_list("tokens", {"user_id": user_id}) - self.token_cache.clear() return True def get_user_list(self, filter_q=None): @@ -416,8 +419,10 @@ class AuthconnInternal(Authconn): :param project_id: project identifier. :raises AuthconnOperationException: if project deletion failed. """ - filter_q = {BaseTopic.id_field("projects", project_id): project_id} - r = self.db.del_one("projects", filter_q) + idf = BaseTopic.id_field("projects", project_id) + r = self.db.del_one("projects", {idf: project_id}) + idf = "project_id" if idf == "_id" else "project_name" + self.db.del_list("tokens", {idf: project_id}) return r def update_project(self, project_id, project_info): diff --git a/osm_nbi/authconn_keystone.py b/osm_nbi/authconn_keystone.py index ebd7654..685773b 100644 --- a/osm_nbi/authconn_keystone.py +++ b/osm_nbi/authconn_keystone.py @@ -45,8 +45,8 @@ from osm_nbi.validation import is_valid_uuid class AuthconnKeystone(Authconn): - def __init__(self, config, db, token_cache): - Authconn.__init__(self, config, db, token_cache) + def __init__(self, config, db): + Authconn.__init__(self, config, db) self.logger = logging.getLogger("nbi.authenticator.keystone") diff --git a/osm_nbi/engine.py b/osm_nbi/engine.py index 3f83557..ca7a837 100644 --- a/osm_nbi/engine.py +++ b/osm_nbi/engine.py @@ -68,17 +68,18 @@ class Engine(object): # Add new versions here } - def __init__(self, token_cache): + def __init__(self, authenticator): self.db = None self.fs = None self.msg = None - self.auth = None + self.authconn = None self.config = None self.operations = None self.logger = logging.getLogger("nbi.engine") self.map_topic = {} self.write_lock = None - self.token_cache = token_cache + # self.token_cache = token_cache + self.authenticator = authenticator def start(self, config): """ @@ -123,11 +124,11 @@ class Engine(object): else: raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format( config["message"]["driver"])) - if not self.auth: + if not self.authconn: if config["authentication"]["backend"] == "keystone": - self.auth = AuthconnKeystone(config["authentication"], self.db, None) + self.authconn = AuthconnKeystone(config["authentication"], self.db) else: - self.auth = AuthconnInternal(config["authentication"], self.db, self.token_cache) + self.authconn = AuthconnInternal(config["authentication"], self.db) if not self.operations: if "resources_to_operations" in config["rbac"]: resources_to_operations_file = config["rbac"]["resources_to_operations"] @@ -157,11 +158,11 @@ class Engine(object): for topic, topic_class in self.map_from_topic_to_class.items(): # if self.auth and topic_class in (UserTopicAuth, ProjectTopicAuth): # self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth) - if self.auth and topic_class == RoleTopicAuth: - self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth, + if self.authconn and topic_class == RoleTopicAuth: + self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.authconn, self.operations) else: - self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth) + self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.authconn) self.map_topic["pm_jobs"] = PmJobsTopic(self.db, config["prometheus"].get("host"), config["prometheus"].get("port")) diff --git a/osm_nbi/nbi.py b/osm_nbi/nbi.py index 66105d6..f5d2293 100644 --- a/osm_nbi/nbi.py +++ b/osm_nbi/nbi.py @@ -484,7 +484,7 @@ class Server(object): def __init__(self): self.instance += 1 self.authenticator = Authenticator(valid_url_methods, valid_query_string) - self.engine = Engine(self.authenticator.tokens_cache) + self.engine = Engine(self.authenticator) def _format_in(self, kwargs): try: @@ -1132,6 +1132,11 @@ class Server(object): # if Role information changes, it is needed to reload the information of roles if topic == "roles" and method != "GET": self.authenticator.load_operation_to_allowed_roles() + + if topic == "projects" and method == "DELETE" \ + or topic in ["users", "roles"] and method in ["PUT", "PATCH", "DELETE"]: + self.authenticator.remove_token_from_cache() + return self._format_out(outdata, token_info, _format) except Exception as e: if isinstance(e, (NbiException, EngineException, DbException, FsException, MsgException, AuthException, diff --git a/osm_nbi/subscriptions.py b/osm_nbi/subscriptions.py index 03bb92b..393918c 100644 --- a/osm_nbi/subscriptions.py +++ b/osm_nbi/subscriptions.py @@ -55,7 +55,8 @@ class SubscriptionThread(threading.Thread): self.engine = engine self.loop = None self.logger = logging.getLogger("nbi.subscriptions") - self.aiomain_task = None # asyncio task for receiving kafka bus + self.aiomain_task_admin = None # asyncio task for receiving admin actions from kafka bus + self.aiomain_task = None # asyncio task for receiving normal actions from kafka bus self.internal_session = { # used for a session to the engine methods "project_id": (), "set_project": (), @@ -79,11 +80,21 @@ class SubscriptionThread(threading.Thread): if not kafka_working: self.logger.critical("kafka is working again") kafka_working = True - await asyncio.sleep(10, loop=self.loop) - self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop, - aiocallback=self._msg_callback), - loop=self.loop) - await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop) + if not self.aiomain_task_admin or self.aiomain_task_admin._state == "FINISHED": + await asyncio.sleep(10, loop=self.loop) + self.logger.debug("Starting admin subscription task") + self.aiomain_task_admin = asyncio.ensure_future(self.msg.aioread(("admin",), loop=self.loop, + group_id=False, + aiocallback=self._msg_callback), + loop=self.loop) + if not self.aiomain_task or self.aiomain_task._state == "FINISHED": + await asyncio.sleep(10, loop=self.loop) + self.logger.debug("Starting non-admin subscription task") + self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop, + aiocallback=self._msg_callback), + loop=self.loop) + await asyncio.wait([self.aiomain_task, self.aiomain_task_admin], + timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED) except Exception as e: if self.to_terminate: return @@ -165,8 +176,25 @@ class SubscriptionThread(threading.Thread): self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"], not_send_msg=msg_to_send) self.logger.debug("nsis={} deleted from database".format(params["nsir_id"])) - - # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that, + elif topic == "admin": + self.logger.debug("received {} {} {}".format(topic, command, params)) + if command in ["echo", "ping"]: # ignored commands + pass + elif command == "revoke_token": + if params: + if isinstance(params, dict) and "_id" in params: + tid = params.get("_id") + self.engine.authenticator.tokens_cache.pop(tid, None) + self.logger.debug("token '{}' removed from token_cache".format(tid)) + else: + self.logger.debug("unrecognized params in command '{} {}': {}" + .format(topic, command, params)) + else: + self.engine.authenticator.tokens_cache.clear() + self.logger.debug("token_cache cleared") + else: + self.logger.debug("unrecognized command '{} {}'".format(topic, command)) + # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that, # but content to be written is stored at msg_to_send for msg in msg_to_send: await self.msg.aiowrite(*msg, loop=self.loop) diff --git a/osm_nbi/tests/test_admin_topics.py b/osm_nbi/tests/test_admin_topics.py index 04a64f1..a28953b 100755 --- a/osm_nbi/tests/test_admin_topics.py +++ b/osm_nbi/tests/test_admin_topics.py @@ -50,7 +50,7 @@ class Test_ProjectTopicAuth(TestCase): self.db = Mock(dbbase.DbBase()) self.fs = Mock(fsbase.FsBase()) self.msg = Mock(msgbase.MsgBase()) - self.auth = Mock(authconn.Authconn(None, None, None)) + self.auth = Mock(authconn.Authconn(None, None)) self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth) self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None, "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} @@ -210,7 +210,7 @@ class Test_RoleTopicAuth(TestCase): self.db = Mock(dbbase.DbBase()) self.fs = Mock(fsbase.FsBase()) self.msg = Mock(msgbase.MsgBase()) - self.auth = Mock(authconn.Authconn(None, None, None)) + self.auth = Mock(authconn.Authconn(None, None)) self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth, self.test_operations) self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None, "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} @@ -370,7 +370,7 @@ class Test_UserTopicAuth(TestCase): self.db = Mock(dbbase.DbBase()) self.fs = Mock(fsbase.FsBase()) self.msg = Mock(msgbase.MsgBase()) - self.auth = Mock(authconn.Authconn(None, None, None)) + self.auth = Mock(authconn.Authconn(None, None)) self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth) self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None, "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} @@ -588,7 +588,7 @@ class Test_CommonVimWimSdn(TestCase): self.db = Mock(dbbase.DbBase()) self.fs = Mock(fsbase.FsBase()) self.msg = Mock(msgbase.MsgBase()) - self.auth = Mock(authconn.Authconn(None, None, None)) + self.auth = Mock(authconn.Authconn(None, None)) self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth) # Use WIM schemas for testing because they are the simplest self.topic.topic = "wims" diff --git a/osm_nbi/tests/test_descriptor_topics.py b/osm_nbi/tests/test_descriptor_topics.py index 773131c..6e39030 100755 --- a/osm_nbi/tests/test_descriptor_topics.py +++ b/osm_nbi/tests/test_descriptor_topics.py @@ -84,7 +84,7 @@ class Test_VnfdTopic(TestCase): self.db = Mock(dbbase.DbBase()) self.fs = Mock(fsbase.FsBase()) self.msg = Mock(msgbase.MsgBase()) - self.auth = Mock(authconn.Authconn(None, None, None)) + self.auth = Mock(authconn.Authconn(None, None)) self.topic = VnfdTopic(self.db, self.fs, self.msg, self.auth) def test_new_vnfd(self): @@ -512,7 +512,7 @@ class Test_NsdTopic(TestCase): self.db = Mock(dbbase.DbBase()) self.fs = Mock(fsbase.FsBase()) self.msg = Mock(msgbase.MsgBase()) - self.auth = Mock(authconn.Authconn(None, None, None)) + self.auth = Mock(authconn.Authconn(None, None)) self.topic = NsdTopic(self.db, self.fs, self.msg, self.auth) def test_new_nsd(self): -- 2.25.1