Adds SOL004 package validator with package examples and tests
[osm/common.git] / osm_common / sol004_package.py
diff --git a/osm_common/sol004_package.py b/osm_common/sol004_package.py
new file mode 100644 (file)
index 0000000..7d402f5
--- /dev/null
@@ -0,0 +1,205 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2020 Whitestack, LLC
+# *************************************************************
+#
+# This file is part of OSM common repository.
+# All Rights Reserved to Whitestack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: agarcia@whitestack.com
+##
+
+"""Python module for interacting with ETSI GS NFV-SOL004 compliant packages
+
+This module provides a SOL004Package class for validating and interacting with
+ETSI SOL004 packages. A valid SOL004 package may have its files arranged according
+to one of the following two structures:
+
+SOL004 with metadata directory    SOL004 without metadata directory
+
+native_charm_vnf/                 native_charm_vnf/
+├── TOSCA-Metadata                ├── native_charm_vnfd.mf
+│   └── TOSCA.meta                ├── native_charm_vnfd.yaml
+├── manifest.mf                   ├── ChangeLog.txt
+├── Definitions                   ├── Licenses
+│   └── native_charm_vnfd.yaml    │   └── license.lic
+├── Files                         ├── Files
+│   ├── icons                     │   └── icons
+│   │   └── osm.png               │       └── osm.png
+│   ├── Licenses                  └── Scripts
+│   │   └── license.lic               ├── cloud_init
+│   └── changelog.txt                 │   └── cloud-config.txt
+└── Scripts                           └── charms
+    ├── cloud_init                        └── simple
+    │   └── cloud-config.txt                  ├── config.yaml
+    └── charms                                ├── hooks
+        └── simple                            │   ├── install
+            ├── config.yaml                  ...
+            ├── hooks                         │
+            │   ├── install                   └── src
+           ...                                    └── charm.py
+            └── src
+                └── charm.py
+"""
+
+import yaml
+import os
+import hashlib
+
+
+# Location of the TOSCA metadata file, relative to the package root.
+_METADATA_FILE_PATH = 'TOSCA-Metadata/TOSCA.meta'
+# Keys of the metadata blocks. The descriptor and manifest keys are looked up by
+# SOL004Package; the change log and licenses keys are only filled in when the
+# metadata is derived for a package that has no TOSCA.meta file.
+_METADATA_DESCRIPTOR_FIELD = 'Entry-Definitions'
+_METADATA_MANIFEST_FIELD = 'ETSI-Entry-Manifest'
+_METADATA_CHANGELOG_FIELD = 'ETSI-Entry-Change-Log'
+_METADATA_LICENSES_FIELD = 'ETSI-Entry-Licenses'
+# Values used for the change log and licenses entries when the metadata is
+# derived for a package that has no TOSCA.meta file.
+_METADATA_DEFAULT_CHANGELOG_PATH = 'ChangeLog.txt'
+_METADATA_DEFAULT_LICENSES_PATH = 'Licenses'
+# Keys of the manifest blocks: the file path (relative to the package root), the
+# name of the hash algorithm and the expected hash digest of that file.
+_MANIFEST_FILE_PATH_FIELD = 'Source'
+_MANIFEST_FILE_HASH_ALGORITHM_FIELD = 'Algorithm'
+_MANIFEST_FILE_HASH_DIGEST_FIELD = 'Hash'
+
+
+class SOL004PackageException(Exception):
+    """Error raised when a SOL004 package cannot be parsed or fails a hash validation."""
+
+
+class SOL004Package:
+    """Reads and validates a SOL004 package stored as a directory tree.
+
+    The constructor loads the package metadata and then the manifest file the
+    metadata points to. Metadata comes from TOSCA-Metadata/TOSCA.meta when that
+    file exists; otherwise it is derived from the single descriptor YAML found in
+    the package root. Both are kept as lists of dicts, one dict per block of the
+    parsed file.
+    """
+
+    # Number of bytes read per iteration while hashing a file, so that big
+    # package files are digested piecewise instead of being loaded whole.
+    _HASH_READ_CHUNK_SIZE = 1024 * 1024
+
+    def __init__(self, package_path=''):
+        """Loads the metadata and the manifest of the package at package_path.
+
+        :param package_path: path of the package root directory.
+        :raises SOL004PackageException: if the metadata or the manifest cannot be parsed.
+        :raises OSError: errors raised while listing the package root (used when
+            there is no TOSCA.meta file) are not wrapped and reach the caller as is.
+        """
+        self._package_path = package_path
+        self._package_metadata = self._parse_package_metadata()
+        self._manifest_data = self._parse_manifest_data()
+
+    def _parse_package_metadata(self):
+        """Returns the package metadata, falling back to the layout without metadata directory."""
+        try:
+            return self._parse_package_metadata_with_metadata_dir()
+        except FileNotFoundError:
+            # No TOSCA.meta file: derive the metadata from the package root content.
+            return self._parse_package_metadata_without_metadata_dir()
+
+    def _parse_package_metadata_with_metadata_dir(self):
+        """Parses TOSCA-Metadata/TOSCA.meta into a list of dicts.
+
+        :raises FileNotFoundError: if the metadata file does not exist.
+        :raises SOL004PackageException: on any other error reading or parsing the file.
+        """
+        try:
+            return self._parse_file_in_blocks(_METADATA_FILE_PATH)
+        except FileNotFoundError:
+            # Propagated untouched so the caller can try the other package layout.
+            raise
+        except Exception as e:
+            raise SOL004PackageException('Error parsing {}: {}'.format(_METADATA_FILE_PATH, e)) from e
+
+    def _parse_package_metadata_without_metadata_dir(self):
+        """Builds the metadata of a package that has no TOSCA.meta file.
+
+        The only .yml/.yaml entry of the package root is taken as the descriptor,
+        the manifest is expected next to it with the same name and a .mf extension,
+        and the change log and licenses entries get their default values.
+
+        :raises SOL004PackageException: if the package root does not hold exactly one YAML.
+        """
+        root_entries = os.listdir(self._package_path)
+        descriptor_yamls = [entry for entry in root_entries if entry.endswith(('.yml', '.yaml'))]
+        if len(descriptor_yamls) != 1:
+            error_msg = 'Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}'
+            raise SOL004PackageException(error_msg.format(len(descriptor_yamls)))
+        descriptor = descriptor_yamls[0]
+        # TODO: Parse extra metadata from descriptor YAML?
+        return [{
+            _METADATA_DESCRIPTOR_FIELD: descriptor,
+            _METADATA_MANIFEST_FIELD: '{}.mf'.format(os.path.splitext(descriptor)[0]),
+            _METADATA_CHANGELOG_FIELD: _METADATA_DEFAULT_CHANGELOG_PATH,
+            _METADATA_LICENSES_FIELD: _METADATA_DEFAULT_LICENSES_PATH
+        }]
+
+    def _parse_manifest_data(self):
+        """Parses the manifest file referenced by the package metadata into a list of dicts.
+
+        :raises SOL004PackageException: if no metadata block names a manifest or the
+            manifest file cannot be read or parsed.
+        """
+        for tosca_meta in self._package_metadata:
+            if _METADATA_MANIFEST_FIELD in tosca_meta:
+                manifest_path = tosca_meta[_METADATA_MANIFEST_FIELD]
+                break
+        else:
+            error_msg = 'Error parsing {}: no {} field on path'.format(_METADATA_FILE_PATH, _METADATA_MANIFEST_FIELD)
+            raise SOL004PackageException(error_msg)
+
+        try:
+            return self._parse_file_in_blocks(manifest_path)
+        except Exception as e:
+            raise SOL004PackageException('Error parsing {}: {}'.format(manifest_path, e)) from e
+
+    def _get_package_file_full_path(self, file_relative_path):
+        """Returns file_relative_path joined to the package root path."""
+        # NOTE(review): the relative path may come from package content (manifest
+        # 'Source' entries) and is joined unchecked, so an absolute path or '..'
+        # components resolve outside the package root. Consider rejecting such
+        # paths if packages can come from untrusted sources.
+        return os.path.join(self._package_path, file_relative_path)
+
+    def _parse_file_in_blocks(self, file_relative_path):
+        """Parses a package file made of blocks separated by an empty line.
+
+        Each block is loaded with yaml.safe_load. Only blocks that load as a
+        mapping are returned: the rest of this class treats every block as a dict,
+        and an empty block or a bare scalar/list would otherwise break the key
+        lookups (or be matched by substring).
+
+        :param file_relative_path: path of the file, relative to the package root.
+        :return: list of dicts, one per mapping block, in file order.
+        """
+        file_path = self._get_package_file_full_path(file_relative_path)
+        with open(file_path) as f:
+            raw_blocks = f.read().split('\n\n')
+        parsed_blocks = (yaml.safe_load(raw_block) for raw_block in raw_blocks)
+        return [block for block in parsed_blocks if isinstance(block, dict)]
+
+    def _get_package_file_manifest_data(self, file_relative_path):
+        """Returns the first manifest block whose 'Source' entry equals file_relative_path.
+
+        :raises SOL004PackageException: if no manifest block refers to that file.
+        """
+        for file_data in self._manifest_data:
+            if file_data.get(_MANIFEST_FILE_PATH_FIELD, '') == file_relative_path:
+                return file_data
+
+        error_msg = 'Error parsing {} manifest data: file not found on manifest file'.format(file_relative_path)
+        raise SOL004PackageException(error_msg)
+
+    def get_package_file_hash_digest_from_manifest(self, file_relative_path):
+        """Returns the hash digest of a file inside this package as specified on the manifest file.
+
+        :raises SOL004PackageException: if the file is not on the manifest or its
+            manifest block has no hash digest entry.
+        """
+        file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
+        try:
+            return file_manifest_data[_MANIFEST_FILE_HASH_DIGEST_FIELD]
+        except KeyError as e:
+            raise SOL004PackageException('Error parsing {} hash digest: {}'.format(file_relative_path, e)) from e
+
+    def get_package_file_hash_algorithm_from_manifest(self, file_relative_path):
+        """Returns the hash algorithm of a file inside this package as specified on the manifest file.
+
+        :raises SOL004PackageException: if the file is not on the manifest or its
+            manifest block has no hash algorithm entry.
+        """
+        file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
+        try:
+            return file_manifest_data[_MANIFEST_FILE_HASH_ALGORITHM_FIELD]
+        except KeyError as e:
+            raise SOL004PackageException('Error parsing {} hash algorithm: {}'.format(file_relative_path, e)) from e
+
+    @staticmethod
+    def _get_hash_function_from_hash_algorithm(hash_algorithm):
+        """Returns the hashlib constructor for a manifest algorithm name ('SHA-256' or 'SHA-512').
+
+        :raises SOL004PackageException: if the algorithm name is not supported.
+        """
+        function_to_algorithm = {
+            'SHA-256': hashlib.sha256,
+            'SHA-512': hashlib.sha512
+        }
+        if hash_algorithm not in function_to_algorithm:
+            error_msg = 'Error checking hash function: hash algorithm {} not supported'.format(hash_algorithm)
+            raise SOL004PackageException(error_msg)
+        return function_to_algorithm[hash_algorithm]
+
+    def _calculate_file_hash(self, file_relative_path, hash_algorithm):
+        """Returns the hexadecimal digest of a package file.
+
+        :param file_relative_path: path of the file, relative to the package root.
+        :param hash_algorithm: algorithm name as written on the manifest.
+        :raises SOL004PackageException: if the algorithm is not supported or the
+            file cannot be read.
+        """
+        file_path = self._get_package_file_full_path(file_relative_path)
+        hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm)
+        chunk_size = self._HASH_READ_CHUNK_SIZE
+        try:
+            digest = hash_function()
+            with open(file_path, "rb") as f:
+                # Feed the file piecewise; read() returns b'' at end of file.
+                for chunk in iter(lambda: f.read(chunk_size), b''):
+                    digest.update(chunk)
+            return digest.hexdigest()
+        except OSError as e:
+            raise SOL004PackageException('Error hashing {}: {}'.format(file_relative_path, e)) from e
+
+    def validate_package_file_hash(self, file_relative_path):
+        """Validates the integrity of a file using the hash algorithm and digest on the package manifest.
+
+        :raises SOL004PackageException: if the calculated hash differs from the
+            manifest one, or the manifest data or the file cannot be obtained.
+        """
+        hash_algorithm = self.get_package_file_hash_algorithm_from_manifest(file_relative_path)
+        file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm)
+        expected_file_hash = self.get_package_file_hash_digest_from_manifest(file_relative_path)
+        if file_hash != expected_file_hash:
+            error_msg = 'Error validating {} hash: calculated hash {} is different than manifest hash {}'
+            raise SOL004PackageException(error_msg.format(file_relative_path, file_hash, expected_file_hash))
+
+    def validate_package_hashes(self):
+        """Validates the integrity of all files listed on the package manifest.
+
+        Manifest blocks without a 'Source' entry are skipped.
+
+        :raises SOL004PackageException: on the first file that fails validation.
+        """
+        for file_data in self._manifest_data:
+            if _MANIFEST_FILE_PATH_FIELD in file_data:
+                file_relative_path = file_data[_MANIFEST_FILE_PATH_FIELD]
+                self.validate_package_file_hash(file_relative_path)
+
+    def get_descriptor_location(self):
+        """Returns this package descriptor location as a relative path from the package root.
+
+        :raises SOL004PackageException: if no metadata block has a descriptor entry.
+        """
+        for tosca_meta in self._package_metadata:
+            if _METADATA_DESCRIPTOR_FIELD in tosca_meta:
+                return tosca_meta[_METADATA_DESCRIPTOR_FIELD]
+
+        error_msg = 'Error: no {} entry found on {}'.format(_METADATA_DESCRIPTOR_FIELD, _METADATA_FILE_PATH)
+        raise SOL004PackageException(error_msg)