Coverage for osmclient/sol005/role.py: 12%

117 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-29 09:50 +0000

1# Copyright 2019 Whitestack, LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14# 

15# For those usages not covered by the Apache License, Version 2.0 please 

16# contact: esousa@whitestack.com or glavado@whitestack.com 

17## 

18 

19""" 

20OSM role mgmt API 

21""" 

22 

23from osmclient.common import utils 

24from osmclient.common.exceptions import ClientException 

25from osmclient.common.exceptions import NotFound 

26import json 

27import yaml 

28import logging 

29 

30 

class Role(object):
    """Client-side wrapper for the role management endpoint ``/admin/v1/roles``.

    Every public method first calls ``self._client.get_token()`` and then
    issues its request(s) through the ``self._http`` helper with
    ``skip_query_admin=True``.
    """

    def __init__(self, http=None, client=None):
        """
        Stores the collaborators and builds the base endpoint.

        :param http: object providing ``post_cmd``, ``patch_cmd``,
            ``delete_cmd`` and ``get2_cmd``.
        :param client: object providing ``get_token``.
        """
        self._http = http
        self._client = client
        self._logger = logging.getLogger("osmclient")
        self._apiName = "/admin"
        self._apiVersion = "/v1"
        self._apiResource = "/roles"
        self._apiBase = "{}{}{}".format(
            self._apiName, self._apiVersion, self._apiResource
        )

    @staticmethod
    def _load_mapping(text, error_message):
        """Parses ``text`` as YAML; raises ClientException(error_message) unless it is a dict."""
        loaded = yaml.safe_load(text)
        if not isinstance(loaded, dict):
            raise ClientException(error_message)
        return loaded

    @staticmethod
    def _print_response_id(resp):
        """Decodes a JSON response body and prints its ``id``; raises ClientException if absent."""
        if resp:
            resp = json.loads(resp)
        if not resp or "id" not in resp:
            raise ClientException("Unexpected response from server - {}".format(resp))
        print(resp["id"])

    def create(self, name, permissions):
        """
        Creates a new OSM role and prints the id returned by the server.

        :param name: name of the role.
        :param permissions: permissions of the role in YAML (a mapping whose
            values are all booleans). A falsy value creates the role without
            a ``permissions`` entry.
        :raises ClientException: when the permissions are not a mapping or
            contain a non-boolean value.
        :raises ClientException: when receives an unexpected response from
            the server (empty body or no ``id`` field).
        """
        self._logger.debug("")
        self._client.get_token()
        role = {"name": name}

        if permissions:
            role_permissions = self._load_mapping(
                permissions,
                "Role permissions should be provided in a key-value fashion",
            )
            for key, value in role_permissions.items():
                if not isinstance(value, bool):
                    raise ClientException(
                        "Value of '{}' in a role permissions should be boolean".format(
                            key
                        )
                    )
            role["permissions"] = role_permissions

        # The status code is not inspected here; only the body is checked.
        _, resp = self._http.post_cmd(
            endpoint=self._apiBase, postfields_dict=role, skip_query_admin=True
        )
        self._print_response_id(resp)

    def update(self, name, new_name, permissions, add=None, remove=None):
        """
        Updates an OSM role identified by name.

        NOTE: permissions and add/remove are mutually exclusive.

        :param name: name (or id) of the role, resolved through ``get``.
        :param new_name: if provided, change the name.
        :param permissions: if provided, YAML mapping sent as the role's
            ``permissions``; values must be boolean or None.
        :param add: if provided, YAML mapping of rules (boolean values) to
            add to the definition.
        :param remove: if provided, YAML list of rule names (strings); each
            is sent with a None value.
        :raises ClientException: when no option is given, when permissions is
            combined with add/remove, or when an argument is malformed.
        :raises ClientException: when receives an unexpected response from
            the server.
        """
        self._logger.debug("")
        self._client.get_token()
        if new_name is None and permissions is None and add is None and remove is None:
            raise ClientException("At least one option should be provided")
        elif permissions and (add or remove):
            raise ClientException("permissions and add/remove are mutually exclusive")

        role_obj = self.get(name)
        new_role_obj = {"permissions": {}}
        if new_name:
            new_role_obj["name"] = new_name

        if permissions:
            role_definition = self._load_mapping(
                permissions,
                "Role permissions should be provided in a key-value fashion",
            )
            for value in role_definition.values():
                if not isinstance(value, bool) and value is not None:
                    raise ClientException(
                        "Value in a role permissions should be boolean or None to remove"
                    )
            new_role_obj["permissions"] = role_definition
        else:
            if remove:
                keys_from_remove = yaml.safe_load(remove)
                if not isinstance(keys_from_remove, list):
                    raise ClientException("Keys should be provided in a list fashion")
                for key in keys_from_remove:
                    if not isinstance(key, str):
                        raise ClientException("Individual keys should be strings")
                    new_role_obj["permissions"][key] = None

            # "add" is processed after "remove", so a key present in both
            # ends up with the value given in "add".
            if add:
                add_roles = self._load_mapping(
                    add, "Add should be provided in a key-value fashion"
                )
                for key, value in add_roles.items():
                    if not isinstance(value, bool):
                        raise ClientException(
                            "Value '{}' in a role permissions should be boolean".format(
                                key
                            )
                        )
                    new_role_obj["permissions"][key] = value

        # Do not send an empty "permissions" entry.
        if not new_role_obj["permissions"]:
            del new_role_obj["permissions"]

        http_code, resp = self._http.patch_cmd(
            endpoint="{}/{}".format(self._apiBase, role_obj["_id"]),
            postfields_dict=new_role_obj,
            skip_query_admin=True,
        )
        if http_code in (200, 201, 202):
            self._print_response_id(resp)
        elif http_code == 204:
            print("Updated")
        # Any other status code is ignored by this method.

    def delete(self, name, force=False):
        """
        Deletes an OSM role identified by name.

        :param name: name (or id) of the role, resolved through ``get``.
        :param force: when truthy, ``?FORCE=True`` is appended to the request.
        :raises ClientException: when fails to delete a role.
        """
        self._logger.debug("")
        self._client.get_token()
        role = self.get(name)
        querystring = "?FORCE=True" if force else ""
        http_code, resp = self._http.delete_cmd(
            "{}/{}{}".format(self._apiBase, role["_id"], querystring),
            skip_query_admin=True,
        )
        if http_code == 202:
            print("Deletion in progress")
        elif http_code == 204:
            print("Deleted")
        elif resp and "result" in resp:
            print("Deleted")
        else:
            msg = resp or ""
            raise ClientException("Failed to delete role {} - {}".format(name, msg))

    def list(self, filter=None):
        """
        Returns the list of OSM roles.

        :param filter: optional query string (without the leading ``?``)
            appended to the request as-is.
        :returns: the JSON-decoded response body, or an empty list when the
            body is empty.
        """
        self._logger.debug("")
        self._client.get_token()
        filter_string = "?{}".format(filter) if filter else ""
        _, resp = self._http.get2_cmd(
            "{}{}".format(self._apiBase, filter_string), skip_query_admin=True
        )
        if resp:
            return json.loads(resp)
        return []

    def get(self, name):
        """
        Returns a specific OSM role based on name or id.

        :param name: compared against each role's ``_id`` when
            ``utils.validate_uuid4`` accepts it, otherwise against ``name``.
        :raises NotFound: when the role is not found.
        :returns: the specified role.
        """
        self._logger.debug("")
        self._client.get_token()
        lookup_key = "_id" if utils.validate_uuid4(name) else "name"
        for role in self.list():
            if name == role[lookup_key]:
                return role
        raise NotFound("Role {} not found".format(name))