X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2Fcommon.git;a=blobdiff_plain;f=osm_common%2Fsol004_package.py;fp=osm_common%2Fsol004_package.py;h=7d402f5ab49d9804720a60f95051dbe4c2f1fd16;hp=0000000000000000000000000000000000000000;hb=0839503b0c0fb61f256f387bfdf07419e673bbe3;hpb=277c931191194490c5595307c22b721d94294254 diff --git a/osm_common/sol004_package.py b/osm_common/sol004_package.py new file mode 100644 index 0000000..7d402f5 --- /dev/null +++ b/osm_common/sol004_package.py @@ -0,0 +1,205 @@ +# -*- coding: utf-8 -*- + +# Copyright 2020 Whitestack, LLC +# ************************************************************* +# +# This file is part of OSM common repository. +# All Rights Reserved to Whitestack, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact: agarcia@whitestack.com +## + +"""Python module for interacting with ETSI GS NFV-SOL004 compliant packages + +This module provides a SOL004Package class for validating and interacting with +ETSI SOL004 packages. A valid SOL004 package may have its files arranged according +to one of the following two structures: + +SOL004 with metadata directory SOL004 without metadata directory + +native_charm_vnf/ native_charm_vnf/ +├── TOSCA-Metadata ├── native_charm_vnfd.mf +│ └── TOSCA.meta ├── native_charm_vnfd.yaml +├── manifest.mf ├── ChangeLog.txt +├── Definitions ├── Licenses +│ └── native_charm_vnfd.yaml │ └── license.lic +├── Files ├── Files +│ ├── icons │ └── icons +│ │ └── osm.png │ └── osm.png +│ ├── Licenses └── Scripts +│ │ └── license.lic ├── cloud_init +│ └── changelog.txt │ └── cloud-config.txt +└── Scripts └── charms + ├── cloud_init └── simple + │ └── cloud-config.txt ├── config.yaml + └── charms ├── hooks + └── simple │ ├── install + ├── config.yaml ... + ├── hooks │ + │ ├── install └── src + ... └── charm.py + └── src + └── charm.py +""" + +import yaml +import os +import hashlib + + +_METADATA_FILE_PATH = 'TOSCA-Metadata/TOSCA.meta' +_METADATA_DESCRIPTOR_FIELD = 'Entry-Definitions' +_METADATA_MANIFEST_FIELD = 'ETSI-Entry-Manifest' +_METADATA_CHANGELOG_FIELD = 'ETSI-Entry-Change-Log' +_METADATA_LICENSES_FIELD = 'ETSI-Entry-Licenses' +_METADATA_DEFAULT_CHANGELOG_PATH = 'ChangeLog.txt' +_METADATA_DEFAULT_LICENSES_PATH = 'Licenses' +_MANIFEST_FILE_PATH_FIELD = 'Source' +_MANIFEST_FILE_HASH_ALGORITHM_FIELD = 'Algorithm' +_MANIFEST_FILE_HASH_DIGEST_FIELD = 'Hash' + + +class SOL004PackageException(Exception): + pass + + +class SOL004Package: + def __init__(self, package_path=''): + self._package_path = package_path + self._package_metadata = self._parse_package_metadata() + self._manifest_data = self._parse_manifest_data() + + def _parse_package_metadata(self): + try: + return self._parse_package_metadata_with_metadata_dir() + except FileNotFoundError: + return self._parse_package_metadata_without_metadata_dir() + + def _parse_package_metadata_with_metadata_dir(self): + try: + return self._parse_file_in_blocks(_METADATA_FILE_PATH) + except FileNotFoundError as e: + raise e + except (Exception, OSError) as e: + raise SOL004PackageException('Error parsing {}: {}'.format(_METADATA_FILE_PATH, e)) + + def _parse_package_metadata_without_metadata_dir(self): + package_root_files = {f for f in os.listdir(self._package_path)} + package_root_yamls = [f for f in package_root_files if f.endswith('.yml') or f.endswith('.yaml')] + if len(package_root_yamls) != 1: + error_msg = 'Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}' + raise SOL004PackageException(error_msg.format(len(package_root_yamls))) + # TODO: Parse extra metadata from descriptor YAML? + return [{ + _METADATA_DESCRIPTOR_FIELD: package_root_yamls[0], + _METADATA_MANIFEST_FIELD: '{}.mf'.format(os.path.splitext(package_root_yamls[0])[0]), + _METADATA_CHANGELOG_FIELD: _METADATA_DEFAULT_CHANGELOG_PATH, + _METADATA_LICENSES_FIELD: _METADATA_DEFAULT_LICENSES_PATH + }] + + def _parse_manifest_data(self): + manifest_path = None + for tosca_meta in self._package_metadata: + if _METADATA_MANIFEST_FIELD in tosca_meta: + manifest_path = tosca_meta[_METADATA_MANIFEST_FIELD] + break + else: + error_msg = 'Error parsing {}: no {} field on path'.format(_METADATA_FILE_PATH, _METADATA_MANIFEST_FIELD) + raise SOL004PackageException(error_msg) + + try: + return self._parse_file_in_blocks(manifest_path) + except (Exception, OSError) as e: + raise SOL004PackageException('Error parsing {}: {}'.format(manifest_path, e)) + + def _get_package_file_full_path(self, file_relative_path): + return os.path.join(self._package_path, file_relative_path) + + def _parse_file_in_blocks(self, file_relative_path): + file_path = self._get_package_file_full_path(file_relative_path) + with open(file_path) as f: + blocks = f.read().split('\n\n') + parsed_blocks = map(yaml.safe_load, blocks) + return [block for block in parsed_blocks if block is not None] + + def _get_package_file_manifest_data(self, file_relative_path): + for file_data in self._manifest_data: + if file_data.get(_MANIFEST_FILE_PATH_FIELD, '') == file_relative_path: + return file_data + + error_msg = 'Error parsing {} manifest data: file not found on manifest file'.format(file_relative_path) + raise SOL004PackageException(error_msg) + + def get_package_file_hash_digest_from_manifest(self, file_relative_path): + """Returns the hash digest of a file inside this package as specified on the manifest file.""" + file_manifest_data = self._get_package_file_manifest_data(file_relative_path) + try: + return file_manifest_data[_MANIFEST_FILE_HASH_DIGEST_FIELD] + except Exception as e: + raise SOL004PackageException('Error parsing {} hash digest: {}'.format(file_relative_path, e)) + + def get_package_file_hash_algorithm_from_manifest(self, file_relative_path): + """Returns the hash algorithm of a file inside this package as specified on the manifest file.""" + file_manifest_data = self._get_package_file_manifest_data(file_relative_path) + try: + return file_manifest_data[_MANIFEST_FILE_HASH_ALGORITHM_FIELD] + except Exception as e: + raise SOL004PackageException('Error parsing {} hash digest: {}'.format(file_relative_path, e)) + + @staticmethod + def _get_hash_function_from_hash_algorithm(hash_algorithm): + function_to_algorithm = { + 'SHA-256': hashlib.sha256, + 'SHA-512': hashlib.sha512 + } + if hash_algorithm not in function_to_algorithm: + error_msg = 'Error checking hash function: hash algorithm {} not supported'.format(hash_algorithm) + raise SOL004PackageException(error_msg) + return function_to_algorithm[hash_algorithm] + + def _calculate_file_hash(self, file_relative_path, hash_algorithm): + file_path = self._get_package_file_full_path(file_relative_path) + hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm) + try: + with open(file_path, "rb") as f: + return hash_function(f.read()).hexdigest() + except Exception as e: + raise SOL004PackageException('Error hashing {}: {}'.format(file_relative_path, e)) + + def validate_package_file_hash(self, file_relative_path): + """Validates the integrity of a file using the hash algorithm and digest on the package manifest.""" + hash_algorithm = self.get_package_file_hash_algorithm_from_manifest(file_relative_path) + file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm) + expected_file_hash = self.get_package_file_hash_digest_from_manifest(file_relative_path) + if file_hash != expected_file_hash: + error_msg = 'Error validating {} hash: calculated hash {} is different than manifest hash {}' + raise SOL004PackageException(error_msg.format(file_relative_path, file_hash, expected_file_hash)) + + def validate_package_hashes(self): + """Validates the integrity of all files listed on the package manifest.""" + for file_data in self._manifest_data: + if _MANIFEST_FILE_PATH_FIELD in file_data: + file_relative_path = file_data[_MANIFEST_FILE_PATH_FIELD] + self.validate_package_file_hash(file_relative_path) + + def get_descriptor_location(self): + """Returns this package descriptor location as a relative path from the package root.""" + for tosca_meta in self._package_metadata: + if _METADATA_DESCRIPTOR_FIELD in tosca_meta: + return tosca_meta[_METADATA_DESCRIPTOR_FIELD] + + error_msg = 'Error: no {} entry found on {}'.format(_METADATA_DESCRIPTOR_FIELD, _METADATA_FILE_PATH) + raise SOL004PackageException(error_msg)