plugins with the definition of the methods to be implemented.
"""
-__author__ = "Eduardo Sousa <esousa@whitestack.com>"
+__author__ = "Eduardo Sousa <esousa@whitestack.com>, " \
+ "Pedro de la Cruz Ramos <pdelacruzramos@altran.com>"
__date__ = "$27-jul-2018 23:59:59$"
from http import HTTPStatus
+from osm_nbi.base_topic import BaseTopic
class AuthException(Exception):
"""
- Authentication error.
+ Authentication error, because token, user password not recognized
"""
def __init__(self, message, http_code=HTTPStatus.UNAUTHORIZED):
super(AuthException, self).__init__(message)
self.http_code = http_code
+class AuthExceptionUnauthorized(AuthException):
+ """
+ Authentication error, because not having rights to make this operation
+ """
+ pass
+
+
class AuthconnException(Exception):
"""
Common and base class Exception for all authconn exceptions.
super(AuthconnOperationException, self).__init__(message, http_code)
+class AuthconnNotFoundException(AuthconnException):
+ """
+ The operation executed failed because element not found.
+ """
+ def __init__(self, message, http_code=HTTPStatus.NOT_FOUND):
+ super().__init__(message, http_code)
+
+
+class AuthconnConflictException(AuthconnException):
+ """
+ The operation has conflicts.
+ """
+ def __init__(self, message, http_code=HTTPStatus.CONFLICT):
+ super().__init__(message, http_code)
+
+
class Authconn:
"""
Abstract base class for all the Auth backend connector plugins.
Each Auth backend connector plugin must be a subclass of
Authconn class.
"""
- def __init__(self, config):
+ def __init__(self, config, db, role_permissions):
"""
Constructor of the Authconn class.
-
- Note: each subclass
-
:param config: configuration dictionary containing all the
necessary configuration parameters.
+ :param db: internal database classs
+ :param role_permissions: read only role permission list
"""
self.config = config
+ self.role_permissions = role_permissions
- def authenticate(self, user, password, project=None, token=None):
+ def authenticate(self, credentials, token_info=None):
"""
- Authenticate a user using username/password or token, plus project
- :param user: user: name, id or None
- :param password: password or None
- :param project: name, id, or None. If None first found project will be used to get an scope token
- :param token: previous token to obtain authorization
+ Authenticate a user using username/password or token_info, plus project
+ :param credentials: dictionary that contains:
+ username: name, id or None
+ password: password or None
+ project_id: name, id, or None. If None first found project will be used to get an scope token
+ other items are allowed for specific auth backends
+ :param token_info: previous token_info to obtain authorization
:return: the scoped token info or raises an exception. The token is a dictionary with:
_id: token string id,
username: username,
"""
raise AuthconnNotImplementedException("Should have implemented this")
- # def authenticate_with_token(self, token, project=None):
- # """
- # Authenticate a user using a token. Can be used to revalidate the token
- # or to get a scoped token.
- #
- # :param token: a valid token.
- # :param project: (optional) project for a scoped token.
- # :return: return a revalidated token, scoped if a project was passed or
- # the previous token was already scoped.
- # """
- # raise AuthconnNotImplementedException("Should have implemented this")
-
def validate_token(self, token):
"""
Check if the token is valid.
"""
raise AuthconnNotImplementedException("Should have implemented this")
- def get_user_project_list(self, token):
- """
- Get all the projects associated with a user.
-
- :param token: valid token
- :return: list of projects
- """
- raise AuthconnNotImplementedException("Should have implemented this")
-
- def get_user_role_list(self, token):
- """
- Get role list for a scoped project.
-
- :param token: scoped token.
- :return: returns the list of roles for the user in that project. If
- the token is unscoped it returns None.
- """
- raise AuthconnNotImplementedException("Should have implemented this")
-
- def create_user(self, user, password):
+ def create_user(self, user_info):
"""
Create a user.
- :param user: username.
- :param password: password.
+ :param user_info: full user info.
:raises AuthconnOperationException: if user creation failed.
"""
raise AuthconnNotImplementedException("Should have implemented this")
- def change_password(self, user, new_password):
+ def update_user(self, user_info):
"""
- Change the user password.
+ Change the user name and/or password.
- :param user: username.
- :param new_password: new password.
- :raises AuthconnOperationException: if user password change failed.
+ :param user_info: user info modifications
+ :raises AuthconnNotImplementedException: if function not implemented
"""
raise AuthconnNotImplementedException("Should have implemented this")
"""
raise AuthconnNotImplementedException("Should have implemented this")
- def get_user_list(self, filter_q={}):
+ def get_user_list(self, filter_q=None):
"""
Get user list.
- :param filter_q: dictionary to filter user list.
+ :param filter_q: dictionary to filter user list by name (username is also admited) and/or _id
:return: returns a list of users.
"""
- def create_role(self, role):
+ def get_user(self, _id, fail=True):
+ """
+ Get one user
+ :param _id: id or name
+ :param fail: True to raise exception on not found. False to return None on not found
+ :return: dictionary with the user information
+ """
+ filt = {BaseTopic.id_field("users", _id): _id}
+ users = self.get_user_list(filt)
+ if not users:
+ if fail:
+ raise AuthconnNotFoundException("User with {} not found".format(filt), http_code=HTTPStatus.NOT_FOUND)
+ else:
+ return None
+ return users[0]
+
+ def create_role(self, role_info):
"""
Create a role.
- :param role: role name.
+ :param role_info: full role info.
:raises AuthconnOperationException: if role creation failed.
"""
raise AuthconnNotImplementedException("Should have implemented this")
"""
raise AuthconnNotImplementedException("Should have implemented this")
- def get_role_list(self):
+ def get_role_list(self, filter_q=None):
"""
Get all the roles.
+ :param filter_q: dictionary to filter role list by _id and/or name.
:return: list of roles
"""
raise AuthconnNotImplementedException("Should have implemented this")
- def create_project(self, project):
+ def get_role(self, _id, fail=True):
+ """
+ Get one role
+ :param _id: id or name
+ :param fail: True to raise exception on not found. False to return None on not found
+ :return: dictionary with the role information
+ """
+ filt = {BaseTopic.id_field("roles", _id): _id}
+ roles = self.get_role_list(filt)
+ if not roles:
+ if fail:
+ raise AuthconnNotFoundException("Role with {} not found".format(filt))
+ else:
+ return None
+ return roles[0]
+
+ def update_role(self, role_info):
+ """
+ Change the information of a role
+ :param role_info: full role info
+ :return: None
+ """
+ raise AuthconnNotImplementedException("Should have implemented this")
+
+ def create_project(self, project_info):
"""
Create a project.
- :param project: project name.
+ :param project_info: full project info.
:return: the internal id of the created project
:raises AuthconnOperationException: if project creation failed.
"""
"""
raise AuthconnNotImplementedException("Should have implemented this")
- def update_project(self, project_id, new_name):
- """
- Change the name of a project
- :param project_id: project to be changed
- :param new_name: new name
- :return: None
- """
- raise AuthconnNotImplementedException("Should have implemented this")
-
- def assign_role_to_user(self, user, project, role):
+ def get_project(self, _id, fail=True):
"""
- Assigning a role to a user in a project.
-
- :param user: username.
- :param project: project name.
- :param role: role name.
- :raises AuthconnOperationException: if role assignment failed.
+ Get one project
+ :param _id: id or name
+ :param fail: True to raise exception on not found. False to return None on not found
+ :return: dictionary with the project information
"""
- raise AuthconnNotImplementedException("Should have implemented this")
+ filt = {BaseTopic.id_field("projects", _id): _id}
+ projs = self.get_project_list(filt)
+ if not projs:
+ if fail:
+ raise AuthconnNotFoundException("project with {} not found".format(filt))
+ else:
+ return None
+ return projs[0]
- def remove_role_from_user(self, user, project, role):
+ def update_project(self, project_id, project_info):
"""
- Remove a role from a user in a project.
-
- :param user: username.
- :param project: project name.
- :param role: role name.
- :raises AuthconnOperationException: if role assignment revocation failed.
+ Change the information of a project
+ :param project_id: project to be changed
+ :param project_info: full project info
+ :return: None
"""
raise AuthconnNotImplementedException("Should have implemented this")