summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
e197f95)
Change-Id: Iab7e5baf25e1917658de627d48e575363787469f
Signed-off-by: gonzalezpach <pol.gonzalez.pacheco@upc.edu>
12 files changed:
# License for the specific language governing permissions and limitations
# under the License.
# License for the specific language governing permissions and limitations
# under the License.
-from io import BytesIO
-import pycurl
+import requests
+import urllib3
+from requests.auth import HTTPBasicAuth
import json
import logging
import json
import logging
+urllib3.disable_warnings()
+
class Http(object):
def __init__(self, url, user="admin", password="admin"):
class Http(object):
def __init__(self, url, user="admin", password="admin"):
def set_http_header(self, header):
self._http_header = header
def set_http_header(self, header):
self._http_header = header
- def _get_curl_cmd(self, endpoint):
- curl_cmd = pycurl.Curl()
- curl_cmd.setopt(pycurl.URL, self._url + endpoint)
- curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
- curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
- curl_cmd.setopt(pycurl.USERPWD, "{}:{}".format(self._user, self._password))
+ def _get_requests_cmd(self, endpoint):
+ requests_cmd = requests.Request()
+ requests_cmd.url = self._url + endpoint
+ requests_cmd.auth = HTTPBasicAuth(self._user, self._password)
- curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
- return curl_cmd
+ requests_cmd.headers = self._http_header
+ return requests_cmd
def get_cmd(self, endpoint):
def get_cmd(self, endpoint):
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint)
- curl_cmd.setopt(pycurl.HTTPGET, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ session_cmd = requests.Session()
+ requests_cmd = self._get_requests_cmd(endpoint)
+ requests_cmd.method = "GET"
self._logger.info(
"Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
)
self._logger.info(
"Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(requests_cmd, verify=False)
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
- if data.getvalue():
- self._logger.debug(
- "Response DATA: {}".format(json.loads(data.getvalue().decode()))
- )
- return json.loads(data.getvalue().decode())
+ data = resp.content
+ session_cmd.close()
+ if data:
+ self._logger.debug("Response DATA: {}".format(json.loads(data.decode())))
+ return json.loads(data.decode())
return None
def delete_cmd(self, endpoint):
return None
def delete_cmd(self, endpoint):
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint)
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ session_cmd = requests.Session()
+ requests_cmd = self._get_requests_cmd(endpoint)
+ requests_cmd.method = "DELETE"
self._logger.info(
"Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
)
self._logger.info(
"Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(requests_cmd, verify=False)
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
- if data.getvalue():
- self._logger.debug(
- "Response DATA: {}".format(json.loads(data.getvalue().decode()))
- )
- return json.loads(data.getvalue().decode())
+ data = resp.content
+ session_cmd.close()
+ if data:
+ self._logger.debug("Response DATA: {}".format(json.loads(data.decode())))
+ return json.loads(data.decode())
return None
def post_cmd(
return None
def post_cmd(
postfields_dict=None,
formfile=None,
):
postfields_dict=None,
formfile=None,
):
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint)
- curl_cmd.setopt(pycurl.POST, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ session_cmd = requests.Session()
+ requests_cmd = self._get_requests_cmd(endpoint)
+ requests_cmd.method = "POST"
if postfields_dict is not None:
if postfields_dict is not None:
- jsondata = json.dumps(postfields_dict)
- curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
+ requests_cmd.json = json.dumps(postfields_dict)
- curl_cmd.setopt(
- pycurl.HTTPPOST, [((formfile[0], (pycurl.FORM_FILE, formfile[1])))]
- )
+ requests_cmd.files = {formfile[0]: formfile[1]}
self._logger.info(
"Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
)
self._logger.info(
"Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(requests_cmd, verify=False)
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
self._logger.info("Response HTTPCODE: {}".format(http_code))
- curl_cmd.close()
- if data.getvalue():
- self._logger.debug(
- "Response DATA: {}".format(json.loads(data.getvalue().decode()))
- )
- return json.loads(data.getvalue().decode())
+ data = resp.content
+ session_cmd.close()
+ if data:
+ self._logger.debug("Response DATA: {}".format(json.loads(data.decode())))
+ return json.loads(data.decode())
)
self._headers["Accept"] = "application/json"
self._headers["Content-Type"] = "application/yaml"
)
self._headers["Accept"] = "application/json"
self._headers["Content-Type"] = "application/yaml"
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(self._headers.items())
- ]
- self._http_client.set_http_header(http_header)
+ self._http_client.set_http_header(self._headers)
self.vnfd = vnfd.Vnfd(self._http_client, client=self)
self.nsd = nsd.Nsd(self._http_client, client=self)
self.vnfd = vnfd.Vnfd(self._http_client, client=self)
self.nsd = nsd.Nsd(self._http_client, client=self)
if self._token is not None:
self._headers["Authorization"] = "Bearer {}".format(self._token)
if self._token is not None:
self._headers["Authorization"] = "Bearer {}".format(self._token)
- http_header = [
- "{}: {}".format(key, val)
- for (key, val) in list(self._headers.items())
- ]
- self._http_client.set_http_header(http_header)
+ self._http_client.set_http_header(self._headers)
return token
def get_version(self):
return token
def get_version(self):
# under the License.
import copy
# under the License.
import copy
import json
import logging
import json
import logging
+import requests
+import http.client as http_client
+import urllib3
from osmclient.common import http
from osmclient.common.exceptions import OsmHttpException, NotFound
from osmclient.common import http
from osmclient.common.exceptions import OsmHttpException, NotFound
+
+
+urllib3.disable_warnings()
endpoint = "?".join([endpoint, self._default_query_admin])
return endpoint
endpoint = "?".join([endpoint, self._default_query_admin])
return endpoint
- def _get_curl_cmd(self, endpoint, skip_query_admin=False):
+ def _get_requests_cmd(self, endpoint, skip_query_admin=False):
- curl_cmd = pycurl.Curl()
+ requests_cmd = requests.Request()
if self._logger.getEffectiveLevel() == logging.DEBUG:
if self._logger.getEffectiveLevel() == logging.DEBUG:
- curl_cmd.setopt(pycurl.VERBOSE, True)
+ http_client.HTTPConnection.debuglevel = 1
+ requests_log = logging.getLogger("requests.packages.urllib3")
+ requests_log.setLevel(logging.DEBUG)
+ requests_log.propagate = True
+ else:
+ http_client.HTTPConnection.debuglevel = 0
+ requests_log = logging.getLogger("requests.packages.urllib3")
+ requests_log.setLevel(logging.NOTSET)
+ requests_log.propagate = True
+
if not skip_query_admin:
endpoint = self._complete_endpoint(endpoint)
if not skip_query_admin:
endpoint = self._complete_endpoint(endpoint)
- curl_cmd.setopt(pycurl.CONNECTTIMEOUT, self.CONNECT_TIMEOUT)
- curl_cmd.setopt(pycurl.URL, self._url + endpoint)
- curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
- curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
+ requests_cmd.url = self._url + endpoint
- curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
- return curl_cmd
+ requests_cmd.headers = self._http_header
+ return requests_cmd
def delete_cmd(self, endpoint, skip_query_admin=False):
def delete_cmd(self, endpoint, skip_query_admin=False):
+ session_cmd = requests.Session()
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
+ requests_cmd.method = "DELETE"
self._logger.info(
"Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
)
self._logger.info(
"Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
self._logger.info("Response HTTPCODE: {}".format(http_code))
+ data = resp.content
+ session_cmd.close()
self.check_http_response(http_code, data)
# TODO 202 accepted should be returned somehow
self.check_http_response(http_code, data)
# TODO 202 accepted should be returned somehow
- if data.getvalue():
- data_text = data.getvalue().decode()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
- else:
- return http_code, None
patch_method=False,
skip_query_admin=False,
):
patch_method=False,
skip_query_admin=False,
):
+ session_cmd = requests.Session()
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PUT")
+ requests_cmd.method = "PUT"
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PATCH")
- curl_cmd.setopt(pycurl.POST, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ requests_cmd.method = "PATCH"
+ else:
+ requests_cmd.method = "POST"
if postfields_dict is not None:
jsondata = json.dumps(postfields_dict)
if postfields_dict is not None:
jsondata = json.dumps(postfields_dict)
jsondata_log = json.dumps(postfields_dict_copy)
else:
jsondata_log = jsondata
jsondata_log = json.dumps(postfields_dict_copy)
else:
jsondata_log = jsondata
+ requests_cmd.json = postfields_dict
self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
- curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
elif formfile is not None:
elif formfile is not None:
- curl_cmd.setopt(
- pycurl.HTTPPOST, [((formfile[0], (pycurl.FORM_FILE, formfile[1])))]
- )
+ requests_cmd.files = {formfile[0]: formfile[1]}
elif filename is not None:
with open(filename, "rb") as stream:
postdata = stream.read()
self._logger.verbose("Request POSTFIELDS: Binary content")
elif filename is not None:
with open(filename, "rb") as stream:
postdata = stream.read()
self._logger.verbose("Request POSTFIELDS: Binary content")
- curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
+ requests_cmd.data = postdata
if put_method:
self._logger.info(
if put_method:
self._logger.info(
self._logger.info(
"Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
)
self._logger.info(
"Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
self._logger.info("Response HTTPCODE: {}".format(http_code))
self.check_http_response(http_code, data)
self.check_http_response(http_code, data)
- if data.getvalue():
- data_text = data.getvalue().decode()
+ session_cmd.close()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
- else:
- return http_code, None
)
def get2_cmd(self, endpoint, skip_query_admin=False):
)
def get2_cmd(self, endpoint, skip_query_admin=False):
+ session_cmd = requests.Session()
- data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
- curl_cmd.setopt(pycurl.HTTPGET, 1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
+ requests_cmd.method = "GET"
self._logger.info(
"Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
)
self._logger.info(
"Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
)
- curl_cmd.perform()
- http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(
+ requests_cmd, verify=False, timeout=self.CONNECT_TIMEOUT
+ )
+ http_code = resp.status_code
self._logger.info("Response HTTPCODE: {}".format(http_code))
self._logger.info("Response HTTPCODE: {}".format(http_code))
+ data = resp.content
+ session_cmd.close()
self.check_http_response(http_code, data)
self.check_http_response(http_code, data)
- if data.getvalue():
- data_text = data.getvalue().decode()
+ if data:
+ data_text = data.decode()
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
return http_code, None
self._logger.verbose("Response DATA: {}".format(data_text))
return http_code, data_text
return http_code, None
def check_http_response(self, http_code, data):
if http_code >= 300:
resp = ""
def check_http_response(self, http_code, data):
if http_code >= 300:
resp = ""
- if data.getvalue():
- data_text = data.getvalue().decode()
+ if data:
+ data_text = data.decode()
self._logger.verbose(
"Response {} DATA: {}".format(http_code, data_text)
)
self._logger.verbose(
"Response {} DATA: {}".format(http_code, data_text)
)
)
headers = self._client._headers
headers["Content-Type"] = "application/yaml"
)
headers = self._client._headers
headers["Content-Type"] = "application/yaml"
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
http_code, resp = self._http.post_cmd(
endpoint=self._apiBase, postfields_dict=ns
)
http_code, resp = self._http.post_cmd(
endpoint=self._apiBase, postfields_dict=ns
)
)
)
headers["Content-File-MD5"] = utils.md5(filename)
)
)
headers["Content-File-MD5"] = utils.md5(filename)
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
if update_endpoint:
http_code, resp = self._http.put_cmd(
endpoint=update_endpoint, filename=filename
if update_endpoint:
http_code, resp = self._http.put_cmd(
endpoint=update_endpoint, filename=filename
)
headers = self._client._headers
headers["Content-Type"] = "application/yaml"
)
headers = self._client._headers
headers["Content-Type"] = "application/yaml"
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
http_code, resp = self._http.post_cmd(
endpoint=self._apiBase, postfields_dict=nsi
)
http_code, resp = self._http.post_cmd(
endpoint=self._apiBase, postfields_dict=nsi
)
)
)
headers["Content-File-MD5"] = utils.md5(filename)
)
)
headers["Content-File-MD5"] = utils.md5(filename)
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
if update_endpoint:
http_code, resp = self._http.put_cmd(
endpoint=update_endpoint, filename=filename
if update_endpoint:
http_code, resp = self._http.put_cmd(
endpoint=update_endpoint, filename=filename
# file_size = stat(filename).st_size
# headers['Content-Range'] = 'bytes 0-{}/{}'.format(file_size - 1, file_size)
headers["Content-File-MD5"] = utils.md5(filename)
# file_size = stat(filename).st_size
# headers['Content-Range'] = 'bytes 0-{}/{}'.format(file_size - 1, file_size)
headers["Content-File-MD5"] = utils.md5(filename)
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
http_code, resp = self._http.post_cmd(endpoint=endpoint, filename=filename)
# print('HTTP CODE: {}'.format(http_code))
# print('RESP: {}'.format(resp))
http_code, resp = self._http.post_cmd(endpoint=endpoint, filename=filename)
# print('HTTP CODE: {}'.format(http_code))
# print('RESP: {}'.format(resp))
self._client.get_token()
headers = self._client._headers
headers["Content-Type"] = "application/yaml"
self._client.get_token()
headers = self._client._headers
headers["Content-Type"] = "application/yaml"
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
if update_endpoint:
http_code, resp = self._http.patch_cmd(
endpoint=update_endpoint, postfields_dict=pdu
if update_endpoint:
http_code, resp = self._http.patch_cmd(
endpoint=update_endpoint, postfields_dict=pdu
special_override_string = special_override_string.rstrip(";")
headers["Content-File-MD5"] = utils.md5(filename)
special_override_string = special_override_string.rstrip(";")
headers["Content-File-MD5"] = utils.md5(filename)
- http_header = [
- "{}: {}".format(key, val) for (key, val) in list(headers.items())
- ]
-
- self._http.set_http_header(http_header)
+ self._http.set_http_header(headers)
if update_endpoint:
http_code, resp = self._http.put_cmd(
endpoint=update_endpoint, filename=filename
if update_endpoint:
http_code, resp = self._http.put_cmd(
endpoint=update_endpoint, filename=filename
jinja2
packaging
prettytable
jinja2
packaging
prettytable
python-magic
pyyaml==5.4.1
requests
python-magic
pyyaml==5.4.1
requests
# via requests
jinja2==3.1.2
# via -r requirements.in
# via requests
jinja2==3.1.2
# via -r requirements.in
# via jinja2
packaging==23.1
# via -r requirements.in
# via jinja2
packaging==23.1
# via -r requirements.in