X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwlaunchpadtasklet%2Frift%2Ftasklets%2Frwlaunchpad%2Fuploader.py;fp=rwlaunchpad%2Fplugins%2Frwlaunchpadtasklet%2Frift%2Ftasklets%2Frwlaunchpad%2Fuploader.py;h=7aafcb94298ab0ad51772916bc41b530423389b1;hb=4870d0ee29789b859931e4e2c73e13dcb29537d5;hp=e89c50a63639012af17acb9ba78da15c7a65fc7c;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py index e89c50a6..7aafcb94 100644 --- a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py +++ b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py @@ -22,6 +22,7 @@ import tempfile import threading import uuid import zlib +import re import tornado import tornado.escape @@ -38,18 +39,16 @@ requests.packages.urllib3.disable_warnings(InsecureRequestWarning) import gi gi.require_version('RwLaunchpadYang', '1.0') -gi.require_version('NsdYang', '1.0') -gi.require_version('VnfdYang', '1.0') +gi.require_version('ProjectNsdYang', '1.0') +gi.require_version('ProjectVnfdYang', '1.0') from gi.repository import ( - NsdYang, - VnfdYang, + ProjectNsdYang as NsdYang, + ProjectVnfdYang as VnfdYang, ) import rift.mano.cloud -import rift.package.charm import rift.package.checksums -import rift.package.config import rift.package.convert import rift.package.handler as pkg_handler import rift.package.icon @@ -59,7 +58,8 @@ import rift.package.store from gi.repository import ( RwDts as rwdts, - RwPkgMgmtYang) + RwPkgMgmtYang + ) import rift.downloader as downloader import rift.mano.dts as mano_dts import rift.tasklets @@ -128,6 +128,8 @@ from .message import ( from .tosca import ExportTosca +from .onboard import OnboardError as OnboardException + MB = 1024 * 1024 GB = 1024 * MB @@ -137,8 +139,6 @@ MAX_STREAMED_SIZE = 5 * GB RPC_PACKAGE_CREATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageCreate RPC_PACKAGE_UPDATE_ENDPOINT = RwPkgMgmtYang.YangOutput_RwPkgMgmt_PackageUpdate - - class HttpMessageError(Exception): def __init__(self, code, msg): self.code = code @@ -146,12 +146,12 @@ class HttpMessageError(Exception): class UploadRpcHandler(mano_dts.AbstractRpcHandler): - def __init__(self, log, dts, loop, application): + def __init__(self, application): """ Args: application: UploaderApplication """ - super().__init__(log, dts, loop) + super().__init__(application.log, application.dts, application.loop) self.application = application @property @@ -164,30 +164,41 @@ class UploadRpcHandler(mano_dts.AbstractRpcHandler): log = self.application.get_logger(transaction_id) log.message(OnboardStart()) + self.log.debug("Package create RPC: {}".format(msg)) auth = None if msg.username is not None: auth = (msg.username, msg.password) + try: + project = msg.project_name + except AttributeError as e: + self._log.warning("Did not get project name in RPC: {}". + format(msg.as_dict())) + project = rift.mano.utils.project.DEFAULT_PROJECT + self.application.onboard( msg.external_url, transaction_id, - auth=auth + auth=auth, + project=project, ) rpc_op = RPC_PACKAGE_CREATE_ENDPOINT.from_dict({ - "transaction_id": transaction_id}) + "transaction_id": transaction_id, + "project_name": project, + }) return rpc_op class UpdateRpcHandler(mano_dts.AbstractRpcHandler): - def __init__(self, log, dts, loop, application): + def __init__(self, application): """ Args: application: UploaderApplication """ - super().__init__(log, dts, loop) + super().__init__(application.log, application.dts, application.loop) self.application = application @property @@ -208,11 +219,14 @@ class UpdateRpcHandler(mano_dts.AbstractRpcHandler): self.application.update( msg.external_url, transaction_id, - auth=auth + auth=auth, + project=msg.project_name, ) rpc_op = RPC_PACKAGE_UPDATE_ENDPOINT.from_dict({ - "transaction_id": transaction_id}) + "transaction_id": transaction_id, + "project_name": msg.project_name, + }) return rpc_op @@ -231,16 +245,18 @@ class UpdateStateHandler(state.StateHandler): class UpdatePackage(downloader.DownloaderProtocol): - def __init__(self, log, loop, url, auth, - onboarder, uploader, package_store_map): + def __init__(self, log, loop, project, url, auth, + onboarder, uploader, package_store_map, transaction_id): super().__init__() self.log = log self.loop = loop + self.project = project self.url = url self.auth = auth self.onboarder = onboarder self.uploader = uploader self.package_store_map = package_store_map + self.transaction_id = transaction_id def _update_package(self, packages): @@ -251,14 +267,10 @@ class UpdatePackage(downloader.DownloaderProtocol): with pkg as temp_package: package_checksums = self.validate_package(temp_package) stored_package = self.update_package(temp_package) - self.validate_vnfd_fields(temp_package) + self.validate_descriptor_fields(temp_package) try: - self.extract_charms(temp_package) - self.extract_scripts(temp_package) - self.extract_configs(temp_package) self.extract_icons(temp_package) - self.update_descriptors(temp_package) except Exception: @@ -276,6 +288,7 @@ class UpdatePackage(downloader.DownloaderProtocol): except MessageException as e: self.log.message(e.msg) self.log.message(UpdateFailure()) + raise UpdateFailure(str(e)) except Exception as e: self.log.exception(e) @@ -290,8 +303,20 @@ class UpdatePackage(downloader.DownloaderProtocol): file_backed_packages = extractor.create_packages_from_upload( job.filename, job.filepath ) + try: + self.extract(file_backed_packages) + except Exception as e: + raise Exception("Error in Package Update") + + def on_download_finished(self, job): + self.log.debug("*** Download completed") + if hasattr(self.project, 'update_status_handler'): + self.project.update_status_handler.update_status(job, self.transaction_id) - self.extract(file_backed_packages) + def on_download_progress(self, job): + self.log.debug("*** Download in progress") + if hasattr(self.project, 'update_status_handler'): + self.project.update_status_handler.update_status(job, self.transaction_id) def on_download_failed(self, job): self.log.error(job.detail) @@ -355,7 +380,7 @@ class UpdatePackage(downloader.DownloaderProtocol): ) try: self.uploader.upload_image(image_name, image_checksum, image_hdl) - self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum) + self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum, self.project) except image.ImageUploadError as e: self.log.exception("Failed to upload image: %s", image_name) @@ -364,27 +389,6 @@ class UpdatePackage(downloader.DownloaderProtocol): finally: _ = [image_hdl.close() for image_hdl in name_hdl_map.values()] - def extract_charms(self, package): - try: - charm_extractor = rift.package.charm.PackageCharmExtractor(self.log) - charm_extractor.extract_charms(package) - except rift.package.charm.CharmExtractionError as e: - raise MessageException(UpdateExtractionError()) from e - - def extract_scripts(self, package): - try: - script_extractor = rift.package.script.PackageScriptExtractor(self.log) - script_extractor.extract_scripts(package) - except rift.package.script.ScriptExtractionError as e: - raise MessageException(UpdateExtractionError()) from e - - def extract_configs(self, package): - try: - config_extractor = rift.package.config.PackageConfigExtractor(self.log) - config_extractor.extract_configs(package) - except rift.package.config.ConfigExtractionError as e: - raise MessageException(UpdateExtractionError()) from e - def extract_icons(self, package): try: icon_extractor = rift.package.icon.PackageIconExtractor(self.log) @@ -392,7 +396,7 @@ class UpdatePackage(downloader.DownloaderProtocol): except rift.package.icon.IconExtractionError as e: raise MessageException(UpdateExtractionError()) from e - def validate_vnfd_fields(self, package): + def validate_descriptor_fields(self, package): # We can add more VNFD validations here. Currently we are validating only cloud-init if package.descriptor_msg is not None: self.validate_cloud_init_file(package) @@ -427,22 +431,24 @@ class UpdatePackage(downloader.DownloaderProtocol): self.log.message(UpdateDescriptorUpdate()) try: - self.onboarder.update(descriptor_msg) + self.onboarder.update(descriptor_msg, project=self.project.name) except onboard.UpdateError as e: raise MessageException(UpdateDescriptorError(package.descriptor_file)) from e class OnboardPackage(downloader.DownloaderProtocol): - def __init__(self, log, loop, url, auth, - onboarder, uploader, package_store_map): + def __init__(self, log, loop, project, url, auth, + onboarder, uploader, package_store_map, transaction_id): self.log = log self.loop = loop + self.project = project self.url = url self.auth = auth self.onboarder = onboarder self.uploader = uploader self.package_store_map = package_store_map + self.transaction_id = transaction_id def _onboard_package(self, packages): # Extract package could return multiple packages if @@ -451,20 +457,16 @@ class OnboardPackage(downloader.DownloaderProtocol): with pkg as temp_package: package_checksums = self.validate_package(temp_package) stored_package = self.store_package(temp_package) - self.validate_vnfd_fields(temp_package) + self.validate_descriptor_fields(temp_package) try: - self.extract_charms(temp_package) - self.extract_scripts(temp_package) - self.extract_configs(temp_package) self.extract_icons(temp_package) - self.onboard_descriptors(temp_package) - except Exception: - self.delete_stored_package(stored_package) + except Exception as e: + if "data-exists" not in e.msg.text: + self.delete_stored_package(stored_package) raise - else: self.upload_images(temp_package, package_checksums) @@ -476,6 +478,8 @@ class OnboardPackage(downloader.DownloaderProtocol): except MessageException as e: self.log.message(e.msg) self.log.message(OnboardFailure()) + raise OnboardException(OnboardFailure()) + except Exception as e: self.log.exception(e) @@ -490,8 +494,20 @@ class OnboardPackage(downloader.DownloaderProtocol): file_backed_packages = extractor.create_packages_from_upload( job.filename, job.filepath ) + try: + self.extract(file_backed_packages) + except Exception as e: + raise Exception("Error in Onboarding Package") + + def on_download_finished(self, job): + self.log.debug("*** Download completed") + if hasattr(self.project, 'upload_status_handler'): + self.project.upload_status_handler.upload_status(job, self.transaction_id) - self.extract(file_backed_packages) + def on_download_progress(self, job): + self.log.debug("*** Download in progress") + if hasattr(self.project, 'upload_status_handler'): + self.project.upload_status_handler.upload_status(job, self.transaction_id) def on_download_failed(self, job): self.log.error(job.detail) @@ -500,6 +516,7 @@ class OnboardPackage(downloader.DownloaderProtocol): def download_package(self): + self.log.debug("Before pkg download, project = {}".format(self.project.name)) _, filename = tempfile.mkstemp() url_downloader = downloader.UrlDownloader( self.url, @@ -551,36 +568,16 @@ class OnboardPackage(downloader.DownloaderProtocol): package.open(image_file_map[image_name]) ) try: - self.uploader.upload_image(image_name, image_checksum, image_hdl) - self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum) + set_image_property = {} + self.uploader.upload_image(image_name, image_checksum, image_hdl, set_image_property) + self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum, self.project.name) except image.ImageUploadError as e: - raise MessageException(OnboardImageUploadError()) from e + raise MessageException(OnboardImageUploadError(str(e))) from e finally: _ = [image_hdl.close() for image_hdl in name_hdl_map.values()] - def extract_charms(self, package): - try: - charm_extractor = rift.package.charm.PackageCharmExtractor(self.log) - charm_extractor.extract_charms(package) - except rift.package.charm.CharmExtractionError as e: - raise MessageException(OnboardExtractionError()) from e - - def extract_scripts(self, package): - try: - script_extractor = rift.package.script.PackageScriptExtractor(self.log) - script_extractor.extract_scripts(package) - except rift.package.script.ScriptExtractionError as e: - raise MessageException(OnboardExtractionError()) from e - - def extract_configs(self, package): - try: - config_extractor = rift.package.config.PackageConfigExtractor(self.log) - config_extractor.extract_configs(package) - except rift.package.config.ConfigExtractionError as e: - raise MessageException(OnboardExtractionError()) from e - def extract_icons(self, package): try: icon_extractor = rift.package.icon.PackageIconExtractor(self.log) @@ -588,10 +585,23 @@ class OnboardPackage(downloader.DownloaderProtocol): except rift.package.icon.IconExtractionError as e: raise MessageException(OnboardExtractionError()) from e - def validate_vnfd_fields(self, package): - # We can add more VNFD validations here. Currently we are validating only cloud-init + def validate_descriptor_fields(self, package): + # We can add more VNFD/NSD validations here. if package.descriptor_msg is not None: self.validate_cloud_init_file(package) + self.validate_vld_mgmt_network(package) + + def validate_vld_mgmt_network(self, package): + """ This is validation at onboarding of NSD for atleast one of the VL's to have mgmt network true + and have minimum one connection point""" + if package.descriptor_type == 'nsd': + for vld in package.descriptor_msg.as_dict().get('vld',[]): + if vld.get('mgmt_network', False) is True and \ + len(vld.get('vnfd_connection_point_ref',[])) > 0 : + break + else: + self.log.error(("AtLeast One of the VL's should have Management Network as True " + "and have minimum one connection point")) def validate_cloud_init_file(self, package): """ This validation is for VNFDs with associated VDUs. """ @@ -624,14 +634,37 @@ class OnboardPackage(downloader.DownloaderProtocol): return validators[0].checksums def onboard_descriptors(self, package): - descriptor_msg = package.descriptor_msg + def process_error_messsage(exception, package): + """ + This method captures error reason. This needs to be enhanced with time. + """ + exception_msg = str(exception) + match_duplicate = re.findall('(.*?)', exception_msg, re.DOTALL) + + if len(match_duplicate) > 0: + error_message = str(match_duplicate[0]) + return error_message + + match = re.findall('(.*?)', exception_msg, re.DOTALL) + error_message = "" + if len(match) > 0: + for element in match: + element_message = "Missing element : {}".format(element) + error_message += element_message + else: + error_message = package.descriptor_file + return error_message + + def process_exception(exception, package): + return OnboardDescriptorError(process_error_messsage(exception, package)) + descriptor_msg = package.descriptor_msg self.log.message(OnboardDescriptorOnboard()) try: - self.onboarder.onboard(descriptor_msg) + self.onboarder.onboard(descriptor_msg, project=self.project.name) except onboard.OnboardError as e: - raise MessageException(OnboardDescriptorError(package.descriptor_file)) from e + raise MessageException(process_exception(e, package)) from e class UploaderApplication(tornado.web.Application): @@ -643,29 +676,22 @@ class UploaderApplication(tornado.web.Application): ssl_cert = manifest.bootstrap_phase.rwsecurity.cert ssl_key = manifest.bootstrap_phase.rwsecurity.key return cls( - tasklet.log, - tasklet.dts, - tasklet.loop, - ssl=(ssl_cert, ssl_key), - vnfd_store=tasklet.vnfd_package_store, - nsd_store=tasklet.nsd_package_store, - vnfd_catalog=tasklet.vnfd_catalog, - nsd_catalog=tasklet.nsd_catalog) + tasklet, + ssl=(ssl_cert, ssl_key)) def __init__( self, - log, - dts, - loop, + tasklet, ssl=None, vnfd_store=None, - nsd_store=None, - vnfd_catalog=None, - nsd_catalog=None): + nsd_store=None): - self.log = log - self.loop = loop - self.dts = dts + self.log = tasklet.log + self.loop = tasklet.loop + self.dts = tasklet.dts + + self.accounts = {} + self.ro_accounts = {} self.use_ssl = False self.ssl_cert, self.ssl_key = None, None @@ -673,55 +699,36 @@ class UploaderApplication(tornado.web.Application): self.use_ssl = True self.ssl_cert, self.ssl_key = ssl - if not vnfd_store: - vnfd_store = rift.package.store.VnfdPackageFilesystemStore(self.log) - - if not nsd_store: - nsd_store = rift.package.store.NsdPackageFilesystemStore(self.log) - - self.accounts = [] self.messages = collections.defaultdict(list) - self.export_dir = os.path.join(os.environ['RIFT_ARTIFACTS'], 'launchpad/exports') + self.export_dir = os.path.join(os.environ['RIFT_VAR_ROOT'], 'launchpad/exports') self.uploader = image.ImageUploader(self.log, self.loop, self.dts) self.onboarder = onboard.DescriptorOnboarder( self.log, "127.0.0.1", 8008, self.use_ssl, self.ssl_cert, self.ssl_key ) - self.package_store_map = { - "vnfd": vnfd_store, - "nsd": nsd_store - } self.exporter = export.DescriptorPackageArchiveExporter(self.log) self.loop.create_task(export.periodic_export_cleanup(self.log, self.loop, self.export_dir)) - self.vnfd_catalog = vnfd_catalog - self.nsd_catalog = nsd_catalog + self.tasklet = tasklet + self.get_vnfd_catalog = tasklet.get_vnfd_catalog + self.get_nsd_catalog = tasklet.get_nsd_catalog catalog_map = { - "vnfd": self.vnfd_catalog, - "nsd": self.nsd_catalog + "vnfd": self.get_vnfd_catalog, + "nsd": self.get_nsd_catalog } - self.upload_handler = UploadRpcHandler(self.log, self.dts, self.loop, self) - self.update_handler = UpdateRpcHandler(self.log, self.dts, self.loop, self) - self.export_handler = export.ExportRpcHandler( - self.log, - self.dts, - self.loop, - self, - store_map=self.package_store_map, - exporter=self.exporter, - onboarder=self.onboarder, - catalog_map=catalog_map - ) + self.upload_handler = UploadRpcHandler(self) + self.update_handler = UpdateRpcHandler(self) + self.export_handler = export.ExportRpcHandler(self, catalog_map) attrs = dict(log=self.log, loop=self.loop) super(UploaderApplication, self).__init__([ (r"/api/package/vnfd/(.*)", pkg_handler.FileRestApiHandler, { - 'path': vnfd_store.root_dir}), + 'path': rift.package.store.VnfdPackageFilesystemStore.DEFAULT_ROOT_DIR}), (r"/api/package/nsd/(.*)", pkg_handler.FileRestApiHandler, { - 'path': nsd_store.root_dir}), + 'path': rift.package.store.NsdPackageFilesystemStore.DEFAULT_ROOT_DIR}), (r"/api/upload/([^/]+)/state", UploadStateHandler, attrs), (r"/api/update/([^/]+)/state", UpdateStateHandler, attrs), @@ -744,33 +751,61 @@ class UploaderApplication(tornado.web.Application): def get_logger(self, transaction_id): return message.Logger(self.log, self.messages[transaction_id]) - def onboard(self, url, transaction_id, auth=None): + def build_store_map(self, project=None): + ''' Use project information to build vnfd/nsd filesystem stores with appropriate + package directory root. + ''' + vnfd_store = rift.package.store.VnfdPackageFilesystemStore(self.log) if not \ + project else rift.package.store.VnfdPackageFilesystemStore(self.log, project=project) + nsd_store = rift.package.store.NsdPackageFilesystemStore(self.log) if not \ + project else rift.package.store.NsdPackageFilesystemStore(self.log, project=project) + + return dict(vnfd = vnfd_store, nsd = nsd_store) + + def onboard(self, url, transaction_id, auth=None, project=None): log = message.Logger(self.log, self.messages[transaction_id]) + try: + self.project = self.tasklet._get_project(project) + except Exception as e: + self.log.error("Exception raised ...%s" % (str(e))) + self.log.exception(e) + + self.package_store_map = self.build_store_map(project) onboard_package = OnboardPackage( log, self.loop, + self.project, url, auth, self.onboarder, self.uploader, self.package_store_map, + transaction_id ) self.loop.run_in_executor(None, onboard_package.download_package) - def update(self, url, transaction_id, auth=None): + def update(self, url, transaction_id, auth=None, project=None): log = message.Logger(self.log, self.messages[transaction_id]) + try: + self.project = self.tasklet._get_project(project) + except Exception as e: + self.log.error("Exception raised ...%s" % (str(e))) + self.log.exception(e) + + self.package_store_map = self.build_store_map(project) update_package = UpdatePackage( log, self.loop, + self.project, url, auth, self.onboarder, self.uploader, self.package_store_map, + transaction_id ) self.loop.run_in_executor(None, update_package.download_package) -