Standardize Formatting
[osm/osmclient.git] / osmclient / sol005 / http.py
index 9a7bc6e..4234fc8 100644 (file)
@@ -27,19 +27,19 @@ import pycurl
 class Http(http.Http):
     CONNECT_TIMEOUT = 15
 
-    def __init__(self, url, user='admin', password='admin', **kwargs):
+    def __init__(self, url, user="admin", password="admin", **kwargs):
         self._url = url
         self._user = user
         self._password = password
         self._http_header = None
-        self._logger = logging.getLogger('osmclient')
+        self._logger = logging.getLogger("osmclient")
         self._default_query_admin = None
         self._all_projects = None
         self._public = None
-        if 'all_projects' in kwargs:
-            self._all_projects = kwargs['all_projects']
-        if 'public' in kwargs:
-            self._public = kwargs['public']
+        if "all_projects" in kwargs:
+            self._all_projects = kwargs["all_projects"]
+        if "public" in kwargs:
+            self._public = kwargs["public"]
         self._default_query_admin = self._complete_default_query_admin()
 
     def _complete_default_query_admin(self):
@@ -52,10 +52,10 @@ class Http(http.Http):
 
     def _complete_endpoint(self, endpoint):
         if self._default_query_admin:
-            if '?' in endpoint:
-                endpoint = '&'.join([endpoint, self._default_query_admin])
+            if "?" in endpoint:
+                endpoint = "&".join([endpoint, self._default_query_admin])
             else:
-                endpoint = '?'.join([endpoint, self._default_query_admin])
+                endpoint = "?".join([endpoint, self._default_query_admin])
         return endpoint
 
     def _get_curl_cmd(self, endpoint, skip_query_admin=False):
@@ -79,7 +79,9 @@ class Http(http.Http):
         curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
         curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
         curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
-        self._logger.info("Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint))
+        self._logger.info(
+            "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
+        )
         curl_cmd.perform()
         http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
         self._logger.info("Response HTTPCODE: {}".format(http_code))
@@ -93,10 +95,16 @@ class Http(http.Http):
         else:
             return http_code, None
 
-    def send_cmd(self, endpoint='', postfields_dict=None,
-                 formfile=None, filename=None,
-                 put_method=False, patch_method=False,
-                 skip_query_admin=False):
+    def send_cmd(
+        self,
+        endpoint="",
+        postfields_dict=None,
+        formfile=None,
+        filename=None,
+        put_method=False,
+        patch_method=False,
+        skip_query_admin=False,
+    ):
         self._logger.debug("")
         data = BytesIO()
         curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
@@ -109,9 +117,9 @@ class Http(http.Http):
 
         if postfields_dict is not None:
             jsondata = json.dumps(postfields_dict)
-            if 'password' in postfields_dict:
+            if "password" in postfields_dict:
                 postfields_dict_copy = copy.deepcopy(postfields_dict)
-                postfields_dict_copy['password'] = '******'
+                postfields_dict_copy["password"] = "******"
                 jsondata_log = json.dumps(postfields_dict_copy)
             else:
                 jsondata_log = jsondata
@@ -119,22 +127,26 @@ class Http(http.Http):
             curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
         elif formfile is not None:
             curl_cmd.setopt(
-                pycurl.HTTPPOST,
-                [((formfile[0],
-                           (pycurl.FORM_FILE,
-                            formfile[1])))])
+                pycurl.HTTPPOST, [((formfile[0], (pycurl.FORM_FILE, formfile[1])))]
+            )
         elif filename is not None:
-            with open(filename, 'rb') as stream:
+            with open(filename, "rb") as stream:
                 postdata = stream.read()
             self._logger.verbose("Request POSTFIELDS: Binary content")
             curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
 
         if put_method:
-            self._logger.info("Request METHOD: {} URL: {}".format("PUT", self._url + endpoint))
+            self._logger.info(
+                "Request METHOD: {} URL: {}".format("PUT", self._url + endpoint)
+            )
         elif patch_method:
-            self._logger.info("Request METHOD: {} URL: {}".format("PATCH", self._url + endpoint))
+            self._logger.info(
+                "Request METHOD: {} URL: {}".format("PATCH", self._url + endpoint)
+            )
         else:
-            self._logger.info("Request METHOD: {} URL: {}".format("POST", self._url + endpoint))
+            self._logger.info(
+                "Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
+            )
         curl_cmd.perform()
         http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
         self._logger.info("Response HTTPCODE: {}".format(http_code))
@@ -147,35 +159,62 @@ class Http(http.Http):
         else:
             return http_code, None
 
-    def post_cmd(self, endpoint='', postfields_dict=None,
-                 formfile=None, filename=None,
-                 skip_query_admin=False):
+    def post_cmd(
+        self,
+        endpoint="",
+        postfields_dict=None,
+        formfile=None,
+        filename=None,
+        skip_query_admin=False,
+    ):
         self._logger.debug("")
-        return self.send_cmd(endpoint=endpoint,
-                             postfields_dict=postfields_dict,
-                             formfile=formfile, filename=filename,
-                             put_method=False, patch_method=False,
-                             skip_query_admin=skip_query_admin)
-
-    def put_cmd(self, endpoint='', postfields_dict=None,
-                formfile=None, filename=None,
-                skip_query_admin=False):
+        return self.send_cmd(
+            endpoint=endpoint,
+            postfields_dict=postfields_dict,
+            formfile=formfile,
+            filename=filename,
+            put_method=False,
+            patch_method=False,
+            skip_query_admin=skip_query_admin,
+        )
+
+    def put_cmd(
+        self,
+        endpoint="",
+        postfields_dict=None,
+        formfile=None,
+        filename=None,
+        skip_query_admin=False,
+    ):
         self._logger.debug("")
-        return self.send_cmd(endpoint=endpoint,
-                             postfields_dict=postfields_dict,
-                             formfile=formfile, filename=filename,
-                             put_method=True, patch_method=False,
-                             skip_query_admin=skip_query_admin)
-
-    def patch_cmd(self, endpoint='', postfields_dict=None,
-                  formfile=None, filename=None,
-                  skip_query_admin=False):
+        return self.send_cmd(
+            endpoint=endpoint,
+            postfields_dict=postfields_dict,
+            formfile=formfile,
+            filename=filename,
+            put_method=True,
+            patch_method=False,
+            skip_query_admin=skip_query_admin,
+        )
+
+    def patch_cmd(
+        self,
+        endpoint="",
+        postfields_dict=None,
+        formfile=None,
+        filename=None,
+        skip_query_admin=False,
+    ):
         self._logger.debug("")
-        return self.send_cmd(endpoint=endpoint,
-                             postfields_dict=postfields_dict,
-                             formfile=formfile, filename=filename,
-                             put_method=False, patch_method=True,
-                             skip_query_admin=skip_query_admin)
+        return self.send_cmd(
+            endpoint=endpoint,
+            postfields_dict=postfields_dict,
+            formfile=formfile,
+            filename=filename,
+            put_method=False,
+            patch_method=True,
+            skip_query_admin=skip_query_admin,
+        )
 
     def get2_cmd(self, endpoint, skip_query_admin=False):
         self._logger.debug("")
@@ -183,7 +222,9 @@ class Http(http.Http):
         curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
         curl_cmd.setopt(pycurl.HTTPGET, 1)
         curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
-        self._logger.info("Request METHOD: {} URL: {}".format("GET", self._url + endpoint))
+        self._logger.info(
+            "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
+        )
         curl_cmd.perform()
         http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
         self._logger.info("Response HTTPCODE: {}".format(http_code))
@@ -200,7 +241,9 @@ class Http(http.Http):
             resp = ""
             if data.getvalue():
                 data_text = data.getvalue().decode()
-                self._logger.verbose("Response {} DATA: {}".format(http_code, data_text))
+                self._logger.verbose(
+                    "Response {} DATA: {}".format(http_code, data_text)
+                )
                 resp = ": " + data_text
             else:
                 self._logger.verbose("Response {}".format(http_code))
@@ -209,8 +252,8 @@ class Http(http.Http):
             raise OsmHttpException("Error {}{}".format(http_code, resp))
 
     def set_query_admin(self, **kwargs):
-        if 'all_projects' in kwargs:
-            self._all_projects = kwargs['all_projects']
-        if 'public' in kwargs:
-            self._public = kwargs['public']
+        if "all_projects" in kwargs:
+            self._all_projects = kwargs["all_projects"]
+        if "public" in kwargs:
+            self._public = kwargs["public"]
         self._default_query_admin = self._complete_default_query_admin()