1 # Copyright 2018 Telefonica
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
18 from io
import BytesIO
22 from osmclient
.common
import http
23 from osmclient
.common
.exceptions
import OsmHttpException
, NotFound
27 class Http(http
.Http
):
def __init__(self, url, user="admin", password="admin", **kwargs):
    """Store the connection settings and build the default admin query.

    :param url: base URL; kept as ``self._url``
    :param user: user name; kept as ``self._user``
    :param password: password; kept as ``self._password``
    :param kwargs: optional ``all_projects`` and ``public`` values used to
        build the default admin query string; other keys are ignored
    """
    self._url = url
    self._user = user
    self._password = password
    self._http_header = None
    self._logger = logging.getLogger("osmclient")
    self._default_query_admin = None
    self._all_projects = None
    # Must exist even when "public" is not passed: the query builder called
    # below reads it unconditionally.
    self._public = None
    if "all_projects" in kwargs:
        self._all_projects = kwargs["all_projects"]
    if "public" in kwargs:
        self._public = kwargs["public"]
    self._default_query_admin = self._complete_default_query_admin()
def _complete_default_query_admin(self):
    """Return the default admin query string for the current flags.

    ``ADMIN`` is included when ``self._all_projects`` is truthy and
    ``PUBLIC=<value>`` when ``self._public`` is not ``None``; the parts are
    joined with ``&``. The result is an empty string when neither applies.
    """
    admin_part = ["ADMIN"] if self._all_projects else []
    if self._public is None:
        public_part = []
    else:
        public_part = ["PUBLIC={}".format(self._public)]
    return "&".join(admin_part + public_part)
def _complete_endpoint(self, endpoint):
    """Append the default admin query string to ``endpoint``.

    :param endpoint: endpoint path, with or without an existing query string
    :return: ``endpoint`` unchanged when ``self._default_query_admin`` is
        empty or ``None``; otherwise ``endpoint`` followed by the admin query
    """
    if self._default_query_admin:
        # Extend an existing query string with "&", otherwise start one
        # with "?". Applying both joins in sequence would duplicate the
        # admin query and produce a malformed URL.
        separator = "&" if "?" in endpoint else "?"
        endpoint = separator.join([endpoint, self._default_query_admin])
    return endpoint
def _get_curl_cmd(self, endpoint, skip_query_admin=False):
    """Create a pycurl handle configured for ``self._url + endpoint``.

    :param endpoint: path appended to ``self._url``
    :param skip_query_admin: when false, the endpoint is first passed
        through ``self._complete_endpoint``
    :return: the configured ``pycurl.Curl`` object
    """
    self._logger.debug("")
    curl_cmd = pycurl.Curl()
    if self._logger.getEffectiveLevel() == logging.DEBUG:
        curl_cmd.setopt(pycurl.VERBOSE, True)
    if not skip_query_admin:
        endpoint = self._complete_endpoint(endpoint)
    curl_cmd.setopt(pycurl.CONNECTTIMEOUT, self.CONNECT_TIMEOUT)
    curl_cmd.setopt(pycurl.URL, self._url + endpoint)
    # NOTE(review): certificate and host verification are switched off here;
    # confirm this is intended for the deployments this client targets.
    curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
    curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
    # The header list starts out as None; only hand it to pycurl once set.
    if self._http_header:
        curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
    return curl_cmd
def delete_cmd(self, endpoint, skip_query_admin=False):
    """Send a DELETE request to ``endpoint``.

    :param endpoint: path appended to ``self._url``
    :param skip_query_admin: forwarded to ``self._get_curl_cmd``
    :return: tuple ``(http_code, data_text)``; ``data_text`` is ``None`` when
        the response body is empty
    :raises: whatever ``self.check_http_response`` raises for the status code
    """
    self._logger.debug("")
    data = BytesIO()
    curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
    curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
    curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
    self._logger.info(
        "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
    )
    try:
        curl_cmd.perform()
        http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
    finally:
        # Release the handle even when the transfer itself fails.
        curl_cmd.close()
    self._logger.info("Response HTTPCODE: {}".format(http_code))
    self.check_http_response(http_code, data)
    # TODO 202 accepted should be returned somehow
    payload = data.getvalue()
    if payload:
        data_text = payload.decode()
        self._logger.verbose("Response DATA: {}".format(data_text))
        return http_code, data_text
    return http_code, None
101 postfields_dict
=None,
106 skip_query_admin
=False,
108 self
._logger
.debug("")
110 curl_cmd
= self
._get
_curl
_cmd
(endpoint
, skip_query_admin
)
112 curl_cmd
.setopt(pycurl
.CUSTOMREQUEST
, "PUT")
114 curl_cmd
.setopt(pycurl
.CUSTOMREQUEST
, "PATCH")
115 curl_cmd
.setopt(pycurl
.POST
, 1)
116 curl_cmd
.setopt(pycurl
.WRITEFUNCTION
, data
.write
)
117 if postfields_dict
is not None:
118 jsondata
= json
.dumps(postfields_dict
)
119 if "password" in postfields_dict
:
120 postfields_dict_copy
= copy
.deepcopy(postfields_dict
)
121 postfields_dict_copy
["password"] = "******"
122 jsondata_log
= json
.dumps(postfields_dict_copy
)
124 jsondata_log
= jsondata
125 self
._logger
.verbose("Request POSTFIELDS: {}".format(jsondata_log
))
126 curl_cmd
.setopt(pycurl
.POSTFIELDS
, jsondata
)
127 elif formfile
is not None:
129 pycurl
.HTTPPOST
, [((formfile
[0], (pycurl
.FORM_FILE
, formfile
[1])))]
131 elif filename
is not None:
132 with
open(filename
, "rb") as stream
:
133 postdata
= stream
.read()
134 self
._logger
.verbose("Request POSTFIELDS: Binary content")
135 curl_cmd
.setopt(pycurl
.POSTFIELDS
, postdata
)
139 "Request METHOD: {} URL: {}".format("PUT", self
._url
+ endpoint
)
143 "Request METHOD: {} URL: {}".format("PATCH", self
._url
+ endpoint
)
147 "Request METHOD: {} URL: {}".format("POST", self
._url
+ endpoint
)
150 http_code
= curl_cmd
.getinfo(pycurl
.HTTP_CODE
)
151 self
._logger
.info("Response HTTPCODE: {}".format(http_code
))
153 self
.check_http_response(http_code
, data
)
155 data_text
= data
.getvalue().decode()
156 self
._logger
.verbose("Response DATA: {}".format(data_text
))
157 return http_code
, data_text
159 return http_code
, None
164 postfields_dict
=None,
167 skip_query_admin
=False,
169 self
._logger
.debug("")
170 return self
.send_cmd(
172 postfields_dict
=postfields_dict
,
177 skip_query_admin
=skip_query_admin
,
183 postfields_dict
=None,
186 skip_query_admin
=False,
188 self
._logger
.debug("")
189 return self
.send_cmd(
191 postfields_dict
=postfields_dict
,
196 skip_query_admin
=skip_query_admin
,
202 postfields_dict
=None,
205 skip_query_admin
=False,
207 self
._logger
.debug("")
208 return self
.send_cmd(
210 postfields_dict
=postfields_dict
,
215 skip_query_admin
=skip_query_admin
,
def get2_cmd(self, endpoint, skip_query_admin=False):
    """Send a GET request to ``endpoint``.

    :param endpoint: path appended to ``self._url``
    :param skip_query_admin: forwarded to ``self._get_curl_cmd``
    :return: tuple ``(http_code, data_text)``; ``data_text`` is ``None`` when
        the response body is empty
    :raises: whatever ``self.check_http_response`` raises for the status code
    """
    self._logger.debug("")
    data = BytesIO()
    curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
    curl_cmd.setopt(pycurl.HTTPGET, 1)
    curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
    self._logger.info(
        "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
    )
    try:
        curl_cmd.perform()
        http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
    finally:
        # Release the handle even when the transfer itself fails.
        curl_cmd.close()
    self._logger.info("Response HTTPCODE: {}".format(http_code))
    self.check_http_response(http_code, data)
    payload = data.getvalue()
    if payload:
        data_text = payload.decode()
        self._logger.verbose("Response DATA: {}".format(data_text))
        return http_code, data_text
    return http_code, None
def check_http_response(self, http_code, data):
    """Raise when ``http_code`` signals a failed request.

    :param http_code: numeric HTTP status code of the response
    :param data: buffer holding the response body; read with ``getvalue()``
    :raises NotFound: when ``http_code`` is 404
    :raises OsmHttpException: for any other failing status code
    """
    # NOTE(review): codes of 300 and above are treated as failures here;
    # confirm the threshold against the server's use of 3xx responses.
    if http_code < 300:
        return
    payload = data.getvalue()
    if payload:
        data_text = payload.decode()
        self._logger.verbose(
            "Response {} DATA: {}".format(http_code, data_text)
        )
        resp = ": " + data_text
    else:
        self._logger.verbose("Response {}".format(http_code))
        resp = ""
    if http_code == 404:
        raise NotFound("Error {}{}".format(http_code, resp))
    raise OsmHttpException("Error {}{}".format(http_code, resp))
def set_query_admin(self, **kwargs):
    """Update the admin query flags and rebuild the default query string.

    :param kwargs: ``all_projects`` and/or ``public``; each key that is
        present overwrites the matching instance attribute, other keys are
        ignored
    """
    recognised = (("all_projects", "_all_projects"), ("public", "_public"))
    for option, attribute in recognised:
        if option in kwargs:
            setattr(self, attribute, kwargs[option])
    self._default_query_admin = self._complete_default_query_admin()