Coverage for osmclient/sol005/http.py: 14%

160 statements  

Generated by coverage.py v7.3.1, created at 2024-06-22 09:01 +0000

1# Copyright 2018 Telefonica 

2# 

3# All Rights Reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16 

17import copy 

18import json 

19import logging 

20import requests 

21import http.client as http_client 

22import urllib3 

23 

24from osmclient.common import http 

25from osmclient.common.exceptions import OsmHttpException, NotFound 

26 

27 

28urllib3.disable_warnings() 

29 

30 

31class Http(http.Http): 

32 CONNECT_TIMEOUT = 15 

33 

def __init__(self, url, user="admin", password="admin", **kwargs):
    """Store the connection settings and build the default admin query.

    Args:
        url: Base URL that every endpoint is appended to.
        user: User name kept on the instance.
        password: Password kept on the instance.
        **kwargs: Optional ``all_projects`` and ``public`` values; each
            is stored as ``None`` when it is not supplied.
    """
    self._url = url
    self._user = user
    self._password = password
    self._http_header = None
    self._logger = logging.getLogger("osmclient")
    # A missing option and an explicit None are stored the same way.
    self._all_projects = kwargs.get("all_projects")
    self._public = kwargs.get("public")
    self._default_query_admin = self._complete_default_query_admin()

48 

def _complete_default_query_admin(self):
    """Return the query fragment derived from the admin options.

    ``ADMIN`` is included when ``_all_projects`` is truthy and
    ``PUBLIC=<value>`` when ``_public`` is not ``None``; the pieces are
    joined with ``&``. The result is an empty string when neither applies.
    """
    fragments = ["ADMIN"] if self._all_projects else []
    if self._public is not None:
        fragments += ["PUBLIC={}".format(self._public)]
    return "&".join(fragments)

56 

def _complete_endpoint(self, endpoint):
    """Append the default admin query, if any, to ``endpoint``.

    The fragment is attached with ``&`` when the endpoint already contains
    a ``?`` and with ``?`` otherwise. The endpoint is returned untouched
    when no default admin query is set.
    """
    admin_query = self._default_query_admin
    if not admin_query:
        return endpoint
    separator = "&" if "?" in endpoint else "?"
    return separator.join((endpoint, admin_query))

64 

def _get_requests_cmd(self, endpoint, skip_query_admin=False):
    """Create an unprepared ``requests.Request`` aimed at ``endpoint``.

    Also aligns HTTP wire tracing with the logger level: when the
    ``osmclient`` logger is effectively at DEBUG, ``HTTPConnection``
    debug output is switched on and the urllib3 logger is set to DEBUG;
    otherwise both are reset.

    Args:
        endpoint: Path appended to the base URL.
        skip_query_admin: When true, the default admin query is not
            appended to the endpoint.

    Returns:
        The request with ``url`` set, and ``headers`` set when a header
        has been stored on the instance.
    """
    self._logger.debug("")
    request = requests.Request()

    tracing = self._logger.getEffectiveLevel() == logging.DEBUG
    # Class-level attribute: this affects every HTTPConnection in the process.
    http_client.HTTPConnection.debuglevel = 1 if tracing else 0
    urllib3_log = logging.getLogger("requests.packages.urllib3")
    urllib3_log.setLevel(logging.DEBUG if tracing else logging.NOTSET)
    urllib3_log.propagate = True

    target = endpoint if skip_query_admin else self._complete_endpoint(endpoint)
    request.url = self._url + target
    if self._http_header:
        request.headers = self._http_header
    return request

85 

def delete_cmd(self, endpoint, skip_query_admin=False):
    """Send a DELETE request and return the status code and body text.

    Args:
        endpoint: Path appended to the base URL.
        skip_query_admin: When true, the default admin query is not
            appended to the endpoint.

    Returns:
        A ``(http_code, text)`` tuple; ``text`` is ``None`` when the
        response has no body.

    Raises:
        Whatever ``check_http_response`` raises for status codes >= 300.
    """
    self._logger.debug("")
    requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
    requests_cmd.method = "DELETE"
    self._logger.info(
        "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
    )
    # The context manager closes the session even when send() raises
    # (timeout, connection error), which the manual close() did not.
    with requests.Session() as session_cmd:
        resp = session_cmd.send(
            requests_cmd.prepare(), verify=False, timeout=self.CONNECT_TIMEOUT
        )
        http_code = resp.status_code
        self._logger.info("Response HTTPCODE: {}".format(http_code))
        data = resp.content
    self.check_http_response(http_code, data)
    # TODO 202 accepted should be returned somehow
    if data:
        data_text = data.decode()
        self._logger.verbose("Response DATA: {}".format(data_text))
        return http_code, data_text
    return http_code, None

109 

def send_cmd(
    self,
    endpoint="",
    postfields_dict=None,
    formfile=None,
    filename=None,
    put_method=False,
    patch_method=False,
    skip_query_admin=False,
):
    """Send a POST, PUT or PATCH request and return status code and body.

    The payload comes from the first of these that is not ``None``:
    ``postfields_dict`` (sent as JSON), ``formfile`` (sent as a file
    field), ``filename`` (file content sent as the raw body).

    Args:
        endpoint: Path appended to the base URL.
        postfields_dict: Object serialised as the JSON body. A top-level
            ``password`` entry is masked in the log output only.
        formfile: Indexable pair; item 0 is the form field name and
            item 1 its value.
        filename: Path of a file whose bytes are sent as the body.
        put_method: Use PUT. Takes precedence over ``patch_method``.
        patch_method: Use PATCH when ``put_method`` is false.
        skip_query_admin: When true, the default admin query is not
            appended to the endpoint.

    Returns:
        A ``(http_code, text)`` tuple; ``text`` is ``None`` when the
        response has no body.

    Raises:
        Whatever ``check_http_response`` raises for status codes >= 300.
    """
    self._logger.debug("")
    requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
    if put_method:
        requests_cmd.method = "PUT"
    elif patch_method:
        requests_cmd.method = "PATCH"
    else:
        requests_cmd.method = "POST"

    if postfields_dict is not None:
        log_fields = postfields_dict
        if "password" in postfields_dict:
            # Mask the password on a copy so the real payload is untouched.
            log_fields = copy.deepcopy(postfields_dict)
            log_fields["password"] = "******"
        requests_cmd.json = postfields_dict
        self._logger.verbose(
            "Request POSTFIELDS: {}".format(json.dumps(log_fields))
        )
    elif formfile is not None:
        requests_cmd.files = {formfile[0]: formfile[1]}
    elif filename is not None:
        with open(filename, "rb") as stream:
            postdata = stream.read()
        self._logger.verbose("Request POSTFIELDS: Binary content")
        requests_cmd.data = postdata

    self._logger.info(
        "Request METHOD: {} URL: {}".format(
            requests_cmd.method, self._url + endpoint
        )
    )
    # The context manager closes the session on every exit path. The
    # original closed it only after check_http_response, so any HTTP error
    # or send() exception left the session open.
    with requests.Session() as session_cmd:
        resp = session_cmd.send(
            requests_cmd.prepare(), verify=False, timeout=self.CONNECT_TIMEOUT
        )
        http_code = resp.status_code
        self._logger.info("Response HTTPCODE: {}".format(http_code))
        data = resp.content
    self.check_http_response(http_code, data)
    if data:
        data_text = data.decode()
        self._logger.verbose("Response DATA: {}".format(data_text))
        return http_code, data_text
    return http_code, None

174 

def post_cmd(
    self,
    endpoint="",
    postfields_dict=None,
    formfile=None,
    filename=None,
    skip_query_admin=False,
):
    """Send a POST request by delegating to ``send_cmd``.

    All arguments are forwarded unchanged, with both ``put_method`` and
    ``patch_method`` disabled. Returns whatever ``send_cmd`` returns.
    """
    self._logger.debug("")
    options = {
        "endpoint": endpoint,
        "postfields_dict": postfields_dict,
        "formfile": formfile,
        "filename": filename,
        "put_method": False,
        "patch_method": False,
        "skip_query_admin": skip_query_admin,
    }
    return self.send_cmd(**options)

193 

def put_cmd(
    self,
    endpoint="",
    postfields_dict=None,
    formfile=None,
    filename=None,
    skip_query_admin=False,
):
    """Send a PUT request by delegating to ``send_cmd``.

    All arguments are forwarded unchanged, with ``put_method`` enabled and
    ``patch_method`` disabled. Returns whatever ``send_cmd`` returns.
    """
    self._logger.debug("")
    options = {
        "endpoint": endpoint,
        "postfields_dict": postfields_dict,
        "formfile": formfile,
        "filename": filename,
        "put_method": True,
        "patch_method": False,
        "skip_query_admin": skip_query_admin,
    }
    return self.send_cmd(**options)

212 

def patch_cmd(
    self,
    endpoint="",
    postfields_dict=None,
    formfile=None,
    filename=None,
    skip_query_admin=False,
):
    """Send a PATCH request by delegating to ``send_cmd``.

    All arguments are forwarded unchanged, with ``patch_method`` enabled
    and ``put_method`` disabled. Returns whatever ``send_cmd`` returns.
    """
    self._logger.debug("")
    options = {
        "endpoint": endpoint,
        "postfields_dict": postfields_dict,
        "formfile": formfile,
        "filename": filename,
        "put_method": False,
        "patch_method": True,
        "skip_query_admin": skip_query_admin,
    }
    return self.send_cmd(**options)

231 

def get2_cmd(self, endpoint, skip_query_admin=False):
    """Send a GET request and return the status code and body text.

    Args:
        endpoint: Path appended to the base URL.
        skip_query_admin: When true, the default admin query is not
            appended to the endpoint.

    Returns:
        A ``(http_code, text)`` tuple; ``text`` is ``None`` when the
        response has no body.

    Raises:
        Whatever ``check_http_response`` raises for status codes >= 300.
    """
    self._logger.debug("")
    requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
    requests_cmd.method = "GET"
    self._logger.info(
        "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
    )
    # The context manager closes the session even when send() raises
    # (timeout, connection error), which the manual close() did not.
    with requests.Session() as session_cmd:
        resp = session_cmd.send(
            requests_cmd.prepare(), verify=False, timeout=self.CONNECT_TIMEOUT
        )
        http_code = resp.status_code
        self._logger.info("Response HTTPCODE: {}".format(http_code))
        data = resp.content
    self.check_http_response(http_code, data)
    if data:
        data_text = data.decode()
        self._logger.verbose("Response DATA: {}".format(data_text))
        return http_code, data_text
    return http_code, None

254 

def check_http_response(self, http_code, data):
    """Raise when ``http_code`` is 300 or above; otherwise do nothing.

    Args:
        http_code: Numeric HTTP status code of the response.
        data: Raw response body (bytes); it is decoded and appended to
            the error message when non-empty.

    Raises:
        NotFound: When ``http_code`` is 404.
        OsmHttpException: For any other status code >= 300.
    """
    if http_code < 300:
        return

    if data:
        body_text = data.decode()
        self._logger.verbose(
            "Response {} DATA: {}".format(http_code, body_text)
        )
        detail = ": " + body_text
    else:
        self._logger.verbose("Response {}".format(http_code))
        detail = ""

    message = "Error {}{}".format(http_code, detail)
    if http_code == 404:
        raise NotFound(message)
    raise OsmHttpException(message)

269 

def set_query_admin(self, **kwargs):
    """Update the admin options and rebuild the default admin query.

    Only ``all_projects`` and ``public`` are recognised; an option that
    is not passed keeps its current value.
    """
    for option, attribute in (
        ("all_projects", "_all_projects"),
        ("public", "_public"),
    ):
        if option in kwargs:
            setattr(self, attribute, kwargs[option])
    self._default_query_admin = self._complete_default_query_admin()