# License for the specific language governing permissions and limitations
# under the License.
-from io import BytesIO
-import pycurl
+import copy
import json
import logging
-import copy
+import requests
+import http.client as http_client
+import urllib3
+
from osmclient.common import http
from osmclient.common.exceptions import OsmHttpException, NotFound
+urllib3.disable_warnings()
+
+
class Http(http.Http):
- TIMEOUT = 10
+ CONNECT_TIMEOUT = 15
- def __init__(self, url, user='admin', password='admin', **kwargs):
+ def __init__(self, url, user="admin", password="admin", **kwargs):
self._url = url
self._user = user
self._password = password
self._http_header = None
- self._logger = logging.getLogger('osmclient')
+ self._logger = logging.getLogger("osmclient")
self._default_query_admin = None
- self._all_projects = None;
- self._public = None;
- if 'all_projects' in kwargs:
- self._all_projects=kwargs['all_projects']
- if 'public' in kwargs:
- self._public=kwargs['public']
+ self._all_projects = None
+ self._public = None
+ if "all_projects" in kwargs:
+ self._all_projects = kwargs["all_projects"]
+ if "public" in kwargs:
+ self._public = kwargs["public"]
self._default_query_admin = self._complete_default_query_admin()
def _complete_default_query_admin(self):
- query_string_list=[]
+ query_string_list = []
if self._all_projects:
query_string_list.append("ADMIN")
if self._public is not None:
def _complete_endpoint(self, endpoint):
if self._default_query_admin:
- if '?' in endpoint:
- endpoint = '&'.join([endpoint, self._default_query_admin])
+ if "?" in endpoint:
+ endpoint = "&".join([endpoint, self._default_query_admin])
else:
- endpoint = '?'.join([endpoint, self._default_query_admin])
+ endpoint = "?".join([endpoint, self._default_query_admin])
return endpoint
- def _get_curl_cmd(self, endpoint, skip_query_admin=False):
+ def _get_requests_cmd(self, endpoint, skip_query_admin=False):
self._logger.debug("")
- curl_cmd = pycurl.Curl()
+ requests_cmd = requests.Request()
if self._logger.getEffectiveLevel() == logging.DEBUG:
- curl_cmd.setopt(pycurl.VERBOSE, True)
+ http_client.HTTPConnection.debuglevel = 1
+ requests_log = logging.getLogger("requests.packages.urllib3")
+ requests_log.setLevel(logging.DEBUG)
+ requests_log.propagate = True
+ else:
+ http_client.HTTPConnection.debuglevel = 0
+ requests_log = logging.getLogger("requests.packages.urllib3")
+ requests_log.setLevel(logging.NOTSET)
+ requests_log.propagate = True
+
if not skip_query_admin:
endpoint = self._complete_endpoint(endpoint)
- curl_cmd.setopt(pycurl.TIMEOUT, self.TIMEOUT)
- curl_cmd.setopt(pycurl.URL, self._url + endpoint)
- curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
- curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
+ requests_cmd.url = self._url + endpoint
if self._http_header:
- curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
- return curl_cmd
+ requests_cmd.headers = self._http_header
+ return requests_cmd
def delete_cmd(self, endpoint, skip_query_admin=False):
+ session_cmd = requests.Session()
self._logger.debug("")
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
- self._logger.info("Request METHOD: {} URL: {}".format("DELETE",self._url + endpoint))
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
+ requests_cmd.method = "DELETE"
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
+ )
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
+ data = resp.content
+ session_cmd.close()
self.check_http_response(http_code, data)
# TODO 202 accepted should be returned somehow
- if data.getvalue():
- data_text = data.getvalue().decode()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
- else:
- return http_code, None
+ return http_code, None
- def send_cmd(self, endpoint='', postfields_dict=None,
- formfile=None, filename=None,
- put_method=False, patch_method=False,
- skip_query_admin=False):
+ def send_cmd(
+ self,
+ endpoint="",
+ postfields_dict=None,
+ postdata=None,
+ formfile=None,
+ filename=None,
+ put_method=False,
+ patch_method=False,
+ skip_query_admin=False,
+ ):
+ session_cmd = requests.Session()
self._logger.debug("")
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
if put_method:
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PUT")
+ requests_cmd.method = "PUT"
elif patch_method:
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PATCH")
- curl_cmd.setopt(pycurl.POST, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ requests_cmd.method = "PATCH"
+ else:
+ requests_cmd.method = "POST"
if postfields_dict is not None:
jsondata = json.dumps(postfields_dict)
- if 'password' in postfields_dict:
+ if "password" in postfields_dict:
postfields_dict_copy = copy.deepcopy(postfields_dict)
- postfields_dict_copy['password']='******'
+ postfields_dict_copy["password"] = "******"
jsondata_log = json.dumps(postfields_dict_copy)
else:
jsondata_log = jsondata
+ requests_cmd.json = postfields_dict
self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
- curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
- elif formfile is not None:
- curl_cmd.setopt(
- pycurl.HTTPPOST,
- [((formfile[0],
- (pycurl.FORM_FILE,
- formfile[1])))])
- elif filename is not None:
- with open(filename, 'rb') as stream:
- postdata=stream.read()
+ if postdata is not None:
+ self._logger.verbose("Request DATA: {}".format(postdata))
+ requests_cmd.data = postdata
+ if formfile is not None:
+ self._logger.verbose("Request FILES: {}".format(formfile))
+ requests_cmd.files = formfile
+ if filename is not None:
+ with open(filename, "rb") as stream:
+ filedata = stream.read()
self._logger.verbose("Request POSTFIELDS: Binary content")
- curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
+ requests_cmd.data = filedata
if put_method:
- self._logger.info("Request METHOD: {} URL: {}".format("PUT",self._url + endpoint))
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("PUT", self._url + endpoint)
+ )
elif patch_method:
- self._logger.info("Request METHOD: {} URL: {}".format("PATCH",self._url + endpoint))
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("PATCH", self._url + endpoint)
+ )
else:
- self._logger.info("Request METHOD: {} URL: {}".format("POST",self._url + endpoint))
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
+ )
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
+ data = resp.content
self.check_http_response(http_code, data)
- if data.getvalue():
- data_text = data.getvalue().decode()
+ session_cmd.close()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
- else:
- return http_code, None
+ return http_code, None
- def post_cmd(self, endpoint='', postfields_dict=None,
- formfile=None, filename=None,
- skip_query_admin=False):
+ def post_cmd(
+ self,
+ endpoint="",
+ postfields_dict=None,
+ postdata=None,
+ formfile=None,
+ filename=None,
+ skip_query_admin=False,
+ ):
self._logger.debug("")
- return self.send_cmd(endpoint=endpoint,
- postfields_dict=postfields_dict,
- formfile=formfile, filename=filename,
- put_method=False, patch_method=False,
- skip_query_admin=skip_query_admin)
-
- def put_cmd(self, endpoint='', postfields_dict=None,
- formfile=None, filename=None,
- skip_query_admin=False):
+ return self.send_cmd(
+ endpoint=endpoint,
+ postfields_dict=postfields_dict,
+ postdata=postdata,
+ formfile=formfile,
+ filename=filename,
+ put_method=False,
+ patch_method=False,
+ skip_query_admin=skip_query_admin,
+ )
+
+ def put_cmd(
+ self,
+ endpoint="",
+ postfields_dict=None,
+ postdata=None,
+ formfile=None,
+ filename=None,
+ skip_query_admin=False,
+ ):
self._logger.debug("")
- return self.send_cmd(endpoint=endpoint,
- postfields_dict=postfields_dict,
- formfile=formfile, filename=filename,
- put_method=True, patch_method=False,
- skip_query_admin=skip_query_admin)
-
- def patch_cmd(self, endpoint='', postfields_dict=None,
- formfile=None, filename=None,
- skip_query_admin=False):
+ return self.send_cmd(
+ endpoint=endpoint,
+ postfields_dict=postfields_dict,
+ postdata=postdata,
+ formfile=formfile,
+ filename=filename,
+ put_method=True,
+ patch_method=False,
+ skip_query_admin=skip_query_admin,
+ )
+
+ def patch_cmd(
+ self,
+ endpoint="",
+ postfields_dict=None,
+ postdata=None,
+ formfile=None,
+ filename=None,
+ skip_query_admin=False,
+ ):
self._logger.debug("")
- return self.send_cmd(endpoint=endpoint,
- postfields_dict=postfields_dict,
- formfile=formfile, filename=filename,
- put_method=False, patch_method=True,
- skip_query_admin=skip_query_admin)
+ return self.send_cmd(
+ endpoint=endpoint,
+ postfields_dict=postfields_dict,
+ postdata=postdata,
+ formfile=formfile,
+ filename=filename,
+ put_method=False,
+ patch_method=True,
+ skip_query_admin=skip_query_admin,
+ )
def get2_cmd(self, endpoint, skip_query_admin=False):
+ session_cmd = requests.Session()
self._logger.debug("")
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
- curl_cmd.setopt(pycurl.HTTPGET, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
- self._logger.info("Request METHOD: {} URL: {}".format("GET",self._url + endpoint))
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
+ requests_cmd.method = "GET"
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
+ )
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
+ data = resp.content
+ session_cmd.close()
self.check_http_response(http_code, data)
- if data.getvalue():
- data_text = data.getvalue().decode()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
return http_code, None
def check_http_response(self, http_code, data):
if http_code >= 300:
resp = ""
- if data.getvalue():
- data_text = data.getvalue().decode()
- self._logger.verbose("Response {} DATA: {}".format(http_code, data_text))
+ if data:
+ data_text = data.decode()
+ self._logger.verbose(
+ "Response {} DATA: {}".format(http_code, data_text)
+ )
resp = ": " + data_text
else:
self._logger.verbose("Response {}".format(http_code))
raise OsmHttpException("Error {}{}".format(http_code, resp))
def set_query_admin(self, **kwargs):
- if 'all_projects' in kwargs:
- self._all_projects=kwargs['all_projects']
- if 'public' in kwargs:
- self._public=kwargs['public']
+ if "all_projects" in kwargs:
+ self._all_projects = kwargs["all_projects"]
+ if "public" in kwargs:
+ self._public = kwargs["public"]
self._default_query_admin = self._complete_default_query_admin()