update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / common / python / rift / downloader / url.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Varun Prasad
17 # Creation Date: 09/25/2016
18 #
19
20 import io
21 import logging
22 import os
23 import tempfile
24 import threading
25 import time
26 import zlib
27
28 import requests
29 import requests.exceptions
30 from requests.adapters import HTTPAdapter
31 from requests.packages.urllib3.util.retry import Retry
32 # disable unsigned certificate warning
33 from requests.packages.urllib3.exceptions import InsecureRequestWarning
34 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
35
36 from . import base
37 from .local_file import LocalFileAdapter as LocalFileAdapter
38
class UrlDownloader(base.AbstractDownloader):
    """Handles downloads of a URL with a basic retry strategy."""

    def __init__(self,
                 url,
                 file_obj=None,
                 auth=None,
                 delete_on_fail=True,
                 decompress_on_fly=False,
                 log=None):
        """
        Args:
            url (str): URL string to download.
            file_obj (str or file handle): Optional. If not set we create a
                temp location to store the file.
            auth: Optional auth object forwarded to requests (e.g. a
                (user, password) tuple).
            delete_on_fail (bool, optional): Clean up the partially downloaded
                file if the download failed or was canceled.
            decompress_on_fly (bool, optional): Gunzip ".gz" URLs while
                streaming.
            log (logging.Logger, optional): Logger to use; defaults to the
                root logger.
        """
        super().__init__()

        self.log = log or logging.getLogger()
        self.log.setLevel(logging.DEBUG)

        self._fh, filename = self._validate_fn(file_obj)
        self.meta = base.DownloadMeta(url, filename)

        self.session = self._create_session()
        self._cancel_event = threading.Event()
        self.auth = auth

        self.delete_on_fail = delete_on_fail

        self.decompress_on_fly = decompress_on_fly
        # wbits = 16 + MAX_WBITS tells zlib to expect a gzip header/trailer.
        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)

    def __repr__(self):
        return str({"model": self.meta.as_dict()})

    def _validate_fn(self, file_obj):
        """Normalize *file_obj* into an open binary file handle.

        If no file object is given, create a temp file; if a filename is
        given, open the file in "wb" mode. Finally verify the handle is a
        writable buffered binary file.

        Returns:
            tuple: (open file handle, file name)

        Raises:
            base.InvalidDestinationError: if the destination cannot be
                opened for writing.
        """
        if file_obj is None:
            # BUG FIX: mkstemp() returns an already-open OS-level fd which
            # the original discarded (fd leak). Close it; the str branch
            # below reopens the path in "wb" mode.
            fd, file_obj = tempfile.mkstemp()
            os.close(fd)

        # If the fh is a filename, open it for writing.
        if isinstance(file_obj, str):
            file_obj = open(file_obj, "wb")

        if not isinstance(file_obj, io.BufferedWriter):
            # BUG FIX: original message concatenated to "cannot beopened".
            raise base.InvalidDestinationError("Destination file cannot be "
                                               "opened for write")

        return file_obj, file_obj.name

    def _create_session(self):
        """Build a requests session with retries and file:// support."""
        session = requests.Session()
        # 3 connection attempts should be more than enough; we can't wait
        # forever and the user needs to be updated on the status.
        retries = Retry(total=2, backoff_factor=1)
        session.mount("http://", HTTPAdapter(max_retries=retries))
        session.mount("https://", HTTPAdapter(max_retries=retries))
        session.mount("file://", LocalFileAdapter())

        return session

    def update_data_from_headers(self, headers):
        """Update the meta data from the headers of a HEAD response.

        Args:
            headers (dict): headers from HEAD response.
        """
        self.meta.bytes_total = int(headers.get('Content-Length', 0))
        self.meta.progress_percent = 0
        self.meta.bytes_downloaded = 0

    @property
    def url(self):
        return self.meta.url

    @property
    def filepath(self):
        return self.meta.filepath

    # Start of override methods
    @property
    def download_id(self):
        return self.meta.download_id

    def cancel_download(self):
        """Signal the download loop to stop at the next chunk boundary."""
        self._cancel_event.set()

    def close(self):
        """Close the HTTP session and the destination file handle."""
        self.session.close()
        self._fh.close()

    def cleanup(self):
        """Remove the file if the download failed or was cancelled."""
        failed_states = (base.DownloadStatus.FAILED,
                         base.DownloadStatus.CANCELLED)
        if self.meta.status in failed_states and self.delete_on_fail:
            self.log.info("Cleaning up failed download and removing {}".format(
                self.filepath))

            try:
                os.remove(self.filepath)
            except OSError:
                # Best effort: the file may never have been created.
                pass

    def download(self):
        """Start the download.

        Triggers a HEAD request to get the meta data before starting the
        download. Any exception marks the download as failed; file handles
        are closed and partial files cleaned up either way.
        """
        try:
            self._download()
        except Exception as e:
            self.log.exception(str(e))
            self.meta.detail = str(e)
            self.meta.stop_time = time.time()

            self.download_failed()

        # Close all file handles and clean up
        self.close()
        self.cleanup()

        self.download_finished()

    # end of override methods

    def check_and_decompress(self, chunk):
        """Gunzip *chunk* on the fly for ".gz" URLs when enabled."""
        if self.url.endswith(".gz") and self.decompress_on_fly:
            chunk = self._decompressor.decompress(chunk)

        return chunk

    def _download(self):
        """Perform the HEAD request plus streaming GET, writing each chunk.

        Raises:
            requests.exceptions.HTTPError: on a non-2xx HEAD or GET status.
        """
        url_options = {"verify": False, "timeout": 10}

        if self.auth is not None:
            url_options["auth"] = self.auth

        response = self.session.head(self.url, **url_options)

        if response.status_code != requests.codes.ok:
            response.raise_for_status()

        # Prepare the meta data
        self.meta.update_data_with_head(response.headers)
        self.meta.start_download()

        self.download_progress()

        # BUG FIX: the original had a stray trailing comma here
        # (url_options["stream"] = True,) which assigned the tuple (True,);
        # it only streamed because a non-empty tuple is truthy.
        url_options["stream"] = True
        request = self.session.get(self.url, **url_options)

        if request.status_code != requests.codes.ok:
            request.raise_for_status()

        # actual start time, excluding the HEAD request.
        for chunk in request.iter_content(chunk_size=1024 * 50):
            if self._cancel_event.is_set():
                self.log.info("Download of URL {} to {} has been cancelled".format(
                    self.url, self.filepath))
                break

            if chunk:  # filter out keep-alive new chunks
                self.meta.update_with_data(chunk)
                self.log.debug("Download progress: {}".format(self.meta.as_dict()))

                chunk = self.check_and_decompress(chunk)

                self._fh.write(chunk)

        self.meta.end_download()
        self.close()

        if self._cancel_event.is_set():
            self.download_cancelled()
        else:
            self.download_succeeded()

        self.cleanup()

    # Start of delegate calls
    def call_delegate(self, event):
        """Invoke *event* on the delegate with the current meta, if set."""
        if not self.delegate:
            return

        getattr(self.delegate, event)(self.meta)

    def download_failed(self):
        self.meta.set_state(base.DownloadStatus.FAILED)
        self.call_delegate("on_download_failed")

    def download_cancelled(self):
        self.meta.detail = "Download canceled by user."
        self.meta.set_state(base.DownloadStatus.CANCELLED)
        self.call_delegate("on_download_cancelled")

    def download_progress(self):
        self.meta.detail = "Download in progress."
        self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
        self.call_delegate("on_download_progress")

    def download_succeeded(self):
        self.meta.detail = "Download completed successfully."
        self.meta.set_state(base.DownloadStatus.COMPLETED)
        self.call_delegate("on_download_succeeded")

    def download_started(self):
        self.meta.detail = "Setting up download and extracting meta."
        self.meta.set_state(base.DownloadStatus.STARTED)
        self.call_delegate("on_download_started")

    def download_finished(self):
        self.call_delegate("on_download_finished")