Merge remote-tracking branch 'origin/v8.0'
[osm/osmclient.git] / osmclient / sol005 / http.py
1 # Copyright 2018 Telefonica
2 #
3 # All Rights Reserved.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
15 # under the License.
16
17 import copy
18 from io import BytesIO
19 import json
20 import logging
21
22 from osmclient.common import http
23 from osmclient.common.exceptions import OsmHttpException, NotFound
24 import pycurl
25
26
class Http(http.Http):
    """pycurl-based HTTP helper that issues GET/POST/PUT/PATCH/DELETE requests.

    Every request is sent to ``url + endpoint``.  Unless the caller passes
    ``skip_query_admin=True``, the endpoint is first extended with the
    default admin query string built from the ``all_projects`` and
    ``public`` settings (see ``_complete_default_query_admin``).

    All request methods return a ``(http_code, text)`` tuple, where ``text``
    is the decoded response body or ``None`` when the body is empty, and
    raise through ``check_http_response`` for any status code >= 300.

    NOTE(review): ``self._logger.verbose`` is not a standard-library Logger
    method; presumably it is installed on the 'osmclient' logger elsewhere
    -- confirm before using this class with a plain ``logging`` setup.
    """

    # Value handed to pycurl.CONNECTTIMEOUT for every request.
    CONNECT_TIMEOUT = 15

    def __init__(self, url, user='admin', password='admin', **kwargs):
        """Store the connection settings and build the default admin query.

        :param url: base URL; each request appends its endpoint to it
        :param user: stored as ``self._user`` (not used inside this class)
        :param password: stored as ``self._password`` (not used inside this
            class)
        :param kwargs: optional ``all_projects`` and ``public`` settings;
            any other key is ignored
        """
        self._url = url
        self._user = user
        self._password = password
        # Passed to pycurl.HTTPHEADER when truthy; nothing in this class
        # assigns it a value other than None.
        self._http_header = None
        self._logger = logging.getLogger('osmclient')
        # A missing key leaves the setting at None, same as the defaults.
        self._all_projects = kwargs.get('all_projects')
        self._public = kwargs.get('public')
        self._default_query_admin = self._complete_default_query_admin()

    def _complete_default_query_admin(self):
        """Return the admin query string derived from the current settings.

        ``ADMIN`` is included when ``all_projects`` is truthy and
        ``PUBLIC=<value>`` whenever ``public`` is not None; the parts are
        joined with ``&``.  The result is an empty string when neither
        applies.
        """
        query_parts = []
        if self._all_projects:
            query_parts.append("ADMIN")
        if self._public is not None:
            query_parts.append("PUBLIC={}".format(self._public))
        return "&".join(query_parts)

    def _complete_endpoint(self, endpoint):
        """Append the default admin query string (if any) to ``endpoint``.

        Uses ``&`` when the endpoint already contains a ``?`` and ``?``
        otherwise.  The endpoint is returned untouched when there is no
        default admin query.
        """
        if not self._default_query_admin:
            return endpoint
        separator = '&' if '?' in endpoint else '?'
        return separator.join([endpoint, self._default_query_admin])

    def _get_curl_cmd(self, endpoint, skip_query_admin=False):
        """Create a ``pycurl.Curl`` handle configured for ``endpoint``.

        The caller owns the returned handle and must close it.

        :param endpoint: path (and optional query) appended to the base URL
        :param skip_query_admin: when True the default admin query string is
            not appended to the endpoint
        :return: the configured ``pycurl.Curl`` handle
        """
        self._logger.debug("")
        curl_cmd = pycurl.Curl()
        try:
            if self._logger.getEffectiveLevel() == logging.DEBUG:
                curl_cmd.setopt(pycurl.VERBOSE, True)
            if not skip_query_admin:
                endpoint = self._complete_endpoint(endpoint)
            curl_cmd.setopt(pycurl.CONNECTTIMEOUT, self.CONNECT_TIMEOUT)
            curl_cmd.setopt(pycurl.URL, self._url + endpoint)
            # NOTE(review): TLS peer and host verification are both switched
            # off, so the server certificate is never validated.  Kept as-is
            # because callers may rely on it (e.g. self-signed certificates).
            curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
            curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
            if self._http_header:
                curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
        except Exception:
            # The handle never reaches the caller, so release it here.
            curl_cmd.close()
            raise
        return curl_cmd

    def delete_cmd(self, endpoint, skip_query_admin=False):
        """Send a DELETE request.

        :param endpoint: path (and optional query) appended to the base URL
        :param skip_query_admin: when True the default admin query string is
            not appended
        :return: ``(http_code, text)``; ``text`` is None for an empty body
        :raises NotFound: on HTTP 404
        :raises OsmHttpException: on any other status code >= 300
        """
        self._logger.debug("")
        data = BytesIO()
        curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
        try:
            curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
            curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
            # The logged URL uses the endpoint as passed in, i.e. without
            # the admin query string added by _get_curl_cmd.
            self._logger.info("Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint))
            curl_cmd.perform()
            http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
            self._logger.info("Response HTTPCODE: {}".format(http_code))
        finally:
            # Release the handle even when perform() raises.
            curl_cmd.close()
        self.check_http_response(http_code, data)
        # TODO 202 accepted should be returned somehow
        raw_body = data.getvalue()
        if not raw_body:
            return http_code, None
        data_text = raw_body.decode()
        self._logger.verbose("Response DATA: {}".format(data_text))
        return http_code, data_text

    def send_cmd(self, endpoint='', postfields_dict=None,
                 formfile=None, filename=None,
                 put_method=False, patch_method=False,
                 skip_query_admin=False):
        """Send a POST, PUT or PATCH request with an optional body.

        The body comes from the first of these that is not None:
        ``postfields_dict`` (serialised to JSON), ``formfile`` (multipart
        form upload) or ``filename`` (raw file contents).

        :param endpoint: path (and optional query) appended to the base URL
        :param postfields_dict: object serialised with ``json.dumps``; a
            top-level ``password`` entry is masked in the log output only
        :param formfile: indexable pair; item 0 is the form field name and
            item 1 is the value given to ``pycurl.FORM_FILE``
        :param filename: path of a file whose bytes are sent as the body
        :param put_method: use PUT; takes precedence over ``patch_method``
        :param patch_method: use PATCH when ``put_method`` is false
        :param skip_query_admin: when True the default admin query string is
            not appended
        :return: ``(http_code, text)``; ``text`` is None for an empty body
        :raises NotFound: on HTTP 404
        :raises OsmHttpException: on any other status code >= 300
        """
        self._logger.debug("")
        data = BytesIO()
        if put_method:
            method = "PUT"
        elif patch_method:
            method = "PATCH"
        else:
            method = "POST"
        curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
        try:
            if method != "POST":
                curl_cmd.setopt(pycurl.CUSTOMREQUEST, method)
            curl_cmd.setopt(pycurl.POST, 1)
            curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)

            if postfields_dict is not None:
                jsondata = json.dumps(postfields_dict)
                if 'password' in postfields_dict:
                    # Mask the password in a copy so that only the log
                    # output changes, never the payload that is sent.
                    postfields_dict_copy = copy.deepcopy(postfields_dict)
                    postfields_dict_copy['password'] = '******'
                    jsondata_log = json.dumps(postfields_dict_copy)
                else:
                    jsondata_log = jsondata
                self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
                curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
            elif formfile is not None:
                curl_cmd.setopt(
                    pycurl.HTTPPOST,
                    [(formfile[0], (pycurl.FORM_FILE, formfile[1]))])
            elif filename is not None:
                with open(filename, 'rb') as stream:
                    postdata = stream.read()
                self._logger.verbose("Request POSTFIELDS: Binary content")
                curl_cmd.setopt(pycurl.POSTFIELDS, postdata)

            # The logged URL uses the endpoint as passed in, i.e. without
            # the admin query string added by _get_curl_cmd.
            self._logger.info("Request METHOD: {} URL: {}".format(method, self._url + endpoint))
            curl_cmd.perform()
            http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
            self._logger.info("Response HTTPCODE: {}".format(http_code))
        finally:
            # Release the handle even when body preparation or perform()
            # raises.
            curl_cmd.close()
        self.check_http_response(http_code, data)
        raw_body = data.getvalue()
        if not raw_body:
            return http_code, None
        data_text = raw_body.decode()
        self._logger.verbose("Response DATA: {}".format(data_text))
        return http_code, data_text

    def post_cmd(self, endpoint='', postfields_dict=None,
                 formfile=None, filename=None,
                 skip_query_admin=False):
        """Send a POST request; see ``send_cmd`` for parameters and result."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile, filename=filename,
                             put_method=False, patch_method=False,
                             skip_query_admin=skip_query_admin)

    def put_cmd(self, endpoint='', postfields_dict=None,
                formfile=None, filename=None,
                skip_query_admin=False):
        """Send a PUT request; see ``send_cmd`` for parameters and result."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile, filename=filename,
                             put_method=True, patch_method=False,
                             skip_query_admin=skip_query_admin)

    def patch_cmd(self, endpoint='', postfields_dict=None,
                  formfile=None, filename=None,
                  skip_query_admin=False):
        """Send a PATCH request; see ``send_cmd`` for parameters and result."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile, filename=filename,
                             put_method=False, patch_method=True,
                             skip_query_admin=skip_query_admin)

    def get2_cmd(self, endpoint, skip_query_admin=False):
        """Send a GET request.

        :param endpoint: path (and optional query) appended to the base URL
        :param skip_query_admin: when True the default admin query string is
            not appended
        :return: ``(http_code, text)``; ``text`` is None for an empty body
        :raises NotFound: on HTTP 404
        :raises OsmHttpException: on any other status code >= 300
        """
        self._logger.debug("")
        data = BytesIO()
        curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
        try:
            curl_cmd.setopt(pycurl.HTTPGET, 1)
            curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
            # The logged URL uses the endpoint as passed in, i.e. without
            # the admin query string added by _get_curl_cmd.
            self._logger.info("Request METHOD: {} URL: {}".format("GET", self._url + endpoint))
            curl_cmd.perform()
            http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
            self._logger.info("Response HTTPCODE: {}".format(http_code))
        finally:
            # Release the handle even when perform() raises.
            curl_cmd.close()
        self.check_http_response(http_code, data)
        raw_body = data.getvalue()
        if not raw_body:
            return http_code, None
        data_text = raw_body.decode()
        self._logger.verbose("Response DATA: {}".format(data_text))
        return http_code, data_text

    def check_http_response(self, http_code, data):
        """Raise when ``http_code`` is 300 or above; otherwise do nothing.

        :param http_code: numeric HTTP status code of the response
        :param data: ``BytesIO`` buffer holding the response body; when it
            is not empty its decoded text is appended to the error message
        :raises NotFound: when ``http_code`` is 404
        :raises OsmHttpException: for every other code >= 300
        """
        if http_code < 300:
            return
        raw_body = data.getvalue()
        if raw_body:
            data_text = raw_body.decode()
            self._logger.verbose("Response {} DATA: {}".format(http_code, data_text))
            resp = ": " + data_text
        else:
            self._logger.verbose("Response {}".format(http_code))
            resp = ""
        if http_code == 404:
            raise NotFound("Error {}{}".format(http_code, resp))
        raise OsmHttpException("Error {}{}".format(http_code, resp))

    def set_query_admin(self, **kwargs):
        """Update ``all_projects`` / ``public`` and rebuild the admin query.

        A setting whose key is absent from ``kwargs`` keeps its current
        value; any other key is ignored.
        """
        self._all_projects = kwargs.get('all_projects', self._all_projects)
        self._public = kwargs.get('public', self._public)
        self._default_query_admin = self._complete_default_query_admin()
216 self._default_query_admin = self._complete_default_query_admin()