import copy
import json
+from time import time
import requests
from requests.exceptions import ConnectionError
return resp.json()
+ def post_headers_cmd(self, url, headers, post_fields_dict=None):
+ self._logger.debug("")
+
+ # obfuscate password before logging dict
+ if (
+ post_fields_dict.get("auth", {})
+ .get("identity", {})
+ .get("password", {})
+ .get("user", {})
+ .get("password")
+ ):
+ post_fields_dict_copy = copy.deepcopy(post_fields_dict)
+ post_fields_dict["auth"]["identity"]["password"]["user"][
+ "password"
+ ] = "******"
+ json_data_log = post_fields_dict_copy
+ else:
+ json_data_log = post_fields_dict
+
+ self._logger.debug("Request POSTFIELDS: {}".format(json.dumps(json_data_log)))
+ resp = self._request("POST_HEADERS", url, headers, data=post_fields_dict)
+
+ return resp.text
+
def post_cmd(self, url, headers, post_fields_dict=None):
self._logger.debug("")
return resp.text
+ def _get_token(self, headers):
+ if self.auth_url:
+ self._logger.debug("Current Token: {}".format(self.token))
+ auth_url = self.auth_url + "auth/tokens"
+
+ if self.token is None or self._token_expired():
+ if not self.auth_url:
+ self.token = ""
+
+ resp = self._request_noauth(
+ url=auth_url, op="POST", headers=headers, data=self.auth_dict
+ )
+ self.token = resp.headers.get("x-subject-token")
+ self.last_token_time = time.time()
+ self._logger.debug("Obtained token: {}".format(self.token))
+
+ return self.token
+
+ def _token_expired(self):
+ current_time = time.time()
+
+ if self.last_token_time and (
+ current_time - self.last_token_time < self.token_timeout
+ ):
+ return False
+ else:
+ return True
+
def _request(self, op, url, http_headers, data=None, retry_auth_error=True):
headers = http_headers.copy()