Fix black errors shown with the latest version of black
[osm/osmclient.git] / osmclient / sol005 / user.py
1 #
2 # Copyright 2018 Telefonica Investigacion y Desarrollo S.A.U.
3 #
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17
18 """
19 OSM user mgmt API
20 """
21
22 from osmclient.common.exceptions import ClientException
23 from osmclient.common.exceptions import NotFound
24 import json
25 import logging
26
27
class User(object):
    """Client-side wrapper for the OSM user management API.

    Every request is sent through the injected ``http`` object to the
    ``/admin/v1/users`` resource, after calling ``get_token()`` on the
    injected ``client`` object.
    """

    def __init__(self, http=None, client=None):
        """Store the collaborators and build the API base path.

        :param http: object providing ``post_cmd``, ``patch_cmd``,
            ``delete_cmd`` and ``get2_cmd``
        :param client: object providing ``get_token``
        """
        self._http = http
        self._client = client
        self._logger = logging.getLogger("osmclient")
        self._apiName = "/admin"
        self._apiVersion = "/v1"
        self._apiResource = "/users"
        self._apiBase = "{}{}{}".format(
            self._apiName, self._apiVersion, self._apiResource
        )

    @staticmethod
    def _split_project_roles(entry):
        """Split a ``"project,role1,role2"`` string into ``(project, roles)``.

        Surrounding whitespace of every comma-separated item is stripped.
        ``roles`` is an empty list when only a project is given.
        """
        parts = [item.strip() for item in entry.split(",")]
        return parts[0], parts[1:]

    def create(self, name, user):
        """Creates a new OSM user

        :param name: name of the user (not used to build the request; the
            request body is taken from ``user``)
        :param user: dict with the user description. The optional keys
            ``"projects"`` (list of names, or a single comma-separated
            string in a one-element list) and ``"project_role_mappings"``
            (list of ``"project,role1,role2"`` strings) are normalised
            before sending; they are omitted from the request when empty
            or absent. The dict itself is not modified.
        :raises ClientException: if the response is empty or has no ``id``
        """
        self._logger.debug("")
        self._client.get_token()

        # Work on a shallow copy so the caller's dict is left untouched.
        payload = dict(user)

        projects = payload.pop("projects", None)
        # A single empty string means "no project given".
        if projects and not (len(projects) == 1 and not projects[0]):
            if len(projects) == 1:
                projects = projects[0].split(",")
            payload["projects"] = projects

        raw_mappings = payload.pop("project_role_mappings", None)
        if raw_mappings:
            mappings = []
            for entry in raw_mappings:
                project, roles = self._split_project_roles(entry)
                for role in roles:
                    mapping = {"project": project, "role": role}
                    # Skip duplicates while keeping first-seen order.
                    if mapping not in mappings:
                        mappings.append(mapping)
            payload["project_role_mappings"] = mappings

        _, resp = self._http.post_cmd(
            endpoint=self._apiBase, postfields_dict=payload, skip_query_admin=True
        )
        if resp:
            resp = json.loads(resp)
        if not resp or "id" not in resp:
            raise ClientException("unexpected response from server - {}".format(resp))
        print(resp["id"])

    def update(self, name, user, pwd_change=None):
        """Updates an existing OSM user identified by name

        :param name: id or username of the user to update
        :param user: dict of requested changes. Recognised keys:
            ``"password"``, ``"username"``, ``"change_password"`` together
            with ``"new_password"``, ``"set-project"``,
            ``"remove-project"``, ``"add-project-role"`` and
            ``"remove-project-role"``.
        :param pwd_change: when truthy, it is passed to ``get_token``
        :raises NotFound: if no user matches ``name``
        :raises ClientException: if nothing would be changed, or if a
            200/201/202 response is empty or has no ``id``
        """
        self._logger.debug("")
        if pwd_change:
            self._client.get_token(pwd_change)
        else:
            self._client.get_token()

        myuser = self.get(name)
        add_mappings = []
        remove_mappings = []
        update_user = {}

        if not user.get("change_password"):
            # if password is defined, update the password
            if user.get("password"):
                update_user["password"] = user["password"]
            if user.get("username"):
                update_user["username"] = user["username"]
        else:
            update_user["old_password"] = user["change_password"]
            update_user["password"] = user["new_password"]

        # "set-project": drop the whole project, then re-add the given roles.
        for entry in user.get("set-project") or []:
            project, roles = self._split_project_roles(entry)
            remove_mappings.append({"project": project})
            add_mappings.extend({"project": project, "role": role} for role in roles)

        for project in user.get("remove-project") or []:
            remove_mappings.append({"project": project})

        for entry in user.get("add-project-role") or []:
            project, roles = self._split_project_roles(entry)
            add_mappings.extend({"project": project, "role": role} for role in roles)

        for entry in user.get("remove-project-role") or []:
            project, roles = self._split_project_roles(entry)
            remove_mappings.append_all = None if False else None  # placeholder removed below
            remove_mappings.extend(
                {"project": project, "role": role} for role in roles
            )

        # Only send the mapping lists that actually contain something.
        if add_mappings:
            update_user["add_project_role_mappings"] = add_mappings
        if remove_mappings:
            update_user["remove_project_role_mappings"] = remove_mappings
        if not update_user:
            raise ClientException("At least something should be changed.")

        http_code, resp = self._http.patch_cmd(
            endpoint="{}/{}".format(self._apiBase, myuser["_id"]),
            postfields_dict=update_user,
            skip_query_admin=True,
        )
        if http_code in (200, 201, 202):
            if resp:
                resp = json.loads(resp)
            if not resp or "id" not in resp:
                raise ClientException(
                    "unexpected response from server - {}".format(resp)
                )
            print(resp["id"])
        elif http_code == 204:
            print("Updated")

    def delete(self, name, force=False):
        """Deletes an existing OSM user identified by name

        :param name: id or username of the user to delete
        :param force: when truthy, ``?FORCE=True`` is appended to the URL
        :raises NotFound: if no user matches ``name``
        :raises ClientException: if the response is not a 202, a 204 or a
            body containing ``"result"``
        """
        self._logger.debug("")
        self._client.get_token()
        user = self.get(name)
        querystring = "?FORCE=True" if force else ""
        http_code, resp = self._http.delete_cmd(
            "{}/{}{}".format(self._apiBase, user["_id"], querystring),
            skip_query_admin=True,
        )
        if http_code == 202:
            print("Deletion in progress")
        elif http_code == 204:
            print("Deleted")
        elif resp and "result" in resp:
            print("Deleted")
        else:
            msg = resp or ""
            raise ClientException("failed to delete user {} - {}".format(name, msg))

    def list(self, filter=None):
        """Returns the list of OSM users

        :param filter: optional query string (without the leading ``?``)
            appended to the request URL as given
        :return: the decoded JSON response, or an empty list when the
            response body is empty
        """
        self._logger.debug("")
        self._client.get_token()
        filter_string = "?{}".format(filter) if filter else ""
        _, resp = self._http.get2_cmd(
            "{}{}".format(self._apiBase, filter_string), skip_query_admin=True
        )
        if resp:
            return json.loads(resp)
        return []

    def get(self, name):
        """Returns an OSM user based on name or id

        A match on ``_id`` anywhere in the user list takes precedence over
        a match on ``username``.

        :param name: id or username to look for
        :raises NotFound: if no user matches ``name``
        """
        self._logger.debug("")
        self._client.get_token()
        # keystone with external LDAP contains large ids, not uuid format
        # utils.validate_uuid4(name) cannot be used
        user_list = self.list()
        for key in ("_id", "username"):
            for user in user_list:
                if name == user[key]:
                    return user
        raise NotFound("User {} not found".format(name))