X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osmclient%2Fsol005%2Fuser.py;h=d6c1e16b832e893fd090d07786ee018e09ce51f3;hb=70208ca1de7ff0e91a17d8f918d5b6044e9fa388;hp=1321aa5a80cdda2d5a37481dc27928c4b8616a75;hpb=a63fb3c6887ecf8221e868e0c8d39d51319ea085;p=osm%2Fosmclient.git diff --git a/osmclient/sol005/user.py b/osmclient/sol005/user.py index 1321aa5..d6c1e16 100644 --- a/osmclient/sol005/user.py +++ b/osmclient/sol005/user.py @@ -19,189 +19,220 @@ OSM user mgmt API """ -from osmclient.common import utils from osmclient.common.exceptions import ClientException from osmclient.common.exceptions import NotFound import json +import logging class User(object): def __init__(self, http=None, client=None): self._http = http self._client = client - self._apiName = '/admin' - self._apiVersion = '/v1' - self._apiResource = '/users' - self._apiBase = '{}{}{}'.format(self._apiName, - self._apiVersion, self._apiResource) + self._logger = logging.getLogger("osmclient") + self._apiName = "/admin" + self._apiVersion = "/v1" + self._apiResource = "/users" + self._apiBase = "{}{}{}".format( + self._apiName, self._apiVersion, self._apiResource + ) def create(self, name, user): - """Creates a new OSM user - """ - if len(user["projects"]) == 1: + """Creates a new OSM user""" + self._logger.debug("") + self._client.get_token() + if not user["projects"] or ( + len(user["projects"]) == 1 and not user["projects"][0] + ): + del user["projects"] + elif len(user["projects"]) == 1: user["projects"] = user["projects"][0].split(",") - if user["project-role-mappings"]: + if user["project_role_mappings"]: project_role_mappings = [] - for set_mapping in user["project-role-mappings"]: - set_mapping_clean = [m.trim() for m in set_mapping.split(",")] + for set_mapping in user["project_role_mappings"]: + set_mapping_clean = [m.strip() for m in set_mapping.split(",")] project, roles = set_mapping_clean[0], set_mapping_clean[1:] for role in roles: - mapping = [project, role] + mapping = {"project": project, "role": role} - if mapping not in project_role_mappings: + if mapping not in project_role_mappings: project_role_mappings.append(mapping) - - user["project-role-mappings"] = project_role_mappings - - http_code, resp = self._http.post_cmd(endpoint=self._apiBase, - postfields_dict=user) - #print('HTTP CODE: {}'.format(http_code)) - #print('RESP: {}'.format(resp)) - if http_code in (200, 201, 202, 204): - if resp: - resp = json.loads(resp) - if not resp or 'id' not in resp: - raise ClientException('unexpected response from server - {}'.format( - resp)) - print(resp['id']) + user["project_role_mappings"] = project_role_mappings else: - msg = "" - if resp: - try: - msg = json.loads(resp) - except ValueError: - msg = resp - raise ClientException("failed to create user {} - {}".format(name, msg)) + del user["project_role_mappings"] + + http_code, resp = self._http.post_cmd( + endpoint=self._apiBase, postfields_dict=user, skip_query_admin=True + ) + # print('HTTP CODE: {}'.format(http_code)) + # print('RESP: {}'.format(resp)) + # if http_code in (200, 201, 202, 204): + if resp: + resp = json.loads(resp) + if not resp or "id" not in resp: + raise ClientException("unexpected response from server - {}".format(resp)) + print(resp["id"]) + # else: + # msg = "" + # if resp: + # try: + # msg = json.loads(resp) + # except ValueError: + # msg = resp + # raise ClientException("failed to create user {} - {}".format(name, msg)) def update(self, name, user): - """Updates an existing OSM user identified by name - """ - myuser = self.get(name) + """Updates an existing OSM user identified by name""" + self._logger.debug("") + self._client.get_token() + # print(user) + myuser = self.get(name) update_user = { - "_id": myuser["_id"], - "name": myuser["user"], - "project_role_mappings": myuser["project_role_mappings"] + "add_project_role_mappings": [], + "remove_project_role_mappings": [], } # if password is defined, update the password if user["password"]: update_user["password"] = user["password"] - + if user["username"]: + update_user["username"] = user["username"] + if user["set-project"]: + # Remove project and insert project role mapping for set_project in user["set-project"]: - set_project_clean = [m.trim() for m in set_project.split(",")] + + set_project_clean = [m.strip() for m in set_project.split(",")] project, roles = set_project_clean[0], set_project_clean[1:] - update_user["project_role_mappings"] = [mapping for mapping - in update_user["project_role_mappings"] - if mapping[0] != project] + update_user["remove_project_role_mappings"].append({"project": project}) for role in roles: - update_user["project_role_mappings"].append([project, role]) - + mapping = {"project": project, "role": role} + update_user["add_project_role_mappings"].append(mapping) + if user["remove-project"]: for remove_project in user["remove-project"]: - update_user["project_role_mappings"] = [mapping for mapping - in update_user["project_role_mappings"] - if mapping[0] != remove_project] - + update_user["remove_project_role_mappings"].append( + {"project": remove_project} + ) + if user["add-project-role"]: for add_project_role in user["add-project-role"]: - add_project_role_clean = [m.trim() for m in add_project_role.split(",")] + add_project_role_clean = [ + m.strip() for m in add_project_role.split(",") + ] project, roles = add_project_role_clean[0], add_project_role_clean[1:] for role in roles: - mapping = [project, role] - if mapping not in update_user["project_role_mappings"]: - update_user["project_role_mappings"].append(mapping) - + mapping = {"project": project, "role": role} + update_user["add_project_role_mappings"].append(mapping) + if user["remove-project-role"]: for remove_project_role in user["remove-project-role"]: - remove_project_role_clean = [m.trim() for m in remove_project_role.split(",")] - project, roles = remove_project_role_clean[0], remove_project_role_clean[1:] + remove_project_role_clean = [ + m.strip() for m in remove_project_role.split(",") + ] + project, roles = ( + remove_project_role_clean[0], + remove_project_role_clean[1:], + ) for role in roles: - mapping_to_remove = [project, role] - update_user["project_role_mappings"] = [mapping for mapping - in update_user["project_role_mappings"] - if mapping != mapping_to_remove] - - if not user["password"] and not user["set-project"] and not user["remove-project"] \ - and not user["add-project-role"] and not user["remove-project-role"]: - raise ClientException("At least one parameter should be defined.") - - http_code, resp = self._http.put_cmd(endpoint='{}/{}'.format(self._apiBase,myuser['_id']), - postfields_dict=update_user) - #print('HTTP CODE: {}'.format(http_code)) - #print('RESP: {}'.format(resp)) - if http_code in (200, 201, 202, 204): + mapping = {"project": project, "role": role} + update_user["remove_project_role_mappings"].append(mapping) + + if not update_user["remove_project_role_mappings"]: + del update_user["remove_project_role_mappings"] + if not update_user["add_project_role_mappings"]: + del update_user["add_project_role_mappings"] + if not update_user: + raise ClientException("At least something should be changed.") + + http_code, resp = self._http.patch_cmd( + endpoint="{}/{}".format(self._apiBase, myuser["_id"]), + postfields_dict=update_user, + skip_query_admin=True, + ) + # print('HTTP CODE: {}'.format(http_code)) + # print('RESP: {}'.format(resp)) + if http_code in (200, 201, 202): if resp: resp = json.loads(resp) - if not resp or 'id' not in resp: - raise ClientException('unexpected response from server - {}'.format( - resp)) - print(resp['id']) - else: - msg = "" - if resp: - try: - msg = json.loads(resp) - except ValueError: - msg = resp - raise ClientException("failed to update user {} - {}".format(name, msg)) + if not resp or "id" not in resp: + raise ClientException( + "unexpected response from server - {}".format(resp) + ) + print(resp["id"]) + elif http_code == 204: + print("Updated") + # else: + # msg = "" + # if resp: + # try: + # msg = json.loads(resp) + # except ValueError: + # msg = resp + # raise ClientException("failed to update user {} - {}".format(name, msg)) def delete(self, name, force=False): - """Deletes an existing OSM user identified by name - """ + """Deletes an existing OSM user identified by name""" + self._logger.debug("") + self._client.get_token() user = self.get(name) - querystring = '' + querystring = "" if force: - querystring = '?FORCE=True' - http_code, resp = self._http.delete_cmd('{}/{}{}'.format(self._apiBase, - user['_id'], querystring)) - #print('HTTP CODE: {}'.format(http_code)) - #print('RESP: {}'.format(resp)) + querystring = "?FORCE=True" + http_code, resp = self._http.delete_cmd( + "{}/{}{}".format(self._apiBase, user["_id"], querystring), + skip_query_admin=True, + ) + # print('HTTP CODE: {}'.format(http_code)) + # print('RESP: {}'.format(resp)) if http_code == 202: - print('Deletion in progress') + print("Deletion in progress") elif http_code == 204: - print('Deleted') - elif resp and 'result' in resp: - print('Deleted') + print("Deleted") + elif resp and "result" in resp: + print("Deleted") else: - msg = "" - if resp: - try: - msg = json.loads(resp) - except ValueError: - msg = resp + msg = resp or "" + # if resp: + # try: + # msg = json.loads(resp) + # except ValueError: + # msg = resp raise ClientException("failed to delete user {} - {}".format(name, msg)) def list(self, filter=None): - """Returns the list of OSM users - """ - filter_string = '' + """Returns the list of OSM users""" + self._logger.debug("") + self._client.get_token() + filter_string = "" if filter: - filter_string = '?{}'.format(filter) - resp = self._http.get_cmd('{}{}'.format(self._apiBase,filter_string)) - #print('RESP: {}'.format(resp)) + filter_string = "?{}".format(filter) + _, resp = self._http.get2_cmd( + "{}{}".format(self._apiBase, filter_string), skip_query_admin=True + ) + # print('RESP: {}'.format(resp)) if resp: - return resp + return json.loads(resp) return list() def get(self, name): - """Returns an OSM user based on name or id - """ - if utils.validate_uuid4(name): - for user in self.list(): - if name == user['_id']: - return user - else: - for user in self.list(): - if name == user['username']: - return user + """Returns an OSM user based on name or id""" + self._logger.debug("") + self._client.get_token() + # keystone with external LDAP contains large ids, not uuid format + # utils.validate_uuid4(name) cannot be used + user_list = self.list() + for user in user_list: + if name == user["_id"]: + return user + for user in user_list: + if name == user["username"]: + return user raise NotFound("User {} not found".format(name)) - -