1 # Copyright 2018 Telefonica
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
18 from io
import BytesIO
22 from osmclient
.common
import http
23 from osmclient
.common
.exceptions
import OsmHttpException
, NotFound
27 class Http(http
.Http
):
30 def __init__(self
, url
, user
='admin', password
='admin', **kwargs
):
33 self
._password
= password
34 self
._http
_header
= None
35 self
._logger
= logging
.getLogger('osmclient')
36 self
._default
_query
_admin
= None
37 self
._all
_projects
= None
39 if 'all_projects' in kwargs
:
40 self
._all
_projects
= kwargs
['all_projects']
41 if 'public' in kwargs
:
42 self
._public
= kwargs
['public']
43 self
._default
_query
_admin
= self
._complete
_default
_query
_admin
()
45 def _complete_default_query_admin(self
):
46 query_string_list
= []
47 if self
._all
_projects
:
48 query_string_list
.append("ADMIN")
49 if self
._public
is not None:
50 query_string_list
.append("PUBLIC={}".format(self
._public
))
51 return "&".join(query_string_list
)
53 def _complete_endpoint(self
, endpoint
):
54 if self
._default
_query
_admin
:
56 endpoint
= '&'.join([endpoint
, self
._default
_query
_admin
])
58 endpoint
= '?'.join([endpoint
, self
._default
_query
_admin
])
61 def _get_curl_cmd(self
, endpoint
, skip_query_admin
=False):
62 self
._logger
.debug("")
63 curl_cmd
= pycurl
.Curl()
64 if self
._logger
.getEffectiveLevel() == logging
.DEBUG
:
65 curl_cmd
.setopt(pycurl
.VERBOSE
, True)
66 if not skip_query_admin
:
67 endpoint
= self
._complete
_endpoint
(endpoint
)
68 curl_cmd
.setopt(pycurl
.CONNECTTIMEOUT
, self
.CONNECT_TIMEOUT
)
69 curl_cmd
.setopt(pycurl
.URL
, self
._url
+ endpoint
)
70 curl_cmd
.setopt(pycurl
.SSL_VERIFYPEER
, 0)
71 curl_cmd
.setopt(pycurl
.SSL_VERIFYHOST
, 0)
73 curl_cmd
.setopt(pycurl
.HTTPHEADER
, self
._http
_header
)
76 def delete_cmd(self
, endpoint
, skip_query_admin
=False):
77 self
._logger
.debug("")
79 curl_cmd
= self
._get
_curl
_cmd
(endpoint
, skip_query_admin
)
80 curl_cmd
.setopt(pycurl
.CUSTOMREQUEST
, "DELETE")
81 curl_cmd
.setopt(pycurl
.WRITEFUNCTION
, data
.write
)
82 self
._logger
.info("Request METHOD: {} URL: {}".format("DELETE", self
._url
+ endpoint
))
84 http_code
= curl_cmd
.getinfo(pycurl
.HTTP_CODE
)
85 self
._logger
.info("Response HTTPCODE: {}".format(http_code
))
87 self
.check_http_response(http_code
, data
)
88 # TODO 202 accepted should be returned somehow
90 data_text
= data
.getvalue().decode()
91 self
._logger
.verbose("Response DATA: {}".format(data_text
))
92 return http_code
, data_text
94 return http_code
, None
96 def send_cmd(self
, endpoint
='', postfields_dict
=None,
97 formfile
=None, filename
=None,
98 put_method
=False, patch_method
=False,
99 skip_query_admin
=False):
100 self
._logger
.debug("")
102 curl_cmd
= self
._get
_curl
_cmd
(endpoint
, skip_query_admin
)
104 curl_cmd
.setopt(pycurl
.CUSTOMREQUEST
, "PUT")
106 curl_cmd
.setopt(pycurl
.CUSTOMREQUEST
, "PATCH")
107 curl_cmd
.setopt(pycurl
.POST
, 1)
108 curl_cmd
.setopt(pycurl
.WRITEFUNCTION
, data
.write
)
110 if postfields_dict
is not None:
111 jsondata
= json
.dumps(postfields_dict
)
112 if 'password' in postfields_dict
:
113 postfields_dict_copy
= copy
.deepcopy(postfields_dict
)
114 postfields_dict_copy
['password'] = '******'
115 jsondata_log
= json
.dumps(postfields_dict_copy
)
117 jsondata_log
= jsondata
118 self
._logger
.verbose("Request POSTFIELDS: {}".format(jsondata_log
))
119 curl_cmd
.setopt(pycurl
.POSTFIELDS
, jsondata
)
120 elif formfile
is not None:
126 elif filename
is not None:
127 with
open(filename
, 'rb') as stream
:
128 postdata
= stream
.read()
129 self
._logger
.verbose("Request POSTFIELDS: Binary content")
130 curl_cmd
.setopt(pycurl
.POSTFIELDS
, postdata
)
133 self
._logger
.info("Request METHOD: {} URL: {}".format("PUT", self
._url
+ endpoint
))
135 self
._logger
.info("Request METHOD: {} URL: {}".format("PATCH", self
._url
+ endpoint
))
137 self
._logger
.info("Request METHOD: {} URL: {}".format("POST", self
._url
+ endpoint
))
139 http_code
= curl_cmd
.getinfo(pycurl
.HTTP_CODE
)
140 self
._logger
.info("Response HTTPCODE: {}".format(http_code
))
142 self
.check_http_response(http_code
, data
)
144 data_text
= data
.getvalue().decode()
145 self
._logger
.verbose("Response DATA: {}".format(data_text
))
146 return http_code
, data_text
148 return http_code
, None
150 def post_cmd(self
, endpoint
='', postfields_dict
=None,
151 formfile
=None, filename
=None,
152 skip_query_admin
=False):
153 self
._logger
.debug("")
154 return self
.send_cmd(endpoint
=endpoint
,
155 postfields_dict
=postfields_dict
,
156 formfile
=formfile
, filename
=filename
,
157 put_method
=False, patch_method
=False,
158 skip_query_admin
=skip_query_admin
)
160 def put_cmd(self
, endpoint
='', postfields_dict
=None,
161 formfile
=None, filename
=None,
162 skip_query_admin
=False):
163 self
._logger
.debug("")
164 return self
.send_cmd(endpoint
=endpoint
,
165 postfields_dict
=postfields_dict
,
166 formfile
=formfile
, filename
=filename
,
167 put_method
=True, patch_method
=False,
168 skip_query_admin
=skip_query_admin
)
170 def patch_cmd(self
, endpoint
='', postfields_dict
=None,
171 formfile
=None, filename
=None,
172 skip_query_admin
=False):
173 self
._logger
.debug("")
174 return self
.send_cmd(endpoint
=endpoint
,
175 postfields_dict
=postfields_dict
,
176 formfile
=formfile
, filename
=filename
,
177 put_method
=False, patch_method
=True,
178 skip_query_admin
=skip_query_admin
)
180 def get2_cmd(self
, endpoint
, skip_query_admin
=False):
181 self
._logger
.debug("")
183 curl_cmd
= self
._get
_curl
_cmd
(endpoint
, skip_query_admin
)
184 curl_cmd
.setopt(pycurl
.HTTPGET
, 1)
185 curl_cmd
.setopt(pycurl
.WRITEFUNCTION
, data
.write
)
186 self
._logger
.info("Request METHOD: {} URL: {}".format("GET", self
._url
+ endpoint
))
188 http_code
= curl_cmd
.getinfo(pycurl
.HTTP_CODE
)
189 self
._logger
.info("Response HTTPCODE: {}".format(http_code
))
191 self
.check_http_response(http_code
, data
)
193 data_text
= data
.getvalue().decode()
194 self
._logger
.verbose("Response DATA: {}".format(data_text
))
195 return http_code
, data_text
196 return http_code
, None
198 def check_http_response(self
, http_code
, data
):
202 data_text
= data
.getvalue().decode()
203 self
._logger
.verbose("Response {} DATA: {}".format(http_code
, data_text
))
204 resp
= ": " + data_text
206 self
._logger
.verbose("Response {}".format(http_code
))
208 raise NotFound("Error {}{}".format(http_code
, resp
))
209 raise OsmHttpException("Error {}{}".format(http_code
, resp
))
211 def set_query_admin(self
, **kwargs
):
212 if 'all_projects' in kwargs
:
213 self
._all
_projects
= kwargs
['all_projects']
214 if 'public' in kwargs
:
215 self
._public
= kwargs
['public']
216 self
._default
_query
_admin
= self
._complete
_default
_query
_admin
()