X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=rwlaunchpad%2Fplugins%2Frwlaunchpadtasklet%2Frift%2Ftasklets%2Frwlaunchpad%2Fuploader.py;h=7aafcb94298ab0ad51772916bc41b530423389b1;hb=refs%2Fchanges%2F77%2F5477%2F1;hp=081c1f5c84c72617873c7d6534b2b767626afbc6;hpb=255ff03a528a3090ce7f46f0a63b65da3e6f9bcf;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py index 081c1f5c..7aafcb94 100644 --- a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py +++ b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py @@ -1,5 +1,5 @@ -# +# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,12 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import abc +import asyncio import collections import os +import tempfile import threading import uuid import zlib +import re import tornado import tornado.escape @@ -36,24 +39,31 @@ requests.packages.urllib3.disable_warnings(InsecureRequestWarning) import gi gi.require_version('RwLaunchpadYang', '1.0') -gi.require_version('NsdYang', '1.0') -gi.require_version('VnfdYang', '1.0') +gi.require_version('ProjectNsdYang', '1.0') +gi.require_version('ProjectVnfdYang', '1.0') from gi.repository import ( - NsdYang, - VnfdYang, + ProjectNsdYang as NsdYang, + ProjectVnfdYang as VnfdYang, ) import rift.mano.cloud -import rift.package.charm import rift.package.checksums -import rift.package.config import rift.package.convert +import rift.package.handler as pkg_handler import rift.package.icon import rift.package.package import rift.package.script import rift.package.store +from gi.repository import ( + RwDts as rwdts, + RwPkgMgmtYang + ) +import rift.downloader as downloader +import rift.mano.dts as mano_dts +import rift.tasklets + from . import ( export, extract, @@ -90,6 +100,8 @@ from .message import ( OnboardStart, OnboardSuccess, + DownloadError, + DownloadSuccess, # Update Error Messages UpdateChecksumMismatch, @@ -116,11 +128,16 @@ from .message import ( from .tosca import ExportTosca +from .onboard import OnboardError as OnboardException + MB = 1024 * 1024 GB = 1024 * MB MAX_STREAMED_SIZE = 5 * GB +# Shortcuts +RPC_PACKAGE_CREATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCreate +RPC_PACKAGE_UPDATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageUpdate class HttpMessageError(Exception): def __init__(self, code, msg): @@ -128,296 +145,90 @@ class HttpMessageError(Exception): self.msg = msg -class GzipTemporaryFileStreamedPart(multipart_streamer.TemporaryFileStreamedPart): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - # Create a decompressor for gzip data to decompress on the fly during upload - # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk - self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS) - - def feed(self, data): - decompressed_data = self._decompressor.decompress(data) - if decompressed_data: - super().feed(decompressed_data) - - def finalize(self): - # All data has arrived, flush the decompressor to get any last decompressed data - decompressed_data = self._decompressor.flush() - super().feed(decompressed_data) - super().finalize() - - -class GzipMultiPartStreamer(multipart_streamer.MultiPartStreamer): - """ This Multipart Streamer decompresses gzip files on the fly during multipart upload """ - - @staticmethod - def _get_descriptor_name_from_headers(headers): - descriptor_filename = None - - for entry in headers: - if entry["value"] != "form-data": - continue - - form_data_params = entry["params"] - if "name" in form_data_params: - if form_data_params["name"] != "descriptor": - continue - - if "filename" not in form_data_params: - continue - - descriptor_filename = form_data_params["filename"] - - return descriptor_filename - - def create_part(self, headers): - """ Create the StreamedPart subclass depending on the descriptor filename - - For gzipped descriptor packages, create a GzipTemporaryFileStreamedPart which - can decompress the gzip while it's being streamed into the launchpad directely - into a file. - - Returns: - The descriptor filename +class UploadRpcHandler(mano_dts.AbstractRpcHandler): + def __init__(self, application): """ - filename = GzipMultiPartStreamer._get_descriptor_name_from_headers(headers) - if filename is None or not filename.endswith(".gz"): - return multipart_streamer.TemporaryFileStreamedPart(self, headers) - - return GzipTemporaryFileStreamedPart(self, headers) - - -class RequestHandler(tornado.web.RequestHandler): - def options(self, *args, **kargs): - pass - - def set_default_headers(self): - self.set_header('Access-Control-Allow-Origin', '*') - self.set_header('Access-Control-Allow-Headers', - 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization') - self.set_header('Access-Control-Allow-Methods', 'POST, GET, PUT, DELETE') - - -@tornado.web.stream_request_body -class StreamingUploadHandler(RequestHandler): - def initialize(self, log, loop): - """Initialize the handler - - Arguments: - log - the logger that this handler should use - loop - the tasklets ioloop - + Args: + application: UploaderApplication """ - self.transaction_id = str(uuid.uuid4()) - - self.loop = loop - self.log = self.application.get_logger(self.transaction_id) - - self.part_streamer = None - - self.log.debug('created handler (transaction_id = {})'.format(self.transaction_id)) - - def msg_missing_content_type(self): - raise NotImplementedError() + super().__init__(application.log, application.dts, application.loop) + self.application = application - def msg_unsupported_media_type(self): - raise NotImplementedError() - - def msg_missing_content_boundary(self): - raise NotImplementedError() - - def msg_start(self): - raise NotImplementedError() - - def msg_success(self): - raise NotImplementedError() - - def msg_failure(self): - raise NotImplementedError() - - def msg_package_upload(self): - raise NotImplementedError() - - @tornado.gen.coroutine - def prepare(self): - """Prepare the handler for a request - - The prepare function is the first part of a request transaction. It - creates a temporary file that uploaded data can be written to. + @property + def xpath(self): + return "/rw-pkg-mgmt:package-create" - """ - if self.request.method != "POST": - return + @asyncio.coroutine + def callback(self, ks_path, msg): + transaction_id = str(uuid.uuid4()) + log = self.application.get_logger(transaction_id) + log.message(OnboardStart()) - self.request.connection.set_max_body_size(MAX_STREAMED_SIZE) + self.log.debug("Package create RPC: {}".format(msg)) - self.log.message(self.msg_start()) + auth = None + if msg.username is not None: + auth = (msg.username, msg.password) try: - # Retrieve the content type and parameters from the request - content_type = self.request.headers.get('content-type', None) - if content_type is None: - raise HttpMessageError(400, self.msg_missing_content_type()) - - content_type, params = tornado.httputil._parse_header(content_type) - - if "multipart/form-data" != content_type.lower(): - raise HttpMessageError(415, self.msg_unsupported_media_type()) - - if "boundary" not in params: - raise HttpMessageError(400, self.msg_missing_content_boundary()) - - # You can get the total request size from the headers. - try: - total = int(self.request.headers.get("Content-Length", "0")) - except KeyError: - self.log.warning("Content length header not found") - # For any well formed browser request, Content-Length should have a value. - total = 0 - - # And here you create a streamer that will accept incoming data - self.part_streamer = GzipMultiPartStreamer(total) - - except HttpMessageError as e: - self.log.message(e.msg) - self.log.message(self.msg_failure()) + project = msg.project_name + except AttributeError as e: + self._log.warning("Did not get project name in RPC: {}". + format(msg.as_dict())) + project = rift.mano.utils.project.DEFAULT_PROJECT + + self.application.onboard( + msg.external_url, + transaction_id, + auth=auth, + project=project, + ) - raise tornado.web.HTTPError(e.code, e.msg.name) + rpc_op = RPC_PACKAGE_CREATE_ENDPOINT.from_dict({ + "transaction_id": transaction_id, + "project_name": project, + }) - except Exception as e: - self.log.exception(e) - self.log.message(self.msg_failure()) + return rpc_op - @tornado.gen.coroutine - def data_received(self, chunk): - """Write data to the current file - - Arguments: - data - a chunk of data to write to file +class UpdateRpcHandler(mano_dts.AbstractRpcHandler): + def __init__(self, application): """ - - """When a chunk of data is received, we forward it to the multipart streamer.""" - self.part_streamer.data_received(chunk) - - def post(self): - """Handle a post request - - The function is called after any data associated with the body of the - request has been received. - + Args: + application: UploaderApplication """ - # You MUST call this to close the incoming stream. - self.part_streamer.data_complete() - - desc_parts = self.part_streamer.get_parts_by_name("descriptor") - if len(desc_parts) != 1: - raise HttpMessageError(400, OnboardError("Descriptor option not found")) - - self.log.message(self.msg_package_upload()) - - -class UploadHandler(StreamingUploadHandler): - """ - This handler is used to upload archives that contain VNFDs, NSDs, and PNFDs - to the launchpad. This is a streaming handler that writes uploaded archives - to disk without loading them all into memory. - """ - - def msg_missing_content_type(self): - return OnboardMissingContentType() + super().__init__(application.log, application.dts, application.loop) + self.application = application - def msg_unsupported_media_type(self): - return OnboardUnsupportedMediaType() - - def msg_missing_content_boundary(self): - return OnboardMissingContentBoundary() - - def msg_start(self): - return OnboardStart() - - def msg_success(self): - return OnboardSuccess() - - def msg_failure(self): - return OnboardFailure() - - def msg_package_upload(self): - return OnboardPackageUpload() - - def post(self): - """Handle a post request - - The function is called after any data associated with the body of the - request has been received. - - """ - try: - super().post() - self.application.onboard( - self.part_streamer, - self.transaction_id, - auth=self.request.headers.get('authorization', None), - ) - - self.set_status(200) - self.write(tornado.escape.json_encode({ - "transaction_id": self.transaction_id, - })) - - except Exception: - self.log.exception("Upload POST failed") - self.part_streamer.release_parts() - raise - - -class UpdateHandler(StreamingUploadHandler): - def msg_missing_content_type(self): - return UpdateMissingContentType() - - def msg_unsupported_media_type(self): - return UpdateUnsupportedMediaType() - - def msg_missing_content_boundary(self): - return UpdateMissingContentBoundary() - - def msg_start(self): - return UpdateStart() - - def msg_success(self): - return UpdateSuccess() - - def msg_failure(self): - return UpdateFailure() + @property + def xpath(self): + return "/rw-pkg-mgmt:package-update" - def msg_package_upload(self): - return UpdatePackageUpload() + @asyncio.coroutine + def callback(self, ks_path, msg): - def post(self): - """Handle a post request + transaction_id = str(uuid.uuid4()) + log = self.application.get_logger(transaction_id) + log.message(UpdateStart()) - The function is called after any data associated with the body of the - request has been received. + auth = None + if msg.username is not None: + auth = (msg.username, msg.password) - """ - try: - super().post() + self.application.update( + msg.external_url, + transaction_id, + auth=auth, + project=msg.project_name, + ) - self.application.update( - self.part_streamer, - self.transaction_id, - auth=self.request.headers.get('authorization', None), - ) + rpc_op = RPC_PACKAGE_UPDATE_ENDPOINT.from_dict({ + "transaction_id": transaction_id, + "project_name": msg.project_name, + }) - self.set_status(200) - self.write(tornado.escape.json_encode({ - "transaction_id": self.transaction_id, - })) - except Exception: - self.log.exception("Upload POST failed") - self.part_streamer.release_parts() - raise + return rpc_op class UploadStateHandler(state.StateHandler): @@ -432,34 +243,34 @@ class UpdateStateHandler(state.StateHandler): FAILURE = UpdateFailure -class UpdatePackage(threading.Thread): - def __init__(self, log, loop, part_streamer, auth, - onboarder, uploader, package_store_map): +class UpdatePackage(downloader.DownloaderProtocol): + + def __init__(self, log, loop, project, url, auth, + onboarder, uploader, package_store_map, transaction_id): super().__init__() self.log = log self.loop = loop - self.part_streamer = part_streamer + self.project = project + self.url = url self.auth = auth self.onboarder = onboarder self.uploader = uploader self.package_store_map = package_store_map + self.transaction_id = transaction_id + - self.io_loop = tornado.ioloop.IOLoop.current() + def _update_package(self, packages): - def _update_package(self): # Extract package could return multiple packages if # the package is converted - for pkg in self.extract_package(): + for pkg in packages: with pkg as temp_package: package_checksums = self.validate_package(temp_package) stored_package = self.update_package(temp_package) + self.validate_descriptor_fields(temp_package) try: - self.extract_charms(temp_package) - self.extract_scripts(temp_package) - self.extract_configs(temp_package) self.extract_icons(temp_package) - self.update_descriptors(temp_package) except Exception: @@ -469,14 +280,15 @@ class UpdatePackage(threading.Thread): else: self.upload_images(temp_package, package_checksums) - def run(self): + def extract(self, packages): try: - self._update_package() + self._update_package(packages) self.log.message(UpdateSuccess()) except MessageException as e: self.log.message(e.msg) self.log.message(UpdateFailure()) + raise UpdateFailure(str(e)) except Exception as e: self.log.exception(e) @@ -484,23 +296,44 @@ class UpdatePackage(threading.Thread): self.log.message(UpdateError(str(e))) self.log.message(UpdateFailure()) - def extract_package(self): - """Extract multipart message from tarball""" - desc_part = self.part_streamer.get_parts_by_name("descriptor")[0] - - # Invoke the move API to prevent the part streamer from attempting - # to clean up (the file backed package will do that itself) - desc_part.move(desc_part.f_out.name) - - package_name = desc_part.get_filename() - package_path = desc_part.f_out.name + def on_download_succeeded(self, job): + self.log.message(DownloadSuccess("Package downloaded.")) extractor = extract.UploadPackageExtractor(self.log) file_backed_packages = extractor.create_packages_from_upload( - package_name, package_path + job.filename, job.filepath ) - - return file_backed_packages + try: + self.extract(file_backed_packages) + except Exception as e: + raise Exception("Error in Package Update") + + def on_download_finished(self, job): + self.log.debug("*** Download completed") + if hasattr(self.project, 'update_status_handler'): + self.project.update_status_handler.update_status(job, self.transaction_id) + + def on_download_progress(self, job): + self.log.debug("*** Download in progress") + if hasattr(self.project, 'update_status_handler'): + self.project.update_status_handler.update_status(job, self.transaction_id) + + def on_download_failed(self, job): + self.log.error(job.detail) + self.log.message(DownloadError("Package download failed. {}".format(job.detail))) + self.log.message(UpdateFailure()) + + def download_package(self): + + _, filename = tempfile.mkstemp() + url_downloader = downloader.UrlDownloader( + self.url, + auth=self.auth, + file_obj=filename, + decompress_on_fly=True, + log=self.log) + url_downloader.delegate = self + url_downloader.download() def get_package_store(self, package): return self.package_store_map[package.descriptor_type] @@ -547,7 +380,7 @@ class UpdatePackage(threading.Thread): ) try: self.uploader.upload_image(image_name, image_checksum, image_hdl) - self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum) + self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum, self.project) except image.ImageUploadError as e: self.log.exception("Failed to upload image: %s", image_name) @@ -556,28 +389,6 @@ class UpdatePackage(threading.Thread): finally: _ = [image_hdl.close() for image_hdl in name_hdl_map.values()] - - def extract_charms(self, package): - try: - charm_extractor = rift.package.charm.PackageCharmExtractor(self.log) - charm_extractor.extract_charms(package) - except rift.package.charm.CharmExtractionError as e: - raise MessageException(UpdateExtractionError()) from e - - def extract_scripts(self, package): - try: - script_extractor = rift.package.script.PackageScriptExtractor(self.log) - script_extractor.extract_scripts(package) - except rift.package.script.ScriptExtractionError as e: - raise MessageException(UpdateExtractionError()) from e - - def extract_configs(self, package): - try: - config_extractor = rift.package.config.PackageConfigExtractor(self.log) - config_extractor.extract_configs(package) - except rift.package.config.ConfigExtractionError as e: - raise MessageException(UpdateExtractionError()) from e - def extract_icons(self, package): try: icon_extractor = rift.package.icon.PackageIconExtractor(self.log) @@ -585,17 +396,34 @@ class UpdatePackage(threading.Thread): except rift.package.icon.IconExtractionError as e: raise MessageException(UpdateExtractionError()) from e + def validate_descriptor_fields(self, package): + # We can add more VNFD validations here. Currently we are validating only cloud-init + if package.descriptor_msg is not None: + self.validate_cloud_init_file(package) + + def validate_cloud_init_file(self, package): + """ This validation is for VNFDs with associated VDUs. """ + if 'vdu' in package.descriptor_msg.as_dict(): + for vdu in package.descriptor_msg.as_dict()['vdu']: + if 'cloud_init_file' in vdu: + cloud_init_file = vdu['cloud_init_file'] + for file in package.files: + if file.endswith('/' + cloud_init_file) is True: + return + raise MessageException( + OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file")) + def validate_package(self, package): checksum_validator = rift.package.package.PackageChecksumValidator(self.log) try: - file_checksums = checksum_validator.validate(package) + checksum_validator.validate(package) except rift.package.package.PackageFileChecksumError as e: raise MessageException(UpdateChecksumMismatch(e.filename)) from e except rift.package.package.PackageValidationError as e: raise MessageException(UpdateUnreadablePackage()) from e - return file_checksums + return checksum_validator.checksums def update_descriptors(self, package): descriptor_msg = package.descriptor_msg @@ -603,56 +431,55 @@ class UpdatePackage(threading.Thread): self.log.message(UpdateDescriptorUpdate()) try: - self.onboarder.update(descriptor_msg) + self.onboarder.update(descriptor_msg, project=self.project.name) except onboard.UpdateError as e: raise MessageException(UpdateDescriptorError(package.descriptor_file)) from e -class OnboardPackage(threading.Thread): - def __init__(self, log, loop, part_streamer, auth, - onboarder, uploader, package_store_map): - super().__init__() +class OnboardPackage(downloader.DownloaderProtocol): + + def __init__(self, log, loop, project, url, auth, + onboarder, uploader, package_store_map, transaction_id): self.log = log self.loop = loop - self.part_streamer = part_streamer + self.project = project + self.url = url self.auth = auth self.onboarder = onboarder self.uploader = uploader self.package_store_map = package_store_map + self.transaction_id = transaction_id - self.io_loop = tornado.ioloop.IOLoop.current() - - def _onboard_package(self): + def _onboard_package(self, packages): # Extract package could return multiple packages if # the package is converted - for pkg in self.extract_package(): + for pkg in packages: with pkg as temp_package: package_checksums = self.validate_package(temp_package) stored_package = self.store_package(temp_package) + self.validate_descriptor_fields(temp_package) try: - self.extract_charms(temp_package) - self.extract_scripts(temp_package) - self.extract_configs(temp_package) self.extract_icons(temp_package) - self.onboard_descriptors(temp_package) - except Exception: - self.delete_stored_package(stored_package) + except Exception as e: + if "data-exists" not in e.msg.text: + self.delete_stored_package(stored_package) raise - else: self.upload_images(temp_package, package_checksums) - def run(self): + def extract(self, packages): try: - self._onboard_package() + self._onboard_package(packages) self.log.message(OnboardSuccess()) except MessageException as e: self.log.message(e.msg) self.log.message(OnboardFailure()) + raise OnboardException(OnboardFailure()) + except Exception as e: self.log.exception(e) @@ -660,26 +487,45 @@ class OnboardPackage(threading.Thread): self.log.message(OnboardError(str(e))) self.log.message(OnboardFailure()) - finally: - self.part_streamer.release_parts() - - def extract_package(self): - """Extract multipart message from tarball""" - desc_part = self.part_streamer.get_parts_by_name("descriptor")[0] - - # Invoke the move API to prevent the part streamer from attempting - # to clean up (the file backed package will do that itself) - desc_part.move(desc_part.f_out.name) - - package_name = desc_part.get_filename() - package_path = desc_part.f_out.name + def on_download_succeeded(self, job): + self.log.message(DownloadSuccess("Package downloaded.")) extractor = extract.UploadPackageExtractor(self.log) file_backed_packages = extractor.create_packages_from_upload( - package_name, package_path + job.filename, job.filepath ) - - return file_backed_packages + try: + self.extract(file_backed_packages) + except Exception as e: + raise Exception("Error in Onboarding Package") + + def on_download_finished(self, job): + self.log.debug("*** Download completed") + if hasattr(self.project, 'upload_status_handler'): + self.project.upload_status_handler.upload_status(job, self.transaction_id) + + def on_download_progress(self, job): + self.log.debug("*** Download in progress") + if hasattr(self.project, 'upload_status_handler'): + self.project.upload_status_handler.upload_status(job, self.transaction_id) + + def on_download_failed(self, job): + self.log.error(job.detail) + self.log.message(DownloadError("Package download failed. {}".format(job.detail))) + self.log.message(OnboardFailure()) + + def download_package(self): + + self.log.debug("Before pkg download, project = {}".format(self.project.name)) + _, filename = tempfile.mkstemp() + url_downloader = downloader.UrlDownloader( + self.url, + auth=self.auth, + file_obj=filename, + decompress_on_fly=True, + log=self.log) + url_downloader.delegate = self + url_downloader.download() def get_package_store(self, package): return self.package_store_map[package.descriptor_type] @@ -722,36 +568,16 @@ class OnboardPackage(threading.Thread): package.open(image_file_map[image_name]) ) try: - self.uploader.upload_image(image_name, image_checksum, image_hdl) - self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum) + set_image_property = {} + self.uploader.upload_image(image_name, image_checksum, image_hdl, set_image_property) + self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum, self.project.name) except image.ImageUploadError as e: - raise MessageException(OnboardImageUploadError()) from e + raise MessageException(OnboardImageUploadError(str(e))) from e finally: _ = [image_hdl.close() for image_hdl in name_hdl_map.values()] - def extract_charms(self, package): - try: - charm_extractor = rift.package.charm.PackageCharmExtractor(self.log) - charm_extractor.extract_charms(package) - except rift.package.charm.CharmExtractionError as e: - raise MessageException(OnboardExtractionError()) from e - - def extract_scripts(self, package): - try: - script_extractor = rift.package.script.PackageScriptExtractor(self.log) - script_extractor.extract_scripts(package) - except rift.package.script.ScriptExtractionError as e: - raise MessageException(OnboardExtractionError()) from e - - def extract_configs(self, package): - try: - config_extractor = rift.package.config.PackageConfigExtractor(self.log) - config_extractor.extract_configs(package) - except rift.package.config.ConfigExtractionError as e: - raise MessageException(OnboardExtractionError()) from e - def extract_icons(self, package): try: icon_extractor = rift.package.icon.PackageIconExtractor(self.log) @@ -759,74 +585,155 @@ class OnboardPackage(threading.Thread): except rift.package.icon.IconExtractionError as e: raise MessageException(OnboardExtractionError()) from e + def validate_descriptor_fields(self, package): + # We can add more VNFD/NSD validations here. + if package.descriptor_msg is not None: + self.validate_cloud_init_file(package) + self.validate_vld_mgmt_network(package) + + def validate_vld_mgmt_network(self, package): + """ This is validation at onboarding of NSD for atleast one of the VL's to have mgmt network true + and have minimum one connection point""" + if package.descriptor_type == 'nsd': + for vld in package.descriptor_msg.as_dict().get('vld',[]): + if vld.get('mgmt_network', False) is True and \ + len(vld.get('vnfd_connection_point_ref',[])) > 0 : + break + else: + self.log.error(("AtLeast One of the VL's should have Management Network as True " + "and have minimum one connection point")) + + def validate_cloud_init_file(self, package): + """ This validation is for VNFDs with associated VDUs. """ + if 'vdu' in package.descriptor_msg.as_dict(): + for vdu in package.descriptor_msg.as_dict()['vdu']: + if 'cloud_init_file' in vdu: + cloud_init_file = vdu['cloud_init_file'] + for file in package.files: + if file.endswith('/' + cloud_init_file) is True: + return + raise MessageException( + OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file")) + def validate_package(self, package): - checksum_validator = rift.package.package.PackageChecksumValidator(self.log) + validators = ( + rift.package.package.PackageChecksumValidator(self.log), + rift.package.package.PackageConstructValidator(self.log), + ) - try: - file_checksums = checksum_validator.validate(package) - except rift.package.package.PackageFileChecksumError as e: - raise MessageException(OnboardChecksumMismatch(e.filename)) from e - except rift.package.package.PackageValidationError as e: - raise MessageException(OnboardUnreadablePackage()) from e + # Run the validators for checksum and package construction for imported pkgs + for validator in validators: + try: + validator.validate(package) + + except rift.package.package.PackageFileChecksumError as e: + raise MessageException(OnboardChecksumMismatch(e.filename)) from e + except rift.package.package.PackageValidationError as e: + raise MessageException(OnboardUnreadablePackage()) from e - return file_checksums + return validators[0].checksums def onboard_descriptors(self, package): - descriptor_msg = package.descriptor_msg + def process_error_messsage(exception, package): + """ + This method captures error reason. This needs to be enhanced with time. + """ + exception_msg = str(exception) + match_duplicate = re.findall('(.*?)', exception_msg, re.DOTALL) + + if len(match_duplicate) > 0: + error_message = str(match_duplicate[0]) + return error_message + + match = re.findall('(.*?)', exception_msg, re.DOTALL) + error_message = "" + if len(match) > 0: + for element in match: + element_message = "Missing element : {}".format(element) + error_message += element_message + else: + error_message = package.descriptor_file + return error_message + + def process_exception(exception, package): + return OnboardDescriptorError(process_error_messsage(exception, package)) + descriptor_msg = package.descriptor_msg self.log.message(OnboardDescriptorOnboard()) try: - self.onboarder.onboard(descriptor_msg) + self.onboarder.onboard(descriptor_msg, project=self.project.name) except onboard.OnboardError as e: - raise MessageException(OnboardDescriptorError(package.descriptor_file)) from e + raise MessageException(process_exception(e, package)) from e class UploaderApplication(tornado.web.Application): - def __init__(self, tasklet): - self.tasklet = tasklet - self.accounts = [] - self.messages = collections.defaultdict(list) - self.export_dir = os.path.join(os.environ['RIFT_ARTIFACTS'], 'launchpad/exports') + @classmethod + def from_tasklet(cls, tasklet): manifest = tasklet.tasklet_info.get_pb_manifest() - self.use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl - self.ssl_cert = manifest.bootstrap_phase.rwsecurity.cert - self.ssl_key = manifest.bootstrap_phase.rwsecurity.key + use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl + ssl_cert = manifest.bootstrap_phase.rwsecurity.cert + ssl_key = manifest.bootstrap_phase.rwsecurity.key + return cls( + tasklet, + ssl=(ssl_cert, ssl_key)) + + def __init__( + self, + tasklet, + ssl=None, + vnfd_store=None, + nsd_store=None): + + self.log = tasklet.log + self.loop = tasklet.loop + self.dts = tasklet.dts + + self.accounts = {} + self.ro_accounts = {} + + self.use_ssl = False + self.ssl_cert, self.ssl_key = None, None + if ssl: + self.use_ssl = True + self.ssl_cert, self.ssl_key = ssl - self.uploader = image.ImageUploader(self.log, self.loop, tasklet.dts) + self.messages = collections.defaultdict(list) + self.export_dir = os.path.join(os.environ['RIFT_VAR_ROOT'], 'launchpad/exports') + + self.uploader = image.ImageUploader(self.log, self.loop, self.dts) self.onboarder = onboard.DescriptorOnboarder( self.log, "127.0.0.1", 8008, self.use_ssl, self.ssl_cert, self.ssl_key ) - self.package_store_map = { - "vnfd": self.tasklet.vnfd_package_store, - "nsd": self.tasklet.nsd_package_store, - } self.exporter = export.DescriptorPackageArchiveExporter(self.log) self.loop.create_task(export.periodic_export_cleanup(self.log, self.loop, self.export_dir)) - attrs = dict(log=self.log, loop=self.loop) + self.tasklet = tasklet + self.get_vnfd_catalog = tasklet.get_vnfd_catalog + self.get_nsd_catalog = tasklet.get_nsd_catalog + catalog_map = { + "vnfd": self.get_vnfd_catalog, + "nsd": self.get_nsd_catalog + } + + self.upload_handler = UploadRpcHandler(self) + self.update_handler = UpdateRpcHandler(self) + self.export_handler = export.ExportRpcHandler(self, catalog_map) - export_attrs = attrs.copy() - export_attrs.update({ - "store_map": self.package_store_map, - "exporter": self.exporter, - "catalog_map": { - "vnfd": self.vnfd_catalog, - "nsd": self.nsd_catalog - } - }) + attrs = dict(log=self.log, loop=self.loop) super(UploaderApplication, self).__init__([ - (r"/api/update", UpdateHandler, attrs), - (r"/api/upload", UploadHandler, attrs), + (r"/api/package/vnfd/(.*)", pkg_handler.FileRestApiHandler, { + 'path': rift.package.store.VnfdPackageFilesystemStore.DEFAULT_ROOT_DIR}), + (r"/api/package/nsd/(.*)", pkg_handler.FileRestApiHandler, { + 'path': rift.package.store.NsdPackageFilesystemStore.DEFAULT_ROOT_DIR}), (r"/api/upload/([^/]+)/state", UploadStateHandler, attrs), (r"/api/update/([^/]+)/state", UpdateStateHandler, attrs), (r"/api/export/([^/]+)/state", export.ExportStateHandler, attrs), - (r"/api/export/(nsd|vnfd)$", export.ExportHandler, export_attrs), (r"/api/export/([^/]+.tar.gz)", tornado.web.StaticFileHandler, { "path": self.export_dir, }), @@ -835,47 +742,70 @@ class UploaderApplication(tornado.web.Application): }), ]) - @property - def log(self): - return self.tasklet.log - - @property - def loop(self): - return self.tasklet.loop + @asyncio.coroutine + def register(self): + yield from self.upload_handler.register() + yield from self.update_handler.register() + yield from self.export_handler.register() def get_logger(self, transaction_id): return message.Logger(self.log, self.messages[transaction_id]) - def onboard(self, part_streamer, transaction_id, auth=None): + def build_store_map(self, project=None): + ''' Use project information to build vnfd/nsd filesystem stores with appropriate + package directory root. + ''' + vnfd_store = rift.package.store.VnfdPackageFilesystemStore(self.log) if not \ + project else rift.package.store.VnfdPackageFilesystemStore(self.log, project=project) + nsd_store = rift.package.store.NsdPackageFilesystemStore(self.log) if not \ + project else rift.package.store.NsdPackageFilesystemStore(self.log, project=project) + + return dict(vnfd = vnfd_store, nsd = nsd_store) + + def onboard(self, url, transaction_id, auth=None, project=None): log = message.Logger(self.log, self.messages[transaction_id]) - OnboardPackage( + try: + self.project = self.tasklet._get_project(project) + except Exception as e: + self.log.error("Exception raised ...%s" % (str(e))) + self.log.exception(e) + + self.package_store_map = self.build_store_map(project) + onboard_package = OnboardPackage( log, self.loop, - part_streamer, + self.project, + url, auth, self.onboarder, self.uploader, self.package_store_map, - ).start() + transaction_id + ) + + self.loop.run_in_executor(None, onboard_package.download_package) - def update(self, part_streamer, transaction_id, auth=None): + def update(self, url, transaction_id, auth=None, project=None): log = message.Logger(self.log, self.messages[transaction_id]) - UpdatePackage( + try: + self.project = self.tasklet._get_project(project) + except Exception as e: + self.log.error("Exception raised ...%s" % (str(e))) + self.log.exception(e) + + self.package_store_map = self.build_store_map(project) + update_package = UpdatePackage( log, self.loop, - part_streamer, + self.project, + url, auth, self.onboarder, self.uploader, self.package_store_map, - ).start() - - @property - def vnfd_catalog(self): - return self.tasklet.vnfd_catalog + transaction_id + ) - @property - def nsd_catalog(self): - return self.tasklet.nsd_catalog + self.loop.run_in_executor(None, update_package.download_package)