feature: sol004 and sol007
[osm/common.git] / osm_common / sol_package.py
diff --git a/osm_common/sol_package.py b/osm_common/sol_package.py
new file mode 100644 (file)
index 0000000..e336cd5
--- /dev/null
@@ -0,0 +1,289 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2021 Whitestack, LLC
+# *************************************************************
+#
+# This file is part of OSM common repository.
+# All Rights Reserved to Whitestack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: fbravo@whitestack.com or agarcia@whitestack.com
+##
+
+import os
+import yaml
+import hashlib
+
+
+class SOLPackageException(Exception):
+    pass
+
+
+class SOLPackage:
+    _METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta"
+    _METADATA_DESCRIPTOR_FIELD = "Entry-Definitions"
+    _METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest"
+    _METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log"
+    _METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses"
+    _METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt"
+    _METADATA_DEFAULT_LICENSES_PATH = "Licenses"
+    _MANIFEST_FILE_PATH_FIELD = "Source"
+    _MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm"
+    _MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash"
+
+    _MANIFEST_ALL_FIELDS = []
+
+    def __init__(self, package_path=""):
+        self._package_path = package_path
+
+        self._package_metadata = self._parse_package_metadata()
+
+        try:
+            self._manifest_data = self._parse_manifest_data()
+        except Exception:
+            self._manifest_data = None
+
+        try:
+            self._manifest_metadata = self._parse_manifest_metadata()
+        except Exception:
+            self._manifest_metadata = None
+
+    def _parse_package_metadata(self):
+        try:
+            return self._parse_package_metadata_with_metadata_dir()
+        except FileNotFoundError:
+            return self._parse_package_metadata_without_metadata_dir()
+
+    def _parse_package_metadata_with_metadata_dir(self):
+        try:
+            return self._parse_file_in_blocks(self._METADATA_FILE_PATH)
+        except FileNotFoundError as e:
+            raise e
+        except (Exception, OSError) as e:
+            raise SOLPackageException(
+                "Error parsing {}: {}".format(self._METADATA_FILE_PATH, e)
+            )
+
+    def _parse_package_metadata_without_metadata_dir(self):
+        package_root_files = {f for f in os.listdir(self._package_path)}
+        package_root_yamls = [
+            f for f in package_root_files if f.endswith(".yml") or f.endswith(".yaml")
+        ]
+        if len(package_root_yamls) != 1:
+            error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}"
+            raise SOLPackageException(error_msg.format(len(package_root_yamls)))
+
+        base_manifest = [
+            {
+                SOLPackage._METADATA_DESCRIPTOR_FIELD: package_root_yamls[0],
+                SOLPackage._METADATA_MANIFEST_FIELD: "{}.mf".format(
+                    os.path.splitext(package_root_yamls[0])[0]
+                ),
+                SOLPackage._METADATA_CHANGELOG_FIELD: SOLPackage._METADATA_DEFAULT_CHANGELOG_PATH,
+                SOLPackage._METADATA_LICENSES_FIELD: SOLPackage._METADATA_DEFAULT_LICENSES_PATH,
+            }
+        ]
+
+        return base_manifest
+
+    def _parse_manifest_data(self):
+        manifest_path = None
+        for tosca_meta in self._package_metadata:
+            if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta:
+                manifest_path = tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD]
+                break
+        else:
+            error_msg = "Error parsing {}: no {} field on path".format(
+                self._METADATA_FILE_PATH, self._METADATA_MANIFEST_FIELD
+            )
+            raise SOLPackageException(error_msg)
+
+        try:
+            return self._parse_file_in_blocks(manifest_path)
+
+        except (Exception, OSError) as e:
+            raise SOLPackageException("Error parsing {}: {}".format(manifest_path, e))
+
+    def _parse_manifest_metadata(self):
+        try:
+            base_manifest = {}
+            manifest_file = os.open(
+                os.path.join(
+                    self._package_path, base_manifest[self._METADATA_MANIFEST_FIELD]
+                ),
+                "rw",
+            )
+            for line in manifest_file:
+                fields_in_line = line.split(":", maxsplit=1)
+                fields_in_line[0] = fields_in_line[0].strip()
+                fields_in_line[1] = fields_in_line[1].strip()
+                if fields_in_line[0] in self._MANIFEST_ALL_FIELDS:
+                    base_manifest[fields_in_line[0]] = fields_in_line[1]
+            return base_manifest
+        except (Exception, OSError) as e:
+            raise SOLPackageException(
+                "Error parsing {}: {}".format(
+                    base_manifest[SOLPackage._METADATA_MANIFEST_FIELD], e
+                )
+            )
+
+    def _get_package_file_full_path(self, file_relative_path):
+        return os.path.join(self._package_path, file_relative_path)
+
+    def _parse_file_in_blocks(self, file_relative_path):
+        file_path = self._get_package_file_full_path(file_relative_path)
+        with open(file_path) as f:
+            blocks = f.read().split("\n\n")
+        parsed_blocks = map(yaml.safe_load, blocks)
+        return [block for block in parsed_blocks if block is not None]
+
+    def _get_package_file_manifest_data(self, file_relative_path):
+        for file_data in self._manifest_data:
+            if (
+                file_data.get(SOLPackage._MANIFEST_FILE_PATH_FIELD, "")
+                == file_relative_path
+            ):
+                return file_data
+
+        error_msg = (
+            "Error parsing {} manifest data: file not found on manifest file".format(
+                file_relative_path
+            )
+        )
+        raise SOLPackageException(error_msg)
+
+    def get_package_file_hash_digest_from_manifest(self, file_relative_path):
+        """Returns the hash digest of a file inside this package as specified on the manifest file."""
+        file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
+        try:
+            return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_DIGEST_FIELD]
+        except Exception as e:
+            raise SOLPackageException(
+                "Error parsing {} hash digest: {}".format(file_relative_path, e)
+            )
+
+    def get_package_file_hash_algorithm_from_manifest(self, file_relative_path):
+        """Returns the hash algorithm of a file inside this package as specified on the manifest file."""
+        file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
+        try:
+            return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_ALGORITHM_FIELD]
+        except Exception as e:
+            raise SOLPackageException(
+                "Error parsing {} hash digest: {}".format(file_relative_path, e)
+            )
+
+    @staticmethod
+    def _get_hash_function_from_hash_algorithm(hash_algorithm):
+        function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512}
+        if hash_algorithm not in function_to_algorithm:
+            error_msg = (
+                "Error checking hash function: hash algorithm {} not supported".format(
+                    hash_algorithm
+                )
+            )
+            raise SOLPackageException(error_msg)
+        return function_to_algorithm[hash_algorithm]
+
+    def _calculate_file_hash(self, file_relative_path, hash_algorithm):
+        file_path = self._get_package_file_full_path(file_relative_path)
+        hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm)
+        try:
+            with open(file_path, "rb") as f:
+                return hash_function(f.read()).hexdigest()
+        except Exception as e:
+            raise SOLPackageException(
+                "Error hashing {}: {}".format(file_relative_path, e)
+            )
+
+    def validate_package_file_hash(self, file_relative_path):
+        """Validates the integrity of a file using the hash algorithm and digest on the package manifest."""
+        hash_algorithm = self.get_package_file_hash_algorithm_from_manifest(
+            file_relative_path
+        )
+        file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm)
+        expected_file_hash = self.get_package_file_hash_digest_from_manifest(
+            file_relative_path
+        )
+        if file_hash != expected_file_hash:
+            error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}"
+            raise SOLPackageException(
+                error_msg.format(file_relative_path, file_hash, expected_file_hash)
+            )
+
+    def validate_package_hashes(self):
+        """Validates the integrity of all files listed on the package manifest."""
+        for file_data in self._manifest_data:
+            if SOLPackage._MANIFEST_FILE_PATH_FIELD in file_data:
+                file_relative_path = file_data[SOLPackage._MANIFEST_FILE_PATH_FIELD]
+                self.validate_package_file_hash(file_relative_path)
+
+    def create_or_update_metadata_file(self):
+        """
+        Creates or updates the metadata file with the hashes calculated for each one of the package's files
+        """
+        if not self._manifest_metadata:
+            self.generate_manifest_data_from_descriptor()
+
+        self.write_manifest_data_into_file()
+
+    def generate_manifest_data_from_descriptor(self):
+        pass
+
+    def write_manifest_data_into_file(self):
+        with open(self.get_manifest_location(), "w") as metadata_file:
+            # Write manifest metadata
+            for metadata_entry in self._manifest_metadata:
+                metadata_file.write(
+                    "{}: {}\n".format(
+                        metadata_entry, self._manifest_metadata[metadata_entry]
+                    )
+                )
+
+            # Write package's files hashes
+            file_hashes = {}
+            for root, dirs, files in os.walk(self._package_path):
+                for a_file in files:
+                    file_path = os.path.join(root, a_file)
+                    file_relative_path = file_path[len(self._package_path) :]
+                    if file_relative_path.startswith("/"):
+                        file_relative_path = file_relative_path[1:]
+                    file_hashes[file_relative_path] = self._calculate_file_hash(
+                        file_relative_path, "SHA-512"
+                    )
+
+            for file, hash in file_hashes.items():
+                file_block = "Source: {}\nAlgorithm: SHA-512\nHash: {}\n\n".format(
+                    file, hash
+                )
+                metadata_file.write(file_block)
+
+    def get_descriptor_location(self):
+        """Returns this package descriptor location as a relative path from the package root."""
+        for tosca_meta in self._package_metadata:
+            if SOLPackage._METADATA_DESCRIPTOR_FIELD in tosca_meta:
+                return tosca_meta[SOLPackage._METADATA_DESCRIPTOR_FIELD]
+
+        error_msg = "Error: no {} entry found on {}".format(
+            SOLPackage._METADATA_DESCRIPTOR_FIELD, SOLPackage._METADATA_FILE_PATH
+        )
+        raise SOLPackageException(error_msg)
+
+    def get_manifest_location(self):
+        """Return the VNF/NS manifest location as a relative path from the package root."""
+        for tosca_meta in self._package_metadata:
+            if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta:
+                return tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD]
+
+        raise SOLPackageException("No manifest file defined for this package")