# under the License.
import copy
-from io import BytesIO
import json
import logging
+import requests
+import http.client as http_client
+import urllib3
from osmclient.common import http
from osmclient.common.exceptions import OsmHttpException, NotFound
-import pycurl
+
+
+urllib3.disable_warnings()
class Http(http.Http):
endpoint = "?".join([endpoint, self._default_query_admin])
return endpoint
- def _get_curl_cmd(self, endpoint, skip_query_admin=False):
+ def _get_requests_cmd(self, endpoint, skip_query_admin=False):
self._logger.debug("")
- curl_cmd = pycurl.Curl()
+ requests_cmd = requests.Request()
if self._logger.getEffectiveLevel() == logging.DEBUG:
- curl_cmd.setopt(pycurl.VERBOSE, True)
+ http_client.HTTPConnection.debuglevel = 1
+ requests_log = logging.getLogger("requests.packages.urllib3")
+ requests_log.setLevel(logging.DEBUG)
+ requests_log.propagate = True
+ else:
+ http_client.HTTPConnection.debuglevel = 0
+ requests_log = logging.getLogger("requests.packages.urllib3")
+ requests_log.setLevel(logging.NOTSET)
+ requests_log.propagate = True
+
if not skip_query_admin:
endpoint = self._complete_endpoint(endpoint)
- curl_cmd.setopt(pycurl.CONNECTTIMEOUT, self.CONNECT_TIMEOUT)
- curl_cmd.setopt(pycurl.URL, self._url + endpoint)
- curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
- curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
+ requests_cmd.url = self._url + endpoint
if self._http_header:
- curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
- return curl_cmd
+ requests_cmd.headers = self._http_header
+ return requests_cmd
def delete_cmd(self, endpoint, skip_query_admin=False):
+ session_cmd = requests.Session()
self._logger.debug("")
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
+ requests_cmd.method = "DELETE"
self._logger.info(
"Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
+ data = resp.content
+ session_cmd.close()
self.check_http_response(http_code, data)
# TODO 202 accepted should be returned somehow
- if data.getvalue():
- data_text = data.getvalue().decode()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
- else:
- return http_code, None
+ return http_code, None
def send_cmd(
self,
endpoint="",
postfields_dict=None,
+ postdata=None,
formfile=None,
filename=None,
put_method=False,
patch_method=False,
skip_query_admin=False,
):
+ session_cmd = requests.Session()
self._logger.debug("")
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
if put_method:
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PUT")
+ requests_cmd.method = "PUT"
elif patch_method:
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PATCH")
- curl_cmd.setopt(pycurl.POST, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ requests_cmd.method = "PATCH"
+ else:
+ requests_cmd.method = "POST"
+
if postfields_dict is not None:
jsondata = json.dumps(postfields_dict)
if "password" in postfields_dict:
jsondata_log = json.dumps(postfields_dict_copy)
else:
jsondata_log = jsondata
+ requests_cmd.json = postfields_dict
self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
- curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
- elif formfile is not None:
- curl_cmd.setopt(
- pycurl.HTTPPOST, [((formfile[0], (pycurl.FORM_FILE, formfile[1])))]
- )
- elif filename is not None:
+ if postdata is not None:
+ self._logger.verbose("Request DATA: {}".format(postdata))
+ requests_cmd.data = postdata
+ if formfile is not None:
+ self._logger.verbose("Request FILES: {}".format(formfile))
+ requests_cmd.files = formfile
+ if filename is not None:
with open(filename, "rb") as stream:
- postdata = stream.read()
+ filedata = stream.read()
self._logger.verbose("Request POSTFIELDS: Binary content")
- curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
+ requests_cmd.data = filedata
if put_method:
self._logger.info(
self._logger.info(
"Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
+ data = resp.content
self.check_http_response(http_code, data)
- if data.getvalue():
- data_text = data.getvalue().decode()
+ session_cmd.close()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
- else:
- return http_code, None
+ return http_code, None
def post_cmd(
self,
endpoint="",
postfields_dict=None,
+ postdata=None,
formfile=None,
filename=None,
skip_query_admin=False,
return self.send_cmd(
endpoint=endpoint,
postfields_dict=postfields_dict,
+ postdata=postdata,
formfile=formfile,
filename=filename,
put_method=False,
self,
endpoint="",
postfields_dict=None,
+ postdata=None,
formfile=None,
filename=None,
skip_query_admin=False,
return self.send_cmd(
endpoint=endpoint,
postfields_dict=postfields_dict,
+ postdata=postdata,
formfile=formfile,
filename=filename,
put_method=True,
self,
endpoint="",
postfields_dict=None,
+ postdata=None,
formfile=None,
filename=None,
skip_query_admin=False,
return self.send_cmd(
endpoint=endpoint,
postfields_dict=postfields_dict,
+ postdata=postdata,
formfile=formfile,
filename=filename,
put_method=False,
)
def get2_cmd(self, endpoint, skip_query_admin=False):
+ session_cmd = requests.Session()
self._logger.debug("")
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
- curl_cmd.setopt(pycurl.HTTPGET, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
+ requests_cmd.method = "GET"
self._logger.info(
"Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
+ data = resp.content
+ session_cmd.close()
self.check_http_response(http_code, data)
- if data.getvalue():
- data_text = data.getvalue().decode()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
return http_code, None
def check_http_response(self, http_code, data):
if http_code >= 300:
resp = ""
- if data.getvalue():
- data_text = data.getvalue().decode()
+ if data:
+ data_text = data.decode()
self._logger.verbose(
"Response {} DATA: {}".format(http_code, data_text)
)