Coverage for osm_nbi/authconn.py: 53%

91 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-30 10:14 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright 2018 Whitestack, LLC 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16# 

17# For those usages not covered by the Apache License, Version 2.0 please 

18# contact: esousa@whitestack.com or glavado@whitestack.com 

19## 

20 

21""" 

22Authconn implements an Abstract class for the Auth backend connector 

23plugins with the definition of the methods to be implemented. 

24""" 

25 

26__author__ = ( 

27 "Eduardo Sousa <esousa@whitestack.com>, " 

28 "Pedro de la Cruz Ramos <pdelacruzramos@altran.com>" 

29) 

30__date__ = "$27-jul-2018 23:59:59$" 

31 

32from http import HTTPStatus 

33from osm_nbi.base_topic import BaseTopic 

34 

35 

class AuthException(Exception):
    """
    Authentication error, raised because the token or the user/password
    is not recognized.

    :param message: human-readable description of the failure.
    :param http_code: HTTP status kept on the instance as ``http_code``.
        Defaults to ``HTTPStatus.UNAUTHORIZED``.
    """

    def __init__(self, message, http_code=HTTPStatus.UNAUTHORIZED):
        super().__init__(message)
        # Kept as an attribute so handlers can read the status to report.
        self.http_code = http_code

44 

45 

class AuthExceptionUnauthorized(AuthException):
    """
    Authentication error, raised because the caller does not have the
    rights to make this operation.

    Adds nothing to AuthException; it exists so callers can tell this case
    apart by exception type.
    """

52 

53 

class AuthconnException(Exception):
    """
    Common and base class Exception for all authconn exceptions.

    :param message: human-readable description of the failure.
    :param http_code: HTTP status kept on the instance as ``http_code``.
        Defaults to ``HTTPStatus.UNAUTHORIZED``.
    """

    def __init__(self, message, http_code=HTTPStatus.UNAUTHORIZED):
        super().__init__(message)
        # Kept as an attribute so handlers can read the status to report.
        self.http_code = http_code

62 

63 

class AuthconnConnectionException(AuthconnException):
    """
    Connectivity error with Auth backend.

    :param message: human-readable description of the failure.
    :param http_code: HTTP status forwarded to AuthconnException.
        Defaults to ``HTTPStatus.BAD_GATEWAY``.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_GATEWAY):
        super().__init__(message, http_code)

71 

72 

class AuthconnNotSupportedException(AuthconnException):
    """
    The request is not supported by the Auth backend.

    :param message: human-readable description of the failure.
    :param http_code: HTTP status forwarded to AuthconnException.
        Defaults to ``HTTPStatus.NOT_IMPLEMENTED``.
    """

    def __init__(self, message, http_code=HTTPStatus.NOT_IMPLEMENTED):
        super().__init__(message, http_code)

80 

81 

class AuthconnNotImplementedException(AuthconnException):
    """
    The method is not implemented by the Auth backend.

    :param message: human-readable description of the failure.
    :param http_code: HTTP status forwarded to AuthconnException.
        Defaults to ``HTTPStatus.NOT_IMPLEMENTED``.
    """

    def __init__(self, message, http_code=HTTPStatus.NOT_IMPLEMENTED):
        super().__init__(message, http_code)

89 

90 

class AuthconnOperationException(AuthconnException):
    """
    The operation executed failed.

    :param message: human-readable description of the failure.
    :param http_code: HTTP status forwarded to AuthconnException.
        Defaults to ``HTTPStatus.INTERNAL_SERVER_ERROR``.
    """

    def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
        super().__init__(message, http_code)

98 

99 

class AuthconnNotFoundException(AuthconnException):
    """
    The operation executed failed because element not found.

    :param message: human-readable description of the failure.
    :param http_code: HTTP status forwarded to AuthconnException.
        Defaults to ``HTTPStatus.NOT_FOUND``.
    """

    def __init__(self, message, http_code=HTTPStatus.NOT_FOUND):
        super().__init__(message, http_code)

107 

108 

class AuthconnConflictException(AuthconnException):
    """
    The operation has conflicts.

    :param message: human-readable description of the failure.
    :param http_code: HTTP status forwarded to AuthconnException.
        Defaults to ``HTTPStatus.CONFLICT``.
    """

    def __init__(self, message, http_code=HTTPStatus.CONFLICT):
        super().__init__(message, http_code)

116 

117 

class Authconn:
    """
    Abstract base class for all the Auth backend connector plugins.
    Each Auth backend connector plugin must be a subclass of
    Authconn class.

    Unless noted otherwise, the methods defined here only raise
    AuthconnNotImplementedException and are meant to be overridden.
    The exceptions are get_user_list (returns an empty list) and the
    get_user / get_role / get_project lookups, which are implemented on
    top of the corresponding ``get_*_list`` method.
    """

    def __init__(self, config, db, role_permissions):
        """
        Constructor of the Authconn class.

        :param config: configuration dictionary containing all the
            necessary configuration parameters.
        :param db: internal database class. It is accepted here but not
            stored by this base constructor.
        :param role_permissions: read only role permission list
        """
        self.config = config
        self.role_permissions = role_permissions

    def authenticate(self, credentials, token_info=None):
        """
        Authenticate a user using username/password or token_info, plus project

        :param credentials: dictionary that contains:
            username: name, id or None
            password: password or None
            project_id: name, id, or None. If None first found project will be used to get an scope token
            other items are allowed for specific auth backends
        :param token_info: previous token_info to obtain authorization
        :return: the scoped token info or raises an exception. The token is a dictionary with:
            _id: token string id,
            username: username,
            project_id: scoped_token project_id,
            project_name: scoped_token project_name,
            expires: epoch time when it expires,
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def validate_token(self, token):
        """
        Check if the token is valid.

        :param token: token to validate
        :return: dictionary with information associated with the token. If the
            token is not valid, returns None.
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def revoke_token(self, token):
        """
        Invalidate a token.

        :param token: token to be revoked
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def create_user(self, user_info):
        """
        Create a user.

        :param user_info: full user info.
        :raises AuthconnOperationException: if user creation failed.
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def update_user(self, user_info):
        """
        Change the user name and/or password.

        :param user_info: user info modifications
        :raises AuthconnNotImplementedException: if function not implemented
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def delete_user(self, user_id):
        """
        Delete user.

        :param user_id: user identifier.
        :raises AuthconnOperationException: if user deletion failed.
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def get_user_list(self, filter_q=None):
        """
        Get user list.

        :param filter_q: dictionary to filter user list by name (username is also admitted) and/or _id
        :return: returns a list of users.
        """
        return []  # Default return value so that the method get_user passes pylint

    def get_user(self, _id, fail=True):
        """
        Get one user

        :param _id: id or name
        :param fail: True to raise exception on not found. False to return None on not found
        :return: dictionary with the user information
        :raises AuthconnNotFoundException: if no user matches and fail is True
        """
        filt = {BaseTopic.id_field("users", _id): _id}
        users = self.get_user_list(filt)
        if users:
            return users[0]
        if not fail:
            return None
        raise AuthconnNotFoundException(
            "User with {} not found".format(filt),
            http_code=HTTPStatus.NOT_FOUND,
        )

    def create_role(self, role_info):
        """
        Create a role.

        :param role_info: full role info.
        :raises AuthconnOperationException: if role creation failed.
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def delete_role(self, role_id):
        """
        Delete a role.

        :param role_id: role identifier.
        :raises AuthconnOperationException: if role deletion failed.
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def get_role_list(self, filter_q=None):
        """
        Get all the roles.

        :param filter_q: dictionary to filter role list by _id and/or name.
        :return: list of roles
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def get_role(self, _id, fail=True):
        """
        Get one role

        :param _id: id or name
        :param fail: True to raise exception on not found. False to return None on not found
        :return: dictionary with the role information
        :raises AuthconnNotFoundException: if no role matches and fail is True
        """
        filt = {BaseTopic.id_field("roles", _id): _id}
        roles = self.get_role_list(filt)
        if roles:
            return roles[0]
        if not fail:
            return None
        raise AuthconnNotFoundException("Role with {} not found".format(filt))

    def update_role(self, role_info):
        """
        Change the information of a role

        :param role_info: full role info
        :return: None
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def create_project(self, project_info):
        """
        Create a project.

        :param project_info: full project info.
        :return: the internal id of the created project
        :raises AuthconnOperationException: if project creation failed.
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def delete_project(self, project_id):
        """
        Delete a project.

        :param project_id: project identifier.
        :raises AuthconnOperationException: if project deletion failed.
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def get_project_list(self, filter_q=None):
        """
        Get all the projects.

        :param filter_q: dictionary to filter project list, by "name" and/or "_id"
        :return: list of projects
        """
        raise AuthconnNotImplementedException("Should have implemented this")

    def get_project(self, _id, fail=True):
        """
        Get one project

        :param _id: id or name
        :param fail: True to raise exception on not found. False to return None on not found
        :return: dictionary with the project information
        :raises AuthconnNotFoundException: if no project matches and fail is True
        """
        filt = {BaseTopic.id_field("projects", _id): _id}
        projs = self.get_project_list(filt)
        if projs:
            return projs[0]
        if not fail:
            return None
        raise AuthconnNotFoundException("project with {} not found".format(filt))

    def update_project(self, project_id, project_info):
        """
        Change the information of a project

        :param project_id: project to be changed
        :param project_info: full project info
        :return: None
        """
        raise AuthconnNotImplementedException("Should have implemented this")