27688945996b615d3b341ee4069cd46185e6f7ec
[osm/SO.git] / common / python / rift / downloader / url.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Varun Prasad
17 # Creation Date: 09/25/2016
18 #
19
20 import io
21 import logging
22 import os
23 import tempfile
24 import threading
25 import time
26 import uuid
27 import zlib
28
29 import requests
30 import requests.exceptions
31 from requests.adapters import HTTPAdapter
32 from requests.packages.urllib3.util.retry import Retry
33 # disable unsigned certificate warning
34 from requests.packages.urllib3.exceptions import InsecureRequestWarning
35 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
36
37 import gi
38 gi.require_version("RwPkgMgmtYang", "1.0")
39
40 from gi.repository import RwPkgMgmtYang
41 from . import base
42
43
class UrlDownloader(base.AbstractDownloader):
    """Handles downloads of URL with some basic retry strategy.

    ``download()`` runs the whole transfer in the calling thread: a HEAD
    request fills in the meta data, then the body is streamed with a GET and
    written to a local file (optionally gunzipped on the fly for ``.gz``
    URLs). ``cancel_download()`` sets an event that the streaming loop checks
    between chunks. Progress and the outcome are recorded in ``self.meta`` and
    forwarded to ``self.delegate`` (presumably provided by
    ``base.AbstractDownloader`` -- verify) when one is set.
    """

    def __init__(self,
                 url,
                 file_obj=None,
                 auth=None,
                 delete_on_fail=True,
                 decompress_on_fly=False,
                 log=None):
        """
        Args:
            url (str): URL to download.
            file_obj (str or file handle, optional): Destination. A path is
                opened in ``wb`` mode; an open handle must be an
                ``io.BufferedWriter``. If not set, a temporary file is
                created to store the download.
            auth (optional): Passed unchanged as the ``auth`` argument of the
                ``requests`` HEAD and GET calls.
            delete_on_fail (bool, optional): Clean up the partially downloaded
                file if the download failed or was cancelled.
            decompress_on_fly (bool, optional): Gunzip the stream while
                writing it; only applied when the URL ends with ``.gz``.
            log (logging.Logger, optional): Logger to use; defaults to the
                root logger.

        Raises:
            base.InvalidDestinationError: if ``file_obj`` is not a binary
                file opened for writing.
        """
        super().__init__()

        self.log = log or logging.getLogger()
        # NOTE(review): this forces DEBUG on the supplied logger (or on the
        # root logger when none is given), which affects every other user of
        # that logger; consider leaving the level to the application.
        self.log.setLevel(logging.DEBUG)

        self._fh, filename = self._validate_fn(file_obj)
        self.meta = base.DownloadMeta(url, filename)

        self.session = self._create_session()
        # Set by cancel_download(); polled between chunks in _download().
        self._cancel_event = threading.Event()
        self.auth = auth

        self.delete_on_fail = delete_on_fail

        self.decompress_on_fly = decompress_on_fly
        # wbits = 16 + MAX_WBITS tells zlib to expect a gzip header/trailer.
        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)

    def __repr__(self):
        data = {"model": self.meta.as_dict()}
        return str(data)

    def _validate_fn(self, file_obj):
        """Resolve ``file_obj`` into an open binary writer and its file name.

        If no file object is given, a temp file is created. If a filename is
        given, it is opened in ``wb`` mode. Finally, the handle type is
        verified.

        Returns:
            tuple: ``(file handle, file name)``.

        Raises:
            base.InvalidDestinationError: if the handle is not an
                ``io.BufferedWriter``.
        """
        if file_obj is None:
            fd, file_obj = tempfile.mkstemp()
            # mkstemp() hands back an already-open OS descriptor; close it
            # before re-opening the path below so the descriptor is not leaked.
            os.close(fd)

        # If the destination is a filename, open it for binary writing.
        if isinstance(file_obj, str):
            file_obj = open(file_obj, "wb")

        if not isinstance(file_obj, io.BufferedWriter):
            raise base.InvalidDestinationError("Destination file cannot be "
                                               "opened for write")

        return file_obj, file_obj.name

    def _create_session(self):
        """Create a ``requests`` session that retries HTTP(S) requests.

        Up to 5 retries with ``backoff_factor=1`` are configured via urllib3's
        ``Retry``.
        """
        session = requests.Session()
        retries = Retry(total=5, backoff_factor=1)
        session.mount("http://", HTTPAdapter(max_retries=retries))
        session.mount("https://", HTTPAdapter(max_retries=retries))

        return session

    def update_data_from_headers(self, headers):
        """Update the model from the header of HEAD request.

        Resets the progress counters and takes ``bytes_total`` from the
        ``Content-Length`` header (0 when the header is absent).

        Args:
            headers (dict): headers from HEAD response
        """
        self.meta.bytes_total = 0
        if 'Content-Length' in headers:
            self.meta.bytes_total = int(headers['Content-Length'])
        self.meta.progress_percent = 0
        self.meta.bytes_downloaded = 0

    @property
    def url(self):
        """URL being downloaded (stored on ``self.meta``)."""
        return self.meta.url

    @property
    def filepath(self):
        """Local destination path (stored on ``self.meta``)."""
        return self.meta.filepath

    # Start of override methods
    @property
    def download_id(self):
        return self.meta.download_id

    def cancel_download(self):
        """Request cancellation; the streaming loop stops at the next chunk."""
        self._cancel_event.set()

    def close(self):
        """Close the HTTP session and the destination file handle."""
        self.session.close()
        self._fh.close()

    def cleanup(self):
        """Remove the file if the download failed or was cancelled.

        Only acts when ``delete_on_fail`` is set. This method is safe to call
        more than once: after a cancellation both ``_download()`` and
        ``download()`` call it, and a file that is already gone is not treated
        as an error.
        """
        unsuccessful = self.meta.status in [base.DownloadStatus.FAILED,
                                            base.DownloadStatus.CANCELLED]
        if not (unsuccessful and self.delete_on_fail):
            return

        self.log.info("Cleaning up failed download and removing {}".format(
            self.filepath))

        try:
            os.remove(self.filepath)
        except FileNotFoundError:
            # Already removed by an earlier cleanup() call: nothing to do.
            pass
        except OSError as e:
            # Best effort: a leftover partial file must not fail the caller.
            self.log.exception(e)

    def download(self):
        """Start the download.

        Trigger an HEAD request to get the meta data before starting the
        download. Any exception raised while downloading is logged, recorded
        in ``meta.detail``/``meta.stop_time`` and reported as a failure
        instead of propagating. Afterwards the handles are closed, a failed or
        cancelled partial file is removed (if ``delete_on_fail``), and the
        delegate's ``on_download_finished`` is called.
        """
        try:
            self._download()
        except Exception as e:
            self.log.exception(str(e))
            self.meta.detail = str(e)
            self.meta.stop_time = time.time()

            self.download_failed()

        # Close all file handles and clean up
        self.close()
        self.cleanup()

        self.download_finished()

    # end of override methods

    def _decompress_enabled(self):
        """Return a truthy value when chunks should be gunzipped before writing."""
        return self.url.endswith(".gz") and self.decompress_on_fly

    def check_and_decompress(self, chunk):
        """Return ``chunk`` gunzipped when on-the-fly decompression applies,
        otherwise return it unchanged."""
        if self._decompress_enabled():
            chunk = self._decompressor.decompress(chunk)

        return chunk

    def _download(self):
        """Perform the HEAD request and the streaming GET, writing the body.

        Raises whatever ``requests`` raises (connection errors, or HTTP 4xx/5xx
        via ``raise_for_status``), as well as write/decompression errors;
        ``download()`` turns these into a FAILED state.
        """
        url_options = {"verify": False}

        if self.auth is not None:
            url_options["auth"] = self.auth

        # HEAD first so the meta data (e.g. total size) is known up front.
        head_response = self.session.head(self.url, **url_options)

        # Prepare the meta data
        self.meta.update_data_with_head(head_response.headers)
        self.meta.start_download()

        self.download_started()

        # Plain bool: a trailing comma here used to produce the tuple (True,).
        url_options["stream"] = True
        response = self.session.get(self.url, **url_options)

        try:
            # No-op for non-error statuses, raises HTTPError for 4xx/5xx.
            response.raise_for_status()

            for chunk in response.iter_content(chunk_size=1024 * 50):
                if self._cancel_event.is_set():
                    self.log.info("Download of URL {} to {} has been cancelled".format(
                        self.url, self.filepath))
                    break

                if not chunk:  # filter out keep-alive new chunks
                    continue

                # Progress is tracked on the bytes received (before any
                # decompression).
                self.meta.update_with_data(chunk)
                self.log.debug("Download progress: %s", self.meta.as_dict())

                self._fh.write(self.check_and_decompress(chunk))
                self.download_progress()
            else:
                # Stream fully consumed (not cancelled): emit any output the
                # decompressor may still hold, as recommended by the zlib docs.
                if self._decompress_enabled():
                    self._fh.write(self._decompressor.flush())
        finally:
            # With stream=True the connection is held until the response is
            # closed; release it on success, cancellation and error alike.
            response.close()

        self.meta.end_download()
        # Close (and therefore flush) the file before the delegates learn the
        # outcome, so they see the complete file on disk.
        self.close()

        if self._cancel_event.is_set():
            self.download_cancelled()
        else:
            self.download_succeeded()

        self.cleanup()

    # Start of delegate calls
    def call_delegate(self, event):
        """Invoke ``self.delegate.<event>(self.meta)`` if a delegate is set."""
        if not self.delegate:
            return

        getattr(self.delegate, event)(self.meta)

    def download_failed(self):
        self.meta.set_state(base.DownloadStatus.FAILED)
        self.call_delegate("on_download_failed")

    def download_cancelled(self):
        self.meta.detail = "Download canceled by user."
        self.meta.set_state(base.DownloadStatus.CANCELLED)
        self.call_delegate("on_download_cancelled")

    def download_progress(self):
        self.meta.detail = "Download in progress."
        self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
        self.call_delegate("on_download_progress")

    def download_succeeded(self):
        self.meta.detail = "Download completed successfully."
        self.meta.set_state(base.DownloadStatus.COMPLETED)
        self.call_delegate("on_download_succeeded")

    def download_started(self):
        self.meta.detail = "Setting up download and extracting meta."
        self.meta.set_state(base.DownloadStatus.STARTED)
        self.call_delegate("on_download_started")

    def download_finished(self):
        self.call_delegate("on_download_finished")
265 def download_finished(self):
266 self.call_delegate("on_download_finished")