blob: 7ffb99970f7598b26aea19985292a66add4d0e61 [file] [log] [blame]
velandy6364d012017-01-04 19:25:07 +00001#
2# Copyright 2016 RIFT.IO Inc
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# Author(s): Varun Prasad
17# Creation Date: 09/25/2016
18#
19
20import io
21import logging
22import os
23import tempfile
24import threading
25import time
velandy6364d012017-01-04 19:25:07 +000026import zlib
27
28import requests
29import requests.exceptions
30from requests.adapters import HTTPAdapter
31from requests.packages.urllib3.util.retry import Retry
# Silence the InsecureRequestWarning that urllib3 emits for every HTTPS
# request made with verify=False (this module downloads with verification
# disabled).
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
35
velandy6364d012017-01-04 19:25:07 +000036from . import base
37
class UrlDownloader(base.AbstractDownloader):
    """Handles downloads of URL with some basic retry strategy.

    Lifecycle: construct, then call download() (blocking). cancel_download()
    may be called from another thread; the streaming loop checks the flag
    between chunks. State transitions are reported to the delegate (see
    base.AbstractDownloader) through the on_download_* callbacks.
    """

    def __init__(self,
                 url,
                 file_obj=None,
                 auth=None,
                 delete_on_fail=True,
                 decompress_on_fly=False,
                 log=None):
        """
        Args:
            url (str): URL string to download.
            file_obj (str or binary file handle, optional): Destination for
                the downloaded data. If None, a temp file is created; if a
                string, it is opened in "wb" mode.
            auth (optional): Credentials forwarded to requests (e.g. a
                (user, password) tuple).
            delete_on_fail (bool, optional): Clean up the partially
                downloaded file, if the download failed or was cancelled.
            decompress_on_fly (bool, optional): Gunzip the stream while
                writing, when the URL ends with ".gz".
            log (logging.Logger, optional): Logger to use; defaults to the
                root logger.

        Raises:
            base.InvalidDestinationError: If the destination cannot be
                opened for binary writing.
        """
        super().__init__()

        self.log = log or logging.getLogger()
        self.log.setLevel(logging.DEBUG)

        self._fh, filename = self._validate_fn(file_obj)
        self.meta = base.DownloadMeta(url, filename)

        self.session = self._create_session()
        self._cancel_event = threading.Event()
        self.auth = auth

        self.delete_on_fail = delete_on_fail

        self.decompress_on_fly = decompress_on_fly
        # 16 + MAX_WBITS tells zlib to expect gzip framing (header/trailer),
        # not a raw or zlib stream.
        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)

    def __repr__(self):
        data = {"model": self.meta.as_dict()}
        return str(data)

    def _validate_fn(self, file_obj):
        """Normalize the destination into an open binary file handle.

        If no file object is given then create a temp file; if a filename
        is given open the file in "wb" mode. Finally verify the open mode
        of the file.

        Returns:
            tuple: (open file handle, file name)

        Raises:
            base.InvalidDestinationError: If the handle is not a binary
                writer.
        """
        if file_obj is None:
            fd, file_obj = tempfile.mkstemp()
            # BUGFIX: close the raw descriptor returned by mkstemp so it is
            # not leaked; the path is reopened in "wb" mode below.
            os.close(fd)

        # If the destination is a filename, open it for binary writing.
        if isinstance(file_obj, str):
            file_obj = open(file_obj, "wb")

        if not isinstance(file_obj, io.BufferedWriter):
            # BUGFIX: added the missing space between the two implicitly
            # concatenated string parts ("cannot beopened" previously).
            raise base.InvalidDestinationError("Destination file cannot be "
                                               "opened for write")

        return file_obj, file_obj.name

    def _create_session(self):
        """Build a requests session that retries transient failures.

        3 connection attempts should be more than enough, we can't wait
        forever! The user needs to be updated of the status.
        """
        session = requests.Session()
        retries = Retry(total=2, backoff_factor=1)
        adapter = HTTPAdapter(max_retries=retries)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session

    def update_data_from_headers(self, headers):
        """Update the model from the header of HEAD request.

        NOTE(review): this method appears to be unused — _download calls
        self.meta.update_data_with_head() instead; confirm which is the
        intended API.

        Args:
            headers (dict): headers from HEAD response
        """
        self.meta.bytes_total = 0
        if 'Content-Length' in headers:
            self.meta.bytes_total = int(headers['Content-Length'])
        self.meta.progress_percent = 0
        self.meta.bytes_downloaded = 0

    @property
    def url(self):
        return self.meta.url

    @property
    def filepath(self):
        return self.meta.filepath

    # Start of override methods
    @property
    def download_id(self):
        return self.meta.download_id

    def cancel_download(self):
        """Request cancellation; the streaming loop checks this flag."""
        self._cancel_event.set()

    def close(self):
        """Release the HTTP session and the destination file handle."""
        self.session.close()
        self._fh.close()

    def cleanup(self):
        """Remove the file if the download failed or was cancelled."""
        failed_states = (base.DownloadStatus.FAILED,
                         base.DownloadStatus.CANCELLED)
        if self.meta.status in failed_states and self.delete_on_fail:
            self.log.info("Cleaning up failed download and removing {}".format(
                self.filepath))

            try:
                os.remove(self.filepath)
            except OSError:
                # Best effort: the file may already be gone (or was never
                # created). Narrowed from a bare `except Exception`.
                pass

    def download(self):
        """Start the download (blocking).

        Trigger an HEAD request to get the meta data before starting the
        download; any exception marks the download as failed. File handles
        are closed and partial files cleaned up in all cases.
        """
        try:
            self._download()
        except Exception as e:
            self.log.exception(str(e))
            self.meta.detail = str(e)
            self.meta.stop_time = time.time()

            self.download_failed()

        # Close all file handles and clean up
        self.close()
        self.cleanup()

        self.download_finished()

    # end of override methods

    def check_and_decompress(self, chunk):
        """Gunzip the chunk when on-the-fly decompression is enabled."""
        if self.url.endswith(".gz") and self.decompress_on_fly:
            chunk = self._decompressor.decompress(chunk)

        return chunk

    def _download(self):
        """Issue the HEAD + GET requests and stream the body to the file."""
        # verify=False: certificate validation is intentionally disabled
        # (the matching urllib3 warning is silenced at import time).
        # NOTE(review): timeout=1 second is aggressive for slow servers —
        # confirm this is intended.
        url_options = {"verify": False, "timeout": 1}

        if self.auth is not None:
            url_options["auth"] = self.auth

        response = self.session.head(self.url, **url_options)

        if response.status_code != requests.codes.ok:
            response.raise_for_status()

        # Prepare the meta data
        self.meta.update_data_with_head(response.headers)
        self.meta.start_download()

        self.download_started()

        # BUGFIX: a trailing comma previously made this the tuple (True,)
        # instead of the boolean True (it only streamed because a non-empty
        # tuple is truthy).
        url_options["stream"] = True
        request = self.session.get(self.url, **url_options)

        if request.status_code != requests.codes.ok:
            request.raise_for_status()

        # actual start time, excluding the HEAD request.
        for chunk in request.iter_content(chunk_size=1024 * 50):
            if self._cancel_event.is_set():
                self.log.info("Download of URL {} to {} has been cancelled".format(
                    self.url, self.filepath))
                break

            if chunk:  # filter out keep-alive new chunks
                self.meta.update_with_data(chunk)
                self.log.debug("Download progress: {}".format(self.meta.as_dict()))

                chunk = self.check_and_decompress(chunk)

                self._fh.write(chunk)
                self.download_progress()

        self.meta.end_download()
        self.close()

        if self._cancel_event.is_set():
            self.download_cancelled()
        else:
            self.download_succeeded()

        self.cleanup()

    # Start of delegate calls
    def call_delegate(self, event):
        """Invoke the named callback on the delegate, if one is set."""
        if not self.delegate:
            return

        getattr(self.delegate, event)(self.meta)

    def download_failed(self):
        self.meta.set_state(base.DownloadStatus.FAILED)
        self.call_delegate("on_download_failed")

    def download_cancelled(self):
        self.meta.detail = "Download canceled by user."
        self.meta.set_state(base.DownloadStatus.CANCELLED)
        self.call_delegate("on_download_cancelled")

    def download_progress(self):
        self.meta.detail = "Download in progress."
        self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
        self.call_delegate("on_download_progress")

    def download_succeeded(self):
        self.meta.detail = "Download completed successfully."
        self.meta.set_state(base.DownloadStatus.COMPLETED)
        self.call_delegate("on_download_succeeded")

    def download_started(self):
        self.meta.detail = "Setting up download and extracting meta."
        self.meta.set_state(base.DownloadStatus.STARTED)
        self.call_delegate("on_download_started")

    def download_finished(self):
        self.call_delegate("on_download_finished")