Coverage for osmclient/sol005/user.py: 8%
143 statements
« prev ^ index » next coverage.py v7.3.1, created at 2024-07-03 09:50 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2024-07-03 09:50 +0000
1#
2# Copyright 2018 Telefonica Investigacion y Desarrollo S.A.U.
3#
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
18"""
19OSM user mgmt API
20"""
22from osmclient.common.exceptions import ClientException
23from osmclient.common.exceptions import NotFound
24import json
25import logging
class User(object):
    """Client-side wrapper for the OSM user management REST resource.

    Every operation is sent to ``/admin/v1/users`` through the HTTP helper
    given as ``http``; a token is requested from ``client`` before each
    operation.
    """

    def __init__(self, http=None, client=None):
        """Store the HTTP helper and client and build the resource base path.

        :param http: object providing ``post_cmd``, ``patch_cmd``,
            ``delete_cmd`` and ``get2_cmd``.
        :param client: object providing ``get_token``.
        """
        self._http = http
        self._client = client
        self._logger = logging.getLogger("osmclient")
        self._apiName = "/admin"
        self._apiVersion = "/v1"
        self._apiResource = "/users"
        self._apiBase = "{}{}{}".format(
            self._apiName, self._apiVersion, self._apiResource
        )

    @staticmethod
    def _split_project_roles(spec):
        """Split ``"project,role1,role2"`` into ``(project, [role1, role2])``.

        Surrounding whitespace is stripped from every comma-separated item.
        """
        parts = [part.strip() for part in spec.split(",")]
        return parts[0], parts[1:]

    def create(self, name, user):
        """Creates a new OSM user

        :param name: not used by this method; kept for interface
            compatibility.
        :param user: dict describing the user. It is normalised IN PLACE
            before being posted:

            * ``projects``: removed when missing/empty (or a single empty
              string); a single comma-separated string is split into a list.
            * ``project_role_mappings``: each ``"project,role1,role2"`` entry
              is expanded into ``{"project": ..., "role": ...}`` dicts
              (duplicates dropped); removed when missing/empty.
        :raises ClientException: if the response is empty or has no ``id``.
        """
        self._logger.debug("")
        self._client.get_token()

        # .get()/.pop() so that a dict without these optional keys is accepted
        # instead of failing with KeyError.
        projects = user.get("projects")
        if not projects or (len(projects) == 1 and not projects[0]):
            user.pop("projects", None)
        elif len(projects) == 1:
            user["projects"] = projects[0].split(",")

        raw_mappings = user.get("project_role_mappings")
        if raw_mappings:
            mappings = []
            for spec in raw_mappings:
                project, roles = self._split_project_roles(spec)
                for role in roles:
                    mapping = {"project": project, "role": role}
                    if mapping not in mappings:
                        mappings.append(mapping)
            user["project_role_mappings"] = mappings
        else:
            user.pop("project_role_mappings", None)

        # NOTE(review): the status code is not inspected here; presumably the
        # HTTP helper raises on error responses - confirm.
        _, resp = self._http.post_cmd(
            endpoint=self._apiBase, postfields_dict=user, skip_query_admin=True
        )
        if resp:
            resp = json.loads(resp)
        if not resp or "id" not in resp:
            raise ClientException("unexpected response from server - {}".format(resp))
        print(resp["id"])

    def update(self, name, user, pwd_change=False):
        """Updates an existing OSM user identified by name

        :param name: user id or username, resolved through :meth:`get`.
        :param user: dict of requested changes. Recognised keys:
            ``username``, ``new_password``, ``current_password`` (only used
            when ``pwd_change`` is true), ``set-project``, ``remove-project``,
            ``add-project-role``, ``remove-project-role``, ``unlock``,
            ``renew``. Project/role entries use the
            ``"project,role1,role2"`` form.
        :param pwd_change: forwarded to ``get_token`` when true.
        :raises ClientException: if the caller lacks the ``admin_show``
            privilege for ``unlock``/``renew``, if nothing would be changed,
            or if a 200/201/202 response carries no ``id``.
        """
        self._logger.debug("")
        if pwd_change:
            token_info = self._client.get_token(pwd_change)
        else:
            token_info = self._client.get_token()
        # get_token() may return a falsy value (list() guards against that
        # too); fall back to an empty dict so the privilege checks below
        # report a ClientException instead of failing with AttributeError.
        token_info = token_info or {}

        myuser = self.get(name)
        add_mappings = []
        remove_mappings = []
        update_user = {
            "add_project_role_mappings": add_mappings,
            "remove_project_role_mappings": remove_mappings,
        }

        if user.get("username"):
            update_user["username"] = user["username"]
        if user.get("new_password"):
            update_user["password"] = user["new_password"]
        if pwd_change and user.get("current_password"):
            update_user["old_password"] = user["current_password"]

        # "set-project": drop every existing role of the project, then add
        # the requested ones.
        for spec in user.get("set-project") or []:
            project, roles = self._split_project_roles(spec)
            remove_mappings.append({"project": project})
            add_mappings.extend({"project": project, "role": role} for role in roles)

        for project in user.get("remove-project") or []:
            remove_mappings.append({"project": project})

        for spec in user.get("add-project-role") or []:
            project, roles = self._split_project_roles(spec)
            add_mappings.extend({"project": project, "role": role} for role in roles)

        for spec in user.get("remove-project-role") or []:
            project, roles = self._split_project_roles(spec)
            remove_mappings.extend(
                {"project": project, "role": role} for role in roles
            )

        # Both actions require the "admin_show" flag in the token info.
        for action in ("unlock", "renew"):
            if not user.get(action):
                continue
            if not token_info.get("admin_show"):
                raise ClientException(
                    "{} does not have privilege to {} {}".format(
                        token_info.get("username"), action, myuser.get("username")
                    )
                )
            update_user[action] = user[action]
            update_user["system_admin_id"] = token_info.get("user_id")

        # Do not send empty mapping lists to the server.
        if not remove_mappings:
            del update_user["remove_project_role_mappings"]
        if not add_mappings:
            del update_user["add_project_role_mappings"]
        if not update_user:
            raise ClientException("At least something should be changed.")

        http_code, resp = self._http.patch_cmd(
            endpoint="{}/{}".format(self._apiBase, myuser["_id"]),
            postfields_dict=update_user,
            skip_query_admin=True,
        )
        if http_code in (200, 201, 202):
            if resp:
                resp = json.loads(resp)
            if not resp or "id" not in resp:
                raise ClientException(
                    "unexpected response from server - {}".format(resp)
                )
            print(resp["id"])
        elif http_code == 204:
            print("Updated")
        # NOTE(review): any other status code is silently ignored here;
        # presumably the HTTP helper raises on error responses - confirm.

    def delete(self, name, force=False):
        """Deletes an existing OSM user identified by name

        :param name: user id or username, resolved through :meth:`get`.
        :param force: when true, ``?FORCE=True`` is appended to the request.
        :raises ClientException: if the response is neither 202, 204 nor a
            body containing ``result``.
        """
        self._logger.debug("")
        self._client.get_token()
        user = self.get(name)
        querystring = "?FORCE=True" if force else ""
        http_code, resp = self._http.delete_cmd(
            "{}/{}{}".format(self._apiBase, user["_id"], querystring),
            skip_query_admin=True,
        )
        if http_code == 202:
            print("Deletion in progress")
        elif http_code == 204:
            print("Deleted")
        elif resp and "result" in resp:
            print("Deleted")
        else:
            msg = resp or ""
            raise ClientException("failed to delete user {} - {}".format(name, msg))

    def list(self, filter=None):
        """Returns the list of OSM users

        :param filter: optional query string (without the leading ``?``).
        :return: ``(users, admin_show)`` when both the response body and the
            value returned by ``get_token()`` are truthy; just ``users`` when
            only the body is truthy; an empty list otherwise.
        """
        self._logger.debug("")
        response = self._client.get_token()
        admin_show = response.get("admin_show") if response else None

        filter_string = "?{}".format(filter) if filter else ""
        _, resp = self._http.get2_cmd(
            "{}{}".format(self._apiBase, filter_string), skip_query_admin=True
        )
        if resp and response:
            return json.loads(resp), admin_show
        elif resp:
            return json.loads(resp)
        return []

    def get(self, name):
        """Returns an OSM user based on name or id

        Ids are matched first, then usernames.

        :raises NotFound: if no user has ``name`` as ``_id`` or ``username``.
        """
        self._logger.debug("")
        self._client.get_token()
        # keystone with external LDAP contains large ids, not uuid format
        # utils.validate_uuid4(name) cannot be used
        listing = self.list()
        # list() returns (users, admin_show) when get_token() yields token
        # info; unwrap it so the loops below always iterate user dicts.
        user_list = listing[0] if isinstance(listing, tuple) else listing
        for user in user_list:
            if name == user["_id"]:
                return user
        for user in user_list:
            if name == user["username"]:
                return user
        raise NotFound("User {} not found".format(name))