blob: 3aa9ac27957cd7150956472a13bafd693b33629e [file] [log] [blame]
velandy6364d012017-01-04 19:25:07 +00001#
2# Copyright 2016 RIFT.IO Inc
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# Author(s): Varun Prasad
17# Creation Date: 09/25/2016
18#
19
20import io
21import logging
22import os
23import tempfile
24import threading
25import time
velandy6364d012017-01-04 19:25:07 +000026import zlib
27
28import requests
29import requests.exceptions
30from requests.adapters import HTTPAdapter
31from requests.packages.urllib3.util.retry import Retry
32# disable unsigned certificate warning
33from requests.packages.urllib3.exceptions import InsecureRequestWarning
34requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
35
velandy6364d012017-01-04 19:25:07 +000036from . import base
Jeremy Mordkoff4870d0e2017-09-30 20:28:33 -040037from .local_file import LocalFileAdapter as LocalFileAdapter
velandy6364d012017-01-04 19:25:07 +000038
class UrlDownloader(base.AbstractDownloader):
    """Download a URL into a local file with a basic retry strategy.

    The download is performed in two steps: a HEAD request whose headers
    are handed to the download meta data, followed by a streamed GET whose
    body is written chunk by chunk to the destination file. Progress and
    completion are reported through the delegate callbacks (see
    ``call_delegate``).
    """

    def __init__(self,
                 url,
                 file_obj=None,
                 auth=None,
                 delete_on_fail=True,
                 decompress_on_fly=False,
                 log=None):
        """
        Args:
            url (str): URL to download.
            file_obj (str, file handle): Optional. Either a file name or a
                binary file handle opened for writing. If not set, a
                temporary file is created to store the download.
            auth (optional): Passed as ``auth`` to the HEAD and GET
                requests when it is not None.
            delete_on_fail (bool, optional): Clean up the partially
                downloaded file if the download failed or was cancelled.
            decompress_on_fly (bool, optional): If set and the URL ends
                with ".gz", chunks are decompressed before being written.
            log (logging.Logger, optional): Logger to use; defaults to the
                root logger.

        Raises:
            base.InvalidDestinationError: If ``file_obj`` is not a usable
                destination (see ``_validate_fn``).
        """
        super().__init__()

        self.log = log or logging.getLogger()
        # NOTE(review): this changes the level of the logger that was passed
        # in (or of the root logger when none was given), not just of this
        # instance. Kept as is because callers may rely on it.
        self.log.setLevel(logging.DEBUG)

        self._fh, filename = self._validate_fn(file_obj)
        self.meta = base.DownloadMeta(url, filename)

        self.session = self._create_session()
        self._cancel_event = threading.Event()
        self.auth = auth

        self.delete_on_fail = delete_on_fail

        self.decompress_on_fly = decompress_on_fly
        # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)

    def __repr__(self):
        data = {"model": self.meta.as_dict()}
        return str(data)

    def _validate_fn(self, file_obj):
        """Resolve the download destination into an open binary file handle.

        If no file object is given a temporary file is created. If a file
        name is given the file is opened in "wb" mode. Finally the handle
        is verified to be a buffered binary writer.

        Args:
            file_obj (None, str or io.BufferedWriter): Requested destination.

        Returns:
            tuple: ``(file handle, file_handle.name)``

        Raises:
            base.InvalidDestinationError: If the destination is not an
                ``io.BufferedWriter`` after resolution.
        """
        if file_obj is None:
            fd, file_obj = tempfile.mkstemp()
            # mkstemp() returns an already open OS-level descriptor. The
            # file is reopened by name below, so close the descriptor here
            # instead of leaking it.
            os.close(fd)

        # If the destination is a file name, open it for binary write.
        if isinstance(file_obj, str):
            file_obj = open(file_obj, "wb")

        if not isinstance(file_obj, io.BufferedWriter):
            raise base.InvalidDestinationError(
                "Destination file cannot be opened for write")

        return file_obj, file_obj.name

    def _create_session(self):
        """Build the requests session used for the HEAD and GET requests.

        Returns:
            requests.Session: Session with retrying adapters mounted for
                http:// and https://, and a LocalFileAdapter for file://.
        """
        session = requests.Session()
        # 3 connection attempts should be more than enough, we can't wait
        # forever! The user needs to be updated of the status.
        retries = Retry(total=2, backoff_factor=1)
        session.mount("http://", HTTPAdapter(max_retries=retries))
        session.mount("https://", HTTPAdapter(max_retries=retries))
        session.mount("file://", LocalFileAdapter())

        return session

    def update_data_from_headers(self, headers):
        """Reset the byte counters of the model from HEAD response headers.

        Args:
            headers (dict): Headers from the HEAD response. 'Content-Length'
                is used as the total size when present, otherwise 0.
        """
        self.meta.bytes_total = 0
        if 'Content-Length' in headers:
            self.meta.bytes_total = int(headers['Content-Length'])
        self.meta.progress_percent = 0
        self.meta.bytes_downloaded = 0

    @property
    def url(self):
        return self.meta.url

    @property
    def filepath(self):
        return self.meta.filepath

    # Start of override methods
    @property
    def download_id(self):
        return self.meta.download_id

    def cancel_download(self):
        """Request cancellation; the download loop stops at the next chunk."""
        self._cancel_event.set()

    def close(self):
        """Close the HTTP session and the destination file handle."""
        self.session.close()
        self._fh.close()

    def cleanup(self):
        """Remove the file if the download failed or was cancelled.

        Nothing is removed unless ``delete_on_fail`` is set. Removal is
        best effort: errors are logged, never raised.
        """
        failed_states = (base.DownloadStatus.FAILED,
                         base.DownloadStatus.CANCELLED)
        if self.meta.status not in failed_states or not self.delete_on_fail:
            return

        self.log.info("Cleaning up failed download and removing {}".format(
            self.filepath))

        try:
            os.remove(self.filepath)
        except FileNotFoundError:
            # Already gone, e.g. removed by an earlier cleanup() call.
            pass
        except Exception as e:
            # Stay best effort, but do not hide the reason.
            self.log.warning("Could not remove {}: {}".format(
                self.filepath, e))

    def download(self):
        """Start the download.

        Runs the HEAD + GET sequence. Any exception is logged, recorded in
        the meta data and reported as a failed download instead of being
        propagated. The file handles are always closed and the
        ``on_download_finished`` delegate call is always made.
        """
        try:
            self._download()
        except Exception as e:
            self.log.exception(str(e))
            self.meta.detail = str(e)
            self.meta.stop_time = time.time()

            self.download_failed()

        # Close all file handles and clean up. On the success path
        # _download() has already done both; repeating them here covers
        # the failure path.
        self.close()
        self.cleanup()

        self.download_finished()

    # end of override methods

    def check_and_decompress(self, chunk):
        """Decompress a chunk when on-the-fly decompression applies.

        Args:
            chunk (bytes): Raw data received from the server.

        Returns:
            bytes: The decompressed data if the URL ends with ".gz" and
                ``decompress_on_fly`` is set, otherwise the chunk unchanged.
        """
        if self.url.endswith(".gz") and self.decompress_on_fly:
            chunk = self._decompressor.decompress(chunk)

        return chunk

    def _download(self):
        """Issue the HEAD and GET requests and stream the body to the file.

        Raises:
            requests.exceptions.HTTPError: From ``raise_for_status()`` when
                the HEAD or GET response reports an HTTP error.
        """
        # NOTE(review): certificate verification is disabled for every
        # request made here ("verify": False).
        url_options = {"verify": False, "timeout": 10}

        if self.auth is not None:
            url_options["auth"] = self.auth

        head_response = self.session.head(self.url, **url_options)

        if head_response.status_code != requests.codes.ok:
            head_response.raise_for_status()

        # Prepare the meta data
        self.meta.update_data_with_head(head_response.headers)
        self.meta.start_download()

        self.download_progress()

        # Stream the body instead of loading it into memory. This has to be
        # the boolean True (the previous trailing comma made it a tuple).
        url_options["stream"] = True
        response = self.session.get(self.url, **url_options)

        if response.status_code != requests.codes.ok:
            response.raise_for_status()

        for chunk in response.iter_content(chunk_size=1024 * 50):
            if self._cancel_event.is_set():
                self.log.info("Download of URL {} to {} has been cancelled".format(
                    self.url, self.filepath))
                break

            if chunk:  # filter out keep-alive new chunks
                # The meta data is updated with the raw chunk as received,
                # before any decompression.
                self.meta.update_with_data(chunk)
                self.log.debug("Download progress: {}".format(self.meta.as_dict()))

                chunk = self.check_and_decompress(chunk)

                self._fh.write(chunk)
                # NOTE: the delegate is not notified per chunk; only the
                # single download_progress() call above is made.

        self.meta.end_download()
        # Close before notifying the delegate so the file is closed when
        # the success/cancel callback runs.
        self.close()

        if self._cancel_event.is_set():
            self.download_cancelled()
        else:
            self.download_succeeded()

        self.cleanup()

    # Start of delegate calls
    def call_delegate(self, event):
        """Invoke the named callback on the delegate with the meta data.

        Args:
            event (str): Name of the delegate method to call.

        Does nothing when ``self.delegate`` is falsy (``delegate`` is
        presumably provided by the base class; it is not set in this class).
        """
        if not self.delegate:
            return

        getattr(self.delegate, event)(self.meta)

    def download_failed(self):
        self.meta.set_state(base.DownloadStatus.FAILED)
        self.call_delegate("on_download_failed")

    def download_cancelled(self):
        self.meta.detail = "Download canceled by user."
        self.meta.set_state(base.DownloadStatus.CANCELLED)
        self.call_delegate("on_download_cancelled")

    def download_progress(self):
        self.meta.detail = "Download in progress."
        self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
        self.call_delegate("on_download_progress")

    def download_succeeded(self):
        self.meta.detail = "Download completed successfully."
        self.meta.set_state(base.DownloadStatus.COMPLETED)
        self.call_delegate("on_download_succeeded")

    def download_started(self):
        self.meta.detail = "Setting up download and extracting meta."
        self.meta.set_state(base.DownloadStatus.STARTED)
        self.call_delegate("on_download_started")

    def download_finished(self):
        self.call_delegate("on_download_finished")