+ rc = self.db.del_one(self.roles_collection, {"_id": role_id})
+ self.db.del_list(self.tokens_collection, {"roles.id": role_id})
+ return rc
+
+ def update_role(self, role_info):
+ """
+ Update a role.
+
+ :param role_info: full role info.
+ :return: returns the role name and id.
+ :raises AuthconnOperationException: if user creation failed.
+ """
+ rid = role_info["_id"]
+ self.db.set_one(self.roles_collection, {"_id": rid}, role_info)
+ return {"_id": rid, "name": role_info["name"]}
+
+ def create_user(self, user_info):
+ """
+ Create a user.
+
+ :param user_info: full user info.
+ :return: returns the username and id of the user.
+ """
+ BaseTopic.format_on_new(user_info, make_public=False)
+ salt = uuid4().hex
+ user_info["_admin"]["salt"] = salt
+ present = time()
+ if not user_info["username"] == "admin":
+ if self.config.get("pwd_expiry_check"):
+ user_info["_admin"]["modified_time"] = present
+ user_info["_admin"]["expire_time"] = present
+ if "password" in user_info:
+ user_info["password"] = sha256(
+ user_info["password"].encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
+ # "projects" are not stored any more
+ if "projects" in user_info:
+ del user_info["projects"]
+ self.db.create(self.users_collection, user_info)
+ return {"username": user_info["username"], "_id": user_info["_id"]}
+
+ def update_user(self, user_info):
+ """
+ Change the user name and/or password.
+
+ :param user_info: user info modifications
+ """
+ uid = user_info["_id"]
+ old_pwd = user_info.get("old_password")
+ user_data = self.db.get_one(
+ self.users_collection, {BaseTopic.id_field("users", uid): uid}
+ )
+ if old_pwd:
+ salt = user_data["_admin"]["salt"]
+ shadow_password = sha256(
+ old_pwd.encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
+ if shadow_password != user_data["password"]:
+ raise AuthconnConflictException(
+ "Incorrect password", http_code=HTTPStatus.CONFLICT
+ )
+ BaseTopic.format_on_edit(user_data, user_info)
+ # User Name
+ usnm = user_info.get("username")
+ if usnm:
+ user_data["username"] = usnm
+ # If password is given and is not already encripted
+ pswd = user_info.get("password")
+ if pswd and (
+ len(pswd) != 64 or not re.match("[a-fA-F0-9]*", pswd)
+ ): # TODO: Improve check?
+ salt = uuid4().hex
+ if "_admin" not in user_data:
+ user_data["_admin"] = {}
+ user_data["_admin"]["salt"] = salt
+ user_data["password"] = sha256(
+ pswd.encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
+ if not user_data["username"] == "admin":
+ if self.config.get("pwd_expiry_check"):
+ present = time()
+ if self.config.get("days"):
+ expire = present + 86400 * self.config.get("days")
+ user_data["_admin"]["modified_time"] = present
+ user_data["_admin"]["expire_time"] = expire
+ # Project-Role Mappings
+ # TODO: Check that user_info NEVER includes "project_role_mappings"
+ if "project_role_mappings" not in user_data:
+ user_data["project_role_mappings"] = []
+ for prm in user_info.get("add_project_role_mappings", []):
+ user_data["project_role_mappings"].append(prm)
+ for prm in user_info.get("remove_project_role_mappings", []):
+ for pidf in ["project", "project_name"]:
+ for ridf in ["role", "role_name"]:
+ try:
+ user_data["project_role_mappings"].remove(
+ {"role": prm[ridf], "project": prm[pidf]}
+ )
+ except KeyError:
+ pass
+ except ValueError:
+ pass
+ idf = BaseTopic.id_field("users", uid)
+ self.db.set_one(self.users_collection, {idf: uid}, user_data)
+ if user_info.get("remove_project_role_mappings"):
+ idf = "user_id" if idf == "_id" else idf
+ self.db.del_list(self.tokens_collection, {idf: uid})
+
+ def delete_user(self, user_id):
+ """
+ Delete user.
+
+ :param user_id: user identifier.
+ :raises AuthconnOperationException: if user deletion failed.
+ """
+ self.db.del_one(self.users_collection, {"_id": user_id})
+ self.db.del_list(self.tokens_collection, {"user_id": user_id})