# License for the specific language governing permissions and limitations
# under the License.
-import requests
-import json
import copy
-import logging
+import json
-from time import time
+import requests
from requests.exceptions import ConnectionError
+
class HttpException(Exception):
pass
class ContrailHttp(object):
-
- def __init__(self, auth_info, logger):
+ def __init__(self, auth_info, logger, verify):
self._logger = logger
- # default don't verify client cert
- self._ssl_verify = False
+ # Verify client cert
+ self.ssl_verify = verify
# auth info: must contain auth_url and auth_dict
self.auth_url = auth_info["auth_url"]
self.auth_dict = auth_info["auth_dict"]
def get_cmd(self, url, headers):
self._logger.debug("")
resp = self._request("GET", url, headers)
- return resp.json()
- def post_headers_cmd(self, url, headers, post_fields_dict=None):
- self._logger.debug("")
- # obfuscate password before logging dict
- if post_fields_dict.get('auth', {}).get('identity', {}).get('password', {}).get('user', {}).get('password'):
- post_fields_dict_copy = copy.deepcopy(post_fields_dict)
- post_fields_dict['auth']['identity']['password']['user']['password'] = '******'
- json_data_log = post_fields_dict_copy
- else:
- json_data_log = post_fields_dict
- self._logger.debug("Request POSTFIELDS: {}".format(json.dumps(json_data_log)))
- resp = self._request("POST_HEADERS", url, headers, data=post_fields_dict)
- return resp.text
+ return resp.json()
def post_cmd(self, url, headers, post_fields_dict=None):
self._logger.debug("")
+
# obfuscate password before logging dict
- if post_fields_dict.get('auth', {}).get('identity', {}).get('password', {}).get('user', {}).get('password'):
+ if (
+ post_fields_dict.get("auth", {})
+ .get("identity", {})
+ .get("password", {})
+ .get("user", {})
+ .get("password")
+ ):
post_fields_dict_copy = copy.deepcopy(post_fields_dict)
- post_fields_dict['auth']['identity']['password']['user']['password'] = '******'
+ post_fields_dict["auth"]["identity"]["password"]["user"][
+ "password"
+ ] = "******"
json_data_log = post_fields_dict_copy
else:
json_data_log = post_fields_dict
+
self._logger.debug("Request POSTFIELDS: {}".format(json.dumps(json_data_log)))
resp = self._request("POST", url, headers, data=post_fields_dict)
+
return resp.text
def delete_cmd(self, url, headers):
self._logger.debug("")
resp = self._request("DELETE", url, headers)
- return resp.text
- def _get_token(self, headers):
- if self.auth_url:
- self._logger.debug('Current Token:'.format(self.token))
- auth_url = self.auth_url + 'auth/tokens'
- if self.token is None or self._token_expired():
- if not self.auth_url:
- self.token = ""
- resp = self._request_noauth(url=auth_url, op="POST", headers=headers,
- data=self.auth_dict)
- self.token = resp.headers.get('x-subject-token')
- self.last_token_time = time.time()
- self._logger.debug('Obtained token: '.format(self.token))
-
- return self.token
-
- def _token_expired(self):
- current_time = time.time()
- if self.last_token_time and (current_time - self.last_token_time < self.token_timeout):
- return False
- else:
- return True
+ return resp.text
def _request(self, op, url, http_headers, data=None, retry_auth_error=True):
headers = http_headers.copy()
# Get authorization (include authentication headers)
- # todo - aƱadir token de nuevo
- #token = self._get_token(headers)
+ # TODO add again token
+ # token = self._get_token(headers)
token = None
+
if token:
- headers['X-Auth-Token'] = token
+ headers["X-Auth-Token"] = token
+
try:
return self._request_noauth(op, url, headers, data)
except AuthError:
# If there is an auth error retry just once
if retry_auth_error:
- return self._request(self, op, url, headers, data, retry_auth_error=False)
+ return self._request(op, url, headers, data, retry_auth_error=False)
def _request_noauth(self, op, url, headers, data=None):
# Method to execute http requests with error control
# Execute operation
try:
self._logger.info("Request METHOD: {} URL: {}".format(op, url))
- if (op == "GET"):
+ if op == "GET":
resp = self._http_get(url, headers, query_params=data)
- elif (op == "POST"):
+ elif op == "POST":
resp = self._http_post(url, headers, json_data=data)
- elif (op == "POST_HEADERS"):
+ elif op == "POST_HEADERS":
resp = self._http_post_headers(url, headers, json_data=data)
- elif (op == "DELETE"):
+ elif op == "DELETE":
resp = self._http_delete(url, headers, json_data=data)
else:
raise HttpException("Unsupported operation: {}".format(op))
+
self._logger.info("Response HTTPCODE: {}".format(resp.status_code))
# Check http return code
if status_code == 401:
# Auth Error - set token to None to reload it and raise AuthError
self.token = None
+
raise AuthError("Auth error executing operation")
elif status_code == 409:
- raise DuplicateFound("Duplicate resource url: {}, response: {}".format(url, resp.text))
+ raise DuplicateFound(
+ "Duplicate resource url: {}, response: {}".format(
+ url, resp.text
+ )
+ )
elif status_code == 404:
- raise NotFound("Not found resource url: {}, response: {}".format(url, resp.text))
+ raise NotFound(
+ "Not found resource url: {}, response: {}".format(
+ url, resp.text
+ )
+ )
elif resp.status_code in [502, 503]:
if not self.max_retries or retry >= self.max_retries:
- raise ServiceUnavailableException("Service unavailable error url: {}".format(url))
+ raise ServiceUnavailableException(
+ "Service unavailable error url: {}".format(url)
+ )
continue
else:
- raise HttpException("Error status_code: {}, error_text: {}".format(resp.status_code, resp.text))
+ raise HttpException(
+ "Error status_code: {}, error_text: {}".format(
+ resp.status_code, resp.text
+ )
+ )
except ConnectionError as e:
- self._logger.error("Connection error executing request: {}".format(repr(e)))
+ self._logger.error(
+ "Connection error executing request: {}".format(repr(e))
+ )
+
if not self.max_retries or retry >= self.max_retries:
raise ConnectionError
+
continue
except Exception as e:
self._logger.error("Error executing request: {}".format(repr(e)))
return requests.get(url, headers=headers, params=query_params)
def _http_post_headers(self, url, headers, json_data=None):
- return requests.head(url, json=json_data, headers=headers, verify=False)
+ return requests.head(
+ url, json=json_data, headers=headers, verify=self.ssl_verify
+ )
def _http_post(self, url, headers, json_data=None):
- return requests.post(url, json=json_data, headers=headers, verify=False)
+ return requests.post(
+ url, json=json_data, headers=headers, verify=self.ssl_verify
+ )
def _http_delete(self, url, headers, json_data=None):
return requests.delete(url, json=json_data, headers=headers)
-