141 - Support for Package Management in SO
diff --git a/common/python/CMakeLists.txt b/common/python/CMakeLists.txt
index 85ead68..3343119 100644
--- a/common/python/CMakeLists.txt
+++ b/common/python/CMakeLists.txt
@@ -32,6 +32,7 @@
   )
 
 
+# Subscribers
 rift_python_install_tree(
   FILES
     rift/mano/dts/__init__.py
@@ -46,23 +47,29 @@
   PYTHON3_ONLY
   )
 
+# RPCs
 rift_python_install_tree(
   FILES
-    rift/mano/config_data/__init__.py
-    rift/mano/config_data/config.py
+    rift/mano/dts/rpc/__init__.py
+    rift/mano/dts/rpc/core.py
+  COMPONENT ${PKG_LONG_NAME}
+  PYTHON3_ONLY
+  )
+
+# Downloaders
+rift_python_install_tree(
+  FILES
+    rift/downloader/__init__.py
+    rift/downloader/base.py
+    rift/downloader/url.py
   COMPONENT ${PKG_LONG_NAME}
   PYTHON3_ONLY
   )
 
 rift_python_install_tree(
   FILES
-    rift/mano/dts/__init__.py
-    rift/mano/dts/core.py
-    rift/mano/dts/subscriber/__init__.py
-    rift/mano/dts/subscriber/core.py
-    rift/mano/dts/subscriber/store.py
-    rift/mano/dts/subscriber/ns_subscriber.py
-    rift/mano/dts/subscriber/vnf_subscriber.py
+    rift/mano/config_data/__init__.py
+    rift/mano/config_data/config.py
   COMPONENT ${PKG_LONG_NAME}
   PYTHON3_ONLY
   )
diff --git a/common/python/rift/downloader/__init__.py b/common/python/rift/downloader/__init__.py
new file mode 100644
index 0000000..cc07ed3
--- /dev/null
+++ b/common/python/rift/downloader/__init__.py
@@ -0,0 +1,19 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+
+from .base import DownloaderProtocol, DownloadStatus
+from .url import UrlDownloader
\ No newline at end of file
diff --git a/common/python/rift/downloader/base.py b/common/python/rift/downloader/base.py
new file mode 100644
index 0000000..c87839f
--- /dev/null
+++ b/common/python/rift/downloader/base.py
@@ -0,0 +1,180 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/25/2016
+#
+
+import abc
+import enum
+import os
+import uuid
+import time
+
+
+class InvalidDestinationError(Exception):
+    """Raised when a download destination cannot be opened for writing."""
+
+
+class DownloaderProtocol:
+    """Delegate interface for download events. Every hook is a no-op here;
+    listeners override the ones they need.
+    """
+    def on_download_started(self, job):
+        """Called when the download starts
+
+        Args:
+            job: model of the download (UrlDownloader passes its DownloadMeta)
+
+        """
+        pass
+
+    def on_download_progress(self, job):
+        """Called after each chunk is downloaded
+
+        Args:
+            job: model of the download (UrlDownloader passes its DownloadMeta)
+
+        """
+        pass
+
+    def on_download_succeeded(self, job):
+        """Called when the download is completed successfully
+
+        Args:
+            job: model of the download (UrlDownloader passes its DownloadMeta)
+
+        """
+        pass
+
+    def on_download_failed(self, job):
+        """Called when the download fails
+
+        Args:
+            job: model of the download (UrlDownloader passes its DownloadMeta)
+
+        """
+        pass
+
+    def on_download_cancelled(self, job):
+        """Called when the download is canceled
+
+        Args:
+            job: model of the download (UrlDownloader passes its DownloadMeta)
+
+        """
+        pass
+
+    def on_download_finished(self, job):
+        """Called when the download finishes regardless of the status of the
+        download (success, failed or canceled)
+
+        Args:
+            job: model of the download (UrlDownloader passes its DownloadMeta)
+
+        """
+        pass
+
+
+class DownloadStatus(enum.Enum):
+    """States a download moves through (see ``DownloadMeta.set_state``)."""
+
+    # Members keep their declaration order and the values 1..5.
+    STARTED, IN_PROGRESS, COMPLETED = 1, 2, 3
+    FAILED, CANCELLED = 4, 5
+
+
+class DownloadMeta:
+    """Mutable progress/bookkeeping record for a single download."""
+    def __init__(self, url, dest_file):
+        self.url = url
+        self.filepath = dest_file
+        self.download_id = str(uuid.uuid4())
+        self.bytes_total = 0  # stays 0 when no Content-Length was seen
+        self.progress_percent = 0
+        self.bytes_downloaded = 0
+        self.bytes_per_second = 0
+        self.status = None  # replaced by set_state()
+        self.start_time = 0  # time.time() value, 0 until start_download()
+        self.stop_time = 0  # time.time() value, 0 until end_download()
+        self.detail = ""
+
+    @property
+    def filename(self):
+        return os.path.basename(self.filepath)
+
+    def start_download(self):
+        self.start_time = time.time()
+
+    def end_download(self):
+        # stop_time is the documented field; end_time is kept as an alias.
+        self.stop_time = self.end_time = time.time()
+
+    def set_state(self, state):
+        self.status = state
+
+    def update_with_data(self, downloaded_chunk):
+        self.bytes_downloaded += len(downloaded_chunk)
+
+        if self.bytes_total != 0:
+            self.progress_percent = \
+                int((self.bytes_downloaded / self.bytes_total) * 100)
+
+        # compute bps; skipped on a non-positive interval (no division by 0)
+        seconds_elapsed = time.time() - self.start_time
+        if seconds_elapsed > 0:
+            self.bytes_per_second = self.bytes_downloaded // seconds_elapsed
+
+    def update_data_with_head(self, headers):
+        """Update the model from the header of HEAD request
+
+        Args:
+            headers (dict): headers from HEAD response
+        """
+        if 'Content-Length' in headers:
+            self.bytes_total = int(headers['Content-Length'])
+
+    def as_dict(self):
+        return self.__dict__
+
+
+class AbstractDownloader:
+    """Downloader interface; abc markers are advisory (no ABCMeta here)."""
+    def __init__(self):
+        self._delegate = None
+
+    @property
+    def delegate(self):
+        return self._delegate
+
+    @delegate.setter
+    def delegate(self, listener):
+        self._delegate = listener
+
+    @abc.abstractproperty
+    def download_id(self):
+        """Identifier of this download."""
+
+    @abc.abstractmethod
+    def cancel_download(self):
+        """Ask a running download to stop."""
+
+    @abc.abstractmethod
+    def close(self):
+        """Release whatever the downloader holds open."""
+
+    @abc.abstractmethod
+    def download(self):
+        """Perform the download."""
diff --git a/common/python/rift/downloader/url.py b/common/python/rift/downloader/url.py
new file mode 100644
index 0000000..2768894
--- /dev/null
+++ b/common/python/rift/downloader/url.py
@@ -0,0 +1,266 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/25/2016
+#
+
+import io
+import logging
+import os
+import tempfile
+import threading
+import time
+import uuid
+import zlib
+
+import requests
+import requests.exceptions
+from requests.adapters import HTTPAdapter
+from requests.packages.urllib3.util.retry import Retry
+# disable unsigned certificate warning
+from requests.packages.urllib3.exceptions import InsecureRequestWarning
+requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
+
+import gi
+gi.require_version("RwPkgMgmtYang", "1.0")
+
+from gi.repository import RwPkgMgmtYang
+from . import base
+
+
+class UrlDownloader(base.AbstractDownloader):
+    """Handles downloads of URL with some basic retry strategy.
+    """
+    def __init__(self,
+                 url,
+                 file_obj=None,
+                 auth=None,
+                 delete_on_fail=True,
+                 decompress_on_fly=False,
+                 log=None):
+        """
+        Args:
+            url (str): Url string to download
+            file_obj (str,file handle): Optional, If not set we create a temp
+                location to store the file.
+            auth: Optional, forwarded as ``auth`` to the requests calls
+            delete_on_fail (bool, optional): Clean up the partially downloaded
+                file, if the download failed or was canceled
+            decompress_on_fly (bool, optional): gunzip ".gz" URLs while writing
+            log (logging.Logger, optional): defaults to the root logger
+        """
+        super().__init__()
+
+        # The level is left alone: without ``log`` this is the root logger.
+        self.log = log or logging.getLogger()
+
+        self._fh, filename = self._validate_fn(file_obj)
+        self.meta = base.DownloadMeta(url, filename)
+
+        self.session = self._create_session()
+        self._cancel_event = threading.Event()
+        self.auth = auth
+        self.delete_on_fail = delete_on_fail
+        self.decompress_on_fly = decompress_on_fly
+        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
+
+    def __repr__(self):
+        # Rendered as the str() of a dict wrapping the meta attributes.
+        return str({"model": self.meta.as_dict()})
+
+    def _validate_fn(self, file_obj):
+        """Turn the requested destination into a binary file handle.
+
+        ``None`` creates a temp file and a ``str`` is taken as a path; both
+        are opened in "wb" mode. Returns ``(file handle, file name)``.
+        Raises InvalidDestinationError when the resulting object is not an
+        io.BufferedWriter.
+        """
+        if file_obj is None:
+            # mkstemp() hands back an open OS-level descriptor; close it so
+            # it is not leaked, then open the path below like any filename.
+            fd, file_obj = tempfile.mkstemp()
+            os.close(fd)
+
+        if isinstance(file_obj, str):
+            file_obj = open(file_obj, "wb")
+
+        if not isinstance(file_obj, io.BufferedWriter):
+            raise base.InvalidDestinationError("Destination file cannot be "
+                        "opened for write")
+
+        return file_obj, file_obj.name
+
+    def _create_session(self):
+        # One Retry policy (total=5, backoff_factor=1) shared by both schemes.
+        retries = Retry(total=5, backoff_factor=1)
+        session = requests.Session()
+        for scheme in ("http://", "https://"):
+            session.mount(scheme, HTTPAdapter(max_retries=retries))
+        return session
+
+    def update_data_from_headers(self, headers):
+        """Reset the size/progress counters from a set of response headers.
+
+        Args:
+            headers (dict): headers from HEAD response
+        """
+        meta = self.meta
+        meta.bytes_total = 0
+        if 'Content-Length' in headers:
+            meta.bytes_total = int(headers['Content-Length'])
+        meta.progress_percent = meta.bytes_downloaded = 0
+
+    @property
+    def url(self):
+        return self.meta.url  # the URL passed to the constructor
+
+    @property
+    def filepath(self):
+        return self.meta.filepath  # ``name`` of the destination file handle
+
+    # Start of override methods
+    @property
+    def download_id(self):
+        return self.meta.download_id  # uuid4 string made by DownloadMeta
+
+    def cancel_download(self):
+        self._cancel_event.set()  # checked by _download() between chunks
+
+    def close(self):
+        self.session.close()
+        self._fh.close()  # the destination file handle
+
+    def cleanup(self):
+        """Remove the file if the download failed or was cancelled."""
+        status = getattr(self.meta, "status", None)  # unset until set_state()
+        unfinished = (base.DownloadStatus.FAILED, base.DownloadStatus.CANCELLED)
+        if self.delete_on_fail and status in unfinished:
+            self.log.info("Cleaning up failed download and removing {}".format(
+                    self.filepath))
+            try:
+                os.remove(self.filepath)
+            except Exception as e:
+                self.log.exception(e)
+
+    def download(self):
+        """Run the download and always signal ``on_download_finished``.
+
+        An exception raised by the transfer is logged, stored in
+        ``meta.detail`` and reported to the delegate as a failure.
+        """
+        try:
+            self._download()
+        except Exception as e:
+            reason = str(e)
+            self.log.exception(reason)
+            self.meta.detail = reason
+            self.meta.stop_time = time.time()
+            self.download_failed()
+
+            # Close all file handles and clean up
+            self.close()
+            self.cleanup()
+        self.download_finished()
+
+    # end of override methods
+
+    def check_and_decompress(self, chunk):
+        # Gunzip only for ".gz" URLs and only when decompress_on_fly is set.
+        if self.url.endswith(".gz") and self.decompress_on_fly:
+            return self._decompressor.decompress(chunk)
+        return chunk
+
+    def _download(self):
+        """HEAD for the size, then stream the body to the destination file."""
+        # NOTE(review): verify=False disables TLS certificate verification.
+        url_options = {"verify": False}
+        if self.auth is not None:
+            url_options["auth"] = self.auth
+
+        response = self.session.head(self.url, **url_options)
+
+        # Prepare the meta data
+        self.meta.update_data_with_head(response.headers)
+        self.meta.start_download()
+
+        self.download_started()
+
+        # Must be the bool itself: ``True,`` (trailing comma) made a tuple.
+        url_options["stream"] = True
+        request = self.session.get(self.url, **url_options)
+
+        if request.status_code != requests.codes.ok:
+            request.raise_for_status()
+
+        # The cancel flag is checked once per chunk of up to 50 KiB.
+        for chunk in request.iter_content(chunk_size=1024 * 50):
+            if self._cancel_event.is_set():
+                self.log.info("Download of URL {} to {} has been cancelled".format(
+                    self.url, self.filepath))
+                break
+
+            if chunk:  # filter out keep-alive new chunks
+                self.meta.update_with_data(chunk)
+                self.log.debug("Download progress: {}".format(self.meta.as_dict()))
+
+                chunk = self.check_and_decompress(chunk)
+                self._fh.write(chunk)
+                self.download_progress()
+
+        self.meta.end_download()
+        self.close()
+
+        if self._cancel_event.is_set():
+            self.download_cancelled()
+        else:
+            self.download_succeeded()
+
+        self.cleanup()
+
+    # Start of delegate calls
+    def call_delegate(self, event):
+        # ``event`` names a DownloaderProtocol hook; it receives the meta.
+        listener = self.delegate
+        if listener:
+            getattr(listener, event)(self.meta)
+
+    def download_failed(self):
+        self.meta.set_state(base.DownloadStatus.FAILED)  # detail: see download()
+        self.call_delegate("on_download_failed")
+
+    def download_cancelled(self):
+        self.meta.detail = "Download canceled by user."
+        self.meta.set_state(base.DownloadStatus.CANCELLED)
+        self.call_delegate("on_download_cancelled")
+
+    def download_progress(self):
+        self.meta.detail = "Download in progress."
+        self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
+        self.call_delegate("on_download_progress")
+
+    def download_succeeded(self):
+        self.meta.detail = "Download completed successfully."
+        self.meta.set_state(base.DownloadStatus.COMPLETED)
+        self.call_delegate("on_download_succeeded")
+
+    def download_started(self):
+        self.meta.detail = "Setting up download and extracting meta."
+        self.meta.set_state(base.DownloadStatus.STARTED)
+        self.call_delegate("on_download_started")
+
+    def download_finished(self):
+        self.call_delegate("on_download_finished")  # status/detail untouched
diff --git a/common/python/rift/mano/dts/__init__.py b/common/python/rift/mano/dts/__init__.py
index 20d3978..e3ffbbb 100644
--- a/common/python/rift/mano/dts/__init__.py
+++ b/common/python/rift/mano/dts/__init__.py
@@ -25,4 +25,6 @@
         NsdCatalogSubscriber,
         NsInstanceConfigSubscriber)
 from .subscriber.store import SubscriberStore
-from .subscriber.ro_account import ROAccountConfigSubscriber
\ No newline at end of file
+from .subscriber.ro_account import ROAccountConfigSubscriber
+
+from .rpc.core import AbstractRpcHandler
\ No newline at end of file
diff --git a/common/python/rift/mano/dts/rpc/__init__.py b/common/python/rift/mano/dts/rpc/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/common/python/rift/mano/dts/rpc/__init__.py
diff --git a/common/python/rift/mano/dts/rpc/core.py b/common/python/rift/mano/dts/rpc/core.py
new file mode 100644
index 0000000..dfa08bb
--- /dev/null
+++ b/common/python/rift/mano/dts/rpc/core.py
@@ -0,0 +1,107 @@
+"""
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+@file core.py
+@author Varun Prasad (varun.prasad@riftio.com)
+@date 28-Sep-2016
+
+"""
+
+import abc
+import asyncio
+
+import gi
+gi.require_version("RwDts", "1.0")
+
+from gi.repository import RwDts as rwdts
+import rift.tasklets
+
+from ..core import DtsHandler
+
+
+class AbstractRpcHandler(DtsHandler):
+    """Base class for DTS RPC handlers: subclasses provide ``xpath`` and a
+    coroutine ``callback``; ``register()`` hooks them up through ``dts``."""
+    def __init__(self, log, dts, loop):
+        super().__init__(log, dts, loop)
+        # on_prepare drives callback with "yield from", so it must be one.
+        if not asyncio.iscoroutinefunction(self.callback):
+            raise ValueError('%s has to be a coroutine' % (self.callback))
+
+    @abc.abstractproperty
+    def xpath(self):
+        pass
+
+    @property
+    def input_xpath(self):
+        return "I,{}".format(self.xpath)  # xpath used to register the RPC
+
+    @property
+    def output_xpath(self):
+        return "O,{}".format(self.xpath)  # xpath used for the responses
+
+    def flags(self):
+        return rwdts.Flag.PUBLISHER
+
+    @asyncio.coroutine
+    def on_prepare(self, xact_info, action, ks_path, msg):
+        assert action == rwdts.QueryAction.RPC
+
+        try:
+            rpc_op = yield from self.callback(ks_path, msg)
+            xact_info.respond_xpath(
+                rwdts.XactRspCode.ACK,
+                self.output_xpath,
+                rpc_op)
+        # Any failure above is logged and answered with a NACK instead.
+        except Exception as e:
+            self.log.exception(e)
+            xact_info.respond_xpath(
+                rwdts.XactRspCode.NACK,
+                self.output_xpath)
+
+    @asyncio.coroutine
+    def register(self):
+        reg_event = asyncio.Event(loop=self.loop)
+        # Set by on_ready below; register() returns only once it fires.
+        @asyncio.coroutine
+        def on_ready(regh, status):
+            reg_event.set()
+
+        handler = rift.tasklets.DTS.RegistrationHandler(
+                on_prepare=self.on_prepare,
+                on_ready=on_ready)
+
+        with self.dts.group_create() as group:
+            self.reg = group.register(
+                  xpath=self.input_xpath,
+                  handler=handler,
+                  flags=self.flags())
+
+        yield from reg_event.wait()
+
+    @abc.abstractmethod
+    @asyncio.coroutine
+    def callback(self, ks_path, msg):
+        """Subclass coroutine handling one RPC; its result is the RPC output.
+
+        Args:
+            ks_path : Key spec path
+            msg : RPC input
+        """
+        pass
+
diff --git a/common/python/test/CMakeLists.txt b/common/python/test/CMakeLists.txt
index 1abb50d..e854c2a 100644
--- a/common/python/test/CMakeLists.txt
+++ b/common/python/test/CMakeLists.txt
@@ -7,3 +7,12 @@
   TEST_ARGS
   ${CMAKE_CURRENT_SOURCE_DIR}/utest_juju_api.py
   )
+
+
+##
+# utest_url_downloader
+##
+rift_py3test(utest_url_downloader.py
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_url_downloader.py
+  )
diff --git a/common/python/test/utest_url_downloader.py b/common/python/test/utest_url_downloader.py
new file mode 100755
index 0000000..33e24a8
--- /dev/null
+++ b/common/python/test/utest_url_downloader.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python3
+
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import unittest
+import xmlrunner
+
+import rift.downloader as downloader
+
+TEST_URL = "https://raw.githubusercontent.com/RIFTIO/RIFT.ware/master/rift-shell"
+
+class TestCase(unittest.TestCase):
+    """Tests for rift.downloader.UrlDownloader. They fetch real URLs, so
+    they need outbound network access."""
+
+    def _common_checks(self, job):
+        if job.status != downloader.DownloadStatus.COMPLETED:
+            return
+
+        # assert job.bytes_downloaded == job.bytes_total
+        assert job.stop_time > 0
+        assert job.start_time > 0
+        assert job.stop_time >= job.start_time
+
+    def test_file_download(self):
+        """
+        Asserts:
+            1. Successful download
+            2. Model attributes (Process percent, detail, status)
+        """
+        url_downl = downloader.UrlDownloader(TEST_URL)
+        url_downl.download()
+        assert os.path.isfile(url_downl.filepath)
+        self.addCleanup(os.remove, url_downl.filepath)  # temp file from above
+
+        assert url_downl.meta.status == downloader.DownloadStatus.COMPLETED
+        # assert url_downl.job.progress_percent == 100
+        assert "success" in url_downl.meta.detail
+        self._common_checks(url_downl.meta)
+
+    def test_file_not_exists(self):
+        """
+        Asserts:
+            1. 404 download with retries
+            2. Model attributes (Process percent, detail, status)
+        """
+        url_downl = downloader.UrlDownloader(TEST_URL + ".blah")
+        url_downl.download()
+
+        assert not os.path.isfile(url_downl.filepath)
+        assert url_downl.meta.status == downloader.DownloadStatus.FAILED
+        assert "Max retries" in url_downl.meta.detail or "404" in url_downl.meta.detail
+
+        self._common_checks(url_downl.meta)
+
+    def test_cancellation(self):
+        """
+        Asserts:
+            1. Cancel for a download and clean up of the downloaded file.
+            2. Model attributes (Process percent, detail, status)
+        """
+        url = "http://speedtest.ftp.otenet.gr/files/test1Mb.db"
+        url_dwld = downloader.UrlDownloader(url)
+        loop = asyncio.get_event_loop()
+        fut = loop.run_in_executor(None, url_dwld.download)
+
+        def cancel():
+            fut.cancel()
+            url_dwld.cancel_download()
+
+        @asyncio.coroutine
+        def sleep():
+            yield from asyncio.sleep(2)
+            cancel()
+            yield from asyncio.sleep(2)
+
+        loop.run_until_complete(sleep())
+        # NOTE(review): the bytes_downloaded assert below is a tautology.
+        assert url_dwld.meta.status == downloader.DownloadStatus.CANCELLED
+        assert url_dwld.meta.bytes_downloaded == url_dwld.meta.bytes_downloaded
+        assert "cancel" in url_dwld.meta.detail
+        self._common_checks(url_dwld.meta)
+
+    def test_auth_url(self):
+        # NOTE(review): no ``auth`` is passed and nothing is asserted; this
+        # only checks that download() does not raise -- TODO confirm intent.
+        url_downl = downloader.UrlDownloader(
+                'https://api.github.com/user')
+        url_downl.download()
+
+    def tearDown(self):
+        pass
+
+
+def main(argv=sys.argv[1:]):
+    """Parse -v/-n, set the root log level and run the unittest suite."""
+    logging.basicConfig(format='TEST %(message)s')
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+    args, unknown = parser.parse_known_args(argv)
+
+    # Built lazily so that --no-runner works without RIFT_MODULE_TEST set.
+    runner = None
+    if not args.no_runner:
+        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
+
+    # unittest needs a program name in argv[0]; use this file's name so that
+    # callers invoking main() from the interpreter need not pass a fake one.
+    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
+
+if __name__ == '__main__':
+    main()