Code Coverage

Cobertura Coverage Report > osmclient.sol005 >

http.py

Trend

Classes0%
 
Lines0%
 
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
http.py
0%
0/1
0%
0/151
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
http.py
0%
0/151
N/A

Source

osmclient/sol005/http.py
1 # Copyright 2018 Telefonica
2 #
3 # All Rights Reserved.
4 #
5 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
6 #    not use this file except in compliance with the License. You may obtain
7 #    a copy of the License at
8 #
9 #         http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #    Unless required by applicable law or agreed to in writing, software
12 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 #    License for the specific language governing permissions and limitations
15 #    under the License.
16
17 0 import copy
18 0 from io import BytesIO
19 0 import json
20 0 import logging
21
22 0 from osmclient.common import http
23 0 from osmclient.common.exceptions import OsmHttpException, NotFound
24 0 import pycurl
25
26
27 0 class Http(http.Http):
28 0     CONNECT_TIMEOUT = 15
29
30 0     def __init__(self, url, user='admin', password='admin', **kwargs):
31 0         self._url = url
32 0         self._user = user
33 0         self._password = password
34 0         self._http_header = None
35 0         self._logger = logging.getLogger('osmclient')
36 0         self._default_query_admin = None
37 0         self._all_projects = None
38 0         self._public = None
39 0         if 'all_projects' in kwargs:
40 0             self._all_projects = kwargs['all_projects']
41 0         if 'public' in kwargs:
42 0             self._public = kwargs['public']
43 0         self._default_query_admin = self._complete_default_query_admin()
44
45 0     def _complete_default_query_admin(self):
46 0         query_string_list = []
47 0         if self._all_projects:
48 0             query_string_list.append("ADMIN")
49 0         if self._public is not None:
50 0             query_string_list.append("PUBLIC={}".format(self._public))
51 0         return "&".join(query_string_list)
52
53 0     def _complete_endpoint(self, endpoint):
54 0         if self._default_query_admin:
55 0             if '?' in endpoint:
56 0                 endpoint = '&'.join([endpoint, self._default_query_admin])
57             else:
58 0                 endpoint = '?'.join([endpoint, self._default_query_admin])
59 0         return endpoint
60
61 0     def _get_curl_cmd(self, endpoint, skip_query_admin=False):
62 0         self._logger.debug("")
63 0         curl_cmd = pycurl.Curl()
64 0         if self._logger.getEffectiveLevel() == logging.DEBUG:
65 0             curl_cmd.setopt(pycurl.VERBOSE, True)
66 0         if not skip_query_admin:
67 0             endpoint = self._complete_endpoint(endpoint)
68 0         curl_cmd.setopt(pycurl.CONNECTTIMEOUT, self.CONNECT_TIMEOUT)
69 0         curl_cmd.setopt(pycurl.URL, self._url + endpoint)
70 0         curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
71 0         curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
72 0         if self._http_header:
73 0             curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
74 0         return curl_cmd
75
76 0     def delete_cmd(self, endpoint, skip_query_admin=False):
77 0         self._logger.debug("")
78 0         data = BytesIO()
79 0         curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
80 0         curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
81 0         curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
82 0         self._logger.info("Request METHOD: {} URL: {}".format("DELETE", self._url + endpoint))
83 0         curl_cmd.perform()
84 0         http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
85 0         self._logger.info("Response HTTPCODE: {}".format(http_code))
86 0         curl_cmd.close()
87 0         self.check_http_response(http_code, data)
88         # TODO 202 accepted should be returned somehow
89 0         if data.getvalue():
90 0             data_text = data.getvalue().decode()
91 0             self._logger.verbose("Response DATA: {}".format(data_text))
92 0             return http_code, data_text
93         else:
94 0             return http_code, None
95
96 0     def send_cmd(self, endpoint='', postfields_dict=None,
97                  formfile=None, filename=None,
98                  put_method=False, patch_method=False,
99                  skip_query_admin=False):
100 0         self._logger.debug("")
101 0         data = BytesIO()
102 0         curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
103 0         if put_method:
104 0             curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PUT")
105 0         elif patch_method:
106 0             curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PATCH")
107 0         curl_cmd.setopt(pycurl.POST, 1)
108 0         curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
109
110 0         if postfields_dict is not None:
111 0             jsondata = json.dumps(postfields_dict)
112 0             if 'password' in postfields_dict:
113 0                 postfields_dict_copy = copy.deepcopy(postfields_dict)
114 0                 postfields_dict_copy['password'] = '******'
115 0                 jsondata_log = json.dumps(postfields_dict_copy)
116             else:
117 0                 jsondata_log = jsondata
118 0             self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
119 0             curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
120 0         elif formfile is not None:
121 0             curl_cmd.setopt(
122                 pycurl.HTTPPOST,
123                 [((formfile[0],
124                            (pycurl.FORM_FILE,
125                             formfile[1])))])
126 0         elif filename is not None:
127 0             with open(filename, 'rb') as stream:
128 0                 postdata = stream.read()
129 0             self._logger.verbose("Request POSTFIELDS: Binary content")
130 0             curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
131
132 0         if put_method:
133 0             self._logger.info("Request METHOD: {} URL: {}".format("PUT", self._url + endpoint))
134 0         elif patch_method:
135 0             self._logger.info("Request METHOD: {} URL: {}".format("PATCH", self._url + endpoint))
136         else:
137 0             self._logger.info("Request METHOD: {} URL: {}".format("POST", self._url + endpoint))
138 0         curl_cmd.perform()
139 0         http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
140 0         self._logger.info("Response HTTPCODE: {}".format(http_code))
141 0         curl_cmd.close()
142 0         self.check_http_response(http_code, data)
143 0         if data.getvalue():
144 0             data_text = data.getvalue().decode()
145 0             self._logger.verbose("Response DATA: {}".format(data_text))
146 0             return http_code, data_text
147         else:
148 0             return http_code, None
149
150 0     def post_cmd(self, endpoint='', postfields_dict=None,
151                  formfile=None, filename=None,
152                  skip_query_admin=False):
153 0         self._logger.debug("")
154 0         return self.send_cmd(endpoint=endpoint,
155                              postfields_dict=postfields_dict,
156                              formfile=formfile, filename=filename,
157                              put_method=False, patch_method=False,
158                              skip_query_admin=skip_query_admin)
159
160 0     def put_cmd(self, endpoint='', postfields_dict=None,
161                 formfile=None, filename=None,
162                 skip_query_admin=False):
163 0         self._logger.debug("")
164 0         return self.send_cmd(endpoint=endpoint,
165                              postfields_dict=postfields_dict,
166                              formfile=formfile, filename=filename,
167                              put_method=True, patch_method=False,
168                              skip_query_admin=skip_query_admin)
169
170 0     def patch_cmd(self, endpoint='', postfields_dict=None,
171                   formfile=None, filename=None,
172                   skip_query_admin=False):
173 0         self._logger.debug("")
174 0         return self.send_cmd(endpoint=endpoint,
175                              postfields_dict=postfields_dict,
176                              formfile=formfile, filename=filename,
177                              put_method=False, patch_method=True,
178                              skip_query_admin=skip_query_admin)
179
180 0     def get2_cmd(self, endpoint, skip_query_admin=False):
181 0         self._logger.debug("")
182 0         data = BytesIO()
183 0         curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
184 0         curl_cmd.setopt(pycurl.HTTPGET, 1)
185 0         curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
186 0         self._logger.info("Request METHOD: {} URL: {}".format("GET", self._url + endpoint))
187 0         curl_cmd.perform()
188 0         http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
189 0         self._logger.info("Response HTTPCODE: {}".format(http_code))
190 0         curl_cmd.close()
191 0         self.check_http_response(http_code, data)
192 0         if data.getvalue():
193 0             data_text = data.getvalue().decode()
194 0             self._logger.verbose("Response DATA: {}".format(data_text))
195 0             return http_code, data_text
196 0         return http_code, None
197
198 0     def check_http_response(self, http_code, data):
199 0         if http_code >= 300:
200 0             resp = ""
201 0             if data.getvalue():
202 0                 data_text = data.getvalue().decode()
203 0                 self._logger.verbose("Response {} DATA: {}".format(http_code, data_text))
204 0                 resp = ": " + data_text
205             else:
206 0                 self._logger.verbose("Response {}".format(http_code))
207 0             if http_code == 404:
208 0                 raise NotFound("Error {}{}".format(http_code, resp))
209 0             raise OsmHttpException("Error {}{}".format(http_code, resp))
210
211 0     def set_query_admin(self, **kwargs):
212 0         if 'all_projects' in kwargs:
213 0             self._all_projects = kwargs['all_projects']
214 0         if 'public' in kwargs:
215 0             self._public = kwargs['public']
216 0         self._default_query_admin = self._complete_default_query_admin()