Feature 11037 Change default NBI port
[osm/osmclient.git] / osmclient / sol005 / http.py
1 # Copyright 2018 Telefonica
2 #
3 # All Rights Reserved.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
15 # under the License.
16
17 import copy
18 import json
19 import logging
20 import requests
21 import http.client as http_client
22 import urllib3
23
24 from osmclient.common import http
25 from osmclient.common.exceptions import OsmHttpException, NotFound
26
27
# Silence urllib3 warnings process-wide. Every request issued by the Http class
# below is sent with verify=False; presumably this call is here to suppress
# the warnings urllib3 would otherwise emit for unverified HTTPS requests.
28 urllib3.disable_warnings()
29
30
class Http(http.Http):
    """HTTP helper that issues requests against a base URL using ``requests``.

    Every request is sent with TLS certificate verification disabled
    (``verify=False``) and with ``CONNECT_TIMEOUT`` as its timeout.  Endpoints
    may automatically get an administrative query string (``ADMIN`` and/or
    ``PUBLIC=<value>``) appended, controlled by the ``all_projects`` and
    ``public`` settings.

    NOTE(review): logging uses ``self._logger.verbose``, which is not a
    standard ``logging.Logger`` method; it is assumed that the "osmclient"
    logger is configured elsewhere to provide it.
    """

    # Seconds handed to ``requests`` as the ``timeout`` argument.  A single
    # number is applied by ``requests`` to both the connect and read phases.
    CONNECT_TIMEOUT = 15

    def __init__(self, url, user="admin", password="admin", **kwargs):
        """Store connection settings and compute the default admin query.

        Args:
            url: Base URL; endpoints are concatenated directly onto it.
            user: Stored on the instance (not used by methods of this class).
            password: Stored on the instance (not used by methods of this
                class).
            **kwargs: Optional ``all_projects`` and ``public`` settings; any
                other keyword is ignored.
        """
        self._url = url
        self._user = user
        self._password = password
        # Headers attached to every request when set; this class never
        # assigns them itself.
        self._http_header = None
        self._logger = logging.getLogger("osmclient")
        self._all_projects = kwargs.get("all_projects")
        self._public = kwargs.get("public")
        self._default_query_admin = self._complete_default_query_admin()

    def _complete_default_query_admin(self):
        """Build the admin query string from the current settings.

        Returns:
            ``"ADMIN"``, ``"PUBLIC=<value>"``, both joined with ``&``, or an
            empty string when neither setting applies.
        """
        query_string_list = []
        if self._all_projects:
            query_string_list.append("ADMIN")
        # ``public`` is included for any value other than None (also False).
        if self._public is not None:
            query_string_list.append("PUBLIC={}".format(self._public))
        return "&".join(query_string_list)

    def _complete_endpoint(self, endpoint):
        """Return *endpoint* with the default admin query string appended.

        The endpoint is returned unchanged when no admin query is configured.
        """
        if not self._default_query_admin:
            return endpoint
        # Extend an existing query string with "&", otherwise start one.
        separator = "&" if "?" in endpoint else "?"
        return separator.join([endpoint, self._default_query_admin])

    def _get_requests_cmd(self, endpoint, skip_query_admin=False):
        """Create an unprepared ``requests.Request`` for *endpoint*.

        Also switches the low-level HTTP debug output on or off depending on
        whether the effective level of this instance's logger is DEBUG.
        Note that ``HTTPConnection.debuglevel`` is a class attribute, so this
        affects the whole process.

        Args:
            endpoint: Path (and optional query) appended to the base URL.
            skip_query_admin: When true, the admin query string is not added.

        Returns:
            A ``requests.Request`` with ``url`` and, if configured, ``headers``
            set.  The HTTP method is left for the caller to fill in.
        """
        self._logger.debug("")
        debug_enabled = self._logger.getEffectiveLevel() == logging.DEBUG
        http_client.HTTPConnection.debuglevel = 1 if debug_enabled else 0
        requests_log = logging.getLogger("requests.packages.urllib3")
        requests_log.setLevel(logging.DEBUG if debug_enabled else logging.NOTSET)
        requests_log.propagate = True

        if not skip_query_admin:
            endpoint = self._complete_endpoint(endpoint)
        requests_cmd = requests.Request()
        requests_cmd.url = self._url + endpoint
        if self._http_header:
            requests_cmd.headers = self._http_header
        return requests_cmd

    def _send_and_decode(self, requests_cmd):
        """Send *requests_cmd* and return ``(http_code, body_text_or_None)``.

        Shared tail of all request methods: prepares and sends the request,
        logs the status code, validates it and decodes a non-empty body.

        Raises:
            NotFound: The response status is 404.
            OsmHttpException: The response status is any other value >= 300.
        """
        # The context manager closes the session on every path, including a
        # failed send (timeout, connection error).
        with requests.Session() as session_cmd:
            # SECURITY: verify=False disables TLS certificate validation.
            resp = session_cmd.send(
                requests_cmd.prepare(), verify=False, timeout=self.CONNECT_TIMEOUT
            )
            http_code = resp.status_code
            self._logger.info("Response HTTPCODE: {}".format(http_code))
            data = resp.content
        self.check_http_response(http_code, data)
        if data:
            # Strict decode with the default codec (UTF-8).
            data_text = data.decode()
            self._logger.verbose("Response DATA: {}".format(data_text))
            return http_code, data_text
        return http_code, None

    @staticmethod
    def _postfields_for_log(postfields_dict):
        """Return the JSON text of the payload with a top-level password masked.

        Only a ``"password"`` key at the top level of a dict is masked; nested
        occurrences are logged as they are.
        """
        if isinstance(postfields_dict, dict) and "password" in postfields_dict:
            # A shallow copy is enough: only one top-level value is replaced.
            masked = dict(postfields_dict)
            masked["password"] = "******"
            return json.dumps(masked)
        return json.dumps(postfields_dict)

    def delete_cmd(self, endpoint, skip_query_admin=False):
        """Send a DELETE request to *endpoint*.

        Returns:
            ``(http_code, body_text)``; ``body_text`` is None for an empty
            response body.

        Raises:
            NotFound: The response status is 404.
            OsmHttpException: The response status is any other value >= 300.
        """
        self._logger.debug("")
        requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
        requests_cmd.method = "DELETE"
        self._logger.info(
            "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
        )
        # TODO 202 accepted should be returned somehow
        return self._send_and_decode(requests_cmd)

    def send_cmd(
        self,
        endpoint="",
        postfields_dict=None,
        formfile=None,
        filename=None,
        put_method=False,
        patch_method=False,
        skip_query_admin=False,
    ):
        """Send a POST, PUT or PATCH request with an optional payload.

        The payload comes from the first of these that is not None:
        ``postfields_dict`` (sent as JSON), ``formfile`` (a pair whose first
        item is the form field name and second the file value, sent as a
        multipart upload) or ``filename`` (file read in binary mode and sent
        as the raw body).

        Args:
            endpoint: Path (and optional query) appended to the base URL.
            postfields_dict: JSON-serialisable payload.
            formfile: Two-item sequence ``(field_name, file_value)``.
            filename: Path of a file whose bytes become the request body.
            put_method: Use PUT.  Takes precedence over ``patch_method``.
            patch_method: Use PATCH (when ``put_method`` is false).
            skip_query_admin: When true, the admin query string is not added.

        Returns:
            ``(http_code, body_text)``; ``body_text`` is None for an empty
            response body.

        Raises:
            NotFound: The response status is 404.
            OsmHttpException: The response status is any other value >= 300.
        """
        self._logger.debug("")
        requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
        if put_method:
            method = "PUT"
        elif patch_method:
            method = "PATCH"
        else:
            method = "POST"
        requests_cmd.method = method

        if postfields_dict is not None:
            jsondata_log = self._postfields_for_log(postfields_dict)
            requests_cmd.json = postfields_dict
            self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
        elif formfile is not None:
            requests_cmd.files = {formfile[0]: formfile[1]}
        elif filename is not None:
            with open(filename, "rb") as stream:
                postdata = stream.read()
            self._logger.verbose("Request POSTFIELDS: Binary content")
            requests_cmd.data = postdata

        self._logger.info(
            "Request METHOD: {} URL: {}".format(method, self._url + endpoint)
        )
        return self._send_and_decode(requests_cmd)

    def post_cmd(
        self,
        endpoint="",
        postfields_dict=None,
        formfile=None,
        filename=None,
        skip_query_admin=False,
    ):
        """Send a POST request; see ``send_cmd`` for arguments and results."""
        self._logger.debug("")
        return self.send_cmd(
            endpoint=endpoint,
            postfields_dict=postfields_dict,
            formfile=formfile,
            filename=filename,
            put_method=False,
            patch_method=False,
            skip_query_admin=skip_query_admin,
        )

    def put_cmd(
        self,
        endpoint="",
        postfields_dict=None,
        formfile=None,
        filename=None,
        skip_query_admin=False,
    ):
        """Send a PUT request; see ``send_cmd`` for arguments and results."""
        self._logger.debug("")
        return self.send_cmd(
            endpoint=endpoint,
            postfields_dict=postfields_dict,
            formfile=formfile,
            filename=filename,
            put_method=True,
            patch_method=False,
            skip_query_admin=skip_query_admin,
        )

    def patch_cmd(
        self,
        endpoint="",
        postfields_dict=None,
        formfile=None,
        filename=None,
        skip_query_admin=False,
    ):
        """Send a PATCH request; see ``send_cmd`` for arguments and results."""
        self._logger.debug("")
        return self.send_cmd(
            endpoint=endpoint,
            postfields_dict=postfields_dict,
            formfile=formfile,
            filename=filename,
            put_method=False,
            patch_method=True,
            skip_query_admin=skip_query_admin,
        )

    def get2_cmd(self, endpoint, skip_query_admin=False):
        """Send a GET request to *endpoint*.

        Returns:
            ``(http_code, body_text)``; ``body_text`` is None for an empty
            response body.

        Raises:
            NotFound: The response status is 404.
            OsmHttpException: The response status is any other value >= 300.
        """
        self._logger.debug("")
        requests_cmd = self._get_requests_cmd(endpoint, skip_query_admin)
        requests_cmd.method = "GET"
        self._logger.info(
            "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
        )
        return self._send_and_decode(requests_cmd)

    def check_http_response(self, http_code, data):
        """Raise if *http_code* is 300 or above; otherwise do nothing.

        Args:
            http_code: Numeric HTTP status code.
            data: Raw response body (bytes); may be empty or None.

        Raises:
            NotFound: ``http_code`` is 404.
            OsmHttpException: ``http_code`` is any other value >= 300.
        """
        if http_code < 300:
            return
        resp = ""
        if data:
            # Decode leniently so an undecodable error body cannot replace
            # the HTTP error with a UnicodeDecodeError.
            data_text = data.decode(errors="replace")
            self._logger.verbose(
                "Response {} DATA: {}".format(http_code, data_text)
            )
            resp = ": " + data_text
        else:
            self._logger.verbose("Response {}".format(http_code))
        if http_code == 404:
            raise NotFound("Error {}{}".format(http_code, resp))
        raise OsmHttpException("Error {}{}".format(http_code, resp))

    def set_query_admin(self, **kwargs):
        """Update ``all_projects`` / ``public`` and rebuild the admin query.

        Only the keywords actually supplied are changed; a setting that is
        not passed keeps its current value.
        """
        if "all_projects" in kwargs:
            self._all_projects = kwargs["all_projects"]
        if "public" in kwargs:
            self._public = kwargs["public"]
        self._default_query_admin = self._complete_default_query_admin()