--- /dev/null
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/25/2016
+#
+
+import io
+import logging
+import os
+import tempfile
+import threading
+import time
+import uuid
+import zlib
+
+import requests
+import requests.exceptions
+from requests.adapters import HTTPAdapter
+from requests.packages.urllib3.util.retry import Retry
+# disable unsigned certificate warning
+from requests.packages.urllib3.exceptions import InsecureRequestWarning
+requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
+
+import gi
+gi.require_version("RwPkgMgmtYang", "1.0")
+
+from gi.repository import RwPkgMgmtYang
+from . import base
+
+
+class UrlDownloader(base.AbstractDownloader):
+ """Handles downloads of URL with some basic retry strategy.
+ """
+ def __init__(self,
+ url,
+ file_obj=None,
+ auth=None,
+ delete_on_fail=True,
+ decompress_on_fly=False,
+ log=None):
+ """
+ Args:
+ model (str or DownloadJob): Url string to download or the Yang model
+ file_obj (str,file handle): Optional, If not set we create a temp
+ location to store the file.
+ delete_on_fail (bool, optional): Clean up the partially downloaded
+ file, if the download failed or was canceled
+ callback_handler (None, optional): Instance of base.DownloaderCallbackHandler
+ """
+ super().__init__()
+
+ self.log = log or logging.getLogger()
+ self.log.setLevel(logging.DEBUG)
+
+ self._fh, filename = self._validate_fn(file_obj)
+ self.meta = base.DownloadMeta(url, filename)
+
+ self.session = self._create_session()
+ self._cancel_event = threading.Event()
+ self.auth = auth
+
+ self.delete_on_fail = delete_on_fail
+
+ self.decompress_on_fly = decompress_on_fly
+ self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
+
+ def __repr__(self):
+ data = {"model": self.meta.as_dict()}
+ return str(data)
+
+ def _validate_fn(self, file_obj):
+ """
+ If no file object is given then create a temp file
+ if a filename is given open the file in wb mode
+
+ Finally verify the mode open mode of the file
+
+ """
+ if file_obj is None:
+ _, file_obj = tempfile.mkstemp()
+ # Reopen in wb mode
+ file_obj = open(file_obj, "wb")
+
+ # If the fh is a filename
+ if type(file_obj) is str:
+ file_obj = open(file_obj, "wb")
+
+ if type(file_obj) is not io.BufferedWriter:
+ raise base.InvalidDestinationError("Destination file cannot be"
+ "opened for write")
+
+ return file_obj, file_obj.name
+
+ def _create_session(self):
+ session = requests.Session()
+ retries = Retry(total=5, backoff_factor=1)
+ session.mount("http://", HTTPAdapter(max_retries=retries))
+ session.mount("https://", HTTPAdapter(max_retries=retries))
+
+ return session
+
+ def update_data_from_headers(self, headers):
+ """Update the model from the header of HEAD request
+
+ Args:
+ headers (dict): headers from HEAD response
+ """
+ self.meta.bytes_total = 0
+ if 'Content-Length' in headers:
+ self.meta.bytes_total = int(headers['Content-Length'])
+ self.meta.progress_percent = 0
+ self.meta.bytes_downloaded = 0
+
+ @property
+ def url(self):
+ return self.meta.url
+
+ @property
+ def filepath(self):
+ return self.meta.filepath
+
+ # Start of override methods
+ @property
+ def download_id(self):
+ return self.meta.download_id
+
+ def cancel_download(self):
+ self._cancel_event.set()
+
+ def close(self):
+ self.session.close()
+ self._fh.close()
+
+ def cleanup(self):
+ """Remove the file if the download failed.
+ """
+ if self.meta.status in [base.DownloadStatus.FAILED, base.DownloadStatus.CANCELLED] and self.delete_on_fail:
+ self.log.info("Cleaning up failed download and removing {}".format(
+ self.filepath))
+
+ try:
+ os.remove(self.filepath)
+ except Exception as e:
+ self.log.exception(e)
+
+ def download(self):
+ """Start the download
+
+ Trigger an HEAD request to get the meta data before starting the download
+ """
+ try:
+ self._download()
+ except Exception as e:
+ self.log.exception(str(e))
+ self.meta.detail = str(e)
+ self.meta.stop_time = time.time()
+
+ self.download_failed()
+
+ # Close all file handles and clean up
+ self.close()
+ self.cleanup()
+
+ self.download_finished()
+
+ # end of override methods
+
+ def check_and_decompress(self, chunk):
+ if self.url.endswith(".gz") and self.decompress_on_fly:
+ chunk = self._decompressor.decompress(chunk)
+
+ return chunk
+
+ def _download(self):
+
+ url_options = {"verify": False}
+
+ if self.auth is not None:
+ url_options["auth"] = self.auth
+
+ response = self.session.head(self.url, **url_options)
+
+ # Prepare the meta data
+ self.meta.update_data_with_head(response.headers)
+ self.meta.start_download()
+
+ self.download_started()
+
+ url_options["stream"] = True,
+ request = self.session.get(self.url, **url_options)
+
+ if request.status_code != requests.codes.ok:
+ request.raise_for_status()
+
+ # actual start time, excluding the HEAD request.
+ for chunk in request.iter_content(chunk_size=1024 * 50):
+ if self._cancel_event.is_set():
+ self.log.info("Download of URL {} to {} has been cancelled".format(
+ self.url, self.filepath))
+ break
+
+ if chunk: # filter out keep-alive new chunks
+ self.meta.update_with_data(chunk)
+ self.log.debug("Download progress: {}".format(self.meta.as_dict()))
+
+ chunk = self.check_and_decompress(chunk)
+
+ self._fh.write(chunk)
+ self.download_progress()
+
+ self.meta.end_download()
+ self.close()
+
+ if self._cancel_event.is_set():
+ self.download_cancelled()
+ else:
+ self.download_succeeded()
+
+ self.cleanup()
+
+ # Start of delegate calls
+ def call_delegate(self, event):
+ if not self.delegate:
+ return
+
+ getattr(self.delegate, event)(self.meta)
+
+ def download_failed(self):
+ self.meta.set_state(base.DownloadStatus.FAILED)
+ self.call_delegate("on_download_failed")
+
+ def download_cancelled(self):
+ self.meta.detail = "Download canceled by user."
+ self.meta.set_state(base.DownloadStatus.CANCELLED)
+ self.call_delegate("on_download_cancelled")
+
+ def download_progress(self):
+ self.meta.detail = "Download in progress."
+ self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
+ self.call_delegate("on_download_progress")
+
+ def download_succeeded(self):
+ self.meta.detail = "Download completed successfully."
+ self.meta.set_state(base.DownloadStatus.COMPLETED)
+ self.call_delegate("on_download_succeeded")
+
+ def download_started(self):
+ self.meta.detail = "Setting up download and extracting meta."
+ self.meta.set_state(base.DownloadStatus.STARTED)
+ self.call_delegate("on_download_started")
+
+ def download_finished(self):
+ self.call_delegate("on_download_finished")