Code Coverage

Cobertura Coverage Report > osmclient.sol005 >

user.py

Trend

File Coverage summary

Name | Classes | Lines | Conditionals
user.py | 100% (1/1) | 8% (12/143) | 100% (0/0)

Coverage Breakdown by Class

Name | Lines | Conditionals
user.py | 8% (12/143) | N/A

Source

osmclient/sol005/user.py
1 #
2 # Copyright 2018 Telefonica Investigacion y Desarrollo S.A.U.
3 #
4 # All Rights Reserved.
5 #
6 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
7 #    not use this file except in compliance with the License. You may obtain
8 #    a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #    Unless required by applicable law or agreed to in writing, software
13 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 #    License for the specific language governing permissions and limitations
16 #    under the License.
17
18 1 """
19 OSM user mgmt API
20 """
21
22 1 from osmclient.common.exceptions import ClientException
23 1 from osmclient.common.exceptions import NotFound
24 1 import json
25 1 import logging
26
27
class User(object):
    """Client-side wrapper around the OSM ``/admin/v1/users`` REST resource.

    The class delegates HTTP traffic to the ``http`` object and token
    handling to the ``client`` object supplied at construction time; it only
    builds request payloads and interprets the responses.
    """

    def __init__(self, http=None, client=None):
        """Store the collaborators and precompute the resource base path.

        Args:
            http: object providing ``post_cmd``, ``patch_cmd``,
                ``delete_cmd`` and ``get2_cmd``.
            client: object providing ``get_token``.
        """
        self._http = http
        self._client = client
        self._logger = logging.getLogger("osmclient")
        self._apiName = "/admin"
        self._apiVersion = "/v1"
        self._apiResource = "/users"
        self._apiBase = "{}{}{}".format(
            self._apiName, self._apiVersion, self._apiResource
        )

    @staticmethod
    def _split_project_roles(spec):
        """Split ``"project,role1,role2"`` into ``(project, [role1, role2])``.

        Every comma-separated item is stripped of surrounding whitespace.
        """
        items = [item.strip() for item in spec.split(",")]
        return items[0], items[1:]

    def create(self, name, user):
        """Create a new OSM user and print the id returned by the server.

        Args:
            name: unused by this method; kept for interface compatibility.
            user: dict sent as the POST payload. It is normalised **in
                place**: an empty ``projects`` entry is removed, a
                single-element ``projects`` list is split on commas, and
                ``project_role_mappings`` strings of the form
                ``"project,role1,role2"`` are expanded into a de-duplicated
                list of ``{"project": ..., "role": ...}`` dicts (or removed
                when empty). Both keys are optional.

        Raises:
            ClientException: if the response is empty or has no ``"id"``.
        """
        self._logger.debug("")
        self._client.get_token()

        # .get()/.pop() instead of indexing so that a payload without these
        # optional keys does not fail with KeyError.
        projects = user.get("projects")
        if not projects or (len(projects) == 1 and not projects[0]):
            user.pop("projects", None)
        elif len(projects) == 1:
            user["projects"] = projects[0].split(",")

        raw_mappings = user.get("project_role_mappings")
        if raw_mappings:
            project_role_mappings = []
            for raw_mapping in raw_mappings:
                project, roles = self._split_project_roles(raw_mapping)
                for role in roles:
                    mapping = {"project": project, "role": role}
                    # Keep first occurrence only; order is preserved.
                    if mapping not in project_role_mappings:
                        project_role_mappings.append(mapping)
            user["project_role_mappings"] = project_role_mappings
        else:
            user.pop("project_role_mappings", None)

        http_code, resp = self._http.post_cmd(
            endpoint=self._apiBase, postfields_dict=user, skip_query_admin=True
        )
        if resp:
            resp = json.loads(resp)
        if not resp or "id" not in resp:
            raise ClientException("unexpected response from server - {}".format(resp))
        print(resp["id"])

    def update(self, name, user, pwd_change=False):
        """Update an existing OSM user identified by name or id.

        Args:
            name: user name or id, resolved through :meth:`get`.
            user: dict of requested changes. Recognised keys are
                ``username``, ``new_password``, ``current_password`` (only
                used when ``pwd_change`` is true), ``set-project``,
                ``remove-project``, ``add-project-role``,
                ``remove-project-role``, ``unlock`` and ``renew``.
            pwd_change: when true it is forwarded to ``get_token`` and the
                current password is sent as ``old_password``.

        Raises:
            ClientException: if the token holder lacks ``admin_show`` for an
                unlock/renew request, if nothing is to be changed, or if a
                200/201/202 response carries no ``"id"``.
            NotFound: propagated from :meth:`get` when the user is unknown.
        """
        self._logger.debug("")
        # The token must be requested before get(); the original call order
        # is preserved on purpose.
        if pwd_change:
            token_info = self._client.get_token(pwd_change)
        else:
            token_info = self._client.get_token()
        # get_token() may return nothing; fall back to an empty mapping so
        # the privilege checks below raise ClientException rather than
        # AttributeError.
        token_info = token_info or {}

        myuser = self.get(name)
        add_mappings = []
        remove_mappings = []
        update_user = {
            "add_project_role_mappings": add_mappings,
            "remove_project_role_mappings": remove_mappings,
        }

        if user.get("username"):
            update_user["username"] = user["username"]
        if user.get("new_password"):
            update_user["password"] = user["new_password"]
        if pwd_change and user.get("current_password"):
            update_user["old_password"] = user["current_password"]

        # set-project: drop every existing mapping of the project, then add
        # the requested roles.
        for spec in user.get("set-project") or ():
            project, roles = self._split_project_roles(spec)
            remove_mappings.append({"project": project})
            add_mappings.extend({"project": project, "role": role} for role in roles)

        for project in user.get("remove-project") or ():
            remove_mappings.append({"project": project})

        for spec in user.get("add-project-role") or ():
            project, roles = self._split_project_roles(spec)
            add_mappings.extend({"project": project, "role": role} for role in roles)

        for spec in user.get("remove-project-role") or ():
            project, roles = self._split_project_roles(spec)
            remove_mappings.extend(
                {"project": project, "role": role} for role in roles
            )

        # unlock and renew share the same privilege check and payload shape.
        for action in ("unlock", "renew"):
            if not user.get(action):
                continue
            if not token_info.get("admin_show"):
                raise ClientException(
                    "{} does not have privilege to {} {}".format(
                        token_info.get("username"), action, myuser.get("username")
                    )
                )
            update_user[action] = user[action]
            update_user["system_admin_id"] = token_info.get("user_id")

        if not remove_mappings:
            del update_user["remove_project_role_mappings"]
        if not add_mappings:
            del update_user["add_project_role_mappings"]
        if not update_user:
            raise ClientException("At least something should be changed.")

        http_code, resp = self._http.patch_cmd(
            endpoint="{}/{}".format(self._apiBase, myuser["_id"]),
            postfields_dict=update_user,
            skip_query_admin=True,
        )
        if http_code in (200, 201, 202):
            if resp:
                resp = json.loads(resp)
            if not resp or "id" not in resp:
                raise ClientException(
                    "unexpected response from server - {}".format(resp)
                )
            print(resp["id"])
        elif http_code == 204:
            print("Updated")
        # NOTE(review): any other status code is silently ignored here;
        # presumably the HTTP layer raises on errors - confirm.

    def delete(self, name, force=False):
        """Delete an existing OSM user identified by name or id.

        Args:
            name: user name or id, resolved through :meth:`get`.
            force: when true, ``?FORCE=True`` is appended to the request URL.

        Raises:
            ClientException: if the status code is neither 202 nor 204 and
                the response does not contain ``"result"``.
            NotFound: propagated from :meth:`get` when the user is unknown.
        """
        self._logger.debug("")
        self._client.get_token()
        user = self.get(name)
        querystring = "?FORCE=True" if force else ""
        http_code, resp = self._http.delete_cmd(
            "{}/{}{}".format(self._apiBase, user["_id"], querystring),
            skip_query_admin=True,
        )
        if http_code == 202:
            print("Deletion in progress")
        elif http_code == 204 or (resp and "result" in resp):
            print("Deleted")
        else:
            msg = resp or ""
            raise ClientException("failed to delete user {} - {}".format(name, msg))

    def list(self, filter=None):
        """Return the OSM users, optionally restricted by a query string.

        Args:
            filter: optional query string appended after ``?``.

        Returns:
            * ``(users, admin_show)`` when the server returned a body and
              ``get_token()`` returned a truthy value (``admin_show`` is read
              from that value),
            * ``users`` alone when only the server body is available,
            * an empty list when the server returned no body.
        """
        self._logger.debug("")
        response = self._client.get_token()
        admin_show = response.get("admin_show") if response else None

        filter_string = "?{}".format(filter) if filter else ""
        _, resp = self._http.get2_cmd(
            "{}{}".format(self._apiBase, filter_string), skip_query_admin=True
        )
        if not resp:
            return []
        users = json.loads(resp)
        if response:
            return users, admin_show
        return users

    def get(self, name):
        """Return the OSM user whose ``_id`` or ``username`` equals ``name``.

        A match on ``_id`` takes precedence over a match on ``username``.

        Raises:
            NotFound: if no user matches.
        """
        self._logger.debug("")
        self._client.get_token()
        # keystone with external LDAP contains large ids, not uuid format
        # utils.validate_uuid4(name) cannot be used
        user_list = self.list()
        # list() returns (users, admin_show) whenever get_token() hands back
        # token info; unwrap it so the loops below always see user dicts.
        if isinstance(user_list, tuple):
            user_list = user_list[0]
        for user in user_list:
            if name == user["_id"]:
                return user
        for user in user_list:
            if name == user["username"]:
                return user
        raise NotFound("User {} not found".format(name))