Code Coverage

Cobertura Coverage Report > osmclient.sol005 >

user.py

Trend

Classes100%
 
Lines10%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
user.py
100%
1/1
10%
12/124
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
user.py
10%
12/124
N/A

Source

osmclient/sol005/user.py
1 #
2 # Copyright 2018 Telefonica Investigacion y Desarrollo S.A.U.
3 #
4 # All Rights Reserved.
5 #
6 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
7 #    not use this file except in compliance with the License. You may obtain
8 #    a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #    Unless required by applicable law or agreed to in writing, software
13 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 #    License for the specific language governing permissions and limitations
16 #    under the License.
17
18 1 """
19 OSM user mgmt API
20 """
21
22 1 from osmclient.common.exceptions import ClientException
23 1 from osmclient.common.exceptions import NotFound
24 1 import json
25 1 import logging
26
27
28 1 class User(object):
29 1     def __init__(self, http=None, client=None):
30 0         self._http = http
31 0         self._client = client
32 0         self._logger = logging.getLogger("osmclient")
33 0         self._apiName = "/admin"
34 0         self._apiVersion = "/v1"
35 0         self._apiResource = "/users"
36 0         self._apiBase = "{}{}{}".format(
37             self._apiName, self._apiVersion, self._apiResource
38         )
39
40 1     def create(self, name, user):
41         """Creates a new OSM user"""
42 0         self._logger.debug("")
43 0         self._client.get_token()
44 0         if not user["projects"] or (
45             len(user["projects"]) == 1 and not user["projects"][0]
46         ):
47 0             del user["projects"]
48 0         elif len(user["projects"]) == 1:
49 0             user["projects"] = user["projects"][0].split(",")
50
51 0         if user["project_role_mappings"]:
52 0             project_role_mappings = []
53
54 0             for set_mapping in user["project_role_mappings"]:
55 0                 set_mapping_clean = [m.strip() for m in set_mapping.split(",")]
56 0                 project, roles = set_mapping_clean[0], set_mapping_clean[1:]
57
58 0                 for role in roles:
59 0                     mapping = {"project": project, "role": role}
60
61 0                     if mapping not in project_role_mappings:
62 0                         project_role_mappings.append(mapping)
63 0             user["project_role_mappings"] = project_role_mappings
64         else:
65 0             del user["project_role_mappings"]
66
67 0         http_code, resp = self._http.post_cmd(
68             endpoint=self._apiBase, postfields_dict=user, skip_query_admin=True
69         )
70         # print('HTTP CODE: {}'.format(http_code))
71         # print('RESP: {}'.format(resp))
72         # if http_code in (200, 201, 202, 204):
73 0         if resp:
74 0             resp = json.loads(resp)
75 0         if not resp or "id" not in resp:
76 0             raise ClientException("unexpected response from server - {}".format(resp))
77 0         print(resp["id"])
78         # else:
79         #    msg = ""
80         #    if resp:
81         #        try:
82         #            msg = json.loads(resp)
83         #        except ValueError:
84         #            msg = resp
85         #    raise ClientException("failed to create user {} - {}".format(name, msg))
86
87 1     def update(self, name, user):
88         """Updates an existing OSM user identified by name"""
89 0         self._logger.debug("")
90 0         self._client.get_token()
91         # print(user)
92 0         myuser = self.get(name)
93 0         update_user = {
94             "add_project_role_mappings": [],
95             "remove_project_role_mappings": [],
96         }
97
98         # if password is defined, update the password
99 0         if user["password"]:
100 0             update_user["password"] = user["password"]
101 0         if user["username"]:
102 0             update_user["username"] = user["username"]
103
104 0         if user["set-project"]:
105             # Remove project and insert project role mapping
106 0             for set_project in user["set-project"]:
107
108 0                 set_project_clean = [m.strip() for m in set_project.split(",")]
109 0                 project, roles = set_project_clean[0], set_project_clean[1:]
110
111 0                 update_user["remove_project_role_mappings"].append({"project": project})
112
113 0                 for role in roles:
114 0                     mapping = {"project": project, "role": role}
115 0                     update_user["add_project_role_mappings"].append(mapping)
116
117 0         if user["remove-project"]:
118 0             for remove_project in user["remove-project"]:
119 0                 update_user["remove_project_role_mappings"].append(
120                     {"project": remove_project}
121                 )
122
123 0         if user["add-project-role"]:
124 0             for add_project_role in user["add-project-role"]:
125 0                 add_project_role_clean = [
126                     m.strip() for m in add_project_role.split(",")
127                 ]
128 0                 project, roles = add_project_role_clean[0], add_project_role_clean[1:]
129
130 0                 for role in roles:
131 0                     mapping = {"project": project, "role": role}
132 0                     update_user["add_project_role_mappings"].append(mapping)
133
134 0         if user["remove-project-role"]:
135 0             for remove_project_role in user["remove-project-role"]:
136 0                 remove_project_role_clean = [
137                     m.strip() for m in remove_project_role.split(",")
138                 ]
139 0                 project, roles = (
140                     remove_project_role_clean[0],
141                     remove_project_role_clean[1:],
142                 )
143
144 0                 for role in roles:
145 0                     mapping = {"project": project, "role": role}
146 0                     update_user["remove_project_role_mappings"].append(mapping)
147
148 0         if not update_user["remove_project_role_mappings"]:
149 0             del update_user["remove_project_role_mappings"]
150 0         if not update_user["add_project_role_mappings"]:
151 0             del update_user["add_project_role_mappings"]
152 0         if not update_user:
153 0             raise ClientException("At least something should be changed.")
154
155 0         http_code, resp = self._http.patch_cmd(
156             endpoint="{}/{}".format(self._apiBase, myuser["_id"]),
157             postfields_dict=update_user,
158             skip_query_admin=True,
159         )
160         # print('HTTP CODE: {}'.format(http_code))
161         # print('RESP: {}'.format(resp))
162 0         if http_code in (200, 201, 202):
163 0             if resp:
164 0                 resp = json.loads(resp)
165 0             if not resp or "id" not in resp:
166 0                 raise ClientException(
167                     "unexpected response from server - {}".format(resp)
168                 )
169 0             print(resp["id"])
170 0         elif http_code == 204:
171 0             print("Updated")
172         # else:
173         #    msg = ""
174         #    if resp:
175         #        try:
176         #            msg = json.loads(resp)
177         #        except ValueError:
178         #            msg = resp
179         #    raise ClientException("failed to update user {} - {}".format(name, msg))
180
181 1     def delete(self, name, force=False):
182         """Deletes an existing OSM user identified by name"""
183 0         self._logger.debug("")
184 0         self._client.get_token()
185 0         user = self.get(name)
186 0         querystring = ""
187 0         if force:
188 0             querystring = "?FORCE=True"
189 0         http_code, resp = self._http.delete_cmd(
190             "{}/{}{}".format(self._apiBase, user["_id"], querystring),
191             skip_query_admin=True,
192         )
193         # print('HTTP CODE: {}'.format(http_code))
194         # print('RESP: {}'.format(resp))
195 0         if http_code == 202:
196 0             print("Deletion in progress")
197 0         elif http_code == 204:
198 0             print("Deleted")
199 0         elif resp and "result" in resp:
200 0             print("Deleted")
201         else:
202 0             msg = resp or ""
203             # if resp:
204             #     try:
205             #         msg = json.loads(resp)
206             #     except ValueError:
207             #         msg = resp
208 0             raise ClientException("failed to delete user {} - {}".format(name, msg))
209
210 1     def list(self, filter=None):
211         """Returns the list of OSM users"""
212 0         self._logger.debug("")
213 0         self._client.get_token()
214 0         filter_string = ""
215 0         if filter:
216 0             filter_string = "?{}".format(filter)
217 0         _, resp = self._http.get2_cmd(
218             "{}{}".format(self._apiBase, filter_string), skip_query_admin=True
219         )
220         # print('RESP: {}'.format(resp))
221 0         if resp:
222 0             return json.loads(resp)
223 0         return list()
224
225 1     def get(self, name):
226         """Returns an OSM user based on name or id"""
227 0         self._logger.debug("")
228 0         self._client.get_token()
229         # keystone with external LDAP contains large ids, not uuid format
230         # utils.validate_uuid4(name) cannot be used
231 0         user_list = self.list()
232 0         for user in user_list:
233 0             if name == user["_id"]:
234 0                 return user
235 0         for user in user_list:
236 0             if name == user["username"]:
237 0                 return user
238 0         raise NotFound("User {} not found".format(name))