class Http(http.Http):
CONNECT_TIMEOUT = 15
- def __init__(self, url, user='admin', password='admin', **kwargs):
+ def __init__(self, url, user="admin", password="admin", **kwargs):
self._url = url
self._user = user
self._password = password
self._http_header = None
- self._logger = logging.getLogger('osmclient')
+ self._logger = logging.getLogger("osmclient")
self._default_query_admin = None
self._all_projects = None
self._public = None
- if 'all_projects' in kwargs:
- self._all_projects = kwargs['all_projects']
- if 'public' in kwargs:
- self._public = kwargs['public']
+ if "all_projects" in kwargs:
+ self._all_projects = kwargs["all_projects"]
+ if "public" in kwargs:
+ self._public = kwargs["public"]
self._default_query_admin = self._complete_default_query_admin()
def _complete_default_query_admin(self):
def _complete_endpoint(self, endpoint):
if self._default_query_admin:
- if '?' in endpoint:
- endpoint = '&'.join([endpoint, self._default_query_admin])
+ if "?" in endpoint:
+ endpoint = "&".join([endpoint, self._default_query_admin])
else:
- endpoint = '?'.join([endpoint, self._default_query_admin])
+ endpoint = "?".join([endpoint, self._default_query_admin])
return endpoint
def _get_curl_cmd(self, endpoint, skip_query_admin=False):
curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
- self._logger.info("Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint))
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
+ )
curl_cmd.perform()
http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
self._logger.info("Response HTTPCODE: {}".format(http_code))
else:
return http_code, None
- def send_cmd(self, endpoint='', postfields_dict=None,
- formfile=None, filename=None,
- put_method=False, patch_method=False,
- skip_query_admin=False):
+ def send_cmd(
+ self,
+ endpoint="",
+ postfields_dict=None,
+ formfile=None,
+ filename=None,
+ put_method=False,
+ patch_method=False,
+ skip_query_admin=False,
+ ):
self._logger.debug("")
data = BytesIO()
curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
if postfields_dict is not None:
jsondata = json.dumps(postfields_dict)
- if 'password' in postfields_dict:
+ if "password" in postfields_dict:
postfields_dict_copy = copy.deepcopy(postfields_dict)
- postfields_dict_copy['password'] = '******'
+ postfields_dict_copy["password"] = "******"
jsondata_log = json.dumps(postfields_dict_copy)
else:
jsondata_log = jsondata
curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
elif formfile is not None:
curl_cmd.setopt(
- pycurl.HTTPPOST,
- [((formfile[0],
- (pycurl.FORM_FILE,
- formfile[1])))])
+ pycurl.HTTPPOST, [((formfile[0], (pycurl.FORM_FILE, formfile[1])))]
+ )
elif filename is not None:
- with open(filename, 'rb') as stream:
+ with open(filename, "rb") as stream:
postdata = stream.read()
self._logger.verbose("Request POSTFIELDS: Binary content")
curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
if put_method:
- self._logger.info("Request METHOD: {} URL: {}".format("PUT", self._url + endpoint))
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("PUT", self._url + endpoint)
+ )
elif patch_method:
- self._logger.info("Request METHOD: {} URL: {}".format("PATCH", self._url + endpoint))
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("PATCH", self._url + endpoint)
+ )
else:
- self._logger.info("Request METHOD: {} URL: {}".format("POST", self._url + endpoint))
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
+ )
curl_cmd.perform()
http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
self._logger.info("Response HTTPCODE: {}".format(http_code))
else:
return http_code, None
- def post_cmd(self, endpoint='', postfields_dict=None,
- formfile=None, filename=None,
- skip_query_admin=False):
+ def post_cmd(
+ self,
+ endpoint="",
+ postfields_dict=None,
+ formfile=None,
+ filename=None,
+ skip_query_admin=False,
+ ):
self._logger.debug("")
- return self.send_cmd(endpoint=endpoint,
- postfields_dict=postfields_dict,
- formfile=formfile, filename=filename,
- put_method=False, patch_method=False,
- skip_query_admin=skip_query_admin)
-
- def put_cmd(self, endpoint='', postfields_dict=None,
- formfile=None, filename=None,
- skip_query_admin=False):
+ return self.send_cmd(
+ endpoint=endpoint,
+ postfields_dict=postfields_dict,
+ formfile=formfile,
+ filename=filename,
+ put_method=False,
+ patch_method=False,
+ skip_query_admin=skip_query_admin,
+ )
+
+ def put_cmd(
+ self,
+ endpoint="",
+ postfields_dict=None,
+ formfile=None,
+ filename=None,
+ skip_query_admin=False,
+ ):
self._logger.debug("")
- return self.send_cmd(endpoint=endpoint,
- postfields_dict=postfields_dict,
- formfile=formfile, filename=filename,
- put_method=True, patch_method=False,
- skip_query_admin=skip_query_admin)
-
- def patch_cmd(self, endpoint='', postfields_dict=None,
- formfile=None, filename=None,
- skip_query_admin=False):
+ return self.send_cmd(
+ endpoint=endpoint,
+ postfields_dict=postfields_dict,
+ formfile=formfile,
+ filename=filename,
+ put_method=True,
+ patch_method=False,
+ skip_query_admin=skip_query_admin,
+ )
+
+ def patch_cmd(
+ self,
+ endpoint="",
+ postfields_dict=None,
+ formfile=None,
+ filename=None,
+ skip_query_admin=False,
+ ):
self._logger.debug("")
- return self.send_cmd(endpoint=endpoint,
- postfields_dict=postfields_dict,
- formfile=formfile, filename=filename,
- put_method=False, patch_method=True,
- skip_query_admin=skip_query_admin)
+ return self.send_cmd(
+ endpoint=endpoint,
+ postfields_dict=postfields_dict,
+ formfile=formfile,
+ filename=filename,
+ put_method=False,
+ patch_method=True,
+ skip_query_admin=skip_query_admin,
+ )
def get2_cmd(self, endpoint, skip_query_admin=False):
self._logger.debug("")
curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
curl_cmd.setopt(pycurl.HTTPGET, 1)
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
- self._logger.info("Request METHOD: {} URL: {}".format("GET", self._url + endpoint))
+ self._logger.info(
+ "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
+ )
curl_cmd.perform()
http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
self._logger.info("Response HTTPCODE: {}".format(http_code))
resp = ""
if data.getvalue():
data_text = data.getvalue().decode()
- self._logger.verbose("Response {} DATA: {}".format(http_code, data_text))
+ self._logger.verbose(
+ "Response {} DATA: {}".format(http_code, data_text)
+ )
resp = ": " + data_text
else:
self._logger.verbose("Response {}".format(http_code))
raise OsmHttpException("Error {}{}".format(http_code, resp))
def set_query_admin(self, **kwargs):
- if 'all_projects' in kwargs:
- self._all_projects = kwargs['all_projects']
- if 'public' in kwargs:
- self._public = kwargs['public']
+ if "all_projects" in kwargs:
+ self._all_projects = kwargs["all_projects"]
+ if "public" in kwargs:
+ self._public = kwargs["public"]
self._default_query_admin = self._complete_default_query_admin()