# Copyright 2018 Telefonica
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
import copy
import json
import logging
from io import BytesIO

import pycurl

from osmclient.common import http
from osmclient.common.exceptions import OsmHttpException, NotFound
class Http(http.Http):
    """HTTP client built on pycurl.

    Every request method returns a ``(http_code, body_text)`` tuple, where
    ``body_text`` is the decoded response body or ``None`` when the server
    sent no body. Non-success responses raise ``NotFound`` or
    ``OsmHttpException`` (see ``check_http_response``).
    """

    # Status codes that check_http_response accepts without raising.
    # NOTE(review): this set is an assumption (OK, Created, Accepted,
    # No Content) -- confirm against the server API before relying on it.
    _SUCCESS_CODES = (200, 201, 202, 204)

    def __init__(self, url, user='admin', password='admin'):
        """Store the connection settings and set up the logger.

        :param url: base URL; each request targets ``url + endpoint``
        :param user: user name (stored; not otherwise used in this class)
        :param password: password (stored; not otherwise used in this class)
        """
        # The base-class initializer is deliberately not invoked here: its
        # behavior is not visible from this file.
        self._url = url
        self._user = user
        self._password = password
        # Headers sent with every request; None means "send no extra headers".
        # Nothing in this class assigns it -- presumably set by the owner of
        # this object; verify against callers.
        self._http_header = None
        self._logger = logging.getLogger('osmclient')

    def _get_curl_cmd(self, endpoint):
        """Create a pycurl handle pre-configured for ``self._url + endpoint``.

        :param endpoint: path appended verbatim to the base URL
        :returns: a new ``pycurl.Curl`` handle; the caller must close it
        """
        self._logger.debug("")
        curl_cmd = pycurl.Curl()
        if self._logger.getEffectiveLevel() == logging.DEBUG:
            curl_cmd.setopt(pycurl.VERBOSE, True)
        curl_cmd.setopt(pycurl.URL, self._url + endpoint)
        # SECURITY: TLS peer and host verification are switched off, so the
        # server certificate is never validated. Kept as in the original;
        # review before using against untrusted networks.
        curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
        if self._http_header:
            curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
        return curl_cmd

    def _perform(self, curl_cmd, data):
        """Execute a prepared request and return ``(http_code, body_text)``.

        The handle is always closed, even when the transfer itself fails.

        :param curl_cmd: configured handle whose WRITEFUNCTION is ``data.write``
        :param data: ``BytesIO`` buffer receiving the response body
        :raises NotFound, OsmHttpException: see ``check_http_response``
        """
        try:
            curl_cmd.perform()
            http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
        finally:
            curl_cmd.close()
        self._logger.info("Response HTTPCODE: {}".format(http_code))
        self.check_http_response(http_code, data)
        payload = data.getvalue()
        if not payload:
            return http_code, None
        data_text = payload.decode()
        # NOTE(review): ``verbose`` is not a standard ``logging.Logger``
        # method; presumably a custom logger class is installed elsewhere.
        self._logger.verbose("Response DATA: {}".format(data_text))
        return http_code, data_text

    def _set_request_body(self, curl_cmd, postfields_dict, formfile, filename):
        """Attach the request body; the first non-None source wins."""
        if postfields_dict is not None:
            jsondata = json.dumps(postfields_dict)
            if 'password' in postfields_dict:
                # Never write the clear-text password to the log.
                postfields_dict_copy = copy.deepcopy(postfields_dict)
                postfields_dict_copy['password'] = '******'
                jsondata_log = json.dumps(postfields_dict_copy)
            else:
                jsondata_log = jsondata
            self._logger.verbose(
                "Request POSTFIELDS: {}".format(jsondata_log))
            curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
        elif formfile is not None:
            # NOTE(review): assumes formfile is a (field_name, file_path)
            # pair uploaded as multipart form data -- confirm with callers.
            curl_cmd.setopt(
                pycurl.HTTPPOST,
                [(formfile[0], (pycurl.FORM_FILE, formfile[1]))])
        elif filename is not None:
            with open(filename, 'rb') as stream:
                postdata = stream.read()
            self._logger.verbose("Request POSTFIELDS: Binary content")
            curl_cmd.setopt(pycurl.POSTFIELDS, postdata)

    def delete_cmd(self, endpoint):
        """Send a DELETE request to ``endpoint``.

        :returns: ``(http_code, body_text)``; ``body_text`` is None if empty
        :raises NotFound, OsmHttpException: on a non-success status
        """
        self._logger.debug("")
        data = BytesIO()
        curl_cmd = self._get_curl_cmd(endpoint)
        curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
        curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
        self._logger.info("Request METHOD: {} URL: {}".format(
            "DELETE", self._url + endpoint))
        # TODO 202 accepted should be returned somehow
        return self._perform(curl_cmd, data)

    def send_cmd(self, endpoint='', postfields_dict=None,
                 formfile=None, filename=None,
                 put_method=False, patch_method=False):
        """Send a POST, PUT or PATCH request with an optional body.

        :param endpoint: path appended to the base URL
        :param postfields_dict: object sent as a JSON body (takes priority)
        :param formfile: form-file upload descriptor, used when
            ``postfields_dict`` is None
        :param filename: path of a file whose raw bytes become the body,
            used when both of the above are None
        :param put_method: send PUT instead of POST (wins over patch_method)
        :param patch_method: send PATCH instead of POST
        :returns: ``(http_code, body_text)``; ``body_text`` is None if empty
        :raises NotFound, OsmHttpException: on a non-success status
        """
        self._logger.debug("")
        if put_method:
            method = "PUT"
        elif patch_method:
            method = "PATCH"
        else:
            method = "POST"

        data = BytesIO()
        curl_cmd = self._get_curl_cmd(endpoint)
        try:
            if method != "POST":
                curl_cmd.setopt(pycurl.CUSTOMREQUEST, method)
            curl_cmd.setopt(pycurl.POST, 1)
            curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
            self._set_request_body(
                curl_cmd, postfields_dict, formfile, filename)
        except Exception:
            # Do not leak the handle when preparing the request fails
            # (e.g. the body file cannot be opened).
            curl_cmd.close()
            raise

        self._logger.info("Request METHOD: {} URL: {}".format(
            method, self._url + endpoint))
        return self._perform(curl_cmd, data)

    def post_cmd(self, endpoint='', postfields_dict=None,
                 formfile=None, filename=None):
        """Send a POST request; see ``send_cmd`` for the parameters."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile,
                             filename=filename,
                             put_method=False, patch_method=False)

    def put_cmd(self, endpoint='', postfields_dict=None,
                formfile=None, filename=None):
        """Send a PUT request; see ``send_cmd`` for the parameters."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile,
                             filename=filename,
                             put_method=True, patch_method=False)

    def patch_cmd(self, endpoint='', postfields_dict=None,
                  formfile=None, filename=None):
        """Send a PATCH request; see ``send_cmd`` for the parameters."""
        self._logger.debug("")
        return self.send_cmd(endpoint=endpoint,
                             postfields_dict=postfields_dict,
                             formfile=formfile,
                             filename=filename,
                             put_method=False, patch_method=True)

    def get2_cmd(self, endpoint):
        """Send a GET request to ``endpoint``.

        :returns: ``(http_code, body_text)``; ``body_text`` is None if empty
        :raises NotFound, OsmHttpException: on a non-success status
        """
        self._logger.debug("")
        data = BytesIO()
        curl_cmd = self._get_curl_cmd(endpoint)
        curl_cmd.setopt(pycurl.HTTPGET, 1)
        curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
        self._logger.info("Request METHOD: {} URL: {}".format(
            "GET", self._url + endpoint))
        return self._perform(curl_cmd, data)

    def check_http_response(self, http_code, data):
        """Raise if ``http_code`` is not a success status.

        :param http_code: numeric HTTP status of the response
        :param data: ``BytesIO`` buffer holding the response body; when it is
            non-empty its decoded text is appended to the error message
        :raises NotFound: when ``http_code`` is 404
        :raises OsmHttpException: for any other non-success status
        """
        if http_code in self._SUCCESS_CODES:
            return
        # Default to an empty suffix so the message is well-formed when the
        # server sent no body.
        resp = ""
        payload = data.getvalue()
        if payload:
            data_text = payload.decode()
            self._logger.verbose(
                "Response {} DATA: {}".format(http_code, data_text))
            resp = ": " + data_text
        else:
            self._logger.verbose("Response {}".format(http_code))
        if http_code == 404:
            raise NotFound("Error {}{}".format(http_code, resp))
        raise OsmHttpException("Error {}{}".format(http_code, resp))