# License for the specific language governing permissions and limitations
# under the License.
-from io import BytesIO
-import pycurl
+import requests
+import urllib3
+from requests.auth import HTTPBasicAuth
import json
import logging
+urllib3.disable_warnings()
+
class Http(object):
def __init__(self, url, user="admin", password="admin"):
def set_http_header(self, header):
self._http_header = header
- def _get_curl_cmd(self, endpoint):
- curl_cmd = pycurl.Curl()
- curl_cmd.setopt(pycurl.URL, self._url + endpoint)
- curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
- curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
- curl_cmd.setopt(pycurl.USERPWD, "{}:{}".format(self._user, self._password))
+ def _get_requests_cmd(self, endpoint):
+ requests_cmd = requests.Request()
+ requests_cmd.url = self._url + endpoint
+ requests_cmd.auth = HTTPBasicAuth(self._user, self._password)
if self._http_header:
- curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
- return curl_cmd
+ requests_cmd.headers = self._http_header
+ return requests_cmd
def get_cmd(self, endpoint):
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint)
- curl_cmd.setopt(pycurl.HTTPGET, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ session_cmd = requests.Session()
+ requests_cmd = self._get_requests_cmd(endpoint)
+ requests_cmd.method = "GET"
self._logger.info(
"Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(requests_cmd, verify=False)
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
- if data.getvalue():
- self._logger.debug(
- "Response DATA: {}".format(json.loads(data.getvalue().decode()))
- )
- return json.loads(data.getvalue().decode())
+ data = resp.content
+ session_cmd.close()
+ if data:
+ self._logger.debug("Response DATA: {}".format(json.loads(data.decode())))
+ return json.loads(data.decode())
return None
def delete_cmd(self, endpoint):
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint)
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ session_cmd = requests.Session()
+ requests_cmd = self._get_requests_cmd(endpoint)
+ requests_cmd.method = "DELETE"
self._logger.info(
"Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(requests_cmd, verify=False)
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
- if data.getvalue():
- self._logger.debug(
- "Response DATA: {}".format(json.loads(data.getvalue().decode()))
- )
- return json.loads(data.getvalue().decode())
+ data = resp.content
+ session_cmd.close()
+ if data:
+ self._logger.debug("Response DATA: {}".format(json.loads(data.decode())))
+ return json.loads(data.decode())
return None
def post_cmd(
postfields_dict=None,
formfile=None,
):
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint)
- curl_cmd.setopt(pycurl.POST, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ session_cmd = requests.Session()
+ requests_cmd = self._get_requests_cmd(endpoint)
+ requests_cmd.method = "POST"
if postfields_dict is not None:
- jsondata = json.dumps(postfields_dict)
- curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
+ requests_cmd.json = json.dumps(postfields_dict)
if formfile is not None:
- curl_cmd.setopt(
- pycurl.HTTPPOST, [((formfile[0], (pycurl.FORM_FILE, formfile[1])))]
- )
+ requests_cmd.files = {formfile[0]: formfile[1]}
self._logger.info(
"Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(requests_cmd, verify=False)
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
- if data.getvalue():
- self._logger.debug(
- "Response DATA: {}".format(json.loads(data.getvalue().decode()))
- )
- return json.loads(data.getvalue().decode())
+ data = resp.content
+ session_cmd.close()
+ if data:
+ self._logger.debug("Response DATA: {}".format(json.loads(data.decode())))
+ return json.loads(data.decode())
return None
)
self._headers["Accept"] = "application/json"
self._headers["Content-Type"] = "application/yaml"
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(self._headers.items())
- ]
- self._http_client.set_http_header(http_header)
+ self._http_client.set_http_header(self._headers)
self.vnfd = vnfd.Vnfd(self._http_client, client=self)
self.nsd = nsd.Nsd(self._http_client, client=self)
if self._token is not None:
self._headers["Authorization"] = "Bearer {}".format(self._token)
- http_header = [
- "{}: {}".format(key, val)
- for (key, val) in list(self._headers.items())
- ]
- self._http_client.set_http_header(http_header)
+ self._http_client.set_http_header(self._headers)
return token
def get_version(self):
# under the License.
import copy
-from io import BytesIO
import json
import logging
+import requests
+import http.client as http_client
+import urllib3
from osmclient.common import http
from osmclient.common.exceptions import OsmHttpException, NotFound
-import pycurl
+
+
+urllib3.disable_warnings()
class Http(http.Http):
endpoint = "?".join([endpoint, self._default_query_admin])
return endpoint
- def _get_curl_cmd(self, endpoint, skip_query_admin=False):
+ def _get_requests_cmd(self, endpoint, skip_query_admin=False):
self._logger.debug("")
- curl_cmd = pycurl.Curl()
+ requests_cmd = requests.Request()
if self._logger.getEffectiveLevel() == logging.DEBUG:
- curl_cmd.setopt(pycurl.VERBOSE, True)
+ http_client.HTTPConnection.debuglevel = 1
+ requests_log = logging.getLogger("requests.packages.urllib3")
+ requests_log.setLevel(logging.DEBUG)
+ requests_log.propagate = True
+ else:
+ http_client.HTTPConnection.debuglevel = 0
+ requests_log = logging.getLogger("requests.packages.urllib3")
+ requests_log.setLevel(logging.NOTSET)
+ requests_log.propagate = True
+
if not skip_query_admin:
endpoint = self._complete_endpoint(endpoint)
- curl_cmd.setopt(pycurl.CONNECTTIMEOUT, self.CONNECT_TIMEOUT)
- curl_cmd.setopt(pycurl.URL, self._url + endpoint)
- curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
- curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
+ requests_cmd.url = self._url + endpoint
if self._http_header:
- curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
- return curl_cmd
+ requests_cmd.headers = self._http_header
+ return requests_cmd
def delete_cmd(self, endpoint, skip_query_admin=False):
+ session_cmd = requests.Session()
self._logger.debug("")
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
+ requests_cmd.method = "DELETE"
self._logger.info(
"Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
+ data = resp.content
+ session_cmd.close()
self.check_http_response(http_code, data)
# TODO 202 accepted should be returned somehow
- if data.getvalue():
- data_text = data.getvalue().decode()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
- else:
- return http_code, None
+ return http_code, None
def send_cmd(
self,
patch_method=False,
skip_query_admin=False,
):
+ session_cmd = requests.Session()
self._logger.debug("")
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
if put_method:
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PUT")
+ requests_cmd.method = "PUT"
elif patch_method:
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PATCH")
- curl_cmd.setopt(pycurl.POST, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ requests_cmd.method = "PATCH"
+ else:
+ requests_cmd.method = "POST"
if postfields_dict is not None:
jsondata = json.dumps(postfields_dict)
jsondata_log = json.dumps(postfields_dict_copy)
else:
jsondata_log = jsondata
+ requests_cmd.json = postfields_dict
self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
- curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
elif formfile is not None:
- curl_cmd.setopt(
- pycurl.HTTPPOST, [((formfile[0], (pycurl.FORM_FILE, formfile[1])))]
- )
+ requests_cmd.files = {formfile[0]: formfile[1]}
elif filename is not None:
with open(filename, "rb") as stream:
postdata = stream.read()
self._logger.verbose("Request POSTFIELDS: Binary content")
- curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
+ requests_cmd.data = postdata
if put_method:
self._logger.info(
self._logger.info(
"Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
+ data = resp.content
self.check_http_response(http_code, data)
- if data.getvalue():
- data_text = data.getvalue().decode()
+ session_cmd.close()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
- else:
- return http_code, None
+ return http_code, None
def post_cmd(
self,
)
def get2_cmd(self, endpoint, skip_query_admin=False):
+ session_cmd = requests.Session()
self._logger.debug("")
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
- curl_cmd.setopt(pycurl.HTTPGET, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
+ requests_cmd.method = "GET"
self._logger.info(
"Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
+ data = resp.content
+ session_cmd.close()
self.check_http_response(http_code, data)
- if data.getvalue():
- data_text = data.getvalue().decode()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
return http_code, None
def check_http_response(self, http_code, data):
if http_code >= 300:
resp = ""
- if data.getvalue():
- data_text = data.getvalue().decode()
+ if data:
+ data_text = data.decode()
self._logger.verbose(
"Response {} DATA: {}".format(http_code, data_text)
)
)
headers = self._client._headers
headers["Content-Type"] = "application/yaml"
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
http_code, resp = self._http.post_cmd(
endpoint=self._apiBase, postfields_dict=ns
)
)
)
headers["Content-File-MD5"] = utils.md5(filename)
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
if update_endpoint:
http_code, resp = self._http.put_cmd(
endpoint=update_endpoint, filename=filename
)
headers = self._client._headers
headers["Content-Type"] = "application/yaml"
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
http_code, resp = self._http.post_cmd(
endpoint=self._apiBase, postfields_dict=nsi
)
)
)
headers["Content-File-MD5"] = utils.md5(filename)
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
if update_endpoint:
http_code, resp = self._http.put_cmd(
endpoint=update_endpoint, filename=filename
# file_size = stat(filename).st_size
# headers['Content-Range'] = 'bytes 0-{}/{}'.format(file_size - 1, file_size)
headers["Content-File-MD5"] = utils.md5(filename)
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
http_code, resp = self._http.post_cmd(endpoint=endpoint, filename=filename)
# print('HTTP CODE: {}'.format(http_code))
# print('RESP: {}'.format(resp))
self._client.get_token()
headers = self._client._headers
headers["Content-Type"] = "application/yaml"
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
if update_endpoint:
http_code, resp = self._http.patch_cmd(
endpoint=update_endpoint, postfields_dict=pdu
special_override_string = special_override_string.rstrip(";")
headers["Content-File-MD5"] = utils.md5(filename)
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
-
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
if update_endpoint:
http_code, resp = self._http.put_cmd(
endpoint=update_endpoint, filename=filename
jinja2
packaging
prettytable
-pycurl
python-magic
pyyaml==5.4.1
requests
# via requests
jinja2==3.1.2
# via -r requirements.in
-markupsafe==2.1.2
+markupsafe==2.1.3
# via jinja2
packaging==23.1
# via -r requirements.in