# License for the specific language governing permissions and limitations
# under the License.
-import requests
-import json
import copy
+import json
-from time import time
+import requests
from requests.exceptions import ConnectionError
class ContrailHttp(object):
- def __init__(self, auth_info, logger):
+ def __init__(self, auth_info, logger, verify):
self._logger = logger
- # default don't verify client cert
- self._ssl_verify = False
+ # Verify client cert
+ self.ssl_verify = verify
# auth info: must contain auth_url and auth_dict
self.auth_url = auth_info["auth_url"]
self.auth_dict = auth_info["auth_dict"]
return resp.json()
- def post_headers_cmd(self, url, headers, post_fields_dict=None):
- self._logger.debug("")
-
- # obfuscate password before logging dict
- if (
- post_fields_dict.get("auth", {})
- .get("identity", {})
- .get("password", {})
- .get("user", {})
- .get("password")
- ):
- post_fields_dict_copy = copy.deepcopy(post_fields_dict)
- post_fields_dict["auth"]["identity"]["password"]["user"][
- "password"
- ] = "******"
- json_data_log = post_fields_dict_copy
- else:
- json_data_log = post_fields_dict
-
- self._logger.debug("Request POSTFIELDS: {}".format(json.dumps(json_data_log)))
- resp = self._request("POST_HEADERS", url, headers, data=post_fields_dict)
-
- return resp.text
-
def post_cmd(self, url, headers, post_fields_dict=None):
self._logger.debug("")
return resp.text
- def _get_token(self, headers):
- if self.auth_url:
- self._logger.debug("Current Token: {}".format(self.token))
- auth_url = self.auth_url + "auth/tokens"
-
- if self.token is None or self._token_expired():
- if not self.auth_url:
- self.token = ""
-
- resp = self._request_noauth(
- url=auth_url, op="POST", headers=headers, data=self.auth_dict
- )
- self.token = resp.headers.get("x-subject-token")
- self.last_token_time = time.time()
- self._logger.debug("Obtained token: {}".format(self.token))
-
- return self.token
-
- def _token_expired(self):
- current_time = time.time()
-
- if self.last_token_time and (
- current_time - self.last_token_time < self.token_timeout
- ):
- return False
- else:
- return True
-
def _request(self, op, url, http_headers, data=None, retry_auth_error=True):
headers = http_headers.copy()
except AuthError:
# If there is an auth error retry just once
if retry_auth_error:
- return self._request(
- self, op, url, headers, data, retry_auth_error=False
- )
+ return self._request(op, url, headers, data, retry_auth_error=False)
def _request_noauth(self, op, url, headers, data=None):
# Method to execute http requests with error control
return requests.get(url, headers=headers, params=query_params)
def _http_post_headers(self, url, headers, json_data=None):
- return requests.head(url, json=json_data, headers=headers, verify=False)
+ return requests.head(
+ url, json=json_data, headers=headers, verify=self.ssl_verify
+ )
def _http_post(self, url, headers, json_data=None):
- return requests.post(url, json=json_data, headers=headers, verify=False)
+ return requests.post(
+ url, json=json_data, headers=headers, verify=self.ssl_verify
+ )
def _http_delete(self, url, headers, json_data=None):
return requests.delete(url, json=json_data, headers=headers)