# blob: e6b40b450d58d46bc63c25da2e96662a13cab03a [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2020 Whitestack, LLC
# *************************************************************
#
# This file is part of OSM common repository.
# All Rights Reserved to Whitestack, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: agarcia@whitestack.com
##
"""Python module for interacting with ETSI GS NFV-SOL004 compliant packages

This module provides a SOL004Package class for validating and interacting with
ETSI SOL004 packages. A valid SOL004 package may have its files arranged according
to one of the following two structures:

SOL004 with metadata directory      SOL004 without metadata directory

native_charm_vnf/                   native_charm_vnf/
├── TOSCA-Metadata                  ├── native_charm_vnfd.mf
│   └── TOSCA.meta                  ├── native_charm_vnfd.yaml
├── manifest.mf                     ├── ChangeLog.txt
├── Definitions                     ├── Licenses
│   └── native_charm_vnfd.yaml      │   └── license.lic
├── Files                           ├── Files
│   ├── icons                       │   └── icons
│   │   └── osm.png                 │       └── osm.png
│   ├── Licenses                    └── Scripts
│   │   └── license.lic                 ├── cloud_init
│   └── changelog.txt                   │   └── cloud-config.txt
└── Scripts                             └── charms
    ├── cloud_init                          └── simple
    │   └── cloud-config.txt                    ├── config.yaml
    └── charms                                  ├── hooks
        └── simple                              │   ├── install
            ├── config.yaml                     ...
            ├── hooks                           │
            │   ├── install                     └── src
            ...                                     └── charm.py
            └── src
                └── charm.py
"""
import yaml
import os
import hashlib
# Location of the TOSCA metadata file, relative to the package root.
_METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta"

# Keys looked up in (or synthesized into) the parsed package metadata blocks.
_METADATA_DESCRIPTOR_FIELD = "Entry-Definitions"
_METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest"
_METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log"
_METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses"

# Values used for the change-log and licenses entries when the package has no
# TOSCA-Metadata directory and the metadata is derived from the package root.
_METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt"
_METADATA_DEFAULT_LICENSES_PATH = "Licenses"

# Keys looked up in each parsed block of the manifest file.
_MANIFEST_FILE_PATH_FIELD = "Source"
_MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm"
_MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash"
class SOL004PackageException(Exception):
    """Error raised when a SOL004 package cannot be parsed or validated."""
class SOL004Package:
    """Accessor and validator for an ETSI GS NFV-SOL004 package on disk.

    On construction the package metadata is read from
    ``TOSCA-Metadata/TOSCA.meta`` when that file exists; otherwise it is
    derived from the single descriptor YAML found in the package root. The
    manifest referenced by that metadata is then parsed so the hash
    information of the listed files can be queried and verified.
    """

    # Number of bytes read per iteration while hashing a file, so that large
    # artifacts are never loaded into memory in one piece.
    _HASH_CHUNK_SIZE = 65536

    def __init__(self, package_path=""):
        """Load the metadata and the manifest of the package at *package_path*.

        Args:
            package_path: path of the package root directory. The empty
                string (default) refers to the current working directory.

        Raises:
            SOL004PackageException: if the metadata or the manifest cannot
                be located or parsed.
            OSError: if the package root cannot be listed while falling back
                to the layout without a metadata directory.
        """
        self._package_path = package_path
        self._package_metadata = self._parse_package_metadata()
        self._manifest_data = self._parse_manifest_data()

    def _parse_package_metadata(self):
        """Return the metadata blocks, preferring the TOSCA metadata file."""
        try:
            return self._parse_package_metadata_with_metadata_dir()
        except FileNotFoundError:
            # No TOSCA.meta: the package uses the layout without a metadata
            # directory, so the metadata is inferred from the package root.
            return self._parse_package_metadata_without_metadata_dir()

    def _parse_package_metadata_with_metadata_dir(self):
        """Parse ``TOSCA-Metadata/TOSCA.meta`` into a list of blocks.

        Raises:
            FileNotFoundError: if the metadata file does not exist (left
                unwrapped so the caller can fall back to the other layout).
            SOL004PackageException: on any other read or parse failure.
        """
        try:
            return self._parse_file_in_blocks(_METADATA_FILE_PATH)
        except FileNotFoundError:
            raise
        except Exception as e:
            raise SOL004PackageException(
                "Error parsing {}: {}".format(_METADATA_FILE_PATH, e)
            ) from e

    def _parse_package_metadata_without_metadata_dir(self):
        """Build the metadata from the only descriptor YAML in the package root.

        Raises:
            SOL004PackageException: if the package root does not contain
                exactly one ``.yml``/``.yaml`` file.
        """
        # os.listdir("") fails, whereas os.path.join("", name) is relative to
        # the current directory; list os.curdir so both agree on the default.
        package_root_yamls = [
            f
            for f in os.listdir(self._package_path or os.curdir)
            if f.endswith((".yml", ".yaml"))
        ]
        if len(package_root_yamls) != 1:
            error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}"
            raise SOL004PackageException(error_msg.format(len(package_root_yamls)))

        descriptor = package_root_yamls[0]
        # TODO: Parse extra metadata from descriptor YAML?
        return [
            {
                _METADATA_DESCRIPTOR_FIELD: descriptor,
                # The manifest shares the descriptor's base name.
                _METADATA_MANIFEST_FIELD: "{}.mf".format(
                    os.path.splitext(descriptor)[0]
                ),
                _METADATA_CHANGELOG_FIELD: _METADATA_DEFAULT_CHANGELOG_PATH,
                _METADATA_LICENSES_FIELD: _METADATA_DEFAULT_LICENSES_PATH,
            }
        ]

    def _parse_manifest_data(self):
        """Parse the manifest named by the first metadata block that declares one.

        Raises:
            SOL004PackageException: if no metadata block has a manifest
                entry, or the manifest cannot be read or parsed.
        """
        for tosca_meta in self._package_metadata:
            if _METADATA_MANIFEST_FIELD in tosca_meta:
                manifest_path = tosca_meta[_METADATA_MANIFEST_FIELD]
                break
        else:
            # Reached only when the loop finished without finding the field.
            error_msg = "Error parsing {}: no {} field on path".format(
                _METADATA_FILE_PATH, _METADATA_MANIFEST_FIELD
            )
            raise SOL004PackageException(error_msg)

        try:
            return self._parse_file_in_blocks(manifest_path)
        except Exception as e:
            raise SOL004PackageException(
                "Error parsing {}: {}".format(manifest_path, e)
            ) from e

    def _get_package_file_full_path(self, file_relative_path):
        """Join *file_relative_path* onto the package root path."""
        # NOTE(review): relative paths come from the metadata/manifest files;
        # an absolute path or ".." components would resolve outside the
        # package root. Confirm whether callers only load trusted packages.
        return os.path.join(self._package_path, file_relative_path)

    def _parse_file_in_blocks(self, file_relative_path):
        """Parse a package file as blank-line separated YAML blocks.

        Returns:
            A list with the result of ``yaml.safe_load`` for every block,
            skipping the blocks that load as ``None`` (e.g. empty ones).
        """
        file_path = self._get_package_file_full_path(file_relative_path)
        with open(file_path) as f:
            blocks = f.read().split("\n\n")
        parsed_blocks = map(yaml.safe_load, blocks)
        return [block for block in parsed_blocks if block is not None]

    def _get_package_file_manifest_data(self, file_relative_path):
        """Return the manifest block whose source entry is *file_relative_path*.

        Raises:
            SOL004PackageException: if no manifest block refers to the file.
        """
        for file_data in self._manifest_data:
            if file_data.get(_MANIFEST_FILE_PATH_FIELD, "") == file_relative_path:
                return file_data

        error_msg = (
            "Error parsing {} manifest data: file not found on manifest file".format(
                file_relative_path
            )
        )
        raise SOL004PackageException(error_msg)

    def get_package_file_hash_digest_from_manifest(self, file_relative_path):
        """Returns the hash digest of a file inside this package as specified on the manifest file."""
        file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
        try:
            return file_manifest_data[_MANIFEST_FILE_HASH_DIGEST_FIELD]
        except Exception as e:
            raise SOL004PackageException(
                "Error parsing {} hash digest: {}".format(file_relative_path, e)
            ) from e

    def get_package_file_hash_algorithm_from_manifest(self, file_relative_path):
        """Returns the hash algorithm of a file inside this package as specified on the manifest file."""
        file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
        try:
            return file_manifest_data[_MANIFEST_FILE_HASH_ALGORITHM_FIELD]
        except Exception as e:
            raise SOL004PackageException(
                "Error parsing {} hash algorithm: {}".format(file_relative_path, e)
            ) from e

    @staticmethod
    def _get_hash_function_from_hash_algorithm(hash_algorithm):
        """Map a manifest algorithm name to its ``hashlib`` constructor.

        Raises:
            SOL004PackageException: if *hash_algorithm* is neither
                ``"SHA-256"`` nor ``"SHA-512"``.
        """
        function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512}
        if hash_algorithm not in function_to_algorithm:
            error_msg = (
                "Error checking hash function: hash algorithm {} not supported".format(
                    hash_algorithm
                )
            )
            raise SOL004PackageException(error_msg)
        return function_to_algorithm[hash_algorithm]

    def _calculate_file_hash(self, file_relative_path, hash_algorithm):
        """Return the hexadecimal digest of a package file.

        Args:
            file_relative_path: path of the file relative to the package root.
            hash_algorithm: algorithm name as written on the manifest.

        Raises:
            SOL004PackageException: if the algorithm is not supported or the
                file cannot be read.
        """
        file_path = self._get_package_file_full_path(file_relative_path)
        hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm)
        try:
            hasher = hash_function()
            with open(file_path, "rb") as f:
                # Feed the file in fixed-size chunks instead of reading it whole.
                for chunk in iter(lambda: f.read(self._HASH_CHUNK_SIZE), b""):
                    hasher.update(chunk)
            return hasher.hexdigest()
        except Exception as e:
            raise SOL004PackageException(
                "Error hashing {}: {}".format(file_relative_path, e)
            ) from e

    def validate_package_file_hash(self, file_relative_path):
        """Validates the integrity of a file using the hash algorithm and digest on the package manifest."""
        hash_algorithm = self.get_package_file_hash_algorithm_from_manifest(
            file_relative_path
        )
        file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm)
        expected_file_hash = self.get_package_file_hash_digest_from_manifest(
            file_relative_path
        )
        if file_hash != expected_file_hash:
            error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}"
            raise SOL004PackageException(
                error_msg.format(file_relative_path, file_hash, expected_file_hash)
            )

    def validate_package_hashes(self):
        """Validates the integrity of all files listed on the package manifest."""
        for file_data in self._manifest_data:
            # Manifest blocks without a source entry do not describe a file.
            if _MANIFEST_FILE_PATH_FIELD in file_data:
                file_relative_path = file_data[_MANIFEST_FILE_PATH_FIELD]
                self.validate_package_file_hash(file_relative_path)

    def get_descriptor_location(self):
        """Returns this package descriptor location as a relative path from the package root."""
        for tosca_meta in self._package_metadata:
            if _METADATA_DESCRIPTOR_FIELD in tosca_meta:
                return tosca_meta[_METADATA_DESCRIPTOR_FIELD]

        error_msg = "Error: no {} entry found on {}".format(
            _METADATA_DESCRIPTOR_FIELD, _METADATA_FILE_PATH
        )
        raise SOL004PackageException(error_msg)