1 # Copyright 2018 Telefonica
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
18 from io
import BytesIO
22 from osmclient
.common
import http
23 from osmclient
.common
.exceptions
import OsmHttpException
, NotFound
27 class Http(http
.Http
):
30 def __init__(self
, url
, user
="admin", password
="admin", **kwargs
):
33 self
._password
= password
34 self
._http
_header
= None
35 self
._logger
= logging
.getLogger("osmclient")
36 self
._default
_query
_admin
= None
37 self
._all
_projects
= None
39 if "all_projects" in kwargs
:
40 self
._all
_projects
= kwargs
["all_projects"]
41 if "public" in kwargs
:
42 self
._public
= kwargs
["public"]
43 self
._default
_query
_admin
= self
._complete
_default
_query
_admin
()
45 def _complete_default_query_admin(self
):
46 query_string_list
= []
47 if self
._all
_projects
:
48 query_string_list
.append("ADMIN")
49 if self
._public
is not None:
50 query_string_list
.append("PUBLIC={}".format(self
._public
))
51 return "&".join(query_string_list
)
53 def _complete_endpoint(self
, endpoint
):
54 if self
._default
_query
_admin
:
56 endpoint
= "&".join([endpoint
, self
._default
_query
_admin
])
58 endpoint
= "?".join([endpoint
, self
._default
_query
_admin
])
def _get_curl_cmd(self, endpoint, skip_query_admin=False):
    """Create a pycurl handle configured for a request to ``endpoint``.

    Args:
        endpoint: path (and optional query string) appended to ``self._url``.
        skip_query_admin: when true, the default admin query string is not
            appended to ``endpoint``.

    Returns:
        A ``pycurl.Curl`` handle with the URL, connect timeout and TLS
        options set, plus the HTTP headers when some are stored. The
        request is not performed here; the caller performs it and closes
        the handle.
    """
    self._logger.debug("")
    curl_cmd = pycurl.Curl()
    # Mirror the logger: libcurl tracing is on only at DEBUG level.
    if self._logger.getEffectiveLevel() == logging.DEBUG:
        curl_cmd.setopt(pycurl.VERBOSE, True)
    if not skip_query_admin:
        endpoint = self._complete_endpoint(endpoint)
    curl_cmd.setopt(pycurl.CONNECTTIMEOUT, self.CONNECT_TIMEOUT)
    curl_cmd.setopt(pycurl.URL, self._url + endpoint)
    # NOTE(review): peer and host certificate verification are switched off.
    # Kept as-is because callers may rely on it (e.g. self-signed servers),
    # but it leaves the connection open to man-in-the-middle attacks.
    curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
    curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
    # The header list starts out as None; only hand it to curl once set.
    if self._http_header:
        curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
    return curl_cmd
def delete_cmd(self, endpoint, skip_query_admin=False):
    """Send an HTTP DELETE to ``endpoint``.

    Args:
        endpoint: path appended to ``self._url``.
        skip_query_admin: forwarded to ``self._get_curl_cmd``.

    Returns:
        A ``(http_code, text)`` tuple, where ``text`` is the decoded response
        body or ``None`` when the response body is empty.

    Raises:
        Whatever ``self.check_http_response`` raises for the received status
        code, and any error raised by pycurl while performing the request.
    """
    self._logger.debug("")
    data = BytesIO()  # collects the response body written by curl
    curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
    try:
        curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
        curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
        self._logger.info(
            "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
        )
        curl_cmd.perform()
        # The status code must be read before the handle is closed.
        http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
    finally:
        # Release the handle even when the transfer fails.
        curl_cmd.close()
    self._logger.info("Response HTTPCODE: {}".format(http_code))
    self.check_http_response(http_code, data)
    # TODO 202 accepted should be returned somehow
    body = data.getvalue()
    if not body:
        return http_code, None
    data_text = body.decode()
    # NOTE(review): verbose() is not a standard logging.Logger method; it is
    # assumed to be added to the logger elsewhere in the project.
    self._logger.verbose("Response DATA: {}".format(data_text))
    return http_code, data_text
101 postfields_dict
=None,
106 skip_query_admin
=False,
108 self
._logger
.debug("")
110 curl_cmd
= self
._get
_curl
_cmd
(endpoint
, skip_query_admin
)
112 curl_cmd
.setopt(pycurl
.CUSTOMREQUEST
, "PUT")
114 curl_cmd
.setopt(pycurl
.CUSTOMREQUEST
, "PATCH")
115 curl_cmd
.setopt(pycurl
.POST
, 1)
116 curl_cmd
.setopt(pycurl
.WRITEFUNCTION
, data
.write
)
118 if postfields_dict
is not None:
119 jsondata
= json
.dumps(postfields_dict
)
120 if "password" in postfields_dict
:
121 postfields_dict_copy
= copy
.deepcopy(postfields_dict
)
122 postfields_dict_copy
["password"] = "******"
123 jsondata_log
= json
.dumps(postfields_dict_copy
)
125 jsondata_log
= jsondata
126 self
._logger
.verbose("Request POSTFIELDS: {}".format(jsondata_log
))
127 curl_cmd
.setopt(pycurl
.POSTFIELDS
, jsondata
)
128 elif formfile
is not None:
130 pycurl
.HTTPPOST
, [((formfile
[0], (pycurl
.FORM_FILE
, formfile
[1])))]
132 elif filename
is not None:
133 with
open(filename
, "rb") as stream
:
134 postdata
= stream
.read()
135 self
._logger
.verbose("Request POSTFIELDS: Binary content")
136 curl_cmd
.setopt(pycurl
.POSTFIELDS
, postdata
)
140 "Request METHOD: {} URL: {}".format("PUT", self
._url
+ endpoint
)
144 "Request METHOD: {} URL: {}".format("PATCH", self
._url
+ endpoint
)
148 "Request METHOD: {} URL: {}".format("POST", self
._url
+ endpoint
)
151 http_code
= curl_cmd
.getinfo(pycurl
.HTTP_CODE
)
152 self
._logger
.info("Response HTTPCODE: {}".format(http_code
))
154 self
.check_http_response(http_code
, data
)
156 data_text
= data
.getvalue().decode()
157 self
._logger
.verbose("Response DATA: {}".format(data_text
))
158 return http_code
, data_text
160 return http_code
, None
165 postfields_dict
=None,
168 skip_query_admin
=False,
170 self
._logger
.debug("")
171 return self
.send_cmd(
173 postfields_dict
=postfields_dict
,
178 skip_query_admin
=skip_query_admin
,
184 postfields_dict
=None,
187 skip_query_admin
=False,
189 self
._logger
.debug("")
190 return self
.send_cmd(
192 postfields_dict
=postfields_dict
,
197 skip_query_admin
=skip_query_admin
,
203 postfields_dict
=None,
206 skip_query_admin
=False,
208 self
._logger
.debug("")
209 return self
.send_cmd(
211 postfields_dict
=postfields_dict
,
216 skip_query_admin
=skip_query_admin
,
def get2_cmd(self, endpoint, skip_query_admin=False):
    """Send an HTTP GET to ``endpoint``.

    Args:
        endpoint: path appended to ``self._url``.
        skip_query_admin: forwarded to ``self._get_curl_cmd``.

    Returns:
        A ``(http_code, text)`` tuple, where ``text`` is the decoded response
        body or ``None`` when the response body is empty.

    Raises:
        Whatever ``self.check_http_response`` raises for the received status
        code, and any error raised by pycurl while performing the request.
    """
    self._logger.debug("")
    data = BytesIO()  # collects the response body written by curl
    curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
    try:
        curl_cmd.setopt(pycurl.HTTPGET, 1)
        curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
        self._logger.info(
            "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
        )
        curl_cmd.perform()
        # The status code must be read before the handle is closed.
        http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
    finally:
        # Release the handle even when the transfer fails.
        curl_cmd.close()
    self._logger.info("Response HTTPCODE: {}".format(http_code))
    self.check_http_response(http_code, data)
    body = data.getvalue()
    if not body:
        return http_code, None
    data_text = body.decode()
    # NOTE(review): verbose() is not a standard logging.Logger method; it is
    # assumed to be added to the logger elsewhere in the project.
    self._logger.verbose("Response DATA: {}".format(data_text))
    return http_code, data_text
def check_http_response(self, http_code, data):
    """Raise when ``http_code`` does not denote a successful response.

    Args:
        http_code: numeric HTTP status code of the response.
        data: buffer holding the response body; it must provide
            ``getvalue()`` returning bytes.

    Returns:
        ``None`` when ``http_code`` is below 300.

    Raises:
        NotFound: when ``http_code`` is 404.
        OsmHttpException: for any other status code of 300 or above.
    """
    # NOTE(review): 3xx is treated as a failure on the assumption that the
    # curl handles used by this class do not follow redirects; confirm.
    if http_code < 300:
        return
    resp = ""
    body = data.getvalue()
    if body:
        data_text = body.decode()
        self._logger.verbose(
            "Response {} DATA: {}".format(http_code, data_text)
        )
        # The server's body becomes part of the exception message.
        resp = ": " + data_text
    else:
        self._logger.verbose("Response {}".format(http_code))
    if http_code == 404:
        raise NotFound("Error {}{}".format(http_code, resp))
    raise OsmHttpException("Error {}{}".format(http_code, resp))
def set_query_admin(self, **kwargs):
    """Update the admin query settings and rebuild the default query string.

    Only the ``all_projects`` and ``public`` keywords are honoured; each one
    that is present overwrites the matching ``_all_projects`` / ``_public``
    attribute, and an absent one leaves its attribute untouched. The cached
    ``_default_query_admin`` is then recomputed.
    """
    for option in ("all_projects", "public"):
        if option in kwargs:
            setattr(self, "_" + option, kwargs[option])
    self._default_query_admin = self._complete_default_query_admin()