Code Coverage

Cobertura Coverage Report > osmclient.sol005 >

user.py

Trend

Classes100%
 
Lines9%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
user.py
100%
1/1
9%
12/129
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
user.py
9%
12/129
N/A

Source

osmclient/sol005/user.py
1 #
2 # Copyright 2018 Telefonica Investigacion y Desarrollo S.A.U.
3 #
4 # All Rights Reserved.
5 #
6 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
7 #    not use this file except in compliance with the License. You may obtain
8 #    a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #    Unless required by applicable law or agreed to in writing, software
13 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 #    License for the specific language governing permissions and limitations
16 #    under the License.
17
18 1 """
19 OSM user mgmt API
20 """
21
22 1 from osmclient.common.exceptions import ClientException
23 1 from osmclient.common.exceptions import NotFound
24 1 import json
25 1 import logging
26
27
28 1 class User(object):
29 1     def __init__(self, http=None, client=None):
30 0         self._http = http
31 0         self._client = client
32 0         self._logger = logging.getLogger("osmclient")
33 0         self._apiName = "/admin"
34 0         self._apiVersion = "/v1"
35 0         self._apiResource = "/users"
36 0         self._apiBase = "{}{}{}".format(
37             self._apiName, self._apiVersion, self._apiResource
38         )
39
40 1     def create(self, name, user):
41         """Creates a new OSM user"""
42 0         self._logger.debug("")
43 0         self._client.get_token()
44 0         if not user["projects"] or (
45             len(user["projects"]) == 1 and not user["projects"][0]
46         ):
47 0             del user["projects"]
48 0         elif len(user["projects"]) == 1:
49 0             user["projects"] = user["projects"][0].split(",")
50
51 0         if user["project_role_mappings"]:
52 0             project_role_mappings = []
53
54 0             for set_mapping in user["project_role_mappings"]:
55 0                 set_mapping_clean = [m.strip() for m in set_mapping.split(",")]
56 0                 project, roles = set_mapping_clean[0], set_mapping_clean[1:]
57
58 0                 for role in roles:
59 0                     mapping = {"project": project, "role": role}
60
61 0                     if mapping not in project_role_mappings:
62 0                         project_role_mappings.append(mapping)
63 0             user["project_role_mappings"] = project_role_mappings
64         else:
65 0             del user["project_role_mappings"]
66
67 0         http_code, resp = self._http.post_cmd(
68             endpoint=self._apiBase, postfields_dict=user, skip_query_admin=True
69         )
70         # print('HTTP CODE: {}'.format(http_code))
71         # print('RESP: {}'.format(resp))
72         # if http_code in (200, 201, 202, 204):
73 0         if resp:
74 0             resp = json.loads(resp)
75 0         if not resp or "id" not in resp:
76 0             raise ClientException("unexpected response from server - {}".format(resp))
77 0         print(resp["id"])
78         # else:
79         #    msg = ""
80         #    if resp:
81         #        try:
82         #            msg = json.loads(resp)
83         #        except ValueError:
84         #            msg = resp
85         #    raise ClientException("failed to create user {} - {}".format(name, msg))
86
87 1     def update(self, name, user, pwd_change=None):
88         """Updates an existing OSM user identified by name"""
89 0         self._logger.debug("")
90 0         if pwd_change:
91 0             self._client.get_token(pwd_change)
92         else:
93 0             self._client.get_token()
94         # print(user)
95 0         myuser = self.get(name)
96 0         update_user = {
97             "add_project_role_mappings": [],
98             "remove_project_role_mappings": [],
99         }
100
101 0         if not user.get("change_password"):
102             # if password is defined, update the password
103 0             if user.get("password"):
104 0                 update_user["password"] = user["password"]
105 0             if user.get("username"):
106 0                 update_user["username"] = user["username"]
107         else:
108 0             update_user["old_password"] = user["change_password"]
109 0             update_user["password"] = user["new_password"]
110
111 0         if user.get("set-project"):
112             # Remove project and insert project role mapping
113 0             for set_project in user["set-project"]:
114 0                 set_project_clean = [m.strip() for m in set_project.split(",")]
115 0                 project, roles = set_project_clean[0], set_project_clean[1:]
116
117 0                 update_user["remove_project_role_mappings"].append({"project": project})
118
119 0                 for role in roles:
120 0                     mapping = {"project": project, "role": role}
121 0                     update_user["add_project_role_mappings"].append(mapping)
122
123 0         if user.get("remove-project"):
124 0             for remove_project in user["remove-project"]:
125 0                 update_user["remove_project_role_mappings"].append(
126                     {"project": remove_project}
127                 )
128
129 0         if user.get("add-project-role"):
130 0             for add_project_role in user["add-project-role"]:
131 0                 add_project_role_clean = [
132                     m.strip() for m in add_project_role.split(",")
133                 ]
134 0                 project, roles = add_project_role_clean[0], add_project_role_clean[1:]
135
136 0                 for role in roles:
137 0                     mapping = {"project": project, "role": role}
138 0                     update_user["add_project_role_mappings"].append(mapping)
139
140 0         if user.get("remove-project-role"):
141 0             for remove_project_role in user["remove-project-role"]:
142 0                 remove_project_role_clean = [
143                     m.strip() for m in remove_project_role.split(",")
144                 ]
145 0                 project, roles = (
146                     remove_project_role_clean[0],
147                     remove_project_role_clean[1:],
148                 )
149
150 0                 for role in roles:
151 0                     mapping = {"project": project, "role": role}
152 0                     update_user["remove_project_role_mappings"].append(mapping)
153
154 0         if not update_user["remove_project_role_mappings"]:
155 0             del update_user["remove_project_role_mappings"]
156 0         if not update_user["add_project_role_mappings"]:
157 0             del update_user["add_project_role_mappings"]
158 0         if not update_user:
159 0             raise ClientException("At least something should be changed.")
160
161 0         http_code, resp = self._http.patch_cmd(
162             endpoint="{}/{}".format(self._apiBase, myuser["_id"]),
163             postfields_dict=update_user,
164             skip_query_admin=True,
165         )
166         # print('HTTP CODE: {}'.format(http_code))
167         # print('RESP: {}'.format(resp))
168 0         if http_code in (200, 201, 202):
169 0             if resp:
170 0                 resp = json.loads(resp)
171 0             if not resp or "id" not in resp:
172 0                 raise ClientException(
173                     "unexpected response from server - {}".format(resp)
174                 )
175 0             print(resp["id"])
176 0         elif http_code == 204:
177 0             print("Updated")
178         # else:
179         #    msg = ""
180         #    if resp:
181         #        try:
182         #            msg = json.loads(resp)
183         #        except ValueError:
184         #            msg = resp
185         #    raise ClientException("failed to update user {} - {}".format(name, msg))
186
187 1     def delete(self, name, force=False):
188         """Deletes an existing OSM user identified by name"""
189 0         self._logger.debug("")
190 0         self._client.get_token()
191 0         user = self.get(name)
192 0         querystring = ""
193 0         if force:
194 0             querystring = "?FORCE=True"
195 0         http_code, resp = self._http.delete_cmd(
196             "{}/{}{}".format(self._apiBase, user["_id"], querystring),
197             skip_query_admin=True,
198         )
199         # print('HTTP CODE: {}'.format(http_code))
200         # print('RESP: {}'.format(resp))
201 0         if http_code == 202:
202 0             print("Deletion in progress")
203 0         elif http_code == 204:
204 0             print("Deleted")
205 0         elif resp and "result" in resp:
206 0             print("Deleted")
207         else:
208 0             msg = resp or ""
209             # if resp:
210             #     try:
211             #         msg = json.loads(resp)
212             #     except ValueError:
213             #         msg = resp
214 0             raise ClientException("failed to delete user {} - {}".format(name, msg))
215
216 1     def list(self, filter=None):
217         """Returns the list of OSM users"""
218 0         self._logger.debug("")
219 0         self._client.get_token()
220 0         filter_string = ""
221 0         if filter:
222 0             filter_string = "?{}".format(filter)
223 0         _, resp = self._http.get2_cmd(
224             "{}{}".format(self._apiBase, filter_string), skip_query_admin=True
225         )
226         # print('RESP: {}'.format(resp))
227 0         if resp:
228 0             return json.loads(resp)
229 0         return list()
230
231 1     def get(self, name):
232         """Returns an OSM user based on name or id"""
233 0         self._logger.debug("")
234 0         self._client.get_token()
235         # keystone with external LDAP contains large ids, not uuid format
236         # utils.validate_uuid4(name) cannot be used
237 0         user_list = self.list()
238 0         for user in user_list:
239 0             if name == user["_id"]:
240 0                 return user
241 0         for user in user_list:
242 0             if name == user["username"]:
243 0                 return user
244 0         raise NotFound("User {} not found".format(name))