plugins with the definition of the methods to be implemented.
"""
-__author__ = "Eduardo Sousa <esousa@whitestack.com>"
+__author__ = "Eduardo Sousa <esousa@whitestack.com>, " \
+ "Pedro de la Cruz Ramos <pdelacruzramos@altran.com>"
__date__ = "$27-jul-2018 23:59:59$"
from http import HTTPStatus
+from osm_nbi.base_topic import BaseTopic
class AuthException(Exception):
Each Auth backend connector plugin must be a subclass of
Authconn class.
"""
- def __init__(self, config):
+ def __init__(self, config, db, role_permissions):
"""
Constructor of the Authconn class.
-
- Note: each subclass
-
:param config: configuration dictionary containing all the
necessary configuration parameters.
+ :param db: internal database classs
+ :param role_permissions: read only role permission list
"""
self.config = config
+ self.role_permissions = role_permissions
- def authenticate(self, user, password, project=None, token_info=None):
+ def authenticate(self, credentials, token_info=None):
"""
Authenticate a user using username/password or token_info, plus project
- :param user: user: name, id or None
- :param password: password or None
- :param project: name, id, or None. If None first found project will be used to get an scope token
+ :param credentials: dictionary that contains:
+ username: name, id or None
+ password: password or None
+ project_id: name, id, or None. If None first found project will be used to get an scope token
+ other items are allowed for specific auth backends
:param token_info: previous token_info to obtain authorization
:return: the scoped token info or raises an exception. The token is a dictionary with:
_id: token string id,
"""
raise AuthconnNotImplementedException("Should have implemented this")
- # def authenticate_with_token(self, token, project=None):
- # """
- # Authenticate a user using a token. Can be used to revalidate the token
- # or to get a scoped token.
- #
- # :param token: a valid token.
- # :param project: (optional) project for a scoped token.
- # :return: return a revalidated token, scoped if a project was passed or
- # the previous token was already scoped.
- # """
- # raise AuthconnNotImplementedException("Should have implemented this")
-
def validate_token(self, token):
"""
Check if the token is valid.
"""
raise AuthconnNotImplementedException("Should have implemented this")
- def get_user_project_list(self, token):
- """
- Get all the projects associated with a user.
-
- :param token: valid token
- :return: list of projects
- """
- raise AuthconnNotImplementedException("Should have implemented this")
-
- def get_user_role_list(self, token):
- """
- Get role list for a scoped project.
-
- :param token: scoped token.
- :return: returns the list of roles for the user in that project. If
- the token is unscoped it returns None.
- """
- raise AuthconnNotImplementedException("Should have implemented this")
-
- def create_user(self, user, password):
+ def create_user(self, user_info):
"""
Create a user.
- :param user: username.
- :param password: password.
+ :param user_info: full user info.
:raises AuthconnOperationException: if user creation failed.
"""
raise AuthconnNotImplementedException("Should have implemented this")
- def update_user(self, user, new_name=None, new_password=None):
+ def update_user(self, user_info):
"""
Change the user name and/or password.
- :param user: username or user_id
- :param new_name: new name
- :param new_password: new password.
- :raises AuthconnOperationException: if change failed.
+ :param user_info: user info modifications
+ :raises AuthconnNotImplementedException: if function not implemented
"""
raise AuthconnNotImplementedException("Should have implemented this")
:return: returns a list of users.
"""
- def create_role(self, role):
+ def get_user(self, _id, fail=True):
+ """
+ Get one user
+ :param _id: id or name
+ :param fail: True to raise exception on not found. False to return None on not found
+ :return: dictionary with the user information
+ """
+ filt = {BaseTopic.id_field("users", _id): _id}
+ users = self.get_user_list(filt)
+ if not users:
+ if fail:
+ raise AuthconnNotFoundException("User with {} not found".format(filt), http_code=HTTPStatus.NOT_FOUND)
+ else:
+ return None
+ return users[0]
+
+ def create_role(self, role_info):
"""
Create a role.
- :param role: role name.
+ :param role_info: full role info.
:raises AuthconnOperationException: if role creation failed.
"""
raise AuthconnNotImplementedException("Should have implemented this")
"""
raise AuthconnNotImplementedException("Should have implemented this")
- def update_role(self, role, new_name):
+ def get_role(self, _id, fail=True):
+ """
+ Get one role
+ :param _id: id or name
+ :param fail: True to raise exception on not found. False to return None on not found
+ :return: dictionary with the role information
+ """
+ filt = {BaseTopic.id_field("roles", _id): _id}
+ roles = self.get_role_list(filt)
+ if not roles:
+ if fail:
+ raise AuthconnNotFoundException("Role with {} not found".format(filt))
+ else:
+ return None
+ return roles[0]
+
+ def update_role(self, role_info):
"""
- Change the name of a role
- :param role: role name or id to be changed
- :param new_name: new name
+ Change the information of a role
+ :param role_info: full role info
:return: None
"""
raise AuthconnNotImplementedException("Should have implemented this")
- def create_project(self, project):
+ def create_project(self, project_info):
"""
Create a project.
- :param project: project name.
+ :param project_info: full project info.
:return: the internal id of the created project
:raises AuthconnOperationException: if project creation failed.
"""
"""
raise AuthconnNotImplementedException("Should have implemented this")
- def update_project(self, project_id, new_name):
+ def get_project(self, _id, fail=True):
"""
- Change the name of a project
- :param project_id: project to be changed
- :param new_name: new name
- :return: None
+ Get one project
+ :param _id: id or name
+ :param fail: True to raise exception on not found. False to return None on not found
+ :return: dictionary with the project information
"""
- raise AuthconnNotImplementedException("Should have implemented this")
+ filt = {BaseTopic.id_field("projects", _id): _id}
+ projs = self.get_project_list(filt)
+ if not projs:
+ if fail:
+ raise AuthconnNotFoundException("project with {} not found".format(filt))
+ else:
+ return None
+ return projs[0]
- def assign_role_to_user(self, user, project, role):
+ def update_project(self, project_id, project_info):
"""
- Assigning a role to a user in a project.
-
- :param user: username.
- :param project: project name.
- :param role: role name.
- :raises AuthconnOperationException: if role assignment failed.
- """
- raise AuthconnNotImplementedException("Should have implemented this")
-
- def remove_role_from_user(self, user, project, role):
- """
- Remove a role from a user in a project.
-
- :param user: username.
- :param project: project name.
- :param role: role name.
- :raises AuthconnOperationException: if role assignment revocation failed.
+ Change the information of a project
+ :param project_id: project to be changed
+ :param project_info: full project info
+ :return: None
"""
raise AuthconnNotImplementedException("Should have implemented this")