| # -*- coding: utf-8 -*- |
| |
| # Copyright 2020 Whitestack, LLC |
| # ************************************************************* |
| # |
| # This file is part of OSM common repository. |
| # All Rights Reserved to Whitestack, LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact: agarcia@whitestack.com |
| ## |
| |
| """Python module for interacting with ETSI GS NFV-SOL004 compliant packages |
| |
| This module provides a SOL004Package class for validating and interacting with |
| ETSI SOL004 packages. A valid SOL004 package may have its files arranged according |
| to one of the following two structures: |
| |
| SOL004 with metadata directory SOL004 without metadata directory |
| |
| native_charm_vnf/ native_charm_vnf/ |
| ├── TOSCA-Metadata ├── native_charm_vnfd.mf |
| │ └── TOSCA.meta ├── native_charm_vnfd.yaml |
| ├── manifest.mf ├── ChangeLog.txt |
| ├── Definitions ├── Licenses |
| │ └── native_charm_vnfd.yaml │ └── license.lic |
| ├── Files ├── Files |
| │ ├── icons │ └── icons |
| │ │ └── osm.png │ └── osm.png |
| │ ├── Licenses └── Scripts |
| │ │ └── license.lic ├── cloud_init |
| │ └── changelog.txt │ └── cloud-config.txt |
| └── Scripts └── charms |
| ├── cloud_init └── simple |
| │ └── cloud-config.txt ├── config.yaml |
| └── charms ├── hooks |
| └── simple │ ├── install |
| ├── config.yaml ... |
| ├── hooks │ |
| │ ├── install └── src |
| ... └── charm.py |
| └── src |
| └── charm.py |
| """ |
| |
| import yaml |
| import os |
| import hashlib |
| |
| |
| _METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta" |
| _METADATA_DESCRIPTOR_FIELD = "Entry-Definitions" |
| _METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest" |
| _METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log" |
| _METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses" |
| _METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt" |
| _METADATA_DEFAULT_LICENSES_PATH = "Licenses" |
| _MANIFEST_FILE_PATH_FIELD = "Source" |
| _MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm" |
| _MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash" |
| |
| |
| class SOL004PackageException(Exception): |
| pass |
| |
| |
| class SOL004Package: |
| def __init__(self, package_path=""): |
| self._package_path = package_path |
| self._package_metadata = self._parse_package_metadata() |
| self._manifest_data = self._parse_manifest_data() |
| |
| def _parse_package_metadata(self): |
| try: |
| return self._parse_package_metadata_with_metadata_dir() |
| except FileNotFoundError: |
| return self._parse_package_metadata_without_metadata_dir() |
| |
| def _parse_package_metadata_with_metadata_dir(self): |
| try: |
| return self._parse_file_in_blocks(_METADATA_FILE_PATH) |
| except FileNotFoundError as e: |
| raise e |
| except (Exception, OSError) as e: |
| raise SOL004PackageException( |
| "Error parsing {}: {}".format(_METADATA_FILE_PATH, e) |
| ) |
| |
| def _parse_package_metadata_without_metadata_dir(self): |
| package_root_files = {f for f in os.listdir(self._package_path)} |
| package_root_yamls = [ |
| f for f in package_root_files if f.endswith(".yml") or f.endswith(".yaml") |
| ] |
| if len(package_root_yamls) != 1: |
| error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}" |
| raise SOL004PackageException(error_msg.format(len(package_root_yamls))) |
| # TODO: Parse extra metadata from descriptor YAML? |
| return [ |
| { |
| _METADATA_DESCRIPTOR_FIELD: package_root_yamls[0], |
| _METADATA_MANIFEST_FIELD: "{}.mf".format( |
| os.path.splitext(package_root_yamls[0])[0] |
| ), |
| _METADATA_CHANGELOG_FIELD: _METADATA_DEFAULT_CHANGELOG_PATH, |
| _METADATA_LICENSES_FIELD: _METADATA_DEFAULT_LICENSES_PATH, |
| } |
| ] |
| |
| def _parse_manifest_data(self): |
| manifest_path = None |
| for tosca_meta in self._package_metadata: |
| if _METADATA_MANIFEST_FIELD in tosca_meta: |
| manifest_path = tosca_meta[_METADATA_MANIFEST_FIELD] |
| break |
| else: |
| error_msg = "Error parsing {}: no {} field on path".format( |
| _METADATA_FILE_PATH, _METADATA_MANIFEST_FIELD |
| ) |
| raise SOL004PackageException(error_msg) |
| |
| try: |
| return self._parse_file_in_blocks(manifest_path) |
| except (Exception, OSError) as e: |
| raise SOL004PackageException( |
| "Error parsing {}: {}".format(manifest_path, e) |
| ) |
| |
| def _get_package_file_full_path(self, file_relative_path): |
| return os.path.join(self._package_path, file_relative_path) |
| |
| def _parse_file_in_blocks(self, file_relative_path): |
| file_path = self._get_package_file_full_path(file_relative_path) |
| with open(file_path) as f: |
| blocks = f.read().split("\n\n") |
| parsed_blocks = map(yaml.safe_load, blocks) |
| return [block for block in parsed_blocks if block is not None] |
| |
| def _get_package_file_manifest_data(self, file_relative_path): |
| for file_data in self._manifest_data: |
| if file_data.get(_MANIFEST_FILE_PATH_FIELD, "") == file_relative_path: |
| return file_data |
| |
| error_msg = ( |
| "Error parsing {} manifest data: file not found on manifest file".format( |
| file_relative_path |
| ) |
| ) |
| raise SOL004PackageException(error_msg) |
| |
| def get_package_file_hash_digest_from_manifest(self, file_relative_path): |
| """Returns the hash digest of a file inside this package as specified on the manifest file.""" |
| file_manifest_data = self._get_package_file_manifest_data(file_relative_path) |
| try: |
| return file_manifest_data[_MANIFEST_FILE_HASH_DIGEST_FIELD] |
| except Exception as e: |
| raise SOL004PackageException( |
| "Error parsing {} hash digest: {}".format(file_relative_path, e) |
| ) |
| |
| def get_package_file_hash_algorithm_from_manifest(self, file_relative_path): |
| """Returns the hash algorithm of a file inside this package as specified on the manifest file.""" |
| file_manifest_data = self._get_package_file_manifest_data(file_relative_path) |
| try: |
| return file_manifest_data[_MANIFEST_FILE_HASH_ALGORITHM_FIELD] |
| except Exception as e: |
| raise SOL004PackageException( |
| "Error parsing {} hash digest: {}".format(file_relative_path, e) |
| ) |
| |
| @staticmethod |
| def _get_hash_function_from_hash_algorithm(hash_algorithm): |
| function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512} |
| if hash_algorithm not in function_to_algorithm: |
| error_msg = ( |
| "Error checking hash function: hash algorithm {} not supported".format( |
| hash_algorithm |
| ) |
| ) |
| raise SOL004PackageException(error_msg) |
| return function_to_algorithm[hash_algorithm] |
| |
| def _calculate_file_hash(self, file_relative_path, hash_algorithm): |
| file_path = self._get_package_file_full_path(file_relative_path) |
| hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm) |
| try: |
| with open(file_path, "rb") as f: |
| return hash_function(f.read()).hexdigest() |
| except Exception as e: |
| raise SOL004PackageException( |
| "Error hashing {}: {}".format(file_relative_path, e) |
| ) |
| |
| def validate_package_file_hash(self, file_relative_path): |
| """Validates the integrity of a file using the hash algorithm and digest on the package manifest.""" |
| hash_algorithm = self.get_package_file_hash_algorithm_from_manifest( |
| file_relative_path |
| ) |
| file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm) |
| expected_file_hash = self.get_package_file_hash_digest_from_manifest( |
| file_relative_path |
| ) |
| if file_hash != expected_file_hash: |
| error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}" |
| raise SOL004PackageException( |
| error_msg.format(file_relative_path, file_hash, expected_file_hash) |
| ) |
| |
| def validate_package_hashes(self): |
| """Validates the integrity of all files listed on the package manifest.""" |
| for file_data in self._manifest_data: |
| if _MANIFEST_FILE_PATH_FIELD in file_data: |
| file_relative_path = file_data[_MANIFEST_FILE_PATH_FIELD] |
| self.validate_package_file_hash(file_relative_path) |
| |
| def get_descriptor_location(self): |
| """Returns this package descriptor location as a relative path from the package root.""" |
| for tosca_meta in self._package_metadata: |
| if _METADATA_DESCRIPTOR_FIELD in tosca_meta: |
| return tosca_meta[_METADATA_DESCRIPTOR_FIELD] |
| |
| error_msg = "Error: no {} entry found on {}".format( |
| _METADATA_DESCRIPTOR_FIELD, _METADATA_FILE_PATH |
| ) |
| raise SOL004PackageException(error_msg) |