# License for the specific language governing permissions and limitations
# under the License.
-from io import BytesIO
-import pycurl
+import requests
+import urllib3
+from requests.auth import HTTPBasicAuth
import json
+import logging
+urllib3.disable_warnings()
-class Http(object):
- def __init__(self,url,user='admin',password='admin'):
+class Http(object):
+ def __init__(self, url, user="admin", password="admin"):
self._url = url
self._user = user
self._password = password
self._http_header = None
+ self._logger = logging.getLogger("osmclient")
- def set_http_header(self,header):
+ def set_http_header(self, header):
self._http_header = header
- def _get_curl_cmd(self,endpoint):
- curl_cmd = pycurl.Curl()
- curl_cmd.setopt(pycurl.URL, self._url + endpoint )
- curl_cmd.setopt(pycurl.SSL_VERIFYPEER,0)
- curl_cmd.setopt(pycurl.SSL_VERIFYHOST,0)
- curl_cmd.setopt(pycurl.USERPWD, '{}:{}'.format(self._user,self._password))
+ def _get_requests_cmd(self, endpoint):
+ requests_cmd = requests.Request()
+ requests_cmd.url = self._url + endpoint
+ requests_cmd.auth = HTTPBasicAuth(self._user, self._password)
if self._http_header:
- curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
- return curl_cmd
-
- def get_cmd( self, endpoint ):
+ requests_cmd.headers = self._http_header
+ return requests_cmd
- data = BytesIO()
- curl_cmd=self._get_curl_cmd(endpoint)
- curl_cmd.setopt(pycurl.HTTPGET,1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
- curl_cmd.perform()
- curl_cmd.close()
- if data.getvalue():
- return json.loads(data.getvalue().decode())
+ def get_cmd(self, endpoint):
+ session_cmd = requests.Session()
+ requests_cmd = self._get_requests_cmd(endpoint)
+ requests_cmd.method = "GET"
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
+ )
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(requests_cmd, verify=False)
+ http_code = resp.status_code
+ self._logger.info("Response HTTPCODE: {}".format(http_code))
+ data = resp.content
+ session_cmd.close()
+ if data:
+ self._logger.debug("Response DATA: {}".format(json.loads(data.decode())))
+ return json.loads(data.decode())
return None
- def delete_cmd( self, endpoint ):
- data = BytesIO()
- curl_cmd=self._get_curl_cmd(endpoint)
- curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
- curl_cmd.perform()
- curl_cmd.close()
- if data.getvalue():
- return json.loads(data.getvalue().decode())
+ def delete_cmd(self, endpoint):
+ session_cmd = requests.Session()
+ requests_cmd = self._get_requests_cmd(endpoint)
+ requests_cmd.method = "DELETE"
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
+ )
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(requests_cmd, verify=False)
+ http_code = resp.status_code
+ self._logger.info("Response HTTPCODE: {}".format(http_code))
+ data = resp.content
+ session_cmd.close()
+ if data:
+ self._logger.debug("Response DATA: {}".format(json.loads(data.decode())))
+ return json.loads(data.decode())
return None
- def post_cmd( self, endpoint='', postfields_dict=None, formfile=None, ):
- data = BytesIO()
- curl_cmd=self._get_curl_cmd(endpoint)
- curl_cmd.setopt(pycurl.POST,1)
- curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ def post_cmd(
+ self,
+ endpoint="",
+ postfields_dict=None,
+ formfile=None,
+ ):
+ session_cmd = requests.Session()
+ requests_cmd = self._get_requests_cmd(endpoint)
+ requests_cmd.method = "POST"
if postfields_dict is not None:
- jsondata=json.dumps(postfields_dict)
- curl_cmd.setopt(pycurl.POSTFIELDS,jsondata)
+ requests_cmd.json = json.dumps(postfields_dict)
if formfile is not None:
- curl_cmd.setopt(pycurl.HTTPPOST,[((formfile[0],(pycurl.FORM_FILE,formfile[1])))])
+ requests_cmd.files = {formfile[0]: formfile[1]}
- curl_cmd.perform()
- curl_cmd.close()
- if data.getvalue():
- return json.loads(data.getvalue().decode())
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
+ )
+ requests_cmd = requests_cmd.prepare()
+ resp = session_cmd.send(requests_cmd, verify=False)
+ http_code = resp.status_code
+ self._logger.info("Response HTTPCODE: {}".format(http_code))
+ data = resp.content
+ session_cmd.close()
+ if data:
+ self._logger.debug("Response DATA: {}".format(json.loads(data.decode())))
+ return json.loads(data.decode())
return None