Merge from OSM SO master
[osm/SO.git] / common / python / rift / downloader / url.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Varun Prasad
17 # Creation Date: 09/25/2016
18 #
19
20 import io
21 import logging
22 import os
23 import tempfile
24 import threading
25 import time
26 import uuid
27 import zlib
28
29 import requests
30 import requests.exceptions
31 from requests.adapters import HTTPAdapter
32 from requests.packages.urllib3.util.retry import Retry
33 # disable unsigned certificate warning
34 from requests.packages.urllib3.exceptions import InsecureRequestWarning
35 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
36
37 import gi
38 gi.require_version("RwPkgMgmtYang", "1.0")
39
40 from gi.repository import RwPkgMgmtYang
41 from . import base
42 from .local_file import LocalFileAdapter as LocalFileAdapter
43
44
class UrlDownloader(base.AbstractDownloader):
    """Download a URL to a local file with a basic retry strategy.

    Progress, completion and failure are reported to an optional delegate
    (see base.AbstractDownloader) via the on_download_* callbacks, with a
    base.DownloadMeta instance describing the transfer.
    """

    def __init__(self,
                 url,
                 file_obj=None,
                 auth=None,
                 delete_on_fail=True,
                 decompress_on_fly=False,
                 log=None):
        """
        Args:
            url (str): Url string to download.
            file_obj (str or file handle, optional): Destination file. If not
                set, a temp location is created to store the file.
            auth (tuple, optional): Credentials forwarded to requests.
            delete_on_fail (bool, optional): Clean up the partially downloaded
                file, if the download failed or was canceled.
            decompress_on_fly (bool, optional): Gunzip the stream while
                downloading (only applied when the URL ends in ".gz").
            log (logging.Logger, optional): Logger to use; defaults to the
                root logger.
        """
        super().__init__()

        self.log = log or logging.getLogger()
        self.log.setLevel(logging.DEBUG)

        self._fh, filename = self._validate_fn(file_obj)
        self.meta = base.DownloadMeta(url, filename)

        self.session = self._create_session()
        self._cancel_event = threading.Event()
        self.auth = auth

        self.delete_on_fail = delete_on_fail

        self.decompress_on_fly = decompress_on_fly
        # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)

    def __repr__(self):
        data = {"model": self.meta.as_dict()}
        return str(data)

    def _validate_fn(self, file_obj):
        """Normalize *file_obj* into a binary-writable file handle.

        If no file object is given then create a temp file;
        if a filename is given open the file in wb mode.
        Finally verify the open mode of the file.

        Returns:
            tuple: (file handle, file name)

        Raises:
            base.InvalidDestinationError: If the destination cannot be
                opened for writing.
        """
        if file_obj is None:
            # BUGFIX: close the OS-level fd returned by mkstemp before
            # reopening by name — previously it was discarded and leaked.
            fd, file_obj = tempfile.mkstemp()
            os.close(fd)
            # Reopen in wb mode
            file_obj = open(file_obj, "wb")

        # If the fh is a filename
        if isinstance(file_obj, str):
            file_obj = open(file_obj, "wb")

        if not isinstance(file_obj, io.BufferedWriter):
            # BUGFIX: the two fragments previously joined without a space
            # ("cannot beopened").
            raise base.InvalidDestinationError("Destination file cannot be "
                                               "opened for write")

        return file_obj, file_obj.name

    def _create_session(self):
        """Build a requests Session with retries and file:// support."""
        session = requests.Session()
        # Up to 5 retries with exponential backoff on transient failures.
        retries = Retry(total=5, backoff_factor=1)
        session.mount("http://", HTTPAdapter(max_retries=retries))
        session.mount("https://", HTTPAdapter(max_retries=retries))
        session.mount("file://", LocalFileAdapter())

        return session

    def update_data_from_headers(self, headers):
        """Update the model from the header of HEAD request.

        Args:
            headers (dict): headers from HEAD response
        """
        # Missing Content-Length (e.g. chunked responses) leaves total at 0.
        self.meta.bytes_total = int(headers.get('Content-Length', 0))
        self.meta.progress_percent = 0
        self.meta.bytes_downloaded = 0

    @property
    def url(self):
        # URL being downloaded (from the meta model).
        return self.meta.url

    @property
    def filepath(self):
        # Destination path of the download (from the meta model).
        return self.meta.filepath

    # Start of override methods
    @property
    def download_id(self):
        return self.meta.download_id

    def cancel_download(self):
        # Signals the _download loop to stop at the next chunk boundary.
        self._cancel_event.set()

    def close(self):
        # Both session.close() and file.close() are idempotent, so this is
        # safe to call more than once (download() may do so after _download).
        self.session.close()
        self._fh.close()

    def cleanup(self):
        """Remove the file if the download failed or was cancelled."""
        failed_states = [base.DownloadStatus.FAILED,
                         base.DownloadStatus.CANCELLED]
        if self.meta.status in failed_states and self.delete_on_fail:
            self.log.info("Cleaning up failed download and removing {}".format(
                self.filepath))

            try:
                os.remove(self.filepath)
            except Exception as e:
                # Best-effort cleanup: log but never raise from here.
                self.log.exception(e)

    def download(self):
        """Start the download.

        Trigger an HEAD request to get the meta data before starting the
        download. Any exception is reported via the failure callbacks rather
        than propagated.
        """
        try:
            self._download()
        except Exception as e:
            self.log.exception(str(e))
            self.meta.detail = str(e)
            self.meta.stop_time = time.time()

            self.download_failed()

        # Close all file handles and clean up
        self.close()
        self.cleanup()

        self.download_finished()

    # end of override methods

    def check_and_decompress(self, chunk):
        """Gunzip *chunk* when on-the-fly decompression applies, else pass through."""
        if self.url.endswith(".gz") and self.decompress_on_fly:
            chunk = self._decompressor.decompress(chunk)

        return chunk

    def _download(self):
        """Perform the transfer: HEAD for metadata, then a streamed GET."""
        url_options = {"verify": False}

        if self.auth is not None:
            url_options["auth"] = self.auth

        response = self.session.head(self.url, **url_options)

        # Prepare the meta data
        self.meta.update_data_with_head(response.headers)
        self.meta.start_download()

        self.download_started()

        # BUGFIX: a stray trailing comma previously assigned the tuple
        # (True,) to "stream" instead of True.
        url_options["stream"] = True
        request = self.session.get(self.url, **url_options)

        if request.status_code != requests.codes.ok:
            request.raise_for_status()

        # actual start time, excluding the HEAD request.
        for chunk in request.iter_content(chunk_size=1024 * 50):
            if self._cancel_event.is_set():
                self.log.info("Download of URL {} to {} has been cancelled".format(
                    self.url, self.filepath))
                break

            if chunk:  # filter out keep-alive new chunks
                # Progress is accounted on the compressed (wire) bytes,
                # before any on-the-fly decompression.
                self.meta.update_with_data(chunk)
                self.log.debug("Download progress: {}".format(self.meta.as_dict()))

                chunk = self.check_and_decompress(chunk)

                self._fh.write(chunk)
                self.download_progress()

        self.meta.end_download()
        self.close()

        if self._cancel_event.is_set():
            self.download_cancelled()
        else:
            self.download_succeeded()

        self.cleanup()

    # Start of delegate calls
    def call_delegate(self, event):
        """Invoke *event* on the delegate with the current meta, if one is set."""
        if not self.delegate:
            return

        getattr(self.delegate, event)(self.meta)

    def download_failed(self):
        self.meta.set_state(base.DownloadStatus.FAILED)
        self.call_delegate("on_download_failed")

    def download_cancelled(self):
        self.meta.detail = "Download canceled by user."
        self.meta.set_state(base.DownloadStatus.CANCELLED)
        self.call_delegate("on_download_cancelled")

    def download_progress(self):
        self.meta.detail = "Download in progress."
        self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
        self.call_delegate("on_download_progress")

    def download_succeeded(self):
        self.meta.detail = "Download completed successfully."
        self.meta.set_state(base.DownloadStatus.COMPLETED)
        self.call_delegate("on_download_succeeded")

    def download_started(self):
        self.meta.detail = "Setting up download and extracting meta."
        self.meta.set_state(base.DownloadStatus.STARTED)
        self.call_delegate("on_download_started")

    def download_finished(self):
        self.call_delegate("on_download_finished")