d130879e990d8206e14f5874d1b2ba57d433ecbc
[osm/osmclient.git] / osmclient / sol005 / http.py
1 # Copyright 2018 Telefonica
2 #
3 # All Rights Reserved.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
15 # under the License.
16
17 from io import BytesIO
18 import pycurl
19 import json
20 import logging
21 import copy
22 from osmclient.common import http
23 from osmclient.common.exceptions import OsmHttpException, NotFound
24
25
class Http(http.Http):
    """HTTP client for the OSM SOL005 API, implemented on top of pycurl.

    Every request is sent to ``url + endpoint``.  Unless the caller passes
    ``skip_query_admin=True``, the default admin query string (``ADMIN``
    and/or ``PUBLIC=<value>``, see :meth:`set_query_admin`) is appended to
    the endpoint first.

    The request methods (:meth:`get2_cmd`, :meth:`post_cmd`,
    :meth:`put_cmd`, :meth:`patch_cmd`, :meth:`delete_cmd`,
    :meth:`send_cmd`) return a ``(http_code, body)`` tuple where ``body``
    is the decoded response text, or ``None`` when the response is empty.
    They raise ``NotFound`` for a 404 and ``OsmHttpException`` for any
    other status code >= 300.

    NOTE(review): ``self._logger.verbose`` is not a method of the stdlib
    ``logging.Logger``; presumably a logging extension installed elsewhere
    in the project provides it -- confirm.
    """

    def __init__(self, url, user='admin', password='admin', **kwargs):
        """Store the connection settings and the default admin query.

        :param url: base URL that every endpoint is appended to.
        :param user: stored as ``_user``; not used by this class itself.
        :param password: stored as ``_password``; not used by this class
            itself.
        :param kwargs: optional ``all_projects`` and ``public`` values,
            handled exactly as in :meth:`set_query_admin`; other keys are
            ignored.
        """
        self._url = url
        self._user = user
        self._password = password
        # Stays None until something else assigns a header list; only then
        # are custom headers attached to the requests.
        self._http_header = None
        self._logger = logging.getLogger('osmclient')
        self._all_projects = None
        self._public = None
        self._default_query_admin = None
        self.set_query_admin(**kwargs)

    def _complete_default_query_admin(self):
        """Build the default query string from the admin settings.

        :return: ``"ADMIN"`` when ``_all_projects`` is truthy and
            ``"PUBLIC=<value>"`` when ``_public`` is not None, joined with
            ``&``; an empty string when neither applies.
        """
        query_string_list = []
        if self._all_projects:
            query_string_list.append("ADMIN")
        if self._public is not None:
            query_string_list.append("PUBLIC={}".format(self._public))
        return "&".join(query_string_list)

    def _complete_endpoint(self, endpoint):
        """Append the default admin query string to *endpoint*, if any.

        ``&`` is used as separator when the endpoint already contains a
        query (``?``), ``?`` otherwise.
        """
        if not self._default_query_admin:
            return endpoint
        separator = '&' if '?' in endpoint else '?'
        return separator.join([endpoint, self._default_query_admin])

    def _request_url(self, endpoint, skip_query_admin=False):
        """Return the absolute URL that is requested for *endpoint*.

        Shared by the curl set-up and the request logging so that the
        logged URL always matches the URL that is actually sent.
        """
        if not skip_query_admin:
            endpoint = self._complete_endpoint(endpoint)
        return self._url + endpoint

    def _get_curl_cmd(self, endpoint, skip_query_admin=False):
        """Create a ``pycurl.Curl`` handle prepared for *endpoint*.

        The caller owns the returned handle and must close it.
        """
        self._logger.debug("")
        curl_cmd = pycurl.Curl()
        try:
            # Curl's own tracing is enabled only when the logger's
            # effective level is exactly DEBUG.
            if self._logger.getEffectiveLevel() == logging.DEBUG:
                curl_cmd.setopt(pycurl.VERBOSE, True)
            curl_cmd.setopt(
                pycurl.URL, self._request_url(endpoint, skip_query_admin))
            # NOTE(security): TLS peer and host verification are disabled,
            # so the server certificate is not validated.
            curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
            curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
            if self._http_header:
                curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
        except Exception:
            # Do not leave a half-configured handle open.
            curl_cmd.close()
            raise
        return curl_cmd

    def _perform(self, curl_cmd, method, url):
        """Log the request, run the transfer and return the HTTP code.

        The handle is not closed here; the caller closes it.
        """
        self._logger.info(
            "Request METHOD: {} URL: {}".format(method, url))
        curl_cmd.perform()
        http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
        self._logger.info("Response HTTPCODE: {}".format(http_code))
        return http_code

    def _process_response(self, http_code, data):
        """Validate the status code and decode the buffered response.

        :param http_code: status code of the finished transfer.
        :param data: ``BytesIO`` buffer the response body was written to.
        :return: ``(http_code, text)``, or ``(http_code, None)`` when the
            body is empty.
        :raises NotFound: for a 404 status code.
        :raises OsmHttpException: for any other status code >= 300.
        """
        self.check_http_response(http_code, data)
        raw = data.getvalue()
        if not raw:
            return http_code, None
        data_text = raw.decode()
        self._logger.verbose("Response DATA: {}".format(data_text))
        return http_code, data_text

    def delete_cmd(self, endpoint, skip_query_admin=False):
        """Send a DELETE request to *endpoint*.

        :return: ``(http_code, text_or_None)``.
        :raises NotFound: for a 404 status code.
        :raises OsmHttpException: for any other status code >= 300.
        """
        self._logger.debug("")
        data = BytesIO()
        curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
        try:
            curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
            curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
            http_code = self._perform(
                curl_cmd, "DELETE",
                self._request_url(endpoint, skip_query_admin))
        finally:
            # Close the handle even when the transfer raises.
            curl_cmd.close()
        # TODO 202 accepted should be returned somehow
        return self._process_response(http_code, data)

    def send_cmd(self, endpoint='', postfields_dict=None,
                 formfile=None, filename=None,
                 put_method=False, patch_method=False,
                 skip_query_admin=False):
        """Send a POST, PUT or PATCH request with an optional body.

        The body comes from the first of these that is not None:

        * ``postfields_dict`` -- serialised with ``json.dumps``;
        * ``formfile`` -- a ``(field_name, file_path)`` pair sent as a
          multipart form file;
        * ``filename`` -- path of a file whose raw bytes are sent.

        :param put_method: send a PUT instead of a POST.
        :param patch_method: send a PATCH instead of a POST (ignored when
            ``put_method`` is set).
        :return: ``(http_code, text_or_None)``.
        :raises NotFound: for a 404 status code.
        :raises OsmHttpException: for any other status code >= 300.
        """
        self._logger.debug("")
        if put_method:
            method = "PUT"
        elif patch_method:
            method = "PATCH"
        else:
            method = "POST"

        data = BytesIO()
        curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
        try:
            if method != "POST":
                curl_cmd.setopt(pycurl.CUSTOMREQUEST, method)
            curl_cmd.setopt(pycurl.POST, 1)
            curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)

            if postfields_dict is not None:
                jsondata = json.dumps(postfields_dict)
                # Never write a clear-text password to the log.  Only a
                # mapping can carry a 'password' key, so other JSON
                # payloads (lists, strings) are logged unchanged.
                if (isinstance(postfields_dict, dict)
                        and 'password' in postfields_dict):
                    jsondata_log = json.dumps(
                        dict(postfields_dict, password='******'))
                else:
                    jsondata_log = jsondata
                self._logger.verbose(
                    "Request POSTFIELDS: {}".format(jsondata_log))
                curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
            elif formfile is not None:
                curl_cmd.setopt(
                    pycurl.HTTPPOST,
                    [(formfile[0], (pycurl.FORM_FILE, formfile[1]))])
            elif filename is not None:
                with open(filename, 'rb') as stream:
                    postdata = stream.read()
                self._logger.verbose("Request POSTFIELDS: Binary content")
                curl_cmd.setopt(pycurl.POSTFIELDS, postdata)

            http_code = self._perform(
                curl_cmd, method,
                self._request_url(endpoint, skip_query_admin))
        finally:
            # Close the handle even when preparing the body or the
            # transfer itself raises.
            curl_cmd.close()
        return self._process_response(http_code, data)

    def post_cmd(self, endpoint='', postfields_dict=None,
                 formfile=None, filename=None,
                 skip_query_admin=False):
        """Send a POST request; see :meth:`send_cmd` for the arguments."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile, filename=filename,
                             put_method=False, patch_method=False,
                             skip_query_admin=skip_query_admin)

    def put_cmd(self, endpoint='', postfields_dict=None,
                formfile=None, filename=None,
                skip_query_admin=False):
        """Send a PUT request; see :meth:`send_cmd` for the arguments."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile, filename=filename,
                             put_method=True, patch_method=False,
                             skip_query_admin=skip_query_admin)

    def patch_cmd(self, endpoint='', postfields_dict=None,
                  formfile=None, filename=None,
                  skip_query_admin=False):
        """Send a PATCH request; see :meth:`send_cmd` for the arguments."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile, filename=filename,
                             put_method=False, patch_method=True,
                             skip_query_admin=skip_query_admin)

    def get2_cmd(self, endpoint, skip_query_admin=False):
        """Send a GET request to *endpoint*.

        :return: ``(http_code, text_or_None)``.
        :raises NotFound: for a 404 status code.
        :raises OsmHttpException: for any other status code >= 300.
        """
        self._logger.debug("")
        data = BytesIO()
        curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
        try:
            curl_cmd.setopt(pycurl.HTTPGET, 1)
            curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
            http_code = self._perform(
                curl_cmd, "GET",
                self._request_url(endpoint, skip_query_admin))
        finally:
            # Close the handle even when the transfer raises.
            curl_cmd.close()
        return self._process_response(http_code, data)

    def check_http_response(self, http_code, data):
        """Raise when *http_code* is 300 or above.

        :param http_code: status code of the finished transfer.
        :param data: ``BytesIO`` buffer holding the response body; its
            decoded content, when present, is appended to the error text.
        :raises NotFound: for a 404 status code.
        :raises OsmHttpException: for any other status code >= 300.
        """
        if http_code < 300:
            return
        resp = ""
        if data.getvalue():
            data_text = data.getvalue().decode()
            self._logger.verbose(
                "Response {} DATA: {}".format(http_code, data_text))
            resp = ": " + data_text
        else:
            self._logger.verbose("Response {}".format(http_code))
        if http_code == 404:
            raise NotFound("Error {}{}".format(http_code, resp))
        raise OsmHttpException("Error {}{}".format(http_code, resp))

    def set_query_admin(self, **kwargs):
        """Update the admin settings and rebuild the default query string.

        :param kwargs: ``all_projects`` and/or ``public``; a setting whose
            key is absent keeps its current value, other keys are ignored.
        """
        if 'all_projects' in kwargs:
            self._all_projects = kwargs['all_projects']
        if 'public' in kwargs:
            self._public = kwargs['public']
        self._default_query_admin = self._complete_default_query_admin()