class ContrailHttp(object):
-
def __init__(self, auth_info, logger):
self._logger = logger
# default don't verify client cert
def get_cmd(self, url, headers):
self._logger.debug("")
resp = self._request("GET", url, headers)
+
return resp.json()
def post_headers_cmd(self, url, headers, post_fields_dict=None):
self._logger.debug("")
+
# obfuscate password before logging dict
- if post_fields_dict.get('auth', {}).get('identity', {}).get('password', {}).get('user', {}).get('password'):
+ if (
+ post_fields_dict.get("auth", {})
+ .get("identity", {})
+ .get("password", {})
+ .get("user", {})
+ .get("password")
+ ):
post_fields_dict_copy = copy.deepcopy(post_fields_dict)
- post_fields_dict['auth']['identity']['password']['user']['password'] = '******'
+ post_fields_dict["auth"]["identity"]["password"]["user"][
+ "password"
+ ] = "******"
json_data_log = post_fields_dict_copy
else:
json_data_log = post_fields_dict
+
self._logger.debug("Request POSTFIELDS: {}".format(json.dumps(json_data_log)))
resp = self._request("POST_HEADERS", url, headers, data=post_fields_dict)
+
return resp.text
def post_cmd(self, url, headers, post_fields_dict=None):
self._logger.debug("")
+
# obfuscate password before logging dict
- if post_fields_dict.get('auth', {}).get('identity', {}).get('password', {}).get('user', {}).get('password'):
+ if (
+ post_fields_dict.get("auth", {})
+ .get("identity", {})
+ .get("password", {})
+ .get("user", {})
+ .get("password")
+ ):
post_fields_dict_copy = copy.deepcopy(post_fields_dict)
- post_fields_dict['auth']['identity']['password']['user']['password'] = '******'
+ post_fields_dict["auth"]["identity"]["password"]["user"][
+ "password"
+ ] = "******"
json_data_log = post_fields_dict_copy
else:
json_data_log = post_fields_dict
+
self._logger.debug("Request POSTFIELDS: {}".format(json.dumps(json_data_log)))
resp = self._request("POST", url, headers, data=post_fields_dict)
+
return resp.text
def delete_cmd(self, url, headers):
self._logger.debug("")
resp = self._request("DELETE", url, headers)
+
return resp.text
def _get_token(self, headers):
if self.auth_url:
- self._logger.debug('Current Token: {}'.format(self.token))
- auth_url = self.auth_url + 'auth/tokens'
+ self._logger.debug("Current Token: {}".format(self.token))
+ auth_url = self.auth_url + "auth/tokens"
+
if self.token is None or self._token_expired():
if not self.auth_url:
self.token = ""
- resp = self._request_noauth(url=auth_url, op="POST", headers=headers,
- data=self.auth_dict)
- self.token = resp.headers.get('x-subject-token')
+
+ resp = self._request_noauth(
+ url=auth_url, op="POST", headers=headers, data=self.auth_dict
+ )
+ self.token = resp.headers.get("x-subject-token")
self.last_token_time = time.time()
- self._logger.debug('Obtained token: {}'.format(self.token))
+ self._logger.debug("Obtained token: {}".format(self.token))
return self.token
def _token_expired(self):
current_time = time.time()
- if self.last_token_time and (current_time - self.last_token_time < self.token_timeout):
+
+ if self.last_token_time and (
+ current_time - self.last_token_time < self.token_timeout
+ ):
return False
else:
return True
# TODO add again token
# token = self._get_token(headers)
token = None
+
if token:
- headers['X-Auth-Token'] = token
+ headers["X-Auth-Token"] = token
+
try:
return self._request_noauth(op, url, headers, data)
except AuthError:
# If there is an auth error retry just once
if retry_auth_error:
- return self._request(self, op, url, headers, data, retry_auth_error=False)
+ return self._request(
+ self, op, url, headers, data, retry_auth_error=False
+ )
def _request_noauth(self, op, url, headers, data=None):
# Method to execute http requests with error control
resp = self._http_delete(url, headers, json_data=data)
else:
raise HttpException("Unsupported operation: {}".format(op))
+
self._logger.info("Response HTTPCODE: {}".format(resp.status_code))
# Check http return code
if status_code == 401:
# Auth Error - set token to None to reload it and raise AuthError
self.token = None
+
raise AuthError("Auth error executing operation")
elif status_code == 409:
- raise DuplicateFound("Duplicate resource url: {}, response: {}".format(url, resp.text))
+ raise DuplicateFound(
+ "Duplicate resource url: {}, response: {}".format(
+ url, resp.text
+ )
+ )
elif status_code == 404:
- raise NotFound("Not found resource url: {}, response: {}".format(url, resp.text))
+ raise NotFound(
+ "Not found resource url: {}, response: {}".format(
+ url, resp.text
+ )
+ )
elif resp.status_code in [502, 503]:
if not self.max_retries or retry >= self.max_retries:
- raise ServiceUnavailableException("Service unavailable error url: {}".format(url))
+ raise ServiceUnavailableException(
+ "Service unavailable error url: {}".format(url)
+ )
continue
else:
- raise HttpException("Error status_code: {}, error_text: {}".format(resp.status_code, resp.text))
+ raise HttpException(
+ "Error status_code: {}, error_text: {}".format(
+ resp.status_code, resp.text
+ )
+ )
except ConnectionError as e:
- self._logger.error("Connection error executing request: {}".format(repr(e)))
+ self._logger.error(
+ "Connection error executing request: {}".format(repr(e))
+ )
+
if not self.max_retries or retry >= self.max_retries:
raise ConnectionError
+
continue
except Exception as e:
self._logger.error("Error executing request: {}".format(repr(e)))