Feature 10955: Osmclient changes related to VIM configuration with a Prometheus TSDB...
[osm/osmclient.git] / osmclient / sol005 / http.py
1 # Copyright 2018 Telefonica
2 #
3 # All Rights Reserved.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
15 # under the License.
16
17 import copy
18 from io import BytesIO
19 import json
20 import logging
21
22 from osmclient.common import http
23 from osmclient.common.exceptions import OsmHttpException, NotFound
24 import pycurl
25
26
27 class Http(http.Http):
28 CONNECT_TIMEOUT = 15
29
30 def __init__(self, url, user="admin", password="admin", **kwargs):
31 self._url = url
32 self._user = user
33 self._password = password
34 self._http_header = None
35 self._logger = logging.getLogger("osmclient")
36 self._default_query_admin = None
37 self._all_projects = None
38 self._public = None
39 if "all_projects" in kwargs:
40 self._all_projects = kwargs["all_projects"]
41 if "public" in kwargs:
42 self._public = kwargs["public"]
43 self._default_query_admin = self._complete_default_query_admin()
44
45 def _complete_default_query_admin(self):
46 query_string_list = []
47 if self._all_projects:
48 query_string_list.append("ADMIN")
49 if self._public is not None:
50 query_string_list.append("PUBLIC={}".format(self._public))
51 return "&".join(query_string_list)
52
53 def _complete_endpoint(self, endpoint):
54 if self._default_query_admin:
55 if "?" in endpoint:
56 endpoint = "&".join([endpoint, self._default_query_admin])
57 else:
58 endpoint = "?".join([endpoint, self._default_query_admin])
59 return endpoint
60
61 def _get_curl_cmd(self, endpoint, skip_query_admin=False):
62 self._logger.debug("")
63 curl_cmd = pycurl.Curl()
64 if self._logger.getEffectiveLevel() == logging.DEBUG:
65 curl_cmd.setopt(pycurl.VERBOSE, True)
66 if not skip_query_admin:
67 endpoint = self._complete_endpoint(endpoint)
68 curl_cmd.setopt(pycurl.CONNECTTIMEOUT, self.CONNECT_TIMEOUT)
69 curl_cmd.setopt(pycurl.URL, self._url + endpoint)
70 curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
71 curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
72 if self._http_header:
73 curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
74 return curl_cmd
75
76 def delete_cmd(self, endpoint, skip_query_admin=False):
77 self._logger.debug("")
78 data = BytesIO()
79 curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
80 curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
81 curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
82 self._logger.info(
83 "Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint)
84 )
85 curl_cmd.perform()
86 http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
87 self._logger.info("Response HTTPCODE: {}".format(http_code))
88 curl_cmd.close()
89 self.check_http_response(http_code, data)
90 # TODO 202 accepted should be returned somehow
91 if data.getvalue():
92 data_text = data.getvalue().decode()
93 self._logger.verbose("Response DATA: {}".format(data_text))
94 return http_code, data_text
95 else:
96 return http_code, None
97
98 def send_cmd(
99 self,
100 endpoint="",
101 postfields_dict=None,
102 formfile=None,
103 filename=None,
104 put_method=False,
105 patch_method=False,
106 skip_query_admin=False,
107 ):
108 self._logger.debug("")
109 data = BytesIO()
110 curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
111 if put_method:
112 curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PUT")
113 elif patch_method:
114 curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PATCH")
115 curl_cmd.setopt(pycurl.POST, 1)
116 curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
117 if postfields_dict is not None:
118 jsondata = json.dumps(postfields_dict)
119 if "password" in postfields_dict:
120 postfields_dict_copy = copy.deepcopy(postfields_dict)
121 postfields_dict_copy["password"] = "******"
122 jsondata_log = json.dumps(postfields_dict_copy)
123 else:
124 jsondata_log = jsondata
125 self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
126 curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
127 elif formfile is not None:
128 curl_cmd.setopt(
129 pycurl.HTTPPOST, [((formfile[0], (pycurl.FORM_FILE, formfile[1])))]
130 )
131 elif filename is not None:
132 with open(filename, "rb") as stream:
133 postdata = stream.read()
134 self._logger.verbose("Request POSTFIELDS: Binary content")
135 curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
136
137 if put_method:
138 self._logger.info(
139 "Request METHOD: {} URL: {}".format("PUT", self._url + endpoint)
140 )
141 elif patch_method:
142 self._logger.info(
143 "Request METHOD: {} URL: {}".format("PATCH", self._url + endpoint)
144 )
145 else:
146 self._logger.info(
147 "Request METHOD: {} URL: {}".format("POST", self._url + endpoint)
148 )
149 curl_cmd.perform()
150 http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
151 self._logger.info("Response HTTPCODE: {}".format(http_code))
152 curl_cmd.close()
153 self.check_http_response(http_code, data)
154 if data.getvalue():
155 data_text = data.getvalue().decode()
156 self._logger.verbose("Response DATA: {}".format(data_text))
157 return http_code, data_text
158 else:
159 return http_code, None
160
161 def post_cmd(
162 self,
163 endpoint="",
164 postfields_dict=None,
165 formfile=None,
166 filename=None,
167 skip_query_admin=False,
168 ):
169 self._logger.debug("")
170 return self.send_cmd(
171 endpoint=endpoint,
172 postfields_dict=postfields_dict,
173 formfile=formfile,
174 filename=filename,
175 put_method=False,
176 patch_method=False,
177 skip_query_admin=skip_query_admin,
178 )
179
180 def put_cmd(
181 self,
182 endpoint="",
183 postfields_dict=None,
184 formfile=None,
185 filename=None,
186 skip_query_admin=False,
187 ):
188 self._logger.debug("")
189 return self.send_cmd(
190 endpoint=endpoint,
191 postfields_dict=postfields_dict,
192 formfile=formfile,
193 filename=filename,
194 put_method=True,
195 patch_method=False,
196 skip_query_admin=skip_query_admin,
197 )
198
199 def patch_cmd(
200 self,
201 endpoint="",
202 postfields_dict=None,
203 formfile=None,
204 filename=None,
205 skip_query_admin=False,
206 ):
207 self._logger.debug("")
208 return self.send_cmd(
209 endpoint=endpoint,
210 postfields_dict=postfields_dict,
211 formfile=formfile,
212 filename=filename,
213 put_method=False,
214 patch_method=True,
215 skip_query_admin=skip_query_admin,
216 )
217
218 def get2_cmd(self, endpoint, skip_query_admin=False):
219 self._logger.debug("")
220 data = BytesIO()
221 curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
222 curl_cmd.setopt(pycurl.HTTPGET, 1)
223 curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
224 self._logger.info(
225 "Request METHOD: {} URL: {}".format("GET", self._url + endpoint)
226 )
227 curl_cmd.perform()
228 http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
229 self._logger.info("Response HTTPCODE: {}".format(http_code))
230 curl_cmd.close()
231 self.check_http_response(http_code, data)
232 if data.getvalue():
233 data_text = data.getvalue().decode()
234 self._logger.verbose("Response DATA: {}".format(data_text))
235 return http_code, data_text
236 return http_code, None
237
238 def check_http_response(self, http_code, data):
239 if http_code >= 300:
240 resp = ""
241 if data.getvalue():
242 data_text = data.getvalue().decode()
243 self._logger.verbose(
244 "Response {} DATA: {}".format(http_code, data_text)
245 )
246 resp = ": " + data_text
247 else:
248 self._logger.verbose("Response {}".format(http_code))
249 if http_code == 404:
250 raise NotFound("Error {}{}".format(http_code, resp))
251 raise OsmHttpException("Error {}{}".format(http_code, resp))
252
253 def set_query_admin(self, **kwargs):
254 if "all_projects" in kwargs:
255 self._all_projects = kwargs["all_projects"]
256 if "public" in kwargs:
257 self._public = kwargs["public"]
258 self._default_query_admin = self._complete_default_query_admin()