-#
+#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
+import abc
+import asyncio
import collections
import os
+import tempfile
import threading
import uuid
import zlib
+import re
import tornado
import tornado.escape
import gi
gi.require_version('RwLaunchpadYang', '1.0')
-gi.require_version('NsdYang', '1.0')
-gi.require_version('VnfdYang', '1.0')
+gi.require_version('ProjectNsdYang', '1.0')
+gi.require_version('ProjectVnfdYang', '1.0')
from gi.repository import (
- NsdYang,
- VnfdYang,
+ ProjectNsdYang as NsdYang,
+ ProjectVnfdYang as VnfdYang,
)
import rift.mano.cloud
-import rift.package.charm
import rift.package.checksums
-import rift.package.config
import rift.package.convert
+import rift.package.handler as pkg_handler
import rift.package.icon
import rift.package.package
import rift.package.script
import rift.package.store
+from gi.repository import (
+ RwDts as rwdts,
+ RwPkgMgmtYang
+ )
+import rift.downloader as downloader
+import rift.mano.dts as mano_dts
+import rift.tasklets
+
from . import (
export,
extract,
OnboardStart,
OnboardSuccess,
+ DownloadError,
+ DownloadSuccess,
# Update Error Messages
UpdateChecksumMismatch,
from .tosca import ExportTosca
+from .onboard import OnboardError as OnboardException
+
MB = 1024 * 1024
GB = 1024 * MB
MAX_STREAMED_SIZE = 5 * GB
+# Shortcuts
+RPC_PACKAGE_CREATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCreate
+RPC_PACKAGE_UPDATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageUpdate
class HttpMessageError(Exception):
def __init__(self, code, msg):
self.msg = msg
-class GzipTemporaryFileStreamedPart(multipart_streamer.TemporaryFileStreamedPart):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- # Create a decompressor for gzip data to decompress on the fly during upload
- # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
- self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
-
- def feed(self, data):
- decompressed_data = self._decompressor.decompress(data)
- if decompressed_data:
- super().feed(decompressed_data)
-
- def finalize(self):
- # All data has arrived, flush the decompressor to get any last decompressed data
- decompressed_data = self._decompressor.flush()
- super().feed(decompressed_data)
- super().finalize()
-
-
-class GzipMultiPartStreamer(multipart_streamer.MultiPartStreamer):
- """ This Multipart Streamer decompresses gzip files on the fly during multipart upload """
-
- @staticmethod
- def _get_descriptor_name_from_headers(headers):
- descriptor_filename = None
-
- for entry in headers:
- if entry["value"] != "form-data":
- continue
-
- form_data_params = entry["params"]
- if "name" in form_data_params:
- if form_data_params["name"] != "descriptor":
- continue
-
- if "filename" not in form_data_params:
- continue
-
- descriptor_filename = form_data_params["filename"]
-
- return descriptor_filename
-
- def create_part(self, headers):
- """ Create the StreamedPart subclass depending on the descriptor filename
-
- For gzipped descriptor packages, create a GzipTemporaryFileStreamedPart which
- can decompress the gzip while it's being streamed into the launchpad directely
- into a file.
-
- Returns:
- The descriptor filename
+class UploadRpcHandler(mano_dts.AbstractRpcHandler):
+ def __init__(self, application):
"""
- filename = GzipMultiPartStreamer._get_descriptor_name_from_headers(headers)
- if filename is None or not filename.endswith(".gz"):
- return multipart_streamer.TemporaryFileStreamedPart(self, headers)
-
- return GzipTemporaryFileStreamedPart(self, headers)
-
-
-class RequestHandler(tornado.web.RequestHandler):
- def options(self, *args, **kargs):
- pass
-
- def set_default_headers(self):
- self.set_header('Access-Control-Allow-Origin', '*')
- self.set_header('Access-Control-Allow-Headers',
- 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization')
- self.set_header('Access-Control-Allow-Methods', 'POST, GET, PUT, DELETE')
-
-
-@tornado.web.stream_request_body
-class StreamingUploadHandler(RequestHandler):
- def initialize(self, log, loop):
- """Initialize the handler
-
- Arguments:
- log - the logger that this handler should use
- loop - the tasklets ioloop
-
+ Args:
+ application: UploaderApplication
"""
- self.transaction_id = str(uuid.uuid4())
-
- self.loop = loop
- self.log = self.application.get_logger(self.transaction_id)
-
- self.part_streamer = None
-
- self.log.debug('created handler (transaction_id = {})'.format(self.transaction_id))
-
- def msg_missing_content_type(self):
- raise NotImplementedError()
+ super().__init__(application.log, application.dts, application.loop)
+ self.application = application
- def msg_unsupported_media_type(self):
- raise NotImplementedError()
-
- def msg_missing_content_boundary(self):
- raise NotImplementedError()
-
- def msg_start(self):
- raise NotImplementedError()
-
- def msg_success(self):
- raise NotImplementedError()
-
- def msg_failure(self):
- raise NotImplementedError()
-
- def msg_package_upload(self):
- raise NotImplementedError()
-
- @tornado.gen.coroutine
- def prepare(self):
- """Prepare the handler for a request
-
- The prepare function is the first part of a request transaction. It
- creates a temporary file that uploaded data can be written to.
+ @property
+ def xpath(self):
+ return "/rw-pkg-mgmt:package-create"
- """
- if self.request.method != "POST":
- return
+ @asyncio.coroutine
+ def callback(self, ks_path, msg):
+ transaction_id = str(uuid.uuid4())
+ log = self.application.get_logger(transaction_id)
+ log.message(OnboardStart())
- self.request.connection.set_max_body_size(MAX_STREAMED_SIZE)
+ self.log.debug("Package create RPC: {}".format(msg))
- self.log.message(self.msg_start())
+ auth = None
+ if msg.username is not None:
+ auth = (msg.username, msg.password)
try:
- # Retrieve the content type and parameters from the request
- content_type = self.request.headers.get('content-type', None)
- if content_type is None:
- raise HttpMessageError(400, self.msg_missing_content_type())
-
- content_type, params = tornado.httputil._parse_header(content_type)
-
- if "multipart/form-data" != content_type.lower():
- raise HttpMessageError(415, self.msg_unsupported_media_type())
-
- if "boundary" not in params:
- raise HttpMessageError(400, self.msg_missing_content_boundary())
-
- # You can get the total request size from the headers.
- try:
- total = int(self.request.headers.get("Content-Length", "0"))
- except KeyError:
- self.log.warning("Content length header not found")
- # For any well formed browser request, Content-Length should have a value.
- total = 0
-
- # And here you create a streamer that will accept incoming data
- self.part_streamer = GzipMultiPartStreamer(total)
-
- except HttpMessageError as e:
- self.log.message(e.msg)
- self.log.message(self.msg_failure())
+ project = msg.project_name
+ except AttributeError as e:
+ self._log.warning("Did not get project name in RPC: {}".
+ format(msg.as_dict()))
+ project = rift.mano.utils.project.DEFAULT_PROJECT
+
+ self.application.onboard(
+ msg.external_url,
+ transaction_id,
+ auth=auth,
+ project=project,
+ )
- raise tornado.web.HTTPError(e.code, e.msg.name)
+ rpc_op = RPC_PACKAGE_CREATE_ENDPOINT.from_dict({
+ "transaction_id": transaction_id,
+ "project_name": project,
+ })
- except Exception as e:
- self.log.exception(e)
- self.log.message(self.msg_failure())
+ return rpc_op
- @tornado.gen.coroutine
- def data_received(self, chunk):
- """Write data to the current file
-
- Arguments:
- data - a chunk of data to write to file
+class UpdateRpcHandler(mano_dts.AbstractRpcHandler):
+ def __init__(self, application):
"""
-
- """When a chunk of data is received, we forward it to the multipart streamer."""
- self.part_streamer.data_received(chunk)
-
- def post(self):
- """Handle a post request
-
- The function is called after any data associated with the body of the
- request has been received.
-
+ Args:
+ application: UploaderApplication
"""
- # You MUST call this to close the incoming stream.
- self.part_streamer.data_complete()
-
- desc_parts = self.part_streamer.get_parts_by_name("descriptor")
- if len(desc_parts) != 1:
- raise HttpMessageError(400, OnboardError("Descriptor option not found"))
-
- self.log.message(self.msg_package_upload())
-
-
-class UploadHandler(StreamingUploadHandler):
- """
- This handler is used to upload archives that contain VNFDs, NSDs, and PNFDs
- to the launchpad. This is a streaming handler that writes uploaded archives
- to disk without loading them all into memory.
- """
-
- def msg_missing_content_type(self):
- return OnboardMissingContentType()
+ super().__init__(application.log, application.dts, application.loop)
+ self.application = application
- def msg_unsupported_media_type(self):
- return OnboardUnsupportedMediaType()
-
- def msg_missing_content_boundary(self):
- return OnboardMissingContentBoundary()
-
- def msg_start(self):
- return OnboardStart()
-
- def msg_success(self):
- return OnboardSuccess()
-
- def msg_failure(self):
- return OnboardFailure()
-
- def msg_package_upload(self):
- return OnboardPackageUpload()
-
- def post(self):
- """Handle a post request
-
- The function is called after any data associated with the body of the
- request has been received.
-
- """
- try:
- super().post()
- self.application.onboard(
- self.part_streamer,
- self.transaction_id,
- auth=self.request.headers.get('authorization', None),
- )
-
- self.set_status(200)
- self.write(tornado.escape.json_encode({
- "transaction_id": self.transaction_id,
- }))
-
- except Exception:
- self.log.exception("Upload POST failed")
- self.part_streamer.release_parts()
- raise
-
-
-class UpdateHandler(StreamingUploadHandler):
- def msg_missing_content_type(self):
- return UpdateMissingContentType()
-
- def msg_unsupported_media_type(self):
- return UpdateUnsupportedMediaType()
-
- def msg_missing_content_boundary(self):
- return UpdateMissingContentBoundary()
-
- def msg_start(self):
- return UpdateStart()
-
- def msg_success(self):
- return UpdateSuccess()
-
- def msg_failure(self):
- return UpdateFailure()
+ @property
+ def xpath(self):
+ return "/rw-pkg-mgmt:package-update"
- def msg_package_upload(self):
- return UpdatePackageUpload()
+ @asyncio.coroutine
+ def callback(self, ks_path, msg):
- def post(self):
- """Handle a post request
+ transaction_id = str(uuid.uuid4())
+ log = self.application.get_logger(transaction_id)
+ log.message(UpdateStart())
- The function is called after any data associated with the body of the
- request has been received.
+ auth = None
+ if msg.username is not None:
+ auth = (msg.username, msg.password)
- """
- try:
- super().post()
+ self.application.update(
+ msg.external_url,
+ transaction_id,
+ auth=auth,
+ project=msg.project_name,
+ )
- self.application.update(
- self.part_streamer,
- self.transaction_id,
- auth=self.request.headers.get('authorization', None),
- )
+ rpc_op = RPC_PACKAGE_UPDATE_ENDPOINT.from_dict({
+ "transaction_id": transaction_id,
+ "project_name": msg.project_name,
+ })
- self.set_status(200)
- self.write(tornado.escape.json_encode({
- "transaction_id": self.transaction_id,
- }))
- except Exception:
- self.log.exception("Upload POST failed")
- self.part_streamer.release_parts()
- raise
+ return rpc_op
class UploadStateHandler(state.StateHandler):
FAILURE = UpdateFailure
-class UpdatePackage(threading.Thread):
- def __init__(self, log, loop, part_streamer, auth,
- onboarder, uploader, package_store_map):
+class UpdatePackage(downloader.DownloaderProtocol):
+
+ def __init__(self, log, loop, project, url, auth,
+ onboarder, uploader, package_store_map, transaction_id):
super().__init__()
self.log = log
self.loop = loop
- self.part_streamer = part_streamer
+ self.project = project
+ self.url = url
self.auth = auth
self.onboarder = onboarder
self.uploader = uploader
self.package_store_map = package_store_map
+ self.transaction_id = transaction_id
+
- self.io_loop = tornado.ioloop.IOLoop.current()
+ def _update_package(self, packages):
- def _update_package(self):
# Extract package could return multiple packages if
# the package is converted
- for pkg in self.extract_package():
+ for pkg in packages:
with pkg as temp_package:
package_checksums = self.validate_package(temp_package)
stored_package = self.update_package(temp_package)
+ self.validate_descriptor_fields(temp_package)
try:
- self.extract_charms(temp_package)
- self.extract_scripts(temp_package)
- self.extract_configs(temp_package)
self.extract_icons(temp_package)
-
self.update_descriptors(temp_package)
except Exception:
else:
self.upload_images(temp_package, package_checksums)
- def run(self):
+ def extract(self, packages):
try:
- self._update_package()
+ self._update_package(packages)
self.log.message(UpdateSuccess())
except MessageException as e:
self.log.message(e.msg)
self.log.message(UpdateFailure())
+ raise UpdateFailure(str(e))
except Exception as e:
self.log.exception(e)
self.log.message(UpdateError(str(e)))
self.log.message(UpdateFailure())
- def extract_package(self):
- """Extract multipart message from tarball"""
- desc_part = self.part_streamer.get_parts_by_name("descriptor")[0]
-
- # Invoke the move API to prevent the part streamer from attempting
- # to clean up (the file backed package will do that itself)
- desc_part.move(desc_part.f_out.name)
-
- package_name = desc_part.get_filename()
- package_path = desc_part.f_out.name
+ def on_download_succeeded(self, job):
+ self.log.message(DownloadSuccess("Package downloaded."))
extractor = extract.UploadPackageExtractor(self.log)
file_backed_packages = extractor.create_packages_from_upload(
- package_name, package_path
+ job.filename, job.filepath
)
-
- return file_backed_packages
+ try:
+ self.extract(file_backed_packages)
+ except Exception as e:
+ raise Exception("Error in Package Update")
+
+ def on_download_finished(self, job):
+ self.log.debug("*** Download completed")
+ if hasattr(self.project, 'update_status_handler'):
+ self.project.update_status_handler.update_status(job, self.transaction_id)
+
+ def on_download_progress(self, job):
+ self.log.debug("*** Download in progress")
+ if hasattr(self.project, 'update_status_handler'):
+ self.project.update_status_handler.update_status(job, self.transaction_id)
+
+ def on_download_failed(self, job):
+ self.log.error(job.detail)
+ self.log.message(DownloadError("Package download failed. {}".format(job.detail)))
+ self.log.message(UpdateFailure())
+
+ def download_package(self):
+
+ _, filename = tempfile.mkstemp()
+ url_downloader = downloader.UrlDownloader(
+ self.url,
+ auth=self.auth,
+ file_obj=filename,
+ decompress_on_fly=True,
+ log=self.log)
+ url_downloader.delegate = self
+ url_downloader.download()
def get_package_store(self, package):
return self.package_store_map[package.descriptor_type]
)
try:
self.uploader.upload_image(image_name, image_checksum, image_hdl)
- self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum)
+ self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum, self.project)
except image.ImageUploadError as e:
self.log.exception("Failed to upload image: %s", image_name)
finally:
_ = [image_hdl.close() for image_hdl in name_hdl_map.values()]
-
- def extract_charms(self, package):
- try:
- charm_extractor = rift.package.charm.PackageCharmExtractor(self.log)
- charm_extractor.extract_charms(package)
- except rift.package.charm.CharmExtractionError as e:
- raise MessageException(UpdateExtractionError()) from e
-
- def extract_scripts(self, package):
- try:
- script_extractor = rift.package.script.PackageScriptExtractor(self.log)
- script_extractor.extract_scripts(package)
- except rift.package.script.ScriptExtractionError as e:
- raise MessageException(UpdateExtractionError()) from e
-
- def extract_configs(self, package):
- try:
- config_extractor = rift.package.config.PackageConfigExtractor(self.log)
- config_extractor.extract_configs(package)
- except rift.package.config.ConfigExtractionError as e:
- raise MessageException(UpdateExtractionError()) from e
-
def extract_icons(self, package):
try:
icon_extractor = rift.package.icon.PackageIconExtractor(self.log)
except rift.package.icon.IconExtractionError as e:
raise MessageException(UpdateExtractionError()) from e
+ def validate_descriptor_fields(self, package):
+ # We can add more VNFD validations here. Currently we are validating only cloud-init
+ if package.descriptor_msg is not None:
+ self.validate_cloud_init_file(package)
+
+ def validate_cloud_init_file(self, package):
+ """ This validation is for VNFDs with associated VDUs. """
+ if 'vdu' in package.descriptor_msg.as_dict():
+ for vdu in package.descriptor_msg.as_dict()['vdu']:
+ if 'cloud_init_file' in vdu:
+ cloud_init_file = vdu['cloud_init_file']
+ for file in package.files:
+ if file.endswith('/' + cloud_init_file) is True:
+ return
+ raise MessageException(
+ OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file"))
+
def validate_package(self, package):
checksum_validator = rift.package.package.PackageChecksumValidator(self.log)
try:
- file_checksums = checksum_validator.validate(package)
+ checksum_validator.validate(package)
except rift.package.package.PackageFileChecksumError as e:
raise MessageException(UpdateChecksumMismatch(e.filename)) from e
except rift.package.package.PackageValidationError as e:
raise MessageException(UpdateUnreadablePackage()) from e
- return file_checksums
+ return checksum_validator.checksums
def update_descriptors(self, package):
descriptor_msg = package.descriptor_msg
self.log.message(UpdateDescriptorUpdate())
try:
- self.onboarder.update(descriptor_msg)
+ self.onboarder.update(descriptor_msg, project=self.project.name)
except onboard.UpdateError as e:
raise MessageException(UpdateDescriptorError(package.descriptor_file)) from e
-class OnboardPackage(threading.Thread):
- def __init__(self, log, loop, part_streamer, auth,
- onboarder, uploader, package_store_map):
- super().__init__()
+class OnboardPackage(downloader.DownloaderProtocol):
+
+ def __init__(self, log, loop, project, url, auth,
+ onboarder, uploader, package_store_map, transaction_id):
self.log = log
self.loop = loop
- self.part_streamer = part_streamer
+ self.project = project
+ self.url = url
self.auth = auth
self.onboarder = onboarder
self.uploader = uploader
self.package_store_map = package_store_map
+ self.transaction_id = transaction_id
- self.io_loop = tornado.ioloop.IOLoop.current()
-
- def _onboard_package(self):
+ def _onboard_package(self, packages):
# Extract package could return multiple packages if
# the package is converted
- for pkg in self.extract_package():
+ for pkg in packages:
with pkg as temp_package:
package_checksums = self.validate_package(temp_package)
stored_package = self.store_package(temp_package)
+ self.validate_descriptor_fields(temp_package)
try:
- self.extract_charms(temp_package)
- self.extract_scripts(temp_package)
- self.extract_configs(temp_package)
self.extract_icons(temp_package)
-
self.onboard_descriptors(temp_package)
- except Exception:
- self.delete_stored_package(stored_package)
+ except Exception as e:
+ if "data-exists" not in e.msg.text:
+ self.delete_stored_package(stored_package)
raise
-
else:
self.upload_images(temp_package, package_checksums)
- def run(self):
+ def extract(self, packages):
try:
- self._onboard_package()
+ self._onboard_package(packages)
self.log.message(OnboardSuccess())
except MessageException as e:
self.log.message(e.msg)
self.log.message(OnboardFailure())
+ raise OnboardException(OnboardFailure())
+
except Exception as e:
self.log.exception(e)
self.log.message(OnboardError(str(e)))
self.log.message(OnboardFailure())
- finally:
- self.part_streamer.release_parts()
-
- def extract_package(self):
- """Extract multipart message from tarball"""
- desc_part = self.part_streamer.get_parts_by_name("descriptor")[0]
-
- # Invoke the move API to prevent the part streamer from attempting
- # to clean up (the file backed package will do that itself)
- desc_part.move(desc_part.f_out.name)
-
- package_name = desc_part.get_filename()
- package_path = desc_part.f_out.name
+ def on_download_succeeded(self, job):
+ self.log.message(DownloadSuccess("Package downloaded."))
extractor = extract.UploadPackageExtractor(self.log)
file_backed_packages = extractor.create_packages_from_upload(
- package_name, package_path
+ job.filename, job.filepath
)
-
- return file_backed_packages
+ try:
+ self.extract(file_backed_packages)
+ except Exception as e:
+ raise Exception("Error in Onboarding Package")
+
+ def on_download_finished(self, job):
+ self.log.debug("*** Download completed")
+ if hasattr(self.project, 'upload_status_handler'):
+ self.project.upload_status_handler.upload_status(job, self.transaction_id)
+
+ def on_download_progress(self, job):
+ self.log.debug("*** Download in progress")
+ if hasattr(self.project, 'upload_status_handler'):
+ self.project.upload_status_handler.upload_status(job, self.transaction_id)
+
+ def on_download_failed(self, job):
+ self.log.error(job.detail)
+ self.log.message(DownloadError("Package download failed. {}".format(job.detail)))
+ self.log.message(OnboardFailure())
+
+ def download_package(self):
+
+ self.log.debug("Before pkg download, project = {}".format(self.project.name))
+ _, filename = tempfile.mkstemp()
+ url_downloader = downloader.UrlDownloader(
+ self.url,
+ auth=self.auth,
+ file_obj=filename,
+ decompress_on_fly=True,
+ log=self.log)
+ url_downloader.delegate = self
+ url_downloader.download()
def get_package_store(self, package):
return self.package_store_map[package.descriptor_type]
package.open(image_file_map[image_name])
)
try:
- self.uploader.upload_image(image_name, image_checksum, image_hdl)
- self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum)
+ set_image_property = {}
+ self.uploader.upload_image(image_name, image_checksum, image_hdl, set_image_property)
+ self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum, self.project.name)
except image.ImageUploadError as e:
- raise MessageException(OnboardImageUploadError()) from e
+ raise MessageException(OnboardImageUploadError(str(e))) from e
finally:
_ = [image_hdl.close() for image_hdl in name_hdl_map.values()]
- def extract_charms(self, package):
- try:
- charm_extractor = rift.package.charm.PackageCharmExtractor(self.log)
- charm_extractor.extract_charms(package)
- except rift.package.charm.CharmExtractionError as e:
- raise MessageException(OnboardExtractionError()) from e
-
- def extract_scripts(self, package):
- try:
- script_extractor = rift.package.script.PackageScriptExtractor(self.log)
- script_extractor.extract_scripts(package)
- except rift.package.script.ScriptExtractionError as e:
- raise MessageException(OnboardExtractionError()) from e
-
- def extract_configs(self, package):
- try:
- config_extractor = rift.package.config.PackageConfigExtractor(self.log)
- config_extractor.extract_configs(package)
- except rift.package.config.ConfigExtractionError as e:
- raise MessageException(OnboardExtractionError()) from e
-
def extract_icons(self, package):
try:
icon_extractor = rift.package.icon.PackageIconExtractor(self.log)
except rift.package.icon.IconExtractionError as e:
raise MessageException(OnboardExtractionError()) from e
+ def validate_descriptor_fields(self, package):
+ # We can add more VNFD/NSD validations here.
+ if package.descriptor_msg is not None:
+ self.validate_cloud_init_file(package)
+ self.validate_vld_mgmt_network(package)
+
+ def validate_vld_mgmt_network(self, package):
+        """ This is a validation at onboarding of an NSD: at least one of the VLs must have mgmt network true
+        and have at least one connection point"""
+ if package.descriptor_type == 'nsd':
+ for vld in package.descriptor_msg.as_dict().get('vld',[]):
+ if vld.get('mgmt_network', False) is True and \
+ len(vld.get('vnfd_connection_point_ref',[])) > 0 :
+ break
+ else:
+ self.log.error(("AtLeast One of the VL's should have Management Network as True "
+ "and have minimum one connection point"))
+
+ def validate_cloud_init_file(self, package):
+ """ This validation is for VNFDs with associated VDUs. """
+ if 'vdu' in package.descriptor_msg.as_dict():
+ for vdu in package.descriptor_msg.as_dict()['vdu']:
+ if 'cloud_init_file' in vdu:
+ cloud_init_file = vdu['cloud_init_file']
+ for file in package.files:
+ if file.endswith('/' + cloud_init_file) is True:
+ return
+ raise MessageException(
+ OnboardError("Cloud-Init file reference in VNFD does not match with cloud-init file"))
+
def validate_package(self, package):
- checksum_validator = rift.package.package.PackageChecksumValidator(self.log)
+ validators = (
+ rift.package.package.PackageChecksumValidator(self.log),
+ rift.package.package.PackageConstructValidator(self.log),
+ )
- try:
- file_checksums = checksum_validator.validate(package)
- except rift.package.package.PackageFileChecksumError as e:
- raise MessageException(OnboardChecksumMismatch(e.filename)) from e
- except rift.package.package.PackageValidationError as e:
- raise MessageException(OnboardUnreadablePackage()) from e
+ # Run the validators for checksum and package construction for imported pkgs
+ for validator in validators:
+ try:
+ validator.validate(package)
+
+ except rift.package.package.PackageFileChecksumError as e:
+ raise MessageException(OnboardChecksumMismatch(e.filename)) from e
+ except rift.package.package.PackageValidationError as e:
+ raise MessageException(OnboardUnreadablePackage()) from e
- return file_checksums
+ return validators[0].checksums
def onboard_descriptors(self, package):
- descriptor_msg = package.descriptor_msg
+ def process_error_messsage(exception, package):
+ """
+        This method captures the error reason. It needs to be enhanced over time.
+        """
+ exception_msg = str(exception)
+ match_duplicate = re.findall('<error-tag>(.*?)</error-tag>', exception_msg, re.DOTALL)
+
+ if len(match_duplicate) > 0:
+ error_message = str(match_duplicate[0])
+ return error_message
+
+ match = re.findall('<tailf:missing-element>(.*?)</tailf:missing-element>', exception_msg, re.DOTALL)
+ error_message = ""
+ if len(match) > 0:
+ for element in match:
+ element_message = "Missing element : {}".format(element)
+ error_message += element_message
+ else:
+ error_message = package.descriptor_file
+ return error_message
+
+ def process_exception(exception, package):
+ return OnboardDescriptorError(process_error_messsage(exception, package))
+ descriptor_msg = package.descriptor_msg
self.log.message(OnboardDescriptorOnboard())
try:
- self.onboarder.onboard(descriptor_msg)
+ self.onboarder.onboard(descriptor_msg, project=self.project.name)
except onboard.OnboardError as e:
- raise MessageException(OnboardDescriptorError(package.descriptor_file)) from e
+ raise MessageException(process_exception(e, package)) from e
class UploaderApplication(tornado.web.Application):
- def __init__(self, tasklet):
- self.tasklet = tasklet
- self.accounts = []
- self.messages = collections.defaultdict(list)
- self.export_dir = os.path.join(os.environ['RIFT_ARTIFACTS'], 'launchpad/exports')
+ @classmethod
+ def from_tasklet(cls, tasklet):
manifest = tasklet.tasklet_info.get_pb_manifest()
- self.use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
- self.ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
- self.ssl_key = manifest.bootstrap_phase.rwsecurity.key
+ use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
+ ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
+ ssl_key = manifest.bootstrap_phase.rwsecurity.key
+ return cls(
+ tasklet,
+ ssl=(ssl_cert, ssl_key))
+
+ def __init__(
+ self,
+ tasklet,
+ ssl=None,
+ vnfd_store=None,
+ nsd_store=None):
+
+ self.log = tasklet.log
+ self.loop = tasklet.loop
+ self.dts = tasklet.dts
+
+ self.accounts = {}
+ self.ro_accounts = {}
+
+ self.use_ssl = False
+ self.ssl_cert, self.ssl_key = None, None
+ if ssl:
+ self.use_ssl = True
+ self.ssl_cert, self.ssl_key = ssl
- self.uploader = image.ImageUploader(self.log, self.loop, tasklet.dts)
+ self.messages = collections.defaultdict(list)
+ self.export_dir = os.path.join(os.environ['RIFT_VAR_ROOT'], 'launchpad/exports')
+
+ self.uploader = image.ImageUploader(self.log, self.loop, self.dts)
self.onboarder = onboard.DescriptorOnboarder(
self.log, "127.0.0.1", 8008, self.use_ssl, self.ssl_cert, self.ssl_key
)
- self.package_store_map = {
- "vnfd": self.tasklet.vnfd_package_store,
- "nsd": self.tasklet.nsd_package_store,
- }
self.exporter = export.DescriptorPackageArchiveExporter(self.log)
self.loop.create_task(export.periodic_export_cleanup(self.log, self.loop, self.export_dir))
- attrs = dict(log=self.log, loop=self.loop)
+ self.tasklet = tasklet
+ self.get_vnfd_catalog = tasklet.get_vnfd_catalog
+ self.get_nsd_catalog = tasklet.get_nsd_catalog
+ catalog_map = {
+ "vnfd": self.get_vnfd_catalog,
+ "nsd": self.get_nsd_catalog
+ }
+
+ self.upload_handler = UploadRpcHandler(self)
+ self.update_handler = UpdateRpcHandler(self)
+ self.export_handler = export.ExportRpcHandler(self, catalog_map)
- export_attrs = attrs.copy()
- export_attrs.update({
- "store_map": self.package_store_map,
- "exporter": self.exporter,
- "catalog_map": {
- "vnfd": self.vnfd_catalog,
- "nsd": self.nsd_catalog
- }
- })
+ attrs = dict(log=self.log, loop=self.loop)
super(UploaderApplication, self).__init__([
- (r"/api/update", UpdateHandler, attrs),
- (r"/api/upload", UploadHandler, attrs),
+ (r"/api/package/vnfd/(.*)", pkg_handler.FileRestApiHandler, {
+ 'path': rift.package.store.VnfdPackageFilesystemStore.DEFAULT_ROOT_DIR}),
+ (r"/api/package/nsd/(.*)", pkg_handler.FileRestApiHandler, {
+ 'path': rift.package.store.NsdPackageFilesystemStore.DEFAULT_ROOT_DIR}),
(r"/api/upload/([^/]+)/state", UploadStateHandler, attrs),
(r"/api/update/([^/]+)/state", UpdateStateHandler, attrs),
(r"/api/export/([^/]+)/state", export.ExportStateHandler, attrs),
- (r"/api/export/(nsd|vnfd)$", export.ExportHandler, export_attrs),
(r"/api/export/([^/]+.tar.gz)", tornado.web.StaticFileHandler, {
"path": self.export_dir,
}),
}),
])
- @property
- def log(self):
- return self.tasklet.log
-
- @property
- def loop(self):
- return self.tasklet.loop
+ @asyncio.coroutine
+ def register(self):
+ yield from self.upload_handler.register()
+ yield from self.update_handler.register()
+ yield from self.export_handler.register()
def get_logger(self, transaction_id):
return message.Logger(self.log, self.messages[transaction_id])
- def onboard(self, part_streamer, transaction_id, auth=None):
+ def build_store_map(self, project=None):
+ ''' Use project information to build vnfd/nsd filesystem stores with appropriate
+ package directory root.
+ '''
+ vnfd_store = rift.package.store.VnfdPackageFilesystemStore(self.log) if not \
+ project else rift.package.store.VnfdPackageFilesystemStore(self.log, project=project)
+ nsd_store = rift.package.store.NsdPackageFilesystemStore(self.log) if not \
+ project else rift.package.store.NsdPackageFilesystemStore(self.log, project=project)
+
+ return dict(vnfd = vnfd_store, nsd = nsd_store)
+
+ def onboard(self, url, transaction_id, auth=None, project=None):
log = message.Logger(self.log, self.messages[transaction_id])
- OnboardPackage(
+ try:
+ self.project = self.tasklet._get_project(project)
+ except Exception as e:
+ self.log.error("Exception raised ...%s" % (str(e)))
+ self.log.exception(e)
+
+ self.package_store_map = self.build_store_map(project)
+ onboard_package = OnboardPackage(
log,
self.loop,
- part_streamer,
+ self.project,
+ url,
auth,
self.onboarder,
self.uploader,
self.package_store_map,
- ).start()
+ transaction_id
+ )
+
+ self.loop.run_in_executor(None, onboard_package.download_package)
- def update(self, part_streamer, transaction_id, auth=None):
+ def update(self, url, transaction_id, auth=None, project=None):
log = message.Logger(self.log, self.messages[transaction_id])
- UpdatePackage(
+ try:
+ self.project = self.tasklet._get_project(project)
+ except Exception as e:
+ self.log.error("Exception raised ...%s" % (str(e)))
+ self.log.exception(e)
+
+ self.package_store_map = self.build_store_map(project)
+ update_package = UpdatePackage(
log,
self.loop,
- part_streamer,
+ self.project,
+ url,
auth,
self.onboarder,
self.uploader,
self.package_store_map,
- ).start()
-
- @property
- def vnfd_catalog(self):
- return self.tasklet.vnfd_catalog
+ transaction_id
+ )
- @property
- def nsd_catalog(self):
- return self.tasklet.nsd_catalog
+ self.loop.run_in_executor(None, update_package.download_package)