X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osmclient%2Fsol005%2Fhttp.py;h=8dd87e74cbd8989fd78cd49342f9697374f93ac2;hb=HEAD;hp=b73789483b095ba84b01a49abde5110671bf43ec;hpb=9994aec868ca4e9caf22f206ebede0387b6ad4f6;p=osm%2Fosmclient.git diff --git a/osmclient/sol005/http.py b/osmclient/sol005/http.py index b737894..8dd87e7 100644 --- a/osmclient/sol005/http.py +++ b/osmclient/sol005/http.py @@ -14,35 +14,40 @@ # License for the specific language governing permissions and limitations # under the License. -from io import BytesIO -import pycurl +import copy import json import logging -import copy +import requests +import http.client as http_client +import urllib3 + from osmclient.common import http from osmclient.common.exceptions import OsmHttpException, NotFound +urllib3.disable_warnings() + + class Http(http.Http): - TIMEOUT = 10 + CONNECT_TIMEOUT = 15 - def __init__(self, url, user='admin', password='admin', **kwargs): + def __init__(self, url, user="admin", password="admin", **kwargs): self._url = url self._user = user self._password = password self._http_header = None - self._logger = logging.getLogger('osmclient') + self._logger = logging.getLogger("osmclient") self._default_query_admin = None - self._all_projects = None; - self._public = None; - if 'all_projects' in kwargs: - self._all_projects=kwargs['all_projects'] - if 'public' in kwargs: - self._public=kwargs['public'] + self._all_projects = None + self._public = None + if "all_projects" in kwargs: + self._all_projects = kwargs["all_projects"] + if "public" in kwargs: + self._public = kwargs["public"] self._default_query_admin = self._complete_default_query_admin() def _complete_default_query_admin(self): - query_string_list=[] + query_string_list = [] if self._all_projects: query_string_list.append("ADMIN") if self._public is not None: @@ -51,145 +56,198 @@ class Http(http.Http): def _complete_endpoint(self, endpoint): if self._default_query_admin: - if '?' in endpoint: - endpoint = '&'.join([endpoint, self._default_query_admin]) + if "?" in endpoint: + endpoint = "&".join([endpoint, self._default_query_admin]) else: - endpoint = '?'.join([endpoint, self._default_query_admin]) + endpoint = "?".join([endpoint, self._default_query_admin]) return endpoint - def _get_curl_cmd(self, endpoint, skip_query_admin=False): + def _get_requests_cmd(self, endpoint, skip_query_admin=False): self._logger.debug("") - curl_cmd = pycurl.Curl() + requests_cmd = requests.Request() if self._logger.getEffectiveLevel() == logging.DEBUG: - curl_cmd.setopt(pycurl.VERBOSE, True) + http_client.HTTPConnection.debuglevel = 1 + requests_log = logging.getLogger("requests.packages.urllib3") + requests_log.setLevel(logging.DEBUG) + requests_log.propagate = True + else: + http_client.HTTPConnection.debuglevel = 0 + requests_log = logging.getLogger("requests.packages.urllib3") + requests_log.setLevel(logging.NOTSET) + requests_log.propagate = True + if not skip_query_admin: endpoint = self._complete_endpoint(endpoint) - curl_cmd.setopt(pycurl.TIMEOUT, self.TIMEOUT) - curl_cmd.setopt(pycurl.URL, self._url + endpoint) - curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0) - curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0) + requests_cmd.url = self._url + endpoint if self._http_header: - curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header) - return curl_cmd + requests_cmd.headers = self._http_header + return requests_cmd def delete_cmd(self, endpoint, skip_query_admin=False): + session_cmd = requests.Session() self._logger.debug("") - data = BytesIO() - curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin) - curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE") - curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write) - self._logger.info("Request METHOD: {} URL: {}".format("DELETE",self._url + endpoint)) - curl_cmd.perform() - http_code = curl_cmd.getinfo(pycurl.HTTP_CODE) + requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin) + requests_cmd.method = "DELETE" + self._logger.info( + "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint) + ) + requests_cmd = requests_cmd.prepare() + resp = session_cmd.send( + requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT + ) + http_code = resp.status_code self._logger.info("Response HTTPCODE: {}".format(http_code)) - curl_cmd.close() + data = resp.content + session_cmd.close() self.check_http_response(http_code, data) # TODO 202 accepted should be returned somehow - if data.getvalue(): - data_text = data.getvalue().decode() + if data: + data_text = data.decode() self._logger.verbose("Response DATA: {}".format(data_text)) return http_code, data_text - else: - return http_code, None + return http_code, None - def send_cmd(self, endpoint='', postfields_dict=None, - formfile=None, filename=None, - put_method=False, patch_method=False, - skip_query_admin=False): + def send_cmd( + self, + endpoint="", + postfields_dict=None, + formfile=None, + filename=None, + put_method=False, + patch_method=False, + skip_query_admin=False, + ): + session_cmd = requests.Session() self._logger.debug("") - data = BytesIO() - curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin) + requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin) if put_method: - curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PUT") + requests_cmd.method = "PUT" elif patch_method: - curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PATCH") - curl_cmd.setopt(pycurl.POST, 1) - curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write) + requests_cmd.method = "PATCH" + else: + requests_cmd.method = "POST" if postfields_dict is not None: jsondata = json.dumps(postfields_dict) - if 'password' in postfields_dict: + if "password" in postfields_dict: postfields_dict_copy = copy.deepcopy(postfields_dict) - postfields_dict_copy['password']='******' + postfields_dict_copy["password"] = "******" jsondata_log = json.dumps(postfields_dict_copy) else: jsondata_log = jsondata + requests_cmd.json = postfields_dict self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log)) - curl_cmd.setopt(pycurl.POSTFIELDS, jsondata) elif formfile is not None: - curl_cmd.setopt( - pycurl.HTTPPOST, - [((formfile[0], - (pycurl.FORM_FILE, - formfile[1])))]) + requests_cmd.files = {formfile[0]: formfile[1]} elif filename is not None: - with open(filename, 'rb') as stream: - postdata=stream.read() + with open(filename, "rb") as stream: + postdata = stream.read() self._logger.verbose("Request POSTFIELDS: Binary content") - curl_cmd.setopt(pycurl.POSTFIELDS, postdata) + requests_cmd.data = postdata if put_method: - self._logger.info("Request METHOD: {} URL: {}".format("PUT",self._url + endpoint)) + self._logger.info( + "Request METHOD: {} URL: {}".format("PUT", self._url + endpoint) + ) elif patch_method: - self._logger.info("Request METHOD: {} URL: {}".format("PATCH",self._url + endpoint)) + self._logger.info( + "Request METHOD: {} URL: {}".format("PATCH", self._url + endpoint) + ) else: - self._logger.info("Request METHOD: {} URL: {}".format("POST",self._url + endpoint)) - curl_cmd.perform() - http_code = curl_cmd.getinfo(pycurl.HTTP_CODE) + self._logger.info( + "Request METHOD: {} URL: {}".format("POST", self._url + endpoint) + ) + requests_cmd = requests_cmd.prepare() + resp = session_cmd.send( + requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT + ) + http_code = resp.status_code self._logger.info("Response HTTPCODE: {}".format(http_code)) - curl_cmd.close() + data = resp.content self.check_http_response(http_code, data) - if data.getvalue(): - data_text = data.getvalue().decode() + session_cmd.close() + if data: + data_text = data.decode() self._logger.verbose("Response DATA: {}".format(data_text)) return http_code, data_text - else: - return http_code, None + return http_code, None - def post_cmd(self, endpoint='', postfields_dict=None, - formfile=None, filename=None, - skip_query_admin=False): + def post_cmd( + self, + endpoint="", + postfields_dict=None, + formfile=None, + filename=None, + skip_query_admin=False, + ): self._logger.debug("") - return self.send_cmd(endpoint=endpoint, - postfields_dict=postfields_dict, - formfile=formfile, filename=filename, - put_method=False, patch_method=False, - skip_query_admin=skip_query_admin) - - def put_cmd(self, endpoint='', postfields_dict=None, - formfile=None, filename=None, - skip_query_admin=False): + return self.send_cmd( + endpoint=endpoint, + postfields_dict=postfields_dict, + formfile=formfile, + filename=filename, + put_method=False, + patch_method=False, + skip_query_admin=skip_query_admin, + ) + + def put_cmd( + self, + endpoint="", + postfields_dict=None, + formfile=None, + filename=None, + skip_query_admin=False, + ): self._logger.debug("") - return self.send_cmd(endpoint=endpoint, - postfields_dict=postfields_dict, - formfile=formfile, filename=filename, - put_method=True, patch_method=False, - skip_query_admin=skip_query_admin) - - def patch_cmd(self, endpoint='', postfields_dict=None, - formfile=None, filename=None, - skip_query_admin=False): + return self.send_cmd( + endpoint=endpoint, + postfields_dict=postfields_dict, + formfile=formfile, + filename=filename, + put_method=True, + patch_method=False, + skip_query_admin=skip_query_admin, + ) + + def patch_cmd( + self, + endpoint="", + postfields_dict=None, + formfile=None, + filename=None, + skip_query_admin=False, + ): self._logger.debug("") - return self.send_cmd(endpoint=endpoint, - postfields_dict=postfields_dict, - formfile=formfile, filename=filename, - put_method=False, patch_method=True, - skip_query_admin=skip_query_admin) + return self.send_cmd( + endpoint=endpoint, + postfields_dict=postfields_dict, + formfile=formfile, + filename=filename, + put_method=False, + patch_method=True, + skip_query_admin=skip_query_admin, + ) def get2_cmd(self, endpoint, skip_query_admin=False): + session_cmd = requests.Session() self._logger.debug("") - data = BytesIO() - curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin) - curl_cmd.setopt(pycurl.HTTPGET, 1) - curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write) - self._logger.info("Request METHOD: {} URL: {}".format("GET",self._url + endpoint)) - curl_cmd.perform() - http_code = curl_cmd.getinfo(pycurl.HTTP_CODE) + requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin) + requests_cmd.method = "GET" + self._logger.info( + "Request METHOD: {} URL: {}".format("GET", self._url + endpoint) + ) + requests_cmd = requests_cmd.prepare() + resp = session_cmd.send( + requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT + ) + http_code = resp.status_code self._logger.info("Response HTTPCODE: {}".format(http_code)) - curl_cmd.close() + data = resp.content + session_cmd.close() self.check_http_response(http_code, data) - if data.getvalue(): - data_text = data.getvalue().decode() + if data: + data_text = data.decode() self._logger.verbose("Response DATA: {}".format(data_text)) return http_code, data_text return http_code, None @@ -197,9 +255,11 @@ class Http(http.Http): def check_http_response(self, http_code, data): if http_code >= 300: resp = "" - if data.getvalue(): - data_text = data.getvalue().decode() - self._logger.verbose("Response {} DATA: {}".format(http_code, data_text)) + if data: + data_text = data.decode() + self._logger.verbose( + "Response {} DATA: {}".format(http_code, data_text) + ) resp = ": " + data_text else: self._logger.verbose("Response {}".format(http_code)) @@ -208,8 +268,8 @@ class Http(http.Http): raise OsmHttpException("Error {}{}".format(http_code, resp)) def set_query_admin(self, **kwargs): - if 'all_projects' in kwargs: - self._all_projects=kwargs['all_projects'] - if 'public' in kwargs: - self._public=kwargs['public'] + if "all_projects" in kwargs: + self._all_projects = kwargs["all_projects"] + if "public" in kwargs: + self._public = kwargs["public"] self._default_query_admin = self._complete_default_query_admin()