Coverage for osmclient/sol005/user.py: 8%

143 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-07-03 09:50 +0000

1# 

2# Copyright 2018 Telefonica Investigacion y Desarrollo S.A.U. 

3# 

4# All Rights Reserved. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); you may 

7# not use this file except in compliance with the License. You may obtain 

8# a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

15# License for the specific language governing permissions and limitations 

16# under the License. 

17 

18""" 

19OSM user mgmt API 

20""" 

21 

22from osmclient.common.exceptions import ClientException 

23from osmclient.common.exceptions import NotFound 

24import json 

25import logging 

26 

27 

28class User(object): 

29 def __init__(self, http=None, client=None): 

30 self._http = http 

31 self._client = client 

32 self._logger = logging.getLogger("osmclient") 

33 self._apiName = "/admin" 

34 self._apiVersion = "/v1" 

35 self._apiResource = "/users" 

36 self._apiBase = "{}{}{}".format( 

37 self._apiName, self._apiVersion, self._apiResource 

38 ) 

39 

40 def create(self, name, user): 

41 """Creates a new OSM user""" 

42 self._logger.debug("") 

43 self._client.get_token() 

44 if not user["projects"] or ( 

45 len(user["projects"]) == 1 and not user["projects"][0] 

46 ): 

47 del user["projects"] 

48 elif len(user["projects"]) == 1: 

49 user["projects"] = user["projects"][0].split(",") 

50 

51 if user["project_role_mappings"]: 

52 project_role_mappings = [] 

53 

54 for set_mapping in user["project_role_mappings"]: 

55 set_mapping_clean = [m.strip() for m in set_mapping.split(",")] 

56 project, roles = set_mapping_clean[0], set_mapping_clean[1:] 

57 

58 for role in roles: 

59 mapping = {"project": project, "role": role} 

60 

61 if mapping not in project_role_mappings: 

62 project_role_mappings.append(mapping) 

63 user["project_role_mappings"] = project_role_mappings 

64 else: 

65 del user["project_role_mappings"] 

66 

67 http_code, resp = self._http.post_cmd( 

68 endpoint=self._apiBase, postfields_dict=user, skip_query_admin=True 

69 ) 

70 # print('HTTP CODE: {}'.format(http_code)) 

71 # print('RESP: {}'.format(resp)) 

72 # if http_code in (200, 201, 202, 204): 

73 if resp: 

74 resp = json.loads(resp) 

75 if not resp or "id" not in resp: 

76 raise ClientException("unexpected response from server - {}".format(resp)) 

77 print(resp["id"]) 

78 # else: 

79 # msg = "" 

80 # if resp: 

81 # try: 

82 # msg = json.loads(resp) 

83 # except ValueError: 

84 # msg = resp 

85 # raise ClientException("failed to create user {} - {}".format(name, msg)) 

86 

87 def update(self, name, user, pwd_change=False): 

88 """Updates an existing OSM user identified by name""" 

89 self._logger.debug("") 

90 if pwd_change: 

91 token_info = self._client.get_token(pwd_change) 

92 else: 

93 token_info = self._client.get_token() 

94 # print(user) 

95 myuser = self.get(name) 

96 update_user = { 

97 "add_project_role_mappings": [], 

98 "remove_project_role_mappings": [], 

99 } 

100 

101 if user.get("username"): 

102 update_user["username"] = user["username"] 

103 if user.get("new_password"): 

104 update_user["password"] = user["new_password"] 

105 if pwd_change and user.get("current_password"): 

106 update_user["old_password"] = user["current_password"] 

107 

108 if user.get("set-project"): 

109 # Remove project and insert project role mapping 

110 for set_project in user["set-project"]: 

111 set_project_clean = [m.strip() for m in set_project.split(",")] 

112 project, roles = set_project_clean[0], set_project_clean[1:] 

113 

114 update_user["remove_project_role_mappings"].append({"project": project}) 

115 

116 for role in roles: 

117 mapping = {"project": project, "role": role} 

118 update_user["add_project_role_mappings"].append(mapping) 

119 

120 if user.get("remove-project"): 

121 for remove_project in user["remove-project"]: 

122 update_user["remove_project_role_mappings"].append( 

123 {"project": remove_project} 

124 ) 

125 

126 if user.get("add-project-role"): 

127 for add_project_role in user["add-project-role"]: 

128 add_project_role_clean = [ 

129 m.strip() for m in add_project_role.split(",") 

130 ] 

131 project, roles = add_project_role_clean[0], add_project_role_clean[1:] 

132 

133 for role in roles: 

134 mapping = {"project": project, "role": role} 

135 update_user["add_project_role_mappings"].append(mapping) 

136 

137 if user.get("remove-project-role"): 

138 for remove_project_role in user["remove-project-role"]: 

139 remove_project_role_clean = [ 

140 m.strip() for m in remove_project_role.split(",") 

141 ] 

142 project, roles = ( 

143 remove_project_role_clean[0], 

144 remove_project_role_clean[1:], 

145 ) 

146 

147 for role in roles: 

148 mapping = {"project": project, "role": role} 

149 update_user["remove_project_role_mappings"].append(mapping) 

150 

151 if user.get("unlock"): 

152 if token_info.get("admin_show"): 

153 update_user["unlock"] = user["unlock"] 

154 update_user["system_admin_id"] = token_info.get("user_id") 

155 else: 

156 raise ClientException( 

157 "{} does not have privilege to unlock {}".format( 

158 token_info.get("username"), myuser.get("username") 

159 ) 

160 ) 

161 

162 if user.get("renew"): 

163 if token_info.get("admin_show"): 

164 update_user["renew"] = user["renew"] 

165 update_user["system_admin_id"] = token_info.get("user_id") 

166 else: 

167 raise ClientException( 

168 "{} does not have privilege to renew {}".format( 

169 token_info.get("username"), myuser.get("username") 

170 ) 

171 ) 

172 

173 if not update_user["remove_project_role_mappings"]: 

174 del update_user["remove_project_role_mappings"] 

175 if not update_user["add_project_role_mappings"]: 

176 del update_user["add_project_role_mappings"] 

177 if not update_user: 

178 raise ClientException("At least something should be changed.") 

179 

180 http_code, resp = self._http.patch_cmd( 

181 endpoint="{}/{}".format(self._apiBase, myuser["_id"]), 

182 postfields_dict=update_user, 

183 skip_query_admin=True, 

184 ) 

185 # print('HTTP CODE: {}'.format(http_code)) 

186 # print('RESP: {}'.format(resp)) 

187 if http_code in (200, 201, 202): 

188 if resp: 

189 resp = json.loads(resp) 

190 if not resp or "id" not in resp: 

191 raise ClientException( 

192 "unexpected response from server - {}".format(resp) 

193 ) 

194 print(resp["id"]) 

195 elif http_code == 204: 

196 print("Updated") 

197 # else: 

198 # msg = "" 

199 # if resp: 

200 # try: 

201 # msg = json.loads(resp) 

202 # except ValueError: 

203 # msg = resp 

204 # raise ClientException("failed to update user {} - {}".format(name, msg)) 

205 

206 def delete(self, name, force=False): 

207 """Deletes an existing OSM user identified by name""" 

208 self._logger.debug("") 

209 self._client.get_token() 

210 user = self.get(name) 

211 querystring = "" 

212 if force: 

213 querystring = "?FORCE=True" 

214 http_code, resp = self._http.delete_cmd( 

215 "{}/{}{}".format(self._apiBase, user["_id"], querystring), 

216 skip_query_admin=True, 

217 ) 

218 # print('HTTP CODE: {}'.format(http_code)) 

219 # print('RESP: {}'.format(resp)) 

220 if http_code == 202: 

221 print("Deletion in progress") 

222 elif http_code == 204: 

223 print("Deleted") 

224 elif resp and "result" in resp: 

225 print("Deleted") 

226 else: 

227 msg = resp or "" 

228 # if resp: 

229 # try: 

230 # msg = json.loads(resp) 

231 # except ValueError: 

232 # msg = resp 

233 raise ClientException("failed to delete user {} - {}".format(name, msg)) 

234 

235 def list(self, filter=None): 

236 """Returns the list of OSM users""" 

237 self._logger.debug("") 

238 response = self._client.get_token() 

239 admin_show = None 

240 if response: 

241 admin_show = response.get("admin_show") 

242 

243 filter_string = "" 

244 if filter: 

245 filter_string = "?{}".format(filter) 

246 _, resp = self._http.get2_cmd( 

247 "{}{}".format(self._apiBase, filter_string), skip_query_admin=True 

248 ) 

249 # print('RESP: {}'.format(resp)) 

250 if resp and response: 

251 return json.loads(resp), admin_show 

252 elif resp: 

253 return json.loads(resp) 

254 return list() 

255 

256 def get(self, name): 

257 """Returns an OSM user based on name or id""" 

258 self._logger.debug("") 

259 self._client.get_token() 

260 # keystone with external LDAP contains large ids, not uuid format 

261 # utils.validate_uuid4(name) cannot be used 

262 user_list = self.list() 

263 for user in user_list: 

264 if name == user["_id"]: 

265 return user 

266 for user in user_list: 

267 if name == user["username"]: 

268 return user 

269 raise NotFound("User {} not found".format(name))