| velandy | 6364d01 | 2017-01-04 19:25:07 +0000 | [diff] [blame] | 1 | # |
| 2 | # Copyright 2016 RIFT.IO Inc |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | # Author(s): Varun Prasad |
| 17 | # Creation Date: 09/25/2016 |
| 18 | # |
| 19 | |
| 20 | import io |
| 21 | import logging |
| 22 | import os |
| 23 | import tempfile |
| 24 | import threading |
| 25 | import time |
| velandy | 6364d01 | 2017-01-04 19:25:07 +0000 | [diff] [blame] | 26 | import zlib |
| 27 | |
| 28 | import requests |
| 29 | import requests.exceptions |
| 30 | from requests.adapters import HTTPAdapter |
| 31 | from requests.packages.urllib3.util.retry import Retry |
| 32 | # disable unsigned certificate warning |
| 33 | from requests.packages.urllib3.exceptions import InsecureRequestWarning |
| 34 | requests.packages.urllib3.disable_warnings(InsecureRequestWarning) |
| 35 | |
| velandy | 6364d01 | 2017-01-04 19:25:07 +0000 | [diff] [blame] | 36 | from . import base |
| Jeremy Mordkoff | 4870d0e | 2017-09-30 20:28:33 -0400 | [diff] [blame] | 37 | from .local_file import LocalFileAdapter as LocalFileAdapter |
| velandy | 6364d01 | 2017-01-04 19:25:07 +0000 | [diff] [blame] | 38 | |
class UrlDownloader(base.AbstractDownloader):
    """Handles downloads of URL with some basic retry strategy.

    A HEAD request collects the metadata first, then the body is streamed
    with a GET request into a local file. ``.gz`` URLs can optionally be
    gunzipped while being written. State changes are reported through the
    ``download_*`` methods, which forward to ``self.delegate`` when one is
    set (presumably provided by ``base.AbstractDownloader`` -- not defined
    in this class).
    """

    def __init__(self,
                 url,
                 file_obj=None,
                 auth=None,
                 delete_on_fail=True,
                 decompress_on_fly=False,
                 log=None):
        """
        Args:
            url (str): URL to download. The session has adapters for
                http://, https:// and file:// URLs.
            file_obj (str, file handle, optional): Destination. A path string
                is opened in "wb" mode; an open handle must be an
                ``io.BufferedWriter``. If not set, a temporary file is created.
            auth (optional): Passed unchanged as the ``auth`` argument of the
                HEAD and GET requests.
            delete_on_fail (bool, optional): Clean up the partially downloaded
                file if the download failed or was cancelled.
            decompress_on_fly (bool, optional): If True and the URL ends with
                ".gz", gunzip the data while writing it to the destination.
            log (logging.Logger, optional): Logger to use; defaults to the
                root logger.

        Raises:
            base.InvalidDestinationError: if ``file_obj`` cannot be used as a
                binary write handle.
        """
        super().__init__()

        self.log = log or logging.getLogger()
        # NOTE(review): this changes the level of the given (or root) logger
        # process-wide; kept as-is for backward compatibility.
        self.log.setLevel(logging.DEBUG)

        self._fh, filename = self._validate_fn(file_obj)
        self.meta = base.DownloadMeta(url, filename)

        self.session = self._create_session()
        self._cancel_event = threading.Event()
        self.auth = auth

        self.delete_on_fail = delete_on_fail

        self.decompress_on_fly = decompress_on_fly
        # 16 + MAX_WBITS tells zlib to expect a gzip header/trailer.
        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)

    def __repr__(self):
        data = {"model": self.meta.as_dict()}
        return str(data)

    def _validate_fn(self, file_obj):
        """Normalise ``file_obj`` into an open binary write handle.

        * ``None``: create a temporary file and open it in "wb" mode.
        * ``str``: treat it as a path and open it in "wb" mode.
        * anything else must already be an ``io.BufferedWriter``.

        Returns:
            tuple: (file handle, file name)

        Raises:
            base.InvalidDestinationError: if the resulting object is not an
                ``io.BufferedWriter``.
        """
        if file_obj is None:
            fd, file_obj = tempfile.mkstemp()
            # mkstemp hands back an already-open OS-level descriptor; close it
            # so it does not leak, the path is reopened in "wb" mode below.
            os.close(fd)

        # If the fh is a filename
        if isinstance(file_obj, str):
            file_obj = open(file_obj, "wb")

        if not isinstance(file_obj, io.BufferedWriter):
            raise base.InvalidDestinationError(
                "Destination file cannot be opened for write")

        return file_obj, file_obj.name

    def _create_session(self):
        """Build a requests session with retrying HTTP(S) and file:// adapters."""
        session = requests.Session()
        # Retry(total=2) allows 2 retries, i.e. 3 connection attempts, which
        # should be more than enough. We can't wait forever: the user needs
        # to be updated of the status.
        retries = Retry(total=2, backoff_factor=1)
        session.mount("http://", HTTPAdapter(max_retries=retries))
        session.mount("https://", HTTPAdapter(max_retries=retries))
        session.mount("file://", LocalFileAdapter())

        return session

    def update_data_from_headers(self, headers):
        """Update the model from the header of HEAD request

        Resets progress counters and takes the total size from
        ``Content-Length`` when present (0 otherwise).

        Args:
            headers (dict): headers from HEAD response
        """
        self.meta.bytes_total = 0
        if 'Content-Length' in headers:
            self.meta.bytes_total = int(headers['Content-Length'])
        self.meta.progress_percent = 0
        self.meta.bytes_downloaded = 0

    @property
    def url(self):
        return self.meta.url

    @property
    def filepath(self):
        return self.meta.filepath

    # Start of override methods
    @property
    def download_id(self):
        return self.meta.download_id

    def cancel_download(self):
        """Request cancellation; honoured before the next chunk is written."""
        self._cancel_event.set()

    def close(self):
        """Close the HTTP session and the destination file handle."""
        self.session.close()
        self._fh.close()

    def cleanup(self):
        """Remove the file if the download failed or was cancelled.

        Only acts when ``delete_on_fail`` is set. Removal is best-effort:
        errors are logged, never raised.
        """
        failed_states = (base.DownloadStatus.FAILED,
                         base.DownloadStatus.CANCELLED)
        if self.meta.status not in failed_states or not self.delete_on_fail:
            return

        self.log.info("Cleaning up failed download and removing {}".format(
            self.filepath))

        try:
            os.remove(self.filepath)
        except Exception as e:
            # Best effort: the file may already be gone or the name may not
            # be a removable path. Report it but never fail the caller.
            self.log.warning("Could not remove {}: {}".format(
                self.filepath, e))

    def download(self):
        """Start the download.

        Issues a HEAD request to get the metadata, then streams the body.
        Any exception is logged and reported through download_failed()
        instead of being raised. download_finished() is always called last.
        """
        try:
            self._download()
        except Exception as e:
            self.log.exception(str(e))
            self.meta.detail = str(e)
            self.meta.stop_time = time.time()

            self.download_failed()

            # Close all file handles and clean up
            self.close()
            self.cleanup()

        self.download_finished()

    # end of override methods

    def _should_decompress(self):
        """Return True when the payload must be gunzipped while written."""
        return self.decompress_on_fly and self.url.endswith(".gz")

    def check_and_decompress(self, chunk):
        """Return ``chunk`` gunzipped if on-the-fly decompression applies."""
        if self._should_decompress():
            chunk = self._decompressor.decompress(chunk)

        return chunk

    def _download(self):
        """Fetch metadata with HEAD, then stream the body with GET.

        Raises:
            requests.exceptions.RequestException: on connection failures or
                an HTTP error status (via ``raise_for_status``).
        """
        url_options = {"verify": False, "timeout": 10}

        if self.auth is not None:
            url_options["auth"] = self.auth

        # Session.head() does not follow redirects by default. Without this,
        # a 3xx response would pass the status check and its headers would
        # be taken as the metadata of the real file.
        head = self.session.head(self.url, allow_redirects=True, **url_options)
        try:
            if head.status_code != requests.codes.ok:
                head.raise_for_status()

            # Prepare the meta data
            self.meta.update_data_with_head(head.headers)
        finally:
            head.close()

        self.meta.start_download()

        self.download_progress()

        url_options["stream"] = True
        response = self.session.get(self.url, **url_options)
        try:
            if response.status_code != requests.codes.ok:
                response.raise_for_status()

            self._write_body(response)
        finally:
            # Release the streamed connection even if we stopped early.
            response.close()

        self.meta.end_download()
        self.close()

        if self._cancel_event.is_set():
            self.download_cancelled()
        else:
            self.download_succeeded()

        self.cleanup()

    def _write_body(self, response):
        """Stream ``response`` into the destination file.

        Returns early, without error, once cancel_download() has been called.
        """
        for chunk in response.iter_content(chunk_size=1024 * 50):
            if self._cancel_event.is_set():
                self.log.info("Download of URL {} to {} has been cancelled".format(
                    self.url, self.filepath))
                return

            if not chunk:  # filter out keep-alive new chunks
                continue

            # Progress is tracked on the bytes received, before any
            # on-the-fly decompression.
            self.meta.update_with_data(chunk)
            self.log.debug("Download progress: {}".format(self.meta.as_dict()))

            self._fh.write(self.check_and_decompress(chunk))

        if self._should_decompress():
            # Write out any data still buffered inside the decompressor.
            self._fh.write(self._decompressor.flush())

    # Start of delegate calls
    def call_delegate(self, event):
        """Invoke ``self.delegate.<event>(self.meta)`` if a delegate is set."""
        if not self.delegate:
            return

        getattr(self.delegate, event)(self.meta)

    def download_failed(self):
        self.meta.set_state(base.DownloadStatus.FAILED)
        self.call_delegate("on_download_failed")

    def download_cancelled(self):
        self.meta.detail = "Download canceled by user."
        self.meta.set_state(base.DownloadStatus.CANCELLED)
        self.call_delegate("on_download_cancelled")

    def download_progress(self):
        self.meta.detail = "Download in progress."
        self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
        self.call_delegate("on_download_progress")

    def download_succeeded(self):
        self.meta.detail = "Download completed successfully."
        self.meta.set_state(base.DownloadStatus.COMPLETED)
        self.call_delegate("on_download_succeeded")

    def download_started(self):
        self.meta.detail = "Setting up download and extracting meta."
        self.meta.set_state(base.DownloadStatus.STARTED)
        self.call_delegate("on_download_started")

    def download_finished(self):
        self.call_delegate("on_download_finished")