Token Cache Management 71/8371/13
authordelacruzramo <pedro.delacruzramos@altran.com>
Tue, 10 Dec 2019 15:26:34 +0000 (16:26 +0100)
committerdelacruzramo <pedro.delacruzramos@altran.com>
Mon, 20 Jan 2020 13:42:23 +0000 (13:42 +0000)
Change-Id: I09ea3a8c0c537a5eeba0ac5bc1426167133db998
Signed-off-by: delacruzramo <pedro.delacruzramos@altran.com>
osm_nbi/admin_topics.py
osm_nbi/auth.py
osm_nbi/authconn.py
osm_nbi/authconn_internal.py
osm_nbi/authconn_keystone.py
osm_nbi/engine.py
osm_nbi/nbi.py
osm_nbi/subscriptions.py
osm_nbi/tests/test_admin_topics.py
osm_nbi/tests/test_descriptor_topics.py

index 7a7ace6..90c5d08 100644 (file)
@@ -1107,11 +1107,12 @@ class RoleTopicAuth(BaseTopic):
             raise EngineException("You cannot delete role '{}'".format(role["name"]), http_code=HTTPStatus.FORBIDDEN)
 
         # If any user is using this role, raise CONFLICT exception
-        for user in self.auth.get_user_list():
-            for prm in user.get("project_role_mappings"):
-                if prm["role"] == _id:
-                    raise EngineException("Role '{}' ({}) is being used by user '{}'"
-                                          .format(role["name"], _id, user["username"]), HTTPStatus.CONFLICT)
+        if not session["force"]:
+            for user in self.auth.get_user_list():
+                for prm in user.get("project_role_mappings"):
+                    if prm["role"] == _id:
+                        raise EngineException("Role '{}' ({}) is being used by user '{}'"
+                                              .format(role["name"], _id, user["username"]), HTTPStatus.CONFLICT)
 
     @staticmethod
     def format_on_new(content, project_id=None, make_public=False):   # TO BE REMOVED ?
index 023d286..1b8fa2b 100644 (file)
@@ -41,9 +41,8 @@ from os import path
 
 from osm_nbi.authconn import AuthException, AuthExceptionUnauthorized
 from osm_nbi.authconn_keystone import AuthconnKeystone
-from osm_nbi.authconn_internal import AuthconnInternal   # Comment out for testing&debugging, uncomment when ready
-from osm_common import dbmongo
-from osm_common import dbmemory
+from osm_nbi.authconn_internal import AuthconnInternal
+from osm_common import dbmemory, dbmongo, msglocal, msgkafka
 from osm_common.dbbase import DbException
 from osm_nbi.validation import is_valid_uuid
 from itertools import chain
@@ -60,6 +59,7 @@ class Authenticator:
     """
 
     periodin_db_pruning = 60 * 30  # for the internal backend only. every 30 minutes expired tokens will be pruned
+    token_limit = 500   # when reached, the token cache will be cleared
 
     def __init__(self, valid_methods, valid_query_string):
         """
@@ -69,6 +69,7 @@ class Authenticator:
         self.backend = None
         self.config = None
         self.db = None
+        self.msg = None
         self.tokens_cache = dict()
         self.next_db_prune_time = 0  # time when next cleaning of expired tokens must be done
         self.roles_to_operations_file = None
@@ -101,11 +102,21 @@ class Authenticator:
                 else:
                     raise AuthException("Invalid configuration param '{}' at '[database]':'driver'"
                                         .format(config["database"]["driver"]))
+            if not self.msg:
+                if config["message"]["driver"] == "local":
+                    self.msg = msglocal.MsgLocal()
+                    self.msg.connect(config["message"])
+                elif config["message"]["driver"] == "kafka":
+                    self.msg = msgkafka.MsgKafka()
+                    self.msg.connect(config["message"])
+                else:
+                    raise AuthException("Invalid configuration param '{}' at '[message]':'driver'"
+                                        .format(config["message"]["driver"]))
             if not self.backend:
                 if config["authentication"]["backend"] == "keystone":
-                    self.backend = AuthconnKeystone(self.config["authentication"], self.db, self.tokens_cache)
+                    self.backend = AuthconnKeystone(self.config["authentication"], self.db)
                 elif config["authentication"]["backend"] == "internal":
-                    self.backend = AuthconnInternal(self.config["authentication"], self.db, self.tokens_cache)
+                    self.backend = AuthconnInternal(self.config["authentication"], self.db)
                     self._internal_tokens_prune()
                 else:
                     raise AuthException("Unknown authentication backend: {}"
@@ -354,7 +365,22 @@ class Authenticator:
             if not token:
                 raise AuthException("Needed a token or Authorization http header",
                                     http_code=HTTPStatus.UNAUTHORIZED)
-            token_info = self.backend.validate_token(token)
+
+            # try to get from cache first
+            now = time()
+            token_info = self.tokens_cache.get(token)
+            if token_info and token_info["expires"] < now:
+                # delete token. MUST be done with care, as another thread maybe already delete it. Do not use del
+                self.tokens_cache.pop(token, None)
+                token_info = None
+
+            # get from database if not in cache
+            if not token_info:
+                token_info = self.backend.validate_token(token)
+                # Clear cache if token limit reached
+                if len(self.tokens_cache) > self.token_limit:
+                    self.tokens_cache.clear()
+                self.tokens_cache[token] = token_info
             # TODO add to token info remote host, port
 
             if role_permission:
@@ -396,8 +422,6 @@ class Authenticator:
         elif remote.ip:
             new_token_info["remote_host"] = remote.ip
 
-        self.tokens_cache[new_token_info["_id"]] = new_token_info
-
         # TODO call self._internal_tokens_prune(now) ?
         return deepcopy(new_token_info)
 
@@ -424,7 +448,8 @@ class Authenticator:
     def del_token(self, token):
         try:
             self.backend.revoke_token(token)
-            self.tokens_cache.pop(token, None)
+            # self.tokens_cache.pop(token, None)
+            self.remove_token_from_cache(token)
             return "token '{}' deleted".format(token)
         except KeyError:
             raise AuthException("Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND)
@@ -557,4 +582,11 @@ class Authenticator:
         if not self.next_db_prune_time or self.next_db_prune_time >= now:
             self.db.del_list("tokens", {"expires.lt": now})
             self.next_db_prune_time = self.periodin_db_pruning + now
-            self.tokens_cache.clear()  # force to reload tokens from database
+            # self.tokens_cache.clear()  # not required any more
+
+    def remove_token_from_cache(self, token=None):
+        if token:
+            self.tokens_cache.pop(token, None)
+        else:
+            self.tokens_cache.clear()
+        self.msg.write("admin", "revoke_token", {"_id": token} if token else None)
index ef4d75c..2ebbb47 100644 (file)
@@ -110,7 +110,7 @@ class Authconn:
     Each Auth backend connector plugin must be a subclass of
     Authconn class.
     """
-    def __init__(self, config, db, token_cache):
+    def __init__(self, config, db):
         """
         Constructor of the Authconn class.
 
index 50a2123..672892d 100644 (file)
@@ -34,7 +34,7 @@ from osm_nbi.base_topic import BaseTopic
 
 import logging
 import re
-from time import time
+from time import time, sleep
 from http import HTTPStatus
 from uuid import uuid4
 from hashlib import sha256
@@ -43,16 +43,18 @@ from random import choice as random_choice
 
 
 class AuthconnInternal(Authconn):
-    def __init__(self, config, db, token_cache):
-        Authconn.__init__(self, config, db, token_cache)
+    token_time_window = 2   # seconds
+    token_delay = 1   # seconds to wait upon second request within time window
 
+    def __init__(self, config, db):
+        Authconn.__init__(self, config, db)
         self.logger = logging.getLogger("nbi.authenticator.internal")
 
         self.db = db
-        self.token_cache = token_cache
+        # self.msg = msg
+        # self.token_cache = token_cache
 
         # To be Confirmed
-        self.auth = None
         self.sess = None
 
     def validate_token(self, token):
@@ -75,19 +77,13 @@ class AuthconnInternal(Authconn):
             if not token:
                 raise AuthException("Needed a token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED)
 
-            # try to get from cache first
             now = time()
-            token_info = self.token_cache.get(token)
-            if token_info and token_info["expires"] < now:
-                # delete token. MUST be done with care, as another thread maybe already delete it. Do not use del
-                self.token_cache.pop(token, None)
-                token_info = None
 
             # get from database if not in cache
-            if not token_info:
-                token_info = self.db.get_one("tokens", {"_id": token})
-                if token_info["expires"] < now:
-                    raise AuthException("Expired Token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED)
+            if not token_info:
+            token_info = self.db.get_one("tokens", {"_id": token})
+            if token_info["expires"] < now:
+                raise AuthException("Expired Token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED)
 
             return token_info
 
@@ -110,7 +106,7 @@ class AuthconnInternal(Authconn):
         :param token: token to be revoked
         """
         try:
-            self.token_cache.pop(token, None)
+            self.token_cache.pop(token, None)
             self.db.del_one("tokens", {"_id": token})
             return True
         except DbException as e:
@@ -118,9 +114,9 @@ class AuthconnInternal(Authconn):
                 raise AuthException("Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND)
             else:
                 # raise
-                msg = "Error during token revocation using internal backend"
-                self.logger.exception(msg)
-                raise AuthException(msg, http_code=HTTPStatus.UNAUTHORIZED)
+                exmsg = "Error during token revocation using internal backend"
+                self.logger.exception(exmsg)
+                raise AuthException(exmsg, http_code=HTTPStatus.UNAUTHORIZED)
 
     def authenticate(self, user, password, project=None, token_info=None):
         """
@@ -163,6 +159,13 @@ class AuthconnInternal(Authconn):
             raise AuthException("Provide credentials: username/password or Authorization Bearer token",
                                 http_code=HTTPStatus.UNAUTHORIZED)
 
+        # Delay upon second request within time window
+        if now - user_content["_admin"].get("last_token_time", 0) < self.token_time_window:
+            sleep(self.token_delay)
+        # user_content["_admin"]["last_token_time"] = now
+        # self.db.replace("users", user_content["_id"], user_content)   # might cause race conditions
+        self.db.set_one("users", {"_id": user_content["_id"]}, {"_admin.last_token_time": now})
+
         token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
                            for _ in range(0, 32))
 
@@ -216,7 +219,6 @@ class AuthconnInternal(Authconn):
                      "roles": roles_list,
                      }
 
-        self.token_cache[token_id] = new_token
         self.db.create("tokens", new_token)
         return deepcopy(new_token)
 
@@ -249,7 +251,9 @@ class AuthconnInternal(Authconn):
         :param role_id: role identifier.
         :raises AuthconnOperationException: if role deletion failed.
         """
-        return self.db.del_one("roles", {"_id": role_id})
+        rc = self.db.del_one("roles", {"_id": role_id})
+        self.db.del_list("tokens", {"roles.id": role_id})
+        return rc
 
     def update_role(self, role_info):
         """
@@ -260,7 +264,7 @@ class AuthconnInternal(Authconn):
         :raises AuthconnOperationException: if user creation failed.
         """
         rid = role_info["_id"]
-        self.db.set_one("roles", {"_id": rid}, role_info)   # CONFIRM
+        self.db.set_one("roles", {"_id": rid}, role_info)
         return {"_id": rid, "name": role_info["name"]}
 
     def create_user(self, user_info):
@@ -320,8 +324,8 @@ class AuthconnInternal(Authconn):
         idf = BaseTopic.id_field("users", uid)
         self.db.set_one("users", {idf: uid}, user_data)
         if user_info.get("remove_project_role_mappings"):
-            self.db.del_list("tokens", {"user_id" if idf == "_id" else idf: uid})
-            self.token_cache.clear()
+            idf = "user_id" if idf == "_id" else idf
+            self.db.del_list("tokens", {idf: uid})
 
     def delete_user(self, user_id):
         """
@@ -332,7 +336,6 @@ class AuthconnInternal(Authconn):
         """
         self.db.del_one("users", {"_id": user_id})
         self.db.del_list("tokens", {"user_id": user_id})
-        self.token_cache.clear()
         return True
 
     def get_user_list(self, filter_q=None):
@@ -416,8 +419,10 @@ class AuthconnInternal(Authconn):
         :param project_id: project identifier.
         :raises AuthconnOperationException: if project deletion failed.
         """
-        filter_q = {BaseTopic.id_field("projects", project_id): project_id}
-        r = self.db.del_one("projects", filter_q)
+        idf = BaseTopic.id_field("projects", project_id)
+        r = self.db.del_one("projects", {idf: project_id})
+        idf = "project_id" if idf == "_id" else "project_name"
+        self.db.del_list("tokens", {idf: project_id})
         return r
 
     def update_project(self, project_id, project_info):
index ebd7654..685773b 100644 (file)
@@ -45,8 +45,8 @@ from osm_nbi.validation import is_valid_uuid
 
 
 class AuthconnKeystone(Authconn):
-    def __init__(self, config, db, token_cache):
-        Authconn.__init__(self, config, db, token_cache)
+    def __init__(self, config, db):
+        Authconn.__init__(self, config, db)
 
         self.logger = logging.getLogger("nbi.authenticator.keystone")
 
index 3f83557..ca7a837 100644 (file)
@@ -68,17 +68,18 @@ class Engine(object):
         # Add new versions here
     }
 
-    def __init__(self, token_cache):
+    def __init__(self, authenticator):
         self.db = None
         self.fs = None
         self.msg = None
-        self.auth = None
+        self.authconn = None
         self.config = None
         self.operations = None
         self.logger = logging.getLogger("nbi.engine")
         self.map_topic = {}
         self.write_lock = None
-        self.token_cache = token_cache
+        # self.token_cache = token_cache
+        self.authenticator = authenticator
 
     def start(self, config):
         """
@@ -123,11 +124,11 @@ class Engine(object):
                 else:
                     raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format(
                         config["message"]["driver"]))
-            if not self.auth:
+            if not self.authconn:
                 if config["authentication"]["backend"] == "keystone":
-                    self.auth = AuthconnKeystone(config["authentication"], self.db, None)
+                    self.authconn = AuthconnKeystone(config["authentication"], self.db)
                 else:
-                    self.auth = AuthconnInternal(config["authentication"], self.db, self.token_cache)
+                    self.authconn = AuthconnInternal(config["authentication"], self.db)
             if not self.operations:
                 if "resources_to_operations" in config["rbac"]:
                     resources_to_operations_file = config["rbac"]["resources_to_operations"]
@@ -157,11 +158,11 @@ class Engine(object):
             for topic, topic_class in self.map_from_topic_to_class.items():
                 # if self.auth and topic_class in (UserTopicAuth, ProjectTopicAuth):
                 #     self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth)
-                if self.auth and topic_class == RoleTopicAuth:
-                    self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth,
+                if self.authconn and topic_class == RoleTopicAuth:
+                    self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.authconn,
                                                         self.operations)
                 else:
-                    self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth)
+                    self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.authconn)
             
             self.map_topic["pm_jobs"] = PmJobsTopic(self.db, config["prometheus"].get("host"),
                                                     config["prometheus"].get("port"))
index 66105d6..f5d2293 100644 (file)
@@ -484,7 +484,7 @@ class Server(object):
     def __init__(self):
         self.instance += 1
         self.authenticator = Authenticator(valid_url_methods, valid_query_string)
-        self.engine = Engine(self.authenticator.tokens_cache)
+        self.engine = Engine(self.authenticator)
 
     def _format_in(self, kwargs):
         try:
@@ -1132,6 +1132,11 @@ class Server(object):
             # if Role information changes, it is needed to reload the information of roles
             if topic == "roles" and method != "GET":
                 self.authenticator.load_operation_to_allowed_roles()
+
+            if topic == "projects" and method == "DELETE" \
+                    or topic in ["users", "roles"] and method in ["PUT", "PATCH", "DELETE"]:
+                self.authenticator.remove_token_from_cache()
+
             return self._format_out(outdata, token_info, _format)
         except Exception as e:
             if isinstance(e, (NbiException, EngineException, DbException, FsException, MsgException, AuthException,
index 03bb92b..393918c 100644 (file)
@@ -55,7 +55,8 @@ class SubscriptionThread(threading.Thread):
         self.engine = engine
         self.loop = None
         self.logger = logging.getLogger("nbi.subscriptions")
-        self.aiomain_task = None  # asyncio task for receiving kafka bus
+        self.aiomain_task_admin = None  # asyncio task for receiving admin actions from kafka bus
+        self.aiomain_task = None  # asyncio task for receiving normal actions from kafka bus
         self.internal_session = {  # used for a session to the engine methods
             "project_id": (),
             "set_project": (),
@@ -79,11 +80,21 @@ class SubscriptionThread(threading.Thread):
                 if not kafka_working:
                     self.logger.critical("kafka is working again")
                     kafka_working = True
-                await asyncio.sleep(10, loop=self.loop)
-                self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
-                                                                           aiocallback=self._msg_callback),
-                                                          loop=self.loop)
-                await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop)
+                if not self.aiomain_task_admin or self.aiomain_task_admin._state == "FINISHED":
+                    await asyncio.sleep(10, loop=self.loop)
+                    self.logger.debug("Starting admin subscription task")
+                    self.aiomain_task_admin = asyncio.ensure_future(self.msg.aioread(("admin",), loop=self.loop,
+                                                                                     group_id=False,
+                                                                                     aiocallback=self._msg_callback),
+                                                                    loop=self.loop)
+                if not self.aiomain_task or self.aiomain_task._state == "FINISHED":
+                    await asyncio.sleep(10, loop=self.loop)
+                    self.logger.debug("Starting non-admin subscription task")
+                    self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
+                                                                               aiocallback=self._msg_callback),
+                                                              loop=self.loop)
+                await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
+                                   timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
             except Exception as e:
                 if self.to_terminate:
                     return
@@ -165,8 +176,25 @@ class SubscriptionThread(threading.Thread):
                         self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
                                              not_send_msg=msg_to_send)
                         self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
-
-            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that, 
+            elif topic == "admin":
+                self.logger.debug("received {} {} {}".format(topic, command, params))
+                if command in ["echo", "ping"]:   # ignored commands
+                    pass
+                elif command == "revoke_token":
+                    if params:
+                        if isinstance(params, dict) and "_id" in params:
+                            tid = params.get("_id")
+                            self.engine.authenticator.tokens_cache.pop(tid, None)
+                            self.logger.debug("token '{}' removed from token_cache".format(tid))
+                        else:
+                            self.logger.debug("unrecognized params in command '{} {}': {}"
+                                              .format(topic, command, params))
+                    else:
+                        self.engine.authenticator.tokens_cache.clear()
+                        self.logger.debug("token_cache cleared")
+                else:
+                    self.logger.debug("unrecognized command '{} {}'".format(topic, command))
+            # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
             # but content to be written is stored at msg_to_send
             for msg in msg_to_send:
                 await self.msg.aiowrite(*msg, loop=self.loop)
index 04a64f1..a28953b 100755 (executable)
@@ -50,7 +50,7 @@ class Test_ProjectTopicAuth(TestCase):
         self.db = Mock(dbbase.DbBase())
         self.fs = Mock(fsbase.FsBase())
         self.msg = Mock(msgbase.MsgBase())
-        self.auth = Mock(authconn.Authconn(None, None, None))
+        self.auth = Mock(authconn.Authconn(None, None))
         self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
         self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None,
                              "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
@@ -210,7 +210,7 @@ class Test_RoleTopicAuth(TestCase):
         self.db = Mock(dbbase.DbBase())
         self.fs = Mock(fsbase.FsBase())
         self.msg = Mock(msgbase.MsgBase())
-        self.auth = Mock(authconn.Authconn(None, None, None))
+        self.auth = Mock(authconn.Authconn(None, None))
         self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth, self.test_operations)
         self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                              "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
@@ -370,7 +370,7 @@ class Test_UserTopicAuth(TestCase):
         self.db = Mock(dbbase.DbBase())
         self.fs = Mock(fsbase.FsBase())
         self.msg = Mock(msgbase.MsgBase())
-        self.auth = Mock(authconn.Authconn(None, None, None))
+        self.auth = Mock(authconn.Authconn(None, None))
         self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
         self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                              "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
@@ -588,7 +588,7 @@ class Test_CommonVimWimSdn(TestCase):
         self.db = Mock(dbbase.DbBase())
         self.fs = Mock(fsbase.FsBase())
         self.msg = Mock(msgbase.MsgBase())
-        self.auth = Mock(authconn.Authconn(None, None, None))
+        self.auth = Mock(authconn.Authconn(None, None))
         self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
         # Use WIM schemas for testing because they are the simplest
         self.topic.topic = "wims"
index 773131c..6e39030 100755 (executable)
@@ -84,7 +84,7 @@ class Test_VnfdTopic(TestCase):
         self.db = Mock(dbbase.DbBase())
         self.fs = Mock(fsbase.FsBase())
         self.msg = Mock(msgbase.MsgBase())
-        self.auth = Mock(authconn.Authconn(None, None, None))
+        self.auth = Mock(authconn.Authconn(None, None))
         self.topic = VnfdTopic(self.db, self.fs, self.msg, self.auth)
 
     def test_new_vnfd(self):
@@ -512,7 +512,7 @@ class Test_NsdTopic(TestCase):
         self.db = Mock(dbbase.DbBase())
         self.fs = Mock(fsbase.FsBase())
         self.msg = Mock(msgbase.MsgBase())
-        self.auth = Mock(authconn.Authconn(None, None, None))
+        self.auth = Mock(authconn.Authconn(None, None))
         self.topic = NsdTopic(self.db, self.fs, self.msg, self.auth)
 
     def test_new_nsd(self):