From: sinhan Date: Fri, 31 Mar 2017 19:08:59 +0000 (+0000) Subject: RIFT 15576 Support for copying descriptors with assets, with new rpc and yang data... X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FSO.git;a=commitdiff_plain;h=ffd2636644e9e62b42ddcb1dbe2c4a6e77507a70 RIFT 15576 Support for copying descriptors with assets, with new rpc and yang data model. Signed-off-by: sinhan --- diff --git a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/package/package.py b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/package/package.py index 3ce3500e..a3b18402 100644 --- a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/package/package.py +++ b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/package/package.py @@ -161,6 +161,16 @@ class DescriptorPackage(object): return self.descriptor_msg.id + @property + def descriptor_name(self): + """ The descriptor name of this descriptor in the system """ + if not self.descriptor_msg.has_field("name"): + msg = "Descriptor name not present" + self._log.error(msg) + raise PackageError(msg) + + return self.descriptor_msg.name + @classmethod def get_descriptor_patterns(cls): """ Returns a tuple of descriptor regex and Package Types """ diff --git a/rwlaunchpad/plugins/rwpkgmgr/CMakeLists.txt b/rwlaunchpad/plugins/rwpkgmgr/CMakeLists.txt index 4be45d0f..eca40c24 100644 --- a/rwlaunchpad/plugins/rwpkgmgr/CMakeLists.txt +++ b/rwlaunchpad/plugins/rwpkgmgr/CMakeLists.txt @@ -37,14 +37,16 @@ rift_python_install_tree( rift/tasklets/${TASKLET_NAME}/rpc.py rift/tasklets/${TASKLET_NAME}/downloader/__init__.py rift/tasklets/${TASKLET_NAME}/downloader/url.py + rift/tasklets/${TASKLET_NAME}/downloader/copy.py rift/tasklets/${TASKLET_NAME}/proxy/__init__.py rift/tasklets/${TASKLET_NAME}/proxy/base.py rift/tasklets/${TASKLET_NAME}/proxy/filesystem.py rift/tasklets/${TASKLET_NAME}/publisher/__init__.py rift/tasklets/${TASKLET_NAME}/publisher/download_status.py + rift/tasklets/${TASKLET_NAME}/publisher/copy_status.py rift/tasklets/${TASKLET_NAME}/subscriber/__init__.py rift/tasklets/${TASKLET_NAME}/subscriber/download_status.py COMPONENT ${PKG_LONG_NAME} PYTHON3_ONLY) -rift_add_subdirs(test) \ No newline at end of file +rift_add_subdirs(test) diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/__init__.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/__init__.py index 224ebdc5..4a9ae6d4 100644 --- a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/__init__.py +++ b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/__init__.py @@ -14,3 +14,4 @@ # limitations under the License. # from .url import PackageFileDownloader +from .copy import PackageFileCopier diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/copy.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/copy.py new file mode 100644 index 00000000..fa3dd3e3 --- /dev/null +++ b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/downloader/copy.py @@ -0,0 +1,199 @@ +# +# Copyright 2017 RIFT.IO Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Author(s): Nandan Sinha +# + +import os +import uuid +import shutil +import enum + +import gi +gi.require_version('RwVnfdYang', '1.0') +gi.require_version('RwNsdYang', '1.0') +from gi.repository import ( + RwYang, + NsdYang, + VnfdYang, + RwVnfdYang, + RwNsdYang, + RwPkgMgmtYang +) + +class PackageCopyError(Exception): + pass + +class CopyStatus(enum.Enum): + UNINITIATED = 0 + STARTED = 1 + IN_PROGRESS = 2 + COMPLETED = 3 + FAILED = 4 + CANCELLED = 5 + +TaskStatus = RwPkgMgmtYang.TaskStatus + +class CopyMeta: + STATUS_MAP = { + CopyStatus.STARTED: TaskStatus.QUEUED.value_nick.upper(), + CopyStatus.UNINITIATED: TaskStatus.QUEUED.value_nick.upper(), + CopyStatus.IN_PROGRESS: TaskStatus.IN_PROGRESS.value_nick.upper(), + CopyStatus.COMPLETED: TaskStatus.COMPLETED.value_nick.upper(), + CopyStatus.FAILED: TaskStatus.FAILED.value_nick.upper(), + CopyStatus.CANCELLED: TaskStatus.CANCELLED.value_nick.upper() + } + + def __init__(self, transaction_id): + self.transaction_id = transaction_id + self.state = CopyStatus.UNINITIATED + + def set_state(self, state): + self.state = state + + def as_dict(self): + return self.__dict__ + + def to_yang(self): + job = RwPkgMgmtYang.CopyJob.from_dict({ + "transaction_id": self.transaction_id, + "status": CopyMeta.STATUS_MAP[self.state] + }) + return job + +class PackageFileCopier: + DESCRIPTOR_MAP = { + "vnfd": (RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd, 'vnfd rw-vnfd'), + "nsd" : (RwNsdYang.YangData_Nsd_NsdCatalog_Nsd, 'nsd rw-nsd') + } + + @classmethod + def from_rpc_input(cls, rpc_input, proxy, log=None): + return cls( + rpc_input.package_id, + rpc_input.package_type, + rpc_input.package_name, + proxy = proxy, + log=log) + + def __init__(self, + pkg_id, + pkg_type, + pkg_name, + proxy, + log): + self.src_package_id = pkg_id + self.package_type = pkg_type.lower() + self.dest_package_name = pkg_name + self.dest_package_id = str(uuid.uuid4()) + self.transaction_id = str(uuid.uuid4()) + self.proxy = proxy + self.log = log + self.meta = CopyMeta(self.transaction_id) + self.src_package = None + self.dest_desc_msg = None + + # Start of delegate calls + def call_delegate(self, event): + if not self.delegate: + return + + # Send out the descriptor message to be posted on success + # Otherwise send out the CopyJob yang conversion from meta object. + if event == "on_download_succeeded": + getattr(self.delegate, event)(self.dest_desc_msg) + else: + getattr(self.delegate, event)(self.meta.to_yang()) + + def _copy_tree(self): + """ + Locate directory tree of the source descriptor folder. + Copy directory tree to destination descriptor folder. + + """ + store = self.proxy._get_store(self.package_type) + src_path = store._get_package_dir(self.src_package_id) + self.src_package = store.get_package(self.src_package_id) + src_desc_name = self.src_package.descriptor_name + src_copy_path = os.path.join(src_path, src_desc_name) + + self.dest_copy_path = os.path.join(store.DEFAULT_ROOT_DIR, + self.dest_package_id, + self.dest_package_name) + self.log.debug("Copying contents from {src} to {dest}". + format(src=src_copy_path, dest=self.dest_copy_path)) + + shutil.copytree(src_copy_path, self.dest_copy_path) + + def _create_descriptor_file(self): + """ Update descriptor file for the newly copied descriptor catalog. + Use the existing descriptor file to create a descriptor proto gi object, + change some identifiers, and create a new descriptor yaml file from it. + + """ + src_desc_file = self.src_package.descriptor_file + src_desc_contents = self.src_package.descriptor_msg.as_dict() + src_desc_contents.update( + id =self.dest_package_id, + name = self.dest_package_name, + short_name = self.dest_package_name + ) + + desc_cls, modules = PackageFileCopier.DESCRIPTOR_MAP[self.package_type] + self.dest_desc_msg = desc_cls.from_dict(src_desc_contents) + dest_desc_path = os.path.join(self.dest_copy_path, + "{pkg_name}_{pkg_type}.yaml".format(pkg_name=self.dest_package_name, pkg_type=self.package_type)) + model = RwYang.Model.create_libncx() + for module in modules.split(): + model.load_module(module) + + with open(dest_desc_path, "w") as fh: + fh.write(self.dest_desc_msg.to_yaml(model)) + + copied_desc_file = os.path.join(self.dest_copy_path, os.path.basename(src_desc_file)) + if os.path.exists(copied_desc_file): + self.log.debug("Deleting copied yaml from old source %s" % (copied_desc_file)) + os.remove(copied_desc_file) + + def copy(self): + try: + if self.package_type not in PackageFileCopier.DESCRIPTOR_MAP: + raise PackageCopyError("Package type {} not currently supported for copy operations".format(self.package_type)) + + self._copy_tree() + self._create_descriptor_file() + self.copy_succeeded() + + except Exception as e: + self.log.exception(str(e)) + self.copy_failed() + + self.copy_finished() + + def copy_failed(self): + self.meta.set_state(CopyStatus.FAILED) + self.call_delegate("on_download_failed") + + def copy_progress(self): + self.meta.set_state(CopyStatus.IN_PROGRESS) + self.call_delegate("on_download_progress") + + def copy_succeeded(self): + self.meta.set_state(CopyStatus.COMPLETED) + self.call_delegate("on_download_succeeded") + + def copy_finished(self): + self.call_delegate("on_download_finished") + diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/__init__.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/__init__.py index 7b86d7dd..469c9d22 100644 --- a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/__init__.py +++ b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/__init__.py @@ -14,3 +14,4 @@ # limitations under the License. # from .download_status import DownloadStatusPublisher +from .copy_status import CopyStatusPublisher diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/copy_status.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/copy_status.py new file mode 100644 index 00000000..927331c1 --- /dev/null +++ b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/copy_status.py @@ -0,0 +1,124 @@ +# +# Copyright 2017 RIFT.IO Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Author(s): Nandan Sinha +# + +import sys +import asyncio +import uuid +import abc +import functools +from concurrent.futures import Future + +from gi.repository import (RwDts as rwdts) +import rift.mano.dts as mano_dts +import rift.downloader as url_downloader +import rift.tasklets.rwlaunchpad.onboard as onboard + +if sys.version_info < (3, 4, 4): + asyncio.ensure_future = asyncio.async + + +class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol): + + def __init__(self, log, dts, loop, tasklet_info): + super().__init__(log, dts, loop) + self.tasks = {} + self.tasklet_info = tasklet_info + + def xpath(self, transaction_id=None): + return ("D,/rw-pkg-mgmt:copy-jobs/rw-pkg-mgmt:job" + + ("[transaction-id='{}']".format(transaction_id) if transaction_id else "")) + pass + + @asyncio.coroutine + def register(self): + self.reg = yield from self.dts.register(xpath=self.xpath(), + flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ) + + assert self.reg is not None + + @asyncio.coroutine + def register_copier(self, copier): + copier.delegate = self + future = self.loop.run_in_executor(None, copier.copy) + self.tasks[copier.transaction_id] = (copier, future) + + return (copier.transaction_id, copier.dest_package_id) + + @asyncio.coroutine + def _dts_publisher(self, job_msg): + # Publish the download state + self.reg.update_element( + self.xpath(transaction_id=job_msg.transaction_id), job_msg) + + @staticmethod + def _async_add(func, fut): + try: + ret = func() + fut.set_result(ret) + except Exception as e: + fut.set_exception(e) + + def _schedule_dts_work(self, job_msg): + f = functools.partial( + asyncio.ensure_future, + self._dts_publisher(job_msg), + loop = self.loop) + fut = Future() + self.loop.call_soon_threadsafe(CopyStatusPublisher._async_add, f, fut) + xx = fut.result() + if fut.exception() is not None: + self.log.error("Caught future exception during download: %s type %s", str(fut.exception()), type(fut.exception())) + raise fut.exception() + return xx + + def on_download_progress(self, job_msg): + """callback that triggers update. + """ + return self._schedule_dts_work(job_msg) + + def on_download_finished(self, job_msg): + """callback that triggers update. + """ + # clean up the local cache + key = job_msg.transaction_id + if key in self.tasks: + del self.tasks[key] + + return self._schedule_dts_work(job_msg) + + def on_download_succeeded(self, job_msg): + """Post the catalog descriptor object to the http endpoint. + Argument: job_msg (proto-gi descriptor_msg of the copied descriptor) + + """ + manifest = self.tasklet_info.get_pb_manifest() + use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl + ssl_cert, ssl_key = None, None + if use_ssl: + ssl_cert = manifest.bootstrap_phase.rwsecurity.cert + ssl_key = manifest.bootstrap_phase.rwsecurity.key + + onboarder = onboard.DescriptorOnboarder(self.log, + "127.0.0.1", 8008, use_ssl, ssl_cert, ssl_key) + try: + onboarder.onboard(job_msg) + except onboard.OnboardError as e: + self.log.error("Onboard exception triggered while posting copied catalog descriptor %s", e) + raise + + diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rpc.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rpc.py index f55a8fdf..a71f1085 100644 --- a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rpc.py +++ b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rpc.py @@ -34,6 +34,7 @@ RPC_PKG_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_GetPackageEndpoint RPC_SCHEMA_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_GetPackageSchema RPC_PACKAGE_ADD_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageFileAdd RPC_PACKAGE_DELETE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageFileDelete +RPC_PACKAGE_COPY_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCopy class EndpointDiscoveryRpcHandler(mano_dts.AbstractRpcHandler): @@ -144,6 +145,33 @@ class PackageOperationsRpcHandler(mano_dts.AbstractRpcHandler): return rpc_op +class PackageCopyOperationsRpcHandler(mano_dts.AbstractRpcHandler): + def __init__(self, log, dts, loop, proxy, publisher): + """ + Args: + proxy: Any impl of .proxy.AbstractPackageManagerProxy + publisher: CopyStatusPublisher object + """ + super().__init__(log, dts, loop) + self.proxy = proxy + self.publisher = publisher + + @property + def xpath(self): + return "/rw-pkg-mgmt:package-copy" + + @asyncio.coroutine + def callback(self, ks_path, msg): + import uuid + copier = pkg_downloader.PackageFileCopier.from_rpc_input(msg, proxy=self.proxy, log=self.log) + + transaction_id, dest_package_id = yield from self.publisher.register_copier(copier) + rpc_op = RPC_PACKAGE_COPY_ENDPOINT.from_dict({ + "transaction_id":transaction_id, + "package_id":dest_package_id, + "package_type":msg.package_type}) + + return rpc_op class PackageDeleteOperationsRpcHandler(mano_dts.AbstractRpcHandler): def __init__(self, log, dts, loop, proxy): diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rwpkgmgr.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rwpkgmgr.py index b302b46e..5773b0e5 100644 --- a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rwpkgmgr.py +++ b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/rwpkgmgr.py @@ -50,33 +50,42 @@ class PackageManagerTasklet(rift.tasklets.Tasklet): self.log.exception(e) def start(self): - super().start() self.log.debug("Registering with dts") - self.dts = rift.tasklets.DTS( + try: + super().start() + self.dts = rift.tasklets.DTS( self.tasklet_info, RwPkgMgmtYang.get_schema(), self.loop, self.on_dts_state_change ) + + proxy = filesystem.FileSystemProxy(self.loop, self.log) + args = [self.log, self.dts, self.loop] - proxy = filesystem.FileSystemProxy(self.loop, self.log) - - args = [self.log, self.dts, self.loop] - self.job_handler = pkg_publisher.DownloadStatusPublisher(*args) + # create catalog publishers + self.job_handler = pkg_publisher.DownloadStatusPublisher(*args) + self.copy_publisher = pkg_publisher.CopyStatusPublisher(*args +[self.tasklet_info]) # create catalog subscribers - self.vnfd_catalog_sub = subscriber.VnfdStatusSubscriber(*args) - self.nsd_catalog_sub = subscriber.NsdStatusSubscriber(*args) + self.vnfd_catalog_sub = subscriber.VnfdStatusSubscriber(*args) + self.nsd_catalog_sub = subscriber.NsdStatusSubscriber(*args) + + args.append(proxy) + self.endpoint_rpc = rpc.EndpointDiscoveryRpcHandler(*args) + self.schema_rpc = rpc.SchemaRpcHandler(*args) + self.delete_rpc = rpc.PackageDeleteOperationsRpcHandler(*args) + self.copy_rpc = rpc.PackageCopyOperationsRpcHandler(*(args + [self.copy_publisher])) - args.append(proxy) - self.endpoint_rpc = rpc.EndpointDiscoveryRpcHandler(*args) - self.schema_rpc = rpc.SchemaRpcHandler(*args) - self.delete_rpc = rpc.PackageDeleteOperationsRpcHandler(*args) + args.append(self.job_handler) + self.pkg_op = rpc.PackageOperationsRpcHandler(*args) - args.append(self.job_handler) - self.pkg_op = rpc.PackageOperationsRpcHandler(*args) + except Exception as e: + self.log.error("Exception caught rwpkgmgr start: %s", str(e)) + else: + self.log.debug("rwpkgmgr started successfully!") def stop(self): try: @@ -86,13 +95,18 @@ class PackageManagerTasklet(rift.tasklets.Tasklet): @asyncio.coroutine def init(self): - yield from self.endpoint_rpc.register() - yield from self.schema_rpc.register() - yield from self.pkg_op.register() - yield from self.job_handler.register() - yield from self.delete_rpc.register() - yield from self.vnfd_catalog_sub.register() - yield from self.nsd_catalog_sub.register() + try: + yield from self.endpoint_rpc.register() + yield from self.schema_rpc.register() + yield from self.pkg_op.register() + yield from self.job_handler.register() + yield from self.delete_rpc.register() + yield from self.copy_rpc.register() + yield from self.copy_publisher.register() + yield from self.vnfd_catalog_sub.register() + yield from self.nsd_catalog_sub.register() + except Exception as e: + self.log.error("Exception caught rwpkgmgr init %s", str(e)) @asyncio.coroutine def run(self): diff --git a/rwlaunchpad/plugins/yang/rw-pkg-mgmt.yang b/rwlaunchpad/plugins/yang/rw-pkg-mgmt.yang index de56e554..5fbd621b 100644 --- a/rwlaunchpad/plugins/yang/rw-pkg-mgmt.yang +++ b/rwlaunchpad/plugins/yang/rw-pkg-mgmt.yang @@ -182,6 +182,14 @@ module rw-pkg-mgmt } } + grouping copy-task-status { + leaf status { + description "The status of the copy task"; + type task-status; + default QUEUED; + } + } + container download-jobs { rwpb:msg-new DownloadJobs; description "Download jobs"; @@ -206,6 +214,24 @@ module rw-pkg-mgmt } } + container copy-jobs { + rwpb:msg-new CopyJobs; + description "Copy jobs"; + config false; + + list job { + rwpb:msg-new CopyJob; + key "transaction-id"; + + leaf transaction-id { + description "Unique UUID"; + type string; + } + + uses copy-task-status; + } + } + rpc get-package-endpoint { description "Retrieves the endpoint for the descriptor"; @@ -221,6 +247,28 @@ module rw-pkg-mgmt } } + rpc package-copy { + description "Copies the package specified in input and returns the copied package id"; + + input { + uses package-identifer; + + leaf package-name { + description "Name of destination package"; + type string; + } + } + + output { + leaf transaction-id { + description "Valid ID to track the status of the task"; + type string; + } + + uses package-identifer; + } + } + rpc get-package-schema { description "Retrieves the schema for the package type";