RIFT 15576 Support for copying descriptors with assets, with new rpc and yang data...
authorsinhan <nandan.sinha@riftio.com>
Fri, 31 Mar 2017 19:08:59 +0000 (19:08 +0000)
committersinhan <nandan.sinha@riftio.com>
Fri, 31 Mar 2017 19:10:05 +0000 (19:10 +0000)
Signed-off-by: sinhan <nandan.sinha@riftio.com>
rwlaunchpad/plugins/rwlaunchpadtasklet/rift/package/package.py
rwlaunchpad/plugins/rwpkgmgr/CMakeLists.txt
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/__init__.py
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/copy.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/__init__.py
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/copy_status.py [new file with mode: 0644]
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rpc.py
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rwpkgmgr.py
rwlaunchpad/plugins/yang/rw-pkg-mgmt.yang

index 3ce3500..a3b1840 100644 (file)
@@ -161,6 +161,16 @@ class DescriptorPackage(object):
 
         return self.descriptor_msg.id
 
+    @property
+    def descriptor_name(self):
+        """  The descriptor name of this descriptor in the system """
+        if not self.descriptor_msg.has_field("name"):
+            msg = "Descriptor name not present"
+            self._log.error(msg)
+            raise PackageError(msg)
+
+        return self.descriptor_msg.name
+
     @classmethod
     def get_descriptor_patterns(cls):
         """ Returns a tuple of descriptor regex and Package Types  """
index 4be45d0..eca40c2 100644 (file)
@@ -37,14 +37,16 @@ rift_python_install_tree(
     rift/tasklets/${TASKLET_NAME}/rpc.py
     rift/tasklets/${TASKLET_NAME}/downloader/__init__.py
     rift/tasklets/${TASKLET_NAME}/downloader/url.py
+    rift/tasklets/${TASKLET_NAME}/downloader/copy.py
     rift/tasklets/${TASKLET_NAME}/proxy/__init__.py
     rift/tasklets/${TASKLET_NAME}/proxy/base.py
     rift/tasklets/${TASKLET_NAME}/proxy/filesystem.py
     rift/tasklets/${TASKLET_NAME}/publisher/__init__.py
     rift/tasklets/${TASKLET_NAME}/publisher/download_status.py
+    rift/tasklets/${TASKLET_NAME}/publisher/copy_status.py
     rift/tasklets/${TASKLET_NAME}/subscriber/__init__.py
     rift/tasklets/${TASKLET_NAME}/subscriber/download_status.py
   COMPONENT ${PKG_LONG_NAME}
   PYTHON3_ONLY)
 
-rift_add_subdirs(test)
\ No newline at end of file
+rift_add_subdirs(test)
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/copy.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/copy.py
new file mode 100644 (file)
index 0000000..fa3dd3e
--- /dev/null
@@ -0,0 +1,199 @@
+# 
+#   Copyright 2017 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+#   Author(s): Nandan Sinha
+#
+
+import os
+import uuid
+import shutil 
+import enum
+
+import gi
+gi.require_version('RwVnfdYang', '1.0')
+gi.require_version('RwNsdYang', '1.0')
+from gi.repository import (
+    RwYang, 
+    NsdYang,
+    VnfdYang,
+    RwVnfdYang,
+    RwNsdYang,
+    RwPkgMgmtYang
+)
+
+class PackageCopyError(Exception): 
+    pass
+
+class CopyStatus(enum.Enum):
+    UNINITIATED = 0
+    STARTED = 1
+    IN_PROGRESS = 2
+    COMPLETED = 3
+    FAILED = 4
+    CANCELLED = 5
+
+TaskStatus = RwPkgMgmtYang.TaskStatus
+
+class CopyMeta:
+    STATUS_MAP = {
+        CopyStatus.STARTED: TaskStatus.QUEUED.value_nick.upper(),
+        CopyStatus.UNINITIATED: TaskStatus.QUEUED.value_nick.upper(),
+        CopyStatus.IN_PROGRESS: TaskStatus.IN_PROGRESS.value_nick.upper(),
+        CopyStatus.COMPLETED: TaskStatus.COMPLETED.value_nick.upper(),
+        CopyStatus.FAILED: TaskStatus.FAILED.value_nick.upper(),
+        CopyStatus.CANCELLED: TaskStatus.CANCELLED.value_nick.upper()
+        }
+
+    def __init__(self, transaction_id):
+        self.transaction_id = transaction_id
+        self.state = CopyStatus.UNINITIATED
+
+    def set_state(self, state):
+        self.state = state
+
+    def as_dict(self): 
+        return self.__dict__
+
+    def to_yang(self):
+        job = RwPkgMgmtYang.CopyJob.from_dict({
+            "transaction_id": self.transaction_id, 
+            "status": CopyMeta.STATUS_MAP[self.state]
+            })
+        return job
+
+class PackageFileCopier:
+    DESCRIPTOR_MAP = {
+            "vnfd": (RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd, 'vnfd rw-vnfd'), 
+            "nsd" : (RwNsdYang.YangData_Nsd_NsdCatalog_Nsd, 'nsd rw-nsd') 
+            }
+
+    @classmethod
+    def from_rpc_input(cls, rpc_input, proxy, log=None): 
+        return cls(
+                rpc_input.package_id,
+                rpc_input.package_type, 
+                rpc_input.package_name,
+                proxy = proxy,
+                log=log)
+
+    def __init__(self, 
+            pkg_id, 
+            pkg_type, 
+            pkg_name, 
+            proxy, 
+            log):
+        self.src_package_id = pkg_id
+        self.package_type = pkg_type.lower()
+        self.dest_package_name = pkg_name
+        self.dest_package_id = str(uuid.uuid4())
+        self.transaction_id = str(uuid.uuid4())
+        self.proxy = proxy
+        self.log = log
+        self.meta = CopyMeta(self.transaction_id)
+        self.src_package = None
+        self.dest_desc_msg = None
+
+    # Start of delegate calls
+    def call_delegate(self, event):
+        if not self.delegate:
+            return
+        
+        # Send out the descriptor message to be posted on success
+        # Otherwise send out the CopyJob yang conversion from meta object.
+        if event == "on_download_succeeded":
+            getattr(self.delegate, event)(self.dest_desc_msg)
+        else:
+            getattr(self.delegate, event)(self.meta.to_yang())
+
+    def _copy_tree(self):
+        """
+        Locate directory tree of the source descriptor folder. 
+        Copy directory tree to destination descriptor folder.  
+
+        """
+        store = self.proxy._get_store(self.package_type)
+        src_path = store._get_package_dir(self.src_package_id)
+        self.src_package = store.get_package(self.src_package_id) 
+        src_desc_name = self.src_package.descriptor_name
+        src_copy_path = os.path.join(src_path, src_desc_name)
+
+        self.dest_copy_path = os.path.join(store.DEFAULT_ROOT_DIR, 
+                self.dest_package_id, 
+                self.dest_package_name)
+        self.log.debug("Copying contents from {src} to {dest}".
+                format(src=src_copy_path, dest=self.dest_copy_path))
+
+        shutil.copytree(src_copy_path, self.dest_copy_path)
+
+    def _create_descriptor_file(self):
+        """ Update descriptor file for the newly copied descriptor catalog.
+        Use the existing descriptor file to create a descriptor proto gi object,
+        change some identifiers, and create a new descriptor yaml file from it.
+
+        """
+        src_desc_file = self.src_package.descriptor_file
+        src_desc_contents = self.src_package.descriptor_msg.as_dict()
+        src_desc_contents.update(
+                id =self.dest_package_id, 
+                name = self.dest_package_name,
+                short_name = self.dest_package_name
+                )
+
+        desc_cls, modules = PackageFileCopier.DESCRIPTOR_MAP[self.package_type]
+        self.dest_desc_msg = desc_cls.from_dict(src_desc_contents)
+        dest_desc_path = os.path.join(self.dest_copy_path, 
+                "{pkg_name}_{pkg_type}.yaml".format(pkg_name=self.dest_package_name, pkg_type=self.package_type))
+        model = RwYang.Model.create_libncx()
+        for module in modules.split():
+            model.load_module(module) 
+
+        with open(dest_desc_path, "w") as fh:
+            fh.write(self.dest_desc_msg.to_yaml(model))
+
+        copied_desc_file = os.path.join(self.dest_copy_path, os.path.basename(src_desc_file))
+        if os.path.exists(copied_desc_file):
+            self.log.debug("Deleting copied yaml from old source %s" % (copied_desc_file))
+            os.remove(copied_desc_file)
+
+    def copy(self):
+        try:
+            if self.package_type not in PackageFileCopier.DESCRIPTOR_MAP: 
+                raise PackageCopyError("Package type {} not currently supported for copy operations".format(self.package_type))
+
+            self._copy_tree()
+            self._create_descriptor_file()
+            self.copy_succeeded()
+
+        except Exception as e: 
+            self.log.exception(str(e))
+            self.copy_failed()
+
+        self.copy_finished()
+
+    def copy_failed(self):
+        self.meta.set_state(CopyStatus.FAILED)
+        self.call_delegate("on_download_failed")
+
+    def copy_progress(self): 
+        self.meta.set_state(CopyStatus.IN_PROGRESS)
+        self.call_delegate("on_download_progress")
+
+    def copy_succeeded(self):
+        self.meta.set_state(CopyStatus.COMPLETED)
+        self.call_delegate("on_download_succeeded")
+
+    def copy_finished(self): 
+        self.call_delegate("on_download_finished") 
+
diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/copy_status.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/copy_status.py
new file mode 100644 (file)
index 0000000..927331c
--- /dev/null
@@ -0,0 +1,124 @@
+# 
+#   Copyright 2017 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+#   Author(s): Nandan Sinha
+#
+
+import sys
+import asyncio
+import uuid
+import abc
+import functools 
+from concurrent.futures import Future
+
+from gi.repository import (RwDts as rwdts)
+import rift.mano.dts as mano_dts
+import rift.downloader as url_downloader
+import rift.tasklets.rwlaunchpad.onboard as onboard 
+
+if sys.version_info < (3, 4, 4): 
+    asyncio.ensure_future = asyncio.async
+
+
+class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol): 
+
+    def __init__(self, log, dts, loop, tasklet_info):
+        super().__init__(log, dts, loop) 
+        self.tasks = {} 
+        self.tasklet_info = tasklet_info
+
+    def xpath(self, transaction_id=None):
+        return ("D,/rw-pkg-mgmt:copy-jobs/rw-pkg-mgmt:job" +
+            ("[transaction-id='{}']".format(transaction_id) if transaction_id else ""))
+        pass
+    
+    @asyncio.coroutine
+    def register(self):
+        self.reg = yield from self.dts.register(xpath=self.xpath(),
+                  flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
+
+        assert self.reg is not None
+
+    @asyncio.coroutine
+    def register_copier(self, copier):
+        copier.delegate = self
+        future = self.loop.run_in_executor(None, copier.copy)
+        self.tasks[copier.transaction_id] = (copier, future)
+
+        return (copier.transaction_id, copier.dest_package_id)
+
+    @asyncio.coroutine
+    def _dts_publisher(self, job_msg): 
+        # Publish the download state 
+        self.reg.update_element(
+                self.xpath(transaction_id=job_msg.transaction_id), job_msg) 
+
+    @staticmethod
+    def _async_add(func, fut): 
+        try: 
+            ret = func()
+            fut.set_result(ret) 
+        except Exception as e: 
+            fut.set_exception(e) 
+
+    def _schedule_dts_work(self, job_msg): 
+        f = functools.partial( 
+                asyncio.ensure_future, 
+                self._dts_publisher(job_msg), 
+                loop = self.loop)
+        fut = Future()
+        self.loop.call_soon_threadsafe(CopyStatusPublisher._async_add, f, fut) 
+        xx = fut.result()
+        if fut.exception() is not None:
+            self.log.error("Caught future exception during download: %s type %s", str(fut.exception()), type(fut.exception()))
+            raise fut.exception()
+        return xx
+
+    def on_download_progress(self, job_msg):
+        """callback that triggers update.
+        """
+        return self._schedule_dts_work(job_msg) 
+
+    def on_download_finished(self, job_msg):
+        """callback that triggers update.
+        """
+        # clean up the local cache
+        key = job_msg.transaction_id
+        if key in self.tasks:
+            del self.tasks[key]
+
+        return self._schedule_dts_work(job_msg)
+
+    def on_download_succeeded(self, job_msg): 
+        """Post the catalog descriptor object to the http endpoint.
+        Argument: job_msg (proto-gi descriptor_msg of the copied descriptor)
+
+        """
+        manifest = self.tasklet_info.get_pb_manifest()
+        use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
+        ssl_cert, ssl_key = None, None 
+        if use_ssl:
+            ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
+            ssl_key = manifest.bootstrap_phase.rwsecurity.key
+
+        onboarder = onboard.DescriptorOnboarder(self.log, 
+                "127.0.0.1", 8008, use_ssl, ssl_cert, ssl_key)
+        try:
+            onboarder.onboard(job_msg)
+        except onboard.OnboardError as e: 
+            self.log.error("Onboard exception triggered while posting copied catalog descriptor %s", e)
+            raise 
+
+
index f55a8fd..a71f108 100644 (file)
@@ -34,6 +34,7 @@ RPC_PKG_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_GetPackageEndpoint
 RPC_SCHEMA_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_GetPackageSchema
 RPC_PACKAGE_ADD_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageFileAdd
 RPC_PACKAGE_DELETE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageFileDelete
+RPC_PACKAGE_COPY_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCopy
 
 
 class EndpointDiscoveryRpcHandler(mano_dts.AbstractRpcHandler):
@@ -144,6 +145,33 @@ class PackageOperationsRpcHandler(mano_dts.AbstractRpcHandler):
 
         return rpc_op
 
+class PackageCopyOperationsRpcHandler(mano_dts.AbstractRpcHandler):
+    def __init__(self, log, dts, loop, proxy, publisher):
+        """
+        Args:
+            proxy: Any impl of .proxy.AbstractPackageManagerProxy
+            publisher: CopyStatusPublisher object
+        """
+        super().__init__(log, dts, loop)
+        self.proxy = proxy
+        self.publisher = publisher
+
+    @property
+    def xpath(self):
+        return "/rw-pkg-mgmt:package-copy"
+
+    @asyncio.coroutine
+    def callback(self, ks_path, msg):
+        import uuid 
+        copier = pkg_downloader.PackageFileCopier.from_rpc_input(msg, proxy=self.proxy, log=self.log)
+
+        transaction_id, dest_package_id = yield from self.publisher.register_copier(copier)
+        rpc_op = RPC_PACKAGE_COPY_ENDPOINT.from_dict({
+            "transaction_id":transaction_id,
+            "package_id":dest_package_id, 
+            "package_type":msg.package_type})
+
+        return rpc_op
 
 class PackageDeleteOperationsRpcHandler(mano_dts.AbstractRpcHandler):
     def __init__(self, log, dts, loop, proxy):
index b302b46..5773b0e 100644 (file)
@@ -50,33 +50,42 @@ class PackageManagerTasklet(rift.tasklets.Tasklet):
             self.log.exception(e)
 
     def start(self):
-        super().start()
 
         self.log.debug("Registering with dts")
 
-        self.dts = rift.tasklets.DTS(
+        try:
+            super().start()
+            self.dts = rift.tasklets.DTS(
                 self.tasklet_info,
                 RwPkgMgmtYang.get_schema(),
                 self.loop,
                 self.on_dts_state_change
                 )
+        
+            proxy = filesystem.FileSystemProxy(self.loop, self.log)
+            args = [self.log, self.dts, self.loop]
 
-        proxy = filesystem.FileSystemProxy(self.loop, self.log)
-
-        args = [self.log, self.dts, self.loop]
-        self.job_handler = pkg_publisher.DownloadStatusPublisher(*args)
+        # create catalog publishers 
+            self.job_handler = pkg_publisher.DownloadStatusPublisher(*args)
+            self.copy_publisher = pkg_publisher.CopyStatusPublisher(*args +[self.tasklet_info])
 
         # create catalog subscribers 
-        self.vnfd_catalog_sub = subscriber.VnfdStatusSubscriber(*args)
-        self.nsd_catalog_sub = subscriber.NsdStatusSubscriber(*args)
+            self.vnfd_catalog_sub = subscriber.VnfdStatusSubscriber(*args)
+            self.nsd_catalog_sub = subscriber.NsdStatusSubscriber(*args)
+
+            args.append(proxy)
+            self.endpoint_rpc = rpc.EndpointDiscoveryRpcHandler(*args)
+            self.schema_rpc = rpc.SchemaRpcHandler(*args)
+            self.delete_rpc = rpc.PackageDeleteOperationsRpcHandler(*args)
+            self.copy_rpc = rpc.PackageCopyOperationsRpcHandler(*(args + [self.copy_publisher]))
 
-        args.append(proxy)
-        self.endpoint_rpc = rpc.EndpointDiscoveryRpcHandler(*args)
-        self.schema_rpc = rpc.SchemaRpcHandler(*args)
-        self.delete_rpc = rpc.PackageDeleteOperationsRpcHandler(*args)
+            args.append(self.job_handler)
+            self.pkg_op = rpc.PackageOperationsRpcHandler(*args)
 
-        args.append(self.job_handler)
-        self.pkg_op = rpc.PackageOperationsRpcHandler(*args)
+        except Exception as e:
+            self.log.error("Exception caught rwpkgmgr start: %s", str(e))
+        else:
+            self.log.debug("rwpkgmgr started successfully!")
 
     def stop(self):
         try:
@@ -86,13 +95,18 @@ class PackageManagerTasklet(rift.tasklets.Tasklet):
 
     @asyncio.coroutine
     def init(self):
-        yield from self.endpoint_rpc.register()
-        yield from self.schema_rpc.register()
-        yield from self.pkg_op.register()
-        yield from self.job_handler.register()
-        yield from self.delete_rpc.register()
-        yield from self.vnfd_catalog_sub.register()
-        yield from self.nsd_catalog_sub.register()
+        try:
+            yield from self.endpoint_rpc.register()
+            yield from self.schema_rpc.register()
+            yield from self.pkg_op.register()
+            yield from self.job_handler.register()
+            yield from self.delete_rpc.register()
+            yield from self.copy_rpc.register()
+            yield from self.copy_publisher.register()
+            yield from self.vnfd_catalog_sub.register()
+            yield from self.nsd_catalog_sub.register()
+        except Exception as e:
+            self.log.error("Exception caught rwpkgmgr init %s", str(e))
 
     @asyncio.coroutine
     def run(self):
index de56e55..5fbd621 100644 (file)
@@ -182,6 +182,14 @@ module rw-pkg-mgmt
     }
   }
 
+  grouping copy-task-status {
+    leaf status {
+      description "The status of the copy task";
+      type task-status;
+      default QUEUED;
+    }
+  }
+
   container download-jobs {
     rwpb:msg-new DownloadJobs;
     description "Download jobs";
@@ -206,6 +214,24 @@ module rw-pkg-mgmt
     }
   }
 
+  container copy-jobs {
+    rwpb:msg-new CopyJobs;
+    description "Copy jobs";
+    config false;
+
+    list job {
+      rwpb:msg-new CopyJob;
+      key "transaction-id";
+
+      leaf transaction-id {
+        description "Unique UUID";
+        type string;
+      }
+
+      uses copy-task-status;
+    }
+  }
+
   rpc get-package-endpoint {
     description "Retrieves the endpoint for the descriptor";
 
@@ -221,6 +247,28 @@ module rw-pkg-mgmt
     }
   }
 
+  rpc package-copy {
+    description "Copies the package specified in input and returns the copied package id";
+
+    input {
+      uses package-identifer;
+
+      leaf package-name {
+        description "Name of destination package";
+        type string;
+      }
+    }
+
+    output {
+     leaf transaction-id {
+        description "Valid ID to track the status of the task";
+        type string;
+      }
+
+     uses package-identifer;
+    }
+  }
+
   rpc get-package-schema {
     description "Retrieves the schema for the package type";