# License for the specific language governing permissions and limitations
# under the License.
++import copy
from io import BytesIO
--import pycurl
import json
import logging
--import copy
++
from osmclient.common import http
from osmclient.common.exceptions import OsmHttpException, NotFound
++import pycurl
class Http(http.Http):
self._http_header = None
self._logger = logging.getLogger('osmclient')
self._default_query_admin = None
-- self._all_projects = None;
-- self._public = None;
++ self._all_projects = None
++ self._public = None
if 'all_projects' in kwargs:
-- self._all_projects=kwargs['all_projects']
++ self._all_projects = kwargs['all_projects']
if 'public' in kwargs:
-- self._public=kwargs['public']
++ self._public = kwargs['public']
self._default_query_admin = self._complete_default_query_admin()
def _complete_default_query_admin(self):
-- query_string_list=[]
++ query_string_list = []
if self._all_projects:
query_string_list.append("ADMIN")
if self._public is not None:
curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
-- self._logger.info("Request METHOD: {} URL: {}".format("DELETE",self._url + endpoint))
++ self._logger.info("Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint))
curl_cmd.perform()
http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
self._logger.info("Response HTTPCODE: {}".format(http_code))
jsondata = json.dumps(postfields_dict)
if 'password' in postfields_dict:
postfields_dict_copy = copy.deepcopy(postfields_dict)
-- postfields_dict_copy['password']='******'
++ postfields_dict_copy['password'] = '******'
jsondata_log = json.dumps(postfields_dict_copy)
else:
jsondata_log = jsondata
formfile[1])))])
elif filename is not None:
with open(filename, 'rb') as stream:
-- postdata=stream.read()
++ postdata = stream.read()
self._logger.verbose("Request POSTFIELDS: Binary content")
curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
if put_method:
-- self._logger.info("Request METHOD: {} URL: {}".format("PUT",self._url + endpoint))
++ self._logger.info("Request METHOD: {} URL: {}".format("PUT", self._url + endpoint))
elif patch_method:
-- self._logger.info("Request METHOD: {} URL: {}".format("PATCH",self._url + endpoint))
++ self._logger.info("Request METHOD: {} URL: {}".format("PATCH", self._url + endpoint))
else:
-- self._logger.info("Request METHOD: {} URL: {}".format("POST",self._url + endpoint))
++ self._logger.info("Request METHOD: {} URL: {}".format("POST", self._url + endpoint))
curl_cmd.perform()
http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
self._logger.info("Response HTTPCODE: {}".format(http_code))
skip_query_admin=skip_query_admin)
def patch_cmd(self, endpoint='', postfields_dict=None,
-- formfile=None, filename=None,
-- skip_query_admin=False):
++ formfile=None, filename=None,
++ skip_query_admin=False):
self._logger.debug("")
return self.send_cmd(endpoint=endpoint,
postfields_dict=postfields_dict,
curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
curl_cmd.setopt(pycurl.HTTPGET, 1)
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
-- self._logger.info("Request METHOD: {} URL: {}".format("GET",self._url + endpoint))
++ self._logger.info("Request METHOD: {} URL: {}".format("GET", self._url + endpoint))
curl_cmd.perform()
http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
self._logger.info("Response HTTPCODE: {}".format(http_code))
def set_query_admin(self, **kwargs):
if 'all_projects' in kwargs:
-- self._all_projects=kwargs['all_projects']
++ self._all_projects = kwargs['all_projects']
if 'public' in kwargs:
-- self._public=kwargs['public']
++ self._public = kwargs['public']
self._default_query_admin = self._complete_default_query_admin()