Standardize Formatting
[osm/osmclient.git] / osmclient / sol005 / http.py
1 # Copyright 2018 Telefonica
2 #
3 # All Rights Reserved.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
15 # under the License.
16
17 import copy
18 from io import BytesIO
19 import json
20 import logging
21
22 from osmclient.common import http
23 from osmclient.common.exceptions import OsmHttpException, NotFound
24 import pycurl
25
26
27 class Http(http.Http):
28 CONNECT_TIMEOUT = 15
29
30 def __init__(self, url, user="admin", password="admin", **kwargs):
31 self._url = url
32 self._user = user
33 self._password = password
34 self._http_header = None
35 self._logger = logging.getLogger("osmclient")
36 self._default_query_admin = None
37 self._all_projects = None
38 self._public = None
39 if "all_projects" in kwargs:
40 self._all_projects = kwargs["all_projects"]
41 if "public" in kwargs:
42 self._public = kwargs["public"]
43 self._default_query_admin = self._complete_default_query_admin()
44
45 def _complete_default_query_admin(self):
46 query_string_list = []
47 if self._all_projects:
48 query_string_list.append("ADMIN")
49 if self._public is not None:
50 query_string_list.append("PUBLIC={}".format(self._public))
51 return "&".join(query_string_list)
52
53 def _complete_endpoint(self, endpoint):
54 if self._default_query_admin:
55 if "?" in endpoint:
56 endpoint = "&".join([endpoint, self._default_query_admin])
57 else:
58 endpoint = "?".join([endpoint, self._default_query_admin])
59 return endpoint
60
61 def _get_curl_cmd(self, endpoint, skip_query_admin=False):
62 self._logger.debug("")
63 curl_cmd = pycurl.Curl()
64 if self._logger.getEffectiveLevel() == logging.DEBUG:
65 curl_cmd.setopt(pycurl.VERBOSE, True)
66 if not skip_query_admin:
67 endpoint = self._complete_endpoint(endpoint)
68 curl_cmd.setopt(pycurl.CONNECTTIMEOUT, self.CONNECT_TIMEOUT)
69 curl_cmd.setopt(pycurl.URL, self._url + endpoint)
70 curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
71 curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
72 if self._http_header:
73 curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
74 return curl_cmd
75
76 def delete_cmd(self, endpoint, skip_query_admin=False):
77 self._logger.debug("")
78 data = BytesIO()
79 curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
80 curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
81 curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
82 self._logger.info(
83 "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
84 )
85 curl_cmd.perform()
86 http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
87 self._logger.info("Response HTTPCODE: {}".format(http_code))
88 curl_cmd.close()
89 self.check_http_response(http_code, data)
90 # TODO 202 accepted should be returned somehow
91 if data.getvalue():
92 data_text = data.getvalue().decode()
93 self._logger.verbose("Response DATA: {}".format(data_text))
94 return http_code, data_text
95 else:
96 return http_code, None
97
98 def send_cmd(
99 self,
100 endpoint="",
101 postfields_dict=None,
102 formfile=None,
103 filename=None,
104 put_method=False,
105 patch_method=False,
106 skip_query_admin=False,
107 ):
108 self._logger.debug("")
109 data = BytesIO()
110 curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
111 if put_method:
112 curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PUT")
113 elif patch_method:
114 curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PATCH")
115 curl_cmd.setopt(pycurl.POST, 1)
116 curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
117
118 if postfields_dict is not None:
119 jsondata = json.dumps(postfields_dict)
120 if "password" in postfields_dict:
121 postfields_dict_copy = copy.deepcopy(postfields_dict)
122 postfields_dict_copy["password"] = "******"
123 jsondata_log = json.dumps(postfields_dict_copy)
124 else:
125 jsondata_log = jsondata
126 self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
127 curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
128 elif formfile is not None:
129 curl_cmd.setopt(
130 pycurl.HTTPPOST, [((formfile[0], (pycurl.FORM_FILE, formfile[1])))]
131 )
132 elif filename is not None:
133 with open(filename, "rb") as stream:
134 postdata = stream.read()
135 self._logger.verbose("Request POSTFIELDS: Binary content")
136 curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
137
138 if put_method:
139 self._logger.info(
140 "Request METHOD: {} URL: {}".format("PUT", self._url + endpoint)
141 )
142 elif patch_method:
143 self._logger.info(
144 "Request METHOD: {} URL: {}".format("PATCH", self._url + endpoint)
145 )
146 else:
147 self._logger.info(
148 "Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
149 )
150 curl_cmd.perform()
151 http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
152 self._logger.info("Response HTTPCODE: {}".format(http_code))
153 curl_cmd.close()
154 self.check_http_response(http_code, data)
155 if data.getvalue():
156 data_text = data.getvalue().decode()
157 self._logger.verbose("Response DATA: {}".format(data_text))
158 return http_code, data_text
159 else:
160 return http_code, None
161
162 def post_cmd(
163 self,
164 endpoint="",
165 postfields_dict=None,
166 formfile=None,
167 filename=None,
168 skip_query_admin=False,
169 ):
170 self._logger.debug("")
171 return self.send_cmd(
172 endpoint=endpoint,
173 postfields_dict=postfields_dict,
174 formfile=formfile,
175 filename=filename,
176 put_method=False,
177 patch_method=False,
178 skip_query_admin=skip_query_admin,
179 )
180
181 def put_cmd(
182 self,
183 endpoint="",
184 postfields_dict=None,
185 formfile=None,
186 filename=None,
187 skip_query_admin=False,
188 ):
189 self._logger.debug("")
190 return self.send_cmd(
191 endpoint=endpoint,
192 postfields_dict=postfields_dict,
193 formfile=formfile,
194 filename=filename,
195 put_method=True,
196 patch_method=False,
197 skip_query_admin=skip_query_admin,
198 )
199
200 def patch_cmd(
201 self,
202 endpoint="",
203 postfields_dict=None,
204 formfile=None,
205 filename=None,
206 skip_query_admin=False,
207 ):
208 self._logger.debug("")
209 return self.send_cmd(
210 endpoint=endpoint,
211 postfields_dict=postfields_dict,
212 formfile=formfile,
213 filename=filename,
214 put_method=False,
215 patch_method=True,
216 skip_query_admin=skip_query_admin,
217 )
218
219 def get2_cmd(self, endpoint, skip_query_admin=False):
220 self._logger.debug("")
221 data = BytesIO()
222 curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
223 curl_cmd.setopt(pycurl.HTTPGET, 1)
224 curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
225 self._logger.info(
226 "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
227 )
228 curl_cmd.perform()
229 http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
230 self._logger.info("Response HTTPCODE: {}".format(http_code))
231 curl_cmd.close()
232 self.check_http_response(http_code, data)
233 if data.getvalue():
234 data_text = data.getvalue().decode()
235 self._logger.verbose("Response DATA: {}".format(data_text))
236 return http_code, data_text
237 return http_code, None
238
239 def check_http_response(self, http_code, data):
240 if http_code >= 300:
241 resp = ""
242 if data.getvalue():
243 data_text = data.getvalue().decode()
244 self._logger.verbose(
245 "Response {} DATA: {}".format(http_code, data_text)
246 )
247 resp = ": " + data_text
248 else:
249 self._logger.verbose("Response {}".format(http_code))
250 if http_code == 404:
251 raise NotFound("Error {}{}".format(http_code, resp))
252 raise OsmHttpException("Error {}{}".format(http_code, resp))
253
254 def set_query_admin(self, **kwargs):
255 if "all_projects" in kwargs:
256 self._all_projects = kwargs["all_projects"]
257 if "public" in kwargs:
258 self._public = kwargs["public"]
259 self._default_query_admin = self._complete_default_query_admin()