X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwlaunchpadtasklet%2Frift%2Ftasklets%2Frwlaunchpad%2Fuploader.py;h=1e45ea4bfb791c68961164a4c2642a5b07768475;hb=a3bb91f092d378448cb870eccd45d43865de143c;hp=081c1f5c84c72617873c7d6534b2b767626afbc6;hpb=255ff03a528a3090ce7f46f0a63b65da3e6f9bcf;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py index 081c1f5c..1e45ea4b 100644 --- a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py +++ b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py @@ -1,5 +1,5 @@ -# +# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,9 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import abc +import asyncio import collections import os +import tempfile import threading import uuid import zlib @@ -36,12 +38,12 @@ requests.packages.urllib3.disable_warnings(InsecureRequestWarning) import gi gi.require_version('RwLaunchpadYang', '1.0') -gi.require_version('NsdYang', '1.0') -gi.require_version('VnfdYang', '1.0') +gi.require_version('ProjectNsdYang', '1.0') +gi.require_version('ProjectVnfdYang', '1.0') from gi.repository import ( - NsdYang, - VnfdYang, + ProjectNsdYang as NsdYang, + ProjectVnfdYang as VnfdYang, ) import rift.mano.cloud @@ -49,11 +51,19 @@ import rift.package.charm import rift.package.checksums import rift.package.config import rift.package.convert +import rift.package.handler as pkg_handler import rift.package.icon import rift.package.package import rift.package.script import rift.package.store +from gi.repository import ( + RwDts as rwdts, + RwPkgMgmtYang) +import rift.downloader as downloader +import rift.mano.dts as mano_dts +import rift.tasklets + from . import ( export, extract, @@ -90,6 +100,8 @@ from .message import ( OnboardStart, OnboardSuccess, + DownloadError, + DownloadSuccess, # Update Error Messages UpdateChecksumMismatch, @@ -121,6 +133,11 @@ GB = 1024 * MB MAX_STREAMED_SIZE = 5 * GB +# Shortcuts +RPC_PACKAGE_CREATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCreate +RPC_PACKAGE_UPDATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageUpdate + + class HttpMessageError(Exception): def __init__(self, code, msg): @@ -128,296 +145,90 @@ class HttpMessageError(Exception): self.msg = msg -class GzipTemporaryFileStreamedPart(multipart_streamer.TemporaryFileStreamedPart): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - # Create a decompressor for gzip data to decompress on the fly during upload - # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk - self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS) - - def feed(self, data): - decompressed_data = self._decompressor.decompress(data) - if decompressed_data: - super().feed(decompressed_data) - - def finalize(self): - # All data has arrived, flush the decompressor to get any last decompressed data - decompressed_data = self._decompressor.flush() - super().feed(decompressed_data) - super().finalize() - - -class GzipMultiPartStreamer(multipart_streamer.MultiPartStreamer): - """ This Multipart Streamer decompresses gzip files on the fly during multipart upload """ - - @staticmethod - def _get_descriptor_name_from_headers(headers): - descriptor_filename = None - - for entry in headers: - if entry["value"] != "form-data": - continue - - form_data_params = entry["params"] - if "name" in form_data_params: - if form_data_params["name"] != "descriptor": - continue - - if "filename" not in form_data_params: - continue - - descriptor_filename = form_data_params["filename"] - - return descriptor_filename - - def create_part(self, headers): - """ Create the StreamedPart subclass depending on the descriptor filename - - For gzipped descriptor packages, create a GzipTemporaryFileStreamedPart which - can decompress the gzip while it's being streamed into the launchpad directely - into a file. - - Returns: - The descriptor filename +class UploadRpcHandler(mano_dts.AbstractRpcHandler): + def __init__(self, application): """ - filename = GzipMultiPartStreamer._get_descriptor_name_from_headers(headers) - if filename is None or not filename.endswith(".gz"): - return multipart_streamer.TemporaryFileStreamedPart(self, headers) - - return GzipTemporaryFileStreamedPart(self, headers) - - -class RequestHandler(tornado.web.RequestHandler): - def options(self, *args, **kargs): - pass - - def set_default_headers(self): - self.set_header('Access-Control-Allow-Origin', '*') - self.set_header('Access-Control-Allow-Headers', - 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization') - self.set_header('Access-Control-Allow-Methods', 'POST, GET, PUT, DELETE') - - -@tornado.web.stream_request_body -class StreamingUploadHandler(RequestHandler): - def initialize(self, log, loop): - """Initialize the handler - - Arguments: - log - the logger that this handler should use - loop - the tasklets ioloop - + Args: + application: UploaderApplication """ - self.transaction_id = str(uuid.uuid4()) - - self.loop = loop - self.log = self.application.get_logger(self.transaction_id) - - self.part_streamer = None + super().__init__(application.log, application.dts, application.loop) + self.application = application - self.log.debug('created handler (transaction_id = {})'.format(self.transaction_id)) - - def msg_missing_content_type(self): - raise NotImplementedError() - - def msg_unsupported_media_type(self): - raise NotImplementedError() - - def msg_missing_content_boundary(self): - raise NotImplementedError() - - def msg_start(self): - raise NotImplementedError() - - def msg_success(self): - raise NotImplementedError() - - def msg_failure(self): - raise NotImplementedError() - - def msg_package_upload(self): - raise NotImplementedError() - - @tornado.gen.coroutine - def prepare(self): - """Prepare the handler for a request - - The prepare function is the first part of a request transaction. It - creates a temporary file that uploaded data can be written to. + @property + def xpath(self): + return "/rw-pkg-mgmt:package-create" - """ - if self.request.method != "POST": - return + @asyncio.coroutine + def callback(self, ks_path, msg): + transaction_id = str(uuid.uuid4()) + log = self.application.get_logger(transaction_id) + log.message(OnboardStart()) - self.request.connection.set_max_body_size(MAX_STREAMED_SIZE) + self.log.debug("Package create RPC: {}".format(msg)) - self.log.message(self.msg_start()) + auth = None + if msg.username is not None: + auth = (msg.username, msg.password) try: - # Retrieve the content type and parameters from the request - content_type = self.request.headers.get('content-type', None) - if content_type is None: - raise HttpMessageError(400, self.msg_missing_content_type()) - - content_type, params = tornado.httputil._parse_header(content_type) - - if "multipart/form-data" != content_type.lower(): - raise HttpMessageError(415, self.msg_unsupported_media_type()) - - if "boundary" not in params: - raise HttpMessageError(400, self.msg_missing_content_boundary()) - - # You can get the total request size from the headers. - try: - total = int(self.request.headers.get("Content-Length", "0")) - except KeyError: - self.log.warning("Content length header not found") - # For any well formed browser request, Content-Length should have a value. - total = 0 - - # And here you create a streamer that will accept incoming data - self.part_streamer = GzipMultiPartStreamer(total) - - except HttpMessageError as e: - self.log.message(e.msg) - self.log.message(self.msg_failure()) - - raise tornado.web.HTTPError(e.code, e.msg.name) - - except Exception as e: - self.log.exception(e) - self.log.message(self.msg_failure()) - - @tornado.gen.coroutine - def data_received(self, chunk): - """Write data to the current file - - Arguments: - data - a chunk of data to write to file - - """ + project = msg.project_name + except AttributeError as e: + self._log.warning("Did not get project name in RPC: {}". + format(msg.as_dict())) + project = rift.mano.utils.project.DEFAULT_PROJECT + + self.application.onboard( + msg.external_url, + transaction_id, + auth=auth, + project=project, + ) - """When a chunk of data is received, we forward it to the multipart streamer.""" - self.part_streamer.data_received(chunk) + rpc_op = RPC_PACKAGE_CREATE_ENDPOINT.from_dict({ + "transaction_id": transaction_id, + "project_name": project, + }) - def post(self): - """Handle a post request + return rpc_op - The function is called after any data associated with the body of the - request has been received. +class UpdateRpcHandler(mano_dts.AbstractRpcHandler): + def __init__(self, application): """ - # You MUST call this to close the incoming stream. - self.part_streamer.data_complete() - - desc_parts = self.part_streamer.get_parts_by_name("descriptor") - if len(desc_parts) != 1: - raise HttpMessageError(400, OnboardError("Descriptor option not found")) - - self.log.message(self.msg_package_upload()) - - -class UploadHandler(StreamingUploadHandler): - """ - This handler is used to upload archives that contain VNFDs, NSDs, and PNFDs - to the launchpad. This is a streaming handler that writes uploaded archives - to disk without loading them all into memory. - """ - - def msg_missing_content_type(self): - return OnboardMissingContentType() - - def msg_unsupported_media_type(self): - return OnboardUnsupportedMediaType() - - def msg_missing_content_boundary(self): - return OnboardMissingContentBoundary() - - def msg_start(self): - return OnboardStart() - - def msg_success(self): - return OnboardSuccess() - - def msg_failure(self): - return OnboardFailure() - - def msg_package_upload(self): - return OnboardPackageUpload() - - def post(self): - """Handle a post request - - The function is called after any data associated with the body of the - request has been received. - + Args: + application: UploaderApplication """ - try: - super().post() - self.application.onboard( - self.part_streamer, - self.transaction_id, - auth=self.request.headers.get('authorization', None), - ) - - self.set_status(200) - self.write(tornado.escape.json_encode({ - "transaction_id": self.transaction_id, - })) - - except Exception: - self.log.exception("Upload POST failed") - self.part_streamer.release_parts() - raise - - -class UpdateHandler(StreamingUploadHandler): - def msg_missing_content_type(self): - return UpdateMissingContentType() + super().__init__(application.log, application.dts, application.loop) + self.application = application - def msg_unsupported_media_type(self): - return UpdateUnsupportedMediaType() - - def msg_missing_content_boundary(self): - return UpdateMissingContentBoundary() - - def msg_start(self): - return UpdateStart() - - def msg_success(self): - return UpdateSuccess() - - def msg_failure(self): - return UpdateFailure() + @property + def xpath(self): + return "/rw-pkg-mgmt:package-update" - def msg_package_upload(self): - return UpdatePackageUpload() + @asyncio.coroutine + def callback(self, ks_path, msg): - def post(self): - """Handle a post request + transaction_id = str(uuid.uuid4()) + log = self.application.get_logger(transaction_id) + log.message(UpdateStart()) - The function is called after any data associated with the body of the - request has been received. + auth = None + if msg.username is not None: + auth = (msg.username, msg.password) - """ - try: - super().post() + self.application.update( + msg.external_url, + transaction_id, + auth=auth, + project=msg.project_name, + ) - self.application.update( - self.part_streamer, - self.transaction_id, - auth=self.request.headers.get('authorization', None), - ) + rpc_op = RPC_PACKAGE_UPDATE_ENDPOINT.from_dict({ + "transaction_id": transaction_id, + "project_name": msg.project_name, + }) - self.set_status(200) - self.write(tornado.escape.json_encode({ - "transaction_id": self.transaction_id, - })) - except Exception: - self.log.exception("Upload POST failed") - self.part_streamer.release_parts() - raise + return rpc_op class UploadStateHandler(state.StateHandler): @@ -432,27 +243,30 @@ class UpdateStateHandler(state.StateHandler): FAILURE = UpdateFailure -class UpdatePackage(threading.Thread): - def __init__(self, log, loop, part_streamer, auth, +class UpdatePackage(downloader.DownloaderProtocol): + + def __init__(self, log, loop, project, url, auth, onboarder, uploader, package_store_map): super().__init__() self.log = log self.loop = loop - self.part_streamer = part_streamer + self.project = project + self.url = url self.auth = auth self.onboarder = onboarder self.uploader = uploader self.package_store_map = package_store_map - self.io_loop = tornado.ioloop.IOLoop.current() - def _update_package(self): + def _update_package(self, packages): + # Extract package could return multiple packages if # the package is converted - for pkg in self.extract_package(): + for pkg in packages: with pkg as temp_package: package_checksums = self.validate_package(temp_package) stored_package = self.update_package(temp_package) + self.validate_vnfd_fields(temp_package) try: self.extract_charms(temp_package) @@ -469,9 +283,9 @@ class UpdatePackage(threading.Thread): else: self.upload_images(temp_package, package_checksums) - def run(self): + def extract(self, packages): try: - self._update_package() + self._update_package(packages) self.log.message(UpdateSuccess()) except MessageException as e: @@ -484,23 +298,32 @@ class UpdatePackage(threading.Thread): self.log.message(UpdateError(str(e))) self.log.message(UpdateFailure()) - def extract_package(self): - """Extract multipart message from tarball""" - desc_part = self.part_streamer.get_parts_by_name("descriptor")[0] - - # Invoke the move API to prevent the part streamer from attempting - # to clean up (the file backed package will do that itself) - desc_part.move(desc_part.f_out.name) - - package_name = desc_part.get_filename() - package_path = desc_part.f_out.name + def on_download_succeeded(self, job): + self.log.message(DownloadSuccess("Package downloaded.")) extractor = extract.UploadPackageExtractor(self.log) file_backed_packages = extractor.create_packages_from_upload( - package_name, package_path + job.filename, job.filepath ) - return file_backed_packages + self.extract(file_backed_packages) + + def on_download_failed(self, job): + self.log.error(job.detail) + self.log.message(DownloadError("Package download failed. {}".format(job.detail))) + self.log.message(UpdateFailure()) + + def download_package(self): + + _, filename = tempfile.mkstemp() + url_downloader = downloader.UrlDownloader( + self.url, + auth=self.auth, + file_obj=filename, + decompress_on_fly=True, + log=self.log) + url_downloader.delegate = self + url_downloader.download() def get_package_store(self, package): return self.package_store_map[package.descriptor_type] @@ -547,7 +370,7 @@ class UpdatePackage(threading.Thread): ) try: self.uploader.upload_image(image_name, image_checksum, image_hdl) - self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum) + self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum, self.project) except image.ImageUploadError as e: self.log.exception("Failed to upload image: %s", image_name) @@ -556,7 +379,6 @@ class UpdatePackage(threading.Thread): finally: _ = [image_hdl.close() for image_hdl in name_hdl_map.values()] - def extract_charms(self, package): try: charm_extractor = rift.package.charm.PackageCharmExtractor(self.log) @@ -585,6 +407,23 @@ class UpdatePackage(threading.Thread): except rift.package.icon.IconExtractionError as e: raise MessageException(UpdateExtractionError()) from e + def validate_vnfd_fields(self, package): + # We can add more VNFD validations here. Currently we are validating only cloud-init + if package.descriptor_msg is not None: + self.validate_cloud_init_file(package) + + def validate_cloud_init_file(self, package): + """ This validation is for VNFDs with associated VDUs. """ + if 'vdu' in package.descriptor_msg.as_dict(): + for vdu in package.descriptor_msg.as_dict()['vdu']: + if 'cloud_init_file' in vdu: + cloud_init_file = vdu['cloud_init_file'] + for file in package.files: + if file.endswith('/' + cloud_init_file) is True: + return + raise MessageException( + OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file")) + def validate_package(self, package): checksum_validator = rift.package.package.PackageChecksumValidator(self.log) @@ -603,32 +442,33 @@ class UpdatePackage(threading.Thread): self.log.message(UpdateDescriptorUpdate()) try: - self.onboarder.update(descriptor_msg) + self.onboarder.update(descriptor_msg, project=self.project) except onboard.UpdateError as e: raise MessageException(UpdateDescriptorError(package.descriptor_file)) from e -class OnboardPackage(threading.Thread): - def __init__(self, log, loop, part_streamer, auth, +class OnboardPackage(downloader.DownloaderProtocol): + + def __init__(self, log, loop, project, url, auth, onboarder, uploader, package_store_map): - super().__init__() self.log = log self.loop = loop - self.part_streamer = part_streamer + self.project = project + self.url = url self.auth = auth self.onboarder = onboarder self.uploader = uploader self.package_store_map = package_store_map + self.project = project - self.io_loop = tornado.ioloop.IOLoop.current() - - def _onboard_package(self): + def _onboard_package(self, packages): # Extract package could return multiple packages if # the package is converted - for pkg in self.extract_package(): + for pkg in packages: with pkg as temp_package: package_checksums = self.validate_package(temp_package) stored_package = self.store_package(temp_package) + self.validate_vnfd_fields(temp_package) try: self.extract_charms(temp_package) @@ -645,9 +485,9 @@ class OnboardPackage(threading.Thread): else: self.upload_images(temp_package, package_checksums) - def run(self): + def extract(self, packages): try: - self._onboard_package() + self._onboard_package(packages) self.log.message(OnboardSuccess()) except MessageException as e: @@ -660,26 +500,32 @@ class OnboardPackage(threading.Thread): self.log.message(OnboardError(str(e))) self.log.message(OnboardFailure()) - finally: - self.part_streamer.release_parts() - - def extract_package(self): - """Extract multipart message from tarball""" - desc_part = self.part_streamer.get_parts_by_name("descriptor")[0] - - # Invoke the move API to prevent the part streamer from attempting - # to clean up (the file backed package will do that itself) - desc_part.move(desc_part.f_out.name) - - package_name = desc_part.get_filename() - package_path = desc_part.f_out.name + def on_download_succeeded(self, job): + self.log.message(DownloadSuccess("Package downloaded.")) extractor = extract.UploadPackageExtractor(self.log) file_backed_packages = extractor.create_packages_from_upload( - package_name, package_path + job.filename, job.filepath ) - return file_backed_packages + self.extract(file_backed_packages) + + def on_download_failed(self, job): + self.log.error(job.detail) + self.log.message(DownloadError("Package download failed. {}".format(job.detail))) + self.log.message(OnboardFailure()) + + def download_package(self): + + _, filename = tempfile.mkstemp() + url_downloader = downloader.UrlDownloader( + self.url, + auth=self.auth, + file_obj=filename, + decompress_on_fly=True, + log=self.log) + url_downloader.delegate = self + url_downloader.download() def get_package_store(self, package): return self.package_store_map[package.descriptor_type] @@ -759,6 +605,23 @@ class OnboardPackage(threading.Thread): except rift.package.icon.IconExtractionError as e: raise MessageException(OnboardExtractionError()) from e + def validate_vnfd_fields(self, package): + # We can add more VNFD validations here. Currently we are validating only cloud-init + if package.descriptor_msg is not None: + self.validate_cloud_init_file(package) + + def validate_cloud_init_file(self, package): + """ This validation is for VNFDs with associated VDUs. """ + if 'vdu' in package.descriptor_msg.as_dict(): + for vdu in package.descriptor_msg.as_dict()['vdu']: + if 'cloud_init_file' in vdu: + cloud_init_file = vdu['cloud_init_file'] + for file in package.files: + if file.endswith('/' + cloud_init_file) is True: + return + raise MessageException( + OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file")) + def validate_package(self, package): checksum_validator = rift.package.package.PackageChecksumValidator(self.log) @@ -777,56 +640,88 @@ class OnboardPackage(threading.Thread): self.log.message(OnboardDescriptorOnboard()) try: - self.onboarder.onboard(descriptor_msg) + self.onboarder.onboard(descriptor_msg, project=self.project) except onboard.OnboardError as e: raise MessageException(OnboardDescriptorError(package.descriptor_file)) from e class UploaderApplication(tornado.web.Application): - def __init__(self, tasklet): - self.tasklet = tasklet - self.accounts = [] - self.messages = collections.defaultdict(list) - self.export_dir = os.path.join(os.environ['RIFT_ARTIFACTS'], 'launchpad/exports') + @classmethod + def from_tasklet(cls, tasklet): manifest = tasklet.tasklet_info.get_pb_manifest() - self.use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl - self.ssl_cert = manifest.bootstrap_phase.rwsecurity.cert - self.ssl_key = manifest.bootstrap_phase.rwsecurity.key + use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl + ssl_cert = manifest.bootstrap_phase.rwsecurity.cert + ssl_key = manifest.bootstrap_phase.rwsecurity.key + return cls( + tasklet, + ssl=(ssl_cert, ssl_key), + vnfd_store=tasklet.vnfd_package_store, + nsd_store=tasklet.nsd_package_store) + + def __init__( + self, + tasklet, + ssl=None, + vnfd_store=None, + nsd_store=None): + + self.log = tasklet.log + self.loop = tasklet.loop + self.dts = tasklet.dts + + self.accounts = {} + + self.use_ssl = False + self.ssl_cert, self.ssl_key = None, None + if ssl: + self.use_ssl = True + self.ssl_cert, self.ssl_key = ssl + + if not vnfd_store: + vnfd_store = rift.package.store.VnfdPackageFilesystemStore(self.log) + + if not nsd_store: + nsd_store = rift.package.store.NsdPackageFilesystemStore(self.log) - self.uploader = image.ImageUploader(self.log, self.loop, tasklet.dts) + self.messages = collections.defaultdict(list) + self.export_dir = os.path.join(os.environ['RIFT_ARTIFACTS'], 'launchpad/exports') + + self.uploader = image.ImageUploader(self.log, self.loop, self.dts) self.onboarder = onboard.DescriptorOnboarder( self.log, "127.0.0.1", 8008, self.use_ssl, self.ssl_cert, self.ssl_key ) self.package_store_map = { - "vnfd": self.tasklet.vnfd_package_store, - "nsd": self.tasklet.nsd_package_store, + "vnfd": vnfd_store, + "nsd": nsd_store } self.exporter = export.DescriptorPackageArchiveExporter(self.log) self.loop.create_task(export.periodic_export_cleanup(self.log, self.loop, self.export_dir)) - attrs = dict(log=self.log, loop=self.loop) + self.get_vnfd_catalog = tasklet.get_vnfd_catalog + self.get_nsd_catalog = tasklet.get_nsd_catalog + catalog_map = { + "vnfd": self.get_vnfd_catalog, + "nsd": self.get_nsd_catalog + } - export_attrs = attrs.copy() - export_attrs.update({ - "store_map": self.package_store_map, - "exporter": self.exporter, - "catalog_map": { - "vnfd": self.vnfd_catalog, - "nsd": self.nsd_catalog - } - }) + self.upload_handler = UploadRpcHandler(self) + self.update_handler = UpdateRpcHandler(self) + self.export_handler = export.ExportRpcHandler(self, catalog_map) + + attrs = dict(log=self.log, loop=self.loop) super(UploaderApplication, self).__init__([ - (r"/api/update", UpdateHandler, attrs), - (r"/api/upload", UploadHandler, attrs), + (r"/api/package/vnfd/(.*)", pkg_handler.FileRestApiHandler, { + 'path': vnfd_store.root_dir}), + (r"/api/package/nsd/(.*)", pkg_handler.FileRestApiHandler, { + 'path': nsd_store.root_dir}), (r"/api/upload/([^/]+)/state", UploadStateHandler, attrs), (r"/api/update/([^/]+)/state", UpdateStateHandler, attrs), (r"/api/export/([^/]+)/state", export.ExportStateHandler, attrs), - (r"/api/export/(nsd|vnfd)$", export.ExportHandler, export_attrs), (r"/api/export/([^/]+.tar.gz)", tornado.web.StaticFileHandler, { "path": self.export_dir, }), @@ -835,47 +730,44 @@ class UploaderApplication(tornado.web.Application): }), ]) - @property - def log(self): - return self.tasklet.log - - @property - def loop(self): - return self.tasklet.loop + @asyncio.coroutine + def register(self): + yield from self.upload_handler.register() + yield from self.update_handler.register() + yield from self.export_handler.register() def get_logger(self, transaction_id): return message.Logger(self.log, self.messages[transaction_id]) - def onboard(self, part_streamer, transaction_id, auth=None): + def onboard(self, url, transaction_id, auth=None, project=None): log = message.Logger(self.log, self.messages[transaction_id]) - OnboardPackage( + onboard_package = OnboardPackage( log, self.loop, - part_streamer, + project, + url, auth, self.onboarder, self.uploader, self.package_store_map, - ).start() + ) + + self.loop.run_in_executor(None, onboard_package.download_package) - def update(self, part_streamer, transaction_id, auth=None): + def update(self, url, transaction_id, auth=None, project=None): log = message.Logger(self.log, self.messages[transaction_id]) - UpdatePackage( + update_package = UpdatePackage( log, self.loop, - part_streamer, + project, + url, auth, self.onboarder, self.uploader, self.package_store_map, - ).start() + ) - @property - def vnfd_catalog(self): - return self.tasklet.vnfd_catalog + self.loop.run_in_executor(None, update_package.download_package) - @property - def nsd_catalog(self): - return self.tasklet.nsd_catalog