| # -*- coding: utf-8 -*- |
| |
| # Copyright 2021 Whitestack, LLC |
| # ************************************************************* |
| # |
| # This file is part of OSM common repository. |
| # All Rights Reserved to Whitestack, LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact: fbravo@whitestack.com or agarcia@whitestack.com |
| ## |
| import hashlib |
| import os |
| |
| import yaml |
| |
| |
| class SOLPackageException(Exception): |
| pass |
| |
| |
| class SOLPackage: |
| _METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta" |
| _METADATA_DESCRIPTOR_FIELD = "Entry-Definitions" |
| _METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest" |
| _METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log" |
| _METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses" |
| _METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt" |
| _METADATA_DEFAULT_LICENSES_PATH = "Licenses" |
| _MANIFEST_FILE_PATH_FIELD = "Source" |
| _MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm" |
| _MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash" |
| |
| _MANIFEST_ALL_FIELDS = [] |
| |
| def __init__(self, package_path=""): |
| self._package_path = package_path |
| |
| self._package_metadata = self._parse_package_metadata() |
| |
| try: |
| self._manifest_data = self._parse_manifest_data() |
| except Exception: |
| self._manifest_data = None |
| |
| try: |
| self._manifest_metadata = self._parse_manifest_metadata() |
| except Exception: |
| self._manifest_metadata = None |
| |
| def _parse_package_metadata(self): |
| try: |
| return self._parse_package_metadata_with_metadata_dir() |
| except FileNotFoundError: |
| return self._parse_package_metadata_without_metadata_dir() |
| |
| def _parse_package_metadata_with_metadata_dir(self): |
| try: |
| return self._parse_file_in_blocks(self._METADATA_FILE_PATH) |
| except FileNotFoundError as e: |
| raise e |
| except (Exception, OSError) as e: |
| raise SOLPackageException( |
| "Error parsing {}: {}".format(self._METADATA_FILE_PATH, e) |
| ) |
| |
| def _parse_package_metadata_without_metadata_dir(self): |
| package_root_files = {f for f in os.listdir(self._package_path)} |
| package_root_yamls = [ |
| f for f in package_root_files if f.endswith(".yml") or f.endswith(".yaml") |
| ] |
| if len(package_root_yamls) != 1: |
| error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}" |
| raise SOLPackageException(error_msg.format(len(package_root_yamls))) |
| |
| base_manifest = [ |
| { |
| SOLPackage._METADATA_DESCRIPTOR_FIELD: package_root_yamls[0], |
| SOLPackage._METADATA_MANIFEST_FIELD: "{}.mf".format( |
| os.path.splitext(package_root_yamls[0])[0] |
| ), |
| SOLPackage._METADATA_CHANGELOG_FIELD: SOLPackage._METADATA_DEFAULT_CHANGELOG_PATH, |
| SOLPackage._METADATA_LICENSES_FIELD: SOLPackage._METADATA_DEFAULT_LICENSES_PATH, |
| } |
| ] |
| |
| return base_manifest |
| |
| def _parse_manifest_data(self): |
| manifest_path = None |
| for tosca_meta in self._package_metadata: |
| if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta: |
| manifest_path = tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD] |
| break |
| else: |
| error_msg = "Error parsing {}: no {} field on path".format( |
| self._METADATA_FILE_PATH, self._METADATA_MANIFEST_FIELD |
| ) |
| raise SOLPackageException(error_msg) |
| |
| try: |
| return self._parse_file_in_blocks(manifest_path) |
| |
| except (Exception, OSError) as e: |
| raise SOLPackageException("Error parsing {}: {}".format(manifest_path, e)) |
| |
| def _parse_manifest_metadata(self): |
| try: |
| base_manifest = {} |
| manifest_file = os.open( |
| os.path.join( |
| self._package_path, base_manifest[self._METADATA_MANIFEST_FIELD] |
| ), |
| "rw", |
| ) |
| for line in manifest_file: |
| fields_in_line = line.split(":", maxsplit=1) |
| fields_in_line[0] = fields_in_line[0].strip() |
| fields_in_line[1] = fields_in_line[1].strip() |
| if fields_in_line[0] in self._MANIFEST_ALL_FIELDS: |
| base_manifest[fields_in_line[0]] = fields_in_line[1] |
| return base_manifest |
| except (Exception, OSError) as e: |
| raise SOLPackageException( |
| "Error parsing {}: {}".format( |
| base_manifest[SOLPackage._METADATA_MANIFEST_FIELD], e |
| ) |
| ) |
| |
| def _get_package_file_full_path(self, file_relative_path): |
| return os.path.join(self._package_path, file_relative_path) |
| |
| def _parse_file_in_blocks(self, file_relative_path): |
| file_path = self._get_package_file_full_path(file_relative_path) |
| with open(file_path) as f: |
| blocks = f.read().split("\n\n") |
| parsed_blocks = map(yaml.safe_load, blocks) |
| return [block for block in parsed_blocks if block is not None] |
| |
| def _get_package_file_manifest_data(self, file_relative_path): |
| for file_data in self._manifest_data: |
| if ( |
| file_data.get(SOLPackage._MANIFEST_FILE_PATH_FIELD, "") |
| == file_relative_path |
| ): |
| return file_data |
| |
| error_msg = ( |
| "Error parsing {} manifest data: file not found on manifest file".format( |
| file_relative_path |
| ) |
| ) |
| raise SOLPackageException(error_msg) |
| |
| def get_package_file_hash_digest_from_manifest(self, file_relative_path): |
| """Returns the hash digest of a file inside this package as specified on the manifest file.""" |
| file_manifest_data = self._get_package_file_manifest_data(file_relative_path) |
| try: |
| return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_DIGEST_FIELD] |
| except Exception as e: |
| raise SOLPackageException( |
| "Error parsing {} hash digest: {}".format(file_relative_path, e) |
| ) |
| |
| def get_package_file_hash_algorithm_from_manifest(self, file_relative_path): |
| """Returns the hash algorithm of a file inside this package as specified on the manifest file.""" |
| file_manifest_data = self._get_package_file_manifest_data(file_relative_path) |
| try: |
| return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_ALGORITHM_FIELD] |
| except Exception as e: |
| raise SOLPackageException( |
| "Error parsing {} hash digest: {}".format(file_relative_path, e) |
| ) |
| |
| @staticmethod |
| def _get_hash_function_from_hash_algorithm(hash_algorithm): |
| function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512} |
| if hash_algorithm not in function_to_algorithm: |
| error_msg = ( |
| "Error checking hash function: hash algorithm {} not supported".format( |
| hash_algorithm |
| ) |
| ) |
| raise SOLPackageException(error_msg) |
| return function_to_algorithm[hash_algorithm] |
| |
| def _calculate_file_hash(self, file_relative_path, hash_algorithm): |
| file_path = self._get_package_file_full_path(file_relative_path) |
| hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm) |
| try: |
| with open(file_path, "rb") as f: |
| return hash_function(f.read()).hexdigest() |
| except Exception as e: |
| raise SOLPackageException( |
| "Error hashing {}: {}".format(file_relative_path, e) |
| ) |
| |
| def validate_package_file_hash(self, file_relative_path): |
| """Validates the integrity of a file using the hash algorithm and digest on the package manifest.""" |
| hash_algorithm = self.get_package_file_hash_algorithm_from_manifest( |
| file_relative_path |
| ) |
| file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm) |
| expected_file_hash = self.get_package_file_hash_digest_from_manifest( |
| file_relative_path |
| ) |
| if file_hash != expected_file_hash: |
| error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}" |
| raise SOLPackageException( |
| error_msg.format(file_relative_path, file_hash, expected_file_hash) |
| ) |
| |
| def validate_package_hashes(self): |
| """Validates the integrity of all files listed on the package manifest.""" |
| for file_data in self._manifest_data: |
| if SOLPackage._MANIFEST_FILE_PATH_FIELD in file_data: |
| file_relative_path = file_data[SOLPackage._MANIFEST_FILE_PATH_FIELD] |
| self.validate_package_file_hash(file_relative_path) |
| |
| def create_or_update_metadata_file(self): |
| """ |
| Creates or updates the metadata file with the hashes calculated for each one of the package's files |
| """ |
| if not self._manifest_metadata: |
| self.generate_manifest_data_from_descriptor() |
| |
| self.write_manifest_data_into_file() |
| |
| def generate_manifest_data_from_descriptor(self): |
| pass |
| |
| def write_manifest_data_into_file(self): |
| with open(self.get_manifest_location(), "w") as metadata_file: |
| # Write manifest metadata |
| for metadata_entry in self._manifest_metadata: |
| metadata_file.write( |
| "{}: {}\n".format( |
| metadata_entry, self._manifest_metadata[metadata_entry] |
| ) |
| ) |
| |
| # Write package's files hashes |
| file_hashes = {} |
| for root, dirs, files in os.walk(self._package_path): |
| for a_file in files: |
| file_path = os.path.join(root, a_file) |
| file_relative_path = file_path[len(self._package_path) :] |
| if file_relative_path.startswith("/"): |
| file_relative_path = file_relative_path[1:] |
| file_hashes[file_relative_path] = self._calculate_file_hash( |
| file_relative_path, "SHA-512" |
| ) |
| |
| for file, hash in file_hashes.items(): |
| file_block = "Source: {}\nAlgorithm: SHA-512\nHash: {}\n\n".format( |
| file, hash |
| ) |
| metadata_file.write(file_block) |
| |
| def get_descriptor_location(self): |
| """Returns this package descriptor location as a relative path from the package root.""" |
| for tosca_meta in self._package_metadata: |
| if SOLPackage._METADATA_DESCRIPTOR_FIELD in tosca_meta: |
| return tosca_meta[SOLPackage._METADATA_DESCRIPTOR_FIELD] |
| |
| error_msg = "Error: no {} entry found on {}".format( |
| SOLPackage._METADATA_DESCRIPTOR_FIELD, SOLPackage._METADATA_FILE_PATH |
| ) |
| raise SOLPackageException(error_msg) |
| |
| def get_manifest_location(self): |
| """Return the VNF/NS manifest location as a relative path from the package root.""" |
| for tosca_meta in self._package_metadata: |
| if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta: |
| return tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD] |
| |
| raise SOLPackageException("No manifest file defined for this package") |