1 # Copyright 2018 Telefonica
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
17 from io
import BytesIO
22 from osmclient
.common
import http
23 from osmclient
.common
.exceptions
import OsmHttpException
, NotFound
class Http(http.Http):
    """pycurl-based HTTP client that can tag requests with admin query flags.

    Every endpoint may be completed with a default query string built from
    the ``all_projects`` (``ADMIN``) and ``public`` (``PUBLIC=<value>``)
    settings, unless the caller passes ``skip_query_admin=True``.

    NOTE(review): ``CONNECT_TIMEOUT`` is read in ``_get_curl_cmd`` but is not
    defined in the visible class body; presumably it is provided by
    ``http.Http`` or a class attribute not shown here -- confirm.
    """

    def __init__(self, url, user='admin', password='admin', **kwargs):
        """Store the connection settings and compute the default admin query.

        :param url: base URL; endpoints are appended to it verbatim.
        :param user: user name, stored as ``self._user``.
        :param password: password, stored as ``self._password``.
        :param kwargs: optional ``all_projects`` and ``public`` settings;
            any other keyword is ignored.
        """
        # NOTE(review): http.Http.__init__ is not invoked, matching the
        # visible original -- confirm the base class needs no initialisation.
        self._url = url
        self._user = user
        self._password = password
        self._http_header = None
        self._logger = logging.getLogger('osmclient')
        # Both settings default to None so the query builder can always
        # read them, even when the caller passes neither keyword.
        self._all_projects = kwargs.get('all_projects')
        self._public = kwargs.get('public')
        self._default_query_admin = self._complete_default_query_admin()

    def _complete_default_query_admin(self):
        """Return the default query string, e.g. ``"ADMIN&PUBLIC=True"``.

        ``ADMIN`` is included when ``_all_projects`` is truthy and
        ``PUBLIC=<value>`` when ``_public`` is not ``None``. The result is
        an empty string when neither applies.
        """
        query_string_list = []
        if self._all_projects:
            query_string_list.append("ADMIN")
        if self._public is not None:
            query_string_list.append("PUBLIC={}".format(self._public))
        return "&".join(query_string_list)

    def _complete_endpoint(self, endpoint):
        """Return ``endpoint`` with the default admin query appended.

        The query is joined with ``&`` when the endpoint already carries a
        query string and with ``?`` otherwise. The endpoint is returned
        unchanged when there is no default query.
        """
        if not self._default_query_admin:
            return endpoint
        separator = '&' if '?' in endpoint else '?'
        return separator.join([endpoint, self._default_query_admin])

    def _get_curl_cmd(self, endpoint, skip_query_admin=False):
        """Create and configure a ``pycurl.Curl`` handle for ``endpoint``.

        :param endpoint: path appended to the base URL.
        :param skip_query_admin: when true, the default admin query is not
            appended to the endpoint.
        :return: the configured curl handle; the caller must close it.
        """
        self._logger.debug("")
        curl_cmd = pycurl.Curl()
        if self._logger.getEffectiveLevel() == logging.DEBUG:
            curl_cmd.setopt(pycurl.VERBOSE, True)
        if not skip_query_admin:
            endpoint = self._complete_endpoint(endpoint)
        curl_cmd.setopt(pycurl.CONNECTTIMEOUT, self.CONNECT_TIMEOUT)
        curl_cmd.setopt(pycurl.URL, self._url + endpoint)
        # NOTE(review): TLS peer and host verification are disabled here.
        curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
        # _http_header starts as None; only install headers once some
        # have actually been set.
        if self._http_header:
            curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
        return curl_cmd

    def _perform_request(self, curl_cmd, data):
        """Run ``curl_cmd``, validate the status and decode the body.

        The curl handle is always closed, even if the transfer fails.

        :param curl_cmd: a configured handle whose WRITEFUNCTION writes
            into ``data``.
        :param data: ``BytesIO`` buffer receiving the response body.
        :return: ``(http_code, body_text)``; ``body_text`` is ``None`` when
            the response body is empty.
        :raises NotFound, OsmHttpException: see ``check_http_response``.
        """
        try:
            curl_cmd.perform()
            http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
        finally:
            curl_cmd.close()
        self._logger.info("Response HTTPCODE: {}".format(http_code))
        self.check_http_response(http_code, data)
        if data.getvalue():
            data_text = data.getvalue().decode()
            self._logger.verbose("Response DATA: {}".format(data_text))
            return http_code, data_text
        return http_code, None

    def delete_cmd(self, endpoint, skip_query_admin=False):
        """Send a DELETE request and return ``(http_code, body_text)``.

        ``body_text`` is ``None`` when the response has no body.
        """
        self._logger.debug("")
        data = BytesIO()
        curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
        curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
        curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
        self._logger.info("Request METHOD: {} URL: {}".format("DELETE",self._url + endpoint))
        # TODO 202 accepted should be returned somehow
        return self._perform_request(curl_cmd, data)

    def send_cmd(self, endpoint='', postfields_dict=None,
                 formfile=None, filename=None,
                 put_method=False, patch_method=False,
                 skip_query_admin=False):
        """Send a POST, PUT or PATCH request with an optional payload.

        Exactly one payload source is used, in this order of precedence:
        ``postfields_dict`` (sent as JSON), ``formfile`` (multipart file
        upload), ``filename`` (raw file content).

        :param endpoint: path appended to the base URL.
        :param postfields_dict: dict serialised to JSON as the body; a
            ``password`` key is masked in the log output only.
        :param formfile: file upload descriptor -- assumed to be a
            ``(field_name, file_path)`` pair; TODO confirm against callers.
        :param filename: path of a file whose bytes become the body.
        :param put_method: send PUT instead of POST.
        :param patch_method: send PATCH instead of POST (PUT wins if both
            flags are set).
        :param skip_query_admin: do not append the default admin query.
        :return: ``(http_code, body_text)``; ``body_text`` is ``None`` when
            the response has no body.
        """
        self._logger.debug("")
        data = BytesIO()
        curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)

        if put_method:
            method = "PUT"
        elif patch_method:
            method = "PATCH"
        else:
            method = "POST"
        if method != "POST":
            curl_cmd.setopt(pycurl.CUSTOMREQUEST, method)
        curl_cmd.setopt(pycurl.POST, 1)
        curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)

        if postfields_dict is not None:
            jsondata = json.dumps(postfields_dict)
            if 'password' in postfields_dict:
                # Never write the clear-text password to the log.
                postfields_dict_copy = copy.deepcopy(postfields_dict)
                postfields_dict_copy['password'] = '******'
                jsondata_log = json.dumps(postfields_dict_copy)
            else:
                jsondata_log = jsondata
            self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
            curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
        elif formfile is not None:
            # NOTE(review): body of this branch was not visible in the
            # original; this is the standard pycurl multipart file upload.
            curl_cmd.setopt(
                pycurl.HTTPPOST,
                [(formfile[0], (pycurl.FORM_FILE, formfile[1]))])
        elif filename is not None:
            with open(filename, 'rb') as stream:
                postdata = stream.read()
            self._logger.verbose("Request POSTFIELDS: Binary content")
            curl_cmd.setopt(pycurl.POSTFIELDS, postdata)

        self._logger.info("Request METHOD: {} URL: {}".format(method,self._url + endpoint))
        return self._perform_request(curl_cmd, data)

    def post_cmd(self, endpoint='', postfields_dict=None,
                 formfile=None, filename=None,
                 skip_query_admin=False):
        """Send a POST request; see ``send_cmd`` for the parameters."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile, filename=filename,
                             put_method=False, patch_method=False,
                             skip_query_admin=skip_query_admin)

    def put_cmd(self, endpoint='', postfields_dict=None,
                formfile=None, filename=None,
                skip_query_admin=False):
        """Send a PUT request; see ``send_cmd`` for the parameters."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile, filename=filename,
                             put_method=True, patch_method=False,
                             skip_query_admin=skip_query_admin)

    def patch_cmd(self, endpoint='', postfields_dict=None,
                  formfile=None, filename=None,
                  skip_query_admin=False):
        """Send a PATCH request; see ``send_cmd`` for the parameters."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile, filename=filename,
                             put_method=False, patch_method=True,
                             skip_query_admin=skip_query_admin)

    def get2_cmd(self, endpoint, skip_query_admin=False):
        """Send a GET request and return ``(http_code, body_text)``.

        ``body_text`` is ``None`` when the response has no body.
        """
        self._logger.debug("")
        data = BytesIO()
        curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
        curl_cmd.setopt(pycurl.HTTPGET, 1)
        curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
        self._logger.info("Request METHOD: {} URL: {}".format("GET",self._url + endpoint))
        return self._perform_request(curl_cmd, data)

    def check_http_response(self, http_code, data):
        """Raise if ``http_code`` signals a failed request.

        Codes below 300 are accepted silently. The response body, when
        present, is appended to the error message.

        :param http_code: numeric HTTP status code.
        :param data: ``BytesIO`` buffer holding the response body.
        :raises NotFound: for status 404.
        :raises OsmHttpException: for any other status of 300 or above.
        """
        if http_code < 300:
            return
        resp = ""
        if data.getvalue():
            data_text = data.getvalue().decode()
            self._logger.verbose("Response {} DATA: {}".format(http_code, data_text))
            resp = ": " + data_text
        else:
            self._logger.verbose("Response {}".format(http_code))
        if http_code == 404:
            raise NotFound("Error {}{}".format(http_code, resp))
        raise OsmHttpException("Error {}{}".format(http_code, resp))

    def set_query_admin(self, **kwargs):
        """Update ``all_projects`` / ``public`` and rebuild the default query.

        Only the keywords actually passed are changed; the other setting
        keeps its current value.
        """
        if 'all_projects' in kwargs:
            self._all_projects = kwargs['all_projects']
        if 'public' in kwargs:
            self._public = kwargs['public']
        self._default_query_admin = self._complete_default_query_admin()