| velandy | 6364d01 | 2017-01-04 19:25:07 +0000 | [diff] [blame^] | 1 | # |
| 2 | # Copyright 2016 RIFT.IO Inc |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | # Author(s): Varun Prasad |
| 17 | # Creation Date: 09/25/2016 |
| 18 | # |
| 19 | |
| 20 | import io |
| 21 | import logging |
| 22 | import os |
| 23 | import tempfile |
| 24 | import threading |
| 25 | import time |
| 26 | import uuid |
| 27 | import zlib |
| 28 | |
| 29 | import requests |
| 30 | import requests.exceptions |
| 31 | from requests.adapters import HTTPAdapter |
| 32 | from requests.packages.urllib3.util.retry import Retry |
| 33 | # disable unsigned certificate warning |
| 34 | from requests.packages.urllib3.exceptions import InsecureRequestWarning |
| 35 | requests.packages.urllib3.disable_warnings(InsecureRequestWarning) |
| 36 | |
| 37 | import gi |
| 38 | gi.require_version("RwPkgMgmtYang", "1.0") |
| 39 | |
| 40 | from gi.repository import RwPkgMgmtYang |
| 41 | from . import base |
| 42 | |
| 43 | |
class UrlDownloader(base.AbstractDownloader):
    """Handles downloads of URL with some basic retry strategy.

    :meth:`download` issues a HEAD request to populate ``self.meta`` and then
    a streaming GET whose body is written chunk by chunk to the destination
    file. ``.gz`` URLs can optionally be gunzipped on the fly. The HTTP
    session retries up to 5 times with exponential backoff (see
    :meth:`_create_session`). State changes are recorded on ``self.meta`` and
    forwarded to the delegate, if one is set (see :meth:`call_delegate`).
    """
    def __init__(self,
                 url,
                 file_obj=None,
                 auth=None,
                 delete_on_fail=True,
                 decompress_on_fly=False,
                 log=None):
        """
        Args:
            url (str): URL to download.
            file_obj (str or io.BufferedWriter, optional): Destination. A path
                is opened in "wb" mode. If None, a temporary file is created
                with tempfile.mkstemp and used as the destination.
            auth (optional): Passed unchanged as the ``auth`` argument of the
                HEAD and GET requests (anything requests accepts as ``auth``).
            delete_on_fail (bool, optional): Remove the partially downloaded
                file if the download failed or was cancelled.
            decompress_on_fly (bool, optional): If True and the URL ends with
                ".gz", gunzip each chunk before writing it.
            log (logging.Logger, optional): Logger to use. Defaults to this
                module's logger. Its level is set to DEBUG.

        Raises:
            base.InvalidDestinationError: if the destination cannot be
                normalised into a binary buffered writer.
        """
        super().__init__()

        # Default to a module-level logger instead of the root logger, so the
        # setLevel call below does not change logging for the whole process.
        self.log = log or logging.getLogger(__name__)
        self.log.setLevel(logging.DEBUG)

        self._fh, filename = self._validate_fn(file_obj)
        self.meta = base.DownloadMeta(url, filename)

        self.session = self._create_session()
        self._cancel_event = threading.Event()
        self.auth = auth

        self.delete_on_fail = delete_on_fail

        self.decompress_on_fly = decompress_on_fly
        # wbits = 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)

    def __repr__(self):
        data = {"model": self.meta.as_dict()}
        return str(data)

    def _validate_fn(self, file_obj):
        """Normalise ``file_obj`` into an open binary writer.

        * ``None``: a temporary file is created and opened in "wb" mode.
        * ``str``: treated as a path and opened in "wb" mode.
        * Anything else must already be an ``io.BufferedWriter``.

        Returns:
            tuple: (file handle, ``handle.name``)

        Raises:
            base.InvalidDestinationError: if the result is not an
                ``io.BufferedWriter`` (e.g. a text-mode file object).
        """
        if file_obj is None:
            fd, file_obj = tempfile.mkstemp()
            # mkstemp returns an already-open OS-level descriptor. Close it so
            # it is not leaked; the path is reopened in "wb" mode just below.
            os.close(fd)

        # A filename (including the temporary path above) is opened for writing.
        if isinstance(file_obj, str):
            file_obj = open(file_obj, "wb")

        if not isinstance(file_obj, io.BufferedWriter):
            raise base.InvalidDestinationError(
                "Destination file cannot be opened for write")

        return file_obj, file_obj.name

    def _create_session(self):
        """Return a requests Session retrying up to 5 times with backoff."""
        session = requests.Session()
        retries = Retry(total=5, backoff_factor=1)
        session.mount("http://", HTTPAdapter(max_retries=retries))
        session.mount("https://", HTTPAdapter(max_retries=retries))

        return session

    def update_data_from_headers(self, headers):
        """Update the model from the header of HEAD request

        Resets the progress counters and sets ``bytes_total`` from
        Content-Length (0 when the header is absent).

        Args:
            headers (dict): headers from HEAD response
        """
        self.meta.bytes_total = 0
        if 'Content-Length' in headers:
            self.meta.bytes_total = int(headers['Content-Length'])
        self.meta.progress_percent = 0
        self.meta.bytes_downloaded = 0

    @property
    def url(self):
        """The URL being downloaded."""
        return self.meta.url

    @property
    def filepath(self):
        """Path of the destination file."""
        return self.meta.filepath

    # Start of override methods
    @property
    def download_id(self):
        return self.meta.download_id

    def cancel_download(self):
        """Request cancellation. The flag is checked before each chunk is written."""
        self._cancel_event.set()

    def close(self):
        """Close the HTTP session and the destination file handle."""
        self.session.close()
        self._fh.close()

    def cleanup(self):
        """Remove the file if the download failed or was cancelled.

        Only acts when ``delete_on_fail`` is set. Removal errors are logged,
        not raised, because this also runs from the failure path of
        :meth:`download`.
        """
        if self.meta.status in [base.DownloadStatus.FAILED, base.DownloadStatus.CANCELLED] and self.delete_on_fail:
            self.log.info("Cleaning up failed download and removing {}".format(
                self.filepath))

            try:
                os.remove(self.filepath)
            except Exception as e:
                self.log.exception(e)

    def download(self):
        """Start the download.

        Triggers a HEAD request to get the metadata, then streams the body to
        the destination file. An exception raised by the transfer is logged,
        stored in ``meta.detail`` and reported as a failure (handles are
        closed and the partial file is cleaned up) instead of propagating.
        ``on_download_finished`` is always sent to the delegate at the end.
        """
        try:
            self._download()
        except Exception as e:
            self.log.exception(str(e))
            self.meta.detail = str(e)
            self.meta.stop_time = time.time()

            self.download_failed()

            # Close all file handles and clean up
            self.close()
            self.cleanup()

        self.download_finished()

    # end of override methods

    def check_and_decompress(self, chunk):
        """Gunzip ``chunk`` when on-the-fly decompression applies to this URL.

        Returns the chunk unchanged otherwise.
        """
        if self.url.endswith(".gz") and self.decompress_on_fly:
            chunk = self._decompressor.decompress(chunk)

        return chunk

    def _flush_decompressor(self):
        """Return output still buffered by the gzip decompressor.

        Returns b"" when on-the-fly decompression is not active for this URL.
        """
        if self.url.endswith(".gz") and self.decompress_on_fly:
            return self._decompressor.flush()
        return b""

    def _download(self):
        """Fetch the URL and write it to the destination file.

        Issues a HEAD request to populate ``self.meta``, then a streaming GET
        whose body is written chunk by chunk. Cancellation is checked before
        every chunk. On normal return the handles are closed and the outcome
        (succeeded or cancelled) has been reported to the delegate.

        Raises:
            requests.exceptions.RequestException: connection errors, or an
                HTTP 4xx/5xx status on the GET request.
        """
        url_options = {"verify": False}

        if self.auth is not None:
            url_options["auth"] = self.auth

        # requests' head() does not follow redirects by default. Follow them so
        # the metadata (e.g. Content-Length) describes the resource the GET
        # below actually downloads.
        head_response = self.session.head(
            self.url, allow_redirects=True, **url_options)

        # Prepare the meta data
        self.meta.update_data_with_head(head_response.headers)
        self.meta.start_download()

        self.download_started()

        url_options["stream"] = True
        response = self.session.get(self.url, **url_options)
        try:
            # Raises requests.exceptions.HTTPError for 4xx/5xx responses.
            response.raise_for_status()

            for chunk in response.iter_content(chunk_size=1024 * 50):
                if self._cancel_event.is_set():
                    self.log.info("Download of URL {} to {} has been cancelled".format(
                        self.url, self.filepath))
                    break

                if chunk:  # filter out keep-alive new chunks
                    # meta is updated with the raw chunk, before decompression.
                    self.meta.update_with_data(chunk)
                    self.log.debug("Download progress: {}".format(self.meta.as_dict()))

                    chunk = self.check_and_decompress(chunk)

                    self._fh.write(chunk)
                    self.download_progress()
            else:
                # Not cancelled: write whatever the gzip decompressor is still
                # buffering (b"" when decompression is inactive).
                self._fh.write(self._flush_decompressor())
        finally:
            # With stream=True the connection stays checked out until the body
            # is consumed or the response is closed (e.g. after a cancel).
            response.close()

        self.meta.end_download()
        self.close()

        if self._cancel_event.is_set():
            self.download_cancelled()
        else:
            self.download_succeeded()

        self.cleanup()

    # Start of delegate calls
    def call_delegate(self, event):
        """Call the delegate method named ``event`` with ``self.meta``.

        Does nothing when no delegate is set.
        """
        if not self.delegate:
            return

        getattr(self.delegate, event)(self.meta)

    def download_failed(self):
        self.meta.set_state(base.DownloadStatus.FAILED)
        self.call_delegate("on_download_failed")

    def download_cancelled(self):
        self.meta.detail = "Download canceled by user."
        self.meta.set_state(base.DownloadStatus.CANCELLED)
        self.call_delegate("on_download_cancelled")

    def download_progress(self):
        self.meta.detail = "Download in progress."
        self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
        self.call_delegate("on_download_progress")

    def download_succeeded(self):
        self.meta.detail = "Download completed successfully."
        self.meta.set_state(base.DownloadStatus.COMPLETED)
        self.call_delegate("on_download_succeeded")

    def download_started(self):
        self.meta.detail = "Setting up download and extracting meta."
        self.meta.set_state(base.DownloadStatus.STARTED)
        self.call_delegate("on_download_started")

    def download_finished(self):
        self.call_delegate("on_download_finished")