#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author(s): Varun Prasad
# Creation Date: 09/25/2016
#
19
20import io
21import logging
22import os
23import tempfile
24import threading
25import time
26import uuid
27import zlib
28
29import requests
30import requests.exceptions
31from requests.adapters import HTTPAdapter
32from requests.packages.urllib3.util.retry import Retry
33# disable unsigned certificate warning
34from requests.packages.urllib3.exceptions import InsecureRequestWarning
35requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
36
37import gi
38gi.require_version("RwPkgMgmtYang", "1.0")
39
40from gi.repository import RwPkgMgmtYang
41from . import base
Philip Josephf4937572017-03-03 01:55:37 +053042from .local_file import LocalFileAdapter as LocalFileAdapter
velandy6364d012017-01-04 19:25:07 +000043
44
class UrlDownloader(base.AbstractDownloader):
    """Downloads a URL to a local file with a basic retry strategy.

    Supports http://, https:// and file:// schemes, optional auth,
    optional on-the-fly gzip decompression, and cooperative cancellation
    from another thread via cancel_download(). Progress and terminal
    states are reported through the delegate callbacks.
    """

    def __init__(self,
                 url,
                 file_obj=None,
                 auth=None,
                 delete_on_fail=True,
                 decompress_on_fly=False,
                 log=None):
        """
        Args:
            url (str): URL to download.
            file_obj (str or file object, optional): Destination for the
                download. A str is treated as a filename; None creates a
                temporary file.
            auth (optional): Credentials forwarded to requests (e.g. a
                (user, password) tuple for basic auth).
            delete_on_fail (bool, optional): Remove the partially downloaded
                file if the download failed or was cancelled.
            decompress_on_fly (bool, optional): Gunzip the stream while
                writing, when the URL ends with ".gz".
            log (logging.Logger, optional): Logger to use; defaults to the
                root logger.
        """
        super().__init__()

        self.log = log or logging.getLogger()
        self.log.setLevel(logging.DEBUG)

        self._fh, filename = self._validate_fn(file_obj)
        self.meta = base.DownloadMeta(url, filename)

        self.session = self._create_session()
        self._cancel_event = threading.Event()
        self.auth = auth

        self.delete_on_fail = delete_on_fail

        self.decompress_on_fly = decompress_on_fly
        # 16 + MAX_WBITS tells zlib to expect a gzip wrapper (header/trailer).
        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)

    def __repr__(self):
        data = {"model": self.meta.as_dict()}
        return str(data)

    def _validate_fn(self, file_obj):
        """Normalize the destination into an open binary-write file handle.

        - None: a temporary file is created and opened in "wb" mode.
        - str: treated as a filename and opened in "wb" mode.
        - file object: used as-is; must already be a binary writer.

        Returns:
            tuple: (file handle, file name)

        Raises:
            base.InvalidDestinationError: If the destination is not a
                binary-writable file object.
        """
        if file_obj is None:
            # mkstemp returns (fd, path); reopen by path in wb mode.
            _, file_obj = tempfile.mkstemp()
            file_obj = open(file_obj, "wb")

        # If the destination is a filename
        if isinstance(file_obj, str):
            file_obj = open(file_obj, "wb")

        if not isinstance(file_obj, io.BufferedWriter):
            # BUG FIX: the two fragments previously concatenated without a
            # space ("...cannot beopened...").
            raise base.InvalidDestinationError("Destination file cannot be "
                                               "opened for write")

        return file_obj, file_obj.name

    def _create_session(self):
        """Build a requests session that retries transient HTTP failures."""
        session = requests.Session()
        retries = Retry(total=5, backoff_factor=1)
        session.mount("http://", HTTPAdapter(max_retries=retries))
        session.mount("https://", HTTPAdapter(max_retries=retries))
        session.mount("file://", LocalFileAdapter())

        return session

    def update_data_from_headers(self, headers):
        """Reset progress counters from the headers of a HEAD response.

        Args:
            headers (dict): Headers from the HEAD response.
        """
        self.meta.bytes_total = 0
        if 'Content-Length' in headers:
            self.meta.bytes_total = int(headers['Content-Length'])
        self.meta.progress_percent = 0
        self.meta.bytes_downloaded = 0

    @property
    def url(self):
        return self.meta.url

    @property
    def filepath(self):
        return self.meta.filepath

    # Start of override methods
    @property
    def download_id(self):
        return self.meta.download_id

    def cancel_download(self):
        """Request cancellation; the download loop checks this per chunk."""
        self._cancel_event.set()

    def close(self):
        self.session.close()
        self._fh.close()

    def cleanup(self):
        """Remove the destination file if the download failed or was cancelled."""
        if self.meta.status in [base.DownloadStatus.FAILED, base.DownloadStatus.CANCELLED] and self.delete_on_fail:
            self.log.info("Cleaning up failed download and removing {}".format(
                self.filepath))

            try:
                os.remove(self.filepath)
            except Exception as e:
                # Best-effort removal; the failure itself is already reported.
                self.log.exception(e)

    def download(self):
        """Run the download, reporting state through the delegate callbacks.

        Triggers a HEAD request to collect metadata before streaming the
        body. Never raises: failures are recorded on the meta object and
        reported via download_failed().
        """
        try:
            self._download()
        except Exception as e:
            self.log.exception(str(e))
            self.meta.detail = str(e)
            self.meta.stop_time = time.time()

            self.download_failed()

        # Close all file handles and clean up. On the success path
        # _download() already did this; the repeated close()/cleanup()
        # here covers the exception path and is harmless otherwise.
        self.close()
        self.cleanup()

        self.download_finished()

    # end of override methods

    def check_and_decompress(self, chunk):
        """Gunzip the chunk when on-the-fly decompression applies."""
        if self.url.endswith(".gz") and self.decompress_on_fly:
            chunk = self._decompressor.decompress(chunk)

        return chunk

    def _download(self):
        """HEAD for metadata, then stream the body to the destination file."""
        url_options = {"verify": False}

        if self.auth is not None:
            url_options["auth"] = self.auth

        response = self.session.head(self.url, **url_options)

        # Prepare the meta data
        self.meta.update_data_with_head(response.headers)
        self.meta.start_download()

        self.download_started()

        # BUG FIX: a trailing comma previously made this the tuple (True,);
        # streaming only worked because a non-empty tuple is truthy.
        url_options["stream"] = True
        response = self.session.get(self.url, **url_options)

        # Raises requests.exceptions.HTTPError on 4xx/5xx; no-op otherwise.
        response.raise_for_status()

        # actual start time, excluding the HEAD request.
        for chunk in response.iter_content(chunk_size=1024 * 50):
            if self._cancel_event.is_set():
                self.log.info("Download of URL {} to {} has been cancelled".format(
                    self.url, self.filepath))
                break

            if chunk:  # filter out keep-alive new chunks
                # Progress is accounted in network (compressed) bytes,
                # before any decompression.
                self.meta.update_with_data(chunk)
                self.log.debug("Download progress: {}".format(self.meta.as_dict()))

                chunk = self.check_and_decompress(chunk)

                self._fh.write(chunk)
                self.download_progress()

        self.meta.end_download()
        self.close()

        if self._cancel_event.is_set():
            self.download_cancelled()
        else:
            self.download_succeeded()

        self.cleanup()

    # Start of delegate calls
    def call_delegate(self, event):
        """Invoke the named delegate callback with the download meta, if set."""
        if not self.delegate:
            return

        getattr(self.delegate, event)(self.meta)

    def download_failed(self):
        self.meta.set_state(base.DownloadStatus.FAILED)
        self.call_delegate("on_download_failed")

    def download_cancelled(self):
        self.meta.detail = "Download canceled by user."
        self.meta.set_state(base.DownloadStatus.CANCELLED)
        self.call_delegate("on_download_cancelled")

    def download_progress(self):
        self.meta.detail = "Download in progress."
        self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
        self.call_delegate("on_download_progress")

    def download_succeeded(self):
        self.meta.detail = "Download completed successfully."
        self.meta.set_state(base.DownloadStatus.COMPLETED)
        self.call_delegate("on_download_succeeded")

    def download_started(self):
        self.meta.detail = "Setting up download and extracting meta."
        self.meta.set_state(base.DownloadStatus.STARTED)
        self.call_delegate("on_download_started")

    def download_finished(self):
        self.call_delegate("on_download_finished")