import copy
import json
-from time import time
import requests
from requests.exceptions import ConnectionError
return resp.json()
- def post_headers_cmd(self, url, headers, post_fields_dict=None):
- self._logger.debug("")
-
- # obfuscate password before logging dict
- if (
- post_fields_dict.get("auth", {})
- .get("identity", {})
- .get("password", {})
- .get("user", {})
- .get("password")
- ):
- post_fields_dict_copy = copy.deepcopy(post_fields_dict)
- post_fields_dict["auth"]["identity"]["password"]["user"][
- "password"
- ] = "******"
- json_data_log = post_fields_dict_copy
- else:
- json_data_log = post_fields_dict
-
- self._logger.debug("Request POSTFIELDS: {}".format(json.dumps(json_data_log)))
- resp = self._request("POST_HEADERS", url, headers, data=post_fields_dict)
-
- return resp.text
-
def post_cmd(self, url, headers, post_fields_dict=None):
self._logger.debug("")
return resp.text
- def _get_token(self, headers):
- if self.auth_url:
- self._logger.debug("Current Token: {}".format(self.token))
- auth_url = self.auth_url + "auth/tokens"
-
- if self.token is None or self._token_expired():
- if not self.auth_url:
- self.token = ""
-
- resp = self._request_noauth(
- url=auth_url, op="POST", headers=headers, data=self.auth_dict
- )
- self.token = resp.headers.get("x-subject-token")
- self.last_token_time = time.time()
- self._logger.debug("Obtained token: {}".format(self.token))
-
- return self.token
-
- def _token_expired(self):
- current_time = time.time()
-
- if self.last_token_time and (
- current_time - self.last_token_time < self.token_timeout
- ):
- return False
- else:
- return True
-
def _request(self, op, url, http_headers, data=None, retry_auth_error=True):
headers = http_headers.copy()