raise EngineException("You cannot delete role '{}'".format(role["name"]), http_code=HTTPStatus.FORBIDDEN)
# If any user is using this role, raise CONFLICT exception
- for user in self.auth.get_user_list():
- for prm in user.get("project_role_mappings"):
- if prm["role"] == _id:
- raise EngineException("Role '{}' ({}) is being used by user '{}'"
- .format(role["name"], _id, user["username"]), HTTPStatus.CONFLICT)
+ if not session["force"]:
+ for user in self.auth.get_user_list():
+ for prm in user.get("project_role_mappings"):
+ if prm["role"] == _id:
+ raise EngineException("Role '{}' ({}) is being used by user '{}'"
+ .format(role["name"], _id, user["username"]), HTTPStatus.CONFLICT)
@staticmethod
def format_on_new(content, project_id=None, make_public=False): # TO BE REMOVED ?
from osm_nbi.authconn import AuthException, AuthExceptionUnauthorized
from osm_nbi.authconn_keystone import AuthconnKeystone
-from osm_nbi.authconn_internal import AuthconnInternal # Comment out for testing&debugging, uncomment when ready
-from osm_common import dbmongo
-from osm_common import dbmemory
+from osm_nbi.authconn_internal import AuthconnInternal
+from osm_common import dbmemory, dbmongo, msglocal, msgkafka
from osm_common.dbbase import DbException
from osm_nbi.validation import is_valid_uuid
from itertools import chain
"""
periodin_db_pruning = 60 * 30 # for the internal backend only. every 30 minutes expired tokens will be pruned
+ token_limit = 500 # when reached, the token cache will be cleared
def __init__(self, valid_methods, valid_query_string):
"""
self.backend = None
self.config = None
self.db = None
+ self.msg = None
self.tokens_cache = dict()
self.next_db_prune_time = 0 # time when next cleaning of expired tokens must be done
self.roles_to_operations_file = None
else:
raise AuthException("Invalid configuration param '{}' at '[database]':'driver'"
.format(config["database"]["driver"]))
+ if not self.msg:
+ if config["message"]["driver"] == "local":
+ self.msg = msglocal.MsgLocal()
+ self.msg.connect(config["message"])
+ elif config["message"]["driver"] == "kafka":
+ self.msg = msgkafka.MsgKafka()
+ self.msg.connect(config["message"])
+ else:
+ raise AuthException("Invalid configuration param '{}' at '[message]':'driver'"
+ .format(config["message"]["driver"]))
if not self.backend:
if config["authentication"]["backend"] == "keystone":
- self.backend = AuthconnKeystone(self.config["authentication"], self.db, self.tokens_cache)
+ self.backend = AuthconnKeystone(self.config["authentication"], self.db)
elif config["authentication"]["backend"] == "internal":
- self.backend = AuthconnInternal(self.config["authentication"], self.db, self.tokens_cache)
+ self.backend = AuthconnInternal(self.config["authentication"], self.db)
self._internal_tokens_prune()
else:
raise AuthException("Unknown authentication backend: {}"
if not token:
raise AuthException("Needed a token or Authorization http header",
http_code=HTTPStatus.UNAUTHORIZED)
- token_info = self.backend.validate_token(token)
+
+ # try to get from cache first
+ now = time()
+ token_info = self.tokens_cache.get(token)
+ if token_info and token_info["expires"] < now:
+ # delete token. MUST be done with care, as another thread maybe already delete it. Do not use del
+ self.tokens_cache.pop(token, None)
+ token_info = None
+
+ # get from database if not in cache
+ if not token_info:
+ token_info = self.backend.validate_token(token)
+ # Clear cache if token limit reached
+ if len(self.tokens_cache) > self.token_limit:
+ self.tokens_cache.clear()
+ self.tokens_cache[token] = token_info
# TODO add to token info remote host, port
if role_permission:
elif remote.ip:
new_token_info["remote_host"] = remote.ip
- self.tokens_cache[new_token_info["_id"]] = new_token_info
-
# TODO call self._internal_tokens_prune(now) ?
return deepcopy(new_token_info)
def del_token(self, token):
try:
self.backend.revoke_token(token)
- self.tokens_cache.pop(token, None)
+ # self.tokens_cache.pop(token, None)
+ self.remove_token_from_cache(token)
return "token '{}' deleted".format(token)
except KeyError:
raise AuthException("Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND)
if not self.next_db_prune_time or self.next_db_prune_time >= now:
self.db.del_list("tokens", {"expires.lt": now})
self.next_db_prune_time = self.periodin_db_pruning + now
- self.tokens_cache.clear() # force to reload tokens from database
+ # self.tokens_cache.clear() # not required any more
+
+ def remove_token_from_cache(self, token=None):
+ if token:
+ self.tokens_cache.pop(token, None)
+ else:
+ self.tokens_cache.clear()
+ self.msg.write("admin", "revoke_token", {"_id": token} if token else None)
Each Auth backend connector plugin must be a subclass of
Authconn class.
"""
- def __init__(self, config, db, token_cache):
+ def __init__(self, config, db):
"""
Constructor of the Authconn class.
import logging
import re
-from time import time
+from time import time, sleep
from http import HTTPStatus
from uuid import uuid4
from hashlib import sha256
class AuthconnInternal(Authconn):
- def __init__(self, config, db, token_cache):
- Authconn.__init__(self, config, db, token_cache)
+ token_time_window = 2 # seconds
+ token_delay = 1 # seconds to wait upon second request within time window
+ def __init__(self, config, db):
+ Authconn.__init__(self, config, db)
self.logger = logging.getLogger("nbi.authenticator.internal")
self.db = db
- self.token_cache = token_cache
+ # self.msg = msg
+ # self.token_cache = token_cache
# To be Confirmed
- self.auth = None
self.sess = None
def validate_token(self, token):
if not token:
raise AuthException("Needed a token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED)
- # try to get from cache first
now = time()
- token_info = self.token_cache.get(token)
- if token_info and token_info["expires"] < now:
- # delete token. MUST be done with care, as another thread maybe already delete it. Do not use del
- self.token_cache.pop(token, None)
- token_info = None
# get from database if not in cache
- if not token_info:
- token_info = self.db.get_one("tokens", {"_id": token})
- if token_info["expires"] < now:
- raise AuthException("Expired Token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED)
+ # if not token_info:
+ token_info = self.db.get_one("tokens", {"_id": token})
+ if token_info["expires"] < now:
+ raise AuthException("Expired Token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED)
return token_info
:param token: token to be revoked
"""
try:
- self.token_cache.pop(token, None)
+ # self.token_cache.pop(token, None)
self.db.del_one("tokens", {"_id": token})
return True
except DbException as e:
raise AuthException("Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND)
else:
# raise
- msg = "Error during token revocation using internal backend"
- self.logger.exception(msg)
- raise AuthException(msg, http_code=HTTPStatus.UNAUTHORIZED)
+ exmsg = "Error during token revocation using internal backend"
+ self.logger.exception(exmsg)
+ raise AuthException(exmsg, http_code=HTTPStatus.UNAUTHORIZED)
def authenticate(self, user, password, project=None, token_info=None):
"""
raise AuthException("Provide credentials: username/password or Authorization Bearer token",
http_code=HTTPStatus.UNAUTHORIZED)
+ # Delay upon second request within time window
+ if now - user_content["_admin"].get("last_token_time", 0) < self.token_time_window:
+ sleep(self.token_delay)
+ # user_content["_admin"]["last_token_time"] = now
+ # self.db.replace("users", user_content["_id"], user_content) # might cause race conditions
+ self.db.set_one("users", {"_id": user_content["_id"]}, {"_admin.last_token_time": now})
+
token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
for _ in range(0, 32))
"roles": roles_list,
}
- self.token_cache[token_id] = new_token
self.db.create("tokens", new_token)
return deepcopy(new_token)
:param role_id: role identifier.
:raises AuthconnOperationException: if role deletion failed.
"""
- return self.db.del_one("roles", {"_id": role_id})
+ rc = self.db.del_one("roles", {"_id": role_id})
+ self.db.del_list("tokens", {"roles.id": role_id})
+ return rc
def update_role(self, role_info):
"""
:raises AuthconnOperationException: if user creation failed.
"""
rid = role_info["_id"]
- self.db.set_one("roles", {"_id": rid}, role_info) # CONFIRM
+ self.db.set_one("roles", {"_id": rid}, role_info)
return {"_id": rid, "name": role_info["name"]}
def create_user(self, user_info):
idf = BaseTopic.id_field("users", uid)
self.db.set_one("users", {idf: uid}, user_data)
if user_info.get("remove_project_role_mappings"):
- self.db.del_list("tokens", {"user_id" if idf == "_id" else idf: uid})
- self.token_cache.clear()
+ idf = "user_id" if idf == "_id" else idf
+ self.db.del_list("tokens", {idf: uid})
def delete_user(self, user_id):
"""
"""
self.db.del_one("users", {"_id": user_id})
self.db.del_list("tokens", {"user_id": user_id})
- self.token_cache.clear()
return True
def get_user_list(self, filter_q=None):
:param project_id: project identifier.
:raises AuthconnOperationException: if project deletion failed.
"""
- filter_q = {BaseTopic.id_field("projects", project_id): project_id}
- r = self.db.del_one("projects", filter_q)
+ idf = BaseTopic.id_field("projects", project_id)
+ r = self.db.del_one("projects", {idf: project_id})
+ idf = "project_id" if idf == "_id" else "project_name"
+ self.db.del_list("tokens", {idf: project_id})
return r
def update_project(self, project_id, project_info):
class AuthconnKeystone(Authconn):
- def __init__(self, config, db, token_cache):
- Authconn.__init__(self, config, db, token_cache)
+ def __init__(self, config, db):
+ Authconn.__init__(self, config, db)
self.logger = logging.getLogger("nbi.authenticator.keystone")
# Add new versions here
}
- def __init__(self, token_cache):
+ def __init__(self, authenticator):
self.db = None
self.fs = None
self.msg = None
- self.auth = None
+ self.authconn = None
self.config = None
self.operations = None
self.logger = logging.getLogger("nbi.engine")
self.map_topic = {}
self.write_lock = None
- self.token_cache = token_cache
+ # self.token_cache = token_cache
+ self.authenticator = authenticator
def start(self, config):
"""
else:
raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format(
config["message"]["driver"]))
- if not self.auth:
+ if not self.authconn:
if config["authentication"]["backend"] == "keystone":
- self.auth = AuthconnKeystone(config["authentication"], self.db, None)
+ self.authconn = AuthconnKeystone(config["authentication"], self.db)
else:
- self.auth = AuthconnInternal(config["authentication"], self.db, self.token_cache)
+ self.authconn = AuthconnInternal(config["authentication"], self.db)
if not self.operations:
if "resources_to_operations" in config["rbac"]:
resources_to_operations_file = config["rbac"]["resources_to_operations"]
for topic, topic_class in self.map_from_topic_to_class.items():
# if self.auth and topic_class in (UserTopicAuth, ProjectTopicAuth):
# self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth)
- if self.auth and topic_class == RoleTopicAuth:
- self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth,
+ if self.authconn and topic_class == RoleTopicAuth:
+ self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.authconn,
self.operations)
else:
- self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth)
+ self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.authconn)
self.map_topic["pm_jobs"] = PmJobsTopic(self.db, config["prometheus"].get("host"),
config["prometheus"].get("port"))
def __init__(self):
self.instance += 1
self.authenticator = Authenticator(valid_url_methods, valid_query_string)
- self.engine = Engine(self.authenticator.tokens_cache)
+ self.engine = Engine(self.authenticator)
def _format_in(self, kwargs):
try:
# if Role information changes, it is needed to reload the information of roles
if topic == "roles" and method != "GET":
self.authenticator.load_operation_to_allowed_roles()
+
+ if topic == "projects" and method == "DELETE" \
+ or topic in ["users", "roles"] and method in ["PUT", "PATCH", "DELETE"]:
+ self.authenticator.remove_token_from_cache()
+
return self._format_out(outdata, token_info, _format)
except Exception as e:
if isinstance(e, (NbiException, EngineException, DbException, FsException, MsgException, AuthException,
self.engine = engine
self.loop = None
self.logger = logging.getLogger("nbi.subscriptions")
- self.aiomain_task = None # asyncio task for receiving kafka bus
+ self.aiomain_task_admin = None # asyncio task for receiving admin actions from kafka bus
+ self.aiomain_task = None # asyncio task for receiving normal actions from kafka bus
self.internal_session = { # used for a session to the engine methods
"project_id": (),
"set_project": (),
if not kafka_working:
self.logger.critical("kafka is working again")
kafka_working = True
- await asyncio.sleep(10, loop=self.loop)
- self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
- aiocallback=self._msg_callback),
- loop=self.loop)
- await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop)
+ if not self.aiomain_task_admin or self.aiomain_task_admin._state == "FINISHED":
+ await asyncio.sleep(10, loop=self.loop)
+ self.logger.debug("Starting admin subscription task")
+ self.aiomain_task_admin = asyncio.ensure_future(self.msg.aioread(("admin",), loop=self.loop,
+ group_id=False,
+ aiocallback=self._msg_callback),
+ loop=self.loop)
+ if not self.aiomain_task or self.aiomain_task._state == "FINISHED":
+ await asyncio.sleep(10, loop=self.loop)
+ self.logger.debug("Starting non-admin subscription task")
+ self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
+ aiocallback=self._msg_callback),
+ loop=self.loop)
+ await asyncio.wait([self.aiomain_task, self.aiomain_task_admin],
+ timeout=None, loop=self.loop, return_when=asyncio.FIRST_COMPLETED)
except Exception as e:
if self.to_terminate:
return
self.engine.del_item(self.internal_session, "nsis", _id=params["nsir_id"],
not_send_msg=msg_to_send)
self.logger.debug("nsis={} deleted from database".format(params["nsir_id"]))
-
- # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
+ elif topic == "admin":
+ self.logger.debug("received {} {} {}".format(topic, command, params))
+ if command in ["echo", "ping"]: # ignored commands
+ pass
+ elif command == "revoke_token":
+ if params:
+ if isinstance(params, dict) and "_id" in params:
+ tid = params.get("_id")
+ self.engine.authenticator.tokens_cache.pop(tid, None)
+ self.logger.debug("token '{}' removed from token_cache".format(tid))
+ else:
+ self.logger.debug("unrecognized params in command '{} {}': {}"
+ .format(topic, command, params))
+ else:
+ self.engine.authenticator.tokens_cache.clear()
+ self.logger.debug("token_cache cleared")
+ else:
+ self.logger.debug("unrecognized command '{} {}'".format(topic, command))
+ # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
# but content to be written is stored at msg_to_send
for msg in msg_to_send:
await self.msg.aiowrite(*msg, loop=self.loop)
self.db = Mock(dbbase.DbBase())
self.fs = Mock(fsbase.FsBase())
self.msg = Mock(msgbase.MsgBase())
- self.auth = Mock(authconn.Authconn(None, None, None))
+ self.auth = Mock(authconn.Authconn(None, None))
self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None,
"admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
self.db = Mock(dbbase.DbBase())
self.fs = Mock(fsbase.FsBase())
self.msg = Mock(msgbase.MsgBase())
- self.auth = Mock(authconn.Authconn(None, None, None))
+ self.auth = Mock(authconn.Authconn(None, None))
self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth, self.test_operations)
self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
"admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
self.db = Mock(dbbase.DbBase())
self.fs = Mock(fsbase.FsBase())
self.msg = Mock(msgbase.MsgBase())
- self.auth = Mock(authconn.Authconn(None, None, None))
+ self.auth = Mock(authconn.Authconn(None, None))
self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
"admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
self.db = Mock(dbbase.DbBase())
self.fs = Mock(fsbase.FsBase())
self.msg = Mock(msgbase.MsgBase())
- self.auth = Mock(authconn.Authconn(None, None, None))
+ self.auth = Mock(authconn.Authconn(None, None))
self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
# Use WIM schemas for testing because they are the simplest
self.topic.topic = "wims"
self.db = Mock(dbbase.DbBase())
self.fs = Mock(fsbase.FsBase())
self.msg = Mock(msgbase.MsgBase())
- self.auth = Mock(authconn.Authconn(None, None, None))
+ self.auth = Mock(authconn.Authconn(None, None))
self.topic = VnfdTopic(self.db, self.fs, self.msg, self.auth)
def test_new_vnfd(self):
self.db = Mock(dbbase.DbBase())
self.fs = Mock(fsbase.FsBase())
self.msg = Mock(msgbase.MsgBase())
- self.auth = Mock(authconn.Authconn(None, None, None))
+ self.auth = Mock(authconn.Authconn(None, None))
self.topic = NsdTopic(self.db, self.fs, self.msg, self.auth)
def test_new_nsd(self):