# msg = resp
# raise ClientException("failed to create user {} - {}".format(name, msg))
- def update(self, name, user):
+ def update(self, name, user, pwd_change=False):
"""Updates an existing OSM user identified by name"""
self._logger.debug("")
- self._client.get_token()
+ if pwd_change:
+ token_info = self._client.get_token(pwd_change)
+ else:
+ token_info = self._client.get_token()
# print(user)
myuser = self.get(name)
update_user = {
"remove_project_role_mappings": [],
}
- # if password is defined, update the password
- if user["password"]:
- update_user["password"] = user["password"]
- if user["username"]:
+ if user.get("username"):
update_user["username"] = user["username"]
+ if user.get("new_password"):
+ update_user["password"] = user["new_password"]
+ if pwd_change and user.get("current_password"):
+ update_user["old_password"] = user["current_password"]
- if user["set-project"]:
+ if user.get("set-project"):
# Remove project and insert project role mapping
for set_project in user["set-project"]:
-
set_project_clean = [m.strip() for m in set_project.split(",")]
project, roles = set_project_clean[0], set_project_clean[1:]
mapping = {"project": project, "role": role}
update_user["add_project_role_mappings"].append(mapping)
- if user["remove-project"]:
+ if user.get("remove-project"):
for remove_project in user["remove-project"]:
update_user["remove_project_role_mappings"].append(
{"project": remove_project}
)
- if user["add-project-role"]:
+ if user.get("add-project-role"):
for add_project_role in user["add-project-role"]:
add_project_role_clean = [
m.strip() for m in add_project_role.split(",")
mapping = {"project": project, "role": role}
update_user["add_project_role_mappings"].append(mapping)
- if user["remove-project-role"]:
+ if user.get("remove-project-role"):
for remove_project_role in user["remove-project-role"]:
remove_project_role_clean = [
m.strip() for m in remove_project_role.split(",")
mapping = {"project": project, "role": role}
update_user["remove_project_role_mappings"].append(mapping)
+ if user.get("unlock"):
+ if token_info.get("admin_show"):
+ update_user["unlock"] = user["unlock"]
+ update_user["system_admin_id"] = token_info.get("user_id")
+ else:
+ raise ClientException(
+ "{} does not have privilege to unlock {}".format(
+ token_info.get("username"), myuser.get("username")
+ )
+ )
+
+ if user.get("renew"):
+ if token_info.get("admin_show"):
+ update_user["renew"] = user["renew"]
+ update_user["system_admin_id"] = token_info.get("user_id")
+ else:
+ raise ClientException(
+ "{} does not have privilege to renew {}".format(
+ token_info.get("username"), myuser.get("username")
+ )
+ )
+
if not update_user["remove_project_role_mappings"]:
del update_user["remove_project_role_mappings"]
if not update_user["add_project_role_mappings"]:
def list(self, filter=None):
"""Returns the list of OSM users"""
self._logger.debug("")
- self._client.get_token()
+ response = self._client.get_token()
+ admin_show = None
+ if response:
+ admin_show = response.get("admin_show")
+
filter_string = ""
if filter:
filter_string = "?{}".format(filter)
"{}{}".format(self._apiBase, filter_string), skip_query_admin=True
)
# print('RESP: {}'.format(resp))
- if resp:
+ if resp and response:
+ return json.loads(resp), admin_show
+ elif resp:
return json.loads(resp)
return list()