141 - Support for Package Management in SO
author      velandy <rajesh.velandy@riftio.com>    Wed, 4 Jan 2017 19:25:07 +0000 (19:25 +0000)
committer   velandy <rajesh.velandy@riftio.com>    Wed, 4 Jan 2017 19:25:07 +0000 (19:25 +0000)
81 files changed:
common/python/CMakeLists.txt
common/python/rift/downloader/__init__.py [new file with mode: 0644]
common/python/rift/downloader/base.py [new file with mode: 0644]
common/python/rift/downloader/url.py [new file with mode: 0644]
common/python/rift/mano/dts/__init__.py
common/python/rift/mano/dts/rpc/__init__.py [new file with mode: 0644]
common/python/rift/mano/dts/rpc/core.py [new file with mode: 0644]
common/python/test/CMakeLists.txt
common/python/test/utest_url_downloader.py [new file with mode: 0755]
examples/ping_pong_ns/CMakeLists.txt
examples/ping_pong_ns/rift/mano/examples/ping_pong_nsd.py
examples/ping_pong_ns/rift/mano/examples/ping_set_rate.py [new file with mode: 0644]
models/plugins/yang/mano-types.yang
models/plugins/yang/nsd.yang
rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconman_config.py
rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconmantasklet.py
rwlaunchpad/plugins/CMakeLists.txt
rwlaunchpad/plugins/cli/cli_launchpad_schema_listing.txt
rwlaunchpad/plugins/rwlaunchpadtasklet/CMakeLists.txt
rwlaunchpad/plugins/rwlaunchpadtasklet/rift/package/handler.py [new file with mode: 0644]
rwlaunchpad/plugins/rwlaunchpadtasklet/rift/package/package.py
rwlaunchpad/plugins/rwlaunchpadtasklet/rift/package/store.py
rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/export.py
rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/image.py
rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/message.py
rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/tasklet.py
rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py
rwlaunchpad/plugins/rwlaunchpadtasklet/test/CMakeLists.txt
rwlaunchpad/plugins/rwlaunchpadtasklet/test/utest_fileserver.py [new file with mode: 0755]
rwlaunchpad/plugins/rwlaunchpadtasklet/test/utest_uploader_app_dts.py [new file with mode: 0755]
rwlaunchpad/plugins/rwmonparam/test/utest_mon_params_dts.py
rwlaunchpad/plugins/rwpkgmgr/CMakeLists.txt [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/Makefile [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/__init__.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/__init__.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/url.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/proxy/__init__.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/proxy/base.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/proxy/filesystem.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/__init__.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rpc.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rwpkgmgr.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/subscriber/__init__.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/subscriber/download_status.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rwpkgmgr.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/test/CMakeLists.txt [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/test/utest_filesystem_proxy_dts.py [new file with mode: 0755]
rwlaunchpad/plugins/rwpkgmgr/test/utest_publisher_dts.py [new file with mode: 0755]
rwlaunchpad/plugins/rwpkgmgr/test/utest_subscriber_dts.py [new file with mode: 0755]
rwlaunchpad/plugins/rwstagingmgr/CMakeLists.txt [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/Makefile [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/__init__.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/model/__init__.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/model/staging_area.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/protocol.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/publisher/__init__.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/publisher/staging_status.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rpc.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rwstagingmgr.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/server/__init__.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/server/app.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/server/handler.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/store/__init__.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/store/file_store.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/rwstagingmgr.py [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/test/CMakeLists.txt [new file with mode: 0644]
rwlaunchpad/plugins/rwstagingmgr/test/utest_publisher_dts.py [new file with mode: 0755]
rwlaunchpad/plugins/rwstagingmgr/test/utest_rpc_dts.py [new file with mode: 0755]
rwlaunchpad/plugins/rwstagingmgr/test/utest_staging_store.py [new file with mode: 0755]
rwlaunchpad/plugins/rwstagingmgr/test/utest_tornado_app.py [new file with mode: 0755]
rwlaunchpad/plugins/yang/CMakeLists.txt
rwlaunchpad/plugins/yang/rw-launchpad.yang
rwlaunchpad/plugins/yang/rw-pkg-mgmt.tailf.yang [new file with mode: 0644]
rwlaunchpad/plugins/yang/rw-pkg-mgmt.yang [new file with mode: 0644]
rwlaunchpad/plugins/yang/rw-staging-mgmt.tailf.yang [new file with mode: 0644]
rwlaunchpad/plugins/yang/rw-staging-mgmt.yang [new file with mode: 0644]
rwlaunchpad/ra/racfg/pingpong_records_systest_openstack.racfg
rwlaunchpad/ra/racfg/pingpong_records_systest_openstack_xml.racfg
rwlaunchpad/ra/racfg/pingpong_vnf_reload_systest_openstack_xml.racfg
rwlaunchpad/test/launchpad.py

index 85ead68..3343119 100644 (file)
@@ -32,6 +32,7 @@ rift_python_install_tree(
   )
 
 
+# Subscribers
 rift_python_install_tree(
   FILES
     rift/mano/dts/__init__.py
@@ -46,23 +47,29 @@ rift_python_install_tree(
   PYTHON3_ONLY
   )
 
+# RPCs
 rift_python_install_tree(
   FILES
-    rift/mano/config_data/__init__.py
-    rift/mano/config_data/config.py
+    rift/mano/dts/rpc/__init__.py
+    rift/mano/dts/rpc/core.py
   COMPONENT ${PKG_LONG_NAME}
   PYTHON3_ONLY
   )
 
+# Downloaders
 rift_python_install_tree(
   FILES
-    rift/mano/dts/__init__.py
-    rift/mano/dts/core.py
-    rift/mano/dts/subscriber/__init__.py
-    rift/mano/dts/subscriber/core.py
-    rift/mano/dts/subscriber/store.py
-    rift/mano/dts/subscriber/ns_subscriber.py
-    rift/mano/dts/subscriber/vnf_subscriber.py
+    rift/downloader/__init__.py
+    rift/downloader/base.py
+    rift/downloader/url.py
+  COMPONENT ${PKG_LONG_NAME}
+  PYTHON3_ONLY
+  )
+
+rift_python_install_tree(
+  FILES
+    rift/mano/config_data/__init__.py
+    rift/mano/config_data/config.py
   COMPONENT ${PKG_LONG_NAME}
   PYTHON3_ONLY
   )
diff --git a/common/python/rift/downloader/__init__.py b/common/python/rift/downloader/__init__.py
new file mode 100644 (file)
index 0000000..cc07ed3
--- /dev/null
@@ -0,0 +1,19 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+
+from .base import DownloaderProtocol, DownloadStatus
+from .url import UrlDownloader
\ No newline at end of file
diff --git a/common/python/rift/downloader/base.py b/common/python/rift/downloader/base.py
new file mode 100644 (file)
index 0000000..c87839f
--- /dev/null
@@ -0,0 +1,180 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/25/2016
+#
+
+import abc
+import enum
+import os
+import uuid
+import time
+
+
+class InvalidDestinationError(Exception):
+    pass
+
+
+class DownloaderProtocol:
+    """Listeners (delegates) can implement the following methods to receive
+    callbacks from a downloader.
+    """
+    def on_download_started(self, job):
+        """Called when the download starts
+
+        Args:
+            job (DownloadJob): Yang Model
+
+        """
+        pass
+
+    def on_download_progress(self, job):
+        """Called after each chunk is downloaded
+
+        Args:
+            job (DownloadJob): Yang Model
+
+        """
+        pass
+
+    def on_download_succeeded(self, job):
+        """Called when the download is completed successfully
+
+        Args:
+            job (DownloadJob): Yang Model
+
+        """
+        pass
+
+    def on_download_failed(self, job):
+        """Called when the download fails
+
+        Args:
+            job (DownloadJob): Yang Model
+
+        """
+        pass
+
+    def on_download_cancelled(self, job):
+        """Called when the download is canceled
+
+        Args:
+            job (DownloadJob): Yang Model
+
+        """
+        pass
+
+    def on_download_finished(self, job):
+        """Called when the download finishes regardless of the status of the
+        download (success, failed or canceled)
+
+        Args:
+            job (DownloadJob): Yang Model
+
+        """
+        pass
+
+
+class DownloadStatus(enum.Enum):
+    STARTED = 1
+    IN_PROGRESS = 2
+    COMPLETED = 3
+    FAILED = 4
+    CANCELLED = 5
+
+
+class DownloadMeta:
+    """Model data used by the downloader.
+    """
+    def __init__(self, url, dest_file):
+        self.url = url
+        self.filepath = dest_file
+        self.download_id = str(uuid.uuid4())
+        self.bytes_total = 0
+        self.progress_percent = 0
+        self.bytes_downloaded = 0
+        self.bytes_per_second = 0
+
+
+        self.start_time = 0
+        self.stop_time = 0
+        self.detail = ""
+
+    @property
+    def filename(self):
+        return os.path.basename(self.filepath)
+
+    def start_download(self):
+        self.start_time = time.time()
+
+    def end_download(self):
+        self.stop_time = time.time()
+
+    def set_state(self, state):
+        self.status = state
+
+    def update_with_data(self, downloaded_chunk):
+        self.bytes_downloaded += len(downloaded_chunk)
+
+        if self.bytes_total != 0:
+            self.progress_percent = \
+                int((self.bytes_downloaded / self.bytes_total) * 100)
+
+        # compute bps
+        seconds_elapsed = time.time() - self.start_time
+        self.bytes_per_second = self.bytes_downloaded // seconds_elapsed
+
+    def update_data_with_head(self, headers):
+        """Update the model from the header of HEAD request
+
+        Args:
+            headers (dict): headers from HEAD response
+        """
+        if 'Content-Length' in headers:
+            self.bytes_total = int(headers['Content-Length'])
+
+    def as_dict(self):
+        return self.__dict__
+
+
+class AbstractDownloader:
+
+    def __init__(self):
+        self._delegate = None
+
+    @property
+    def delegate(self):
+        return self._delegate
+
+    @delegate.setter
+    def delegate(self, delegate):
+        self._delegate = delegate
+
+    @abc.abstractproperty
+    def download_id(self):
+        pass
+
+    @abc.abstractmethod
+    def cancel_download(self):
+        pass
+
+    @abc.abstractmethod
+    def close(self):
+        pass
+
+    @abc.abstractmethod
+    def download(self):
+        pass
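
For orientation, here is a minimal sketch (not part of this changeset) of a delegate that consumes these callbacks. The class name is hypothetical; the callback names come from DownloaderProtocol above, and the attributes read from `job` come from DownloadMeta:

import logging

class LoggingDownloadListener:
    """Hypothetical DownloaderProtocol implementation that simply logs events."""

    def __init__(self, log=None):
        self.log = log or logging.getLogger(__name__)

    def on_download_progress(self, job):
        # job is the DownloadMeta instance maintained by the downloader
        self.log.debug("download %s: %s%% done", job.download_id, job.progress_percent)

    def on_download_succeeded(self, job):
        self.log.info("downloaded %s bytes to %s", job.bytes_downloaded, job.filepath)

    def on_download_failed(self, job):
        self.log.error("download of %s failed: %s", job.url, job.detail)
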
diff --git a/common/python/rift/downloader/url.py b/common/python/rift/downloader/url.py
new file mode 100644 (file)
index 0000000..2768894
--- /dev/null
@@ -0,0 +1,266 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/25/2016
+#
+
+import io
+import logging
+import os
+import tempfile
+import threading
+import time
+import uuid
+import zlib
+
+import requests
+import requests.exceptions
+from requests.adapters import HTTPAdapter
+from requests.packages.urllib3.util.retry import Retry
+# disable unsigned certificate warning
+from requests.packages.urllib3.exceptions import InsecureRequestWarning
+requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
+
+import gi
+gi.require_version("RwPkgMgmtYang", "1.0")
+
+from gi.repository import RwPkgMgmtYang
+from . import base
+
+
+class UrlDownloader(base.AbstractDownloader):
+    """Handles downloads of URL with some basic retry strategy.
+    """
+    def __init__(self,
+                 url,
+                 file_obj=None,
+                 auth=None,
+                 delete_on_fail=True,
+                 decompress_on_fly=False,
+                 log=None):
+        """
+        Args:
+            url (str): URL to download.
+            file_obj (str or file handle): Optional. If not set, a temp
+                location is created to store the file.
+            auth (tuple, optional): Credentials passed to the HTTP requests.
+            delete_on_fail (bool, optional): Clean up the partially downloaded
+                file if the download failed or was canceled.
+            decompress_on_fly (bool, optional): Gunzip the content as it is
+                downloaded.
+            log (logging.Logger, optional): Logger instance; defaults to the
+                root logger.
+        """
+        super().__init__()
+
+        self.log = log or logging.getLogger()
+        self.log.setLevel(logging.DEBUG)
+
+        self._fh, filename = self._validate_fn(file_obj)
+        self.meta = base.DownloadMeta(url, filename)
+
+        self.session = self._create_session()
+        self._cancel_event = threading.Event()
+        self.auth = auth
+
+        self.delete_on_fail = delete_on_fail
+
+        self.decompress_on_fly = decompress_on_fly
+        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
+
+    def __repr__(self):
+        data = {"model": self.meta.as_dict()}
+        return str(data)
+
+    def _validate_fn(self, file_obj):
+        """
+        If no file object is given, create a temp file;
+        if a filename is given, open the file in wb mode.
+
+        Finally, verify the open mode of the file.
+
+        """
+        if file_obj is None:
+            _, file_obj = tempfile.mkstemp()
+            # Reopen in wb mode
+            file_obj = open(file_obj, "wb")
+
+        # If the fh is a filename
+        if type(file_obj) is str:
+            file_obj = open(file_obj, "wb")
+
+        if type(file_obj) is not io.BufferedWriter:
+            raise base.InvalidDestinationError("Destination file cannot be "
+                                               "opened for write")
+
+        return file_obj, file_obj.name
+
+    def _create_session(self):
+        session = requests.Session()
+        retries = Retry(total=5, backoff_factor=1)
+        session.mount("http://", HTTPAdapter(max_retries=retries))
+        session.mount("https://", HTTPAdapter(max_retries=retries))
+
+        return session
+
+    def update_data_from_headers(self, headers):
+        """Update the model from the header of HEAD request
+
+        Args:
+            headers (dict): headers from HEAD response
+        """
+        self.meta.bytes_total = 0
+        if 'Content-Length' in headers:
+            self.meta.bytes_total = int(headers['Content-Length'])
+        self.meta.progress_percent = 0
+        self.meta.bytes_downloaded = 0
+
+    @property
+    def url(self):
+        return self.meta.url
+
+    @property
+    def filepath(self):
+        return self.meta.filepath
+
+    # Start of override methods
+    @property
+    def download_id(self):
+        return self.meta.download_id
+
+    def cancel_download(self):
+        self._cancel_event.set()
+
+    def close(self):
+        self.session.close()
+        self._fh.close()
+
+    def cleanup(self):
+        """Remove the file if the download failed.
+        """
+        if self.meta.status in [base.DownloadStatus.FAILED, base.DownloadStatus.CANCELLED] and self.delete_on_fail:
+            self.log.info("Cleaning up failed download and removing {}".format(
+                    self.filepath))
+
+            try:
+                os.remove(self.filepath)
+            except Exception as e:
+                self.log.exception(e)
+
+    def download(self):
+        """Start the download
+
+        Trigger a HEAD request to fetch the meta data before starting the download.
+        """
+        try:
+            self._download()
+        except Exception as e:
+            self.log.exception(str(e))
+            self.meta.detail = str(e)
+            self.meta.stop_time = time.time()
+
+            self.download_failed()
+
+            # Close all file handles and clean up
+            self.close()
+            self.cleanup()
+
+        self.download_finished()
+
+    # end of override methods
+
+    def check_and_decompress(self, chunk):
+        if self.url.endswith(".gz") and self.decompress_on_fly:
+            chunk = self._decompressor.decompress(chunk)
+
+        return chunk
+
+    def _download(self):
+
+        url_options = {"verify": False}
+
+        if self.auth is not None:
+            url_options["auth"] = self.auth
+
+        response = self.session.head(self.url, **url_options)
+
+        # Prepare the meta data
+        self.meta.update_data_with_head(response.headers)
+        self.meta.start_download()
+
+        self.download_started()
+
+        url_options["stream"] = True
+        request = self.session.get(self.url, **url_options)
+
+        if request.status_code != requests.codes.ok:
+            request.raise_for_status()
+
+        # actual start time, excluding the HEAD request.
+        for chunk in request.iter_content(chunk_size=1024 * 50):
+            if self._cancel_event.is_set():
+                self.log.info("Download of URL {} to {} has been cancelled".format(
+                    self.url, self.filepath))
+                break
+
+            if chunk:  # filter out keep-alive new chunks
+                self.meta.update_with_data(chunk)
+                self.log.debug("Download progress: {}".format(self.meta.as_dict()))
+
+                chunk = self.check_and_decompress(chunk)
+
+                self._fh.write(chunk)
+                self.download_progress()
+
+        self.meta.end_download()
+        self.close()
+
+        if self._cancel_event.is_set():
+            self.download_cancelled()
+        else:
+            self.download_succeeded()
+
+        self.cleanup()
+
+    # Start of delegate calls
+    def call_delegate(self, event):
+        if not self.delegate:
+            return
+
+        getattr(self.delegate, event)(self.meta)
+
+    def download_failed(self):
+        self.meta.set_state(base.DownloadStatus.FAILED)
+        self.call_delegate("on_download_failed")
+
+    def download_cancelled(self):
+        self.meta.detail = "Download canceled by user."
+        self.meta.set_state(base.DownloadStatus.CANCELLED)
+        self.call_delegate("on_download_cancelled")
+
+    def download_progress(self):
+        self.meta.detail = "Download in progress."
+        self.meta.set_state(base.DownloadStatus.IN_PROGRESS)
+        self.call_delegate("on_download_progress")
+
+    def download_succeeded(self):
+        self.meta.detail = "Download completed successfully."
+        self.meta.set_state(base.DownloadStatus.COMPLETED)
+        self.call_delegate("on_download_succeeded")
+
+    def download_started(self):
+        self.meta.detail = "Setting up download and extracting meta."
+        self.meta.set_state(base.DownloadStatus.STARTED)
+        self.call_delegate("on_download_started")
+
+    def download_finished(self):
+        self.call_delegate("on_download_finished")
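
A minimal usage sketch of UrlDownloader, assuming a RIFT.ware environment where the rift.downloader package is installed; the URL is illustrative only, and the unit test added below exercises the same calls:

import rift.downloader as downloader

# Hypothetical URL; any object implementing DownloaderProtocol can be the delegate
url_downl = downloader.UrlDownloader("http://example.com/packages/ping_vnfd.tar.gz")
url_downl.delegate = None   # or an instance such as the LoggingDownloadListener sketch above
url_downl.download()        # blocking; tasklets run this via loop.run_in_executor()

if url_downl.meta.status == downloader.DownloadStatus.COMPLETED:
    print("saved to", url_downl.filepath)
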
index 20d3978..e3ffbbb 100644 (file)
@@ -25,4 +25,6 @@ from .subscriber.ns_subscriber import (
         NsdCatalogSubscriber,
         NsInstanceConfigSubscriber)
 from .subscriber.store import SubscriberStore
-from .subscriber.ro_account import ROAccountConfigSubscriber
\ No newline at end of file
+from .subscriber.ro_account import ROAccountConfigSubscriber
+
+from .rpc.core import AbstractRpcHandler
\ No newline at end of file
diff --git a/common/python/rift/mano/dts/rpc/__init__.py b/common/python/rift/mano/dts/rpc/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/common/python/rift/mano/dts/rpc/core.py b/common/python/rift/mano/dts/rpc/core.py
new file mode 100644 (file)
index 0000000..dfa08bb
--- /dev/null
@@ -0,0 +1,107 @@
+"""
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+@file core.py
+@author Varun Prasad (varun.prasad@riftio.com)
+@date 28-Sep-2016
+
+"""
+
+import abc
+import asyncio
+
+import gi
+gi.require_version("RwDts", "1.0")
+
+from gi.repository import RwDts as rwdts
+import rift.tasklets
+
+from ..core import DtsHandler
+
+
+class AbstractRpcHandler(DtsHandler):
+    """Base class to simplify RPC implementation
+    """
+    def __init__(self, log, dts, loop):
+        super().__init__(log, dts, loop)
+
+        if not asyncio.iscoroutinefunction(self.callback):
+            raise ValueError('%s has to be a coroutine' % (self.callback))
+
+    @abc.abstractproperty
+    def xpath(self):
+        pass
+
+    @property
+    def input_xpath(self):
+        return "I,{}".format(self.xpath)
+
+    @property
+    def output_xpath(self):
+        return "O,{}".format(self.xpath)
+
+    def flags(self):
+        return rwdts.Flag.PUBLISHER
+
+    @asyncio.coroutine
+    def on_prepare(self, xact_info, action, ks_path, msg):
+        assert action == rwdts.QueryAction.RPC
+
+        try:
+            rpc_op = yield from self.callback(ks_path, msg)
+            xact_info.respond_xpath(
+                rwdts.XactRspCode.ACK,
+                self.output_xpath,
+                rpc_op)
+
+        except Exception as e:
+            self.log.exception(e)
+            xact_info.respond_xpath(
+                rwdts.XactRspCode.NACK,
+                self.output_xpath)
+
+    @asyncio.coroutine
+    def register(self):
+        reg_event = asyncio.Event(loop=self.loop)
+
+        @asyncio.coroutine
+        def on_ready(regh, status):
+            reg_event.set()
+
+        handler = rift.tasklets.DTS.RegistrationHandler(
+                on_prepare=self.on_prepare,
+                on_ready=on_ready)
+
+        with self.dts.group_create() as group:
+            self.reg = group.register(
+                  xpath=self.input_xpath,
+                  handler=handler,
+                  flags=self.flags())
+
+        yield from reg_event.wait()
+
+    @abc.abstractmethod
+    @asyncio.coroutine
+    def callback(self, ks_path, msg):
+        """Subclass needs to override this method
+
+        Args:
+            ks_path : Key spec path
+            msg : RPC input
+        """
+        pass
+
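
To illustrate the new base class, a hedged sketch of a subclass; the xpath and the RPC itself are placeholders, not something this commit defines:

import asyncio

from rift.mano.dts import AbstractRpcHandler


class ExampleRpcHandler(AbstractRpcHandler):
    """Hypothetical RPC endpoint built on AbstractRpcHandler."""

    @property
    def xpath(self):
        # Placeholder xpath; a real handler points at a YANG rpc statement
        return "/example:do-something"

    @asyncio.coroutine
    def callback(self, ks_path, msg):
        # msg carries the RPC input; a real handler returns the RPC output
        # message (e.g. a Yang output object) instead of None.
        self.log.debug("RPC input: %s", msg)
        return None

# Typical registration from a tasklet (handler = ExampleRpcHandler(log, dts, loop)):
#     yield from handler.register()
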
index 1abb50d..e854c2a 100644 (file)
@@ -7,3 +7,12 @@ rift_py3test(utest_juju_api
   TEST_ARGS
   ${CMAKE_CURRENT_SOURCE_DIR}/utest_juju_api.py
   )
+
+
+##
+# utest_url_downloader
+##
+rift_py3test(utest_url_downloader.py
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_url_downloader.py
+  )
diff --git a/common/python/test/utest_url_downloader.py b/common/python/test/utest_url_downloader.py
new file mode 100755 (executable)
index 0000000..33e24a8
--- /dev/null
@@ -0,0 +1,136 @@
+#!/usr/bin/env python3
+
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import unittest
+import xmlrunner
+
+import rift.downloader as downloader
+
+TEST_URL = "https://raw.githubusercontent.com/RIFTIO/RIFT.ware/master/rift-shell"
+
+class TestCase(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    def _common_checks(self, job):
+        if job.status != downloader.DownloadStatus.COMPLETED:
+            return
+
+        # assert job.bytes_downloaded == job.bytes_total
+        assert job.stop_time > 0
+        assert job.start_time > 0
+        assert job.stop_time >= job.start_time
+
+    def test_file_download(self):
+        """
+        Asserts:
+            1. Successful download
+            2. Model attributes (Process percent, detail, status)
+        """
+        url_downl = downloader.UrlDownloader(TEST_URL)
+        url_downl.download()
+        assert os.path.isfile(url_downl.filepath)
+
+
+        assert url_downl.meta.status == downloader.DownloadStatus.COMPLETED
+        # assert url_downl.job.progress_percent == 100
+        assert "success" in url_downl.meta.detail
+        self._common_checks(url_downl.meta)
+
+    def test_file_not_exists(self):
+        """
+        Asserts:
+            1. 404 download with retries
+            2. Model attributes (Process percent, detail, status)
+        """
+        url_downl = downloader.UrlDownloader(TEST_URL + ".blah")
+        url_downl.download()
+
+        assert not os.path.isfile(url_downl.filepath)
+        assert url_downl.meta.status == downloader.DownloadStatus.FAILED
+        assert "Max retries" in url_downl.meta.detail or "404" in url_downl.meta.detail
+
+        self._common_checks(url_downl.meta)
+
+    def test_cancellation(self):
+        """
+        Asserts:
+            1. Cancel for a download and clean up of the downloaded file.
+            2. Model attributes (Process percent, detail, status)
+        """
+        url = "http://speedtest.ftp.otenet.gr/files/test1Mb.db"
+        url_dwld = downloader.UrlDownloader(url)
+        loop = asyncio.get_event_loop()
+        fut = loop.run_in_executor(None, url_dwld.download)
+
+        def cancel():
+            fut.cancel()
+            url_dwld.cancel_download()
+
+        @asyncio.coroutine
+        def sleep():
+            yield from asyncio.sleep(2)
+            cancel()
+            yield from asyncio.sleep(2)
+
+        loop.run_until_complete(sleep())
+
+        assert url_dwld.meta.status == downloader.DownloadStatus.CANCELLED
+        assert url_dwld.meta.bytes_downloaded <= url_dwld.meta.bytes_total
+        assert "cancel" in url_dwld.meta.detail
+        self._common_checks(url_dwld.meta)
+
+    def test_auth_url(self):
+        url_downl = downloader.UrlDownloader(
+                'https://api.github.com/user')
+
+        url_downl.download()
+
+
+    def tearDown(self):
+        pass
+
+
+def main(argv=sys.argv[1:]):
+    logging.basicConfig(format='TEST %(message)s')
+
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+
+    args, unknown = parser.parse_known_args(argv)
+    if args.no_runner:
+        runner = None
+
+    # Set the global logging level
+    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
+
+    # The unittest framework requires a program name, so use the name of this
+    # file instead (we do not want to have to pass a fake program name to main
+    # when this is called from the interpreter).
+    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
+
+if __name__ == '__main__':
+    main()
index e373c2a..34f7a03 100644 (file)
@@ -68,6 +68,7 @@ rift_python_install_tree(
   FILES
     rift/mano/examples/ping_pong_nsd.py
     rift/mano/examples/start_traffic.py
+    rift/mano/examples/ping_set_rate.py
   )
 
 install(
index d84a912..ef2dd90 100755 (executable)
@@ -120,6 +120,7 @@ class VirtualNetworkFunction(ManoDescriptor):
         self.vnfd = None
         self.instance_count = instance_count
         self._placement_groups = []
+        self.use_vnf_init_conf = False
         super(VirtualNetworkFunction, self).__init__(name)
 
     def add_placement_group(self, group):
@@ -324,6 +325,22 @@ class VirtualNetworkFunction(ManoDescriptor):
         with open(script_file, "w") as f:
             f.write("{}".format(cfg))
 
+        # Copy the vnf_init_config script
+        if self.use_vnf_init_conf and ('ping' in self.name):
+            script_name = 'ping_set_rate.py'
+
+            src_path = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
+            script_src = os.path.join(src_path, script_name)
+            if not os.path.exists(script_src):
+                src_path = os.path.join(os.environ['RIFT_ROOT'],
+                                        'modules/core/mano/examples/ping_pong_ns/rift/mano/examples')
+                script_src = os.path.join(src_path, script_name)
+
+            dest_path = os.path.join(outdir, self.name, 'scripts')
+            os.makedirs(dest_path, exist_ok=True)
+
+            shutil.copy2(script_src, dest_path)
+
 
 class NetworkService(ManoDescriptor):
     def __init__(self, name):
@@ -332,7 +349,7 @@ class NetworkService(ManoDescriptor):
         self.vnfd_config = {}
         self._placement_groups = []
 
-    def ping_config(self, mano_ut, use_ns_init_conf):
+    def ping_config(self, mano_ut, use_ns_init_conf, use_vnf_init_conf):
         suffix = ''
         if mano_ut:
             ping_cfg = r'''
@@ -366,7 +383,10 @@ then
     echo "Failed to set server info for ping!"
     exit $rc
 fi
+''' % suffix
 
+            if use_vnf_init_conf is False:
+                ping_cfg += '''
 curl -D /dev/stdout \
     -H "Accept: application/vnd.yang.data+xml" \
     -H "Content-Type: application/vnd.yang.data+json" \
@@ -380,7 +400,7 @@ then
     exit $rc
 fi
 
-''' % suffix
+'''
             if use_ns_init_conf:
                 ping_cfg += "exit 0\n"
             else:
@@ -529,7 +549,9 @@ exit 0
             })
             vnf_config.service_primitive.append(config)
 
-    def default_config(self, const_vnfd, vnfd, mano_ut, use_ns_init_conf):
+    def default_config(self, const_vnfd, vnfd, mano_ut,
+                       use_ns_init_conf,
+                       use_vnf_init_conf):
         vnf_config = vnfd.vnfd.vnf_configuration
 
         vnf_config.config_attributes.config_priority = 0
@@ -552,7 +574,23 @@ exit 0
 
         if vnfd.name == 'ping_vnfd' or vnfd.name == 'ping_vnfd_with_epa' or vnfd.name == 'ping_vnfd_aws':
             vnf_config.config_attributes.config_priority = 2
-            vnf_config.config_template = self.ping_config(mano_ut, use_ns_init_conf)
+            vnf_config.config_template = self.ping_config(mano_ut,
+                                                          use_ns_init_conf,
+                                                          use_vnf_init_conf)
+            if use_vnf_init_conf:
+                vnf_config.initial_config_primitive.add().from_dict(
+                    {
+                        "seq": 1,
+                        "name": "set ping rate",
+                        "user_defined_script": "ping_set_rate.py",
+                        "parameter": [
+                            {
+                                'name': 'rate',
+                                'value': '5',
+                            },
+                        ],
+                    }
+                )
 
     def ns_config(self, nsd, vnfd_list, mano_ut):
         # Used by scale group
@@ -615,11 +653,14 @@ exit 0
 
 
 
-    def compose(self, vnfd_list, cpgroup_list, mano_ut, use_ns_init_conf=True):
+    def compose(self, vnfd_list, cpgroup_list, mano_ut,
+                use_ns_init_conf=True,
+                use_vnf_init_conf=True,):
 
         if mano_ut:
             # Disable NS initial config primitive
-            use_ns_init_conf=False
+            use_ns_init_conf = False
+            use_vnf_init_conf = False
 
         self.descriptor = RwNsdYang.YangData_Nsd_NsdCatalog()
         self.id = str(uuid.uuid1())
@@ -679,7 +720,7 @@ exit 0
 
                 constituent_vnfd.vnfd_id_ref = vnfd.descriptor.vnfd[0].id
                 self.default_config(constituent_vnfd, vnfd, mano_ut,
-                                    use_ns_init_conf,)
+                                    use_ns_init_conf, use_vnf_init_conf)
                 member_vnf_index += 1
 
         # Enable config primitives if either mano_ut or
@@ -892,6 +933,7 @@ def generate_ping_pong_descriptors(fmt="json",
                                    ex_pong_userdata=None,
                                    use_placement_group=True,
                                    use_ns_init_conf=True,
+                                   use_vnf_init_conf=True,
                                    ):
     # List of connection point groups
     # Each connection point group refers to a virtual link
@@ -902,6 +944,7 @@ def generate_ping_pong_descriptors(fmt="json",
 
     suffix = ''
     ping = VirtualNetworkFunction("ping_vnfd%s" % (suffix), pingcount)
+    ping.use_vnf_init_conf = use_vnf_init_conf
 
     if use_placement_group:
         ### Add group name Eris
@@ -1041,7 +1084,8 @@ def generate_ping_pong_descriptors(fmt="json",
     nsd_catalog.compose(vnfd_list,
                         cpgroup_list,
                         mano_ut,
-                        use_ns_init_conf=use_ns_init_conf,)
+                        use_ns_init_conf=use_ns_init_conf,
+                        use_vnf_init_conf=use_vnf_init_conf,)
 
     if write_to_file:
         ping.write_to_file(out_dir, ping_fmt if ping_fmt is not None else fmt)
diff --git a/examples/ping_pong_ns/rift/mano/examples/ping_set_rate.py b/examples/ping_pong_ns/rift/mano/examples/ping_set_rate.py
new file mode 100644 (file)
index 0000000..54629e8
--- /dev/null
@@ -0,0 +1,108 @@
+#!/usr/bin/env python3
+
+############################################################################
+# Copyright 2016 RIFT.IO Inc                                               #
+#                                                                          #
+# Licensed under the Apache License, Version 2.0 (the "License");          #
+# you may not use this file except in compliance with the License.         #
+# You may obtain a copy of the License at                                  #
+#                                                                          #
+#     http://www.apache.org/licenses/LICENSE-2.0                           #
+#                                                                          #
+# Unless required by applicable law or agreed to in writing, software      #
+# distributed under the License is distributed on an "AS IS" BASIS,        #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and      #
+# limitations under the License.                                           #
+############################################################################
+
+
+import argparse
+import logging
+import os
+import subprocess
+import sys
+import time
+
+import yaml
+
+
+def ping_set_rate(yaml_cfg, logger):
+    '''Use curl and set traffic rate on ping vnf'''
+
+    def set_rate(mgmt_ip, port, rate):
+        curl_cmd = '''curl -D /dev/stdout \
+    -H "Accept: application/vnd.yang.data+xml" \
+    -H "Content-Type: application/vnd.yang.data+json" \
+    -X POST \
+    -d "{{ \\"rate\\":{ping_rate} }}" \
+    http://{ping_mgmt_ip}:{ping_mgmt_port}/api/v1/ping/rate
+'''.format(ping_mgmt_ip=mgmt_ip,
+           ping_mgmt_port=port,
+           ping_rate=rate)
+
+        logger.debug("Executing cmd: %s", curl_cmd)
+        subprocess.check_call(curl_cmd, shell=True)
+
+    # Get the ping rate
+    rate = yaml_cfg['parameter']['rate']
+
+    # Set ping rate
+    for index, vnfr in yaml_cfg['vnfr'].items():
+        logger.debug("VNFR {}: {}".format(index, vnfr))
+
+        # Check if it is the ping vnf
+        if 'ping_vnfd' in vnfr['name']:
+            vnf_type = 'ping'
+            port = 18888
+            set_rate(vnfr['mgmt_ip_address'], port, rate)
+            break
+
+def main(argv=sys.argv[1:]):
+    try:
+        parser = argparse.ArgumentParser()
+        parser.add_argument("yaml_cfg_file", type=argparse.FileType('r'))
+        parser.add_argument("-q", "--quiet", dest="verbose", action="store_false")
+        args = parser.parse_args()
+
+        run_dir = os.path.join(os.environ['RIFT_INSTALL'], "var/run/rift")
+        if not os.path.exists(run_dir):
+            os.makedirs(run_dir)
+        log_file = "{}/ping_set_rate-{}.log".format(run_dir, time.strftime("%Y%m%d%H%M%S"))
+        logging.basicConfig(filename=log_file, level=logging.DEBUG)
+        logger = logging.getLogger()
+
+    except Exception as e:
+        print("Exception in {}: {}".format(__file__, e))
+        sys.exit(1)
+
+    try:
+        ch = logging.StreamHandler()
+        if args.verbose:
+            ch.setLevel(logging.DEBUG)
+        else:
+            ch.setLevel(logging.INFO)
+
+        # create formatter and add it to the handlers
+        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+        ch.setFormatter(formatter)
+        logger.addHandler(ch)
+
+    except Exception as e:
+        logger.exception(e)
+        raise e
+
+    try:
+        yaml_str = args.yaml_cfg_file.read()
+        # logger.debug("Input YAML file:\n{}".format(yaml_str))
+        yaml_cfg = yaml.load(yaml_str)
+        logger.debug("Input YAML: {}".format(yaml_cfg))
+
+        ping_set_rate(yaml_cfg, logger)
+
+    except Exception as e:
+        logger.exception(e)
+        raise e
+
+if __name__ == "__main__":
+    main()
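
For reference, a sketch of the configuration this script expects; the keys mirror what ping_set_rate() reads above and what get_input_file() in rwconman_config.py (further down in this commit) generates, while the concrete values and member index are illustrative:

# Hypothetical input, shown as the Python structure yaml.load() would return
yaml_cfg = {
    'nsr_name': 'ping_pong_nsd',
    'vnfr_name': 'ping_vnfd__1',          # only present for VNF-level config
    'config-agent': {},
    'parameter': {'rate': '5'},
    'vnfr': {
        1: {                              # keyed by member-vnf-index
            'name': 'ping_vnfd__1',       # matched on the 'ping_vnfd' substring
            'mgmt_ip_address': '10.0.0.10',
        },
    },
}
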
index ec3d583..b767ea6 100644 (file)
@@ -46,6 +46,13 @@ module mano-types
       enum JSON;
     }
   }
+  typedef package-type {
+    description "Type of descriptor being on-boarded";
+    type enumeration {
+      enum NSD;
+      enum VNFD;
+    }
+  }
 
   typedef parameter-data-type {
     type enumeration {
@@ -140,6 +147,37 @@ module mano-types
       type string;
     } 
   }
+  grouping initial-config {
+    leaf seq {
+      description
+          "Sequence number for the configuration primitive.";
+      type uint64;
+    }
+
+    leaf name {
+      description
+          "Name of the configuration primitive.";
+      type string;
+      mandatory "true";
+    }
+
+    leaf user-defined-script {
+      description
+          "A user defined script.";
+      type string;
+    }
+
+    list parameter {
+      key "name";
+      leaf name {
+        type string;
+      }
+
+      leaf value {
+        type string;
+      }
+    }
+  }
 
   grouping vnf-configuration {
     container vnf-configuration {
@@ -289,30 +327,9 @@ module mano-types
       list initial-config-primitive {
         rwpb:msg-new InitialConfigPrimitive;
         description
-            "Initial set of configuration primitives.";
+          "Initial set of configuration primitives.";
         key "seq";
-        leaf seq {
-          description
-              "Sequence number for the configuration primitive.";
-          type uint64;
-        }
-
-        leaf name {
-          description 
-              "Name of the configuration primitive.";
-          type string;
-        }
-
-        list parameter {
-          key "name";
-          leaf name {
-            type string;
-          }
-
-          leaf value {
-            type string;
-          }
-        }
+        uses initial-config;
       }
 
       leaf config-template {
index aa55f34..2e32c6c 100644 (file)
@@ -124,38 +124,6 @@ module nsd
     }
   }
 
-  grouping ns-initial-config-primitive {
-    leaf seq {
-      description
-          "Sequence number for the configuration primitive.";
-      type uint64;
-    }
-
-    leaf name {
-      description
-          "Name of the configuration primitive.";
-      type string;
-      mandatory "true";
-    }
-
-    leaf user-defined-script {
-      description
-          "A user defined script.";
-      type string;
-    }
-
-    list parameter {
-      key "name";
-      leaf name {
-        type string;
-      }
-
-      leaf value {
-        type string;
-      }
-    }
-  }
-
   grouping nsd-descriptor {
     leaf id {
       description "Identifier for the NSD.";
@@ -902,7 +870,7 @@ module nsd
         "Initial set of configuration primitives for NSD.";
       key "seq";
 
-      uses ns-initial-config-primitive;
+      uses manotypes:initial-config;
     }
 
     list key-pair {
index 4848e9e..caf09b5 100644 (file)
@@ -665,8 +665,8 @@ class ConfigManagerConfig(object):
             self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id)
 
     @asyncio.coroutine
-    def process_ns_initial_config(self, nsr_obj):
-        '''Apply the initial-config-primitives specified in NSD'''
+    def process_initial_config(self, nsr_obj, conf, script, vnfr_name=None):
+        '''Apply the initial-config-primitives specified in NSD or VNFD'''
 
         def get_input_file(parameters):
             inp = {}
@@ -674,6 +674,10 @@ class ConfigManagerConfig(object):
             # Add NSR name to file
             inp['nsr_name'] = nsr_obj.nsr_name
 
+            # Add VNFR name if available
+            if vnfr_name:
+                inp['vnfr_name'] = vnfr_name
+
             # TODO (pjoseph): Add config agents, we need to identify which all
             # config agents are required from this NS and provide only those
             inp['config-agent'] = {}
@@ -684,8 +688,12 @@ class ConfigManagerConfig(object):
                 try:
                     inp['parameter'][parameter['name']] = parameter['value']
                 except KeyError as e:
-                    self._log.info("NSR {} initial config parameter {} with no value: {}".
-                                    format(nsr_obj.nsr_name, parameter, e))
+                    if vnfr_name:
+                        self._log.info("VNFR {} initial config parameter {} with no value: {}".
+                                       format(vnfr_name, parameter, e))
+                    else:
+                        self._log.info("NSR {} initial config parameter {} with no value: {}".
+                                       format(nsr_obj.nsr_name, parameter, e))
 
 
             # Add vnfrs specific data
@@ -720,8 +728,9 @@ class ConfigManagerConfig(object):
 
                 inp['vnfr'][vnfr['member_vnf_index_ref']] = v
 
-            self._log.debug("Input data for NSR {}: {}".
-                            format(nsr_obj.nsr_name, inp))
+            self._log.debug("Input data for {}: {}".
+                            format((vnfr_name if vnfr_name else nsr_obj.nsr_name),
+                                   inp))
 
             # Convert to YAML string
             yaml_string = yaml.dump(inp, default_flow_style=False)
@@ -730,91 +739,125 @@ class ConfigManagerConfig(object):
             tmp_file = None
             with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
                 tmp_file.write(yaml_string.encode("UTF-8"))
-            self._log.debug("Input file created for NSR {}: {}".
-                            format(nsr_obj.nsr_name, tmp_file.name))
+            self._log.debug("Input file created for {}: {}".
+                            format((vnfr_name if vnfr_name \
+                                    else nsr_obj.nsr_name),
+                                   tmp_file.name))
 
             return tmp_file.name
 
-        def get_script_file(script_name, nsd_name, nsd_id):
-            # Get the full path to the script
-            script = ''
-            # If script name starts with /, assume it is full path
-            if script_name[0] == '/':
-                # The script has full path, use as is
-                script = script_name
-            else:
-                script = os.path.join(os.environ['RIFT_ARTIFACTS'],
-                                      'launchpad/packages/nsd',
-                                      nsd_id,
-                                      nsd_name,
-                                      'scripts',
-                                      script_name)
-                self._log.debug("Checking for script at %s", script)
-                if not os.path.exists(script):
-                    self._log.debug("Did not find script %s", script)
-                    script = os.path.join(os.environ['RIFT_INSTALL'],
-                                          'usr/bin',
-                                          script_name)
-
-                # Seen cases in jenkins, where the script execution fails
-                # with permission denied. Setting the permission on script
-                # to make sure it has execute permission
-                perm = os.stat(script).st_mode
-                if not (perm  &  stat.S_IXUSR):
-                    self._log.warn("NSR {} initial config script {} " \
-                                  "without execute permission: {}".
-                                  format(nsr_id, script, perm))
-                    os.chmod(script, perm | stat.S_IXUSR)
-                return script
-
-        nsr_id = nsr_obj.nsr_id
-        nsr_name = nsr_obj.nsr_name
-        self._log.debug("Apply initial config for NSR {}({})".
-                        format(nsr_name, nsr_id))
-
-        # Fetch NSR
-        nsr = yield from self.cmdts_obj.get_nsr(nsr_id)
+        parameters = []
+        try:
+            parameters = conf['parameter']
+        except Exception as e:
+            self._log.debug("Parameter conf: {}, e: {}".
+                            format(conf, e))
+
+        inp_file = get_input_file(parameters)
+
+        cmd = "{0} {1}".format(script, inp_file)
+        self._log.debug("Running the CMD: {}".format(cmd))
+
+        process = yield from asyncio.create_subprocess_shell(cmd,
+                                                             loop=self._loop)
+        yield from process.wait()
+
+        if process.returncode:
+            msg = "NSR/VNFR {} initial config using {} failed with {}". \
+                  format(vnfr_name if vnfr_name else nsr_obj.nsr_name,
+                         script, process.returncode)
+            self._log.error(msg)
+            raise InitialConfigError(msg)
+        else:
+            # os.remove(inp_file)
+            pass
+
+    def get_script_file(self, script_name, d_name, d_id, d_type):
+        # Get the full path to the script
+        script = ''
+        # If script name starts with /, assume it is full path
+        if script_name[0] == '/':
+            # The script has full path, use as is
+            script = script_name
+        else:
+            script = os.path.join(os.environ['RIFT_ARTIFACTS'],
+                                  'launchpad/packages',
+                                  d_type,
+                                  d_id,
+                                  d_name,
+                                  'scripts',
+                                  script_name)
+            self._log.debug("Checking for script at %s", script)
+            if not os.path.exists(script):
+                self._log.debug("Did not find script %s", script)
+                script = os.path.join(os.environ['RIFT_INSTALL'],
+                                      'usr/bin',
+                                      script_name)
+
+            # Seen cases in jenkins where the script execution fails
+            # with permission denied. Set the permission on the script
+            # to make sure it has execute permission
+            perm = os.stat(script).st_mode
+            if not (perm & stat.S_IXUSR):
+                self._log.warn("NSR/VNFR {} initial config script {} "
+                               "without execute permission: {}".
+                               format(d_name, script, perm))
+                os.chmod(script, perm | stat.S_IXUSR)
+        return script
+
+    @asyncio.coroutine
+    def process_ns_initial_config(self, nsr_obj):
+        '''Apply the initial-config-primitives specified in NSD'''
+
+        nsr = yield from self.cmdts_obj.get_nsr(nsr_obj.nsr_id)
+        if nsr is None or 'initial_config_primitive' not in nsr:
+            return
+
         if nsr is not None:
-            nsd = yield from self.cmdts_obj.get_nsd(nsr_id)
+            nsd = yield from self.cmdts_obj.get_nsd(nsr_obj.nsr_id)
+            for conf in nsr['initial_config_primitive']:
+                self._log.debug("NSR {} initial config: {}".
+                                format(nsr_obj.nsr_name, conf))
+                script = self.get_script_file(conf['user_defined_script'],
+                                              nsd.name,
+                                              nsd.id,
+                                              'nsd')
 
-            try:
-                # Check if initial config is present
-                # TODO (pjoseph): Sort based on seq
-                for conf in nsr['initial_config_primitive']:
-                    self._log.debug("Parameter conf: {}".
-                                    format(conf))
+                yield from self.process_initial_config(nsr_obj, conf, script)
 
-                    parameters = []
-                    try:
-                        parameters = conf['parameter']
-                    except Exception as e:
-                        self._log.debug("Parameter conf: {}, e: {}".
-                                        format(conf, e))
-                        pass
-
-                    inp_file = get_input_file(parameters)
-
-                    script = get_script_file(conf['user_defined_script'],
-                                             nsd.name,
-                                             nsd.id)
-
-                    cmd = "{0} {1}".format(script, inp_file)
-                    self._log.debug("Running the CMD: {}".format(cmd))
-
-                    process = yield from asyncio. \
-                              create_subprocess_shell(cmd, loop=self._loop)
-                    yield from process.wait()
-                    if process.returncode:
-                        msg = "NSR {} initial config using {} failed with {}". \
-                              format(nsr_name, script, process.returncode)
-                        self._log.error(msg)
-                        raise InitialConfigError(msg)
-                    else:
-                        os.remove(inp_file)
+    @asyncio.coroutine
+    def process_vnf_initial_config(self, nsr_obj, vnfr):
+        '''Apply the initial-config-primitives specified in VNFD'''
+
+        vnfr_name = vnfr.name
 
-            except KeyError as e:
-                self._log.debug("Did not find initial config {}".
-                                format(e))
+        vnfd = yield from self.cmdts_obj.get_vnfd(vnfr.vnfd_ref)
+        if vnfd is None:
+            msg = "VNFR {}, unable to get VNFD {}". \
+                  format(vnfr_name, vnfr.vnfd_ref)
+            self._log.error(msg)
+            raise InitialConfigError(msg)
+
+        vnf_cfg = vnfd.vnf_configuration
+
+        for conf in vnf_cfg.initial_config_primitive:
+                self._log.debug("VNFR {} initial config: {}".
+                                format(vnfr_name, conf))
+
+                if not conf.user_defined_script:
+                    self._log.debug("VNFR {} did not find a user defined script: {}".
+                                    format(vnfr_name, conf))
+                    continue
+
+                script = self.get_script_file(conf.user_defined_script,
+                                              vnfd.name,
+                                              vnfd.id,
+                                              'vnfd')
+
+                yield from self.process_initial_config(nsr_obj,
+                                                       conf.as_dict(),
+                                                       script,
+                                                       vnfr_name=vnfr_name)
 
 
 class ConfigManagerNSR(object):
@@ -1292,6 +1335,11 @@ class XPaths(object):
         return ("D,/vnfr:vnfr-catalog/vnfr:vnfr" +
                 ("[vnfr:id='{}']".format(k) if k is not None else ""))
 
+    @staticmethod
+    def vnfd(k=None):
+        return ("C,/vnfd:vnfd-catalog/vnfd:vnfd" +
+                ("[vnfd:id='{}']".format(k) if k is not None else ""))
+
     @staticmethod
     def config_agent(k=None):
         return ("D,/rw-config-agent:config-agent/rw-config-agent:account" +
@@ -1377,6 +1425,15 @@ class ConfigManagerDTS(object):
             vnfr_msg = vnfrl[0]
         return vnfr_msg
 
+    @asyncio.coroutine
+    def get_vnfd(self, vnfd_id):
+        self._log.debug("Attempting to get VNFD: %s", vnfd_id)
+        vnfdl = yield from self._read_dts(XPaths.vnfd(vnfd_id), do_trace=False)
+        vnfd_msg = None
+        if len(vnfdl) > 0:
+            vnfd_msg = vnfdl[0]
+        return vnfd_msg
+
     @asyncio.coroutine
     def get_vlr(self, id):
         self._log.debug("Attempting to get VLR subnet: %s", id)
index 7ea73c4..4e92b6c 100755 (executable)
@@ -146,7 +146,20 @@ class ConfigurationManager(object):
                                                  done))
 
             if done:
-                yield from self.update_vnf_state(vnf_cfg, conmanY.RecordState.READY)
+                self._log.warn("Apply initial config on VNFR {}".
+                                format(log_this_vnf(vnf_cfg)))
+                try:
+                    yield from nsr_obj.parent.process_vnf_initial_config(
+                        nsr_obj,
+                        agent_vnfr.vnfr_msg)
+                    yield from self.update_vnf_state(vnf_cfg,
+                                                     conmanY.RecordState.READY)
+
+                except Exception as e:
+                    nsr_obj.vnf_failed = True
+                    self._log.exception(e)
+                    yield from self.update_vnf_state(vnf_cfg,
+                                                     conmanY.RecordState.CFG_FAILED)
 
             else:
                 # Check to see if the VNF configure failed
@@ -163,6 +176,7 @@ class ConfigurationManager(object):
                     self._log.error("Failed to apply configuration for VNF = {}"
                                     .format(log_this_vnf(vnf_cfg)))
 
+
             return done
 
         @asyncio.coroutine
index dfc3ce0..8b50276 100644 (file)
@@ -28,6 +28,8 @@ set(subdirs
   rwmonitor
   rwmonparam
   rwnsm
+  rwpkgmgr
+  rwstagingmgr
   rwresmgr
   rwvnfm
   rwvns
@@ -35,6 +37,8 @@ set(subdirs
   yang
   )
 
+
+
 ##
 # Include the subdirs
 ##
index c64cff6..4a2741d 100644 (file)
@@ -20,6 +20,8 @@ rw-dtsperf
 rw-dtsperfmgr
 rw-launchpad
 rw-image-mgmt
+rw-pkg-mgmt
+rw-staging-mgmt
 rw-log
 rwlog-mgmt
 rw-manifest
index b1f6a7f..34463ef 100644 (file)
@@ -55,6 +55,7 @@ rift_python_install_tree(
     rift/package/checksums.py
     rift/package/config.py
     rift/package/convert.py
+    rift/package/handler.py
     rift/package/icon.py
     rift/package/image.py
     rift/package/package.py
diff --git a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/package/handler.py b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/package/handler.py
new file mode 100644 (file)
index 0000000..4c000cd
--- /dev/null
@@ -0,0 +1,170 @@
+
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import os
+
+import tornado
+import tornado.web
+import tornado.gen
+
+
+class File:
+    """Convenience class that represents the file
+    """
+    def __init__(self, root_dir, path):
+        self.path = path
+        self.root_dir = root_dir
+        self._meta = None
+
+    @property
+    def relative_path(self):
+        return os.path.relpath(self.path, start=self.root_dir)
+
+    @property
+    def meta(self):
+        """Fetch the meta data for the file.
+        """
+        if not self._meta:
+            self._meta = os.stat(self.path)
+        return self._meta
+
+    def serialize(self):
+        """Converts the object to dict that can be exposed via rest.
+        """
+        data = {}
+        data['name'] = self.relative_path
+        data['last_modified_time'] = self.meta.st_mtime
+        data['byte_size'] = self.meta.st_size
+        return data
+
+class Folder(File):
+    """
+    Convenience class that represents the folder.
+    """
+    def __init__(self, root_dir, path):
+        super().__init__(root_dir, path)
+        self.contents = []
+
+    def serialize(self):
+        """Converts the object to dict that can be exposed via rest.
+        """
+        data = super().serialize()
+        data['contents'] = []
+        for node in self.contents:
+            data['contents'].append(node.serialize())
+        return data
+
+
+class FileRestApiHandler(tornado.web.StaticFileHandler):
+    """Requesthandler class that extends StaticFileHandler. Difference being
+    GETS are now handled at folder level as well and for files we default to
+    the StaticFileHandler
+
+    for the following directory structure
+    Foo
+    |
+     --> bar.py
+
+    <URL>/Foo
+    will generate the list of all files in the directory!
+
+    <URL>/Foo/bar.py
+    will download the file.
+
+    """
+
+    def validate_absolute_path(self, root, absolute_path):
+        """Override the method to disable path validation for directory.
+        """
+        root = os.path.abspath(root)
+        if not root.endswith(os.path.sep):
+            root += os.path.sep
+
+        if not (absolute_path + os.path.sep).startswith(root):
+            raise tornado.web.HTTPError(403, "%s is not in root static directory",
+                            self.path)
+        if (os.path.isdir(absolute_path) and
+                self.default_filename is not None):
+            if not self.request.path.endswith("/"):
+                self.redirect(self.request.path + "/", permanent=True)
+                return
+
+            absolute_path = os.path.join(absolute_path, self.default_filename)
+        if not os.path.exists(absolute_path):
+            raise tornado.web.HTTPError(404)
+
+        return absolute_path
+
+    @classmethod
+    def _get_cached_version(cls, abs_path):
+        """Overridden method to disable caching for folder.
+        """
+        if os.path.isdir(abs_path):
+            return None
+
+        return super()._get_cached_version(abs_path)
+
+    @tornado.gen.coroutine
+    def get(self, path, include_body=True):
+        """Override the get method to support both file and folder handling
+        File handling will be handled by StaticFileHandler
+        Folder handling will be done by the derived class.
+        """
+        self.path = self.parse_url_path(path)
+        del path  # make sure we don't refer to path instead of self.path again
+        absolute_path = self.get_absolute_path(self.root, self.path)
+
+        self.absolute_path = self.validate_absolute_path(
+            self.root, absolute_path)
+
+        if self.absolute_path is None:
+            return
+
+        if os.path.isfile(absolute_path):
+            super().get(absolute_path)
+            return
+
+        # The requested directory becomes the root of the listing
+        root_dir = absolute_path
+
+        if not os.path.exists(root_dir):
+            raise tornado.web.HTTPError(404, "File/Folder not found")
+
+        folder_cache = {}
+        for root, dirs, files in os.walk(root_dir):
+            folder = folder_cache.setdefault(
+                root,
+                Folder(root_dir, root))
+
+            # Files
+            for file in files:
+                file_path = os.path.join(root, file)
+                folder.contents.append(
+                    File(root_dir, file_path))
+
+            # Sub folders
+            for dir_name in dirs:
+                dir_path = os.path.join(root, dir_name)
+                sub_folder = folder_cache.setdefault(
+                        dir_path,
+                        Folder(root_dir, dir_path))
+
+                folder.contents.append(sub_folder)
+
+        # Return the root object!
+        structure = folder_cache[root_dir].serialize()
+        self.write(tornado.escape.json_encode(structure))
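A hypothetical client-side view of this handler once it is mounted under /api/package/vnfd/ by the uploader application: a GET on a directory returns the serialized Folder structure as JSON, while a GET on a file falls through to StaticFileHandler and streams the file. The host, port and package id below are placeholders, not values from this commit.

import requests

base = "https://launchpad.example.com:4567/api/package/vnfd"    # placeholder endpoint
pkg_id = "11111111-2222-3333-4444-555555555555"                  # placeholder package id

# Directory GET: body is the JSON produced by Folder.serialize()
listing = requests.get("{}/{}/".format(base, pkg_id), verify=False).json()
for entry in listing["contents"]:
    print(entry["name"], entry["byte_size"], entry["last_modified_time"])

# File GET: handled by StaticFileHandler, so the body is the raw file
resp = requests.get("{}/{}/pong_vnfd.xml".format(base, pkg_id), verify=False)
with open("pong_vnfd.xml", "wb") as fh:
    fh.write(resp.content)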
index 45d8ba8..3ce3500 100644 (file)
@@ -1,5 +1,5 @@
 
-# 
+#
 #   Copyright 2016 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
@@ -42,6 +42,10 @@ class PackageValidationError(Exception):
     pass
 
 
+class PackageAppendError(Exception):
+    pass
+
+
 class PackageFileChecksumError(PackageValidationError):
     def __init__(self, filename):
         self.filename = filename
@@ -333,6 +337,36 @@ class DescriptorPackage(object):
                     # Set the file mode to original
                     os.chmod(dest_file_path, self._package_file_mode_map[filename])
 
+    def insert_file(self, new_file, dest_file, rel_path, mode=0o777):
+        self.add_file(rel_path, mode)
+
+        try:
+            # Copy the contents of the file to the correct path
+            dest_dir_path = os.path.dirname(dest_file)
+            if not os.path.isdir(dest_dir_path):
+                os.makedirs(dest_dir_path)
+
+            with open(dest_file, 'wb') as dst_hdl:
+                with open(new_file, 'rb') as src_hdl:
+                    shutil.copyfileobj(src_hdl, dst_hdl, 10 * 1024 * 1024)
+
+                    # Set the file mode to the value registered for this path
+                    os.chmod(dest_file, self._package_file_mode_map[rel_path])
+        except Exception as e:
+            # Clear the file when an exception happens
+            if os.path.isfile(dest_file):
+                os.remove(dest_file)
+
+            raise PackageAppendError(str(e))
+
+    def delete_file(self, dest_file, rel_path):
+        self.remove_file(rel_path)
+
+        try:
+            os.remove(dest_file)
+        except Exception as e:
+            raise PackageAppendError(str(e))
+
     def extract_file(self, src_file, dest_file):
         """ Extract a specific package file to dest_file
 
@@ -428,6 +462,15 @@ class DescriptorPackage(object):
 
         self._package_file_mode_map[rel_path] = mode
 
+    def remove_file(self, rel_path):
+        if not rel_path:
+            raise PackageError("Empty file name added")
+
+        if rel_path not in self._package_file_mode_map:
+            raise PackageError("File %s does not in package" % rel_path)
+
+        del self._package_file_mode_map[rel_path]
+
     def add_dir(self, rel_path):
         """ Add a directory to the package
 
index 2436993..34fff1c 100644 (file)
@@ -49,6 +49,11 @@ class PackageFilesystemStore(object):
 
         self.refresh()
 
+    @property
+    def root_dir(self):
+        return self._root_dir
+
     def _get_package_dir(self, package_id):
         return os.path.join(self._root_dir, package_id)
 
index 4256765..7fa6130 100644 (file)
@@ -37,11 +37,16 @@ from . import tosca
 import gi
 gi.require_version('NsdYang', '1.0')
 gi.require_version('VnfdYang', '1.0')
+gi.require_version('RwPkgMgmtYang', '1.0')
 
 from gi.repository import (
         NsdYang,
         VnfdYang,
-        )
+        RwPkgMgmtYang)
+import rift.mano.dts as mano_dts
+
+
+RPC_PACKAGE_EXPORT_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageExport
 
 
 class ExportStart(message.StatusMessage):
@@ -189,80 +194,84 @@ class DescriptorPackageArchiveExporter(object):
         return archive_path
 
 
-class ExportHandler(tornado.web.RequestHandler):
-    def options(self, *args, **kargs):
-        pass
-
-    def set_default_headers(self):
-        self.set_header('Access-Control-Allow-Origin', '*')
-        self.set_header('Access-Control-Allow-Headers',
-                        'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization')
-        self.set_header('Access-Control-Allow-Methods', 'POST, GET, PUT, DELETE')
+class ExportRpcHandler(mano_dts.AbstractRpcHandler):
+    def __init__(self, log, dts, loop, application, store_map, exporter, catalog_map):
+        """
+        Args:
+            application: UploaderApplication
+            store_map: dict containing VnfdStore & NsdStore
+            exporter: DescriptorPackageArchiveExporter
+            catalog_map: dict containing the onboarded VNFD and NSD catalogs
+        """
+        super().__init__(log, dts, loop)
 
-    def initialize(self, log, loop, store_map, exporter, catalog_map):
-        self.loop = loop
-        self.transaction_id = str(uuid.uuid4())
-        self.log = message.Logger(
-                log,
-                self.application.messages[self.transaction_id],
-                )
+        self.application = application
         self.store_map = store_map
         self.exporter = exporter
         self.catalog_map = catalog_map
+        self.log = log
+
+    @property
+    def xpath(self):
+        return "/rw-pkg-mgmt:package-export"
+
+    @asyncio.coroutine
+    def callback(self, ks_path, msg):
+        transaction_id = str(uuid.uuid4())
+        log = message.Logger(
+                self.log,
+                self.application.messages[transaction_id],
+                )
 
-    def get(self, desc_type):
-        if desc_type not in self.catalog_map:
-            raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))
+        file_name = self.export(transaction_id, log, msg)
 
-        self.log.message(ExportStart())
+        rpc_out = RPC_PACKAGE_EXPORT_ENDPOINT.from_dict({
+            'transaction_id': transaction_id,
+            'filename': file_name})
 
-        # Parse the IDs
-        ids_query = self.get_query_argument("ids")
-        ids = [id.strip() for id in ids_query.split(',')]
-        if len(ids) != 1:
-            raise message.MessageException(ExportSingleDescriptorOnlyError)
-        desc_id = ids[0]
+        return rpc_out
 
+    def export(self, transaction_id, log, msg):
+        log.message(ExportStart())
+        desc_type = msg.package_type.lower()
+
+        if desc_type not in self.catalog_map:
+            raise ValueError("Invalid package type: {}".format(desc_type))
+
+        # Parse the IDs
+        desc_id = msg.package_id
         catalog = self.catalog_map[desc_type]
 
         if desc_id not in catalog:
-            raise tornado.web.HTTPError(400, "unknown descriptor id: {}".format(desc_id))
+            raise ValueError("Unable to find package ID: {}".format(desc_id))
 
         desc_msg = catalog[desc_id]
 
         # Get the schema for exporting
-        schema = self.get_argument("schema", default="rift")
+        schema = msg.export_schema.lower()
 
         # Get the grammar for exporting
-        grammar = self.get_argument("grammar", default="osm")
+        grammar = msg.export_grammar.lower()
 
         # Get the format for exporting
-        format_ = self.get_argument("format", default="yaml")
+        format_ = msg.export_format.lower()
 
         filename = None
 
         if grammar == 'tosca':
-            filename = "{}.zip".format(self.transaction_id)
-            self.export_tosca(schema, format_, desc_type, desc_id, desc_msg)
-            self.log.message(message.FilenameMessage(filename))
+            filename = "{}.zip".format(transaction_id)
+            self.export_tosca(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
+            log.message(message.FilenameMessage(filename))
         else:
-            filename = "{}.tar.gz".format(self.transaction_id)
-            self.export_rift(schema, format_, desc_type, desc_id, desc_msg)
-            self.log.message(message.FilenameMessage(filename))
+            filename = "{}.tar.gz".format(transaction_id)
+            self.export_rift(schema, format_, desc_type, desc_id, desc_msg, log, transaction_id)
+            log.message(message.FilenameMessage(filename))
 
-        self.log.message(ExportSuccess())
+        log.message(ExportSuccess())
 
-        if filename is not None:
-            self.write(tornado.escape.json_encode({
-                "transaction_id": self.transaction_id,
-                "filename": filename,
-            }))
-        else:
-            self.write(tornado.escape.json_encode({
-                "transaction_id": self.transaction_id,
-            }))
+        return filename
 
-    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg):
+    def export_rift(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
         convert = rift.package.convert
         schema_serializer_map = {
                 "rift": {
@@ -279,7 +288,7 @@ class ExportHandler(tornado.web.RequestHandler):
             raise tornado.web.HTTPError(400, "unknown schema: {}".format(schema))
 
         if format_ != "yaml":
-            self.log.warn("Only yaml format supported for export")
+            log.warn("Only yaml format supported for export")
 
         if desc_type not in schema_serializer_map[schema]:
             raise tornado.web.HTTPError(400, "unknown descriptor type: {}".format(desc_type))
@@ -296,26 +305,26 @@ class ExportHandler(tornado.web.RequestHandler):
         try:
             package = package_store.get_package(desc_id)
         except rift.package.store.PackageNotFoundError:
-            self.log.debug("stored package not found.  creating package from descriptor config")
+            log.debug("stored package not found.  creating package from descriptor config")
 
             desc_yaml_str = src_serializer.to_yaml_string(desc_msg)
             with io.BytesIO(desc_yaml_str.encode()) as hdl:
                 hdl.name = "{}__{}.yaml".format(desc_msg.id, desc_type)
                 package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
-                    self.log, hdl
+                    log, hdl
                     )
 
         self.exporter.export_package(
                 package=package,
                 export_dir=self.application.export_dir,
-                file_id=self.transaction_id,
+                file_id=transaction_id,
                 json_desc_str=src_serializer.to_json_string(desc_msg),
                 dest_serializer=dest_serializer,
                 )
 
-    def export_tosca(self, format_, schema, desc_type, desc_id, desc_msg):
+    def export_tosca(self, schema, format_, desc_type, desc_id, desc_msg, log, transaction_id):
         if format_ != "yaml":
-            self.log.warn("Only yaml format supported for TOSCA export")
+            log.warn("Only yaml format supported for TOSCA export")
 
         if desc_type != "nsd":
             raise tornado.web.HTTPError(
@@ -330,9 +339,9 @@ class ExportHandler(tornado.web.RequestHandler):
                 package = package_store.get_package(id_)
 
             except rift.package.store.PackageNotFoundError:
-                self.log.debug("stored package not found for {}.".format(id_))
+                log.debug("stored package not found for {}.".format(id_))
             except rift.package.store.PackageStoreError:
-                self.log.debug("stored package error for {}.".format(id_))
+                log.debug("stored package error for {}.".format(id_))
 
             return package
 
@@ -355,7 +364,7 @@ class ExportHandler(tornado.web.RequestHandler):
                     format(vnfd_id, nsd_id))
 
         # Create the archive.
-        pkg.create_archive(self.transaction_id,
+        pkg.create_archive(transaction_id,
                            dest=self.application.export_dir)
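For reference, a hedged sketch of the rw-pkg-mgmt:package-export input that drives this handler, built the same way the DTS unit test later in this commit builds it; the package id is a placeholder, and export_schema/export_grammar/export_format may also be supplied (the handler lower-cases them before use).

import gi
gi.require_version('RwPkgMgmtYang', '1.0')
from gi.repository import RwPkgMgmtYang

export_input = RwPkgMgmtYang.YangInput_RwPkgMgmt_PackageExport.from_dict({
        "package_type": "VNFD",
        "package_id": "11111111-2222-3333-4444-555555555555",   # placeholder id
        })

# Issued from a tasklet coroutine against a rift.tasklets.DTS handle:
#   rpc_out = yield from dts.query_rpc(
#           "I,/rw-pkg-mgmt:package-export", rwdts.XactFlag.TRACE, export_input)
# The output carries the transaction_id (polled via /api/export/<id>/state) and
# the filename of the archive written under the launchpad exports directory.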
 
 
index ce30981..b18e304 100644 (file)
@@ -44,7 +44,7 @@ class ImageUploader(object):
         self._client = client.UploadJobClient(self._log, self._loop, self._dts)
 
     def upload_image(self, image_name, image_checksum, image_hdl):
-        endpoint = "http://127.0.0.1:9999"
+        endpoint = "http://127.0.0.1:9292"
         glance_client = glanceclient.Client('1', endpoint, token="asdf")
 
         try:
index a1827eb..0ab6564 100644 (file)
@@ -1,5 +1,5 @@
 
-# 
+#
 #   Copyright 2016 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
@@ -139,6 +139,15 @@ class Logger(object):
         return getattr(self._rift_logger, name)
 
 
+class DownloadError(ErrorMessage):
+    def __init__(self, msg):
+        super().__init__("Download-error", msg)
+
+
+class DownloadSuccess(StatusMessage):
+    def __init__(self, msg):
+        super().__init__("Download-Successful.", msg)
+
 
 class OnboardError(ErrorMessage):
     def __init__(self, msg):
index 9bcb2d3..0eff616 100644 (file)
@@ -1,5 +1,5 @@
 
-# 
+#
 #   Copyright 2016 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
@@ -371,7 +371,8 @@ class LaunchpadTasklet(rift.tasklets.Tasklet):
     @asyncio.coroutine
     def init(self):
         io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
-        self.app = uploader.UploaderApplication(self)
+        self.app = uploader.UploaderApplication.from_tasklet(self)
+        yield from self.app.register()
 
         manifest = self.tasklet_info.get_pb_manifest()
         ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
index 081c1f5..c8fc3fc 100644 (file)
@@ -1,5 +1,5 @@
 
-# 
+#
 #   Copyright 2016 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 #
-
+import abc
+import asyncio
 import collections
 import os
+import tempfile
 import threading
 import uuid
 import zlib
@@ -49,11 +51,19 @@ import rift.package.charm
 import rift.package.checksums
 import rift.package.config
 import rift.package.convert
+import rift.package.handler as pkg_handler
 import rift.package.icon
 import rift.package.package
 import rift.package.script
 import rift.package.store
 
+from gi.repository import (
+   RwDts as rwdts,
+   RwPkgMgmtYang)
+import rift.downloader as downloader
+import rift.mano.dts as mano_dts
+import rift.tasklets
+
 from . import (
         export,
         extract,
@@ -90,6 +100,8 @@ from .message import (
         OnboardStart,
         OnboardSuccess,
 
+        DownloadError,
+        DownloadSuccess,
 
         # Update Error Messages
         UpdateChecksumMismatch,
@@ -121,6 +133,11 @@ GB = 1024 * MB
 
 MAX_STREAMED_SIZE = 5 * GB
 
+# Shortcuts
+RPC_PACKAGE_CREATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCreate
+RPC_PACKAGE_UPDATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageUpdate
+
+
 
 class HttpMessageError(Exception):
     def __init__(self, code, msg):
@@ -128,296 +145,76 @@ class HttpMessageError(Exception):
         self.msg = msg
 
 
-class GzipTemporaryFileStreamedPart(multipart_streamer.TemporaryFileStreamedPart):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-        # Create a decompressor for gzip data to decompress on the fly during upload
-        # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
-        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
-
-    def feed(self, data):
-        decompressed_data = self._decompressor.decompress(data)
-        if decompressed_data:
-            super().feed(decompressed_data)
-
-    def finalize(self):
-        # All data has arrived, flush the decompressor to get any last decompressed data
-        decompressed_data = self._decompressor.flush()
-        super().feed(decompressed_data)
-        super().finalize()
-
-
-class GzipMultiPartStreamer(multipart_streamer.MultiPartStreamer):
-    """ This Multipart Streamer decompresses gzip files on the fly during multipart upload """
-
-    @staticmethod
-    def _get_descriptor_name_from_headers(headers):
-        descriptor_filename = None
-
-        for entry in headers:
-            if entry["value"] != "form-data":
-                continue
-
-            form_data_params = entry["params"]
-            if "name" in form_data_params:
-                if form_data_params["name"] != "descriptor":
-                    continue
-
-                if "filename" not in form_data_params:
-                    continue
-
-                descriptor_filename = form_data_params["filename"]
-
-        return descriptor_filename
-
-    def create_part(self, headers):
-        """ Create the StreamedPart subclass depending on the descriptor filename
-
-        For gzipped descriptor packages, create a GzipTemporaryFileStreamedPart which
-        can decompress the gzip while it's being streamed into the launchpad directely
-        into a file.
-
-        Returns:
-            The descriptor filename
+class UploadRpcHandler(mano_dts.AbstractRpcHandler):
+    def __init__(self, log, dts, loop, application):
         """
-        filename = GzipMultiPartStreamer._get_descriptor_name_from_headers(headers)
-        if filename is None or not filename.endswith(".gz"):
-            return multipart_streamer.TemporaryFileStreamedPart(self, headers)
-
-        return GzipTemporaryFileStreamedPart(self, headers)
-
-
-class RequestHandler(tornado.web.RequestHandler):
-    def options(self, *args, **kargs):
-        pass
-
-    def set_default_headers(self):
-        self.set_header('Access-Control-Allow-Origin', '*')
-        self.set_header('Access-Control-Allow-Headers',
-                        'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization')
-        self.set_header('Access-Control-Allow-Methods', 'POST, GET, PUT, DELETE')
-
-
-@tornado.web.stream_request_body
-class StreamingUploadHandler(RequestHandler):
-    def initialize(self, log, loop):
-        """Initialize the handler
-
-        Arguments:
-            log  - the logger that this handler should use
-            loop - the tasklets ioloop
-
+        Args:
+            application: UploaderApplication
         """
-        self.transaction_id = str(uuid.uuid4())
-
-        self.loop = loop
-        self.log = self.application.get_logger(self.transaction_id)
-
-        self.part_streamer = None
-
-        self.log.debug('created handler (transaction_id = {})'.format(self.transaction_id))
-
-    def msg_missing_content_type(self):
-        raise NotImplementedError()
+        super().__init__(log, dts, loop)
+        self.application = application
 
-    def msg_unsupported_media_type(self):
-        raise NotImplementedError()
-
-    def msg_missing_content_boundary(self):
-        raise NotImplementedError()
-
-    def msg_start(self):
-        raise NotImplementedError()
-
-    def msg_success(self):
-        raise NotImplementedError()
-
-    def msg_failure(self):
-        raise NotImplementedError()
-
-    def msg_package_upload(self):
-        raise NotImplementedError()
-
-    @tornado.gen.coroutine
-    def prepare(self):
-        """Prepare the handler for a request
-
-        The prepare function is the first part of a request transaction. It
-        creates a temporary file that uploaded data can be written to.
-
-        """
-        if self.request.method != "POST":
-            return
-
-        self.request.connection.set_max_body_size(MAX_STREAMED_SIZE)
-
-        self.log.message(self.msg_start())
-
-        try:
-            # Retrieve the content type and parameters from the request
-            content_type = self.request.headers.get('content-type', None)
-            if content_type is None:
-                raise HttpMessageError(400, self.msg_missing_content_type())
-
-            content_type, params = tornado.httputil._parse_header(content_type)
-
-            if "multipart/form-data" != content_type.lower():
-                raise HttpMessageError(415, self.msg_unsupported_media_type())
-
-            if "boundary" not in params:
-                raise HttpMessageError(400, self.msg_missing_content_boundary())
-
-            # You can get the total request size from the headers.
-            try:
-                total = int(self.request.headers.get("Content-Length", "0"))
-            except KeyError:
-                self.log.warning("Content length header not found")
-                # For any well formed browser request, Content-Length should have a value.
-                total = 0
-
-            # And here you create a streamer that will accept incoming data
-            self.part_streamer = GzipMultiPartStreamer(total)
-
-        except HttpMessageError as e:
-            self.log.message(e.msg)
-            self.log.message(self.msg_failure())
-
-            raise tornado.web.HTTPError(e.code, e.msg.name)
+    @property
+    def xpath(self):
+        return "/rw-pkg-mgmt:package-create"
 
-        except Exception as e:
-            self.log.exception(e)
-            self.log.message(self.msg_failure())
+    @asyncio.coroutine
+    def callback(self, ks_path, msg):
+        transaction_id = str(uuid.uuid4())
+        log = self.application.get_logger(transaction_id)
+        log.message(OnboardStart())
 
-    @tornado.gen.coroutine
-    def data_received(self, chunk):
-        """Write data to the current file
 
-        Arguments:
-            data - a chunk of data to write to file
+        auth = None
+        if msg.username is not None:
+            auth = (msg.username, msg.password)
 
-        """
+        self.application.onboard(
+                msg.external_url,
+                transaction_id,
+                auth=auth
+                )
 
-        """When a chunk of data is received, we forward it to the multipart streamer."""
-        self.part_streamer.data_received(chunk)
+        rpc_op = RPC_PACKAGE_CREATE_ENDPOINT.from_dict({
+                "transaction_id": transaction_id})
 
-    def post(self):
-        """Handle a post request
+        return rpc_op
 
-        The function is called after any data associated with the body of the
-        request has been received.
 
+class UpdateRpcHandler(mano_dts.AbstractRpcHandler):
+    def __init__(self, log, dts, loop, application):
         """
-        # You MUST call this to close the incoming stream.
-        self.part_streamer.data_complete()
-
-        desc_parts = self.part_streamer.get_parts_by_name("descriptor")
-        if len(desc_parts) != 1:
-            raise HttpMessageError(400, OnboardError("Descriptor option not found"))
-
-        self.log.message(self.msg_package_upload())
-
-
-class UploadHandler(StreamingUploadHandler):
-    """
-    This handler is used to upload archives that contain VNFDs, NSDs, and PNFDs
-    to the launchpad. This is a streaming handler that writes uploaded archives
-    to disk without loading them all into memory.
-    """
-
-    def msg_missing_content_type(self):
-        return OnboardMissingContentType()
-
-    def msg_unsupported_media_type(self):
-        return OnboardUnsupportedMediaType()
-
-    def msg_missing_content_boundary(self):
-        return OnboardMissingContentBoundary()
-
-    def msg_start(self):
-        return OnboardStart()
-
-    def msg_success(self):
-        return OnboardSuccess()
-
-    def msg_failure(self):
-        return OnboardFailure()
-
-    def msg_package_upload(self):
-        return OnboardPackageUpload()
-
-    def post(self):
-        """Handle a post request
-
-        The function is called after any data associated with the body of the
-        request has been received.
-
+        Args:
+            application: UploaderApplication
         """
-        try:
-            super().post()
-            self.application.onboard(
-                    self.part_streamer,
-                    self.transaction_id,
-                    auth=self.request.headers.get('authorization', None),
-                    )
-
-            self.set_status(200)
-            self.write(tornado.escape.json_encode({
-                "transaction_id": self.transaction_id,
-                    }))
-
-        except Exception:
-            self.log.exception("Upload POST failed")
-            self.part_streamer.release_parts()
-            raise
-
-
-class UpdateHandler(StreamingUploadHandler):
-    def msg_missing_content_type(self):
-        return UpdateMissingContentType()
-
-    def msg_unsupported_media_type(self):
-        return UpdateUnsupportedMediaType()
-
-    def msg_missing_content_boundary(self):
-        return UpdateMissingContentBoundary()
+        super().__init__(log, dts, loop)
+        self.application = application
 
-    def msg_start(self):
-        return UpdateStart()
-
-    def msg_success(self):
-        return UpdateSuccess()
-
-    def msg_failure(self):
-        return UpdateFailure()
+    @property
+    def xpath(self):
+        return "/rw-pkg-mgmt:package-update"
 
-    def msg_package_upload(self):
-        return UpdatePackageUpload()
+    @asyncio.coroutine
+    def callback(self, ks_path, msg):
 
-    def post(self):
-        """Handle a post request
+        transaction_id = str(uuid.uuid4())
+        log = self.application.get_logger(transaction_id)
+        log.message(UpdateStart())
 
-        The function is called after any data associated with the body of the
-        request has been received.
+        auth = None
+        if msg.username is not None:
+            auth = (msg.username, msg.password)
 
-        """
-        try:
-            super().post()
+        self.application.update(
+                msg.external_url,
+                transaction_id,
+                auth=auth
+                )
 
-            self.application.update(
-                    self.part_streamer,
-                    self.transaction_id,
-                    auth=self.request.headers.get('authorization', None),
-                    )
+        rpc_op = RPC_PACKAGE_UPDATE_ENDPOINT.from_dict({
+                "transaction_id": transaction_id})
 
-            self.set_status(200)
-            self.write(tornado.escape.json_encode({
-                "transaction_id": self.transaction_id,
-                    }))
-        except Exception:
-            self.log.exception("Upload POST failed")
-            self.part_streamer.release_parts()
-            raise
+        return rpc_op
 
 
 class UploadStateHandler(state.StateHandler):
@@ -432,24 +229,25 @@ class UpdateStateHandler(state.StateHandler):
     FAILURE = UpdateFailure
 
 
-class UpdatePackage(threading.Thread):
-    def __init__(self, log, loop, part_streamer, auth,
+class UpdatePackage(downloader.DownloaderProtocol):
+
+    def __init__(self, log, loop, url, auth,
                  onboarder, uploader, package_store_map):
         super().__init__()
         self.log = log
         self.loop = loop
-        self.part_streamer = part_streamer
+        self.url = url
         self.auth = auth
         self.onboarder = onboarder
         self.uploader = uploader
         self.package_store_map = package_store_map
 
-        self.io_loop = tornado.ioloop.IOLoop.current()
 
-    def _update_package(self):
+    def _update_package(self, packages):
+
         # Extract package could return multiple packages if
         # the package is converted
-        for pkg in self.extract_package():
+        for pkg in packages:
             with pkg as temp_package:
                 package_checksums = self.validate_package(temp_package)
                 stored_package = self.update_package(temp_package)
@@ -469,9 +267,9 @@ class UpdatePackage(threading.Thread):
                 else:
                     self.upload_images(temp_package, package_checksums)
 
-    def run(self):
+    def extract(self, packages):
         try:
-            self._update_package()
+            self._update_package(packages)
             self.log.message(UpdateSuccess())
 
         except MessageException as e:
@@ -484,23 +282,32 @@ class UpdatePackage(threading.Thread):
                 self.log.message(UpdateError(str(e)))
             self.log.message(UpdateFailure())
 
-    def extract_package(self):
-        """Extract multipart message from tarball"""
-        desc_part = self.part_streamer.get_parts_by_name("descriptor")[0]
-
-        # Invoke the move API to prevent the part streamer from attempting
-        # to clean up (the file backed package will do that itself)
-        desc_part.move(desc_part.f_out.name)
-
-        package_name = desc_part.get_filename()
-        package_path = desc_part.f_out.name
+    def on_download_succeeded(self, job):
+        self.log.message(DownloadSuccess("Package downloaded."))
 
         extractor = extract.UploadPackageExtractor(self.log)
         file_backed_packages = extractor.create_packages_from_upload(
-                package_name, package_path
+                job.filename, job.filepath
                 )
 
-        return file_backed_packages
+        self.extract(file_backed_packages)
+
+    def on_download_failed(self, job):
+        self.log.error(job.detail)
+        self.log.message(DownloadError("Package download failed. {}".format(job.detail)))
+        self.log.message(UpdateFailure())
+
+    def download_package(self):
+
+        file_hdl, filename = tempfile.mkstemp()
+        os.close(file_hdl)  # don't leak the fd; only the path is used below
+        url_downloader = downloader.UrlDownloader(
+                self.url,
+                auth=self.auth,
+                file_obj=filename,
+                decompress_on_fly=True,
+                log=self.log)
+        url_downloader.delegate = self
+        url_downloader.download()
 
     def get_package_store(self, package):
         return self.package_store_map[package.descriptor_type]
@@ -556,7 +363,6 @@ class UpdatePackage(threading.Thread):
         finally:
             _ = [image_hdl.close() for image_hdl in name_hdl_map.values()]
 
-
     def extract_charms(self, package):
         try:
             charm_extractor = rift.package.charm.PackageCharmExtractor(self.log)
@@ -608,24 +414,22 @@ class UpdatePackage(threading.Thread):
             raise MessageException(UpdateDescriptorError(package.descriptor_file)) from e
 
 
-class OnboardPackage(threading.Thread):
-    def __init__(self, log, loop, part_streamer, auth,
+class OnboardPackage(downloader.DownloaderProtocol):
+
+    def __init__(self, log, loop, url, auth,
                  onboarder, uploader, package_store_map):
-        super().__init__()
         self.log = log
         self.loop = loop
-        self.part_streamer = part_streamer
+        self.url = url
         self.auth = auth
         self.onboarder = onboarder
         self.uploader = uploader
         self.package_store_map = package_store_map
 
-        self.io_loop = tornado.ioloop.IOLoop.current()
-
-    def _onboard_package(self):
+    def _onboard_package(self, packages):
         # Extract package could return multiple packages if
         # the package is converted
-        for pkg in self.extract_package():
+        for pkg in packages:
             with pkg as temp_package:
                 package_checksums = self.validate_package(temp_package)
                 stored_package = self.store_package(temp_package)
@@ -645,9 +449,9 @@ class OnboardPackage(threading.Thread):
                 else:
                     self.upload_images(temp_package, package_checksums)
 
-    def run(self):
+    def extract(self, packages):
         try:
-            self._onboard_package()
+            self._onboard_package(packages)
             self.log.message(OnboardSuccess())
 
         except MessageException as e:
@@ -660,26 +464,32 @@ class OnboardPackage(threading.Thread):
                 self.log.message(OnboardError(str(e)))
             self.log.message(OnboardFailure())
 
-        finally:
-            self.part_streamer.release_parts()
-
-    def extract_package(self):
-        """Extract multipart message from tarball"""
-        desc_part = self.part_streamer.get_parts_by_name("descriptor")[0]
-
-        # Invoke the move API to prevent the part streamer from attempting
-        # to clean up (the file backed package will do that itself)
-        desc_part.move(desc_part.f_out.name)
-
-        package_name = desc_part.get_filename()
-        package_path = desc_part.f_out.name
+    def on_download_succeeded(self, job):
+        self.log.message(DownloadSuccess("Package downloaded."))
 
         extractor = extract.UploadPackageExtractor(self.log)
         file_backed_packages = extractor.create_packages_from_upload(
-                package_name, package_path
+                job.filename, job.filepath
                 )
 
-        return file_backed_packages
+        self.extract(file_backed_packages)
+
+    def on_download_failed(self, job):
+        self.log.error(job.detail)
+        self.log.message(DownloadError("Package download failed. {}".format(job.detail)))
+        self.log.message(OnboardFailure())
+
+    def download_package(self):
+
+        file_hdl, filename = tempfile.mkstemp()
+        os.close(file_hdl)  # don't leak the fd; only the path is used below
+        url_downloader = downloader.UrlDownloader(
+                self.url,
+                auth=self.auth,
+                file_obj=filename,
+                decompress_on_fly=True,
+                log=self.log)
+        url_downloader.delegate = self
+        url_downloader.download()
 
     def get_package_store(self, package):
         return self.package_store_map[package.descriptor_type]
@@ -783,50 +593,97 @@ class OnboardPackage(threading.Thread):
 
 
 class UploaderApplication(tornado.web.Application):
-    def __init__(self, tasklet):
-        self.tasklet = tasklet
+
+    @classmethod
+    def from_tasklet(cls, tasklet):
+        manifest = tasklet.tasklet_info.get_pb_manifest()
+        use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
+        ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
+        ssl_key = manifest.bootstrap_phase.rwsecurity.key
+        return cls(
+                tasklet.log,
+                tasklet.dts,
+                tasklet.loop,
+                ssl=(ssl_cert, ssl_key),
+                vnfd_store=tasklet.vnfd_package_store,
+                nsd_store=tasklet.nsd_package_store,
+                vnfd_catalog=tasklet.vnfd_catalog,
+                nsd_catalog=tasklet.nsd_catalog)
+
+    def __init__(
+            self,
+            log,
+            dts,
+            loop,
+            ssl=None,
+            vnfd_store=None,
+            nsd_store=None,
+            vnfd_catalog=None,
+            nsd_catalog=None):
+
+        self.log = log
+        self.loop = loop
+        self.dts = dts
+
+        self.use_ssl = False
+        self.ssl_cert, self.ssl_key = None, None
+        if ssl:
+            self.use_ssl = True
+            self.ssl_cert, self.ssl_key = ssl
+
+        if not vnfd_store:
+            vnfd_store = rift.package.store.VnfdPackageFilesystemStore(self.log)
+
+        if not nsd_store:
+            nsd_store = rift.package.store.NsdPackageFilesystemStore(self.log)
+
         self.accounts = []
         self.messages = collections.defaultdict(list)
         self.export_dir = os.path.join(os.environ['RIFT_ARTIFACTS'], 'launchpad/exports')
 
-        manifest = tasklet.tasklet_info.get_pb_manifest()
-        self.use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
-        self.ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
-        self.ssl_key = manifest.bootstrap_phase.rwsecurity.key
-
-        self.uploader = image.ImageUploader(self.log, self.loop, tasklet.dts)
+        self.uploader = image.ImageUploader(self.log, self.loop, self.dts)
         self.onboarder = onboard.DescriptorOnboarder(
                 self.log, "127.0.0.1", 8008, self.use_ssl, self.ssl_cert, self.ssl_key
                 )
         self.package_store_map = {
-                "vnfd": self.tasklet.vnfd_package_store,
-                "nsd": self.tasklet.nsd_package_store,
+                "vnfd": vnfd_store,
+                "nsd": nsd_store
                 }
 
         self.exporter = export.DescriptorPackageArchiveExporter(self.log)
         self.loop.create_task(export.periodic_export_cleanup(self.log, self.loop, self.export_dir))
 
-        attrs = dict(log=self.log, loop=self.loop)
+        self.vnfd_catalog = vnfd_catalog
+        self.nsd_catalog = nsd_catalog
+        catalog_map = {
+                 "vnfd": self.vnfd_catalog,
+                 "nsd": self.nsd_catalog
+                 }
+
+        self.upload_handler = UploadRpcHandler(self.log, self.dts, self.loop, self)
+        self.update_handler = UpdateRpcHandler(self.log, self.dts, self.loop, self)
+        self.export_handler = export.ExportRpcHandler(
+                    self.log,
+                    self.dts,
+                    self.loop,
+                    self,
+                    store_map=self.package_store_map,
+                    exporter=self.exporter,
+                    catalog_map=catalog_map
+                    )
 
-        export_attrs = attrs.copy()
-        export_attrs.update({
-            "store_map": self.package_store_map,
-            "exporter": self.exporter,
-            "catalog_map": {
-                "vnfd": self.vnfd_catalog,
-                "nsd": self.nsd_catalog
-                }
-            })
+        attrs = dict(log=self.log, loop=self.loop)
 
         super(UploaderApplication, self).__init__([
-            (r"/api/update", UpdateHandler, attrs),
-            (r"/api/upload", UploadHandler, attrs),
+            (r"/api/package/vnfd/(.*)", pkg_handler.FileRestApiHandler, {
+                'path': vnfd_store.root_dir}),
+            (r"/api/package/nsd/(.*)", pkg_handler.FileRestApiHandler, {
+                'path': nsd_store.root_dir}),
 
             (r"/api/upload/([^/]+)/state", UploadStateHandler, attrs),
             (r"/api/update/([^/]+)/state", UpdateStateHandler, attrs),
             (r"/api/export/([^/]+)/state", export.ExportStateHandler, attrs),
 
-            (r"/api/export/(nsd|vnfd)$", export.ExportHandler, export_attrs),
             (r"/api/export/([^/]+.tar.gz)", tornado.web.StaticFileHandler, {
                 "path": self.export_dir,
                 }),
@@ -835,47 +692,42 @@ class UploaderApplication(tornado.web.Application):
                 }),
             ])
 
-    @property
-    def log(self):
-        return self.tasklet.log
-
-    @property
-    def loop(self):
-        return self.tasklet.loop
+    @asyncio.coroutine
+    def register(self):
+        yield from self.upload_handler.register()
+        yield from self.update_handler.register()
+        yield from self.export_handler.register()
 
     def get_logger(self, transaction_id):
         return message.Logger(self.log, self.messages[transaction_id])
 
-    def onboard(self, part_streamer, transaction_id, auth=None):
+    def onboard(self, url, transaction_id, auth=None):
         log = message.Logger(self.log, self.messages[transaction_id])
 
-        OnboardPackage(
+        onboard_package = OnboardPackage(
                 log,
                 self.loop,
-                part_streamer,
+                url,
                 auth,
                 self.onboarder,
                 self.uploader,
                 self.package_store_map,
-                ).start()
+                )
 
-    def update(self, part_streamer, transaction_id, auth=None):
+        self.loop.run_in_executor(None, onboard_package.download_package)
+
+    def update(self, url, transaction_id, auth=None):
         log = message.Logger(self.log, self.messages[transaction_id])
 
-        UpdatePackage(
+        update_package = UpdatePackage(
                 log,
                 self.loop,
-                part_streamer,
+                url,
                 auth,
                 self.onboarder,
                 self.uploader,
                 self.package_store_map,
-                ).start()
+                )
 
-    @property
-    def vnfd_catalog(self):
-        return self.tasklet.vnfd_catalog
+        self.loop.run_in_executor(None, update_package.download_package)
 
-    @property
-    def nsd_catalog(self):
-        return self.tasklet.nsd_catalog
index 8f2e904..ed6a3c0 100644 (file)
@@ -36,3 +36,13 @@ rift_py3test(utest_package.py
   TEST_ARGS
   ${CMAKE_CURRENT_SOURCE_DIR}/utest_package.py
   )
+
+rift_py3test(utest_uploader_app_dts.create.py
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_uploader_app_dts.py TestCase.test_package_create_rpc
+  )
+
+rift_py3test(utest_uploader_app_dts.export.py
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_uploader_app_dts.py TestCase.test_package_export
+  )
diff --git a/rwlaunchpad/plugins/rwlaunchpadtasklet/test/utest_fileserver.py b/rwlaunchpad/plugins/rwlaunchpadtasklet/test/utest_fileserver.py
new file mode 100755 (executable)
index 0000000..e56ec04
--- /dev/null
@@ -0,0 +1,109 @@
+#!/usr/bin/env python3
+
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import tornado.platform.asyncio
+import tornado.testing
+import tornado.web
+import tempfile
+import unittest
+import json
+import xmlrunner
+
+from rift.package.handler import FileRestApiHandler
+
+import gi
+gi.require_version('NsdYang', '1.0')
+gi.require_version('VnfdYang', '1.0')
+
+from gi.repository import (
+        NsdYang,
+        VnfdYang,
+        )
+
+
+class FileServerTestCase(tornado.testing.AsyncHTTPTestCase):
+    def setUp(self):
+        self._log = logging.getLogger(__file__)
+        self._loop = asyncio.get_event_loop()
+
+        super().setUp()
+        self._port = self.get_http_port()
+
+    def get_new_ioloop(self):
+        return tornado.platform.asyncio.AsyncIOMainLoop()
+
+    def get_app(self):
+
+        def create_mock_structure():
+            path = tempfile.mkdtemp()
+            package_path = os.path.join(path, "pong_vnfd")
+            os.makedirs(package_path)
+            open(os.path.join(path, "pong_vnfd.xml"), "wb").close()
+            open(os.path.join(path, "logo.png"), "wb").close()
+
+            return path
+
+        self.path = create_mock_structure()
+        print (self.path)
+
+        return tornado.web.Application([
+            (r"/api/package/vnfd/(.*)", FileRestApiHandler, {"path": self.path}),
+            ])
+
+    def test_get_file(self):
+        response = self.fetch("/api/package/vnfd/pong_vnfd.xml")
+        assert response.code == 200
+
+    def test_get_folder(self):
+        response = self.fetch("/api/package/vnfd/")
+        assert response.code == 200
+
+        data = json.loads(response.body.decode("utf-8"))
+        files = [content['name'] for content in data['contents']]
+        assert "pong_vnfd.xml" in files
+        assert "logo.png" in files
+
+
+def main(argv=sys.argv[1:]):
+    logging.basicConfig(format='TEST %(message)s')
+
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+
+    args, unknown = parser.parse_known_args(argv)
+    if args.no_runner:
+        runner = None
+
+    # Set the global logging level
+    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
+
+    # The unittest framework requires a program name, so use the name of this
+    # file instead (we do not want to have to pass a fake program name to main
+    # when this is called from the interpreter).
+    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
+
+if __name__ == '__main__':
+    main()
diff --git a/rwlaunchpad/plugins/rwlaunchpadtasklet/test/utest_uploader_app_dts.py b/rwlaunchpad/plugins/rwlaunchpadtasklet/test/utest_uploader_app_dts.py
new file mode 100755 (executable)
index 0000000..fdc2e22
--- /dev/null
@@ -0,0 +1,228 @@
+#!/usr/bin/env python3
+
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import argparse
+import os
+import logging
+
+import shutil
+import stat
+import sys
+import unittest
+import uuid
+import xmlrunner
+import tornado
+import tornado.escape
+import tornado.ioloop
+import tornado.web
+import tornado.httputil
+
+import gi
+import requests
+from tornado.platform.asyncio import AsyncIOMainLoop
+from tornado.ioloop import IOLoop
+from concurrent.futures.thread import ThreadPoolExecutor
+from concurrent.futures.process import ProcessPoolExecutor
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwPkgMgmtYang', '1.0')
+from gi.repository import (
+        RwDts as rwdts,
+        RwPkgMgmtYang,
+        RwVnfdYang,
+        )
+
+import rift.tasklets.rwlaunchpad.uploader as uploader
+import rift.tasklets.rwlaunchpad.message as message
+import rift.tasklets.rwlaunchpad.export as export
+import rift.test.dts
+import mock
+
+TEST_STRING = "foobar"
+
+class TestCase(rift.test.dts.AbstractDTSTest):
+    @classmethod
+    def configure_schema(cls):
+        return RwPkgMgmtYang.get_schema()
+
+    @classmethod
+    def configure_timeout(cls):
+        return 240
+
+    def configure_test(self, loop, test_id):
+        self.log.debug("STARTING - %s", test_id)
+        self.tinfo = self.new_tinfo(str(test_id))
+        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+
+
+        mock_vnfd_catalog = mock.MagicMock()
+        self.uid, path = self.create_mock_package()
+
+        mock_vnfd = RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd.from_dict({
+              "id": self.uid
+            })
+        mock_vnfd_catalog = {self.uid: mock_vnfd}
+
+        self.app = uploader.UploaderApplication(
+                self.log,
+                self.dts,
+                self.loop,
+                vnfd_catalog=mock_vnfd_catalog)
+
+        AsyncIOMainLoop().install()
+        self.server = tornado.httpserver.HTTPServer(
+            self.app,
+            io_loop=IOLoop.current(),
+        )
+
+    def tearDown(self):
+        super().tearDown()
+
+    def create_mock_package(self):
+        uid = str(uuid.uuid4())
+        path = os.path.join(
+                os.getenv('RIFT_ARTIFACTS'),
+                "launchpad/packages/vnfd",
+                uid)
+
+        package_path = os.path.join(path, "pong_vnfd")
+
+        os.makedirs(package_path)
+        open(os.path.join(path, "pong_vnfd.xml"), "wb").close()
+        open(os.path.join(path, "logo.png"), "wb").close()
+
+        return uid, path
+
+    @rift.test.dts.async_test
+    def test_package_create_rpc(self):
+        """
+            1. Verify the package-create RPC handler
+            2. Check if the log messages are updated which will be used by UI
+                for polling
+            3. Verify the package-update RPC handler
+            4. Check if the log messages are updated which will be used by UI
+                for polling
+        """
+        yield from self.app.register()
+        ip = RwPkgMgmtYang.YangInput_RwPkgMgmt_PackageCreate.from_dict({
+                "package_type": "VNFD",
+                "external_url":  "http://repo.riftio.com/releases/open.riftio.com/4.2.1/VNFS/ping_vnfd.tar.gz"
+                })
+
+        rpc_out = yield from self.dts.query_rpc(
+                    "I,/rw-pkg-mgmt:package-create",
+                    rwdts.XactFlag.TRACE,
+                    ip)
+
+        trans_id = None
+        for itr in rpc_out:
+            result = yield from itr
+            trans_id = result.result.transaction_id
+
+        assert trans_id is not None
+
+        yield from asyncio.sleep(5, loop=self.loop)
+        # Verify the message logs
+        data = self.app.messages[trans_id]
+        assert data is not None
+        data = data[1]
+        assert type(data) is message.DownloadSuccess
+
+        # Update
+        ip = RwPkgMgmtYang.YangInput_RwPkgMgmt_PackageUpdate.from_dict({
+                "package_type": "VNFD",
+                "external_url":  "http://repo.riftio.com/releases/open.riftio.com/4.2.1/VNFS/ping_vnfd.tar.gz"
+                })
+        rpc_out = yield from self.dts.query_rpc(
+                    "I,/rw-pkg-mgmt:package-update",
+                    rwdts.XactFlag.TRACE,
+                    ip)
+
+        trans_id = None
+        for itr in rpc_out:
+            result = yield from itr
+            trans_id = result.result.transaction_id
+
+        assert trans_id is not None
+        yield from asyncio.sleep(5, loop=self.loop)
+        # Verify the message logs
+        data = self.app.messages[trans_id]
+        assert data is not None
+        data = data[1]
+        assert type(data) is message.DownloadSuccess
+
+
+    @rift.test.dts.async_test
+    def test_package_export(self):
+        """
+            1. Verify if the package export RPC handler works
+            2. A file is actually generated in the exports dir.
+        """
+        yield from self.app.register()
+        ip = RwPkgMgmtYang.YangInput_RwPkgMgmt_PackageExport.from_dict({
+                "package_type": "VNFD",
+                "package_id": self.uid
+                })
+
+        rpc_out = yield from self.dts.query_rpc(
+                    "I,/rw-pkg-mgmt:package-export",
+                    rwdts.XactFlag.TRACE,
+                    ip)
+
+        trans_id = None
+        filename = None
+        for itr in rpc_out:
+            result = yield from itr
+            trans_id = result.result.transaction_id
+            filename = result.result.filename
+
+        assert trans_id is not None
+
+        # Verify the message logs
+        data = self.app.messages[trans_id]
+        assert data is not None
+        data = data[-1]
+        assert type(data) is export.ExportSuccess
+        path = os.path.join(
+                os.getenv("RIFT_ARTIFACTS"),
+                "launchpad/exports",
+                filename)
+
+
+        print (path)
+        assert os.path.isfile(path)
+
+
+def main():
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+    args, unittest_args = parser.parse_known_args()
+    if args.no_runner:
+        runner = None
+    logging.basicConfig(format='TEST %(message)s')
+    logging.getLogger().setLevel(logging.DEBUG)
+
+
+    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+if __name__ == '__main__':
+    main()
index 3163405..fb0b039 100644 (file)
@@ -326,7 +326,7 @@ class MonParamsDtsTestCase(rift.test.dts.AbstractDTSTest):
         yield from self._test_publish("COUNT", 2)
 
     @rift.test.dts.async_test
-    def _test_legacy_nsr_monitor_publish_avg(self):
+    def test_legacy_nsr_monitor_publish_avg(self):
         yield from self._test_publish("AVERAGE", 1, legacy=True)
 
 
diff --git a/rwlaunchpad/plugins/rwpkgmgr/CMakeLists.txt b/rwlaunchpad/plugins/rwpkgmgr/CMakeLists.txt
new file mode 100644 (file)
index 0000000..4be45d0
--- /dev/null
@@ -0,0 +1,50 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 2016/07/01
+#
+
+include(rift_plugin)
+
+set(TASKLET_NAME rwpkgmgr)
+
+##
+# This function creates an install target for the plugin artifacts
+##
+rift_install_python_plugin(${TASKLET_NAME} ${TASKLET_NAME}.py)
+
+# Workaround RIFT-6485 - rpmbuild defaults to python2 for
+# anything not in a site-packages directory so we have to
+# install the plugin implementation in site-packages and then
+# import it from the actual plugin.
+rift_python_install_tree(
+  FILES
+    rift/tasklets/${TASKLET_NAME}/__init__.py
+    rift/tasklets/${TASKLET_NAME}/${TASKLET_NAME}.py
+    rift/tasklets/${TASKLET_NAME}/rpc.py
+    rift/tasklets/${TASKLET_NAME}/downloader/__init__.py
+    rift/tasklets/${TASKLET_NAME}/downloader/url.py
+    rift/tasklets/${TASKLET_NAME}/proxy/__init__.py
+    rift/tasklets/${TASKLET_NAME}/proxy/base.py
+    rift/tasklets/${TASKLET_NAME}/proxy/filesystem.py
+    rift/tasklets/${TASKLET_NAME}/publisher/__init__.py
+    rift/tasklets/${TASKLET_NAME}/publisher/download_status.py
+    rift/tasklets/${TASKLET_NAME}/subscriber/__init__.py
+    rift/tasklets/${TASKLET_NAME}/subscriber/download_status.py
+  COMPONENT ${PKG_LONG_NAME}
+  PYTHON3_ONLY)
+
+rift_add_subdirs(test)
\ No newline at end of file
diff --git a/rwlaunchpad/plugins/rwpkgmgr/Makefile b/rwlaunchpad/plugins/rwpkgmgr/Makefile
new file mode 100644 (file)
index 0000000..2b691a8
--- /dev/null
@@ -0,0 +1,36 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Tim Mortsolf
+# Creation Date: 11/25/2013
+# 
+
+##
+# Define a Makefile function: find_upward(filename)
+#
+# Searches for a file of the given name in the directory ., .., ../.., ../../.., etc.,
+# until the file is found or the root directory is reached
+##
+find_upward = $(word 1, $(shell while [ `pwd` != / ] ; do find `pwd` -maxdepth 1 -name $1 ; cd .. ; done))
+
+##
+# Call find_upward("Makefile.top") to find the nearest upwards adjacent Makefile.top
+##
+makefile.top := $(call find_upward, "Makefile.top")
+
+##
+# If Makefile.top was found, then include it
+##
+include $(makefile.top)
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/__init__.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/__init__.py
new file mode 100644 (file)
index 0000000..3ebee9d
--- /dev/null
@@ -0,0 +1 @@
+from .rwpkgmgr import PackageManagerTasklet
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/__init__.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/__init__.py
new file mode 100644 (file)
index 0000000..1cb78dc
--- /dev/null
@@ -0,0 +1 @@
+from .url import PackageFileDownloader
\ No newline at end of file
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/url.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/url.py
new file mode 100644 (file)
index 0000000..ec1abf0
--- /dev/null
@@ -0,0 +1,103 @@
+import rift.downloader as downloader
+from gi.repository import RwPkgMgmtYang
+
+
+TaskStatus = RwPkgMgmtYang.TaskStatus
+
+
+class PackageFileDownloader(downloader.UrlDownloader):
+    STATUS_MAP = {
+        downloader.DownloadStatus.STARTED: TaskStatus.QUEUED.value_nick.upper(),
+        downloader.DownloadStatus.IN_PROGRESS: TaskStatus.IN_PROGRESS.value_nick.upper(),
+        downloader.DownloadStatus.COMPLETED: TaskStatus.COMPLETED.value_nick.upper(),
+        downloader.DownloadStatus.FAILED: TaskStatus.FAILED.value_nick.upper(),
+        downloader.DownloadStatus.CANCELLED: TaskStatus.CANCELLED.value_nick.upper()
+        }
+
+    @classmethod
+    def from_rpc_input(cls, rpc_input, file_obj, proxy, log=None, auth=None):
+        """Convenience class to set up an instance form RPC data
+        """
+        url_downloader = cls(
+            rpc_input.external_url,
+            rpc_input.package_id,
+            rpc_input.package_path,
+            rpc_input.package_type,
+            auth=auth,
+            proxy=proxy,
+            file_obj=file_obj,
+            log=log)
+
+        return url_downloader
+
+    def __init__(self,
+                 url,
+                 package_id,
+                 package_path,
+                 package_type,
+                 proxy,
+                 file_obj=None,
+                 delete_on_fail=True,
+                 decompress_on_fly=False,
+                 auth=None,
+                 log=None):
+        super().__init__(
+                url,
+                file_obj=file_obj,
+                delete_on_fail=delete_on_fail,
+                decompress_on_fly=decompress_on_fly,
+                auth=auth,
+                log=log)
+
+        self.package_id = package_id
+        self.package_type = package_type
+        self.package_path = package_path
+        self.proxy = proxy
+
+    def convert_to_yang(self):
+
+        job = RwPkgMgmtYang.DownloadJob.from_dict({
+                "url": self.meta.url,
+                "download_id": self.meta.download_id,
+                "package_id": self.package_id,
+                "package_path": self.package_path,
+                "package_type": self.package_type,
+                "detail": self.meta.detail,
+                "progress_percent": self.meta.progress_percent,
+                "bytes_downloaded": self.meta.bytes_downloaded,
+                "bytes_total": self.meta.bytes_total,
+                "bytes_per_second": self.meta.bytes_per_second,
+                "start_time": self.meta.start_time,
+                "stop_time": self.meta.stop_time,
+                "status": self.STATUS_MAP[self.meta.status]
+            })
+
+        return job
+
+    # Start of delegate calls
+    def call_delegate(self, event):
+        if not self.delegate:
+            return
+
+        job = self.convert_to_yang()
+        getattr(self.delegate, event)(job)
+
+
+    def download_succeeded(self):
+
+        try:
+            # Add the file to package
+            self.proxy.package_file_add(
+                self.meta.filepath,
+                self.package_type,
+                self.package_id,
+                self.package_path)
+
+        except Exception as e:
+            self.log.exception(e)
+            self.job.detail = str(e)
+            self.download_failed()
+            return
+
+        super().download_succeeded()
+
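
For context, a minimal sketch of how this downloader can be driven end to end, mirroring what utest_publisher_dts.py further below does; the mock proxy, package id and paths are placeholders rather than values from this patch, and download() is the blocking entry point the publisher hands to run_in_executor:

    import tempfile

    import mock

    from rift.tasklets.rwpkgmgr.downloader import PackageFileDownloader

    proxy = mock.MagicMock()           # stands in for a real FileSystemProxy
    _, tmp_file = tempfile.mkstemp()   # temporary download target

    dl = PackageFileDownloader(
            "https://raw.githubusercontent.com/RIFTIO/RIFT.ware/master/rift-shell",
            "1",                       # package_id (placeholder)
            "scripts/rift-shell",      # package_path inside the package
            "VNFD",                    # package_type
            proxy,
            file_obj=tmp_file)

    dl.download()   # on success the file is handed to proxy.package_file_add()
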
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/proxy/__init__.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/proxy/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/proxy/base.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/proxy/base.py
new file mode 100644 (file)
index 0000000..d0a1536
--- /dev/null
@@ -0,0 +1,86 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/25/2016
+# 
+
+
+import abc
+import asyncio
+
+
+class AbstractPackageManagerProxy():
+    """Proxy interface that need to be implemented by the package store
+    """
+    @abc.abstractmethod
+    @asyncio.coroutine
+    def endpoint(self, package_type, package_id):
+        """Rest based endpoint to reveal the package contents
+
+        Args:
+            package_type (str): NSD, VNFD
+            package_id (str) ID
+        
+        Returns:
+            str: URL of the endpoint
+        
+        """
+        pass
+
+    @abc.abstractmethod
+    @asyncio.coroutine
+    def schema(self, package_type):
+        """Summary
+        
+        Args:
+            package_type (str): Type of package (NSD|VNFD)
+        
+        Returns:
+            list: List of top level dirs
+        
+        """
+        pass
+
+    @abc.abstractmethod
+    @asyncio.coroutine
+    def package_file_add(self, new_file, package_type, package_id, package_path):
+        """Add file to a package
+        
+        Args:
+            new_file (str): Path to the new file
+            package_type (str): NSD,VNFD
+            package_id (str): ID
+            package_path (str): relative path into the package.
+        
+        Returns:
+            Bool: True, If operation succeeded.
+        """
+        pass
+
+    @abc.abstractmethod
+    @asyncio.coroutine
+    def package_file_delete(self, package_type, package_id, package_path):
+        """delete file from a package
+        
+        Args:
+            package_type (str): NSD,VNFD
+            package_id (str): ID
+            package_path (str): relative path into the package.
+        
+        Returns:
+            Bool: True, If operation succeeded.
+        """
+        pass
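
To illustrate the contract, a minimal sketch of a concrete proxy against this interface; the in-memory store is purely hypothetical and only meant to show the expected signatures and return values:

    import asyncio

    from rift.tasklets.rwpkgmgr.proxy.base import AbstractPackageManagerProxy


    class InMemoryProxy(AbstractPackageManagerProxy):
        """Toy proxy that records package contents in a dict."""

        def __init__(self):
            self.files = {}   # (package_type, package_id) -> set of package paths

        @asyncio.coroutine
        def endpoint(self, package_type, package_id):
            return "https://127.0.0.1:4567/api/package/{}/{}".format(
                    package_type.lower(), package_id)

        @asyncio.coroutine
        def schema(self, package_type):
            return ["icons", "scripts"]

        @asyncio.coroutine
        def package_file_add(self, new_file, package_type, package_id, package_path):
            self.files.setdefault((package_type, package_id), set()).add(package_path)
            return True

        @asyncio.coroutine
        def package_file_delete(self, package_type, package_id, package_path):
            self.files.get((package_type, package_id), set()).discard(package_path)
            return True
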
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/proxy/filesystem.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/proxy/filesystem.py
new file mode 100644 (file)
index 0000000..a303424
--- /dev/null
@@ -0,0 +1,128 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/25/2016
+#
+
+import asyncio
+import os
+
+import rift.package.store as store
+import rift.package.package
+
+from .base import AbstractPackageManagerProxy
+
+
+class UnknownPackageType(Exception):
+    pass
+
+
+class FileSystemProxy(AbstractPackageManagerProxy):
+    """Proxy for Filesystem based store.
+    """
+    PACKAGE_TYPE_MAP = {"vnfd": store.VnfdPackageFilesystemStore,
+                        "nsd": store.NsdPackageFilesystemStore}
+
+    # Refer: https://confluence.riftio.com/display/ATG/Launchpad+package+formats
+    SCHEMA = {
+        "nsd": ["icons", "ns_config", "scripts", "vnf_config"],
+        "vnfd": ["charms", "cloud_init", "icons", "images", "scripts"]
+    }
+
+    SCHEMA_TO_PERMS = {'scripts': 0o777}
+
+    def __init__(self, loop, log):
+        self.loop = loop
+        self.log = log
+        self.store_cache = {}
+
+    def _get_store(self, package_type):
+        store_cls = self.PACKAGE_TYPE_MAP[package_type]
+        store = self.store_cache.setdefault(package_type, store_cls(self.log))
+
+        return store
+
+    @asyncio.coroutine
+    def endpoint(self, package_type, package_id):
+        package_type = package_type.lower()
+        if package_type not in self.PACKAGE_TYPE_MAP:
+            raise UnknownPackageType()
+
+        store = self._get_store(package_type)
+
+        package = store._get_package_dir(package_id)
+        rel_path = os.path.relpath(package, start=store.root_dir)
+
+        url = "https://127.0.0.1:4567/api/package/{}/{}".format(package_type, rel_path)
+
+        return url
+
+    @asyncio.coroutine
+    def schema(self, package_type):
+        package_type = package_type.lower()
+        if package_type not in self.PACKAGE_TYPE_MAP:
+            raise UnknownPackageType()
+
+        return self.SCHEMA[package_type]
+
+    def package_file_add(self, new_file, package_type, package_id, package_path):
+        # Get the schema from the package path;
+        # the first part is always the vnfd/nsd name
+        mode = 0o664
+        components = package_path.split("/")
+        if len(components) > 2:
+            schema = components[1]
+            mode = self.SCHEMA_TO_PERMS.get(schema, mode)
+
+        # Fetch the package object
+        package_type = package_type.lower()
+        store = self._get_store(package_type)
+        package = store.get_package(package_id)
+
+        # Construct abs path of the destination obj
+        path = store._get_package_dir(package_id)
+        dest_file = os.path.join(path, package_path)
+
+        try:
+            package.insert_file(new_file, dest_file, package_path, mode=mode)
+        except rift.package.package.PackageAppendError as e:
+            self.log.exception(e)
+            return False
+
+        return True
+
+    def package_file_delete(self, package_type, package_id, package_path):
+        package_type = package_type.lower()
+        store = self._get_store(package_type)
+        package = store.get_package(package_id)
+
+        # package_path has to be relative, so strip off the starting slash if
+        # provided incorrectly.
+        if package_path[0] == "/":
+            package_path = package_path[1:]
+
+        # Construct abs path of the destination obj
+        path = store._get_package_dir(package_id)
+        dest_file = os.path.join(path, package_path)
+
+        try:
+            package.delete_file(dest_file, package_path)
+        except rift.package.package.PackageAppendError as e:
+            self.log.exception(e)
+            return False
+
+        return True
+
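
A short usage sketch of the proxy outside DTS; it assumes a VNFD package with the given id already exists under the launchpad filesystem store, and the id and logger name are placeholders:

    import asyncio
    import logging

    from rift.tasklets.rwpkgmgr.proxy import filesystem

    loop = asyncio.get_event_loop()
    proxy = filesystem.FileSystemProxy(loop, logging.getLogger("pkg-proxy"))

    dirs = loop.run_until_complete(proxy.schema("VNFD"))
    # dirs -> ['charms', 'cloud_init', 'icons', 'images', 'scripts']

    url = loop.run_until_complete(proxy.endpoint("VNFD", "<existing-package-id>"))
    # url -> https://127.0.0.1:4567/api/package/vnfd/<package dir relative to the store root>
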
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/__init__.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/__init__.py
new file mode 100644 (file)
index 0000000..90d9047
--- /dev/null
@@ -0,0 +1 @@
+from .download_status import DownloadStatusPublisher
\ No newline at end of file
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py
new file mode 100644 (file)
index 0000000..6890241
--- /dev/null
@@ -0,0 +1,96 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/25/2016
+# 
+
+import asyncio
+import uuid
+
+from gi.repository import (RwDts as rwdts)
+import rift.mano.dts as mano_dts
+
+import rift.downloader as url_downloader
+
+
+class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
+
+    def __init__(self, log, dts, loop):
+        super().__init__(log, dts, loop)
+        self.tasks = {}
+
+    def xpath(self, download_id=None):
+        return ("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" +
+            ("[download-id='{}']".format(download_id) if download_id else ""))
+
+    @asyncio.coroutine
+    def register(self):
+        self.reg = yield from self.dts.register(xpath=self.xpath(),
+                  flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
+
+        assert self.reg is not None
+
+
+    def on_download_progress(self, download_job_msg):
+        """callback that triggers update.
+        """
+        key = download_job_msg.download_id
+        # Trigger progress update
+        self.reg.update_element(
+                self.xpath(download_id=key),
+                download_job_msg)
+
+    def on_download_finished(self, download_job_msg):
+        """callback that triggers update.
+        """
+
+        # clean up the local cache
+        key = download_job_msg.download_id
+        if key in self.tasks:
+            del self.tasks[key]
+
+        # Publish the final state
+        self.reg.update_element(
+                self.xpath(download_id=key),
+                download_job_msg)
+
+    @asyncio.coroutine
+    def register_downloader(self, downloader):
+        downloader.delegate = self
+        future = self.loop.run_in_executor(None, downloader.download)
+        self.tasks[downloader.download_id] = (downloader, future)
+
+        return downloader.download_id
+
+    @asyncio.coroutine
+    def cancel_download(self, key):
+        task, future = self.tasks[key]
+
+        future.cancel()
+        task.cancel_download()
+
+    def stop(self):
+        self.deregister()
+
+        for task, future in self.tasks.values():
+            task.cancel_download()
+            future.cancel()
+
+    def deregister(self):
+        """ de-register with dts """
+        if self.reg is not None:
+            self.reg.deregister()
+            self.reg = None
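
A sketch of the delegate flow, written as it would appear inside a coroutine that has self.log/self.dts/self.loop (e.g. a tasklet or DTS test) and mirroring test_url_download in utest_publisher_dts.py below; the mock proxy and ids are placeholders:

    import mock

    import rift.tasklets.rwpkgmgr.downloader as downloader
    import rift.tasklets.rwpkgmgr.publisher as pkg_publisher

    publisher = pkg_publisher.DownloadStatusPublisher(self.log, self.dts, self.loop)
    yield from publisher.register()

    url_downloader = downloader.PackageFileDownloader(
            "https://raw.githubusercontent.com/RIFTIO/RIFT.ware/master/rift-shell",
            "1", "/", "VNFD", mock.MagicMock())

    # register_downloader() makes this publisher the downloader's delegate and
    # starts the download in an executor; progress and the final state are
    # published at D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job[download-id=...]
    download_id = yield from publisher.register_downloader(url_downloader)
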
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rpc.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rpc.py
new file mode 100644 (file)
index 0000000..f55a8fd
--- /dev/null
@@ -0,0 +1,176 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/25/2016
+# 
+
+import abc
+import asyncio
+import tempfile
+
+from gi.repository import (
+   RwDts as rwdts,
+   RwPkgMgmtYang)
+import rift.tasklets
+import rift.mano.dts as mano_dts
+
+from . import downloader as pkg_downloader
+
+# Shortcuts
+RPC_PKG_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_GetPackageEndpoint
+RPC_SCHEMA_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_GetPackageSchema
+RPC_PACKAGE_ADD_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageFileAdd
+RPC_PACKAGE_DELETE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageFileDelete
+
+
+class EndpointDiscoveryRpcHandler(mano_dts.AbstractRpcHandler):
+    """RPC handler to generate the endpoint for the Package manager."""
+
+    def __init__(self, log, dts, loop, proxy):
+        """
+        Args:
+            proxy: Any impl of .proxy.AbstractPackageManagerProxy
+        """
+        super().__init__(log, dts, loop)
+        self.proxy = proxy
+
+    @property
+    def xpath(self):
+        return "/rw-pkg-mgmt:get-package-endpoint"
+
+    @asyncio.coroutine
+    def callback(self, ks_path, msg):
+        """Forwards the request to proxy.
+        """
+        url = yield from self.proxy.endpoint(
+                msg.package_type,
+                msg.package_id)
+
+        rpc_op = RPC_PKG_ENDPOINT.from_dict({"endpoint": url})
+
+        return rpc_op
+
+
+class SchemaRpcHandler(mano_dts.AbstractRpcHandler):
+    """RPC handler to generate the schema for the packages.
+    """
+    def __init__(self, log, dts, loop, proxy):
+        """
+        Args:
+            proxy: Any impl of .proxy.AbstractPackageManagerProxy
+        """
+        super().__init__(log, dts, loop)
+        self.proxy = proxy
+
+    @property
+    def xpath(self):
+        return "/rw-pkg-mgmt:get-package-schema"
+
+    @asyncio.coroutine
+    def callback(self, ks_path, msg):
+
+        package_type = msg.package_type.lower()
+        schema = yield from self.proxy.schema(package_type)
+
+        rpc_op = RPC_SCHEMA_ENDPOINT()
+        for dirname in schema:
+            rpc_op.schema.append(dirname)
+
+        return rpc_op
+
+
+class PackageOperationsRpcHandler(mano_dts.AbstractRpcHandler):
+    """File add RPC
+
+    Steps:
+    1. For a request, we schedule a download in the background
+    2. We register the downloader to a publisher to push out the download status
+        Note: The publisher starts the download automatically.
+    3. Return a tracking ID for the client to monitor the overall status
+
+    """
+    def __init__(self, log, dts, loop, proxy, publisher):
+        """
+        Args:
+            proxy: Any impl of .proxy.AbstractPackageManagerProxy
+            publisher: Instance of DownloadStatusPublisher
+        """
+        super().__init__(log, dts, loop)
+        self.proxy = proxy
+        self.publisher = publisher
+
+    @property
+    def xpath(self):
+        return "/rw-pkg-mgmt:package-file-add"
+
+    @asyncio.coroutine
+    def callback(self, ks_path, msg):
+        if not msg.external_url:
+            # For now, only external URL downloads are supported
+            raise Exception("No download URL provided")
+
+        # Create a tmp file to download the URL into.
+        # The data is stored in the temp file first and moved to its
+        # actual location once the download finishes.
+        _, filename = tempfile.mkstemp()
+
+        auth = None
+        if msg.username is not None:
+            auth = (msg.username, msg.password)
+
+        url_downloader = pkg_downloader.PackageFileDownloader.from_rpc_input(
+                msg,
+                auth=auth,
+                file_obj=filename,
+                proxy=self.proxy,
+                log=self.log)
+
+        download_id = yield from self.publisher.register_downloader(url_downloader)
+
+        rpc_op = RPC_PACKAGE_ADD_ENDPOINT.from_dict({"task_id": download_id})
+
+        return rpc_op
+
+
+class PackageDeleteOperationsRpcHandler(mano_dts.AbstractRpcHandler):
+    def __init__(self, log, dts, loop, proxy):
+        """
+        Args:
+            proxy: Any impl of .proxy.AbstractPackageManagerProxy
+        """
+        super().__init__(log, dts, loop)
+        self.proxy = proxy
+
+    @property
+    def xpath(self):
+        return "/rw-pkg-mgmt:package-file-delete"
+
+    @asyncio.coroutine
+    def callback(self, ks_path, msg):
+
+        rpc_op = RPC_PACKAGE_DELETE_ENDPOINT.from_dict({"status": str(True)})
+
+        try:
+            self.proxy.package_file_delete(
+                msg.package_type,
+                msg.package_id,
+                msg.package_path)
+        except Exception as e:
+            self.log.exception(e)
+            rpc_op.status = str(False)
+            rpc_op.error_trace = str(e)
+
+        return rpc_op
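
An illustrative client-side invocation of package-file-add over DTS, following the steps in the PackageOperationsRpcHandler docstring and mirroring the unit tests below; it assumes a coroutine with a DTS handle (self.dts), and the package id and path are placeholders:

    from gi.repository import RwDts as rwdts, RwPkgMgmtYang

    rpc_in = RwPkgMgmtYang.YangInput_RwPkgMgmt_PackageFileAdd.from_dict({
            "package_type": "VNFD",
            "package_id": "<existing-package-id>",
            "external_url": "https://raw.githubusercontent.com/RIFTIO/RIFT.ware/master/rift-shell",
            "package_path": "scripts/rift-shell"})

    rpc_out = yield from self.dts.query_rpc(
            "I,/rw-pkg-mgmt:package-file-add", rwdts.XactFlag.TRACE, rpc_in)
    # Each result carries a task_id; the client can then poll
    # /download-jobs/job[download-id='<task_id>'] for the download status.
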
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rwpkgmgr.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rwpkgmgr.py
new file mode 100644 (file)
index 0000000..d4e5736
--- /dev/null
@@ -0,0 +1,124 @@
+"""
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+@file rwpkgmgr.py
+@author Varun Prasad (varun.prasad@riftio.com)
+@date 18-Sep-2016
+
+"""
+
+import asyncio
+
+import gi
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwPkgMgmtYang', '1.0')
+
+
+from gi.repository import (
+        RwDts as rwdts,
+        RwPkgMgmtYang)
+import rift.tasklets
+
+
+from . import rpc
+from .proxy import filesystem
+from . import publisher as pkg_publisher
+
+
+class PackageManagerTasklet(rift.tasklets.Tasklet):
+    def __init__(self, *args, **kwargs):
+        try:
+            super().__init__(*args, **kwargs)
+            self.rwlog.set_category("rw-mano-log")
+            self.endpoint_rpc = None
+            self.schema_rpc = None
+        except Exception as e:
+            self.log.exception(e)
+
+    def start(self):
+        super().start()
+
+        self.log.debug("Registering with dts")
+
+        self.dts = rift.tasklets.DTS(
+                self.tasklet_info,
+                RwPkgMgmtYang.get_schema(),
+                self.loop,
+                self.on_dts_state_change
+                )
+
+        proxy = filesystem.FileSystemProxy(self.loop, self.log)
+
+        args = [self.log, self.dts, self.loop]
+        self.job_handler = pkg_publisher.DownloadStatusPublisher(*args)
+
+        args.append(proxy)
+        self.endpoint_rpc = rpc.EndpointDiscoveryRpcHandler(*args)
+        self.schema_rpc = rpc.SchemaRpcHandler(*args)
+        self.delete_rpc = rpc.PackageDeleteOperationsRpcHandler(*args)
+
+        args.append(self.job_handler)
+        self.pkg_op = rpc.PackageOperationsRpcHandler(*args)
+
+    def stop(self):
+        try:
+            self.dts.deinit()
+        except Exception as e:
+            self.log.exception(e)
+
+    @asyncio.coroutine
+    def init(self):
+        yield from self.endpoint_rpc.register()
+        yield from self.schema_rpc.register()
+        yield from self.pkg_op.register()
+        yield from self.job_handler.register()
+        yield from self.delete_rpc.register()
+
+    @asyncio.coroutine
+    def run(self):
+        pass
+
+    @asyncio.coroutine
+    def on_dts_state_change(self, state):
+        """Handle DTS state change
+
+        Take action according to current DTS state to transition application
+        into the corresponding application state
+
+        Arguments
+            state - current dts state
+
+        """
+        switch = {
+            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
+            rwdts.State.CONFIG: rwdts.State.RUN,
+        }
+
+        handlers = {
+            rwdts.State.INIT: self.init,
+            rwdts.State.RUN: self.run,
+        }
+
+        # Transition application to next state
+        handler = handlers.get(state, None)
+        if handler is not None:
+            yield from handler()
+
+        # Transition dts to next state
+        next_state = switch.get(state, None)
+        if next_state is not None:
+            self.dts.handle.set_state(next_state)
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/subscriber/__init__.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/subscriber/__init__.py
new file mode 100644 (file)
index 0000000..6dcc156
--- /dev/null
@@ -0,0 +1 @@
+from .download_status import DownloadStatusSubscriber
\ No newline at end of file
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/subscriber/download_status.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/subscriber/download_status.py
new file mode 100644 (file)
index 0000000..c4a5a53
--- /dev/null
@@ -0,0 +1,28 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/25/2016
+# 
+
+import rift.mano.dts as mano_dts
+
+
+class DownloadStatusSubscriber(mano_dts.AbstractOpdataSubscriber):
+    def __init__(self, log, dts, loop, callback=None):
+        super().__init__(log, dts, loop, callback)
+
+    def get_xpath(self):
+        return ("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job")
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rwpkgmgr.py b/rwlaunchpad/plugins/rwpkgmgr/rwpkgmgr.py
new file mode 100644 (file)
index 0000000..a71bce3
--- /dev/null
@@ -0,0 +1,25 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+# Workaround RIFT-6485 - rpmbuild defaults to python2 for
+# anything not in a site-packages directory so we have to
+# install the plugin implementation in site-packages and then
+# import it from the actual plugin.
+
+import rift.tasklets.rwpkgmgr
+
+class Tasklet(rift.tasklets.rwpkgmgr.PackageManagerTasklet):
+    pass
diff --git a/rwlaunchpad/plugins/rwpkgmgr/test/CMakeLists.txt b/rwlaunchpad/plugins/rwpkgmgr/test/CMakeLists.txt
new file mode 100644 (file)
index 0000000..a42e8e9
--- /dev/null
@@ -0,0 +1,101 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 28/09/2016
+# 
+
+##
+# utest_subscriber_dts
+##
+rift_py3test(utest_subscriber_dts.py
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_subscriber_dts.py
+  )
+
+##
+# utest_publisher_dts
+##
+rift_py3test(utest_publisher_dts.test_download_publisher
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_publisher_dts.py TestCase.test_download_publisher
+  )
+
+rift_py3test(utest_publisher_dts.test_publish
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_publisher_dts.py TestCase.test_publish
+  )
+
+rift_py3test(utest_publisher_dts.test_url_download
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_publisher_dts.py TestCase.test_url_download
+  )
+
+rift_py3test(utest_publisher_dts.test_cancelled
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_publisher_dts.py TestCase.test_cancelled
+  )
+
+add_custom_target(utest_publisher_dts.py
+  DEPENDS
+    utest_publisher_dts.test_download_publisher
+    utest_publisher_dts.test_publish
+    utest_publisher_dts.test_url_download
+    utest_publisher_dts.test_cancelled
+  )
+
+##
+# utest_filesystem_proxy_dts.py
+##
+rift_py3test(utest_filesystem_proxy.test_endpoint_discovery
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_filesystem_proxy_dts.py TestCase.test_endpoint_discovery
+  )
+
+rift_py3test(utest_filesystem_proxy.test_schema_rpc
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_filesystem_proxy_dts.py TestCase.test_schema_rpc
+  )
+
+rift_py3test(utest_filesystem_proxy.test_file_proxy_rpc
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_filesystem_proxy_dts.py TestCase.test_file_proxy_rpc
+  )
+
+rift_py3test(utest_filesystem_proxy.test_file_add_workflow
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_filesystem_proxy_dts.py TestCase.test_file_add_workflow
+  )
+
+rift_py3test(utest_filesystem_proxy.test_file_delete_workflow
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_filesystem_proxy_dts.py TestCase.test_file_delete_workflow
+  )
+
+add_custom_target(utest_filesystem_proxy_dts.py
+  DEPENDS
+    utest_filesystem_proxy.test_endpoint_discovery
+    utest_filesystem_proxy.test_schema_rpc
+    utest_filesystem_proxy.test_file_proxy_rpc
+    utest_filesystem_proxy.test_file_add_workflow
+    utest_filesystem_proxy.test_file_delete_workflow
+  )
+
+add_custom_target(rwpkgmgmt_test
+  DEPENDS
+    utest_filesystem_proxy_dts.py
+    utest_publisher_dts.py
+    utest_subscriber_dts.py
+  )
diff --git a/rwlaunchpad/plugins/rwpkgmgr/test/utest_filesystem_proxy_dts.py b/rwlaunchpad/plugins/rwpkgmgr/test/utest_filesystem_proxy_dts.py
new file mode 100755 (executable)
index 0000000..75b310a
--- /dev/null
@@ -0,0 +1,252 @@
+#!/usr/bin/env python3
+
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import argparse
+import logging
+import os
+import shutil
+import stat
+import sys
+import unittest
+import uuid
+import xmlrunner
+
+import gi
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwPkgMgmtYang', '1.0')
+from gi.repository import (
+        RwDts as rwdts,
+        RwPkgMgmtYang,
+        )
+from rift.tasklets.rwpkgmgr.proxy import filesystem
+
+import rift.tasklets.rwpkgmgr.publisher as pkg_publisher
+import rift.tasklets.rwpkgmgr.rpc as rpc
+import rift.test.dts
+
+TEST_STRING = "foobar"
+
+class TestCase(rift.test.dts.AbstractDTSTest):
+    @classmethod
+    def configure_schema(cls):
+        return RwPkgMgmtYang.get_schema()
+
+    @classmethod
+    def configure_timeout(cls):
+        return 240
+
+    def configure_test(self, loop, test_id):
+        self.log.debug("STARTING - %s", test_id)
+        self.tinfo = self.new_tinfo(str(test_id))
+        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+
+    def tearDown(self):
+        super().tearDown()
+
+    def create_mock_package(self):
+        uid = str(uuid.uuid4())
+        path = os.path.join(
+                os.getenv('RIFT_ARTIFACTS'),
+                "launchpad/packages/vnfd",
+                uid)
+
+        package_path = os.path.join(path, "pong_vnfd")
+
+        os.makedirs(package_path)
+        open(os.path.join(path, "pong_vnfd.xml"), "wb").close()
+        open(os.path.join(path, "logo.png"), "wb").close()
+
+        return uid, path
+
+    @rift.test.dts.async_test
+    def test_endpoint_discovery(self):
+        """
+        Verifies the following:
+            The endpoint RPC returns a URL
+        """
+        proxy = filesystem.FileSystemProxy(self.loop, self.log)
+        endpoint = rpc.EndpointDiscoveryRpcHandler(self.log, self.dts, self.loop, proxy)
+        yield from endpoint.register()
+
+        ip = RwPkgMgmtYang.YangInput_RwPkgMgmt_GetPackageEndpoint.from_dict({
+                "package_type": "VNFD",
+                "package_id": "BLAHID"})
+
+        rpc_out = yield from self.dts.query_rpc(
+                    "I,/get-package-endpoint",
+                    rwdts.XactFlag.TRACE,
+                    ip)
+
+        for itr in rpc_out:
+            result = yield from itr
+            assert result.result.endpoint == 'https://127.0.0.1:4567/api/package/vnfd/BLAHID'
+
+    @rift.test.dts.async_test
+    def test_schema_rpc(self):
+        """
+        Verifies the following:
+            The schema RPC return the schema structure
+        """
+        proxy = filesystem.FileSystemProxy(self.loop, self.log)
+        endpoint = rpc.SchemaRpcHandler(self.log, self.dts, self.loop, proxy)
+        yield from endpoint.register()
+
+        ip = RwPkgMgmtYang.YangInput_RwPkgMgmt_GetPackageSchema.from_dict({
+                "package_type": "VNFD"})
+
+        rpc_out = yield from self.dts.query_rpc(
+                    "I,/get-package-schema",
+                    rwdts.XactFlag.TRACE,
+                    ip)
+
+        for itr in rpc_out:
+            result = yield from itr
+            assert "charms" in result.result.schema
+
+    @rift.test.dts.async_test
+    def test_file_proxy_rpc(self):
+        """
+            1. The file-add RPC returns a valid task UUID through DTS
+        """
+        assert_uid = str(uuid.uuid4())
+        class MockPublisher:
+            @asyncio.coroutine
+            def register_downloader(self, *args):
+                return assert_uid
+
+        uid, path = self.create_mock_package()
+
+        proxy = filesystem.FileSystemProxy(self.loop, self.log)
+        endpoint = rpc.PackageOperationsRpcHandler(
+            self.log,
+            self.dts,
+            self.loop,
+            proxy,
+            MockPublisher())
+        yield from endpoint.register()
+
+        ip = RwPkgMgmtYang.YangInput_RwPkgMgmt_PackageFileAdd.from_dict({
+                "package_type": "VNFD",
+                "package_id": uid,
+                "external_url": "https://raw.githubusercontent.com/RIFTIO/RIFT.ware/master/rift-shell",
+                "package_path": "script/rift-shell"})
+
+        rpc_out = yield from self.dts.query_rpc(
+                    "I,/rw-pkg-mgmt:package-file-add",
+                    rwdts.XactFlag.TRACE,
+                    ip)
+
+        for itr in rpc_out:
+            result = yield from itr
+            assert result.result.task_id == assert_uid
+
+        shutil.rmtree(path)
+
+    @rift.test.dts.async_test
+    def test_file_add_workflow(self):
+        """
+            Integration test:
+                1. Verify the end to end flow of package ADD (NO MOCKS)
+        """
+        uid, path = self.create_mock_package()
+
+        proxy = filesystem.FileSystemProxy(self.loop, self.log)
+        publisher = pkg_publisher.DownloadStatusPublisher(self.log, self.dts, self.loop)
+        endpoint = rpc.PackageOperationsRpcHandler(
+            self.log,
+            self.dts,
+            self.loop,
+            proxy,
+            publisher)
+
+        yield from publisher.register()
+        yield from endpoint.register()
+
+        ip = RwPkgMgmtYang.YangInput_RwPkgMgmt_PackageFileAdd.from_dict({
+                "package_type": "VNFD",
+                "package_id": uid,
+                "external_url": "https://raw.githubusercontent.com/RIFTIO/RIFT.ware/master/rift-shell",
+                "package_path": "icons/rift-shell"})
+
+        rpc_out = yield from self.dts.query_rpc(
+                    "I,/rw-pkg-mgmt:package-file-add",
+                    rwdts.XactFlag.TRACE,
+                    ip)
+
+        yield from asyncio.sleep(5, loop=self.loop)
+        filepath = os.path.join(path, ip.package_path)
+        assert os.path.isfile(filepath)
+        mode = oct(os.stat(filepath)[stat.ST_MODE])
+        assert str(mode) == "0o100664"
+
+        shutil.rmtree(path)
+
+
+    @rift.test.dts.async_test
+    def test_file_delete_workflow(self):
+        """
+            Integration test:
+                1. Verify the end to end flow of package file DELETE (NO MOCKS)
+        """
+        uid, path = self.create_mock_package()
+
+        proxy = filesystem.FileSystemProxy(self.loop, self.log)
+        endpoint = rpc.PackageDeleteOperationsRpcHandler(
+            self.log,
+            self.dts,
+            self.loop,
+            proxy)
+
+        yield from endpoint.register()
+
+        ip = RwPkgMgmtYang.YangInput_RwPkgMgmt_PackageFileDelete.from_dict({
+                "package_type": "VNFD",
+                "package_id": uid,
+                "package_path": "logo.png"})
+
+        assert os.path.isfile(os.path.join(path, ip.package_path))
+
+        rpc_out = yield from self.dts.query_rpc(
+                    "I,/rw-pkg-mgmt:package-file-delete",
+                    rwdts.XactFlag.TRACE,
+                    ip)
+
+        yield from asyncio.sleep(5, loop=self.loop)
+        assert not os.path.isfile(os.path.join(path, ip.package_path))
+
+        shutil.rmtree(path)
+
+def main():
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+    args, unittest_args = parser.parse_known_args()
+    if args.no_runner:
+        runner = None
+
+    logging.basicConfig(format='TEST %(message)s')
+    logging.getLogger().setLevel(logging.DEBUG)
+
+
+    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+if __name__ == '__main__':
+    main()
diff --git a/rwlaunchpad/plugins/rwpkgmgr/test/utest_publisher_dts.py b/rwlaunchpad/plugins/rwpkgmgr/test/utest_publisher_dts.py
new file mode 100755 (executable)
index 0000000..335638e
--- /dev/null
@@ -0,0 +1,201 @@
+#!/usr/bin/env python3
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import argparse
+import asyncio
+import logging
+import mock
+import os
+import sys
+import unittest
+import uuid
+import xmlrunner
+
+import gi
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwPkgMgmtYang', '1.0')
+from gi.repository import (
+        RwDts as rwdts,
+        RwPkgMgmtYang
+        )
+import rift.tasklets.rwpkgmgr.downloader as downloader
+import rift.tasklets.rwpkgmgr.publisher as pkg_publisher
+import rift.test.dts
+
+
+class TestCase(rift.test.dts.AbstractDTSTest):
+    @classmethod
+    def configure_schema(cls):
+        return RwPkgMgmtYang.get_schema()
+
+    @classmethod
+    def configure_timeout(cls):
+        return 240
+
+    def configure_test(self, loop, test_id):
+        self.log.debug("STARTING - %s", test_id)
+        self.tinfo = self.new_tinfo(str(test_id))
+        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+
+        self.job_handler = pkg_publisher.DownloadStatusPublisher(self.log, self.dts, self.loop)
+
+    def tearDown(self):
+        super().tearDown()
+
+    @asyncio.coroutine
+    def get_published_xpaths(self):
+        published_xpaths = set()
+
+        res_iter = yield from self.dts.query_read("D,/rwdts:dts")
+        for i in res_iter:
+            res = (yield from i).result
+            for member in res.member:
+                published_xpaths |= {reg.keyspec for reg in member.state.registration if reg.flags == "publisher"}
+
+        return published_xpaths
+
+    @asyncio.coroutine
+    def read_xpath(self, xpath):
+        itr = yield from self.dts.query_read(xpath)
+
+        result = None
+        for fut in itr:
+            result = yield from fut
+            return result.result
+
+    @rift.test.dts.async_test
+    def test_download_publisher(self):
+        yield from self.job_handler.register()
+        published_xpaths = yield from self.get_published_xpaths()
+        assert self.job_handler.xpath() in published_xpaths
+
+    @rift.test.dts.async_test
+    def test_publish(self):
+        """
+        Asserts:
+            1. Verify that calling on_download_progress & on_download_finished
+               triggers a DTS update
+            2. Verify if the internal store is updated
+        """
+        yield from self.job_handler.register()
+
+        mock_msg = RwPkgMgmtYang.DownloadJob.from_dict({
+                "url": "http://foo/bar",
+                "package_id": "123",
+                "download_id": str(uuid.uuid4())})
+
+        self.job_handler.on_download_progress(mock_msg)
+        yield from asyncio.sleep(5, loop=self.loop)
+
+        itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
+            mock_msg.download_id))
+
+        result = None
+        for fut in itr:
+            result = yield from fut
+            result = result.result
+
+        print (mock_msg)
+        assert result == mock_msg
+
+        # Modify the msg
+        mock_msg.url = "http://bar/foo"
+        self.job_handler.on_download_finished(mock_msg)
+        yield from asyncio.sleep(5, loop=self.loop)
+        
+        itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
+            mock_msg.download_id))
+
+        result = None
+        for fut in itr:
+            result = yield from fut
+            result = result.result
+        assert result == mock_msg
+
+
+    @rift.test.dts.async_test
+    def test_url_download(self):
+        """
+        Integration Test:
+            Test the updates with download/url.py
+        """
+        yield from self.job_handler.register()
+
+        proxy = mock.MagicMock()
+
+        url = "https://raw.githubusercontent.com/RIFTIO/RIFT.ware/master/rift-shell"
+        url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy)
+
+        download_id = yield from self.job_handler.register_downloader(url_downloader)
+        assert download_id is not None
+        
+        yield from asyncio.sleep(5, loop=self.loop)
+        xpath = "/download-jobs/job[download-id='{}']".format(
+            download_id)
+        result = yield from self.read_xpath(xpath)
+        print (result)
+        assert result.status == "COMPLETED"
+        assert len(self.job_handler.tasks) == 0
+
+
+    @rift.test.dts.async_test
+    def test_cancelled(self):
+        """
+        Integration Test:
+            1. Test the updates with downloader.py
+            2. Verifies if cancel triggers the job status to move to cancelled
+        """
+        yield from self.job_handler.register()
+
+        proxy = mock.MagicMock()
+        url = "http://mirror.0x.sg/fedora/linux/releases/24/CloudImages/x86_64/images/Fedora-Cloud-Base-24-1.2.x86_64.qcow2"
+        url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy)
+
+        download_id = yield from self.job_handler.register_downloader(url_downloader)
+        assert download_id is not None
+        xpath = "/download-jobs/job[download-id='{}']".format(
+            download_id)
+
+        yield from asyncio.sleep(3, loop=self.loop)
+
+        result = yield from self.read_xpath(xpath)
+        assert result.status == "IN_PROGRESS"
+
+        yield from self.job_handler.cancel_download(download_id)
+        yield from asyncio.sleep(3, loop=self.loop)
+        result = yield from self.read_xpath(xpath)
+        assert result.status == "CANCELLED"
+        assert len(self.job_handler.tasks) == 0
+    
+
+def main():
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+    args, unittest_args = parser.parse_known_args()
+    if args.no_runner:
+        runner = None
+
+    TestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
+
+    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+if __name__ == '__main__':
+    main()
diff --git a/rwlaunchpad/plugins/rwpkgmgr/test/utest_subscriber_dts.py b/rwlaunchpad/plugins/rwpkgmgr/test/utest_subscriber_dts.py
new file mode 100755 (executable)
index 0000000..4281e11
--- /dev/null
@@ -0,0 +1,138 @@
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import sys
+import unittest
+import uuid
+
+import gi
+gi.require_version('RwDtsYang', '1.0')
+gi.require_version('RwPkgMgmtYang', '1.0')
+from gi.repository import (
+        RwPkgMgmtYang,
+        RwDts as rwdts,
+        )
+import rift.tasklets.rwpkgmgr.subscriber as pkg_subscriber
+import rift.test.dts
+
+
+class DescriptorPublisher(object):
+    # TODO: Needs to be moved to a central place; too many copy-pastes
+    def __init__(self, log, dts, loop):
+        self.log = log
+        self.loop = loop
+        self.dts = dts
+
+        self._registrations = []
+
+    @asyncio.coroutine
+    def publish(self, w_path, path, desc):
+        ready_event = asyncio.Event(loop=self.loop)
+
+        @asyncio.coroutine
+        def on_ready(regh, status):
+            self.log.debug("Create element: %s, obj-type:%s obj:%s",
+                           path, type(desc), desc)
+            with self.dts.transaction() as xact:
+                regh.create_element(path, desc, xact.xact)
+            self.log.debug("Created element: %s, obj:%s", path, desc)
+            ready_event.set()
+
+        handler = rift.tasklets.DTS.RegistrationHandler(
+                on_ready=on_ready
+                )
+
+        self.log.debug("Registering path: %s, obj:%s", w_path, desc)
+        reg = yield from self.dts.register(
+                w_path,
+                handler,
+                flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ
+                )
+        self._registrations.append(reg)
+        self.log.debug("Registered path : %s", w_path)
+        yield from ready_event.wait()
+
+        return reg
+
+    def unpublish_all(self):
+        self.log.debug("Deregistering all published descriptors")
+        for reg in self._registrations:
+            reg.deregister()
+
+class SubscriberStoreDtsTestCase(rift.test.dts.AbstractDTSTest):
+    @classmethod
+    def configure_schema(cls):
+        return RwPkgMgmtYang.get_schema()
+
+    @classmethod
+    def configure_timeout(cls):
+        return 240
+
+    def configure_test(self, loop, test_id):
+        self.log.debug("STARTING - %s", test_id)
+        self.tinfo = self.new_tinfo(str(test_id))
+        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+        self.publisher = DescriptorPublisher(self.log, self.dts, self.loop)
+
+    def tearDown(self):
+        super().tearDown()
+
+    @rift.test.dts.async_test
+    def test_download_status_handler(self):
+
+        mock_msg = RwPkgMgmtYang.DownloadJob.from_dict({
+                "url": "http://foo/bar",
+                "package_id": "123",
+                "download_id": str(uuid.uuid4())})
+
+        w_xpath = "D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job"
+        xpath = "{}[download-id='{}']".format(w_xpath, mock_msg.download_id)
+
+        mock_called = False
+        def mock_cb(msg, status):
+            nonlocal mock_called
+            assert msg == mock_msg
+            mock_called = True
+
+        sub = pkg_subscriber.DownloadStatusSubscriber(
+            self.log,
+            self.dts,
+            self.loop,
+            callback=mock_cb)
+
+        yield from sub.register()
+        yield from asyncio.sleep(1, loop=self.loop)
+
+        yield from self.publisher.publish(w_xpath, xpath, mock_msg)
+        yield from asyncio.sleep(1, loop=self.loop)
+
+        assert mock_called is True
+
+
+def main(argv=sys.argv[1:]):
+
+    # The unittest framework requires a program name, so use the name of this
+    # file instead (we do not want to have to pass a fake program name to main
+    # when this is called from the interpreter).
+    unittest.main(
+            argv=[__file__] + argv,
+            testRunner=None#xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+            )
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/rwlaunchpad/plugins/rwstagingmgr/CMakeLists.txt b/rwlaunchpad/plugins/rwstagingmgr/CMakeLists.txt
new file mode 100644 (file)
index 0000000..71f0704
--- /dev/null
@@ -0,0 +1,52 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 2016/07/01
+#
+
+include(rift_plugin)
+
+set(TASKLET_NAME rwstagingmgr)
+
+##
+# This function creates an install target for the plugin artifacts
+##
+rift_install_python_plugin(${TASKLET_NAME} ${TASKLET_NAME}.py)
+
+
+# Workaround RIFT-6485 - rpmbuild defaults to python2 for
+# anything not in a site-packages directory so we have to
+# install the plugin implementation in site-packages and then
+# import it from the actual plugin.
+rift_python_install_tree(
+  FILES
+    rift/tasklets/${TASKLET_NAME}/__init__.py
+    rift/tasklets/${TASKLET_NAME}/${TASKLET_NAME}.py
+    rift/tasklets/${TASKLET_NAME}/publisher/__init__.py
+    rift/tasklets/${TASKLET_NAME}/publisher/staging_status.py
+    rift/tasklets/${TASKLET_NAME}/protocol.py
+    rift/tasklets/${TASKLET_NAME}/rpc.py
+    rift/tasklets/${TASKLET_NAME}/server/__init__.py
+    rift/tasklets/${TASKLET_NAME}/server/app.py
+    rift/tasklets/${TASKLET_NAME}/server/handler.py
+    rift/tasklets/${TASKLET_NAME}/store/__init__.py
+    rift/tasklets/${TASKLET_NAME}/store/file_store.py
+    rift/tasklets/${TASKLET_NAME}/model/__init__.py
+    rift/tasklets/${TASKLET_NAME}/model/staging_area.py
+  COMPONENT ${PKG_LONG_NAME}
+  PYTHON3_ONLY)
+
+rift_add_subdirs(test)
\ No newline at end of file
diff --git a/rwlaunchpad/plugins/rwstagingmgr/Makefile b/rwlaunchpad/plugins/rwstagingmgr/Makefile
new file mode 100644 (file)
index 0000000..2b691a8
--- /dev/null
@@ -0,0 +1,36 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Tim Mortsolf
+# Creation Date: 11/25/2013
+# 
+
+##
+# Define a Makefile function: find_upward(filename)
+#
+# Searches for a file of the given name in the directory ., .., ../.., ../../.., etc.,
+# until the file is found or the root directory is reached
+##
+find_upward = $(word 1, $(shell while [ `pwd` != / ] ; do find `pwd` -maxdepth 1 -name $1 ; cd .. ; done))
+
+##
+# Call find_upward("Makefile.top") to find the nearest upwards adjacent Makefile.top
+##
+makefile.top := $(call find_upward, "Makefile.top")
+
+##
+# If Makefile.top was found, then include it
+##
+include $(makefile.top)
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/__init__.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/__init__.py
new file mode 100644 (file)
index 0000000..2c9a85d
--- /dev/null
@@ -0,0 +1 @@
+from .rwstagingmgr import StagingManagerTasklet
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/model/__init__.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/model/__init__.py
new file mode 100644 (file)
index 0000000..4c70e42
--- /dev/null
@@ -0,0 +1 @@
+from .staging_area import StagingArea
\ No newline at end of file
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/model/staging_area.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/model/staging_area.py
new file mode 100644 (file)
index 0000000..473999f
--- /dev/null
@@ -0,0 +1,55 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/28/2016
+#
+
+
+import time
+
+import gi
+gi.require_version('RwStagingMgmtYang', '1.0')
+from gi.repository import (
+        RwStagingMgmtYang,
+        )
+
+
+class StagingArea(object):
+    """A pythonic wrapper around the GI object StagingArea
+    """
+    def __init__(self, model=None):
+        self._model = model
+        if not self._model:
+            self._model = RwStagingMgmtYang.StagingArea.from_dict({})
+
+    @property
+    def area_id(self):
+        return self._model.area_id
+
+    @property
+    def model(self):
+        return self._model
+
+    @property
+    def has_expired(self):
+        current_time = time.time()
+        expiry_time = self.model.created_time + self.model.validity_time
+        return expiry_time <= current_time
+
+    def as_dict(self):
+        return self._model.as_dict()
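+
+
+# Example usage (illustrative sketch; the field values below are hypothetical):
+#
+#     area = StagingArea(RwStagingMgmtYang.StagingArea.from_dict({
+#         "area_id": "abc123",
+#         "created_time": time.time(),
+#         "validity_time": 3600,
+#     }))
+#     if area.has_expired:
+#         ...  # the backing directory can be removed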
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/protocol.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/protocol.py
new file mode 100644 (file)
index 0000000..4591759
--- /dev/null
@@ -0,0 +1,10 @@
+class StagingStorePublisherProtocol:
+    def on_recovery(self, staging_areas):
+        pass
+
+class StagingStoreProtocol(object):
+    def on_staging_area_create(self, store):
+        pass
+
+    def on_staging_area_delete(self, store):
+        pass
\ No newline at end of file
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/publisher/__init__.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/publisher/__init__.py
new file mode 100644 (file)
index 0000000..732d845
--- /dev/null
@@ -0,0 +1 @@
+from .staging_status import StagingStorePublisher
\ No newline at end of file
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/publisher/staging_status.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/publisher/staging_status.py
new file mode 100644 (file)
index 0000000..82e2da5
--- /dev/null
@@ -0,0 +1,75 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/25/2016
+#
+
+import asyncio
+import uuid
+
+from gi.repository import (RwDts as rwdts)
+import rift.mano.dts as mano_dts
+import rift.tasklets
+
+from ..protocol import StagingStoreProtocol
+
+class StagingStorePublisher(mano_dts.DtsHandler, StagingStoreProtocol):
+
+    def __init__(self, log, dts, loop):
+        super().__init__(log, dts, loop)
+        self.delegate = None
+
+    def xpath(self, area_id=None):
+        return ("D,/rw-staging-mgmt:staging-areas/rw-staging-mgmt:staging-area" +
+            ("[area-id='{}']".format(area_id) if area_id else ""))
+
+    @asyncio.coroutine
+    def register(self):
+        # we need a dummy callback for recovery to work
+        @asyncio.coroutine
+        def on_event(dts, g_reg, xact, xact_event, scratch_data):
+            if xact_event == rwdts.MemberEvent.INSTALL:
+                if self.delegate:
+                    self.delegate.on_recovery(self.reg.elements)
+
+            return rwdts.MemberRspCode.ACTION_OK
+
+        hdl = rift.tasklets.DTS.RegistrationHandler()
+        handlers = rift.tasklets.Group.Handler(on_event=on_event)
+        with self.dts.group_create(handler=handlers) as group:
+            self.reg = group.register(xpath=self.xpath(),
+                                        handler=hdl,
+                                        flags=(rwdts.Flag.PUBLISHER |
+                                               rwdts.Flag.NO_PREP_READ |
+                                               rwdts.Flag.CACHE |
+                                               rwdts.Flag.DATASTORE),)
+
+        assert self.reg is not None
+
+    def on_staging_area_create(self, store):
+        self.reg.update_element(self.xpath(store.area_id), store)
+
+    def on_staging_area_delete(self, store):
+        self.reg.update_element(self.xpath(store.area_id), store)
+
+    def stop(self):
+        self.deregister()
+
+    def deregister(self):
+        """ de-register with dts """
+        if self.reg is not None:
+            self.reg.deregister()
+            self.reg = None
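+
+# Note: xpath() with no argument returns the wildcard list path used for the
+# DTS registration above; with an area_id it returns the keyed instance path,
+# e.g. D,/rw-staging-mgmt:staging-areas/rw-staging-mgmt:staging-area[area-id='<id>'],
+# which is what on_staging_area_create/delete pass to update_element.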
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rpc.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rpc.py
new file mode 100644 (file)
index 0000000..5fe12ca
--- /dev/null
@@ -0,0 +1,57 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/28/2016
+#
+
+import asyncio
+
+import gi
+gi.require_version("RwStagingMgmtYang", "1.0")
+from gi.repository import (
+   RwDts as rwdts,
+   RwStagingMgmtYang)
+
+import rift.mano.dts as mano_dts
+
+
+# Shortcuts
+RPC_STAGING_CREATE_ENDPOINT = RwStagingMgmtYang.YangOutput_RwStagingMgmt_CreateStagingArea
+
+
+class StagingAreaCreateRpcHandler(mano_dts.AbstractRpcHandler):
+    """RPC handler to generate staging Area"""
+
+    def __init__(self, log, dts, loop, store):
+        super().__init__(log, dts, loop)
+        self.store = store
+
+    @property
+    def xpath(self):
+        return "/rw-staging-mgmt:create-staging-area"
+
+    @asyncio.coroutine
+    def callback(self, ks_path, msg):
+        """Forwards the request to proxy.
+        """
+        self.log.debug("Got a staging create request for {}".format(msg.as_dict()))
+        staging_area = self.store.create_staging_area(msg)
+
+        rpc_op = RPC_STAGING_CREATE_ENDPOINT.from_dict({
+            "port": 4568,
+            "endpoint": "api/upload/{}".format(staging_area.model.area_id)})
+
+        return rpc_op
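+
+# Illustrative reply: for a staging area whose area_id is "abc123" (a
+# hypothetical value), the RPC output carries port 4568 and the endpoint
+# "api/upload/abc123"; the client then uploads the file to that endpoint,
+# which is served by server/handler.py.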
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rwstagingmgr.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rwstagingmgr.py
new file mode 100644 (file)
index 0000000..04a7cae
--- /dev/null
@@ -0,0 +1,146 @@
+"""
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+@file rwstagingmgr.py
+@author Varun Prasad (varun.prasad@riftio.com)
+@date 28-Sep-2016
+
+"""
+
+import asyncio
+
+import tornado
+import tornado.httpserver
+import tornado.httputil
+import tornado.platform.asyncio
+import tornadostreamform.multipart_streamer as multipart_streamer
+
+import gi
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwStagingMgmtYang', '1.0')
+from gi.repository import (
+        RwDts as rwdts,
+        RwStagingMgmtYang)
+import rift.tasklets
+
+from . import rpc
+from . import store
+from .server import StagingApplication
+from .publisher import StagingStorePublisher
+
+
+class StagingManagerTasklet(rift.tasklets.Tasklet):
+    """Tasklet to handle all staging related operations
+    """
+    def __init__(self, *args, **kwargs):
+        try:
+            super().__init__(*args, **kwargs)
+        except Exception as e:
+            self.log.exception(e)
+
+    def start(self):
+        super().start()
+
+        self.log.debug("Registering with dts")
+
+        self.dts = rift.tasklets.DTS(
+                self.tasklet_info,
+                RwStagingMgmtYang.get_schema(),
+                self.loop,
+                self.on_dts_state_change
+                )
+
+    def stop(self):
+        try:
+            self.dts.deinit()
+        except Exception as e:
+            self.log.exception(e)
+
+    @asyncio.coroutine
+    def init(self):
+        self.store = store.StagingFileStore(log=self.log)
+        self.publisher = StagingStorePublisher(self.log, self.dts, self.loop)
+        # For recovery
+        self.publisher.delegate = self.store
+        # For create and delete events
+        self.store.delegate = self.publisher
+        yield from self.publisher.register()
+
+
+        io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
+        self.app = StagingApplication(self.store)
+
+        manifest = self.tasklet_info.get_pb_manifest()
+        ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
+        ssl_key = manifest.bootstrap_phase.rwsecurity.key
+        ssl_options = {"certfile": ssl_cert, "keyfile": ssl_key}
+
+        if manifest.bootstrap_phase.rwsecurity.use_ssl:
+            self.server = tornado.httpserver.HTTPServer(
+                self.app,
+                max_body_size=self.app.MAX_BODY_SIZE,
+                io_loop=io_loop,
+                ssl_options=ssl_options)
+        else:
+            self.server = tornado.httpserver.HTTPServer(
+                self.app,
+                max_body_size=self.app.MAX_BODY_SIZE,
+                io_loop=io_loop,
+            )
+
+        self.create_stg_rpc = rpc.StagingAreaCreateRpcHandler(
+                self.log,
+                self.dts,
+                self.loop,
+                self.store)
+
+        yield from self.create_stg_rpc.register()
+
+    @asyncio.coroutine
+    def run(self):
+        self.server.listen(self.app.PORT)
+
+    @asyncio.coroutine
+    def on_dts_state_change(self, state):
+        """Handle DTS state change
+
+        Take action according to current DTS state to transition application
+        into the corresponding application state
+
+        Arguments
+            state - current dts state
+
+        """
+        switch = {
+            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
+            rwdts.State.CONFIG: rwdts.State.RUN,
+        }
+
+        handlers = {
+            rwdts.State.INIT: self.init,
+            rwdts.State.RUN: self.run,
+        }
+
+        # Transition application to next state
+        handler = handlers.get(state, None)
+        if handler is not None:
+            yield from handler()
+
+        # Transition dts to next state
+        next_state = switch.get(state, None)
+        if next_state is not None:
+            self.dts.handle.set_state(next_state)
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/server/__init__.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/server/__init__.py
new file mode 100644 (file)
index 0000000..9d63d1d
--- /dev/null
@@ -0,0 +1 @@
+from .app import StagingApplication
\ No newline at end of file
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/server/app.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/server/app.py
new file mode 100644 (file)
index 0000000..41bbc59
--- /dev/null
@@ -0,0 +1,117 @@
+
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import logging
+import os
+import threading
+import time
+
+import requests
+# disable unsigned certificate warning
+from requests.packages.urllib3.exceptions import InsecureRequestWarning
+requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
+
+import tornado
+import tornado.escape
+import tornado.httpclient
+import tornado.httputil
+import tornado.ioloop
+import tornado.web
+
+import gi
+gi.require_version('RwStagingMgmtYang', '1.0')
+from gi.repository import (
+        RwStagingMgmtYang,
+        )
+
+from . import handler
+
+
+MB = 1024 * 1024
+GB = 1024 * MB
+MAX_STREAMED_SIZE = 5 * GB
+
+
+class StagingApplication(tornado.web.Application):
+    MAX_BUFFER_SIZE = 1 * MB  # Max. size loaded into memory!
+    MAX_BODY_SIZE = 1 * MB  # Max. size loaded into memory!
+    PORT = 4568
+
+    def __init__(self, store, cleanup_interval=60):
+
+        self.store = store
+
+        self.cleaner = CleanupThread(self.store, cleanup_interval=cleanup_interval)
+        self.cleaner.start()
+
+        super(StagingApplication, self).__init__([
+            (r"/api/upload/(.*)", handler.UploadStagingHandler, {'store': store}),
+            (r"/api/download/(.*)", tornado.web.StaticFileHandler, {'path': store.root_dir}),
+            ])
+
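+    # Example client interaction (illustrative sketch; "launchpad" is a
+    # hypothetical host name, and the upload form field must be named "file"):
+    #
+    #     import requests
+    #     url = "https://launchpad:4568/api/upload/<area-id>"
+    #     with open("pkg.tar.gz", "rb") as fh:
+    #         resp = requests.post(url, files={"file": fh}, verify=False)
+    #     download_path = resp.json()["path"]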
+
+class CleanUpStaging(object):
+    def __init__(self, store, log=None):
+        """
+        Args:
+            store : Any store object from the store package
+            log : Log handle
+        """
+        self.store = store
+        self.log = log or logging.getLogger()
+        self.log.setLevel(logging.DEBUG)
+
+    def cleanup(self):
+        # Walk the staging root and remove any staging areas that have expired.
+        for root, dirs, files in os.walk(self.store.root_dir):
+            for staging_id in dirs:
+                try:
+                    staging_area = self.store.get_staging_area(staging_id)
+                    if staging_area.has_expired:
+                        self.store.remove_staging_area(staging_area)
+                except Exception as e:
+                    # Ignore the temp directories
+                    pass
+
+
+class CleanupThread(threading.Thread):
+    """Daemon thread that clean up the staging area
+    """
+    def __init__(self, store, log=None, cleanup_interval=60):
+        """
+        Args:
+            store : A compatible store object
+            log (None, optional): Log handle
+            cleanup_interval (int, optional): Cleanup interval in secs
+        """
+        super().__init__()
+        self.log = log or logging.getLogger()
+        self.store = store
+        self._cleaner = CleanUpStaging(store, log)
+        self.cleanup_interval = cleanup_interval
+        self.daemon = True
+
+    def run(self):
+        try:
+            while True:
+                self._cleaner.cleanup()
+                time.sleep(self.cleanup_interval)
+
+        except Exception as e:
+            self.log.exception(e)
+
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/server/handler.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/server/handler.py
new file mode 100644 (file)
index 0000000..ce26e06
--- /dev/null
@@ -0,0 +1,148 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 09/28/2016
+#
+
+import tornado.httpclient
+import tornado.web
+import tornadostreamform.multipart_streamer as multipart_streamer
+import logging
+import os
+
+MB = 1024 * 1024
+GB = 1024 * MB
+
+MAX_STREAMED_SIZE = 5 * GB
+
+class HttpMessageError(Exception):
+    def __init__(self, code, msg):
+        self.code = code
+        self.msg = msg
+
+
+class RequestHandler(tornado.web.RequestHandler):
+    def options(self, *args, **kargs):
+        pass
+
+    def set_default_headers(self):
+        self.set_header('Access-Control-Allow-Origin', '*')
+        self.set_header('Access-Control-Allow-Headers',
+                        'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization')
+        self.set_header('Access-Control-Allow-Methods', 'POST, GET, PUT, DELETE')
+
+
+class StoreStreamerPart(multipart_streamer.MultiPartStreamer):
+    """
+    Create a Part streamer with a custom temp directory. Using the default
+    tmp directory and trying to move the file to $RIFT_ARTIFACTS occasionally
+    causes link errors. So create a temp directory within the staging area.
+    """
+    def __init__(self, store, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.store = store
+
+    def create_part(self, headers):
+        return multipart_streamer.TemporaryFileStreamedPart(self, headers, tmp_dir=self.store.tmp_dir)
+
+
+@tornado.web.stream_request_body
+class UploadStagingHandler(RequestHandler):
+    def initialize(self, store):
+        """Initialize the handler
+
+        Arguments:
+            log  - the logger that this handler should use
+            loop - the tasklets ioloop
+
+        """
+        self.log = logging.getLogger()
+        self.store = store
+
+        self.part_streamer = None
+
+    @tornado.gen.coroutine
+    def prepare(self):
+        """Prepare the handler for a request
+
+        The prepare function is the first part of a request transaction. It
+        creates a temporary file that uploaded data can be written to.
+
+        """
+        if self.request.method != "POST":
+            return
+
+        self.request.connection.set_max_body_size(MAX_STREAMED_SIZE)
+
+        # Retrieve the content type and parameters from the request
+        content_type = self.request.headers.get('content-type', None)
+        if content_type is None:
+            raise tornado.httpclient.HTTPError(400, "No content type set")
+
+        content_type, params = tornado.httputil._parse_header(content_type)
+
+        if "multipart/form-data" != content_type.lower():
+            raise tornado.httpclient.HTTPError(415, "Invalid content type")
+
+        # The total request size comes from the Content-Length header.
+        try:
+            total = int(self.request.headers.get("Content-Length", "0"))
+        except ValueError:
+            self.log.warning("Invalid Content-Length header")
+            # Fall back to 0 if the header is missing or malformed.
+            total = 0
+
+        # Create a streamer that will accept the incoming multipart data.
+        self.part_streamer = StoreStreamerPart(self.store, total)
+
+
+    @tornado.gen.coroutine
+    def data_received(self, chunk):
+        """When a chunk of data is received, we forward it to the multipart streamer."""
+        self.part_streamer.data_received(chunk)
+
+    def post(self, staging_id):
+        """Handle a post request
+
+        The function is called after any data associated with the body of the
+        request has been received.
+
+        """
+        try:
+            # You MUST call this to close the incoming stream.
+            self.part_streamer.data_complete()
+            desc_parts = self.part_streamer.get_parts_by_name("file")
+            if len(desc_parts) != 1:
+                raise tornado.httpclient.HTTPError(400, "File option not found")
+
+            binary_data = desc_parts[0]
+            staging_area = self.store.get_staging_area(staging_id)
+            filename = binary_data.get_filename()
+            staging_area.model.name = filename
+            staging_area.model.size = binary_data.get_size()
+
+            dest_file = os.path.join(staging_area.model.path, filename)
+            binary_data.move(dest_file)
+
+            self.set_status(200)
+            self.write(tornado.escape.json_encode({
+                "path": "/api/download/{}/{}".format(staging_id, filename)
+                }))
+
+        finally:
+            self.part_streamer.release_parts()
+            self.finish()
+
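+
+# Request lifecycle (summary of the handler above): tornado calls prepare()
+# first, then data_received() for every streamed chunk (the multipart streamer
+# spools each part to a temporary file under the staging root), and finally
+# post(), which closes the streamer and moves the uploaded part into the
+# staging area directory.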
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/store/__init__.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/store/__init__.py
new file mode 100644 (file)
index 0000000..bbdf4de
--- /dev/null
@@ -0,0 +1 @@
+from .file_store import StagingFileStore
\ No newline at end of file
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/store/file_store.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/store/file_store.py
new file mode 100644 (file)
index 0000000..aec4180
--- /dev/null
@@ -0,0 +1,146 @@
+"""
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+@file file_store.py
+@author Varun Prasad (varun.prasad@riftio.com)
+@date 28-Sep-2016
+
+"""
+
+import logging
+import os
+import shutil
+import tempfile
+import time
+import uuid
+import yaml
+
+import gi
+gi.require_version("RwStagingMgmtYang", "1.0")
+from gi.repository import RwStagingMgmtYang
+import rift.mano.dts as mano_dts
+
+from .. import model
+from ..protocol import StagingStorePublisherProtocol
+
+
+class StagingAreaExists(Exception):
+    pass
+
+class InvalidStagingArea(Exception):
+    pass
+
+class StagingStructureError(Exception):
+    pass
+
+class StagingFileStore(StagingStorePublisherProtocol):
+    """File based store for creating and managing staging areas.
+    """
+    META_YAML = "meta.yaml"
+    DEFAULT_EXPIRY = 60 * 60
+
+    def __init__(self, log=None, root_dir=None):
+        default_path = os.path.join(
+            os.getenv('RIFT_ARTIFACTS'),
+            "launchpad/staging")
+
+        self.root_dir = root_dir or default_path
+
+        if not os.path.isdir(self.root_dir):
+            os.makedirs(self.root_dir)
+
+        self.log = log or logging.getLogger()
+        self.tmp_dir = tempfile.mkdtemp(dir=self.root_dir)
+
+        self._cache = {}
+        self.delegate = None
+
+    def on_recovery(self, staging_areas):
+        for area in staging_areas:
+            staging_area = model.StagingArea(area)
+            self._cache[area.area_id] = staging_area
+
+
+    def get_staging_area(self, area_id):
+        if area_id not in self._cache:
+            raise InvalidStagingArea
+
+        return self._cache[area_id]
+
+
+    def create_staging_area(self, staging_area_config):
+        """Create the staging area
+        Args:
+            staging_area_config (YangInput_RwStagingMgmt_CreateStagingArea): Rpc input
+
+        Returns:
+            model.StagingArea
+
+        Raises:
+            StagingAreaExists: if the staging area already exists
+        """
+        area_id = str(uuid.uuid4())
+
+        container_path = os.path.join(self.root_dir, str(area_id))
+        meta_path = os.path.join(container_path, self.META_YAML)
+
+        if os.path.exists(container_path):
+            raise StagingAreaExists
+
+        # Create the dir
+        os.makedirs(container_path)
+
+        config_dict = staging_area_config.as_dict()
+        config_dict.update({
+            "area_id": area_id,
+            "created_time": time.time(),
+            "status": "LIVE",
+            "path": container_path
+            })
+
+        staging_area = RwStagingMgmtYang.StagingArea.from_dict(config_dict)
+        staging_area = model.StagingArea(staging_area)
+
+        self._cache[area_id] = staging_area
+
+        try:
+            if self.delegate:
+                self.delegate.on_staging_area_create(staging_area.model)
+        except Exception as e:
+            self.log.exception(str(e))
+
+        return staging_area
+
+    def remove_staging_area(self, staging_area):
+        """Delete the staging area
+        Args:
+            staging_area (str or model.StagingArea): Staging ID or the
+                StagingArea object
+        """
+        if isinstance(staging_area, str):
+            staging_area = self.get_staging_area(staging_area)
+
+        if os.path.isdir(staging_area.model.path):
+            shutil.rmtree(staging_area.model.path)
+
+        staging_area.model.status = "EXPIRED"
+
+        try:
+            if self.delegate:
+                self.delegate.on_staging_area_delete(staging_area.model)
+        except Exception as e:
+            self.log.exception(str(e))
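+
+
+# Example usage (illustrative sketch; the root_dir value is hypothetical):
+#
+#     store = StagingFileStore(root_dir="/tmp/staging")
+#     area = store.create_staging_area(RwStagingMgmtYang.StagingArea.from_dict({}))
+#     ...  # files are uploaded into area.model.path
+#     store.remove_staging_area(area)  # a plain area_id string also works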
diff --git a/rwlaunchpad/plugins/rwstagingmgr/rwstagingmgr.py b/rwlaunchpad/plugins/rwstagingmgr/rwstagingmgr.py
new file mode 100644 (file)
index 0000000..338996d
--- /dev/null
@@ -0,0 +1,25 @@
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+# Workaround RIFT-6485 - rpmbuild defaults to python2 for
+# anything not in a site-packages directory so we have to
+# install the plugin implementation in site-packages and then
+# import it from the actual plugin.
+
+import rift.tasklets.rwstagingmgr
+
+class Tasklet(rift.tasklets.rwstagingmgr.StagingManagerTasklet):
+    pass
diff --git a/rwlaunchpad/plugins/rwstagingmgr/test/CMakeLists.txt b/rwlaunchpad/plugins/rwstagingmgr/test/CMakeLists.txt
new file mode 100644 (file)
index 0000000..b70e722
--- /dev/null
@@ -0,0 +1,75 @@
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# Author(s): Varun Prasad
+# Creation Date: 28/09/2016
+#
+
+##
+# utest_publisher_dts
+##
+rift_py3test(rwstg_utest_publisher_dts.test_download_publisher
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_publisher_dts.py TestCase.test_download_publisher
+  )
+
+rift_py3test(rwstg_utest_publisher_dts.test_publish
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_publisher_dts.py TestCase.test_publish
+  )
+
+
+add_custom_target(rwstg_utest_publisher_dts.py
+  DEPENDS
+    rwstg_utest_publisher_dts.test_download_publisher
+    rwstg_utest_publisher_dts.test_publish
+  )
+
+##
+# utest_rpc_dts.py
+##
+rift_py3test(rwstg_utest_rpc_dts.test_staging_area_create
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_rpc_dts.py TestCase.test_staging_area_create
+  )
+
+add_custom_target(rwstg_utest_rpc_dts.py
+  DEPENDS
+    rwstg_utest_rpc_dts.test_staging_area_create
+  )
+
+##
+# utest_staging_store.py
+##
+rift_py3test(rwstg_utest_staging_store.py
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_staging_store.py
+  )
+
+##
+# utest_tornado_app.py
+##
+rift_py3test(rwstg_utest_tornado_app.py
+  TEST_ARGS
+  ${CMAKE_CURRENT_SOURCE_DIR}/utest_tornado_app.py
+  )
+
+add_custom_target(rwstagingmgmt_test
+  DEPENDS
+    rwstg_utest_staging_store.py
+    rwstg_utest_publisher_dts.py
+    rwstg_utest_rpc_dts.py
+    rwstg_utest_tornado_app.py
+  )
diff --git a/rwlaunchpad/plugins/rwstagingmgr/test/utest_publisher_dts.py b/rwlaunchpad/plugins/rwstagingmgr/test/utest_publisher_dts.py
new file mode 100755 (executable)
index 0000000..585a0d9
--- /dev/null
@@ -0,0 +1,125 @@
+#!/usr/bin/env python3
+
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import unittest
+import uuid
+import xmlrunner
+
+import gi
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwStagingMgmtYang', '1.0')
+from gi.repository import (
+        RwDts as rwdts,
+        RwStagingMgmtYang
+        )
+import rift.tasklets.rwstagingmgr.publisher as publisher
+import rift.test.dts
+
+
+class TestCase(rift.test.dts.AbstractDTSTest):
+    @classmethod
+    def configure_schema(cls):
+        return RwStagingMgmtYang.get_schema()
+
+    @classmethod
+    def configure_timeout(cls):
+        return 240
+
+    def configure_test(self, loop, test_id):
+        self.log.debug("STARTING - %s", test_id)
+        self.tinfo = self.new_tinfo(str(test_id))
+        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+
+        self.job_handler = publisher.StagingStorePublisher(self.log, self.dts, self.loop)
+
+    def tearDown(self):
+        super().tearDown()
+
+    @asyncio.coroutine
+    def get_published_xpaths(self):
+        published_xpaths = set()
+
+        res_iter = yield from self.dts.query_read("D,/rwdts:dts")
+        for i in res_iter:
+            res = (yield from i).result
+            for member in res.member:
+                published_xpaths |= {reg.keyspec for reg in member.state.registration if reg.flags == "publisher"}
+
+        return published_xpaths
+
+    @asyncio.coroutine
+    def read_xpath(self, xpath):
+        itr = yield from self.dts.query_read(xpath)
+
+        result = None
+        for fut in itr:
+            result = yield from fut
+            return result.result
+
+    @rift.test.dts.async_test
+    def test_download_publisher(self):
+        yield from self.job_handler.register()
+        yield from asyncio.sleep(2, loop=self.loop)
+        published_xpaths = yield from self.get_published_xpaths()
+        assert self.job_handler.xpath() in published_xpaths
+
+    @rift.test.dts.async_test
+    def test_publish(self):
+        """
+        """
+        yield from self.job_handler.register()
+
+        mock_msg = RwStagingMgmtYang.StagingArea.from_dict({
+                "area_id": "123"})
+
+        self.job_handler.on_staging_area_create(mock_msg)
+        yield from asyncio.sleep(5, loop=self.loop)
+
+        itr = yield from self.dts.query_read("/staging-areas/staging-area[area-id='{}']".format(
+            mock_msg.area_id))
+
+
+        result = None
+        for fut in itr:
+            result = yield from fut
+            result = result.result
+
+        print(result)
+        assert result == mock_msg
+
+def main():
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+    args, unittest_args = parser.parse_known_args()
+    if args.no_runner:
+        runner = None
+
+    TestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
+
+    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+if __name__ == '__main__':
+    main()
diff --git a/rwlaunchpad/plugins/rwstagingmgr/test/utest_rpc_dts.py b/rwlaunchpad/plugins/rwstagingmgr/test/utest_rpc_dts.py
new file mode 100755 (executable)
index 0000000..8ae3311
--- /dev/null
@@ -0,0 +1,104 @@
+#!/usr/bin/env python3
+
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import argparse
+import os
+import shutil
+import sys
+import unittest
+import uuid
+import xmlrunner
+import mock
+
+import gi
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwStagingMgmtYang', '1.0')
+from gi.repository import (
+        RwDts as rwdts,
+        RwStagingMgmtYang,
+        )
+
+
+import rift.tasklets.rwstagingmgr.rpc as rpc
+import rift.test.dts
+
+
+class TestCase(rift.test.dts.AbstractDTSTest):
+    @classmethod
+    def configure_schema(cls):
+        return RwStagingMgmtYang.get_schema()
+
+    @classmethod
+    def configure_timeout(cls):
+        return 240
+
+    def configure_test(self, loop, test_id):
+        self.log.debug("STARTING - %s", test_id)
+        self.tinfo = self.new_tinfo(str(test_id))
+        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+
+    def tearDown(self):
+        super().tearDown()
+
+    @rift.test.dts.async_test
+    def test_staging_area_create(self):
+        """
+        Verifies the following:
+            The endpoint RPC returns a URL
+        """
+        uid = str(uuid.uuid4())
+        mock_model = mock.MagicMock()
+        mock_model.model.area_id = uid
+        mock_store = mock.MagicMock()
+        mock_store.create_staging_area.return_value = mock_model
+
+        endpoint = rpc.StagingAreaCreateRpcHandler(self.log, self.dts, self.loop, mock_store)
+        yield from endpoint.register()
+
+        yield from asyncio.sleep(2, loop=self.loop)
+
+        ip = RwStagingMgmtYang.YangInput_RwStagingMgmt_CreateStagingArea.from_dict({
+                "package_type": "VNFD"})
+
+        rpc_out = yield from self.dts.query_rpc(
+                    "I,/rw-staging-mgmt:create-staging-area",
+                    rwdts.XactFlag.TRACE,
+                    ip)
+
+        for itr in rpc_out:
+            result = yield from itr
+            print(result)
+            assert uid in result.result.endpoint
+
+def main():
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+    args, unittest_args = parser.parse_known_args()
+    if args.no_runner:
+        runner = None
+
+
+    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+if __name__ == '__main__':
+    main()
diff --git a/rwlaunchpad/plugins/rwstagingmgr/test/utest_staging_store.py b/rwlaunchpad/plugins/rwstagingmgr/test/utest_staging_store.py
new file mode 100755 (executable)
index 0000000..eb71aa3
--- /dev/null
@@ -0,0 +1,101 @@
+#!/usr/bin/env python3
+
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+
+import argparse
+import logging
+import io
+import os
+import sys
+import tempfile
+import unittest
+import xmlrunner
+
+from rift.tasklets.rwstagingmgr.store import StagingFileStore
+
+import gi
+gi.require_version('RwStagingMgmtYang', '1.0')
+from gi.repository import (
+        RwStagingMgmtYang,
+        )
+
+class TestSerializer(unittest.TestCase):
+
+    def test_staging_area_create(self):
+        """
+        1. Verify a valid area ID is created.
+        2. Verify the staging area can be looked up by that ID.
+        3. Verify the staging directory was created on disk.
+
+        """
+        tmp_dir = tempfile.mkdtemp()
+        store = StagingFileStore(root_dir=tmp_dir)
+
+        mock_model = RwStagingMgmtYang.StagingArea.from_dict({})
+        stg = store.create_staging_area(mock_model)
+        mock_id = stg.model.area_id
+
+        assert mock_id == store.get_staging_area(mock_id).model.area_id
+        area_path = os.path.join(store.root_dir, mock_id)
+        print(area_path)
+        assert os.path.isdir(area_path)
+
+    def test_staging_area_remove(self):
+        """
+        1. Create a staging area and verify its directory exists.
+        2. Remove the staging area.
+        3. Verify the directory has been deleted from disk.
+
+        """
+        tmp_dir = tempfile.mkdtemp()
+        store = StagingFileStore(root_dir=tmp_dir)
+
+        mock_model = RwStagingMgmtYang.StagingArea.from_dict({})
+        # get the wrapped mock model
+        mock_model = store.create_staging_area(mock_model)
+        mock_id = mock_model.model.area_id
+        area_path = os.path.join(store.root_dir, mock_id)
+
+        # check if dir actually exists
+        assert os.path.isdir(area_path)
+        store.remove_staging_area(mock_model)
+
+        assert not os.path.isdir(area_path)
+
+def main(argv=sys.argv[1:]):
+    logging.basicConfig(format='TEST %(message)s')
+
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+
+    args, unknown = parser.parse_known_args(argv)
+    if args.no_runner:
+        runner = None
+
+    # Set the global logging level
+    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
+
+    # The unittest framework requires a program name, so use the name of this
+    # file instead (we do not want to have to pass a fake program name to main
+    # when this is called from the interpreter).
+    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
+
+if __name__ == '__main__':
+    main()
diff --git a/rwlaunchpad/plugins/rwstagingmgr/test/utest_tornado_app.py b/rwlaunchpad/plugins/rwstagingmgr/test/utest_tornado_app.py
new file mode 100755 (executable)
index 0000000..ec8e105
--- /dev/null
@@ -0,0 +1,168 @@
+#!/usr/bin/env python3
+
+#
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import tornado.testing
+import tornado.web
+import tempfile
+import unittest
+import json
+import xmlrunner
+import urllib.parse
+from requests_toolbelt import MultipartEncoder
+import mock
+import uuid
+import shutil
+import requests
+import filecmp
+import yaml
+import time
+from rift.rwlib.util import certs
+
+from rift.package.handler import FileRestApiHandler
+from rift.tasklets.rwstagingmgr.server.app import StagingApplication, CleanUpStaging
+from rift.tasklets.rwstagingmgr.model import StagingArea
+
+import gi
+gi.require_version('RwStagingMgmtYang', '1.0')
+from gi.repository import (
+        RwStagingMgmtYang,
+        )
+
+
+class TestCase(tornado.testing.AsyncHTTPTestCase):
+    def setUp(self):
+        self._log = logging.getLogger(__file__)
+        self._loop = asyncio.get_event_loop()
+
+        super().setUp()
+        self._port = self.get_http_port()
+
+    def get_new_ioloop(self):
+        return tornado.platform.asyncio.AsyncIOMainLoop()
+
+    def create_mock_store(self):
+        self.staging_dir_tmp = tempfile.mkdtemp()
+        self.staging_id = str(uuid.uuid4())
+        self.staging_dir = os.path.join(self.staging_dir_tmp, self.staging_id)
+        os.makedirs(self.staging_dir)
+        mock_model = RwStagingMgmtYang.StagingArea.from_dict({
+            'path': self.staging_dir,
+            "validity_time": int(time.time()) + 5
+            })
+
+        with open(os.path.join(self.staging_dir, "meta.yaml"), "w") as fh:
+            yaml.dump(mock_model.as_dict(), fh, default_flow_style=True)
+
+        mock_model = StagingArea(mock_model)
+        store = mock.MagicMock()
+        store.get_staging_area.return_value = mock_model
+        store.root_dir = self.staging_dir_tmp
+        store.tmp_dir = self.staging_dir_tmp
+        store.META_YAML = "meta.yaml"
+        store.remove_staging_area = mock.Mock(return_value=None)
+
+        return store, mock_model
+
+    def create_tmp_file(self):
+        _, self.temp_file = tempfile.mkstemp()
+        with open(self.temp_file, "w") as fh:
+            fh.write("Lorem Ipsum")
+
+        return self.temp_file
+
+
+    def get_app(self):
+        self.store, self.mock_model = self.create_mock_store()
+        return StagingApplication(self.store, cleanup_interval=5)
+
+    def test_file_upload_and_download(self):
+        """
+
+        Asserts:
+            1. The file upload succeeds.
+            2. The upload response contains the staging ID and download path.
+            3. The downloaded file matches the uploaded file.
+            4. The staging area is cleaned up after it expires.
+        """
+        temp_file = self.create_tmp_file()
+        form = MultipartEncoder(fields={
+            'file': (os.path.basename(temp_file), open(temp_file, 'rb'), 'application/octet-stream')})
+
+        # Upload
+        response = self.fetch("/api/upload/{}".format(self.staging_id),
+                              method="POST",
+                              body=form.to_string(),
+                              headers={"Content-Type": "multipart/form-data"})
+
+        assert response.code == 200
+        assert os.path.isfile(os.path.join(
+                                    self.staging_dir,
+                                    os.path.basename(temp_file)))
+        assert self.staging_id in response.body.decode("utf-8")
+
+        response = response.body.decode("utf-8")
+        response = json.loads(response)
+
+        # Download
+        _, downloaded_file = tempfile.mkstemp()
+        response = self.fetch(response['path'])
+
+        with open(downloaded_file, 'wb') as fh:
+            fh.write(response.body)
+
+        assert filecmp.cmp(temp_file, downloaded_file)
+
+        print(self.get_url('/'))
+        print(self.staging_dir)
+        time.sleep(5)
+        self.store.remove_staging_area.assert_called_once_with(self.mock_model)
+
+    def tearDown(self):
+        shutil.rmtree(self.staging_dir_tmp)
+
+
+def main(argv=sys.argv[1:]):
+    logging.basicConfig(format='TEST %(message)s')
+
+    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+    parser.add_argument('-n', '--no-runner', action='store_true')
+
+    args, unknown = parser.parse_known_args(argv)
+    if args.no_runner:
+        runner = None
+
+    # Set the global logging level
+    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
+
+    # The unittest framework requires a program name, so use the name of this
+    # file instead (we do not want to have to pass a fake program name to main
+    # when this is called from the interpreter).
+    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
index aa5846a..43e87e1 100644 (file)
@@ -25,6 +25,8 @@ set(source_yang_files
   rw-vnfm.yang
   rw-vns.yang
   rw-image-mgmt.yang
+  rw-pkg-mgmt.yang
+  rw-staging-mgmt.yang
   )
 
 ##
index 454aec1..0adaee9 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * 
+ *
  *   Copyright 2016 RIFT.IO Inc
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
@@ -92,6 +92,15 @@ module rw-launchpad
   import rw-image-mgmt {
     prefix "rw-image-mgmt";
   }
+  
+  import rw-pkg-mgmt {
+    prefix "rw-pkg-mgmt";
+  }
+  
+
+  import mano-types {
+    prefix "manotypes";
+  }
 
   revision 2015-09-14 {
     description
diff --git a/rwlaunchpad/plugins/yang/rw-pkg-mgmt.tailf.yang b/rwlaunchpad/plugins/yang/rw-pkg-mgmt.tailf.yang
new file mode 100644 (file)
index 0000000..13136c9
--- /dev/null
@@ -0,0 +1,65 @@
+
+/*
+ *
+ *   Copyright 2016 RIFT.IO Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ *
+ */
+
+module rw-pkg-mgmt-annotation
+{
+  namespace "http://riftio.com/ns/riftware-1.0/rw-pkg-mgmt-annotation";
+  prefix "rw-pkg-mgmt-ann";
+
+  import rw-pkg-mgmt {
+    prefix rw-pkg-mgmt;
+  }
+
+  import tailf-common {
+    prefix tailf;
+  }
+
+  tailf:annotate "/rw-pkg-mgmt:download-jobs" {
+    tailf:callpoint rw_callpoint;
+  }
+
+  tailf:annotate "/rw-pkg-mgmt:get-package-endpoint" {
+     tailf:actionpoint rw_actionpoint;
+  }
+
+  tailf:annotate "/rw-pkg-mgmt:get-package-schema" {
+     tailf:actionpoint rw_actionpoint;
+  }
+
+  tailf:annotate "/rw-pkg-mgmt:package-file-add" {
+     tailf:actionpoint rw_actionpoint;
+  }
+
+  tailf:annotate "/rw-pkg-mgmt:package-file-delete" {
+     tailf:actionpoint rw_actionpoint;
+  }
+
+  tailf:annotate "/rw-pkg-mgmt:package-create" {
+     tailf:actionpoint rw_actionpoint;
+  }
+
+  tailf:annotate "/rw-pkg-mgmt:package-update" {
+     tailf:actionpoint rw_actionpoint;
+  }
+
+  tailf:annotate "/rw-pkg-mgmt:package-export" {
+     tailf:actionpoint rw_actionpoint;
+  }
+}
diff --git a/rwlaunchpad/plugins/yang/rw-pkg-mgmt.yang b/rwlaunchpad/plugins/yang/rw-pkg-mgmt.yang
new file mode 100644 (file)
index 0000000..8370c1b
--- /dev/null
@@ -0,0 +1,343 @@
+/*
+ *
+ *   Copyright 2016 RIFT.IO Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ *
+ */
+
+/**
+ * @file rw-pkg-mgmt.yang
+ * @author Varun Prasad
+ * @date 2016/09/21
+ * @brief Package Management Yang
+ */
+
+module rw-pkg-mgmt
+{
+  namespace "http://riftio.com/ns/riftware-1.0/rw-pkg-mgmt";
+  prefix "rw-pkg-mgmt";
+
+  import ietf-yang-types {
+    prefix "yang";
+  }
+
+  import rw-pb-ext {
+    prefix "rwpb";
+  }
+
+  import rw-cli-ext {
+    prefix "rwcli";
+  }
+
+  import rw-cloud {
+    prefix "rwcloud";
+  }
+
+  import rwcal {
+    prefix "rwcal";
+  }
+
+  import mano-types {
+    prefix "manotypes";
+  }
+
+  revision 2016-06-01 {
+    description
+      "Initial revision.";
+  }
+
+  typedef task-status {
+    type enumeration {
+      enum QUEUED;
+      enum IN_PROGRESS;
+      enum DOWNLOADING;
+      enum CANCELLED;
+      enum COMPLETED;
+      enum FAILED;
+    }
+  }
+
+  typedef export-schema {
+    type enumeration {
+      enum RIFT;
+      enum MANO;
+    }
+  }
+
+  typedef export-grammar {
+    type enumeration {
+      enum OSM;
+      enum TOSCA;
+    }
+  }
+
+  typedef export-format {
+    type enumeration {
+      enum YAML;
+      enum JSON;
+    }
+  }
+
+  grouping external-url-data {
+    leaf external-url {
+      description "Url to download";
+      type string;
+    }
+
+    leaf username {
+      description "username if the url uses authentication";
+      type string;
+    }
+
+    leaf password {
+      description "password if the url uses authentication";
+      type string;
+    }
+  }
+
+  grouping package-identifer {
+    leaf package-type {
+      description "Type of the package";
+      type manotypes:package-type;
+    }
+
+    leaf package-id {
+      description "Id of the package";
+      type string;
+    }
+  }
+
+  grouping package-file-identifer {
+    uses package-identifer;
+
+    leaf package-path {
+      description "Relative path in the package";
+      type string;
+    }
+  }
+
+  grouping download-task-status {
+    leaf status {
+      description "The status of the download task";
+      type task-status;
+      default QUEUED;
+    }
+
+    leaf detail {
+      description "Detailed download status message";
+      type string;
+    }
+
+    leaf progress-percent {
+      description "The download progress percentage (0-100)";
+      type uint8;
+      default 0;
+    }
+
+    leaf bytes_downloaded {
+      description "The number of bytes downloaded";
+      type uint64;
+      default 0;
+    }
+
+    leaf bytes_total {
+      description "The total number of bytes to write";
+      type uint64;
+      default 0;
+    }
+
+    leaf bytes_per_second {
+      description "The total number of bytes written per second";
+      type uint32;
+      default 0;
+    }
+
+    leaf start-time {
+      description "start time (unix epoch)";
+      type uint32;
+    }
+
+    leaf stop-time {
+      description "stop time (unix epoch)";
+      type uint32;
+    }
+  }
+
+  container download-jobs {
+    rwpb:msg-new DownloadJobs;
+    description "Download jobs";
+    config false;
+
+    list job {
+      rwpb:msg-new DownloadJob;
+      key "download-id";
+
+      leaf download-id {
+        description "Unique UUID";
+        type string;
+      }
+
+      leaf url {
+        description "URL of the download";
+        type string;
+      }
+
+      uses package-file-identifer;
+      uses download-task-status;
+    }
+  }
+
+  rpc get-package-endpoint {
+    description "Retrieves the endpoint for the descriptor";
+
+    input {
+      uses package-identifer;
+    }
+
+    output {
+     leaf endpoint {
+        description "Endpoint that contains all the package-related data";
+        type string;
+      }
+    }
+  }
+
+  rpc get-package-schema {
+    description "Retrieves the schema for the package type";
+
+    input {
+      leaf package-type {
+        description "Type of the package";
+        type manotypes:package-type;
+      }
+    }
+
+    output {
+      leaf-list schema {
+        description "List of all top level directories for the package.";
+        type string;
+      }
+    }
+  }
+
+  rpc package-create {
+    description "Creates a new package";
+
+    input {
+      uses package-identifer;
+      uses external-url-data;
+    }
+
+    output {
+     leaf transaction-id {
+        description "Valid ID to track the status of the task";
+        type string;
+      }
+    }
+  }
+
+  rpc package-update {
+    description "Creates a new package";
+
+    input {
+      uses package-identifer;
+      uses external-url-data;
+    }
+
+    output {
+     leaf transaction-id {
+        description "Valid ID to track the status of the task";
+        type string;
+      }
+    }
+  }
+
+  rpc package-export {
+    description "Export a package";
+
+    input {
+      uses package-identifer;
+
+      leaf export-schema {
+        description "Schema to export";
+        type export-schema;
+        default RIFT;
+      }
+
+      leaf export-grammar {
+        description "Schema to export";
+        type export-grammar;
+        default OSM;
+      }
+
+      leaf export-format {
+        description "Format to export";
+        type export-format;
+        default YAML;
+      }
+
+    }
+
+    output {
+     leaf transaction-id {
+        description "Valid ID to track the status of the task";
+        type string;
+      }
+
+     leaf filename {
+        description "Valid ID to track the status of the task";
+        type string;
+      }
+    }
+  }
+
+  rpc package-file-add {
+    description "Retrieves the file from the URL and store it in the package";
+
+    input {
+      uses package-file-identifer;
+      uses external-url-data;
+    }
+
+    output {
+     leaf task-id {
+        description "Valid ID to track the status of the task";
+        type string;
+      }
+    }
+  }
+
+  rpc package-file-delete {
+    description "Retrieves the file from the URL and store it in the package";
+
+    input {
+      uses package-file-identifer;
+    }
+
+    output {
+     leaf status {
+        description "Status of the delte operation";
+        type string;
+      }
+
+      leaf error-trace {
+        description "Trace in case of a failure";
+        type string;
+      }
+
+    }
+  }
+
+}
\ No newline at end of file
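
The RPCs above are plain YANG operations, so any NETCONF or RESTCONF client can drive them. Below is a minimal, hypothetical Python sketch of how package-file-add might be invoked and its task-id tracked; the base URL, port, credentials, media type and response layout are illustrative assumptions and are not part of this change.

# Hypothetical RESTCONF client for the package-management RPCs defined above.
# The base URL, port, credentials and media type are illustrative assumptions.
import requests

BASE = "https://launchpad.example.com:8008/api/operations"    # assumed mapping
AUTH = ("admin", "admin")                                     # assumed credentials
HEADERS = {"Content-Type": "application/vnd.yang.data+json"}  # assumed media type


def package_file_add(package_type, package_id, package_path, url):
    """Invoke rpc package-file-add: fetch `url` into the given package path."""
    body = {
        "input": {
            "package-type": package_type,
            "package-id": package_id,
            "package-path": package_path,
            "external-url": url,
        }
    }
    resp = requests.post(BASE + "/package-file-add", json=body,
                         headers=HEADERS, auth=AUTH, verify=False)
    resp.raise_for_status()
    # The returned task-id can be correlated with the operational
    # /download-jobs/job list to follow status and progress-percent.
    return resp.json()["output"]["task-id"]
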
diff --git a/rwlaunchpad/plugins/yang/rw-staging-mgmt.tailf.yang b/rwlaunchpad/plugins/yang/rw-staging-mgmt.tailf.yang
new file mode 100644 (file)
index 0000000..9b35ff4
--- /dev/null
@@ -0,0 +1,42 @@
+
+/*
+ *
+ *   Copyright 2016 RIFT.IO Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ *
+ */
+
+module rw-staging-mgmt-annotation
+{
+  namespace "http://riftio.com/ns/riftware-1.0/rw-staging-mgmt-annotation";
+  prefix "rw-staging-mgmt-ann";
+
+  import rw-staging-mgmt {
+    prefix rw-staging-mgmt;
+  }
+
+  import tailf-common {
+    prefix tailf;
+  }
+
+  tailf:annotate "/rw-staging-mgmt:create-staging-area" {
+     tailf:actionpoint rw_actionpoint;
+  }
+
+  tailf:annotate "/rw-staging-mgmt:staging-areas" {
+    tailf:callpoint rw_callpoint;
+  }
+
+}
diff --git a/rwlaunchpad/plugins/yang/rw-staging-mgmt.yang b/rwlaunchpad/plugins/yang/rw-staging-mgmt.yang
new file mode 100644 (file)
index 0000000..d5722cd
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ *
+ *   Copyright 2016 RIFT.IO Inc
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ *
+ */
+
+/**
+ * @file rw-staging-mgmt.yang
+ * @author Varun Prasad
+ * @date 2016/09/21
+ * @brief Staging Management Yang
+ */
+
+module rw-staging-mgmt
+{
+  namespace "http://riftio.com/ns/riftware-1.0/rw-staging-mgmt";
+  prefix "rw-staging-mgmt";
+
+  import ietf-yang-types {
+    prefix "yang";
+  }
+
+  import rw-pb-ext {
+    prefix "rwpb";
+  }
+
+  import rw-cli-ext {
+    prefix "rwcli";
+  }
+
+  import rw-cloud {
+    prefix "rwcloud";
+  }
+
+  import rwcal {
+    prefix "rwcal";
+  }
+
+  import mano-types {
+    prefix "manotypes";
+  }
+
+  revision 2016-06-01 {
+    description
+      "Initial revision.";
+  }
+
+  typedef staging-area-status {
+    type enumeration {
+      enum LIVE;
+      enum EXPIRED;
+    }
+  }
+
+  grouping staging-area-config {
+    leaf package-type {
+      mandatory true;
+      type manotypes:package-type;
+    }
+
+    leaf name {
+      description "name of the binary";
+      type string;
+    }
+
+    leaf validity-time {
+      description "lifetime of the staging area (in seconds)";
+      type uint64;
+      default 3600;
+    }
+  }
+
+  grouping staging-area-meta {
+
+    leaf status {
+      description "The status of the staging area";
+      type staging-area-status;
+    }
+
+    leaf created-time {
+      description "start time (unix epoch)";
+      type uint32;
+    }
+
+    leaf deleted-time {
+      description "stop time (unix epoch)";
+      type uint32;
+    }
+
+    leaf size {
+      description "size of the binary in bytes";
+      type uint64;
+    }
+
+    leaf path {
+      description "Path of the staging area";
+      type string;
+    }
+
+  }
+
+  container staging-areas {
+    rwpb:msg-new StagingAreas;
+    description "Staging Areas";
+    config false;
+
+    list staging-area {
+      rwpb:msg-new StagingArea;
+      key "area-id";
+
+      leaf area-id {
+        description "Staging Area ID";
+        type string;
+      }
+
+      uses staging-area-config;
+      uses staging-area-meta;
+    }
+  }
+
+
+  rpc create-staging-area {
+    description "Creates a staging area for the upload.";
+
+    input {
+      uses staging-area-config;
+    }
+
+    output {
+      leaf port {
+        type uint32;
+      }
+
+      leaf endpoint {
+        description "Endpoint that contains all the package-related data";
+        type string;
+      }
+    }
+  }
+}
\ No newline at end of file
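
As with the package-management RPCs, the staging workflow can be sketched from the client side: create a staging area, then upload the binary to the returned endpoint while the area is still LIVE. The URL layout, credentials, response parsing and the PUT-upload convention below are assumptions for illustration only.

# Hypothetical client sketch for the create-staging-area RPC defined above.
# Base URL, credentials, response layout and the upload convention are assumed.
import requests

BASE = "https://launchpad.example.com:8008/api/operations"  # assumed mapping
AUTH = ("admin", "admin")                                   # assumed credentials


def stage_and_upload(package_type, name, local_path, validity_time=3600):
    """Create a staging area, then PUT the local binary to its endpoint."""
    body = {
        "input": {
            "package-type": package_type,
            "name": name,
            "validity-time": validity_time,
        }
    }
    resp = requests.post(BASE + "/create-staging-area", json=body,
                         auth=AUTH, verify=False)
    resp.raise_for_status()
    out = resp.json()["output"]

    # Assumed convention: the staging endpoint accepts an HTTP upload of the
    # binary while the area is LIVE (i.e. before validity-time seconds elapse).
    with open(local_path, "rb") as fh:
        upload = requests.put(out["endpoint"], data=fh, verify=False)
    upload.raise_for_status()
    return out["endpoint"]
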
index 62940eb..f46ea65 100644 (file)
@@ -3,7 +3,7 @@
   "commandline":"./pingpong_records_systest --test-name 'TC_PINGPONG_RECORDS_OPENSTACK' --cloud-type 'openstack' --cloud-host={cloud_host} --sysinfo  --user={user} {tenants} --netconf --restconf",
   "test_description":"System test for ping and pong vnf (Openstack)",
   "run_as_root": true,
-  "status":"broken",
+  "status":"working",
   "keywords":["nightly","smoke","MANO","openstack"],
   "timelimit": 2600,
   "networks":[],
index 76b7c66..65fc464 100644 (file)
@@ -3,7 +3,7 @@
   "commandline":"./pingpong_records_systest  --test-name 'TC_PINGPONG_RECORDS_OPENSTACK_XML' --cloud-type 'openstack' --sysinfo --use-xml-mode --cloud-host={cloud_host} --user={user} {tenants} --restconf",
   "test_description":"System test for ping and pong vnf (Openstack)",
   "run_as_root": true,
-  "status":"broken",
+  "status":"working",
   "keywords":["nightly","smoke","MANO","openstack"],
   "timelimit": 2600,
   "networks":[],
index ce44c75..0423d26 100644 (file)
@@ -3,7 +3,7 @@
   "commandline":"./pingpong_vnf_reload_systest  --test-name 'TC_PINGPONG_VNF_RELOAD_OPENSTACK_XML' --cloud-type 'openstack' --sysinfo --use-xml-mode --cloud-host={cloud_host} --user={user} {tenants} --restconf",
   "test_description":"System test for ping pong vnf reload(Openstack)",
   "run_as_root": false,
-  "status":"broken",
+  "status":"working",
   "keywords":["nightly","smoke","MANO","openstack"],
   "timelimit": 2200,
   "networks":[],
index 324b94b..c9e471e 100755 (executable)
@@ -282,6 +282,32 @@ class AutoscalerTasklet(rift.vcs.core.Tasklet):
     plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwautoscaler')
     plugin_name = ClassProperty('rwautoscaler')
 
+class StagingManagerTasklet(rift.vcs.core.Tasklet):
+    """
+    A class that provides a simple staging area for all tasklets.
+    """
+
+    def __init__(self, name='StagingManager', uid=None,
+                 config_ready=True,
+                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
+                 data_storetype=core.DataStore.NOSTORE.value,
+                 ):
+        """
+        Creates a StagingManagerTasklet object.
+
+        Arguments:
+            name  - the name of the tasklet
+            uid   - a unique identifier
+
+        """
+        super(StagingManagerTasklet, self).__init__(name=name, uid=uid,
+                                             config_ready=config_ready,
+                                             recovery_action=recovery_action,
+                                             data_storetype=data_storetype,
+                                            )
+
+    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwstagingmgr')
+    plugin_name = ClassProperty('rwstagingmgr')
 
 def get_ui_ssl_args():
     """Returns the SSL parameter string for launchpad UI processes"""
@@ -343,6 +369,32 @@ class ConfigManagerTasklet(rift.vcs.core.Tasklet):
     plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwconmantasklet')
     plugin_name = ClassProperty('rwconmantasklet')
 
+class PackageManagerTasklet(rift.vcs.core.Tasklet):
+    """
+    This class represents the Package Manager tasklet.
+    """
+
+    def __init__(self, name='Package-Manager', uid=None,
+                 config_ready=True,
+                 recovery_action=core.RecoveryType.FAILCRITICAL.value,
+                 data_storetype=core.DataStore.NOSTORE.value,
+                 ):
+        """
+        Creates a PackageManagerTasklet object.
+
+        Arguments:
+            name  - the name of the tasklet
+            uid   - a unique identifier
+        """
+        super(PackageManagerTasklet, self).__init__(name=name, uid=uid,
+                                                   config_ready=config_ready,
+                                                   recovery_action=recovery_action,
+                                                   data_storetype=data_storetype,
+                                                  )
+
+    plugin_directory = ClassProperty('./usr/lib/rift/plugins/rwpkgmgr')
+    plugin_name = ClassProperty('rwpkgmgr')
+
 class GlanceServer(rift.vcs.NativeProcess):
     def __init__(self, name="glance-image-catalog",
                  config_ready=True,
@@ -399,6 +451,8 @@ class Demo(rift.vcs.demo.Demo):
               ResMgrTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
               ImageMgrTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
               AutoscalerTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
+              PackageManagerTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
+              StagingManagerTasklet(recovery_action=core.RecoveryType.RESTART.value, data_storetype=datastore),
             ]
 
         if not mgmt_ip_list or len(mgmt_ip_list) == 0:
@@ -470,7 +524,6 @@ def main(argv=sys.argv[1:]):
     # since it doesn't need it and it will fail within containers
     os.environ["NO_KERNEL_MODS"] = "1"
 
-
     cleanup_dir_name = None
     if os.environ["INSTALLDIR"] in ["/", "/home/rift", "/home/rift/.install",
         "/usr/rift/build/fc20_debug/install/usr/rift", "/usr/rift"]:
@@ -495,8 +548,8 @@ def main(argv=sys.argv[1:]):
         for f in os.listdir(cleanup_dir_name):
             if f.endswith(".aof") or f.endswith(".rdb"):
                 os.remove(os.path.join(cleanup_dir_name, f))
-
-        # Remove the persistant DTS recovery files
+
+        # Remove the persistent DTS recovery files
         for f in os.listdir(cleanup_dir_name):
             if f.endswith(".db"):
                 os.remove(os.path.join(cleanup_dir_name, f))