1 # Copyright 2018 Telefonica
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
21 import http
.client
as http_client
24 from osmclient
.common
import http
25 from osmclient
.common
.exceptions
import OsmHttpException
, NotFound
# Turn off urllib3's warnings for the whole process; the requests issued by
# this module are sent with verify=False.
28 urllib3
.disable_warnings()
31 class Http(http
.Http
):
34 def __init__(self
, url
, user
="admin", password
="admin", **kwargs
):
37 self
._password
= password
38 self
._http
_header
= None
39 self
._logger
= logging
.getLogger("osmclient")
40 self
._default
_query
_admin
= None
41 self
._all
_projects
= None
43 if "all_projects" in kwargs
:
44 self
._all
_projects
= kwargs
["all_projects"]
45 if "public" in kwargs
:
46 self
._public
= kwargs
["public"]
47 self
._default
_query
_admin
= self
._complete
_default
_query
_admin
()
49 def _complete_default_query_admin(self
):
50 query_string_list
= []
51 if self
._all
_projects
:
52 query_string_list
.append("ADMIN")
53 if self
._public
is not None:
54 query_string_list
.append("PUBLIC={}".format(self
._public
))
55 return "&".join(query_string_list
)
57 def _complete_endpoint(self
, endpoint
):
58 if self
._default
_query
_admin
:
60 endpoint
= "&".join([endpoint
, self
._default
_query
_admin
])
62 endpoint
= "?".join([endpoint
, self
._default
_query
_admin
])
def _get_requests_cmd(self, endpoint, skip_query_admin=False):
    """Build the unprepared ``requests.Request`` for *endpoint*.

    Args:
        endpoint: path appended to the base URL.
        skip_query_admin: when true, the default admin query string is not
            appended to the endpoint.

    Returns:
        A ``requests.Request`` with ``url`` set, and ``headers`` set when
        headers are configured. The caller sets the method and payload.
    """
    self._logger.debug("")
    requests_cmd = requests.Request()

    # Wire-level tracing follows the client logger: it is switched on only
    # when the effective level is exactly DEBUG and switched off otherwise.
    debug_enabled = self._logger.getEffectiveLevel() == logging.DEBUG
    http_client.HTTPConnection.debuglevel = 1 if debug_enabled else 0
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.setLevel(logging.DEBUG if debug_enabled else logging.NOTSET)
    requests_log.propagate = True

    if not skip_query_admin:
        endpoint = self._complete_endpoint(endpoint)
    requests_cmd.url = self._url + endpoint
    # Leave the Request's own default headers in place when none are set.
    if self._http_header:
        requests_cmd.headers = self._http_header
    return requests_cmd
def delete_cmd(self, endpoint, skip_query_admin=False):
    """Send a DELETE request to *endpoint*.

    Args:
        endpoint: path appended to the base URL.
        skip_query_admin: when true, the default admin query string is not
            appended to the endpoint.

    Returns:
        ``(http_code, text)`` where ``text`` is the decoded response body, or
        ``None`` when the body is empty.

    Raises:
        NotFound, OsmHttpException: propagated from ``check_http_response``.
    """
    self._logger.debug("")
    requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
    requests_cmd.method = "DELETE"
    self._logger.info(
        "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
    )
    # The context manager closes the session even if send() raises.
    # verify=False disables TLS certificate verification.
    with requests.Session() as session_cmd:
        resp = session_cmd.send(
            requests_cmd.prepare(), verify=False, timeout=self.CONNECT_TIMEOUT
        )
        http_code = resp.status_code
        self._logger.info("Response HTTPCODE: {}".format(http_code))
        data = resp.content  # raw bytes, decoded below
    self.check_http_response(http_code, data)
    # TODO 202 accepted should be returned somehow
    if not data:
        return http_code, None
    data_text = data.decode()
    self._logger.verbose("Response DATA: {}".format(data_text))
    return http_code, data_text
# Presumably the body of send_cmd (the wrappers further down call
# self.send_cmd with the same keyword names). What is visible here:
#   * the request is built via _get_requests_cmd and its method is set to
#     "PUT", "PATCH" or "POST";
#   * the payload is postfields_dict sent as JSON, or formfile as a one-entry
#     `files` mapping, or the binary content read from filename;
#   * when postfields_dict has a "password" key, only the logged copy is
#     masked with "******" - the request still carries the original dict;
#   * the request is sent with verify=False and self.CONNECT_TIMEOUT, the
#     status goes through check_http_response, and (http_code, decoded body)
#     or (http_code, None) is returned.
# NOTE(review): the `def` line, the conditions that choose the HTTP method,
# the binding of `data` and the condition between the two returns are not
# visible in this excerpt - confirm against the full file.
113 postfields_dict
=None,
118 skip_query_admin
=False,
120 session_cmd
= requests
.Session()
121 self
._logger
.debug("")
122 requests_cmd
= self
._get
_requests
_cmd
(endpoint
, skip_query_admin
)
124 requests_cmd
.method
= "PUT"
126 requests_cmd
.method
= "PATCH"
128 requests_cmd
.method
= "POST"
130 if postfields_dict
is not None:
131 jsondata
= json
.dumps(postfields_dict
)
132 if "password" in postfields_dict
:
133 postfields_dict_copy
= copy
.deepcopy(postfields_dict
)
134 postfields_dict_copy
["password"] = "******"
135 jsondata_log
= json
.dumps(postfields_dict_copy
)
137 jsondata_log
= jsondata
138 requests_cmd
.json
= postfields_dict
139 self
._logger
.verbose("Request POSTFIELDS: {}".format(jsondata_log
))
140 elif formfile
is not None:
141 requests_cmd
.files
= {formfile
[0]: formfile
[1]}
142 elif filename
is not None:
143 with
open(filename
, "rb") as stream
:
144 postdata
= stream
.read()
145 self
._logger
.verbose("Request POSTFIELDS: Binary content")
146 requests_cmd
.data
= postdata
150 "Request METHOD: {} URL: {}".format("PUT", self
._url
+ endpoint
)
154 "Request METHOD: {} URL: {}".format("PATCH", self
._url
+ endpoint
)
158 "Request METHOD: {} URL: {}".format("POST", self
._url
+ endpoint
)
160 requests_cmd
= requests_cmd
.prepare()
161 resp
= session_cmd
.send(
162 requests_cmd
, verify
=False, timeout
=self
.CONNECT_TIMEOUT
164 http_code
= resp
.status_code
165 self
._logger
.info("Response HTTPCODE: {}".format(http_code
))
167 self
.check_http_response(http_code
, data
)
170 data_text
= data
.decode()
171 self
._logger
.verbose("Response DATA: {}".format(data_text
))
172 return http_code
, data_text
173 return http_code
, None
# Thin wrapper: emits an empty debug log line, then forwards postfields_dict
# and skip_query_admin to self.send_cmd and returns its result.
# NOTE(review): the `def` line and the remaining parameters/arguments are not
# visible in this excerpt - confirm the method name and which HTTP method it
# selects against the full file.
178 postfields_dict
=None,
181 skip_query_admin
=False,
183 self
._logger
.debug("")
184 return self
.send_cmd(
186 postfields_dict
=postfields_dict
,
191 skip_query_admin
=skip_query_admin
,
# Thin wrapper: emits an empty debug log line, then forwards postfields_dict
# and skip_query_admin to self.send_cmd and returns its result.
# NOTE(review): the `def` line and the remaining parameters/arguments are not
# visible in this excerpt - confirm the method name and which HTTP method it
# selects against the full file.
197 postfields_dict
=None,
200 skip_query_admin
=False,
202 self
._logger
.debug("")
203 return self
.send_cmd(
205 postfields_dict
=postfields_dict
,
210 skip_query_admin
=skip_query_admin
,
# Thin wrapper: emits an empty debug log line, then forwards postfields_dict
# and skip_query_admin to self.send_cmd and returns its result.
# NOTE(review): the `def` line and the remaining parameters/arguments are not
# visible in this excerpt - confirm the method name and which HTTP method it
# selects against the full file.
216 postfields_dict
=None,
219 skip_query_admin
=False,
221 self
._logger
.debug("")
222 return self
.send_cmd(
224 postfields_dict
=postfields_dict
,
229 skip_query_admin
=skip_query_admin
,
def get2_cmd(self, endpoint, skip_query_admin=False):
    """Send a GET request to *endpoint*.

    Args:
        endpoint: path appended to the base URL.
        skip_query_admin: when true, the default admin query string is not
            appended to the endpoint.

    Returns:
        ``(http_code, text)`` where ``text`` is the decoded response body, or
        ``None`` when the body is empty.

    Raises:
        NotFound, OsmHttpException: propagated from ``check_http_response``.
    """
    self._logger.debug("")
    requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
    requests_cmd.method = "GET"
    self._logger.info(
        "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
    )
    # The context manager closes the session even if send() raises.
    # verify=False disables TLS certificate verification.
    with requests.Session() as session_cmd:
        resp = session_cmd.send(
            requests_cmd.prepare(), verify=False, timeout=self.CONNECT_TIMEOUT
        )
        http_code = resp.status_code
        self._logger.info("Response HTTPCODE: {}".format(http_code))
        data = resp.content  # raw bytes, decoded below
    self.check_http_response(http_code, data)
    if not data:
        return http_code, None
    data_text = data.decode()
    self._logger.verbose("Response DATA: {}".format(data_text))
    return http_code, data_text
# Validates a response status: logs the status (with the decoded body when
# one is decoded) at verbose level and raises NotFound or OsmHttpException
# with the message "Error <http_code><resp>", where resp is ": <body text>"
# when a body was decoded.
# NOTE(review): the conditions guarding these statements (which status codes
# count as errors, when NotFound is chosen over OsmHttpException, and the
# value of `resp` when there is no body) are not visible in this excerpt -
# confirm against the full file.
255 def check_http_response(self
, http_code
, data
):
259 data_text
= data
.decode()
260 self
._logger
.verbose(
261 "Response {} DATA: {}".format(http_code
, data_text
)
263 resp
= ": " + data_text
265 self
._logger
.verbose("Response {}".format(http_code
))
267 raise NotFound("Error {}{}".format(http_code
, resp
))
268 raise OsmHttpException("Error {}{}".format(http_code
, resp
))
270 def set_query_admin(self
, **kwargs
):
271 if "all_projects" in kwargs
:
272 self
._all
_projects
= kwargs
["all_projects"]
273 if "public" in kwargs
:
274 self
._public
= kwargs
["public"]
275 self
._default
_query
_admin
= self
._complete
_default
_query
_admin
()