- self.keystone = client.Client(session=self.sess)
-
- def authenticate_with_user_password(self, user, password):
- """
- Authenticate a user using username and password.
-
- :param user: username
- :param password: password
- :return: an unscoped token that grants access to project list
- """
- try:
- user_id = list(filter(lambda x: x.name == user, self.keystone.users.list()))[0].id
- project_names = [project.name for project in self.keystone.projects.list(user=user_id)]
-
- token = self.keystone.get_raw_token_from_identity_service(
- auth_url=self.auth_url,
- username=user,
- password=password,
- user_domain_name=self.user_domain_name,
- project_domain_name=self.project_domain_name)
-
- return token["auth_token"], project_names
- except ClientException as e:
- self.logger.exception("Error during user authentication using keystone. Method: basic: {}".format(e))
- raise AuthException("Error during user authentication using Keystone: {}".format(e),
- http_code=HTTPStatus.UNAUTHORIZED)
-
- def authenticate_with_token(self, token, project=None):
- """
- Authenticate a user using a token. Can be used to revalidate the token
- or to get a scoped token.
-
- :param token: a valid token.
- :param project: (optional) project for a scoped token.
- :return: return a revalidated token, scoped if a project was passed or
- the previous token was already scoped.
- """
- try:
- token_info = self.keystone.tokens.validate(token=token)
- projects = self.keystone.projects.list(user=token_info["user"]["id"])
- project_names = [project.name for project in projects]
-
- new_token = self.keystone.get_raw_token_from_identity_service(
- auth_url=self.auth_url,
- token=token,
- project_name=project,
- user_domain_name=self.user_domain_name,
- project_domain_name=self.project_domain_name)
-
- return new_token["auth_token"], project_names
- except ClientException as e:
- self.logger.exception("Error during user authentication using keystone. Method: bearer: {}".format(e))
- raise AuthException("Error during user authentication using Keystone: {}".format(e),
- http_code=HTTPStatus.UNAUTHORIZED)
+ self.keystone = client.Client(
+ session=self.sess, endpoint_override=self.auth_url
+ )
+
+ def authenticate(self, credentials, token_info=None):
+ """
+ Authenticate a user using username/password or token_info, plus project
+ :param credentials: dictionary that contains:
+ username: name, id or None
+ password: password or None
+ project_id: name, id, or None. If None first found project will be used to get an scope token
+ project_domain_name: (Optional) To use a concrete domain for the project
+ user_domain_name: (Optional) To use a concrete domain for the project
+ other items are allowed and ignored
+ :param token_info: previous token_info to obtain authorization
+ :return: the scoped token info or raises an exception. The token is a dictionary with:
+ _id: token string id,
+ username: username,
+ project_id: scoped_token project_id,
+ project_name: scoped_token project_name,
+ expires: epoch time when it expires,
+
+ """
+ username = None
+ user_id = None
+ project_id = None
+ project_name = None
+ if credentials.get("project_domain_name"):
+ project_domain_name_list = (credentials["project_domain_name"],)
+ else:
+ project_domain_name_list = self.project_domain_name_list
+ if credentials.get("user_domain_name"):
+ user_domain_name_list = (credentials["user_domain_name"],)
+ else:
+ user_domain_name_list = self.user_domain_name_list
+
+ for index, project_domain_name in enumerate(project_domain_name_list):
+ user_domain_name = user_domain_name_list[index]
+ try:
+ if credentials.get("username"):
+ if is_valid_uuid(credentials["username"]):
+ user_id = credentials["username"]
+ else:
+ username = credentials["username"]
+
+ # get an unscoped token firstly
+ unscoped_token = self.keystone.get_raw_token_from_identity_service(
+ auth_url=self.auth_url,
+ user_id=user_id,
+ username=username,
+ password=credentials.get("password"),
+ user_domain_name=user_domain_name,
+ project_domain_name=project_domain_name,
+ )
+ elif token_info:
+ unscoped_token = self.keystone.tokens.validate(
+ token=token_info.get("_id")
+ )
+ else:
+ raise AuthException(
+ "Provide credentials: username/password or Authorization Bearer token",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
+
+ if not credentials.get("project_id"):
+ # get first project for the user
+ project_list = self.keystone.projects.list(
+ user=unscoped_token["user"]["id"]
+ )
+ if not project_list:
+ raise AuthException(
+ "The user {} has not any project and cannot be used for authentication".format(
+ credentials.get("username")
+ ),
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
+ project_id = project_list[0].id
+ else:
+ if is_valid_uuid(credentials["project_id"]):
+ project_id = credentials["project_id"]
+ else:
+ project_name = credentials["project_id"]
+
+ scoped_token = self.keystone.get_raw_token_from_identity_service(
+ auth_url=self.auth_url,
+ project_name=project_name,
+ project_id=project_id,
+ user_domain_name=user_domain_name,
+ project_domain_name=project_domain_name,
+ token=unscoped_token["auth_token"],
+ )
+
+ auth_token = {
+ "_id": scoped_token.auth_token,
+ "id": scoped_token.auth_token,
+ "user_id": scoped_token.user_id,
+ "username": scoped_token.username,
+ "project_id": scoped_token.project_id,
+ "project_name": scoped_token.project_name,
+ "project_domain_name": scoped_token.project_domain_name,
+ "user_domain_name": scoped_token.user_domain_name,
+ "expires": scoped_token.expires.timestamp(),
+ "issued_at": scoped_token.issued.timestamp(),
+ }
+
+ return auth_token
+ except ClientException as e:
+ if (
+ index >= len(user_domain_name_list) - 1
+ or index >= len(project_domain_name_list) - 1
+ ):
+ # if last try, launch exception
+ # self.logger.exception("Error during user authentication using keystone: {}".format(e))
+ raise AuthException(
+ "Error during user authentication using Keystone: {}".format(e),
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )