return self.descriptor_msg.id
+ @property
+ def descriptor_name(self):
+ """ The descriptor name of this descriptor in the system """
+ if not self.descriptor_msg.has_field("name"):
+ msg = "Descriptor name not present"
+ self._log.error(msg)
+ raise PackageError(msg)
+
+ return self.descriptor_msg.name
+
@classmethod
def get_descriptor_patterns(cls):
""" Returns a tuple of descriptor regex and Package Types """
rift/tasklets/${TASKLET_NAME}/rpc.py
rift/tasklets/${TASKLET_NAME}/downloader/__init__.py
rift/tasklets/${TASKLET_NAME}/downloader/url.py
+ rift/tasklets/${TASKLET_NAME}/downloader/copy.py
rift/tasklets/${TASKLET_NAME}/proxy/__init__.py
rift/tasklets/${TASKLET_NAME}/proxy/base.py
rift/tasklets/${TASKLET_NAME}/proxy/filesystem.py
rift/tasklets/${TASKLET_NAME}/publisher/__init__.py
rift/tasklets/${TASKLET_NAME}/publisher/download_status.py
+ rift/tasklets/${TASKLET_NAME}/publisher/copy_status.py
rift/tasklets/${TASKLET_NAME}/subscriber/__init__.py
rift/tasklets/${TASKLET_NAME}/subscriber/download_status.py
COMPONENT ${PKG_LONG_NAME}
PYTHON3_ONLY)
-rift_add_subdirs(test)
\ No newline at end of file
+rift_add_subdirs(test)
# limitations under the License.
#
from .url import PackageFileDownloader
+from .copy import PackageFileCopier
--- /dev/null
+#
+# Copyright 2017 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Author(s): Nandan Sinha
+#
+
+import os
+import uuid
+import shutil
+import enum
+
+import gi
+gi.require_version('RwVnfdYang', '1.0')
+gi.require_version('RwNsdYang', '1.0')
+from gi.repository import (
+ RwYang,
+ NsdYang,
+ VnfdYang,
+ RwVnfdYang,
+ RwNsdYang,
+ RwPkgMgmtYang
+)
+
+class PackageCopyError(Exception):
+ pass
+
+class CopyStatus(enum.Enum):
+ UNINITIATED = 0
+ STARTED = 1
+ IN_PROGRESS = 2
+ COMPLETED = 3
+ FAILED = 4
+ CANCELLED = 5
+
+TaskStatus = RwPkgMgmtYang.TaskStatus
+
+class CopyMeta:
+ STATUS_MAP = {
+ CopyStatus.STARTED: TaskStatus.QUEUED.value_nick.upper(),
+ CopyStatus.UNINITIATED: TaskStatus.QUEUED.value_nick.upper(),
+ CopyStatus.IN_PROGRESS: TaskStatus.IN_PROGRESS.value_nick.upper(),
+ CopyStatus.COMPLETED: TaskStatus.COMPLETED.value_nick.upper(),
+ CopyStatus.FAILED: TaskStatus.FAILED.value_nick.upper(),
+ CopyStatus.CANCELLED: TaskStatus.CANCELLED.value_nick.upper()
+ }
+
+ def __init__(self, transaction_id):
+ self.transaction_id = transaction_id
+ self.state = CopyStatus.UNINITIATED
+
+ def set_state(self, state):
+ self.state = state
+
+ def as_dict(self):
+ return self.__dict__
+
+ def to_yang(self):
+ job = RwPkgMgmtYang.CopyJob.from_dict({
+ "transaction_id": self.transaction_id,
+ "status": CopyMeta.STATUS_MAP[self.state]
+ })
+ return job
+
+class PackageFileCopier:
+ DESCRIPTOR_MAP = {
+ "vnfd": (RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd, 'vnfd rw-vnfd'),
+ "nsd" : (RwNsdYang.YangData_Nsd_NsdCatalog_Nsd, 'nsd rw-nsd')
+ }
+
+ @classmethod
+ def from_rpc_input(cls, rpc_input, proxy, log=None):
+ return cls(
+ rpc_input.package_id,
+ rpc_input.package_type,
+ rpc_input.package_name,
+ proxy = proxy,
+ log=log)
+
+ def __init__(self,
+ pkg_id,
+ pkg_type,
+ pkg_name,
+ proxy,
+ log):
+ self.src_package_id = pkg_id
+ self.package_type = pkg_type.lower()
+ self.dest_package_name = pkg_name
+ self.dest_package_id = str(uuid.uuid4())
+ self.transaction_id = str(uuid.uuid4())
+ self.proxy = proxy
+ self.log = log
+ self.meta = CopyMeta(self.transaction_id)
+ self.src_package = None
+ self.dest_desc_msg = None
+
+ # Start of delegate calls
+ def call_delegate(self, event):
+ if not self.delegate:
+ return
+
+ # Send out the descriptor message to be posted on success
+ # Otherwise send out the CopyJob yang conversion from meta object.
+ if event == "on_download_succeeded":
+ getattr(self.delegate, event)(self.dest_desc_msg)
+ else:
+ getattr(self.delegate, event)(self.meta.to_yang())
+
+ def _copy_tree(self):
+ """
+ Locate directory tree of the source descriptor folder.
+ Copy directory tree to destination descriptor folder.
+
+ """
+ store = self.proxy._get_store(self.package_type)
+ src_path = store._get_package_dir(self.src_package_id)
+ self.src_package = store.get_package(self.src_package_id)
+ src_desc_name = self.src_package.descriptor_name
+ src_copy_path = os.path.join(src_path, src_desc_name)
+
+ self.dest_copy_path = os.path.join(store.DEFAULT_ROOT_DIR,
+ self.dest_package_id,
+ self.dest_package_name)
+ self.log.debug("Copying contents from {src} to {dest}".
+ format(src=src_copy_path, dest=self.dest_copy_path))
+
+ shutil.copytree(src_copy_path, self.dest_copy_path)
+
+ def _create_descriptor_file(self):
+ """ Update descriptor file for the newly copied descriptor catalog.
+ Use the existing descriptor file to create a descriptor proto gi object,
+ change some identifiers, and create a new descriptor yaml file from it.
+
+ """
+ src_desc_file = self.src_package.descriptor_file
+ src_desc_contents = self.src_package.descriptor_msg.as_dict()
+ src_desc_contents.update(
+ id =self.dest_package_id,
+ name = self.dest_package_name,
+ short_name = self.dest_package_name
+ )
+
+ desc_cls, modules = PackageFileCopier.DESCRIPTOR_MAP[self.package_type]
+ self.dest_desc_msg = desc_cls.from_dict(src_desc_contents)
+ dest_desc_path = os.path.join(self.dest_copy_path,
+ "{pkg_name}_{pkg_type}.yaml".format(pkg_name=self.dest_package_name, pkg_type=self.package_type))
+ model = RwYang.Model.create_libncx()
+ for module in modules.split():
+ model.load_module(module)
+
+ with open(dest_desc_path, "w") as fh:
+ fh.write(self.dest_desc_msg.to_yaml(model))
+
+ copied_desc_file = os.path.join(self.dest_copy_path, os.path.basename(src_desc_file))
+ if os.path.exists(copied_desc_file):
+ self.log.debug("Deleting copied yaml from old source %s" % (copied_desc_file))
+ os.remove(copied_desc_file)
+
+ def copy(self):
+ try:
+ if self.package_type not in PackageFileCopier.DESCRIPTOR_MAP:
+ raise PackageCopyError("Package type {} not currently supported for copy operations".format(self.package_type))
+
+ self._copy_tree()
+ self._create_descriptor_file()
+ self.copy_succeeded()
+
+ except Exception as e:
+ self.log.exception(str(e))
+ self.copy_failed()
+
+ self.copy_finished()
+
+ def copy_failed(self):
+ self.meta.set_state(CopyStatus.FAILED)
+ self.call_delegate("on_download_failed")
+
+ def copy_progress(self):
+ self.meta.set_state(CopyStatus.IN_PROGRESS)
+ self.call_delegate("on_download_progress")
+
+ def copy_succeeded(self):
+ self.meta.set_state(CopyStatus.COMPLETED)
+ self.call_delegate("on_download_succeeded")
+
+ def copy_finished(self):
+ self.call_delegate("on_download_finished")
+
# limitations under the License.
#
from .download_status import DownloadStatusPublisher
+from .copy_status import CopyStatusPublisher
--- /dev/null
+#
+# Copyright 2017 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Author(s): Nandan Sinha
+#
+
+import sys
+import asyncio
+import uuid
+import abc
+import functools
+from concurrent.futures import Future
+
+from gi.repository import (RwDts as rwdts)
+import rift.mano.dts as mano_dts
+import rift.downloader as url_downloader
+import rift.tasklets.rwlaunchpad.onboard as onboard
+
+if sys.version_info < (3, 4, 4):
+ asyncio.ensure_future = asyncio.async
+
+
+class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
+
+ def __init__(self, log, dts, loop, tasklet_info):
+ super().__init__(log, dts, loop)
+ self.tasks = {}
+ self.tasklet_info = tasklet_info
+
+ def xpath(self, transaction_id=None):
+ return ("D,/rw-pkg-mgmt:copy-jobs/rw-pkg-mgmt:job" +
+ ("[transaction-id='{}']".format(transaction_id) if transaction_id else ""))
+ pass
+
+ @asyncio.coroutine
+ def register(self):
+ self.reg = yield from self.dts.register(xpath=self.xpath(),
+ flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
+
+ assert self.reg is not None
+
+ @asyncio.coroutine
+ def register_copier(self, copier):
+ copier.delegate = self
+ future = self.loop.run_in_executor(None, copier.copy)
+ self.tasks[copier.transaction_id] = (copier, future)
+
+ return (copier.transaction_id, copier.dest_package_id)
+
+ @asyncio.coroutine
+ def _dts_publisher(self, job_msg):
+ # Publish the download state
+ self.reg.update_element(
+ self.xpath(transaction_id=job_msg.transaction_id), job_msg)
+
+ @staticmethod
+ def _async_add(func, fut):
+ try:
+ ret = func()
+ fut.set_result(ret)
+ except Exception as e:
+ fut.set_exception(e)
+
+ def _schedule_dts_work(self, job_msg):
+ f = functools.partial(
+ asyncio.ensure_future,
+ self._dts_publisher(job_msg),
+ loop = self.loop)
+ fut = Future()
+ self.loop.call_soon_threadsafe(CopyStatusPublisher._async_add, f, fut)
+ xx = fut.result()
+ if fut.exception() is not None:
+ self.log.error("Caught future exception during download: %s type %s", str(fut.exception()), type(fut.exception()))
+ raise fut.exception()
+ return xx
+
+ def on_download_progress(self, job_msg):
+ """callback that triggers update.
+ """
+ return self._schedule_dts_work(job_msg)
+
+ def on_download_finished(self, job_msg):
+ """callback that triggers update.
+ """
+ # clean up the local cache
+ key = job_msg.transaction_id
+ if key in self.tasks:
+ del self.tasks[key]
+
+ return self._schedule_dts_work(job_msg)
+
+ def on_download_succeeded(self, job_msg):
+ """Post the catalog descriptor object to the http endpoint.
+ Argument: job_msg (proto-gi descriptor_msg of the copied descriptor)
+
+ """
+ manifest = self.tasklet_info.get_pb_manifest()
+ use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
+ ssl_cert, ssl_key = None, None
+ if use_ssl:
+ ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
+ ssl_key = manifest.bootstrap_phase.rwsecurity.key
+
+ onboarder = onboard.DescriptorOnboarder(self.log,
+ "127.0.0.1", 8008, use_ssl, ssl_cert, ssl_key)
+ try:
+ onboarder.onboard(job_msg)
+ except onboard.OnboardError as e:
+ self.log.error("Onboard exception triggered while posting copied catalog descriptor %s", e)
+ raise
+
+
RPC_SCHEMA_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_GetPackageSchema
RPC_PACKAGE_ADD_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageFileAdd
RPC_PACKAGE_DELETE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageFileDelete
+RPC_PACKAGE_COPY_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCopy
class EndpointDiscoveryRpcHandler(mano_dts.AbstractRpcHandler):
return rpc_op
+class PackageCopyOperationsRpcHandler(mano_dts.AbstractRpcHandler):
+ def __init__(self, log, dts, loop, proxy, publisher):
+ """
+ Args:
+ proxy: Any impl of .proxy.AbstractPackageManagerProxy
+ publisher: CopyStatusPublisher object
+ """
+ super().__init__(log, dts, loop)
+ self.proxy = proxy
+ self.publisher = publisher
+
+ @property
+ def xpath(self):
+ return "/rw-pkg-mgmt:package-copy"
+
+ @asyncio.coroutine
+ def callback(self, ks_path, msg):
+ import uuid
+ copier = pkg_downloader.PackageFileCopier.from_rpc_input(msg, proxy=self.proxy, log=self.log)
+
+ transaction_id, dest_package_id = yield from self.publisher.register_copier(copier)
+ rpc_op = RPC_PACKAGE_COPY_ENDPOINT.from_dict({
+ "transaction_id":transaction_id,
+ "package_id":dest_package_id,
+ "package_type":msg.package_type})
+
+ return rpc_op
class PackageDeleteOperationsRpcHandler(mano_dts.AbstractRpcHandler):
def __init__(self, log, dts, loop, proxy):
self.log.exception(e)
def start(self):
- super().start()
self.log.debug("Registering with dts")
- self.dts = rift.tasklets.DTS(
+ try:
+ super().start()
+ self.dts = rift.tasklets.DTS(
self.tasklet_info,
RwPkgMgmtYang.get_schema(),
self.loop,
self.on_dts_state_change
)
+
+ proxy = filesystem.FileSystemProxy(self.loop, self.log)
+ args = [self.log, self.dts, self.loop]
- proxy = filesystem.FileSystemProxy(self.loop, self.log)
-
- args = [self.log, self.dts, self.loop]
- self.job_handler = pkg_publisher.DownloadStatusPublisher(*args)
+ # create catalog publishers
+ self.job_handler = pkg_publisher.DownloadStatusPublisher(*args)
+ self.copy_publisher = pkg_publisher.CopyStatusPublisher(*args +[self.tasklet_info])
# create catalog subscribers
- self.vnfd_catalog_sub = subscriber.VnfdStatusSubscriber(*args)
- self.nsd_catalog_sub = subscriber.NsdStatusSubscriber(*args)
+ self.vnfd_catalog_sub = subscriber.VnfdStatusSubscriber(*args)
+ self.nsd_catalog_sub = subscriber.NsdStatusSubscriber(*args)
+
+ args.append(proxy)
+ self.endpoint_rpc = rpc.EndpointDiscoveryRpcHandler(*args)
+ self.schema_rpc = rpc.SchemaRpcHandler(*args)
+ self.delete_rpc = rpc.PackageDeleteOperationsRpcHandler(*args)
+ self.copy_rpc = rpc.PackageCopyOperationsRpcHandler(*(args + [self.copy_publisher]))
- args.append(proxy)
- self.endpoint_rpc = rpc.EndpointDiscoveryRpcHandler(*args)
- self.schema_rpc = rpc.SchemaRpcHandler(*args)
- self.delete_rpc = rpc.PackageDeleteOperationsRpcHandler(*args)
+ args.append(self.job_handler)
+ self.pkg_op = rpc.PackageOperationsRpcHandler(*args)
- args.append(self.job_handler)
- self.pkg_op = rpc.PackageOperationsRpcHandler(*args)
+ except Exception as e:
+ self.log.error("Exception caught rwpkgmgr start: %s", str(e))
+ else:
+ self.log.debug("rwpkgmgr started successfully!")
def stop(self):
try:
@asyncio.coroutine
def init(self):
- yield from self.endpoint_rpc.register()
- yield from self.schema_rpc.register()
- yield from self.pkg_op.register()
- yield from self.job_handler.register()
- yield from self.delete_rpc.register()
- yield from self.vnfd_catalog_sub.register()
- yield from self.nsd_catalog_sub.register()
+ try:
+ yield from self.endpoint_rpc.register()
+ yield from self.schema_rpc.register()
+ yield from self.pkg_op.register()
+ yield from self.job_handler.register()
+ yield from self.delete_rpc.register()
+ yield from self.copy_rpc.register()
+ yield from self.copy_publisher.register()
+ yield from self.vnfd_catalog_sub.register()
+ yield from self.nsd_catalog_sub.register()
+ except Exception as e:
+ self.log.error("Exception caught rwpkgmgr init %s", str(e))
@asyncio.coroutine
def run(self):
}
}
+ grouping copy-task-status {
+ leaf status {
+ description "The status of the copy task";
+ type task-status;
+ default QUEUED;
+ }
+ }
+
container download-jobs {
rwpb:msg-new DownloadJobs;
description "Download jobs";
}
}
+ container copy-jobs {
+ rwpb:msg-new CopyJobs;
+ description "Copy jobs";
+ config false;
+
+ list job {
+ rwpb:msg-new CopyJob;
+ key "transaction-id";
+
+ leaf transaction-id {
+ description "Unique UUID";
+ type string;
+ }
+
+ uses copy-task-status;
+ }
+ }
+
rpc get-package-endpoint {
description "Retrieves the endpoint for the descriptor";
}
}
+ rpc package-copy {
+ description "Copies the package specified in input and returns the copied package id";
+
+ input {
+ uses package-identifer;
+
+ leaf package-name {
+ description "Name of destination package";
+ type string;
+ }
+ }
+
+ output {
+ leaf transaction-id {
+ description "Valid ID to track the status of the task";
+ type string;
+ }
+
+ uses package-identifer;
+ }
+ }
+
rpc get-package-schema {
description "Retrieves the schema for the package type";