1 # Copyright 2018 Telefonica
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
17 from io
import BytesIO
22 from osmclient
.common
import http
23 from osmclient
.common
.exceptions
import OsmHttpException
, NotFound
26 class Http(http
.Http
):
28 def __init__(self
, url
, user
='admin', password
='admin', **kwargs
):
31 self
._password
= password
32 self
._http
_header
= None
33 self
._logger
= logging
.getLogger('osmclient')
34 self
._default
_query
_admin
= None
35 self
._all
_projects
= None;
37 if 'all_projects' in kwargs
:
38 self
._all
_projects
=kwargs
['all_projects']
39 if 'public' in kwargs
:
40 self
._public
=kwargs
['public']
41 self
._default
_query
_admin
= self
._complete
_default
_query
_admin
()
43 def _complete_default_query_admin(self
):
45 if self
._all
_projects
:
46 query_string_list
.append("ADMIN")
47 if self
._public
is not None:
48 query_string_list
.append("PUBLIC={}".format(self
._public
))
49 return "&".join(query_string_list
)
51 def _complete_endpoint(self
, endpoint
):
52 if self
._default
_query
_admin
:
54 endpoint
= '&'.join([endpoint
, self
._default
_query
_admin
])
56 endpoint
= '?'.join([endpoint
, self
._default
_query
_admin
])
59 def _get_curl_cmd(self
, endpoint
, skip_query_admin
=False):
60 self
._logger
.debug("")
61 curl_cmd
= pycurl
.Curl()
62 if self
._logger
.getEffectiveLevel() == logging
.DEBUG
:
63 curl_cmd
.setopt(pycurl
.VERBOSE
, True)
64 if not skip_query_admin
:
65 endpoint
= self
._complete
_endpoint
(endpoint
)
66 curl_cmd
.setopt(pycurl
.URL
, self
._url
+ endpoint
)
67 curl_cmd
.setopt(pycurl
.SSL_VERIFYPEER
, 0)
68 curl_cmd
.setopt(pycurl
.SSL_VERIFYHOST
, 0)
70 curl_cmd
.setopt(pycurl
.HTTPHEADER
, self
._http
_header
)
73 def delete_cmd(self
, endpoint
, skip_query_admin
=False):
74 self
._logger
.debug("")
76 curl_cmd
= self
._get
_curl
_cmd
(endpoint
, skip_query_admin
)
77 curl_cmd
.setopt(pycurl
.CUSTOMREQUEST
, "DELETE")
78 curl_cmd
.setopt(pycurl
.WRITEFUNCTION
, data
.write
)
79 self
._logger
.info("Request METHOD: {} URL: {}".format("DELETE",self
._url
+ endpoint
))
81 http_code
= curl_cmd
.getinfo(pycurl
.HTTP_CODE
)
82 self
._logger
.info("Response HTTPCODE: {}".format(http_code
))
84 self
.check_http_response(http_code
, data
)
85 # TODO 202 accepted should be returned somehow
87 data_text
= data
.getvalue().decode()
88 self
._logger
.verbose("Response DATA: {}".format(data_text
))
89 return http_code
, data_text
91 return http_code
, None
93 def send_cmd(self
, endpoint
='', postfields_dict
=None,
94 formfile
=None, filename
=None,
95 put_method
=False, patch_method
=False,
96 skip_query_admin
=False):
97 self
._logger
.debug("")
99 curl_cmd
= self
._get
_curl
_cmd
(endpoint
, skip_query_admin
)
101 curl_cmd
.setopt(pycurl
.CUSTOMREQUEST
, "PUT")
103 curl_cmd
.setopt(pycurl
.CUSTOMREQUEST
, "PATCH")
104 curl_cmd
.setopt(pycurl
.POST
, 1)
105 curl_cmd
.setopt(pycurl
.WRITEFUNCTION
, data
.write
)
107 if postfields_dict
is not None:
108 jsondata
= json
.dumps(postfields_dict
)
109 if 'password' in postfields_dict
:
110 postfields_dict_copy
= copy
.deepcopy(postfields_dict
)
111 postfields_dict_copy
['password']='******'
112 jsondata_log
= json
.dumps(postfields_dict_copy
)
114 jsondata_log
= jsondata
115 self
._logger
.verbose("Request POSTFIELDS: {}".format(jsondata_log
))
116 curl_cmd
.setopt(pycurl
.POSTFIELDS
, jsondata
)
117 elif formfile
is not None:
123 elif filename
is not None:
124 with
open(filename
, 'rb') as stream
:
125 postdata
=stream
.read()
126 self
._logger
.verbose("Request POSTFIELDS: Binary content")
127 curl_cmd
.setopt(pycurl
.POSTFIELDS
, postdata
)
130 self
._logger
.info("Request METHOD: {} URL: {}".format("PUT",self
._url
+ endpoint
))
132 self
._logger
.info("Request METHOD: {} URL: {}".format("PATCH",self
._url
+ endpoint
))
134 self
._logger
.info("Request METHOD: {} URL: {}".format("POST",self
._url
+ endpoint
))
136 http_code
= curl_cmd
.getinfo(pycurl
.HTTP_CODE
)
137 self
._logger
.info("Response HTTPCODE: {}".format(http_code
))
139 self
.check_http_response(http_code
, data
)
141 data_text
= data
.getvalue().decode()
142 self
._logger
.verbose("Response DATA: {}".format(data_text
))
143 return http_code
, data_text
145 return http_code
, None
147 def post_cmd(self
, endpoint
='', postfields_dict
=None,
148 formfile
=None, filename
=None,
149 skip_query_admin
=False):
150 self
._logger
.debug("")
151 return self
.send_cmd(endpoint
=endpoint
,
152 postfields_dict
=postfields_dict
,
153 formfile
=formfile
, filename
=filename
,
154 put_method
=False, patch_method
=False,
155 skip_query_admin
=skip_query_admin
)
157 def put_cmd(self
, endpoint
='', postfields_dict
=None,
158 formfile
=None, filename
=None,
159 skip_query_admin
=False):
160 self
._logger
.debug("")
161 return self
.send_cmd(endpoint
=endpoint
,
162 postfields_dict
=postfields_dict
,
163 formfile
=formfile
, filename
=filename
,
164 put_method
=True, patch_method
=False,
165 skip_query_admin
=skip_query_admin
)
167 def patch_cmd(self
, endpoint
='', postfields_dict
=None,
168 formfile
=None, filename
=None,
169 skip_query_admin
=False):
170 self
._logger
.debug("")
171 return self
.send_cmd(endpoint
=endpoint
,
172 postfields_dict
=postfields_dict
,
173 formfile
=formfile
, filename
=filename
,
174 put_method
=False, patch_method
=True,
175 skip_query_admin
=skip_query_admin
)
177 def get2_cmd(self
, endpoint
, skip_query_admin
=False):
178 self
._logger
.debug("")
180 curl_cmd
= self
._get
_curl
_cmd
(endpoint
, skip_query_admin
)
181 curl_cmd
.setopt(pycurl
.HTTPGET
, 1)
182 curl_cmd
.setopt(pycurl
.WRITEFUNCTION
, data
.write
)
183 self
._logger
.info("Request METHOD: {} URL: {}".format("GET",self
._url
+ endpoint
))
185 http_code
= curl_cmd
.getinfo(pycurl
.HTTP_CODE
)
186 self
._logger
.info("Response HTTPCODE: {}".format(http_code
))
188 self
.check_http_response(http_code
, data
)
190 data_text
= data
.getvalue().decode()
191 self
._logger
.verbose("Response DATA: {}".format(data_text
))
192 return http_code
, data_text
193 return http_code
, None
195 def check_http_response(self
, http_code
, data
):
199 data_text
= data
.getvalue().decode()
200 self
._logger
.verbose("Response {} DATA: {}".format(http_code
, data_text
))
201 resp
= ": " + data_text
203 self
._logger
.verbose("Response {}".format(http_code
))
205 raise NotFound("Error {}{}".format(http_code
, resp
))
206 raise OsmHttpException("Error {}{}".format(http_code
, resp
))
208 def set_query_admin(self
, **kwargs
):
209 if 'all_projects' in kwargs
:
210 self
._all
_projects
=kwargs
['all_projects']
211 if 'public' in kwargs
:
212 self
._public
=kwargs
['public']
213 self
._default
_query
_admin
= self
._complete
_default
_query
_admin
()