X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_common%2Fsol_package.py;fp=osm_common%2Fsol_package.py;h=e336cd5b7e6b66a30d4a9b38a0c5c2efbf04ef8c;hb=fc5a6f81fe314d76d5af7fda83c2890e12faf615;hp=0000000000000000000000000000000000000000;hpb=65d6c32f5c0235fdb15d184c3f7ae9166780c641;p=osm%2Fcommon.git diff --git a/osm_common/sol_package.py b/osm_common/sol_package.py new file mode 100644 index 0000000..e336cd5 --- /dev/null +++ b/osm_common/sol_package.py @@ -0,0 +1,289 @@ +# -*- coding: utf-8 -*- + +# Copyright 2021 Whitestack, LLC +# ************************************************************* +# +# This file is part of OSM common repository. +# All Rights Reserved to Whitestack, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact: fbravo@whitestack.com or agarcia@whitestack.com +## + +import os +import yaml +import hashlib + + +class SOLPackageException(Exception): + pass + + +class SOLPackage: + _METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta" + _METADATA_DESCRIPTOR_FIELD = "Entry-Definitions" + _METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest" + _METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log" + _METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses" + _METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt" + _METADATA_DEFAULT_LICENSES_PATH = "Licenses" + _MANIFEST_FILE_PATH_FIELD = "Source" + _MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm" + _MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash" + + _MANIFEST_ALL_FIELDS = [] + + def __init__(self, package_path=""): + self._package_path = package_path + + self._package_metadata = self._parse_package_metadata() + + try: + self._manifest_data = self._parse_manifest_data() + except Exception: + self._manifest_data = None + + try: + self._manifest_metadata = self._parse_manifest_metadata() + except Exception: + self._manifest_metadata = None + + def _parse_package_metadata(self): + try: + return self._parse_package_metadata_with_metadata_dir() + except FileNotFoundError: + return self._parse_package_metadata_without_metadata_dir() + + def _parse_package_metadata_with_metadata_dir(self): + try: + return self._parse_file_in_blocks(self._METADATA_FILE_PATH) + except FileNotFoundError as e: + raise e + except (Exception, OSError) as e: + raise SOLPackageException( + "Error parsing {}: {}".format(self._METADATA_FILE_PATH, e) + ) + + def _parse_package_metadata_without_metadata_dir(self): + package_root_files = {f for f in os.listdir(self._package_path)} + package_root_yamls = [ + f for f in package_root_files if f.endswith(".yml") or f.endswith(".yaml") + ] + if len(package_root_yamls) != 1: + error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}" + raise SOLPackageException(error_msg.format(len(package_root_yamls))) + + base_manifest = [ + { + SOLPackage._METADATA_DESCRIPTOR_FIELD: package_root_yamls[0], + SOLPackage._METADATA_MANIFEST_FIELD: "{}.mf".format( + os.path.splitext(package_root_yamls[0])[0] + ), + SOLPackage._METADATA_CHANGELOG_FIELD: SOLPackage._METADATA_DEFAULT_CHANGELOG_PATH, + SOLPackage._METADATA_LICENSES_FIELD: SOLPackage._METADATA_DEFAULT_LICENSES_PATH, + } + ] + + return base_manifest + + def _parse_manifest_data(self): + manifest_path = None + for tosca_meta in self._package_metadata: + if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta: + manifest_path = tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD] + break + else: + error_msg = "Error parsing {}: no {} field on path".format( + self._METADATA_FILE_PATH, self._METADATA_MANIFEST_FIELD + ) + raise SOLPackageException(error_msg) + + try: + return self._parse_file_in_blocks(manifest_path) + + except (Exception, OSError) as e: + raise SOLPackageException("Error parsing {}: {}".format(manifest_path, e)) + + def _parse_manifest_metadata(self): + try: + base_manifest = {} + manifest_file = os.open( + os.path.join( + self._package_path, base_manifest[self._METADATA_MANIFEST_FIELD] + ), + "rw", + ) + for line in manifest_file: + fields_in_line = line.split(":", maxsplit=1) + fields_in_line[0] = fields_in_line[0].strip() + fields_in_line[1] = fields_in_line[1].strip() + if fields_in_line[0] in self._MANIFEST_ALL_FIELDS: + base_manifest[fields_in_line[0]] = fields_in_line[1] + return base_manifest + except (Exception, OSError) as e: + raise SOLPackageException( + "Error parsing {}: {}".format( + base_manifest[SOLPackage._METADATA_MANIFEST_FIELD], e + ) + ) + + def _get_package_file_full_path(self, file_relative_path): + return os.path.join(self._package_path, file_relative_path) + + def _parse_file_in_blocks(self, file_relative_path): + file_path = self._get_package_file_full_path(file_relative_path) + with open(file_path) as f: + blocks = f.read().split("\n\n") + parsed_blocks = map(yaml.safe_load, blocks) + return [block for block in parsed_blocks if block is not None] + + def _get_package_file_manifest_data(self, file_relative_path): + for file_data in self._manifest_data: + if ( + file_data.get(SOLPackage._MANIFEST_FILE_PATH_FIELD, "") + == file_relative_path + ): + return file_data + + error_msg = ( + "Error parsing {} manifest data: file not found on manifest file".format( + file_relative_path + ) + ) + raise SOLPackageException(error_msg) + + def get_package_file_hash_digest_from_manifest(self, file_relative_path): + """Returns the hash digest of a file inside this package as specified on the manifest file.""" + file_manifest_data = self._get_package_file_manifest_data(file_relative_path) + try: + return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_DIGEST_FIELD] + except Exception as e: + raise SOLPackageException( + "Error parsing {} hash digest: {}".format(file_relative_path, e) + ) + + def get_package_file_hash_algorithm_from_manifest(self, file_relative_path): + """Returns the hash algorithm of a file inside this package as specified on the manifest file.""" + file_manifest_data = self._get_package_file_manifest_data(file_relative_path) + try: + return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_ALGORITHM_FIELD] + except Exception as e: + raise SOLPackageException( + "Error parsing {} hash digest: {}".format(file_relative_path, e) + ) + + @staticmethod + def _get_hash_function_from_hash_algorithm(hash_algorithm): + function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512} + if hash_algorithm not in function_to_algorithm: + error_msg = ( + "Error checking hash function: hash algorithm {} not supported".format( + hash_algorithm + ) + ) + raise SOLPackageException(error_msg) + return function_to_algorithm[hash_algorithm] + + def _calculate_file_hash(self, file_relative_path, hash_algorithm): + file_path = self._get_package_file_full_path(file_relative_path) + hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm) + try: + with open(file_path, "rb") as f: + return hash_function(f.read()).hexdigest() + except Exception as e: + raise SOLPackageException( + "Error hashing {}: {}".format(file_relative_path, e) + ) + + def validate_package_file_hash(self, file_relative_path): + """Validates the integrity of a file using the hash algorithm and digest on the package manifest.""" + hash_algorithm = self.get_package_file_hash_algorithm_from_manifest( + file_relative_path + ) + file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm) + expected_file_hash = self.get_package_file_hash_digest_from_manifest( + file_relative_path + ) + if file_hash != expected_file_hash: + error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}" + raise SOLPackageException( + error_msg.format(file_relative_path, file_hash, expected_file_hash) + ) + + def validate_package_hashes(self): + """Validates the integrity of all files listed on the package manifest.""" + for file_data in self._manifest_data: + if SOLPackage._MANIFEST_FILE_PATH_FIELD in file_data: + file_relative_path = file_data[SOLPackage._MANIFEST_FILE_PATH_FIELD] + self.validate_package_file_hash(file_relative_path) + + def create_or_update_metadata_file(self): + """ + Creates or updates the metadata file with the hashes calculated for each one of the package's files + """ + if not self._manifest_metadata: + self.generate_manifest_data_from_descriptor() + + self.write_manifest_data_into_file() + + def generate_manifest_data_from_descriptor(self): + pass + + def write_manifest_data_into_file(self): + with open(self.get_manifest_location(), "w") as metadata_file: + # Write manifest metadata + for metadata_entry in self._manifest_metadata: + metadata_file.write( + "{}: {}\n".format( + metadata_entry, self._manifest_metadata[metadata_entry] + ) + ) + + # Write package's files hashes + file_hashes = {} + for root, dirs, files in os.walk(self._package_path): + for a_file in files: + file_path = os.path.join(root, a_file) + file_relative_path = file_path[len(self._package_path) :] + if file_relative_path.startswith("/"): + file_relative_path = file_relative_path[1:] + file_hashes[file_relative_path] = self._calculate_file_hash( + file_relative_path, "SHA-512" + ) + + for file, hash in file_hashes.items(): + file_block = "Source: {}\nAlgorithm: SHA-512\nHash: {}\n\n".format( + file, hash + ) + metadata_file.write(file_block) + + def get_descriptor_location(self): + """Returns this package descriptor location as a relative path from the package root.""" + for tosca_meta in self._package_metadata: + if SOLPackage._METADATA_DESCRIPTOR_FIELD in tosca_meta: + return tosca_meta[SOLPackage._METADATA_DESCRIPTOR_FIELD] + + error_msg = "Error: no {} entry found on {}".format( + SOLPackage._METADATA_DESCRIPTOR_FIELD, SOLPackage._METADATA_FILE_PATH + ) + raise SOLPackageException(error_msg) + + def get_manifest_location(self): + """Return the VNF/NS manifest location as a relative path from the package root.""" + for tosca_meta in self._package_metadata: + if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta: + return tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD] + + raise SOLPackageException("No manifest file defined for this package")