X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2Fcommon.git;a=blobdiff_plain;f=osm_common%2Fsol004_package.py;fp=osm_common%2Fsol004_package.py;h=813e52de1a5e0cc2b5d74e55169f24a83c949b3d;hp=e6b40b450d58d46bc63c25da2e96662a13cab03a;hb=98fc8f01d18d1a05c16fed7ccee355611a6e20ce;hpb=2644b76248a1b96f7a47013b414e31b4e3feecf8 diff --git a/osm_common/sol004_package.py b/osm_common/sol004_package.py index e6b40b4..813e52d 100644 --- a/osm_common/sol004_package.py +++ b/osm_common/sol004_package.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2020 Whitestack, LLC +# Copyright 2021 Whitestack, LLC # ************************************************************* # # This file is part of OSM common repository. @@ -19,7 +19,7 @@ # under the License. # # For those usages not covered by the Apache License, Version 2.0 please -# contact: agarcia@whitestack.com +# contact: agarcia@whitestack.com or fbravo@whitestack.com ## """Python module for interacting with ETSI GS NFV-SOL004 compliant packages @@ -56,181 +56,70 @@ native_charm_vnf/ native_charm_vnf/ """ import yaml +import datetime import os -import hashlib - - -_METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta" -_METADATA_DESCRIPTOR_FIELD = "Entry-Definitions" -_METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest" -_METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log" -_METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses" -_METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt" -_METADATA_DEFAULT_LICENSES_PATH = "Licenses" -_MANIFEST_FILE_PATH_FIELD = "Source" -_MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm" -_MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash" +from .sol_package import SOLPackage class SOL004PackageException(Exception): pass -class SOL004Package: - def __init__(self, package_path=""): - self._package_path = package_path - self._package_metadata = self._parse_package_metadata() - self._manifest_data = self._parse_manifest_data() - - def _parse_package_metadata(self): - try: - return self._parse_package_metadata_with_metadata_dir() - except FileNotFoundError: - return self._parse_package_metadata_without_metadata_dir() - - def _parse_package_metadata_with_metadata_dir(self): - try: - return self._parse_file_in_blocks(_METADATA_FILE_PATH) - except FileNotFoundError as e: - raise e - except (Exception, OSError) as e: - raise SOL004PackageException( - "Error parsing {}: {}".format(_METADATA_FILE_PATH, e) - ) - - def _parse_package_metadata_without_metadata_dir(self): - package_root_files = {f for f in os.listdir(self._package_path)} - package_root_yamls = [ - f for f in package_root_files if f.endswith(".yml") or f.endswith(".yaml") - ] - if len(package_root_yamls) != 1: - error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}" - raise SOL004PackageException(error_msg.format(len(package_root_yamls))) - # TODO: Parse extra metadata from descriptor YAML? - return [ - { - _METADATA_DESCRIPTOR_FIELD: package_root_yamls[0], - _METADATA_MANIFEST_FIELD: "{}.mf".format( - os.path.splitext(package_root_yamls[0])[0] - ), - _METADATA_CHANGELOG_FIELD: _METADATA_DEFAULT_CHANGELOG_PATH, - _METADATA_LICENSES_FIELD: _METADATA_DEFAULT_LICENSES_PATH, - } - ] - - def _parse_manifest_data(self): - manifest_path = None - for tosca_meta in self._package_metadata: - if _METADATA_MANIFEST_FIELD in tosca_meta: - manifest_path = tosca_meta[_METADATA_MANIFEST_FIELD] - break - else: - error_msg = "Error parsing {}: no {} field on path".format( - _METADATA_FILE_PATH, _METADATA_MANIFEST_FIELD - ) - raise SOL004PackageException(error_msg) - - try: - return self._parse_file_in_blocks(manifest_path) - except (Exception, OSError) as e: - raise SOL004PackageException( - "Error parsing {}: {}".format(manifest_path, e) - ) +class SOL004Package(SOLPackage): + _MANIFEST_VNFD_ID = "vnfd_id" + _MANIFEST_VNFD_PRODUCT_NAME = "vnfd_product_name" + _MANIFEST_VNFD_PROVIDER_ID = "vnfd_provider_id" + _MANIFEST_VNFD_SOFTWARE_VERSION = "vnfd_software_version" + _MANIFEST_VNFD_PACKAGE_VERSION = "vnfd_package_version" + _MANIFEST_VNFD_RELEASE_DATE_TIME = "vnfd_release_date_time" + _MANIFEST_VNFD_COMPATIBLE_SPECIFICATION_VERSIONS = ( + "compatible_specification_versions" + ) + _MANIFEST_VNFM_INFO = "vnfm_info" + + _MANIFEST_ALL_FIELDS = [ + _MANIFEST_VNFD_ID, + _MANIFEST_VNFD_PRODUCT_NAME, + _MANIFEST_VNFD_PROVIDER_ID, + _MANIFEST_VNFD_SOFTWARE_VERSION, + _MANIFEST_VNFD_PACKAGE_VERSION, + _MANIFEST_VNFD_RELEASE_DATE_TIME, + _MANIFEST_VNFD_COMPATIBLE_SPECIFICATION_VERSIONS, + _MANIFEST_VNFM_INFO, + ] - def _get_package_file_full_path(self, file_relative_path): - return os.path.join(self._package_path, file_relative_path) - - def _parse_file_in_blocks(self, file_relative_path): - file_path = self._get_package_file_full_path(file_relative_path) - with open(file_path) as f: - blocks = f.read().split("\n\n") - parsed_blocks = map(yaml.safe_load, blocks) - return [block for block in parsed_blocks if block is not None] - - def _get_package_file_manifest_data(self, file_relative_path): - for file_data in self._manifest_data: - if file_data.get(_MANIFEST_FILE_PATH_FIELD, "") == file_relative_path: - return file_data + def __init__(self, package_path=""): + super().__init__(package_path) - error_msg = ( - "Error parsing {} manifest data: file not found on manifest file".format( - file_relative_path - ) + def generate_manifest_data_from_descriptor(self): + descriptor_path = os.path.join( + self._package_path, self.get_descriptor_location() ) - raise SOL004PackageException(error_msg) - - def get_package_file_hash_digest_from_manifest(self, file_relative_path): - """Returns the hash digest of a file inside this package as specified on the manifest file.""" - file_manifest_data = self._get_package_file_manifest_data(file_relative_path) - try: - return file_manifest_data[_MANIFEST_FILE_HASH_DIGEST_FIELD] - except Exception as e: - raise SOL004PackageException( - "Error parsing {} hash digest: {}".format(file_relative_path, e) + with open(descriptor_path, "r") as descriptor: + try: + vnfd_data = yaml.safe_load(descriptor)["vnfd"] + except yaml.YAMLError as e: + print("Error reading descriptor {}: {}".format(descriptor_path, e)) + return + + self._manifest_metadata = {} + self._manifest_metadata[self._MANIFEST_VNFD_ID] = vnfd_data.get( + "id", "default-id" ) - - def get_package_file_hash_algorithm_from_manifest(self, file_relative_path): - """Returns the hash algorithm of a file inside this package as specified on the manifest file.""" - file_manifest_data = self._get_package_file_manifest_data(file_relative_path) - try: - return file_manifest_data[_MANIFEST_FILE_HASH_ALGORITHM_FIELD] - except Exception as e: - raise SOL004PackageException( - "Error parsing {} hash digest: {}".format(file_relative_path, e) + self._manifest_metadata[self._MANIFEST_VNFD_PRODUCT_NAME] = vnfd_data.get( + "product-name", "default-product-name" ) - - @staticmethod - def _get_hash_function_from_hash_algorithm(hash_algorithm): - function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512} - if hash_algorithm not in function_to_algorithm: - error_msg = ( - "Error checking hash function: hash algorithm {} not supported".format( - hash_algorithm - ) + self._manifest_metadata[self._MANIFEST_VNFD_PROVIDER_ID] = vnfd_data.get( + "provider", "OSM" ) - raise SOL004PackageException(error_msg) - return function_to_algorithm[hash_algorithm] - - def _calculate_file_hash(self, file_relative_path, hash_algorithm): - file_path = self._get_package_file_full_path(file_relative_path) - hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm) - try: - with open(file_path, "rb") as f: - return hash_function(f.read()).hexdigest() - except Exception as e: - raise SOL004PackageException( - "Error hashing {}: {}".format(file_relative_path, e) + self._manifest_metadata[ + self._MANIFEST_VNFD_SOFTWARE_VERSION + ] = vnfd_data.get("version", "1.0") + self._manifest_metadata[self._MANIFEST_VNFD_PACKAGE_VERSION] = "1.0.0" + self._manifest_metadata[self._MANIFEST_VNFD_RELEASE_DATE_TIME] = ( + datetime.datetime.now().astimezone().isoformat() ) - - def validate_package_file_hash(self, file_relative_path): - """Validates the integrity of a file using the hash algorithm and digest on the package manifest.""" - hash_algorithm = self.get_package_file_hash_algorithm_from_manifest( - file_relative_path - ) - file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm) - expected_file_hash = self.get_package_file_hash_digest_from_manifest( - file_relative_path - ) - if file_hash != expected_file_hash: - error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}" - raise SOL004PackageException( - error_msg.format(file_relative_path, file_hash, expected_file_hash) - ) - - def validate_package_hashes(self): - """Validates the integrity of all files listed on the package manifest.""" - for file_data in self._manifest_data: - if _MANIFEST_FILE_PATH_FIELD in file_data: - file_relative_path = file_data[_MANIFEST_FILE_PATH_FIELD] - self.validate_package_file_hash(file_relative_path) - - def get_descriptor_location(self): - """Returns this package descriptor location as a relative path from the package root.""" - for tosca_meta in self._package_metadata: - if _METADATA_DESCRIPTOR_FIELD in tosca_meta: - return tosca_meta[_METADATA_DESCRIPTOR_FIELD] - - error_msg = "Error: no {} entry found on {}".format( - _METADATA_DESCRIPTOR_FIELD, _METADATA_FILE_PATH - ) - raise SOL004PackageException(error_msg) + self._manifest_metadata[ + self._MANIFEST_VNFD_COMPATIBLE_SPECIFICATION_VERSIONS + ] = "2.7.1" + self._manifest_metadata[self._MANIFEST_VNFM_INFO] = "OSM"