Feature 10914: Enforce Password change on First login
[osm/osmclient.git] / osmclient / sol005 / user.py
1 #
2 # Copyright 2018 Telefonica Investigacion y Desarrollo S.A.U.
3 #
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17
18 """
19 OSM user mgmt API
20 """
21
22 from osmclient.common.exceptions import ClientException
23 from osmclient.common.exceptions import NotFound
24 import json
25 import logging
26
27
class User(object):
    """Client-side wrapper for the OSM user management API (``/admin/v1/users``).

    :param http: object used to issue the requests; this class calls its
        ``post_cmd``, ``patch_cmd``, ``delete_cmd`` and ``get2_cmd`` methods.
    :param client: object whose ``get_token()`` is called before each
        operation (presumably to authenticate the session - confirm against
        the client implementation).
    """

    def __init__(self, http=None, client=None):
        self._http = http
        self._client = client
        self._logger = logging.getLogger("osmclient")
        self._apiName = "/admin"
        self._apiVersion = "/v1"
        self._apiResource = "/users"
        self._apiBase = "{}{}{}".format(
            self._apiName, self._apiVersion, self._apiResource
        )

    @staticmethod
    def _split_project_roles(spec):
        """Splits a ``"project,role1,role2"`` string into ``(project, [roles])``.

        Whitespace around each comma-separated item is stripped.
        """
        items = [item.strip() for item in spec.split(",")]
        return items[0], items[1:]

    @staticmethod
    def _extract_id(resp):
        """Decodes a JSON response body and returns its ``id`` field.

        :param resp: raw response body (JSON text), possibly empty or None.
        :raises ClientException: if the body is empty or is not a JSON object
            containing an ``id`` key.
        """
        if resp:
            resp = json.loads(resp)
        # The isinstance check keeps a non-object JSON body (list, number, ...)
        # from surfacing as a TypeError on the "id" lookup below.
        if not isinstance(resp, dict) or "id" not in resp:
            raise ClientException("unexpected response from server - {}".format(resp))
        return resp["id"]

    def create(self, name, user):
        """Creates a new OSM user

        :param name: not used by this method; the request body is built from
            ``user`` only.
        :param user: dict posted to the server. Two optional keys are
            normalised first:

            * ``projects``: dropped when empty (or a single empty string); a
              single entry is split on commas into a list of names.
            * ``project_role_mappings``: list of ``"project,role1,role2"``
              strings, expanded into de-duplicated
              ``{"project": ..., "role": ...}`` dicts; dropped when empty.

            The dict passed in is not modified.
        :raises ClientException: if the response carries no ``id``.

        Prints the ``id`` returned by the server.
        """
        self._logger.debug("")
        self._client.get_token()

        # Work on a shallow copy so the caller's dict is left untouched.
        payload = dict(user)

        projects = payload.pop("projects", None)
        if projects and not (len(projects) == 1 and not projects[0]):
            if len(projects) == 1:
                # A single entry may carry several comma-separated names.
                projects = projects[0].split(",")
            payload["projects"] = projects

        raw_mappings = payload.pop("project_role_mappings", None)
        if raw_mappings:
            mappings = []
            for spec in raw_mappings:
                project, roles = self._split_project_roles(spec)
                for role in roles:
                    mapping = {"project": project, "role": role}
                    if mapping not in mappings:
                        mappings.append(mapping)
            payload["project_role_mappings"] = mappings

        _, resp = self._http.post_cmd(
            endpoint=self._apiBase, postfields_dict=payload, skip_query_admin=True
        )
        print(self._extract_id(resp))

    def update(self, name, user, pwd_change=None):
        """Updates an existing OSM user identified by name

        :param name: ``_id`` or ``username`` of the user to update (resolved
            through :meth:`get`).
        :param user: dict describing the changes. Keys read here:

            * ``change_password`` / ``new_password``: when ``change_password``
              is truthy it is sent as ``old_password`` and ``new_password`` as
              ``password`` (``new_password`` must then be present).
            * ``password`` / ``username``: sent as-is when truthy and
              ``change_password`` is not set.
            * ``set-project``: list of ``"project,role1,..."``; the project's
              mappings are removed and the listed roles added.
            * ``remove-project``: list of project names whose mappings are
              removed.
            * ``add-project-role`` / ``remove-project-role``: lists of
              ``"project,role1,..."`` to add / remove.
        :param pwd_change: when truthy, forwarded to ``get_token``.
        :raises ClientException: if nothing would be changed, or if a
            200/201/202 response carries no ``id``.
        :raises NotFound: if no user matches ``name``.

        Prints the returned ``id`` on 200/201/202, or ``Updated`` on 204.
        """
        self._logger.debug("")
        if pwd_change:
            self._client.get_token(pwd_change)
        else:
            self._client.get_token()
        myuser = self.get(name)

        update_user = {}
        add_mappings = []
        remove_mappings = []

        if user.get("change_password"):
            update_user["old_password"] = user["change_password"]
            update_user["password"] = user["new_password"]
        else:
            # if password is defined, update the password
            if user.get("password"):
                update_user["password"] = user["password"]
            if user.get("username"):
                update_user["username"] = user["username"]

        # set-project: remove every mapping of the project, then add the
        # requested roles.
        for spec in user.get("set-project") or ():
            project, roles = self._split_project_roles(spec)
            remove_mappings.append({"project": project})
            for role in roles:
                add_mappings.append({"project": project, "role": role})

        for project in user.get("remove-project") or ():
            remove_mappings.append({"project": project})

        for spec in user.get("add-project-role") or ():
            project, roles = self._split_project_roles(spec)
            for role in roles:
                add_mappings.append({"project": project, "role": role})

        for spec in user.get("remove-project-role") or ():
            project, roles = self._split_project_roles(spec)
            for role in roles:
                remove_mappings.append({"project": project, "role": role})

        # Only non-empty mapping lists are sent.
        if remove_mappings:
            update_user["remove_project_role_mappings"] = remove_mappings
        if add_mappings:
            update_user["add_project_role_mappings"] = add_mappings
        if not update_user:
            raise ClientException("At least something should be changed.")

        http_code, resp = self._http.patch_cmd(
            endpoint="{}/{}".format(self._apiBase, myuser["_id"]),
            postfields_dict=update_user,
            skip_query_admin=True,
        )
        if http_code in (200, 201, 202):
            print(self._extract_id(resp))
        elif http_code == 204:
            print("Updated")

    def delete(self, name, force=False):
        """Deletes an existing OSM user identified by name

        :param name: ``_id`` or ``username`` of the user to delete.
        :param force: when truthy, ``?FORCE=True`` is appended to the request.
        :raises ClientException: if the response is neither 202, 204 nor a
            body containing ``result``.
        :raises NotFound: if no user matches ``name``.
        """
        self._logger.debug("")
        self._client.get_token()
        user = self.get(name)
        querystring = "?FORCE=True" if force else ""
        http_code, resp = self._http.delete_cmd(
            "{}/{}{}".format(self._apiBase, user["_id"], querystring),
            skip_query_admin=True,
        )
        if http_code == 202:
            print("Deletion in progress")
        elif http_code == 204:
            print("Deleted")
        elif resp and "result" in resp:
            print("Deleted")
        else:
            msg = resp or ""
            raise ClientException("failed to delete user {} - {}".format(name, msg))

    def list(self, filter=None):
        """Returns the list of OSM users

        :param filter: optional query string (without the leading ``?``)
            appended to the request.
        :return: the decoded JSON response, or an empty list when the
            response body is empty.
        """
        self._logger.debug("")
        self._client.get_token()
        filter_string = "?{}".format(filter) if filter else ""
        _, resp = self._http.get2_cmd(
            "{}{}".format(self._apiBase, filter_string), skip_query_admin=True
        )
        if resp:
            return json.loads(resp)
        return []

    def get(self, name):
        """Returns an OSM user based on name or id

        A match on ``_id`` takes precedence over a match on ``username``.

        :raises NotFound: if no user matches ``name``.
        """
        self._logger.debug("")
        self._client.get_token()
        # keystone with external LDAP contains large ids, not uuid format
        # utils.validate_uuid4(name) cannot be used
        user_list = self.list()
        for key in ("_id", "username"):
            for user in user_list:
                if name == user[key]:
                    return user
        raise NotFound("User {} not found".format(name))