+ user_obj = self.keystone.users.get(user_id)
+ domain_id = user_obj.domain_id
+ domain_name = self.domains_id2name.get(domain_id)
+ if domain_name in self.user_domain_ro_list:
+ raise AuthconnConflictException(
+ "Cannot delete user {} belonging to a read only domain {}".format(
+ user_id, domain_name
+ )
+ )
+
+ result, detail = self.keystone.users.delete(user_id)
+ if result.status_code != 204:
+ raise ClientException("error {} {}".format(result.status_code, detail))
+ return True
+ except ClientException as e:
+ # self.logger.exception("Error during user deletion using keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during user deletion using Keystone: {}".format(e)
+ )
+
+ def get_user_list(self, filter_q=None):
+ """
+ Get user list.
+
+ :param filter_q: dictionary to filter user list by one or several
+ _id:
+ name (username is also admitted). If a user id is equal to the filter name, it is also provided
+ domain_id, domain_name
+ :return: returns a list of users.
+ """
+ try:
+ self._get_domains()
+ filter_name = filter_domain = None
+ if filter_q:
+ filter_name = filter_q.get("name") or filter_q.get("username")
+ if filter_q.get("domain_name"):
+ filter_domain = self._get_domain_id(
+ filter_q["domain_name"], fail_if_not_found=False
+ )
+ # If domain is not found, use the same name to obtain an empty list
+ filter_domain = filter_domain or filter_q["domain_name"]
+ if filter_q.get("domain_id"):
+ filter_domain = filter_q["domain_id"]
+
+ users = self.keystone.users.list(name=filter_name, domain=filter_domain)
+ # get users from user_domain_name_list[1:], because it will not be provided in case of LDAP
+ if filter_domain is None and len(self.user_domain_name_list) > 1:
+ for user_domain in self.user_domain_name_list[1:]:
+ domain_id = self._get_domain_id(
+ user_domain, fail_if_not_found=False
+ )
+ if not domain_id:
+ continue
+ # find if users of this domain are already provided. In this case ignore
+ for u in users:
+ if u.domain_id == domain_id:
+ break
+ else:
+ users += self.keystone.users.list(
+ name=filter_name, domain=domain_id
+ )
+
+ # if filter name matches a user id, provide it also
+ if filter_name:
+ try:
+ user_obj = self.keystone.users.get(filter_name)
+ if user_obj not in users:
+ users.append(user_obj)
+ except Exception:
+ pass
+
+ users = [
+ {
+ "username": user.name,
+ "_id": user.id,
+ "id": user.id,
+ "_admin": user.to_dict().get("_admin", {}), # TODO: REVISE
+ "domain_name": self.domains_id2name.get(user.domain_id),
+ }
+ for user in users
+ if user.name != self.admin_username
+ ]
+
+ if filter_q and filter_q.get("_id"):
+ users = [user for user in users if filter_q["_id"] == user["_id"]]
+
+ for user in users:
+ user["project_role_mappings"] = []
+ user["projects"] = []
+ projects = self.keystone.projects.list(user=user["_id"])
+ for project in projects:
+ user["projects"].append(project.name)
+
+ roles = self.keystone.roles.list(
+ user=user["_id"], project=project.id
+ )
+ for role in roles:
+ prm = {
+ "project": project.id,
+ "project_name": project.name,
+ "role_name": role.name,
+ "role": role.id,
+ }
+ user["project_role_mappings"].append(prm)
+
+ return users
+ except ClientException as e:
+ # self.logger.exception("Error during user listing using keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during user listing using Keystone: {}".format(e)
+ )
+
+ def get_role_list(self, filter_q=None):
+ """
+ Get role list.
+
+ :param filter_q: dictionary to filter role list by _id and/or name.
+ :return: returns the list of roles.
+ """
+ try:
+ filter_name = None
+ if filter_q:
+ filter_name = filter_q.get("name")
+ roles_list = self.keystone.roles.list(name=filter_name)
+
+ roles = [
+ {
+ "name": role.name,
+ "_id": role.id,
+ "_admin": role.to_dict().get("_admin", {}),
+ "permissions": role.to_dict().get("permissions", {}),
+ }
+ for role in roles_list
+ if role.name != "service"
+ ]
+
+ if filter_q and filter_q.get("_id"):
+ roles = [role for role in roles if filter_q["_id"] == role["_id"]]