Code Coverage

Cobertura Coverage Report > osmclient.sol005 >

http.py

Trend

File Coverage summary

Name | Classes | Lines | Conditionals
http.py | 100% (1/1) | 14% (23/160) | 100% (0/0)

Coverage Breakdown by Class

Name | Lines | Conditionals
http.py | 14% (23/160) | N/A

Source

osmclient/sol005/http.py
1 # Copyright 2018 Telefonica
2 #
3 # All Rights Reserved.
4 #
5 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
6 #    not use this file except in compliance with the License. You may obtain
7 #    a copy of the License at
8 #
9 #         http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #    Unless required by applicable law or agreed to in writing, software
12 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 #    License for the specific language governing permissions and limitations
15 #    under the License.
16
17 1 import copy
18 1 import json
19 1 import logging
20 1 import requests
21 1 import http.client as http_client
22 1 import urllib3
23
24 1 from osmclient.common import http
25 1 from osmclient.common.exceptions import OsmHttpException, NotFound
26
27
# Module-level side effect: turn off urllib3 warnings at import time.
# NOTE(review): presumably meant to silence InsecureRequestWarning, since the
# requests in this module are sent with verify=False -- confirm.
28 1 urllib3.disable_warnings()
29
30
class Http(http.Http):
    """HTTP client for the OSM SOL005 API, implemented with ``requests``.

    Every request goes to ``url + endpoint``. Unless the caller passes
    ``skip_query_admin=True``, the admin query string derived from the
    ``all_projects`` / ``public`` settings is appended to the endpoint.
    Requests are sent with ``verify=False`` and ``CONNECT_TIMEOUT`` as the
    timeout.

    NOTE(review): ``self._logger.verbose`` is not part of the standard
    ``logging.Logger`` API; this class assumes a "verbose" level has been
    installed on the logger elsewhere -- confirm.
    """

    # Value passed as ``timeout=`` to every ``Session.send`` call.
    CONNECT_TIMEOUT = 15

    def __init__(self, url, user="admin", password="admin", **kwargs):
        """Store connection settings and compute the default admin query.

        The base-class ``__init__`` is not invoked (unchanged from the
        previous implementation).

        Args:
            url: Base URL that every endpoint is appended to.
            user: User name kept on the instance.
            password: Password kept on the instance.
            **kwargs: Optional ``all_projects`` and ``public`` settings used
                to build the default admin query string.
        """
        self._url = url
        self._user = user
        self._password = password
        self._http_header = None
        self._logger = logging.getLogger("osmclient")
        # Both settings default to None when the keyword is not supplied.
        self._all_projects = kwargs.get("all_projects")
        self._public = kwargs.get("public")
        self._default_query_admin = self._complete_default_query_admin()

    def _complete_default_query_admin(self):
        """Return the admin query string for the current settings.

        Returns:
            ``"ADMIN"`` and/or ``"PUBLIC=<value>"`` joined with ``"&"``;
            an empty string when neither setting applies.
        """
        query_string_list = []
        if self._all_projects:
            query_string_list.append("ADMIN")
        # ``public`` is tested against None so that False is still emitted.
        if self._public is not None:
            query_string_list.append("PUBLIC={}".format(self._public))
        return "&".join(query_string_list)

    def _complete_endpoint(self, endpoint):
        """Append the default admin query string to *endpoint*, if any.

        Uses ``"&"`` when the endpoint already contains a ``"?"`` and
        ``"?"`` otherwise. The endpoint is returned unchanged when there is
        no default admin query.
        """
        if not self._default_query_admin:
            return endpoint
        separator = "&" if "?" in endpoint else "?"
        return separator.join([endpoint, self._default_query_admin])

    def _get_requests_cmd(self, endpoint, skip_query_admin=False):
        """Build an unprepared ``requests.Request`` for *endpoint*.

        Also switches wire-level HTTP debugging on or off, depending on
        whether the "osmclient" logger's effective level is exactly DEBUG.

        Args:
            endpoint: Path (and optional query) appended to the base URL.
            skip_query_admin: When true, the default admin query string is
                not appended.

        Returns:
            A ``requests.Request`` with ``url`` set, and ``headers`` set
            when ``self._http_header`` is truthy. The method is left for the
            caller to fill in.
        """
        self._logger.debug("")
        requests_cmd = requests.Request()

        debug_enabled = self._logger.getEffectiveLevel() == logging.DEBUG
        # Class attribute: this affects every HTTPConnection in the process.
        http_client.HTTPConnection.debuglevel = 1 if debug_enabled else 0
        requests_log = logging.getLogger("requests.packages.urllib3")
        requests_log.setLevel(logging.DEBUG if debug_enabled else logging.NOTSET)
        requests_log.propagate = True

        if not skip_query_admin:
            endpoint = self._complete_endpoint(endpoint)
        requests_cmd.url = self._url + endpoint
        if self._http_header:
            requests_cmd.headers = self._http_header
        return requests_cmd

    def _send_request(self, requests_cmd):
        """Send *requests_cmd* and return ``(http_code, text_or_None)``.

        Shared by the DELETE, GET and POST/PUT/PATCH entry points. The
        session is used as a context manager so it is closed even when
        sending raises or the response status leads to an exception.

        Args:
            requests_cmd: An unprepared ``requests.Request`` whose method
                and url are already set.

        Returns:
            A tuple ``(http_code, data_text)`` where ``data_text`` is the
            decoded response body, or ``None`` when the body is empty.

        Raises:
            NotFound: The response status is 404.
            OsmHttpException: The response status is any other code >= 300.
        """
        # Log the URL that is actually sent, including any admin query.
        self._logger.info(
            "Request METHOD: {} URL: {}".format(
                requests_cmd.method, requests_cmd.url
            )
        )
        with requests.Session() as session_cmd:
            resp = session_cmd.send(
                requests_cmd.prepare(), verify=False, timeout=self.CONNECT_TIMEOUT
            )
            http_code = resp.status_code
            data = resp.content
        self._logger.info("Response HTTPCODE: {}".format(http_code))
        self.check_http_response(http_code, data)
        if data:
            data_text = data.decode()
            self._logger.verbose("Response DATA: {}".format(data_text))
            return http_code, data_text
        return http_code, None

    def delete_cmd(self, endpoint, skip_query_admin=False):
        """Issue a DELETE on *endpoint*.

        Returns:
            ``(http_code, data_text)``; ``data_text`` is ``None`` when the
            response has no body.

        Raises:
            NotFound: The response status is 404.
            OsmHttpException: The response status is any other code >= 300.
        """
        self._logger.debug("")
        requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
        requests_cmd.method = "DELETE"
        # TODO 202 accepted should be returned somehow
        return self._send_request(requests_cmd)

    def send_cmd(
        self,
        endpoint="",
        postfields_dict=None,
        formfile=None,
        filename=None,
        put_method=False,
        patch_method=False,
        skip_query_admin=False,
    ):
        """Issue a POST, PUT or PATCH on *endpoint* with an optional body.

        The body comes from the first of these that is not ``None``:
        ``postfields_dict`` (sent as JSON), ``formfile`` (sent as a
        multipart file) or ``filename`` (file content sent as raw data).

        Args:
            endpoint: Path (and optional query) appended to the base URL.
            postfields_dict: JSON-serialisable payload.
            formfile: Pair ``(field_name, file_value)`` used as the single
                entry of the request's ``files`` mapping.
            filename: Path of a file whose bytes become the request body.
            put_method: Send a PUT; takes precedence over ``patch_method``.
            patch_method: Send a PATCH.
            skip_query_admin: When true, the default admin query string is
                not appended.

        Returns:
            ``(http_code, data_text)``; ``data_text`` is ``None`` when the
            response has no body.

        Raises:
            NotFound: The response status is 404.
            OsmHttpException: The response status is any other code >= 300.
        """
        self._logger.debug("")
        requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
        if put_method:
            requests_cmd.method = "PUT"
        elif patch_method:
            requests_cmd.method = "PATCH"
        else:
            requests_cmd.method = "POST"

        if postfields_dict is not None:
            if "password" in postfields_dict:
                # Mask the password in the log only. A shallow copy is enough
                # because only a top-level key is replaced.
                loggable = copy.copy(postfields_dict)
                loggable["password"] = "******"
            else:
                loggable = postfields_dict
            jsondata_log = json.dumps(loggable)
            requests_cmd.json = postfields_dict
            self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
        elif formfile is not None:
            requests_cmd.files = {formfile[0]: formfile[1]}
        elif filename is not None:
            with open(filename, "rb") as stream:
                postdata = stream.read()
            self._logger.verbose("Request POSTFIELDS: Binary content")
            requests_cmd.data = postdata

        return self._send_request(requests_cmd)

    def post_cmd(
        self,
        endpoint="",
        postfields_dict=None,
        formfile=None,
        filename=None,
        skip_query_admin=False,
    ):
        """Issue a POST; see ``send_cmd`` for arguments and return value."""
        self._logger.debug("")
        return self.send_cmd(
            endpoint=endpoint,
            postfields_dict=postfields_dict,
            formfile=formfile,
            filename=filename,
            put_method=False,
            patch_method=False,
            skip_query_admin=skip_query_admin,
        )

    def put_cmd(
        self,
        endpoint="",
        postfields_dict=None,
        formfile=None,
        filename=None,
        skip_query_admin=False,
    ):
        """Issue a PUT; see ``send_cmd`` for arguments and return value."""
        self._logger.debug("")
        return self.send_cmd(
            endpoint=endpoint,
            postfields_dict=postfields_dict,
            formfile=formfile,
            filename=filename,
            put_method=True,
            patch_method=False,
            skip_query_admin=skip_query_admin,
        )

    def patch_cmd(
        self,
        endpoint="",
        postfields_dict=None,
        formfile=None,
        filename=None,
        skip_query_admin=False,
    ):
        """Issue a PATCH; see ``send_cmd`` for arguments and return value."""
        self._logger.debug("")
        return self.send_cmd(
            endpoint=endpoint,
            postfields_dict=postfields_dict,
            formfile=formfile,
            filename=filename,
            put_method=False,
            patch_method=True,
            skip_query_admin=skip_query_admin,
        )

    def get2_cmd(self, endpoint, skip_query_admin=False):
        """Issue a GET on *endpoint*.

        Returns:
            ``(http_code, data_text)``; ``data_text`` is ``None`` when the
            response has no body.

        Raises:
            NotFound: The response status is 404.
            OsmHttpException: The response status is any other code >= 300.
        """
        self._logger.debug("")
        requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
        requests_cmd.method = "GET"
        return self._send_request(requests_cmd)

    def check_http_response(self, http_code, data):
        """Raise when *http_code* is 300 or above; otherwise do nothing.

        Args:
            http_code: Integer HTTP status code of the response.
            data: Raw response body (bytes); may be empty.

        Raises:
            NotFound: ``http_code`` is 404.
            OsmHttpException: ``http_code`` is any other value >= 300.
        """
        if http_code >= 300:
            resp = ""
            if data:
                data_text = data.decode()
                self._logger.verbose(
                    "Response {} DATA: {}".format(http_code, data_text)
                )
                resp = ": " + data_text
            else:
                self._logger.verbose("Response {}".format(http_code))
            if http_code == 404:
                raise NotFound("Error {}{}".format(http_code, resp))
            raise OsmHttpException("Error {}{}".format(http_code, resp))

    def set_query_admin(self, **kwargs):
        """Update ``all_projects`` / ``public`` and rebuild the admin query.

        Only the keywords actually supplied are changed; a setting whose
        keyword is absent keeps its current value.
        """
        if "all_projects" in kwargs:
            self._all_projects = kwargs["all_projects"]
        if "public" in kwargs:
            self._public = kwargs["public"]
        self._default_query_admin = self._complete_default_query_admin()