# -*- coding: utf-8 -*-
-# Copyright 2020 Whitestack, LLC
+# Copyright 2021 Whitestack, LLC
# *************************************************************
#
# This file is part of OSM common repository.
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
-# contact: agarcia@whitestack.com
+# contact: agarcia@whitestack.com or fbravo@whitestack.com
##
"""Python module for interacting with ETSI GS NFV-SOL004 compliant packages
"""
import yaml
+import datetime
import os
-import hashlib
-
-
-_METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta"
-_METADATA_DESCRIPTOR_FIELD = "Entry-Definitions"
-_METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest"
-_METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log"
-_METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses"
-_METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt"
-_METADATA_DEFAULT_LICENSES_PATH = "Licenses"
-_MANIFEST_FILE_PATH_FIELD = "Source"
-_MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm"
-_MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash"
+from .sol_package import SOLPackage
class SOL004PackageException(Exception):
pass
-class SOL004Package:
- def __init__(self, package_path=""):
- self._package_path = package_path
- self._package_metadata = self._parse_package_metadata()
- self._manifest_data = self._parse_manifest_data()
-
- def _parse_package_metadata(self):
- try:
- return self._parse_package_metadata_with_metadata_dir()
- except FileNotFoundError:
- return self._parse_package_metadata_without_metadata_dir()
-
- def _parse_package_metadata_with_metadata_dir(self):
- try:
- return self._parse_file_in_blocks(_METADATA_FILE_PATH)
- except FileNotFoundError as e:
- raise e
- except (Exception, OSError) as e:
- raise SOL004PackageException(
- "Error parsing {}: {}".format(_METADATA_FILE_PATH, e)
- )
-
- def _parse_package_metadata_without_metadata_dir(self):
- package_root_files = {f for f in os.listdir(self._package_path)}
- package_root_yamls = [
- f for f in package_root_files if f.endswith(".yml") or f.endswith(".yaml")
- ]
- if len(package_root_yamls) != 1:
- error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}"
- raise SOL004PackageException(error_msg.format(len(package_root_yamls)))
- # TODO: Parse extra metadata from descriptor YAML?
- return [
- {
- _METADATA_DESCRIPTOR_FIELD: package_root_yamls[0],
- _METADATA_MANIFEST_FIELD: "{}.mf".format(
- os.path.splitext(package_root_yamls[0])[0]
- ),
- _METADATA_CHANGELOG_FIELD: _METADATA_DEFAULT_CHANGELOG_PATH,
- _METADATA_LICENSES_FIELD: _METADATA_DEFAULT_LICENSES_PATH,
- }
- ]
-
- def _parse_manifest_data(self):
- manifest_path = None
- for tosca_meta in self._package_metadata:
- if _METADATA_MANIFEST_FIELD in tosca_meta:
- manifest_path = tosca_meta[_METADATA_MANIFEST_FIELD]
- break
- else:
- error_msg = "Error parsing {}: no {} field on path".format(
- _METADATA_FILE_PATH, _METADATA_MANIFEST_FIELD
- )
- raise SOL004PackageException(error_msg)
-
- try:
- return self._parse_file_in_blocks(manifest_path)
- except (Exception, OSError) as e:
- raise SOL004PackageException(
- "Error parsing {}: {}".format(manifest_path, e)
- )
+class SOL004Package(SOLPackage):
+ _MANIFEST_VNFD_ID = "vnfd_id"
+ _MANIFEST_VNFD_PRODUCT_NAME = "vnfd_product_name"
+ _MANIFEST_VNFD_PROVIDER_ID = "vnfd_provider_id"
+ _MANIFEST_VNFD_SOFTWARE_VERSION = "vnfd_software_version"
+ _MANIFEST_VNFD_PACKAGE_VERSION = "vnfd_package_version"
+ _MANIFEST_VNFD_RELEASE_DATE_TIME = "vnfd_release_date_time"
+ _MANIFEST_VNFD_COMPATIBLE_SPECIFICATION_VERSIONS = (
+ "compatible_specification_versions"
+ )
+ _MANIFEST_VNFM_INFO = "vnfm_info"
+
+ _MANIFEST_ALL_FIELDS = [
+ _MANIFEST_VNFD_ID,
+ _MANIFEST_VNFD_PRODUCT_NAME,
+ _MANIFEST_VNFD_PROVIDER_ID,
+ _MANIFEST_VNFD_SOFTWARE_VERSION,
+ _MANIFEST_VNFD_PACKAGE_VERSION,
+ _MANIFEST_VNFD_RELEASE_DATE_TIME,
+ _MANIFEST_VNFD_COMPATIBLE_SPECIFICATION_VERSIONS,
+ _MANIFEST_VNFM_INFO,
+ ]
- def _get_package_file_full_path(self, file_relative_path):
- return os.path.join(self._package_path, file_relative_path)
-
- def _parse_file_in_blocks(self, file_relative_path):
- file_path = self._get_package_file_full_path(file_relative_path)
- with open(file_path) as f:
- blocks = f.read().split("\n\n")
- parsed_blocks = map(yaml.safe_load, blocks)
- return [block for block in parsed_blocks if block is not None]
-
- def _get_package_file_manifest_data(self, file_relative_path):
- for file_data in self._manifest_data:
- if file_data.get(_MANIFEST_FILE_PATH_FIELD, "") == file_relative_path:
- return file_data
+ def __init__(self, package_path=""):
+ super().__init__(package_path)
- error_msg = (
- "Error parsing {} manifest data: file not found on manifest file".format(
- file_relative_path
- )
+ def generate_manifest_data_from_descriptor(self):
+ descriptor_path = os.path.join(
+ self._package_path, self.get_descriptor_location()
)
- raise SOL004PackageException(error_msg)
-
- def get_package_file_hash_digest_from_manifest(self, file_relative_path):
- """Returns the hash digest of a file inside this package as specified on the manifest file."""
- file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
- try:
- return file_manifest_data[_MANIFEST_FILE_HASH_DIGEST_FIELD]
- except Exception as e:
- raise SOL004PackageException(
- "Error parsing {} hash digest: {}".format(file_relative_path, e)
+ with open(descriptor_path, "r") as descriptor:
+ try:
+ vnfd_data = yaml.safe_load(descriptor)["vnfd"]
+ except yaml.YAMLError as e:
+ print("Error reading descriptor {}: {}".format(descriptor_path, e))
+ return
+
+ self._manifest_metadata = {}
+ self._manifest_metadata[self._MANIFEST_VNFD_ID] = vnfd_data.get(
+ "id", "default-id"
)
-
- def get_package_file_hash_algorithm_from_manifest(self, file_relative_path):
- """Returns the hash algorithm of a file inside this package as specified on the manifest file."""
- file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
- try:
- return file_manifest_data[_MANIFEST_FILE_HASH_ALGORITHM_FIELD]
- except Exception as e:
- raise SOL004PackageException(
- "Error parsing {} hash digest: {}".format(file_relative_path, e)
+ self._manifest_metadata[self._MANIFEST_VNFD_PRODUCT_NAME] = vnfd_data.get(
+ "product-name", "default-product-name"
)
-
- @staticmethod
- def _get_hash_function_from_hash_algorithm(hash_algorithm):
- function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512}
- if hash_algorithm not in function_to_algorithm:
- error_msg = (
- "Error checking hash function: hash algorithm {} not supported".format(
- hash_algorithm
- )
+ self._manifest_metadata[self._MANIFEST_VNFD_PROVIDER_ID] = vnfd_data.get(
+ "provider", "OSM"
)
- raise SOL004PackageException(error_msg)
- return function_to_algorithm[hash_algorithm]
-
- def _calculate_file_hash(self, file_relative_path, hash_algorithm):
- file_path = self._get_package_file_full_path(file_relative_path)
- hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm)
- try:
- with open(file_path, "rb") as f:
- return hash_function(f.read()).hexdigest()
- except Exception as e:
- raise SOL004PackageException(
- "Error hashing {}: {}".format(file_relative_path, e)
+ self._manifest_metadata[
+ self._MANIFEST_VNFD_SOFTWARE_VERSION
+ ] = vnfd_data.get("version", "1.0")
+ self._manifest_metadata[self._MANIFEST_VNFD_PACKAGE_VERSION] = "1.0.0"
+ self._manifest_metadata[self._MANIFEST_VNFD_RELEASE_DATE_TIME] = (
+ datetime.datetime.now().astimezone().isoformat()
)
-
- def validate_package_file_hash(self, file_relative_path):
- """Validates the integrity of a file using the hash algorithm and digest on the package manifest."""
- hash_algorithm = self.get_package_file_hash_algorithm_from_manifest(
- file_relative_path
- )
- file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm)
- expected_file_hash = self.get_package_file_hash_digest_from_manifest(
- file_relative_path
- )
- if file_hash != expected_file_hash:
- error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}"
- raise SOL004PackageException(
- error_msg.format(file_relative_path, file_hash, expected_file_hash)
- )
-
- def validate_package_hashes(self):
- """Validates the integrity of all files listed on the package manifest."""
- for file_data in self._manifest_data:
- if _MANIFEST_FILE_PATH_FIELD in file_data:
- file_relative_path = file_data[_MANIFEST_FILE_PATH_FIELD]
- self.validate_package_file_hash(file_relative_path)
-
- def get_descriptor_location(self):
- """Returns this package descriptor location as a relative path from the package root."""
- for tosca_meta in self._package_metadata:
- if _METADATA_DESCRIPTOR_FIELD in tosca_meta:
- return tosca_meta[_METADATA_DESCRIPTOR_FIELD]
-
- error_msg = "Error: no {} entry found on {}".format(
- _METADATA_DESCRIPTOR_FIELD, _METADATA_FILE_PATH
- )
- raise SOL004PackageException(error_msg)
+ self._manifest_metadata[
+ self._MANIFEST_VNFD_COMPATIBLE_SPECIFICATION_VERSIONS
+ ] = "2.7.1"
+ self._manifest_metadata[self._MANIFEST_VNFM_INFO] = "OSM"