Coverage for RO-SDN-juniper_contrail/osm_rosdn_juniper_contrail/rest_lib.py: 33%

98 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-06-04 15:25 +0000

1# Copyright 2020 ETSI 

2# 

3# All Rights Reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16 

17import copy 

18import json 

19 

20import requests 

21from requests.exceptions import ConnectionError 

22 

23 

24class HttpException(Exception): 

25 pass 

26 

27 

28class NotFound(HttpException): 

29 pass 

30 

31 

32class AuthError(HttpException): 

33 pass 

34 

35 

36class DuplicateFound(HttpException): 

37 pass 

38 

39 

40class ServiceUnavailableException(HttpException): 

41 pass 

42 

43 

44class ContrailHttp(object): 

45 def __init__(self, auth_info, logger, verify): 

46 self._logger = logger 

47 # Verify client cert 

48 self.ssl_verify = verify 

49 # auth info: must contain auth_url and auth_dict 

50 self.auth_url = auth_info["auth_url"] 

51 self.auth_dict = auth_info["auth_dict"] 

52 

53 self.max_retries = 3 

54 

55 # Default token timeout 

56 self.token_timeout = 3500 

57 self.token = None 

58 # TODO - improve configuration timeouts 

59 

60 def get_cmd(self, url, headers): 

61 self._logger.debug("") 

62 resp = self._request("GET", url, headers) 

63 

64 return resp.json() 

65 

66 def post_cmd(self, url, headers, post_fields_dict=None): 

67 self._logger.debug("") 

68 

69 # obfuscate password before logging dict 

70 if ( 

71 post_fields_dict.get("auth", {}) 

72 .get("identity", {}) 

73 .get("password", {}) 

74 .get("user", {}) 

75 .get("password") 

76 ): 

77 post_fields_dict_copy = copy.deepcopy(post_fields_dict) 

78 post_fields_dict["auth"]["identity"]["password"]["user"][ 

79 "password" 

80 ] = "******" 

81 json_data_log = post_fields_dict_copy 

82 else: 

83 json_data_log = post_fields_dict 

84 

85 self._logger.debug("Request POSTFIELDS: {}".format(json.dumps(json_data_log))) 

86 resp = self._request("POST", url, headers, data=post_fields_dict) 

87 

88 return resp.text 

89 

90 def delete_cmd(self, url, headers): 

91 self._logger.debug("") 

92 resp = self._request("DELETE", url, headers) 

93 

94 return resp.text 

95 

96 def _request(self, op, url, http_headers, data=None, retry_auth_error=True): 

97 headers = http_headers.copy() 

98 

99 # Get authorization (include authentication headers) 

100 # TODO add again token 

101 # token = self._get_token(headers) 

102 token = None 

103 

104 if token: 

105 headers["X-Auth-Token"] = token 

106 

107 try: 

108 return self._request_noauth(op, url, headers, data) 

109 except AuthError: 

110 # If there is an auth error retry just once 

111 if retry_auth_error: 

112 return self._request(op, url, headers, data, retry_auth_error=False) 

113 

114 def _request_noauth(self, op, url, headers, data=None): 

115 # Method to execute http requests with error control 

116 # Authentication error, always make just one retry 

117 # ConnectionError or ServiceUnavailable make configured retries with sleep between them 

118 # Other errors to raise: 

119 # - NotFound 

120 # - Conflict 

121 

122 retry = 0 

123 while retry < self.max_retries: 

124 retry += 1 

125 

126 # Execute operation 

127 try: 

128 self._logger.info("Request METHOD: {} URL: {}".format(op, url)) 

129 if op == "GET": 

130 resp = self._http_get(url, headers, query_params=data) 

131 elif op == "POST": 

132 resp = self._http_post(url, headers, json_data=data) 

133 elif op == "POST_HEADERS": 

134 resp = self._http_post_headers(url, headers, json_data=data) 

135 elif op == "DELETE": 

136 resp = self._http_delete(url, headers, json_data=data) 

137 else: 

138 raise HttpException("Unsupported operation: {}".format(op)) 

139 

140 self._logger.info("Response HTTPCODE: {}".format(resp.status_code)) 

141 

142 # Check http return code 

143 if resp: 

144 return resp 

145 else: 

146 status_code = resp.status_code 

147 if status_code == 401: 

148 # Auth Error - set token to None to reload it and raise AuthError 

149 self.token = None 

150 

151 raise AuthError("Auth error executing operation") 

152 elif status_code == 409: 

153 raise DuplicateFound( 

154 "Duplicate resource url: {}, response: {}".format( 

155 url, resp.text 

156 ) 

157 ) 

158 elif status_code == 404: 

159 raise NotFound( 

160 "Not found resource url: {}, response: {}".format( 

161 url, resp.text 

162 ) 

163 ) 

164 elif resp.status_code in [502, 503]: 

165 if not self.max_retries or retry >= self.max_retries: 

166 raise ServiceUnavailableException( 

167 "Service unavailable error url: {}".format(url) 

168 ) 

169 

170 continue 

171 else: 

172 raise HttpException( 

173 "Error status_code: {}, error_text: {}".format( 

174 resp.status_code, resp.text 

175 ) 

176 ) 

177 

178 except ConnectionError as e: 

179 self._logger.error( 

180 "Connection error executing request: {}".format(repr(e)) 

181 ) 

182 

183 if not self.max_retries or retry >= self.max_retries: 

184 raise ConnectionError 

185 

186 continue 

187 except Exception as e: 

188 self._logger.error("Error executing request: {}".format(repr(e))) 

189 raise e 

190 

191 def _http_get(self, url, headers, query_params=None): 

192 return requests.get(url, headers=headers, params=query_params) 

193 

194 def _http_post_headers(self, url, headers, json_data=None): 

195 return requests.head( 

196 url, json=json_data, headers=headers, verify=self.ssl_verify 

197 ) 

198 

199 def _http_post(self, url, headers, json_data=None): 

200 return requests.post( 

201 url, json=json_data, headers=headers, verify=self.ssl_verify 

202 ) 

203 

204 def _http_delete(self, url, headers, json_data=None): 

205 return requests.delete(url, json=json_data, headers=headers)