X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwlaunchpadtasklet%2Frift%2Ftasklets%2Frwlaunchpad%2Fuploader.py;fp=rwlaunchpad%2Fplugins%2Frwlaunchpadtasklet%2Frift%2Ftasklets%2Frwlaunchpad%2Fuploader.py;h=081c1f5c84c72617873c7d6534b2b767626afbc6;hb=6f07e6f33f751ab4ffe624f6037f887b243bece2;hp=0000000000000000000000000000000000000000;hpb=72a563886272088feb7cb52e4aafbe6d2c580ff9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py new file mode 100644 index 00000000..081c1f5c --- /dev/null +++ b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/uploader.py @@ -0,0 +1,881 @@ + +# +# Copyright 2016 RIFT.IO Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import collections +import os +import threading +import uuid +import zlib + +import tornado +import tornado.escape +import tornado.ioloop +import tornado.web +import tornado.httputil +import tornadostreamform.multipart_streamer as multipart_streamer + +import requests + +# disable unsigned certificate warning +from requests.packages.urllib3.exceptions import InsecureRequestWarning +requests.packages.urllib3.disable_warnings(InsecureRequestWarning) + +import gi +gi.require_version('RwLaunchpadYang', '1.0') +gi.require_version('NsdYang', '1.0') +gi.require_version('VnfdYang', '1.0') + +from gi.repository import ( + NsdYang, + VnfdYang, + ) +import rift.mano.cloud + +import rift.package.charm +import rift.package.checksums +import rift.package.config +import rift.package.convert +import rift.package.icon +import rift.package.package +import rift.package.script +import rift.package.store + +from . import ( + export, + extract, + image, + message, + onboard, + state, + ) + +from .message import ( + MessageException, + + # Onboard Error Messages + OnboardChecksumMismatch, + OnboardDescriptorError, + OnboardDescriptorExistsError, + OnboardDescriptorFormatError, + OnboardError, + OnboardExtractionError, + OnboardImageUploadError, + OnboardMissingContentBoundary, + OnboardMissingContentType, + OnboardMissingTerminalBoundary, + OnboardUnreadableHeaders, + OnboardUnreadablePackage, + OnboardUnsupportedMediaType, + + # Onboard Status Messages + OnboardDescriptorOnboard, + OnboardFailure, + OnboardImageUpload, + OnboardPackageUpload, + OnboardPackageValidation, + OnboardStart, + OnboardSuccess, + + + # Update Error Messages + UpdateChecksumMismatch, + UpdateDescriptorError, + UpdateDescriptorFormatError, + UpdateError, + UpdateExtractionError, + UpdateImageUploadError, + UpdateMissingContentBoundary, + UpdateMissingContentType, + UpdatePackageNotFoundError, + UpdateUnreadableHeaders, + UpdateUnreadablePackage, + UpdateUnsupportedMediaType, + + # Update Status Messages + UpdateDescriptorUpdate, + UpdateDescriptorUpdated, + UpdatePackageUpload, + UpdateStart, + UpdateSuccess, + UpdateFailure, + ) + +from .tosca import ExportTosca + +MB = 1024 * 1024 +GB = 1024 * MB + +MAX_STREAMED_SIZE = 5 * GB + + +class HttpMessageError(Exception): + def __init__(self, code, msg): + self.code = code + self.msg = msg + + +class GzipTemporaryFileStreamedPart(multipart_streamer.TemporaryFileStreamedPart): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # Create a decompressor for gzip data to decompress on the fly during upload + # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk + self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS) + + def feed(self, data): + decompressed_data = self._decompressor.decompress(data) + if decompressed_data: + super().feed(decompressed_data) + + def finalize(self): + # All data has arrived, flush the decompressor to get any last decompressed data + decompressed_data = self._decompressor.flush() + super().feed(decompressed_data) + super().finalize() + + +class GzipMultiPartStreamer(multipart_streamer.MultiPartStreamer): + """ This Multipart Streamer decompresses gzip files on the fly during multipart upload """ + + @staticmethod + def _get_descriptor_name_from_headers(headers): + descriptor_filename = None + + for entry in headers: + if entry["value"] != "form-data": + continue + + form_data_params = entry["params"] + if "name" in form_data_params: + if form_data_params["name"] != "descriptor": + continue + + if "filename" not in form_data_params: + continue + + descriptor_filename = form_data_params["filename"] + + return descriptor_filename + + def create_part(self, headers): + """ Create the StreamedPart subclass depending on the descriptor filename + + For gzipped descriptor packages, create a GzipTemporaryFileStreamedPart which + can decompress the gzip while it's being streamed into the launchpad directely + into a file. + + Returns: + The descriptor filename + """ + filename = GzipMultiPartStreamer._get_descriptor_name_from_headers(headers) + if filename is None or not filename.endswith(".gz"): + return multipart_streamer.TemporaryFileStreamedPart(self, headers) + + return GzipTemporaryFileStreamedPart(self, headers) + + +class RequestHandler(tornado.web.RequestHandler): + def options(self, *args, **kargs): + pass + + def set_default_headers(self): + self.set_header('Access-Control-Allow-Origin', '*') + self.set_header('Access-Control-Allow-Headers', + 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization') + self.set_header('Access-Control-Allow-Methods', 'POST, GET, PUT, DELETE') + + +@tornado.web.stream_request_body +class StreamingUploadHandler(RequestHandler): + def initialize(self, log, loop): + """Initialize the handler + + Arguments: + log - the logger that this handler should use + loop - the tasklets ioloop + + """ + self.transaction_id = str(uuid.uuid4()) + + self.loop = loop + self.log = self.application.get_logger(self.transaction_id) + + self.part_streamer = None + + self.log.debug('created handler (transaction_id = {})'.format(self.transaction_id)) + + def msg_missing_content_type(self): + raise NotImplementedError() + + def msg_unsupported_media_type(self): + raise NotImplementedError() + + def msg_missing_content_boundary(self): + raise NotImplementedError() + + def msg_start(self): + raise NotImplementedError() + + def msg_success(self): + raise NotImplementedError() + + def msg_failure(self): + raise NotImplementedError() + + def msg_package_upload(self): + raise NotImplementedError() + + @tornado.gen.coroutine + def prepare(self): + """Prepare the handler for a request + + The prepare function is the first part of a request transaction. It + creates a temporary file that uploaded data can be written to. + + """ + if self.request.method != "POST": + return + + self.request.connection.set_max_body_size(MAX_STREAMED_SIZE) + + self.log.message(self.msg_start()) + + try: + # Retrieve the content type and parameters from the request + content_type = self.request.headers.get('content-type', None) + if content_type is None: + raise HttpMessageError(400, self.msg_missing_content_type()) + + content_type, params = tornado.httputil._parse_header(content_type) + + if "multipart/form-data" != content_type.lower(): + raise HttpMessageError(415, self.msg_unsupported_media_type()) + + if "boundary" not in params: + raise HttpMessageError(400, self.msg_missing_content_boundary()) + + # You can get the total request size from the headers. + try: + total = int(self.request.headers.get("Content-Length", "0")) + except KeyError: + self.log.warning("Content length header not found") + # For any well formed browser request, Content-Length should have a value. + total = 0 + + # And here you create a streamer that will accept incoming data + self.part_streamer = GzipMultiPartStreamer(total) + + except HttpMessageError as e: + self.log.message(e.msg) + self.log.message(self.msg_failure()) + + raise tornado.web.HTTPError(e.code, e.msg.name) + + except Exception as e: + self.log.exception(e) + self.log.message(self.msg_failure()) + + @tornado.gen.coroutine + def data_received(self, chunk): + """Write data to the current file + + Arguments: + data - a chunk of data to write to file + + """ + + """When a chunk of data is received, we forward it to the multipart streamer.""" + self.part_streamer.data_received(chunk) + + def post(self): + """Handle a post request + + The function is called after any data associated with the body of the + request has been received. + + """ + # You MUST call this to close the incoming stream. + self.part_streamer.data_complete() + + desc_parts = self.part_streamer.get_parts_by_name("descriptor") + if len(desc_parts) != 1: + raise HttpMessageError(400, OnboardError("Descriptor option not found")) + + self.log.message(self.msg_package_upload()) + + +class UploadHandler(StreamingUploadHandler): + """ + This handler is used to upload archives that contain VNFDs, NSDs, and PNFDs + to the launchpad. This is a streaming handler that writes uploaded archives + to disk without loading them all into memory. + """ + + def msg_missing_content_type(self): + return OnboardMissingContentType() + + def msg_unsupported_media_type(self): + return OnboardUnsupportedMediaType() + + def msg_missing_content_boundary(self): + return OnboardMissingContentBoundary() + + def msg_start(self): + return OnboardStart() + + def msg_success(self): + return OnboardSuccess() + + def msg_failure(self): + return OnboardFailure() + + def msg_package_upload(self): + return OnboardPackageUpload() + + def post(self): + """Handle a post request + + The function is called after any data associated with the body of the + request has been received. + + """ + try: + super().post() + self.application.onboard( + self.part_streamer, + self.transaction_id, + auth=self.request.headers.get('authorization', None), + ) + + self.set_status(200) + self.write(tornado.escape.json_encode({ + "transaction_id": self.transaction_id, + })) + + except Exception: + self.log.exception("Upload POST failed") + self.part_streamer.release_parts() + raise + + +class UpdateHandler(StreamingUploadHandler): + def msg_missing_content_type(self): + return UpdateMissingContentType() + + def msg_unsupported_media_type(self): + return UpdateUnsupportedMediaType() + + def msg_missing_content_boundary(self): + return UpdateMissingContentBoundary() + + def msg_start(self): + return UpdateStart() + + def msg_success(self): + return UpdateSuccess() + + def msg_failure(self): + return UpdateFailure() + + def msg_package_upload(self): + return UpdatePackageUpload() + + def post(self): + """Handle a post request + + The function is called after any data associated with the body of the + request has been received. + + """ + try: + super().post() + + self.application.update( + self.part_streamer, + self.transaction_id, + auth=self.request.headers.get('authorization', None), + ) + + self.set_status(200) + self.write(tornado.escape.json_encode({ + "transaction_id": self.transaction_id, + })) + except Exception: + self.log.exception("Upload POST failed") + self.part_streamer.release_parts() + raise + + +class UploadStateHandler(state.StateHandler): + STARTED = OnboardStart + SUCCESS = OnboardSuccess + FAILURE = OnboardFailure + + +class UpdateStateHandler(state.StateHandler): + STARTED = UpdateStart + SUCCESS = UpdateSuccess + FAILURE = UpdateFailure + + +class UpdatePackage(threading.Thread): + def __init__(self, log, loop, part_streamer, auth, + onboarder, uploader, package_store_map): + super().__init__() + self.log = log + self.loop = loop + self.part_streamer = part_streamer + self.auth = auth + self.onboarder = onboarder + self.uploader = uploader + self.package_store_map = package_store_map + + self.io_loop = tornado.ioloop.IOLoop.current() + + def _update_package(self): + # Extract package could return multiple packages if + # the package is converted + for pkg in self.extract_package(): + with pkg as temp_package: + package_checksums = self.validate_package(temp_package) + stored_package = self.update_package(temp_package) + + try: + self.extract_charms(temp_package) + self.extract_scripts(temp_package) + self.extract_configs(temp_package) + self.extract_icons(temp_package) + + self.update_descriptors(temp_package) + + except Exception: + self.delete_stored_package(stored_package) + raise + + else: + self.upload_images(temp_package, package_checksums) + + def run(self): + try: + self._update_package() + self.log.message(UpdateSuccess()) + + except MessageException as e: + self.log.message(e.msg) + self.log.message(UpdateFailure()) + + except Exception as e: + self.log.exception(e) + if str(e): + self.log.message(UpdateError(str(e))) + self.log.message(UpdateFailure()) + + def extract_package(self): + """Extract multipart message from tarball""" + desc_part = self.part_streamer.get_parts_by_name("descriptor")[0] + + # Invoke the move API to prevent the part streamer from attempting + # to clean up (the file backed package will do that itself) + desc_part.move(desc_part.f_out.name) + + package_name = desc_part.get_filename() + package_path = desc_part.f_out.name + + extractor = extract.UploadPackageExtractor(self.log) + file_backed_packages = extractor.create_packages_from_upload( + package_name, package_path + ) + + return file_backed_packages + + def get_package_store(self, package): + return self.package_store_map[package.descriptor_type] + + def update_package(self, package): + store = self.get_package_store(package) + + try: + store.update_package(package) + except rift.package.store.PackageNotFoundError as e: + # If the package doesn't exist, then it is possible the descriptor was onboarded + # out of band. In that case, just store the package as is + self.log.warning("Package not found, storing new package instead.") + store.store_package(package) + + stored_package = store.get_package(package.descriptor_id) + + return stored_package + + def delete_stored_package(self, package): + self.log.info("Deleting stored package: %s", package) + store = self.get_package_store(package) + try: + store.delete_package(package.descriptor_id) + except Exception as e: + self.log.warning("Failed to delete package from store: %s", str(e)) + + def upload_images(self, package, package_checksums): + image_file_map = rift.package.image.get_package_image_files(package) + name_hdl_map = {name: package.open(image_file_map[name]) for name in image_file_map} + if not image_file_map: + return + + try: + for image_name, image_hdl in name_hdl_map.items(): + image_file = image_file_map[image_name] + if image_file in package_checksums: + image_checksum = package_checksums[image_file] + else: + self.log.warning("checksum not provided for image %s. Calculating checksum", + image_file) + image_checksum = rift.package.checksums.checksum( + package.open(image_file_map[image_name]) + ) + try: + self.uploader.upload_image(image_name, image_checksum, image_hdl) + self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum) + + except image.ImageUploadError as e: + self.log.exception("Failed to upload image: %s", image_name) + raise MessageException(OnboardImageUploadError(str(e))) from e + + finally: + _ = [image_hdl.close() for image_hdl in name_hdl_map.values()] + + + def extract_charms(self, package): + try: + charm_extractor = rift.package.charm.PackageCharmExtractor(self.log) + charm_extractor.extract_charms(package) + except rift.package.charm.CharmExtractionError as e: + raise MessageException(UpdateExtractionError()) from e + + def extract_scripts(self, package): + try: + script_extractor = rift.package.script.PackageScriptExtractor(self.log) + script_extractor.extract_scripts(package) + except rift.package.script.ScriptExtractionError as e: + raise MessageException(UpdateExtractionError()) from e + + def extract_configs(self, package): + try: + config_extractor = rift.package.config.PackageConfigExtractor(self.log) + config_extractor.extract_configs(package) + except rift.package.config.ConfigExtractionError as e: + raise MessageException(UpdateExtractionError()) from e + + def extract_icons(self, package): + try: + icon_extractor = rift.package.icon.PackageIconExtractor(self.log) + icon_extractor.extract_icons(package) + except rift.package.icon.IconExtractionError as e: + raise MessageException(UpdateExtractionError()) from e + + def validate_package(self, package): + checksum_validator = rift.package.package.PackageChecksumValidator(self.log) + + try: + file_checksums = checksum_validator.validate(package) + except rift.package.package.PackageFileChecksumError as e: + raise MessageException(UpdateChecksumMismatch(e.filename)) from e + except rift.package.package.PackageValidationError as e: + raise MessageException(UpdateUnreadablePackage()) from e + + return file_checksums + + def update_descriptors(self, package): + descriptor_msg = package.descriptor_msg + + self.log.message(UpdateDescriptorUpdate()) + + try: + self.onboarder.update(descriptor_msg) + except onboard.UpdateError as e: + raise MessageException(UpdateDescriptorError(package.descriptor_file)) from e + + +class OnboardPackage(threading.Thread): + def __init__(self, log, loop, part_streamer, auth, + onboarder, uploader, package_store_map): + super().__init__() + self.log = log + self.loop = loop + self.part_streamer = part_streamer + self.auth = auth + self.onboarder = onboarder + self.uploader = uploader + self.package_store_map = package_store_map + + self.io_loop = tornado.ioloop.IOLoop.current() + + def _onboard_package(self): + # Extract package could return multiple packages if + # the package is converted + for pkg in self.extract_package(): + with pkg as temp_package: + package_checksums = self.validate_package(temp_package) + stored_package = self.store_package(temp_package) + + try: + self.extract_charms(temp_package) + self.extract_scripts(temp_package) + self.extract_configs(temp_package) + self.extract_icons(temp_package) + + self.onboard_descriptors(temp_package) + + except Exception: + self.delete_stored_package(stored_package) + raise + + else: + self.upload_images(temp_package, package_checksums) + + def run(self): + try: + self._onboard_package() + self.log.message(OnboardSuccess()) + + except MessageException as e: + self.log.message(e.msg) + self.log.message(OnboardFailure()) + + except Exception as e: + self.log.exception(e) + if str(e): + self.log.message(OnboardError(str(e))) + self.log.message(OnboardFailure()) + + finally: + self.part_streamer.release_parts() + + def extract_package(self): + """Extract multipart message from tarball""" + desc_part = self.part_streamer.get_parts_by_name("descriptor")[0] + + # Invoke the move API to prevent the part streamer from attempting + # to clean up (the file backed package will do that itself) + desc_part.move(desc_part.f_out.name) + + package_name = desc_part.get_filename() + package_path = desc_part.f_out.name + + extractor = extract.UploadPackageExtractor(self.log) + file_backed_packages = extractor.create_packages_from_upload( + package_name, package_path + ) + + return file_backed_packages + + def get_package_store(self, package): + return self.package_store_map[package.descriptor_type] + + def store_package(self, package): + store = self.get_package_store(package) + + try: + store.store_package(package) + except rift.package.store.PackageExistsError as e: + store.update_package(package) + + stored_package = store.get_package(package.descriptor_id) + + return stored_package + + def delete_stored_package(self, package): + self.log.info("Deleting stored package: %s", package) + store = self.get_package_store(package) + try: + store.delete_package(package.descriptor_id) + except Exception as e: + self.log.warning("Failed to delete package from store: %s", str(e)) + + def upload_images(self, package, package_checksums): + image_file_map = rift.package.image.get_package_image_files(package) + if not image_file_map: + return + + name_hdl_map = {name: package.open(image_file_map[name]) for name in image_file_map} + try: + for image_name, image_hdl in name_hdl_map.items(): + image_file = image_file_map[image_name] + if image_file in package_checksums: + image_checksum = package_checksums[image_file] + else: + self.log.warning("checksum not provided for image %s. Calculating checksum", + image_file) + image_checksum = rift.package.checksums.checksum( + package.open(image_file_map[image_name]) + ) + try: + self.uploader.upload_image(image_name, image_checksum, image_hdl) + self.uploader.upload_image_to_cloud_accounts(image_name, image_checksum) + + except image.ImageUploadError as e: + raise MessageException(OnboardImageUploadError()) from e + + finally: + _ = [image_hdl.close() for image_hdl in name_hdl_map.values()] + + def extract_charms(self, package): + try: + charm_extractor = rift.package.charm.PackageCharmExtractor(self.log) + charm_extractor.extract_charms(package) + except rift.package.charm.CharmExtractionError as e: + raise MessageException(OnboardExtractionError()) from e + + def extract_scripts(self, package): + try: + script_extractor = rift.package.script.PackageScriptExtractor(self.log) + script_extractor.extract_scripts(package) + except rift.package.script.ScriptExtractionError as e: + raise MessageException(OnboardExtractionError()) from e + + def extract_configs(self, package): + try: + config_extractor = rift.package.config.PackageConfigExtractor(self.log) + config_extractor.extract_configs(package) + except rift.package.config.ConfigExtractionError as e: + raise MessageException(OnboardExtractionError()) from e + + def extract_icons(self, package): + try: + icon_extractor = rift.package.icon.PackageIconExtractor(self.log) + icon_extractor.extract_icons(package) + except rift.package.icon.IconExtractionError as e: + raise MessageException(OnboardExtractionError()) from e + + def validate_package(self, package): + checksum_validator = rift.package.package.PackageChecksumValidator(self.log) + + try: + file_checksums = checksum_validator.validate(package) + except rift.package.package.PackageFileChecksumError as e: + raise MessageException(OnboardChecksumMismatch(e.filename)) from e + except rift.package.package.PackageValidationError as e: + raise MessageException(OnboardUnreadablePackage()) from e + + return file_checksums + + def onboard_descriptors(self, package): + descriptor_msg = package.descriptor_msg + + self.log.message(OnboardDescriptorOnboard()) + + try: + self.onboarder.onboard(descriptor_msg) + except onboard.OnboardError as e: + raise MessageException(OnboardDescriptorError(package.descriptor_file)) from e + + +class UploaderApplication(tornado.web.Application): + def __init__(self, tasklet): + self.tasklet = tasklet + self.accounts = [] + self.messages = collections.defaultdict(list) + self.export_dir = os.path.join(os.environ['RIFT_ARTIFACTS'], 'launchpad/exports') + + manifest = tasklet.tasklet_info.get_pb_manifest() + self.use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl + self.ssl_cert = manifest.bootstrap_phase.rwsecurity.cert + self.ssl_key = manifest.bootstrap_phase.rwsecurity.key + + self.uploader = image.ImageUploader(self.log, self.loop, tasklet.dts) + self.onboarder = onboard.DescriptorOnboarder( + self.log, "127.0.0.1", 8008, self.use_ssl, self.ssl_cert, self.ssl_key + ) + self.package_store_map = { + "vnfd": self.tasklet.vnfd_package_store, + "nsd": self.tasklet.nsd_package_store, + } + + self.exporter = export.DescriptorPackageArchiveExporter(self.log) + self.loop.create_task(export.periodic_export_cleanup(self.log, self.loop, self.export_dir)) + + attrs = dict(log=self.log, loop=self.loop) + + export_attrs = attrs.copy() + export_attrs.update({ + "store_map": self.package_store_map, + "exporter": self.exporter, + "catalog_map": { + "vnfd": self.vnfd_catalog, + "nsd": self.nsd_catalog + } + }) + + super(UploaderApplication, self).__init__([ + (r"/api/update", UpdateHandler, attrs), + (r"/api/upload", UploadHandler, attrs), + + (r"/api/upload/([^/]+)/state", UploadStateHandler, attrs), + (r"/api/update/([^/]+)/state", UpdateStateHandler, attrs), + (r"/api/export/([^/]+)/state", export.ExportStateHandler, attrs), + + (r"/api/export/(nsd|vnfd)$", export.ExportHandler, export_attrs), + (r"/api/export/([^/]+.tar.gz)", tornado.web.StaticFileHandler, { + "path": self.export_dir, + }), + (r"/api/export/([^/]+.zip)", tornado.web.StaticFileHandler, { + "path": self.export_dir, + }), + ]) + + @property + def log(self): + return self.tasklet.log + + @property + def loop(self): + return self.tasklet.loop + + def get_logger(self, transaction_id): + return message.Logger(self.log, self.messages[transaction_id]) + + def onboard(self, part_streamer, transaction_id, auth=None): + log = message.Logger(self.log, self.messages[transaction_id]) + + OnboardPackage( + log, + self.loop, + part_streamer, + auth, + self.onboarder, + self.uploader, + self.package_store_map, + ).start() + + def update(self, part_streamer, transaction_id, auth=None): + log = message.Logger(self.log, self.messages[transaction_id]) + + UpdatePackage( + log, + self.loop, + part_streamer, + auth, + self.onboarder, + self.uploader, + self.package_store_map, + ).start() + + @property + def vnfd_catalog(self): + return self.tasklet.vnfd_catalog + + @property + def nsd_catalog(self): + return self.tasklet.nsd_catalog