Fix connection timeouts in URL downloader
[osm/SO.git] / common / python / rift / downloader / url.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Varun Prasad
17 # Creation Date: 09/25/2016
18 #
19
20 import io
21 import logging
22 import os
23 import tempfile
24 import threading
25 import time
26 import zlib
27
28 import requests
29 import requests.exceptions
30 from requests.adapters import HTTPAdapter
31 from requests.packages.urllib3.util.retry import Retry
32 # disable unsigned certificate warning
33 from requests.packages.urllib3.exceptions import InsecureRequestWarning
34 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
35
36 from . import base
37
38 class UrlDownloader(base.AbstractDownloader):
39 """Handles downloads of URL with some basic retry strategy.
40 """
    def __init__(self,
                 url,
                 file_obj=None,
                 auth=None,
                 delete_on_fail=True,
                 decompress_on_fly=False,
                 log=None):
        """
        Args:
            url (str): URL to download.
            file_obj (str or file handle, optional): Destination for the
                    download. If not set we create a temp location to
                    store the file.
            auth (optional): Credentials forwarded to the requests calls
                    (e.g. a (user, password) tuple) -- passed through
                    unmodified.
            delete_on_fail (bool, optional): Clean up the partially downloaded
                    file, if the download failed or was canceled.
            decompress_on_fly (bool, optional): Gunzip ".gz" URLs while
                    streaming instead of writing the compressed bytes.
            log (logging.Logger, optional): Logger to use; defaults to the
                    root logger.
        """
        super().__init__()

        self.log = log or logging.getLogger()
        # NOTE(review): forces DEBUG on a possibly shared logger --
        # confirm this side effect is intended.
        self.log.setLevel(logging.DEBUG)

        # Destination file handle and its resolved filename.
        self._fh, filename = self._validate_fn(file_obj)
        self.meta = base.DownloadMeta(url, filename)

        self.session = self._create_session()
        # Set by cancel_download(); polled between chunks in _download().
        self._cancel_event = threading.Event()
        self.auth = auth

        self.delete_on_fail = delete_on_fail

        self.decompress_on_fly = decompress_on_fly
        # 16 + MAX_WBITS tells zlib to expect a gzip header/trailer
        # rather than a raw zlib stream.
        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
73
74 def __repr__(self):
75 data = {"model": self.meta.as_dict()}
76 return str(data)
77
78 def _validate_fn(self, file_obj):
79 """
80 If no file object is given then create a temp file
81 if a filename is given open the file in wb mode
82
83 Finally verify the mode open mode of the file
84
85 """
86 if file_obj is None:
87 _, file_obj = tempfile.mkstemp()
88 # Reopen in wb mode
89 file_obj = open(file_obj, "wb")
90
91 # If the fh is a filename
92 if type(file_obj) is str:
93 file_obj = open(file_obj, "wb")
94
95 if type(file_obj) is not io.BufferedWriter:
96 raise base.InvalidDestinationError("Destination file cannot be"
97 "opened for write")
98
99 return file_obj, file_obj.name
100
101 def _create_session(self):
102 session = requests.Session()
103 # 3 connection attempts should be more than enough, We can't wait forever!
104 # The user needs to be updated of the status
105 retries = Retry(total=2, backoff_factor=1)
106 session.mount("http://", HTTPAdapter(max_retries=retries))
107 session.mount("https://", HTTPAdapter(max_retries=retries))
108
109 return session
110
111 def update_data_from_headers(self, headers):
112 """Update the model from the header of HEAD request
113
114 Args:
115 headers (dict): headers from HEAD response
116 """
117 self.meta.bytes_total = 0
118 if 'Content-Length' in headers:
119 self.meta.bytes_total = int(headers['Content-Length'])
120 self.meta.progress_percent = 0
121 self.meta.bytes_downloaded = 0
122
    @property
    def url(self):
        """The source URL this downloader fetches."""
        return self.meta.url
126
    @property
    def filepath(self):
        """Path of the destination file the download is written to."""
        return self.meta.filepath
130
131 # Start of override methods
    @property
    def download_id(self):
        """Unique identifier of this download (from the meta object)."""
        return self.meta.download_id
135
    def cancel_download(self):
        """Request cancellation; the chunk loop in _download() polls this
        event and stops at the next chunk boundary."""
        self._cancel_event.set()
138
    def close(self):
        """Release the HTTP session and the destination file handle."""
        self.session.close()
        self._fh.close()
142
143 def cleanup(self):
144 """Remove the file if the download failed.
145 """
146 if self.meta.status in [base.DownloadStatus.FAILED, base.DownloadStatus.CANCELLED] and self.delete_on_fail:
147 self.log.info("Cleaning up failed download and removing {}".format(
148 self.filepath))
149
150 try:
151 os.remove(self.filepath)
152 except Exception:
153 pass
154
    def download(self):
        """Start the download.

        _download() triggers a HEAD request to collect the metadata before
        streaming the body. On any exception the download is marked FAILED
        (with the exception text as detail); handles are closed, partial
        files cleaned up, and the finished callback fires in every case.
        """
        try:
            self._download()
        except Exception as e:
            self.log.exception(str(e))
            # Record why and when the download stopped before notifying.
            self.meta.detail = str(e)
            self.meta.stop_time = time.time()

            self.download_failed()

        # Close all file handles and clean up
        self.close()
        self.cleanup()

        self.download_finished()
174
175 # end of override methods
176
177 def check_and_decompress(self, chunk):
178 if self.url.endswith(".gz") and self.decompress_on_fly:
179 chunk = self._decompressor.decompress(chunk)
180
181 return chunk
182
183 def _download(self):
184
185 url_options = {"verify": False, "timeout": 1}
186
187 if self.auth is not None:
188 url_options["auth"] = self.auth
189
190 response = self.session.head(self.url, **url_options)
191
192 if response.status_code != requests.codes.ok:
193 response.raise_for_status()
194
195 # Prepare the meta data
196 self.meta.update_data_with_head(response.headers)
197 self.meta.start_download()
198
199 self.download_started()
200
201 url_options["stream"] = True,
202 request = self.session.get(self.url, **url_options)
203
204 if request.status_code != requests.codes.ok:
205 request.raise_for_status()
206
207 # actual start time, excluding the HEAD request.
208 for chunk in request.iter_content(chunk_size=1024 * 50):
209 if self._cancel_event.is_set():
210 self.log.info("Download of URL {} to {} has been cancelled".format(
211 self.url, self.filepath))
212 break
213
214 if chunk: # filter out keep-alive new chunks
215 self.meta.update_with_data(chunk)
216 self.log.debug("Download progress: {}".format(self.meta.as_dict()))
217
218 chunk = self.check_and_decompress(chunk)
219
220 self._fh.write(chunk)
221 self.download_progress()
222
223 self.meta.end_download()
224 self.close()
225
226 if self._cancel_event.is_set():
227 self.download_cancelled()
228 else:
229 self.download_succeeded()
230
231 self.cleanup()
232
233 # Start of delegate calls
234 def call_delegate(self, event):
235 if not self.delegate:
236 return
237
238 getattr(self.delegate, event)(self.meta)
239
    def download_failed(self):
        """Mark the download FAILED and notify the delegate.

        meta.detail is set by the caller (see download()) before this runs.
        """
        self.meta.set_state(base.DownloadStatus.FAILED)
        self.call_delegate("on_download_failed")
243
    def download_cancelled(self):
        """Mark the download CANCELLED and notify the delegate."""
        self.meta.detail = "Download canceled by user."
        self.meta.set_state(base.DownloadStatus.CANCELLED)
        self.call_delegate("on_download_cancelled")
248
    def download_progress(self):
        """Mark the download IN_PROGRESS and notify the delegate.

        Called once per chunk written (see _download()).
        """
        self.meta.detail = "Download in progress."
        self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
        self.call_delegate("on_download_progress")
253
    def download_succeeded(self):
        """Mark the download COMPLETED and notify the delegate."""
        self.meta.detail = "Download completed successfully."
        self.meta.set_state(base.DownloadStatus.COMPLETED)
        self.call_delegate("on_download_succeeded")
258
    def download_started(self):
        """Mark the download STARTED (after the HEAD metadata fetch) and
        notify the delegate."""
        self.meta.detail = "Setting up download and extracting meta."
        self.meta.set_state(base.DownloadStatus.STARTED)
        self.call_delegate("on_download_started")
263
    def download_finished(self):
        """Notify the delegate that the download ended, regardless of
        whether it succeeded, failed, or was cancelled."""
        self.call_delegate("on_download_finished")