+
+ def get_user_list(self, filter_q=None):
+ """
+ Get user list.
+
+ :param filter_q: dictionary to filter user list by:
+ name (username is also admitted). If a user id is equal to the filter name, it is also provided
+ other
+ :return: returns a list of users.
+ """
+ filt = filter_q or {}
+ if "name" in filt: # backward compatibility
+ filt["username"] = filt.pop("name")
+ if filt.get("username") and is_valid_uuid(filt["username"]):
+ # username cannot be a uuid. If this is the case, change from username to _id
+ filt["_id"] = filt.pop("username")
+ users = self.db.get_list(self.users_collection, filt)
+ project_id_name = {}
+ role_id_name = {}
+ for user in users:
+ prms = user.get("project_role_mappings")
+ projects = user.get("projects")
+ if prms:
+ projects = []
+ # add project_name and role_name. Generate projects for backward compatibility
+ for prm in prms:
+ project_id = prm["project"]
+ if project_id not in project_id_name:
+ pr = self.db.get_one(
+ self.projects_collection,
+ {BaseTopic.id_field("projects", project_id): project_id},
+ fail_on_empty=False,
+ )
+ project_id_name[project_id] = pr["name"] if pr else None
+ prm["project_name"] = project_id_name[project_id]
+ if prm["project_name"] not in projects:
+ projects.append(prm["project_name"])
+
+ role_id = prm["role"]
+ if role_id not in role_id_name:
+ role = self.db.get_one(
+ self.roles_collection,
+ {BaseTopic.id_field("roles", role_id): role_id},
+ fail_on_empty=False,
+ )
+ role_id_name[role_id] = role["name"] if role else None
+ prm["role_name"] = role_id_name[role_id]
+ user["projects"] = projects # for backward compatibility
+ elif projects:
+ # user created with an old version. Create a project_role mapping with role project_admin
+ user["project_role_mappings"] = []
+ role = self.db.get_one(
+ self.roles_collection,
+ {BaseTopic.id_field("roles", "project_admin"): "project_admin"},
+ )
+ for p_id_name in projects:
+ pr = self.db.get_one(
+ self.projects_collection,
+ {BaseTopic.id_field("projects", p_id_name): p_id_name},
+ )
+ prm = {
+ "project": pr["_id"],
+ "project_name": pr["name"],
+ "role_name": "project_admin",
+ "role": role["_id"],
+ }
+ user["project_role_mappings"].append(prm)
+ else:
+ user["projects"] = []
+ user["project_role_mappings"] = []
+
+ return users
+
+ def get_project_list(self, filter_q={}):
+ """
+ Get role list.
+
+ :return: returns the list of projects.
+ """
+ return self.db.get_list(self.projects_collection, filter_q)
+
+ def create_project(self, project_info):
+ """
+ Create a project.
+
+ :param project: full project info.
+ :return: the internal id of the created project
+ :raises AuthconnOperationException: if project creation failed.
+ """
+ pid = self.db.create(self.projects_collection, project_info)
+ return pid
+
+ def delete_project(self, project_id):
+ """
+ Delete a project.
+
+ :param project_id: project identifier.
+ :raises AuthconnOperationException: if project deletion failed.
+ """
+ idf = BaseTopic.id_field("projects", project_id)
+ r = self.db.del_one(self.projects_collection, {idf: project_id})
+ idf = "project_id" if idf == "_id" else "project_name"
+ self.db.del_list(self.tokens_collection, {idf: project_id})
+ return r
+
+ def update_project(self, project_id, project_info):
+ """
+ Change the name of a project
+
+ :param project_id: project to be changed
+ :param project_info: full project info
+ :return: None
+ :raises AuthconnOperationException: if project update failed.
+ """
+ self.db.set_one(
+ self.projects_collection,
+ {BaseTopic.id_field("projects", project_id): project_id},
+ project_info,
+ )