2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # Author(s): Varun Prasad
17 # Creation Date: 09/25/2016
29 import requests
.exceptions
30 from requests
.adapters
import HTTPAdapter
31 from requests
.packages
.urllib3
.util
.retry
import Retry
32 # disable unsigned certificate warning
33 from requests
.packages
.urllib3
.exceptions
import InsecureRequestWarning
34 requests
.packages
.urllib3
.disable_warnings(InsecureRequestWarning
)
38 class UrlDownloader(base
.AbstractDownloader
):
39 """Handles downloads of URL with some basic retry strategy.
46 decompress_on_fly
=False,
50 model (str or DownloadJob): Url string to download or the Yang model
51 file_obj (str,file handle): Optional, If not set we create a temp
52 location to store the file.
53 delete_on_fail (bool, optional): Clean up the partially downloaded
54 file, if the download failed or was canceled
55 callback_handler (None, optional): Instance of base.DownloaderCallbackHandler
59 self
.log
= log
or logging
.getLogger()
60 self
.log
.setLevel(logging
.DEBUG
)
62 self
._fh
, filename
= self
._validate
_fn
(file_obj
)
63 self
.meta
= base
.DownloadMeta(url
, filename
)
65 self
.session
= self
._create
_session
()
66 self
._cancel
_event
= threading
.Event()
69 self
.delete_on_fail
= delete_on_fail
71 self
.decompress_on_fly
= decompress_on_fly
72 self
._decompressor
= zlib
.decompressobj(16 + zlib
.MAX_WBITS
)
75 data
= {"model": self
.meta
.as_dict()}
78 def _validate_fn(self
, file_obj
):
80 If no file object is given then create a temp file
81 if a filename is given open the file in wb mode
83 Finally verify the mode open mode of the file
87 _
, file_obj
= tempfile
.mkstemp()
89 file_obj
= open(file_obj
, "wb")
91 # If the fh is a filename
92 if type(file_obj
) is str:
93 file_obj
= open(file_obj
, "wb")
95 if type(file_obj
) is not io
.BufferedWriter
:
96 raise base
.InvalidDestinationError("Destination file cannot be"
99 return file_obj
, file_obj
.name
101 def _create_session(self
):
102 session
= requests
.Session()
103 # 3 connection attempts should be more than enough, We can't wait forever!
104 # The user needs to be updated of the status
105 retries
= Retry(total
=2, backoff_factor
=1)
106 session
.mount("http://", HTTPAdapter(max_retries
=retries
))
107 session
.mount("https://", HTTPAdapter(max_retries
=retries
))
111 def update_data_from_headers(self
, headers
):
112 """Update the model from the header of HEAD request
115 headers (dict): headers from HEAD response
117 self
.meta
.bytes_total
= 0
118 if 'Content-Length' in headers
:
119 self
.meta
.bytes_total
= int(headers
['Content-Length'])
120 self
.meta
.progress_percent
= 0
121 self
.meta
.bytes_downloaded
= 0
129 return self
.meta
.filepath
131 # Start of override methods
133 def download_id(self
):
134 return self
.meta
.download_id
136 def cancel_download(self
):
137 self
._cancel
_event
.set()
144 """Remove the file if the download failed.
146 if self
.meta
.status
in [base
.DownloadStatus
.FAILED
, base
.DownloadStatus
.CANCELLED
] and self
.delete_on_fail
:
147 self
.log
.info("Cleaning up failed download and removing {}".format(
151 os
.remove(self
.filepath
)
156 """Start the download
158 Trigger an HEAD request to get the meta data before starting the download
162 except Exception as e
:
163 self
.log
.exception(str(e
))
164 self
.meta
.detail
= str(e
)
165 self
.meta
.stop_time
= time
.time()
167 self
.download_failed()
169 # Close all file handles and clean up
173 self
.download_finished()
175 # end of override methods
177 def check_and_decompress(self
, chunk
):
178 if self
.url
.endswith(".gz") and self
.decompress_on_fly
:
179 chunk
= self
._decompressor
.decompress(chunk
)
185 url_options
= {"verify": False, "timeout": 10}
187 if self
.auth
is not None:
188 url_options
["auth"] = self
.auth
190 response
= self
.session
.head(self
.url
, **url_options
)
192 if response
.status_code
!= requests
.codes
.ok
:
193 response
.raise_for_status()
195 # Prepare the meta data
196 self
.meta
.update_data_with_head(response
.headers
)
197 self
.meta
.start_download()
199 self
.download_started()
201 url_options
["stream"] = True,
202 request
= self
.session
.get(self
.url
, **url_options
)
204 if request
.status_code
!= requests
.codes
.ok
:
205 request
.raise_for_status()
207 # actual start time, excluding the HEAD request.
208 for chunk
in request
.iter_content(chunk_size
=1024 * 50):
209 if self
._cancel
_event
.is_set():
210 self
.log
.info("Download of URL {} to {} has been cancelled".format(
211 self
.url
, self
.filepath
))
214 if chunk
: # filter out keep-alive new chunks
215 self
.meta
.update_with_data(chunk
)
216 self
.log
.debug("Download progress: {}".format(self
.meta
.as_dict()))
218 chunk
= self
.check_and_decompress(chunk
)
220 self
._fh
.write(chunk
)
221 self
.download_progress()
223 self
.meta
.end_download()
226 if self
._cancel
_event
.is_set():
227 self
.download_cancelled()
229 self
.download_succeeded()
233 # Start of delegate calls
234 def call_delegate(self
, event
):
235 if not self
.delegate
:
238 getattr(self
.delegate
, event
)(self
.meta
)
240 def download_failed(self
):
241 self
.meta
.set_state(base
.DownloadStatus
.FAILED
)
242 self
.call_delegate("on_download_failed")
244 def download_cancelled(self
):
245 self
.meta
.detail
= "Download canceled by user."
246 self
.meta
.set_state(base
.DownloadStatus
.CANCELLED
)
247 self
.call_delegate("on_download_cancelled")
249 def download_progress(self
):
250 self
.meta
.detail
= "Download in progress."
251 self
.meta
.set_state(base
.DownloadStatus
.IN_PROGRESS
)
252 self
.call_delegate("on_download_progress")
254 def download_succeeded(self
):
255 self
.meta
.detail
= "Download completed successfully."
256 self
.meta
.set_state(base
.DownloadStatus
.COMPLETED
)
257 self
.call_delegate("on_download_succeeded")
259 def download_started(self
):
260 self
.meta
.detail
= "Setting up download and extracting meta."
261 self
.meta
.set_state(base
.DownloadStatus
.STARTED
)
262 self
.call_delegate("on_download_started")
264 def download_finished(self
):
265 self
.call_delegate("on_download_finished")