X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwlaunchpadtasklet%2Frift%2Ftasklets%2Frwlaunchpad%2Fuploader.py;h=c908bb3b6b0deea252e845beb3f75bc3fe3802b7;hb=c71b002e33b3c7ba5bd69ef29d48acd0b8579fc5;hp=081c1f5c84c72617873c7d6534b2b767626afbc6;hpb=6f07e6f33f751ab4ffe624f6037f887b243bece2;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py index 081c1f5c..c908bb3b 100644 --- a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py +++ b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py @@ -1,5 +1,5 @@ -# +# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,9 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import abc +import asyncio import collections import os +import tempfile import threading import uuid import zlib @@ -49,11 +51,19 @@ import rift.package.charm import rift.package.checksums import rift.package.config import rift.package.convert +import rift.package.handler as pkg_handler import rift.package.icon import rift.package.package import rift.package.script import rift.package.store +from gi.repository import ( + RwDts as rwdts, + RwPkgMgmtYang) +import rift.downloader as downloader +import rift.mano.dts as mano_dts +import rift.tasklets + from . import ( export, extract, @@ -90,6 +100,8 @@ from .message import ( OnboardStart, OnboardSuccess, + DownloadError, + DownloadSuccess, # Update Error Messages UpdateChecksumMismatch, @@ -121,6 +133,11 @@ GB = 1024 * MB MAX_STREAMED_SIZE = 5 * GB +# Shortcuts +RPC_PACKAGE_CREATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCreate +RPC_PACKAGE_UPDATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageUpdate + + class HttpMessageError(Exception): def __init__(self, code, msg): @@ -128,296 +145,76 @@ class HttpMessageError(Exception): self.msg = msg -class GzipTemporaryFileStreamedPart(multipart_streamer.TemporaryFileStreamedPart): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - # Create a decompressor for gzip data to decompress on the fly during upload - # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk - self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS) - - def feed(self, data): - decompressed_data = self._decompressor.decompress(data) - if decompressed_data: - super().feed(decompressed_data) - - def finalize(self): - # All data has arrived, flush the decompressor to get any last decompressed data - decompressed_data = self._decompressor.flush() - super().feed(decompressed_data) - super().finalize() - - -class GzipMultiPartStreamer(multipart_streamer.MultiPartStreamer): - """ This Multipart Streamer decompresses gzip files on the fly during multipart upload """ - - @staticmethod - def _get_descriptor_name_from_headers(headers): - descriptor_filename = None - - for entry in headers: - if entry["value"] != "form-data": - continue - - form_data_params = entry["params"] - if "name" in form_data_params: - if form_data_params["name"] != "descriptor": - continue - - if "filename" not in form_data_params: - continue - - descriptor_filename = form_data_params["filename"] - - return descriptor_filename - - def create_part(self, headers): - """ Create the StreamedPart subclass depending on the descriptor filename - - For gzipped descriptor packages, create a GzipTemporaryFileStreamedPart which - can decompress the gzip while it's being streamed into the launchpad directely - into a file. - - Returns: - The descriptor filename - """ - filename = GzipMultiPartStreamer._get_descriptor_name_from_headers(headers) - if filename is None or not filename.endswith(".gz"): - return multipart_streamer.TemporaryFileStreamedPart(self, headers) - - return GzipTemporaryFileStreamedPart(self, headers) - - -class RequestHandler(tornado.web.RequestHandler): - def options(self, *args, **kargs): - pass - - def set_default_headers(self): - self.set_header('Access-Control-Allow-Origin', '*') - self.set_header('Access-Control-Allow-Headers', - 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization') - self.set_header('Access-Control-Allow-Methods', 'POST, GET, PUT, DELETE') - - -@tornado.web.stream_request_body -class StreamingUploadHandler(RequestHandler): - def initialize(self, log, loop): - """Initialize the handler - - Arguments: - log - the logger that this handler should use - loop - the tasklets ioloop - +class UploadRpcHandler(mano_dts.AbstractRpcHandler): + def __init__(self, log, dts, loop, application): """ - self.transaction_id = str(uuid.uuid4()) - - self.loop = loop - self.log = self.application.get_logger(self.transaction_id) - - self.part_streamer = None - - self.log.debug('created handler (transaction_id = {})'.format(self.transaction_id)) - - def msg_missing_content_type(self): - raise NotImplementedError() - - def msg_unsupported_media_type(self): - raise NotImplementedError() - - def msg_missing_content_boundary(self): - raise NotImplementedError() - - def msg_start(self): - raise NotImplementedError() - - def msg_success(self): - raise NotImplementedError() - - def msg_failure(self): - raise NotImplementedError() - - def msg_package_upload(self): - raise NotImplementedError() - - @tornado.gen.coroutine - def prepare(self): - """Prepare the handler for a request - - The prepare function is the first part of a request transaction. It - creates a temporary file that uploaded data can be written to. - + Args: + application: UploaderApplication """ - if self.request.method != "POST": - return - - self.request.connection.set_max_body_size(MAX_STREAMED_SIZE) - - self.log.message(self.msg_start()) - - try: - # Retrieve the content type and parameters from the request - content_type = self.request.headers.get('content-type', None) - if content_type is None: - raise HttpMessageError(400, self.msg_missing_content_type()) - - content_type, params = tornado.httputil._parse_header(content_type) - - if "multipart/form-data" != content_type.lower(): - raise HttpMessageError(415, self.msg_unsupported_media_type()) + super().__init__(log, dts, loop) + self.application = application - if "boundary" not in params: - raise HttpMessageError(400, self.msg_missing_content_boundary()) - - # You can get the total request size from the headers. - try: - total = int(self.request.headers.get("Content-Length", "0")) - except KeyError: - self.log.warning("Content length header not found") - # For any well formed browser request, Content-Length should have a value. - total = 0 - - # And here you create a streamer that will accept incoming data - self.part_streamer = GzipMultiPartStreamer(total) - - except HttpMessageError as e: - self.log.message(e.msg) - self.log.message(self.msg_failure()) - - raise tornado.web.HTTPError(e.code, e.msg.name) + @property + def xpath(self): + return "/rw-pkg-mgmt:package-create" - except Exception as e: - self.log.exception(e) - self.log.message(self.msg_failure()) + @asyncio.coroutine + def callback(self, ks_path, msg): + transaction_id = str(uuid.uuid4()) + log = self.application.get_logger(transaction_id) + log.message(OnboardStart()) - @tornado.gen.coroutine - def data_received(self, chunk): - """Write data to the current file - Arguments: - data - a chunk of data to write to file + auth = None + if msg.username is not None: + auth = (msg.username, msg.password) - """ + self.application.onboard( + msg.external_url, + transaction_id, + auth=auth + ) - """When a chunk of data is received, we forward it to the multipart streamer.""" - self.part_streamer.data_received(chunk) + rpc_op = RPC_PACKAGE_CREATE_ENDPOINT.from_dict({ + "transaction_id": transaction_id}) - def post(self): - """Handle a post request + return rpc_op - The function is called after any data associated with the body of the - request has been received. +class UpdateRpcHandler(mano_dts.AbstractRpcHandler): + def __init__(self, log, dts, loop, application): """ - # You MUST call this to close the incoming stream. - self.part_streamer.data_complete() - - desc_parts = self.part_streamer.get_parts_by_name("descriptor") - if len(desc_parts) != 1: - raise HttpMessageError(400, OnboardError("Descriptor option not found")) - - self.log.message(self.msg_package_upload()) - - -class UploadHandler(StreamingUploadHandler): - """ - This handler is used to upload archives that contain VNFDs, NSDs, and PNFDs - to the launchpad. This is a streaming handler that writes uploaded archives - to disk without loading them all into memory. - """ - - def msg_missing_content_type(self): - return OnboardMissingContentType() - - def msg_unsupported_media_type(self): - return OnboardUnsupportedMediaType() - - def msg_missing_content_boundary(self): - return OnboardMissingContentBoundary() - - def msg_start(self): - return OnboardStart() - - def msg_success(self): - return OnboardSuccess() - - def msg_failure(self): - return OnboardFailure() - - def msg_package_upload(self): - return OnboardPackageUpload() - - def post(self): - """Handle a post request - - The function is called after any data associated with the body of the - request has been received. - + Args: + application: UploaderApplication """ - try: - super().post() - self.application.onboard( - self.part_streamer, - self.transaction_id, - auth=self.request.headers.get('authorization', None), - ) - - self.set_status(200) - self.write(tornado.escape.json_encode({ - "transaction_id": self.transaction_id, - })) - - except Exception: - self.log.exception("Upload POST failed") - self.part_streamer.release_parts() - raise - - -class UpdateHandler(StreamingUploadHandler): - def msg_missing_content_type(self): - return UpdateMissingContentType() - - def msg_unsupported_media_type(self): - return UpdateUnsupportedMediaType() - - def msg_missing_content_boundary(self): - return UpdateMissingContentBoundary() - - def msg_start(self): - return UpdateStart() + super().__init__(log, dts, loop) + self.application = application - def msg_success(self): - return UpdateSuccess() - - def msg_failure(self): - return UpdateFailure() + @property + def xpath(self): + return "/rw-pkg-mgmt:package-update" - def msg_package_upload(self): - return UpdatePackageUpload() + @asyncio.coroutine + def callback(self, ks_path, msg): - def post(self): - """Handle a post request + transaction_id = str(uuid.uuid4()) + log = self.application.get_logger(transaction_id) + log.message(UpdateStart()) - The function is called after any data associated with the body of the - request has been received. + auth = None + if msg.username is not None: + auth = (msg.username, msg.password) - """ - try: - super().post() + self.application.update( + msg.external_url, + transaction_id, + auth=auth + ) - self.application.update( - self.part_streamer, - self.transaction_id, - auth=self.request.headers.get('authorization', None), - ) + rpc_op = RPC_PACKAGE_UPDATE_ENDPOINT.from_dict({ + "transaction_id": transaction_id}) - self.set_status(200) - self.write(tornado.escape.json_encode({ - "transaction_id": self.transaction_id, - })) - except Exception: - self.log.exception("Upload POST failed") - self.part_streamer.release_parts() - raise + return rpc_op class UploadStateHandler(state.StateHandler): @@ -432,27 +229,29 @@ class UpdateStateHandler(state.StateHandler): FAILURE = UpdateFailure -class UpdatePackage(threading.Thread): - def __init__(self, log, loop, part_streamer, auth, +class UpdatePackage(downloader.DownloaderProtocol): + + def __init__(self, log, loop, url, auth, onboarder, uploader, package_store_map): super().__init__() self.log = log self.loop = loop - self.part_streamer = part_streamer + self.url = url self.auth = auth self.onboarder = onboarder self.uploader = uploader self.package_store_map = package_store_map - self.io_loop = tornado.ioloop.IOLoop.current() - def _update_package(self): + def _update_package(self, packages): + # Extract package could return multiple packages if # the package is converted - for pkg in self.extract_package(): + for pkg in packages: with pkg as temp_package: package_checksums = self.validate_package(temp_package) stored_package = self.update_package(temp_package) + self.validate_vnfd_fields(temp_package) try: self.extract_charms(temp_package) @@ -469,9 +268,9 @@ class UpdatePackage(threading.Thread): else: self.upload_images(temp_package, package_checksums) - def run(self): + def extract(self, packages): try: - self._update_package() + self._update_package(packages) self.log.message(UpdateSuccess()) except MessageException as e: @@ -484,23 +283,32 @@ class UpdatePackage(threading.Thread): self.log.message(UpdateError(str(e))) self.log.message(UpdateFailure()) - def extract_package(self): - """Extract multipart message from tarball""" - desc_part = self.part_streamer.get_parts_by_name("descriptor")[0] - - # Invoke the move API to prevent the part streamer from attempting - # to clean up (the file backed package will do that itself) - desc_part.move(desc_part.f_out.name) - - package_name = desc_part.get_filename() - package_path = desc_part.f_out.name + def on_download_succeeded(self, job): + self.log.message(DownloadSuccess("Package downloaded.")) extractor = extract.UploadPackageExtractor(self.log) file_backed_packages = extractor.create_packages_from_upload( - package_name, package_path + job.filename, job.filepath ) - return file_backed_packages + self.extract(file_backed_packages) + + def on_download_failed(self, job): + self.log.error(job.detail) + self.log.message(DownloadError("Package download failed. {}".format(job.detail))) + self.log.message(UpdateFailure()) + + def download_package(self): + + _, filename = tempfile.mkstemp() + url_downloader = downloader.UrlDownloader( + self.url, + auth=self.auth, + file_obj=filename, + decompress_on_fly=True, + log=self.log) + url_downloader.delegate = self + url_downloader.download() def get_package_store(self, package): return self.package_store_map[package.descriptor_type] @@ -556,7 +364,6 @@ class UpdatePackage(threading.Thread): finally: _ = [image_hdl.close() for image_hdl in name_hdl_map.values()] - def extract_charms(self, package): try: charm_extractor = rift.package.charm.PackageCharmExtractor(self.log) @@ -585,6 +392,23 @@ class UpdatePackage(threading.Thread): except rift.package.icon.IconExtractionError as e: raise MessageException(UpdateExtractionError()) from e + def validate_vnfd_fields(self, package): + # We can add more VNFD validations here. Currently we are validating only cloud-init + if package.descriptor_msg is not None: + self.validate_cloud_init_file(package) + + def validate_cloud_init_file(self, package): + """ This validation is for VNFDs with associated VDUs. """ + if 'vdu' in package.descriptor_msg.as_dict(): + for vdu in package.descriptor_msg.as_dict()['vdu']: + if 'cloud_init_file' in vdu: + cloud_init_file = vdu['cloud_init_file'] + for file in package.files: + if file.endswith('/' + cloud_init_file) is True: + return + raise MessageException( + OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file")) + def validate_package(self, package): checksum_validator = rift.package.package.PackageChecksumValidator(self.log) @@ -608,27 +432,26 @@ class UpdatePackage(threading.Thread): raise MessageException(UpdateDescriptorError(package.descriptor_file)) from e -class OnboardPackage(threading.Thread): - def __init__(self, log, loop, part_streamer, auth, +class OnboardPackage(downloader.DownloaderProtocol): + + def __init__(self, log, loop, url, auth, onboarder, uploader, package_store_map): - super().__init__() self.log = log self.loop = loop - self.part_streamer = part_streamer + self.url = url self.auth = auth self.onboarder = onboarder self.uploader = uploader self.package_store_map = package_store_map - self.io_loop = tornado.ioloop.IOLoop.current() - - def _onboard_package(self): + def _onboard_package(self, packages): # Extract package could return multiple packages if # the package is converted - for pkg in self.extract_package(): + for pkg in packages: with pkg as temp_package: package_checksums = self.validate_package(temp_package) stored_package = self.store_package(temp_package) + self.validate_vnfd_fields(temp_package) try: self.extract_charms(temp_package) @@ -645,9 +468,9 @@ class OnboardPackage(threading.Thread): else: self.upload_images(temp_package, package_checksums) - def run(self): + def extract(self, packages): try: - self._onboard_package() + self._onboard_package(packages) self.log.message(OnboardSuccess()) except MessageException as e: @@ -660,26 +483,32 @@ class OnboardPackage(threading.Thread): self.log.message(OnboardError(str(e))) self.log.message(OnboardFailure()) - finally: - self.part_streamer.release_parts() - - def extract_package(self): - """Extract multipart message from tarball""" - desc_part = self.part_streamer.get_parts_by_name("descriptor")[0] - - # Invoke the move API to prevent the part streamer from attempting - # to clean up (the file backed package will do that itself) - desc_part.move(desc_part.f_out.name) - - package_name = desc_part.get_filename() - package_path = desc_part.f_out.name + def on_download_succeeded(self, job): + self.log.message(DownloadSuccess("Package downloaded.")) extractor = extract.UploadPackageExtractor(self.log) file_backed_packages = extractor.create_packages_from_upload( - package_name, package_path + job.filename, job.filepath ) - return file_backed_packages + self.extract(file_backed_packages) + + def on_download_failed(self, job): + self.log.error(job.detail) + self.log.message(DownloadError("Package download failed. {}".format(job.detail))) + self.log.message(OnboardFailure()) + + def download_package(self): + + _, filename = tempfile.mkstemp() + url_downloader = downloader.UrlDownloader( + self.url, + auth=self.auth, + file_obj=filename, + decompress_on_fly=True, + log=self.log) + url_downloader.delegate = self + url_downloader.download() def get_package_store(self, package): return self.package_store_map[package.descriptor_type] @@ -759,6 +588,23 @@ class OnboardPackage(threading.Thread): except rift.package.icon.IconExtractionError as e: raise MessageException(OnboardExtractionError()) from e + def validate_vnfd_fields(self, package): + # We can add more VNFD validations here. Currently we are validating only cloud-init + if package.descriptor_msg is not None: + self.validate_cloud_init_file(package) + + def validate_cloud_init_file(self, package): + """ This validation is for VNFDs with associated VDUs. """ + if 'vdu' in package.descriptor_msg.as_dict(): + for vdu in package.descriptor_msg.as_dict()['vdu']: + if 'cloud_init_file' in vdu: + cloud_init_file = vdu['cloud_init_file'] + for file in package.files: + if file.endswith('/' + cloud_init_file) is True: + return + raise MessageException( + OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file")) + def validate_package(self, package): checksum_validator = rift.package.package.PackageChecksumValidator(self.log) @@ -783,50 +629,97 @@ class OnboardPackage(threading.Thread): class UploaderApplication(tornado.web.Application): - def __init__(self, tasklet): - self.tasklet = tasklet + + @classmethod + def from_tasklet(cls, tasklet): + manifest = tasklet.tasklet_info.get_pb_manifest() + use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl + ssl_cert = manifest.bootstrap_phase.rwsecurity.cert + ssl_key = manifest.bootstrap_phase.rwsecurity.key + return cls( + tasklet.log, + tasklet.dts, + tasklet.loop, + ssl=(ssl_cert, ssl_key), + vnfd_store=tasklet.vnfd_package_store, + nsd_store=tasklet.nsd_package_store, + vnfd_catalog=tasklet.vnfd_catalog, + nsd_catalog=tasklet.nsd_catalog) + + def __init__( + self, + log, + dts, + loop, + ssl=None, + vnfd_store=None, + nsd_store=None, + vnfd_catalog=None, + nsd_catalog=None): + + self.log = log + self.loop = loop + self.dts = dts + + self.use_ssl = False + self.ssl_cert, self.ssl_key = None, None + if ssl: + self.use_ssl = True + self.ssl_cert, self.ssl_key = ssl + + if not vnfd_store: + vnfd_store = rift.package.store.VnfdPackageFilesystemStore(self.log) + + if not nsd_store: + nsd_store = rift.package.store.NsdPackageFilesystemStore(self.log) + self.accounts = [] self.messages = collections.defaultdict(list) self.export_dir = os.path.join(os.environ['RIFT_ARTIFACTS'], 'launchpad/exports') - manifest = tasklet.tasklet_info.get_pb_manifest() - self.use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl - self.ssl_cert = manifest.bootstrap_phase.rwsecurity.cert - self.ssl_key = manifest.bootstrap_phase.rwsecurity.key - - self.uploader = image.ImageUploader(self.log, self.loop, tasklet.dts) + self.uploader = image.ImageUploader(self.log, self.loop, self.dts) self.onboarder = onboard.DescriptorOnboarder( self.log, "127.0.0.1", 8008, self.use_ssl, self.ssl_cert, self.ssl_key ) self.package_store_map = { - "vnfd": self.tasklet.vnfd_package_store, - "nsd": self.tasklet.nsd_package_store, + "vnfd": vnfd_store, + "nsd": nsd_store } self.exporter = export.DescriptorPackageArchiveExporter(self.log) self.loop.create_task(export.periodic_export_cleanup(self.log, self.loop, self.export_dir)) - attrs = dict(log=self.log, loop=self.loop) + self.vnfd_catalog = vnfd_catalog + self.nsd_catalog = nsd_catalog + catalog_map = { + "vnfd": self.vnfd_catalog, + "nsd": self.nsd_catalog + } + + self.upload_handler = UploadRpcHandler(self.log, self.dts, self.loop, self) + self.update_handler = UpdateRpcHandler(self.log, self.dts, self.loop, self) + self.export_handler = export.ExportRpcHandler( + self.log, + self.dts, + self.loop, + self, + store_map=self.package_store_map, + exporter=self.exporter, + catalog_map=catalog_map + ) - export_attrs = attrs.copy() - export_attrs.update({ - "store_map": self.package_store_map, - "exporter": self.exporter, - "catalog_map": { - "vnfd": self.vnfd_catalog, - "nsd": self.nsd_catalog - } - }) + attrs = dict(log=self.log, loop=self.loop) super(UploaderApplication, self).__init__([ - (r"/api/update", UpdateHandler, attrs), - (r"/api/upload", UploadHandler, attrs), + (r"/api/package/vnfd/(.*)", pkg_handler.FileRestApiHandler, { + 'path': vnfd_store.root_dir}), + (r"/api/package/nsd/(.*)", pkg_handler.FileRestApiHandler, { + 'path': nsd_store.root_dir}), (r"/api/upload/([^/]+)/state", UploadStateHandler, attrs), (r"/api/update/([^/]+)/state", UpdateStateHandler, attrs), (r"/api/export/([^/]+)/state", export.ExportStateHandler, attrs), - (r"/api/export/(nsd|vnfd)$", export.ExportHandler, export_attrs), (r"/api/export/([^/]+.tar.gz)", tornado.web.StaticFileHandler, { "path": self.export_dir, }), @@ -835,47 +728,42 @@ class UploaderApplication(tornado.web.Application): }), ]) - @property - def log(self): - return self.tasklet.log - - @property - def loop(self): - return self.tasklet.loop + @asyncio.coroutine + def register(self): + yield from self.upload_handler.register() + yield from self.update_handler.register() + yield from self.export_handler.register() def get_logger(self, transaction_id): return message.Logger(self.log, self.messages[transaction_id]) - def onboard(self, part_streamer, transaction_id, auth=None): + def onboard(self, url, transaction_id, auth=None): log = message.Logger(self.log, self.messages[transaction_id]) - OnboardPackage( + onboard_package = OnboardPackage( log, self.loop, - part_streamer, + url, auth, self.onboarder, self.uploader, self.package_store_map, - ).start() + ) - def update(self, part_streamer, transaction_id, auth=None): + self.loop.run_in_executor(None, onboard_package.download_package) + + def update(self, url, transaction_id, auth=None): log = message.Logger(self.log, self.messages[transaction_id]) - UpdatePackage( + update_package = UpdatePackage( log, self.loop, - part_streamer, + url, auth, self.onboarder, self.uploader, self.package_store_map, - ).start() + ) - @property - def vnfd_catalog(self): - return self.tasklet.vnfd_catalog + self.loop.run_in_executor(None, update_package.download_package) - @property - def nsd_catalog(self): - return self.tasklet.nsd_catalog