Code Coverage

Cobertura Coverage Report > osmclient.sol005 > http.py

Trend

Classes: 100%
 
Lines: 14%
   
Conditionals: 100%
 

File Coverage summary

| Name    | Classes    | Lines        | Conditionals |
|---------|------------|--------------|--------------|
| http.py | 100% (1/1) | 14% (21/151) | 100% (0/0)   |

Coverage Breakdown by Class

| Name    | Lines        | Conditionals |
|---------|--------------|--------------|
| http.py | 14% (21/151) | N/A          |

Source

osmclient/sol005/http.py
1 # Copyright 2018 Telefonica
2 #
3 # All Rights Reserved.
4 #
5 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
6 #    not use this file except in compliance with the License. You may obtain
7 #    a copy of the License at
8 #
9 #         http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #    Unless required by applicable law or agreed to in writing, software
12 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 #    License for the specific language governing permissions and limitations
15 #    under the License.
16
17 1 import copy
18 1 from io import BytesIO
19 1 import json
20 1 import logging
21
22 1 from osmclient.common import http
23 1 from osmclient.common.exceptions import OsmHttpException, NotFound
24 1 import pycurl
25
26
27 1 class Http(http.Http):
28 1     CONNECT_TIMEOUT = 15
29
30 1     def __init__(self, url, user="admin", password="admin", **kwargs):
31 0         self._url = url
32 0         self._user = user
33 0         self._password = password
34 0         self._http_header = None
35 0         self._logger = logging.getLogger("osmclient")
36 0         self._default_query_admin = None
37 0         self._all_projects = None
38 0         self._public = None
39 0         if "all_projects" in kwargs:
40 0             self._all_projects = kwargs["all_projects"]
41 0         if "public" in kwargs:
42 0             self._public = kwargs["public"]
43 0         self._default_query_admin = self._complete_default_query_admin()
44
45 1     def _complete_default_query_admin(self):
46 0         query_string_list = []
47 0         if self._all_projects:
48 0             query_string_list.append("ADMIN")
49 0         if self._public is not None:
50 0             query_string_list.append("PUBLIC={}".format(self._public))
51 0         return "&".join(query_string_list)
52
53 1     def _complete_endpoint(self, endpoint):
54 0         if self._default_query_admin:
55 0             if "?" in endpoint:
56 0                 endpoint = "&".join([endpoint, self._default_query_admin])
57             else:
58 0                 endpoint = "?".join([endpoint, self._default_query_admin])
59 0         return endpoint
60
61 1     def _get_curl_cmd(self, endpoint, skip_query_admin=False):
62 0         self._logger.debug("")
63 0         curl_cmd = pycurl.Curl()
64 0         if self._logger.getEffectiveLevel() == logging.DEBUG:
65 0             curl_cmd.setopt(pycurl.VERBOSE, True)
66 0         if not skip_query_admin:
67 0             endpoint = self._complete_endpoint(endpoint)
68 0         curl_cmd.setopt(pycurl.CONNECTTIMEOUT, self.CONNECT_TIMEOUT)
69 0         curl_cmd.setopt(pycurl.URL, self._url + endpoint)
70 0         curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
71 0         curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
72 0         if self._http_header:
73 0             curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
74 0         return curl_cmd
75
76 1     def delete_cmd(self, endpoint, skip_query_admin=False):
77 0         self._logger.debug("")
78 0         data = BytesIO()
79 0         curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
80 0         curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
81 0         curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
82 0         self._logger.info(
83             "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
84         )
85 0         curl_cmd.perform()
86 0         http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
87 0         self._logger.info("Response HTTPCODE: {}".format(http_code))
88 0         curl_cmd.close()
89 0         self.check_http_response(http_code, data)
90         # TODO 202 accepted should be returned somehow
91 0         if data.getvalue():
92 0             data_text = data.getvalue().decode()
93 0             self._logger.verbose("Response DATA: {}".format(data_text))
94 0             return http_code, data_text
95         else:
96 0             return http_code, None
97
98 1     def send_cmd(
99         self,
100         endpoint="",
101         postfields_dict=None,
102         formfile=None,
103         filename=None,
104         put_method=False,
105         patch_method=False,
106         skip_query_admin=False,
107     ):
108 0         self._logger.debug("")
109 0         data = BytesIO()
110 0         curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
111 0         if put_method:
112 0             curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PUT")
113 0         elif patch_method:
114 0             curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PATCH")
115 0         curl_cmd.setopt(pycurl.POST, 1)
116 0         curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
117
118 0         if postfields_dict is not None:
119 0             jsondata = json.dumps(postfields_dict)
120 0             if "password" in postfields_dict:
121 0                 postfields_dict_copy = copy.deepcopy(postfields_dict)
122 0                 postfields_dict_copy["password"] = "******"
123 0                 jsondata_log = json.dumps(postfields_dict_copy)
124             else:
125 0                 jsondata_log = jsondata
126 0             self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
127 0             curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
128 0         elif formfile is not None:
129 0             curl_cmd.setopt(
130                 pycurl.HTTPPOST, [((formfile[0], (pycurl.FORM_FILE, formfile[1])))]
131             )
132 0         elif filename is not None:
133 0             with open(filename, "rb") as stream:
134 0                 postdata = stream.read()
135 0             self._logger.verbose("Request POSTFIELDS: Binary content")
136 0             curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
137
138 0         if put_method:
139 0             self._logger.info(
140                 "Request METHOD: {} URL: {}".format("PUT", self._url + endpoint)
141             )
142 0         elif patch_method:
143 0             self._logger.info(
144                 "Request METHOD: {} URL: {}".format("PATCH", self._url + endpoint)
145             )
146         else:
147 0             self._logger.info(
148                 "Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
149             )
150 0         curl_cmd.perform()
151 0         http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
152 0         self._logger.info("Response HTTPCODE: {}".format(http_code))
153 0         curl_cmd.close()
154 0         self.check_http_response(http_code, data)
155 0         if data.getvalue():
156 0             data_text = data.getvalue().decode()
157 0             self._logger.verbose("Response DATA: {}".format(data_text))
158 0             return http_code, data_text
159         else:
160 0             return http_code, None
161
162 1     def post_cmd(
163         self,
164         endpoint="",
165         postfields_dict=None,
166         formfile=None,
167         filename=None,
168         skip_query_admin=False,
169     ):
170 0         self._logger.debug("")
171 0         return self.send_cmd(
172             endpoint=endpoint,
173             postfields_dict=postfields_dict,
174             formfile=formfile,
175             filename=filename,
176             put_method=False,
177             patch_method=False,
178             skip_query_admin=skip_query_admin,
179         )
180
181 1     def put_cmd(
182         self,
183         endpoint="",
184         postfields_dict=None,
185         formfile=None,
186         filename=None,
187         skip_query_admin=False,
188     ):
189 0         self._logger.debug("")
190 0         return self.send_cmd(
191             endpoint=endpoint,
192             postfields_dict=postfields_dict,
193             formfile=formfile,
194             filename=filename,
195             put_method=True,
196             patch_method=False,
197             skip_query_admin=skip_query_admin,
198         )
199
200 1     def patch_cmd(
201         self,
202         endpoint="",
203         postfields_dict=None,
204         formfile=None,
205         filename=None,
206         skip_query_admin=False,
207     ):
208 0         self._logger.debug("")
209 0         return self.send_cmd(
210             endpoint=endpoint,
211             postfields_dict=postfields_dict,
212             formfile=formfile,
213             filename=filename,
214             put_method=False,
215             patch_method=True,
216             skip_query_admin=skip_query_admin,
217         )
218
219 1     def get2_cmd(self, endpoint, skip_query_admin=False):
220 0         self._logger.debug("")
221 0         data = BytesIO()
222 0         curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
223 0         curl_cmd.setopt(pycurl.HTTPGET, 1)
224 0         curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
225 0         self._logger.info(
226             "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
227         )
228 0         curl_cmd.perform()
229 0         http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
230 0         self._logger.info("Response HTTPCODE: {}".format(http_code))
231 0         curl_cmd.close()
232 0         self.check_http_response(http_code, data)
233 0         if data.getvalue():
234 0             data_text = data.getvalue().decode()
235 0             self._logger.verbose("Response DATA: {}".format(data_text))
236 0             return http_code, data_text
237 0         return http_code, None
238
239 1     def check_http_response(self, http_code, data):
240 0         if http_code >= 300:
241 0             resp = ""
242 0             if data.getvalue():
243 0                 data_text = data.getvalue().decode()
244 0                 self._logger.verbose(
245                     "Response {} DATA: {}".format(http_code, data_text)
246                 )
247 0                 resp = ": " + data_text
248             else:
249 0                 self._logger.verbose("Response {}".format(http_code))
250 0             if http_code == 404:
251 0                 raise NotFound("Error {}{}".format(http_code, resp))
252 0             raise OsmHttpException("Error {}{}".format(http_code, resp))
253
254 1     def set_query_admin(self, **kwargs):
255 0         if "all_projects" in kwargs:
256 0             self._all_projects = kwargs["all_projects"]
257 0         if "public" in kwargs:
258 0             self._public = kwargs["public"]
259 0         self._default_query_admin = self._complete_default_query_admin()