# limitations under the License.
import os
+import tarfile
+import zipfile
import logging
# import tarfile
f = "/".join(storage)
return os.path.getsize(self.path + f)
- def file_extract(self, tar_object, path):
+ def file_extract(self, compressed_object, path):
"""
extract a tar file
- :param tar_object: object of type tar
+ :param compressed_object: object of type tar or zip
:param path: can be a str or a str list, or a tar object where to extract the tar_object
:return: None
"""
f = self.path + path
else:
f = self.path + "/".join(path)
- tar_object.extractall(path=f)
+
+ if type(compressed_object) is tarfile.TarFile:
+ compressed_object.extractall(path=f)
+ elif (
+ type(compressed_object) is zipfile.ZipFile
+ ): # Just a check to know if this works with both tar and zip
+ compressed_object.extractall(path=f)
def file_open(self, storage, mode):
"""
import logging
import os
import datetime
+import tarfile
+import zipfile
from gridfs import GridFSBucket, errors
from osm_common.fsbase import FsBase, FsException
"Multiple files found", http_code=HTTPStatus.INTERNAL_SERVER_ERROR
)
+ print(requested_file.metadata)
+
# if no special mode is required just check it does exists
if not mode:
return True
return requested_file.length
- def file_extract(self, tar_object, path):
+ def file_extract(self, compressed_object, path):
"""
extract a tar file
- :param tar_object: object of type tar
+ :param compressed_object: object of type tar or zip
:param path: can be a str or a str list, or a tar object where to extract the tar_object
:return: None
"""
f = path if isinstance(path, str) else "/".join(path)
- for member in tar_object.getmembers():
- if member.isfile():
- stream = tar_object.extractfile(member)
- elif member.issym():
- stream = BytesIO(member.linkname.encode("utf-8"))
- else:
- stream = BytesIO()
+ if type(compressed_object) is tarfile.TarFile:
+ for member in compressed_object.getmembers():
+ if member.isfile():
+ stream = compressed_object.extractfile(member)
+ elif member.issym():
+ stream = BytesIO(member.linkname.encode("utf-8"))
+ else:
+ stream = BytesIO()
- if member.isfile():
- file_type = "file"
- elif member.issym():
- file_type = "sym"
- else:
- file_type = "dir"
+ if member.isfile():
+ file_type = "file"
+ elif member.issym():
+ file_type = "sym"
+ else:
+ file_type = "dir"
- metadata = {"type": file_type, "permissions": member.mode}
+ metadata = {"type": file_type, "permissions": member.mode}
- self.fs.upload_from_stream(f + "/" + member.name, stream, metadata=metadata)
+ self.fs.upload_from_stream(
+ f + "/" + member.name, stream, metadata=metadata
+ )
- stream.close()
+ stream.close()
+ elif type(compressed_object) is zipfile.ZipFile:
+ for member in compressed_object.infolist():
+ if member.is_dir():
+ stream = BytesIO()
+ else:
+ stream = compressed_object.read(member)
+
+ if member.is_dir():
+ file_type = "dir"
+ else:
+ file_type = "file"
+
+ metadata = {"type": file_type}
+
+ print("Now uploading...")
+ print(f + "/" + member.filename)
+ self.fs.upload_from_stream(
+ f + "/" + member.filename, stream, metadata=metadata
+ )
+
+ if member.is_dir():
+ stream.close()
def file_open(self, storage, mode):
"""
http_code=HTTPStatus.NOT_FOUND,
)
+ if f.endswith("/"):
+ f = f[:-1]
+
files_cursor = self.fs.find(
{"filename": {"$regex": "^{}/([^/])*".format(f)}}
)
# -*- coding: utf-8 -*-
-# Copyright 2020 Whitestack, LLC
+# Copyright 2021 Whitestack, LLC
# *************************************************************
#
# This file is part of OSM common repository.
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
-# contact: agarcia@whitestack.com
+# contact: agarcia@whitestack.com or fbravo@whitestack.com
##
"""Python module for interacting with ETSI GS NFV-SOL004 compliant packages
"""
import yaml
+import datetime
import os
-import hashlib
-
-
-_METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta"
-_METADATA_DESCRIPTOR_FIELD = "Entry-Definitions"
-_METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest"
-_METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log"
-_METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses"
-_METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt"
-_METADATA_DEFAULT_LICENSES_PATH = "Licenses"
-_MANIFEST_FILE_PATH_FIELD = "Source"
-_MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm"
-_MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash"
+from .sol_package import SOLPackage
class SOL004PackageException(Exception):
pass
-class SOL004Package:
- def __init__(self, package_path=""):
- self._package_path = package_path
- self._package_metadata = self._parse_package_metadata()
- self._manifest_data = self._parse_manifest_data()
-
- def _parse_package_metadata(self):
- try:
- return self._parse_package_metadata_with_metadata_dir()
- except FileNotFoundError:
- return self._parse_package_metadata_without_metadata_dir()
-
- def _parse_package_metadata_with_metadata_dir(self):
- try:
- return self._parse_file_in_blocks(_METADATA_FILE_PATH)
- except FileNotFoundError as e:
- raise e
- except (Exception, OSError) as e:
- raise SOL004PackageException(
- "Error parsing {}: {}".format(_METADATA_FILE_PATH, e)
- )
-
- def _parse_package_metadata_without_metadata_dir(self):
- package_root_files = {f for f in os.listdir(self._package_path)}
- package_root_yamls = [
- f for f in package_root_files if f.endswith(".yml") or f.endswith(".yaml")
- ]
- if len(package_root_yamls) != 1:
- error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}"
- raise SOL004PackageException(error_msg.format(len(package_root_yamls)))
- # TODO: Parse extra metadata from descriptor YAML?
- return [
- {
- _METADATA_DESCRIPTOR_FIELD: package_root_yamls[0],
- _METADATA_MANIFEST_FIELD: "{}.mf".format(
- os.path.splitext(package_root_yamls[0])[0]
- ),
- _METADATA_CHANGELOG_FIELD: _METADATA_DEFAULT_CHANGELOG_PATH,
- _METADATA_LICENSES_FIELD: _METADATA_DEFAULT_LICENSES_PATH,
- }
- ]
-
- def _parse_manifest_data(self):
- manifest_path = None
- for tosca_meta in self._package_metadata:
- if _METADATA_MANIFEST_FIELD in tosca_meta:
- manifest_path = tosca_meta[_METADATA_MANIFEST_FIELD]
- break
- else:
- error_msg = "Error parsing {}: no {} field on path".format(
- _METADATA_FILE_PATH, _METADATA_MANIFEST_FIELD
- )
- raise SOL004PackageException(error_msg)
-
- try:
- return self._parse_file_in_blocks(manifest_path)
- except (Exception, OSError) as e:
- raise SOL004PackageException(
- "Error parsing {}: {}".format(manifest_path, e)
- )
+class SOL004Package(SOLPackage):
+ _MANIFEST_VNFD_ID = "vnfd_id"
+ _MANIFEST_VNFD_PRODUCT_NAME = "vnfd_product_name"
+ _MANIFEST_VNFD_PROVIDER_ID = "vnfd_provider_id"
+ _MANIFEST_VNFD_SOFTWARE_VERSION = "vnfd_software_version"
+ _MANIFEST_VNFD_PACKAGE_VERSION = "vnfd_package_version"
+ _MANIFEST_VNFD_RELEASE_DATE_TIME = "vnfd_release_date_time"
+ _MANIFEST_VNFD_COMPATIBLE_SPECIFICATION_VERSIONS = (
+ "compatible_specification_versions"
+ )
+ _MANIFEST_VNFM_INFO = "vnfm_info"
+
+ _MANIFEST_ALL_FIELDS = [
+ _MANIFEST_VNFD_ID,
+ _MANIFEST_VNFD_PRODUCT_NAME,
+ _MANIFEST_VNFD_PROVIDER_ID,
+ _MANIFEST_VNFD_SOFTWARE_VERSION,
+ _MANIFEST_VNFD_PACKAGE_VERSION,
+ _MANIFEST_VNFD_RELEASE_DATE_TIME,
+ _MANIFEST_VNFD_COMPATIBLE_SPECIFICATION_VERSIONS,
+ _MANIFEST_VNFM_INFO,
+ ]
- def _get_package_file_full_path(self, file_relative_path):
- return os.path.join(self._package_path, file_relative_path)
-
- def _parse_file_in_blocks(self, file_relative_path):
- file_path = self._get_package_file_full_path(file_relative_path)
- with open(file_path) as f:
- blocks = f.read().split("\n\n")
- parsed_blocks = map(yaml.safe_load, blocks)
- return [block for block in parsed_blocks if block is not None]
-
- def _get_package_file_manifest_data(self, file_relative_path):
- for file_data in self._manifest_data:
- if file_data.get(_MANIFEST_FILE_PATH_FIELD, "") == file_relative_path:
- return file_data
+ def __init__(self, package_path=""):
+ super().__init__(package_path)
- error_msg = (
- "Error parsing {} manifest data: file not found on manifest file".format(
- file_relative_path
- )
+ def generate_manifest_data_from_descriptor(self):
+ descriptor_path = os.path.join(
+ self._package_path, self.get_descriptor_location()
)
- raise SOL004PackageException(error_msg)
-
- def get_package_file_hash_digest_from_manifest(self, file_relative_path):
- """Returns the hash digest of a file inside this package as specified on the manifest file."""
- file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
- try:
- return file_manifest_data[_MANIFEST_FILE_HASH_DIGEST_FIELD]
- except Exception as e:
- raise SOL004PackageException(
- "Error parsing {} hash digest: {}".format(file_relative_path, e)
+ with open(descriptor_path, "r") as descriptor:
+ try:
+ vnfd_data = yaml.safe_load(descriptor)["vnfd"]
+ except yaml.YAMLError as e:
+ print("Error reading descriptor {}: {}".format(descriptor_path, e))
+ return
+
+ self._manifest_metadata = {}
+ self._manifest_metadata[self._MANIFEST_VNFD_ID] = vnfd_data.get(
+ "id", "default-id"
)
-
- def get_package_file_hash_algorithm_from_manifest(self, file_relative_path):
- """Returns the hash algorithm of a file inside this package as specified on the manifest file."""
- file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
- try:
- return file_manifest_data[_MANIFEST_FILE_HASH_ALGORITHM_FIELD]
- except Exception as e:
- raise SOL004PackageException(
- "Error parsing {} hash digest: {}".format(file_relative_path, e)
+ self._manifest_metadata[self._MANIFEST_VNFD_PRODUCT_NAME] = vnfd_data.get(
+ "product-name", "default-product-name"
)
-
- @staticmethod
- def _get_hash_function_from_hash_algorithm(hash_algorithm):
- function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512}
- if hash_algorithm not in function_to_algorithm:
- error_msg = (
- "Error checking hash function: hash algorithm {} not supported".format(
- hash_algorithm
- )
+ self._manifest_metadata[self._MANIFEST_VNFD_PROVIDER_ID] = vnfd_data.get(
+ "provider", "OSM"
)
- raise SOL004PackageException(error_msg)
- return function_to_algorithm[hash_algorithm]
-
- def _calculate_file_hash(self, file_relative_path, hash_algorithm):
- file_path = self._get_package_file_full_path(file_relative_path)
- hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm)
- try:
- with open(file_path, "rb") as f:
- return hash_function(f.read()).hexdigest()
- except Exception as e:
- raise SOL004PackageException(
- "Error hashing {}: {}".format(file_relative_path, e)
+ self._manifest_metadata[
+ self._MANIFEST_VNFD_SOFTWARE_VERSION
+ ] = vnfd_data.get("version", "1.0")
+ self._manifest_metadata[self._MANIFEST_VNFD_PACKAGE_VERSION] = "1.0.0"
+ self._manifest_metadata[self._MANIFEST_VNFD_RELEASE_DATE_TIME] = (
+ datetime.datetime.now().astimezone().isoformat()
)
-
- def validate_package_file_hash(self, file_relative_path):
- """Validates the integrity of a file using the hash algorithm and digest on the package manifest."""
- hash_algorithm = self.get_package_file_hash_algorithm_from_manifest(
- file_relative_path
- )
- file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm)
- expected_file_hash = self.get_package_file_hash_digest_from_manifest(
- file_relative_path
- )
- if file_hash != expected_file_hash:
- error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}"
- raise SOL004PackageException(
- error_msg.format(file_relative_path, file_hash, expected_file_hash)
- )
-
- def validate_package_hashes(self):
- """Validates the integrity of all files listed on the package manifest."""
- for file_data in self._manifest_data:
- if _MANIFEST_FILE_PATH_FIELD in file_data:
- file_relative_path = file_data[_MANIFEST_FILE_PATH_FIELD]
- self.validate_package_file_hash(file_relative_path)
-
- def get_descriptor_location(self):
- """Returns this package descriptor location as a relative path from the package root."""
- for tosca_meta in self._package_metadata:
- if _METADATA_DESCRIPTOR_FIELD in tosca_meta:
- return tosca_meta[_METADATA_DESCRIPTOR_FIELD]
-
- error_msg = "Error: no {} entry found on {}".format(
- _METADATA_DESCRIPTOR_FIELD, _METADATA_FILE_PATH
- )
- raise SOL004PackageException(error_msg)
+ self._manifest_metadata[
+ self._MANIFEST_VNFD_COMPATIBLE_SPECIFICATION_VERSIONS
+ ] = "2.7.1"
+ self._manifest_metadata[self._MANIFEST_VNFM_INFO] = "OSM"
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2021 Whitestack, LLC
+# *************************************************************
+#
+# This file is part of OSM common repository.
+# All Rights Reserved to Whitestack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: fbravo@whitestack.com
+##
+
+"""Python module for interacting with ETSI GS NFV-SOL007 compliant packages
+
+This module provides a SOL007Package class for validating and interacting with
+ETSI SOL007 packages. A valid SOL007 package may have its files arranged according
+to one of the following two structures:
+
+SOL007 with metadata directory SOL007 without metadata directory
+
+native_charm_vnf/ native_charm_vnf/
+├── TOSCA-Metadata ├── native_charm_nsd.mf
+│ └── TOSCA.meta ├── native_charm_nsd.yaml
+├── manifest.mf ├── ChangeLog.txt
+├── Definitions ├── Licenses
+│ └── native_charm_nsd.yaml │ └── license.lic
+├── Files ├── Files
+│ ├── icons │ └── icons
+│ │ └── osm.png │ └── osm.png
+│ ├── Licenses └── Scripts
+│ │ └── license.lic ├── cloud_init
+│ └── changelog.txt │ └── cloud-config.txt
+└── Scripts └── charms
+ ├── cloud_init └── simple
+ │ └── cloud-config.txt ├── config.yaml
+ └── charms ├── hooks
+ └── simple │ ├── install
+ ├── config.yaml ...
+ ├── hooks │
+ │ ├── install └── src
+ ... └── charm.py
+ └── src
+ └── charm.py
+"""
+
+import yaml
+import datetime
+import os
+from .sol_package import SOLPackage
+
+
+class SOL007PackageException(Exception):
+ pass
+
+
+class SOL007Package(SOLPackage):
+ _MANIFEST_NSD_INVARIANT_ID = "nsd_invariant_id"
+ _MANIFEST_NSD_NAME = "nsd_name"
+ _MANIFEST_NSD_DESIGNER = "nsd_designer"
+ _MANIFEST_NSD_FILE_STRUCTURE_VERSION = "nsd_file_structure_version"
+ _MANIFEST_NSD_RELEASE_DATE_TIME = "nsd_release_date_time"
+ _MANIFEST_NSD_COMPATIBLE_SPECIFICATION_VERSIONS = (
+ "compatible_specification_versions"
+ )
+
+ _MANIFEST_ALL_FIELDS = [
+ _MANIFEST_NSD_INVARIANT_ID,
+ _MANIFEST_NSD_NAME,
+ _MANIFEST_NSD_DESIGNER,
+ _MANIFEST_NSD_FILE_STRUCTURE_VERSION,
+ _MANIFEST_NSD_RELEASE_DATE_TIME,
+ _MANIFEST_NSD_COMPATIBLE_SPECIFICATION_VERSIONS,
+ ]
+
+ def __init__(self, package_path=""):
+ super().__init__(package_path)
+
+ def generate_manifest_data_from_descriptor(self):
+ descriptor_path = os.path.join(
+ self._package_path, self.get_descriptor_location()
+ )
+ with open(descriptor_path, "r") as descriptor:
+ try:
+ nsd_data = yaml.safe_load(descriptor)["nsd"]
+ except yaml.YAMLError as e:
+ print("Error reading descriptor {}: {}".format(descriptor_path, e))
+ return
+
+ self._manifest_metadata = {}
+ self._manifest_metadata[self._MANIFEST_NSD_INVARIANT_ID] = nsd_data.get(
+ "id", "default-id"
+ )
+ self._manifest_metadata[self._MANIFEST_NSD_NAME] = nsd_data.get(
+ "name", "default-name"
+ )
+ self._manifest_metadata[self._MANIFEST_NSD_DESIGNER] = nsd_data.get(
+ "designer", "OSM"
+ )
+ self._manifest_metadata[
+ self._MANIFEST_NSD_FILE_STRUCTURE_VERSION
+ ] = nsd_data.get("version", "1.0")
+ self._manifest_metadata[self._MANIFEST_NSD_RELEASE_DATE_TIME] = (
+ datetime.datetime.now().astimezone().isoformat()
+ )
+ self._manifest_metadata[
+ self._MANIFEST_NSD_COMPATIBLE_SPECIFICATION_VERSIONS
+ ] = "2.7.1"
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2021 Whitestack, LLC
+# *************************************************************
+#
+# This file is part of OSM common repository.
+# All Rights Reserved to Whitestack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: fbravo@whitestack.com or agarcia@whitestack.com
+##
+
+import os
+import yaml
+import hashlib
+
+
+class SOLPackageException(Exception):
+ pass
+
+
+class SOLPackage:
+ _METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta"
+ _METADATA_DESCRIPTOR_FIELD = "Entry-Definitions"
+ _METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest"
+ _METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log"
+ _METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses"
+ _METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt"
+ _METADATA_DEFAULT_LICENSES_PATH = "Licenses"
+ _MANIFEST_FILE_PATH_FIELD = "Source"
+ _MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm"
+ _MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash"
+
+ _MANIFEST_ALL_FIELDS = []
+
+ def __init__(self, package_path=""):
+ self._package_path = package_path
+
+ self._package_metadata = self._parse_package_metadata()
+
+ try:
+ self._manifest_data = self._parse_manifest_data()
+ except Exception:
+ self._manifest_data = None
+
+ try:
+ self._manifest_metadata = self._parse_manifest_metadata()
+ except Exception:
+ self._manifest_metadata = None
+
+ def _parse_package_metadata(self):
+ try:
+ return self._parse_package_metadata_with_metadata_dir()
+ except FileNotFoundError:
+ return self._parse_package_metadata_without_metadata_dir()
+
+ def _parse_package_metadata_with_metadata_dir(self):
+ try:
+ return self._parse_file_in_blocks(self._METADATA_FILE_PATH)
+ except FileNotFoundError as e:
+ raise e
+ except (Exception, OSError) as e:
+ raise SOLPackageException(
+ "Error parsing {}: {}".format(self._METADATA_FILE_PATH, e)
+ )
+
+ def _parse_package_metadata_without_metadata_dir(self):
+ package_root_files = {f for f in os.listdir(self._package_path)}
+ package_root_yamls = [
+ f for f in package_root_files if f.endswith(".yml") or f.endswith(".yaml")
+ ]
+ if len(package_root_yamls) != 1:
+ error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}"
+ raise SOLPackageException(error_msg.format(len(package_root_yamls)))
+
+ base_manifest = [
+ {
+ SOLPackage._METADATA_DESCRIPTOR_FIELD: package_root_yamls[0],
+ SOLPackage._METADATA_MANIFEST_FIELD: "{}.mf".format(
+ os.path.splitext(package_root_yamls[0])[0]
+ ),
+ SOLPackage._METADATA_CHANGELOG_FIELD: SOLPackage._METADATA_DEFAULT_CHANGELOG_PATH,
+ SOLPackage._METADATA_LICENSES_FIELD: SOLPackage._METADATA_DEFAULT_LICENSES_PATH,
+ }
+ ]
+
+ return base_manifest
+
+ def _parse_manifest_data(self):
+ manifest_path = None
+ for tosca_meta in self._package_metadata:
+ if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta:
+ manifest_path = tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD]
+ break
+ else:
+ error_msg = "Error parsing {}: no {} field on path".format(
+ self._METADATA_FILE_PATH, self._METADATA_MANIFEST_FIELD
+ )
+ raise SOLPackageException(error_msg)
+
+ try:
+ return self._parse_file_in_blocks(manifest_path)
+
+ except (Exception, OSError) as e:
+ raise SOLPackageException("Error parsing {}: {}".format(manifest_path, e))
+
+ def _parse_manifest_metadata(self):
+ try:
+ base_manifest = {}
+ manifest_file = os.open(
+ os.path.join(
+ self._package_path, base_manifest[self._METADATA_MANIFEST_FIELD]
+ ),
+ "rw",
+ )
+ for line in manifest_file:
+ fields_in_line = line.split(":", maxsplit=1)
+ fields_in_line[0] = fields_in_line[0].strip()
+ fields_in_line[1] = fields_in_line[1].strip()
+ if fields_in_line[0] in self._MANIFEST_ALL_FIELDS:
+ base_manifest[fields_in_line[0]] = fields_in_line[1]
+ return base_manifest
+ except (Exception, OSError) as e:
+ raise SOLPackageException(
+ "Error parsing {}: {}".format(
+ base_manifest[SOLPackage._METADATA_MANIFEST_FIELD], e
+ )
+ )
+
+ def _get_package_file_full_path(self, file_relative_path):
+ return os.path.join(self._package_path, file_relative_path)
+
+ def _parse_file_in_blocks(self, file_relative_path):
+ file_path = self._get_package_file_full_path(file_relative_path)
+ with open(file_path) as f:
+ blocks = f.read().split("\n\n")
+ parsed_blocks = map(yaml.safe_load, blocks)
+ return [block for block in parsed_blocks if block is not None]
+
+ def _get_package_file_manifest_data(self, file_relative_path):
+ for file_data in self._manifest_data:
+ if (
+ file_data.get(SOLPackage._MANIFEST_FILE_PATH_FIELD, "")
+ == file_relative_path
+ ):
+ return file_data
+
+ error_msg = (
+ "Error parsing {} manifest data: file not found on manifest file".format(
+ file_relative_path
+ )
+ )
+ raise SOLPackageException(error_msg)
+
+ def get_package_file_hash_digest_from_manifest(self, file_relative_path):
+ """Returns the hash digest of a file inside this package as specified on the manifest file."""
+ file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
+ try:
+ return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_DIGEST_FIELD]
+ except Exception as e:
+ raise SOLPackageException(
+ "Error parsing {} hash digest: {}".format(file_relative_path, e)
+ )
+
+ def get_package_file_hash_algorithm_from_manifest(self, file_relative_path):
+ """Returns the hash algorithm of a file inside this package as specified on the manifest file."""
+ file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
+ try:
+ return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_ALGORITHM_FIELD]
+ except Exception as e:
+ raise SOLPackageException(
+ "Error parsing {} hash digest: {}".format(file_relative_path, e)
+ )
+
+ @staticmethod
+ def _get_hash_function_from_hash_algorithm(hash_algorithm):
+ function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512}
+ if hash_algorithm not in function_to_algorithm:
+ error_msg = (
+ "Error checking hash function: hash algorithm {} not supported".format(
+ hash_algorithm
+ )
+ )
+ raise SOLPackageException(error_msg)
+ return function_to_algorithm[hash_algorithm]
+
+ def _calculate_file_hash(self, file_relative_path, hash_algorithm):
+ file_path = self._get_package_file_full_path(file_relative_path)
+ hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm)
+ try:
+ with open(file_path, "rb") as f:
+ return hash_function(f.read()).hexdigest()
+ except Exception as e:
+ raise SOLPackageException(
+ "Error hashing {}: {}".format(file_relative_path, e)
+ )
+
+ def validate_package_file_hash(self, file_relative_path):
+ """Validates the integrity of a file using the hash algorithm and digest on the package manifest."""
+ hash_algorithm = self.get_package_file_hash_algorithm_from_manifest(
+ file_relative_path
+ )
+ file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm)
+ expected_file_hash = self.get_package_file_hash_digest_from_manifest(
+ file_relative_path
+ )
+ if file_hash != expected_file_hash:
+ error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}"
+ raise SOLPackageException(
+ error_msg.format(file_relative_path, file_hash, expected_file_hash)
+ )
+
+ def validate_package_hashes(self):
+ """Validates the integrity of all files listed on the package manifest."""
+ for file_data in self._manifest_data:
+ if SOLPackage._MANIFEST_FILE_PATH_FIELD in file_data:
+ file_relative_path = file_data[SOLPackage._MANIFEST_FILE_PATH_FIELD]
+ self.validate_package_file_hash(file_relative_path)
+
+ def create_or_update_metadata_file(self):
+ """
+ Creates or updates the metadata file with the hashes calculated for each one of the package's files
+ """
+ if not self._manifest_metadata:
+ self.generate_manifest_data_from_descriptor()
+
+ self.write_manifest_data_into_file()
+
+ def generate_manifest_data_from_descriptor(self):
+ pass
+
+ def write_manifest_data_into_file(self):
+ with open(self.get_manifest_location(), "w") as metadata_file:
+ # Write manifest metadata
+ for metadata_entry in self._manifest_metadata:
+ metadata_file.write(
+ "{}: {}\n".format(
+ metadata_entry, self._manifest_metadata[metadata_entry]
+ )
+ )
+
+ # Write package's files hashes
+ file_hashes = {}
+ for root, dirs, files in os.walk(self._package_path):
+ for a_file in files:
+ file_path = os.path.join(root, a_file)
+ file_relative_path = file_path[len(self._package_path) :]
+ if file_relative_path.startswith("/"):
+ file_relative_path = file_relative_path[1:]
+ file_hashes[file_relative_path] = self._calculate_file_hash(
+ file_relative_path, "SHA-512"
+ )
+
+ for file, hash in file_hashes.items():
+ file_block = "Source: {}\nAlgorithm: SHA-512\nHash: {}\n\n".format(
+ file, hash
+ )
+ metadata_file.write(file_block)
+
+ def get_descriptor_location(self):
+ """Returns this package descriptor location as a relative path from the package root."""
+ for tosca_meta in self._package_metadata:
+ if SOLPackage._METADATA_DESCRIPTOR_FIELD in tosca_meta:
+ return tosca_meta[SOLPackage._METADATA_DESCRIPTOR_FIELD]
+
+ error_msg = "Error: no {} entry found on {}".format(
+ SOLPackage._METADATA_DESCRIPTOR_FIELD, SOLPackage._METADATA_FILE_PATH
+ )
+ raise SOLPackageException(error_msg)
+
+ def get_manifest_location(self):
+ """Return the VNF/NS manifest location as a relative path from the package root."""
+ for tosca_meta in self._package_metadata:
+ if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta:
+ return tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD]
+
+ raise SOLPackageException("No manifest file defined for this package")
tar = tarfile.open(tar_path, "r")
fs = FsMongo()
fs.fs = FakeFS()
- fs.file_extract(tar_object=tar, path=".")
+ fs.file_extract(compressed_object=tar, path=".")
finally:
os.remove(tar_path)
subprocess.call(["rm", "-rf", "./tmp"])
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# Copyright 2020 Whitestack, LLC
-# *************************************************************
-#
-# This file is part of OSM common repository.
-# All Rights Reserved to Whitestack, LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: agarcia@whitestack.com
-##
-
-from osm_common.sol004_package import SOL004Package, SOL004PackageException
-import unittest
-
-
-class SOL004ValidatorTest(unittest.TestCase):
- def test_get_package_file_hash_algorithm_from_manifest_with_metadata_dir(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_with_metadata_dir_vnf"
- )
- algorithm = package.get_package_file_hash_algorithm_from_manifest(
- "Scripts/charms/simple/src/charm.py"
- )
- self.assertEqual(algorithm, "SHA-256")
-
- def test_get_package_file_hash_algorithm_from_manifest_without_metadata_dir(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_without_metadata_dir_vnf"
- )
- algorithm = package.get_package_file_hash_algorithm_from_manifest(
- "Scripts/charms/simple/src/charm.py"
- )
- self.assertEqual(algorithm, "SHA-256")
-
- def test_get_package_file_hash_algorithm_from_manifest_on_non_existent_file(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_with_metadata_dir_vnf"
- )
- with self.assertRaises(SOL004PackageException):
- package.get_package_file_hash_algorithm_from_manifest("Non/Existing/file")
-
- def test_get_package_file_hash_digest_from_manifest_with_metadata_dir(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_with_metadata_dir_vnf"
- )
- digest = package.get_package_file_hash_digest_from_manifest(
- "Scripts/charms/simple/src/charm.py"
- )
- self.assertEqual(
- digest, "ea72f897a966e6174ed9164fabc3c500df5a2f712eb6b22ab2408afb07d04d14"
- )
-
- def test_get_package_file_hash_digest_from_manifest_without_metadata_dir(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_without_metadata_dir_vnf"
- )
- digest = package.get_package_file_hash_digest_from_manifest(
- "Scripts/charms/simple/src/charm.py"
- )
- self.assertEqual(
- digest, "ea72f897a966e6174ed9164fabc3c500df5a2f712eb6b22ab2408afb07d04d14"
- )
-
- def test_get_package_file_hash_digest_from_manifest_on_non_existent_file(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_with_metadata_dir_vnf"
- )
- with self.assertRaises(SOL004PackageException):
- package.get_package_file_hash_digest_from_manifest("Non/Existing/file")
-
- def test_get_package_file_hash_digest_from_manifest_on_non_existing_hash_entry(
- self,
- ):
- package = SOL004Package("osm_common/tests/packages/invalid_package_vnf")
- with self.assertRaises(SOL004PackageException):
- package.get_package_file_hash_digest_from_manifest(
- "Scripts/charms/simple/hooks/upgrade-charm"
- )
-
- def test_validate_package_file_hash_with_metadata_dir(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_with_metadata_dir_vnf"
- )
- package.validate_package_file_hash("Scripts/charms/simple/src/charm.py")
-
- def test_validate_package_file_hash_without_metadata_dir(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_without_metadata_dir_vnf"
- )
- package.validate_package_file_hash("Scripts/charms/simple/src/charm.py")
-
- def test_validate_package_file_hash_on_non_existing_file(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_with_metadata_dir_vnf"
- )
- with self.assertRaises(SOL004PackageException):
- package.validate_package_file_hash("Non/Existing/file")
-
- def test_validate_package_file_hash_on_wrong_manifest_hash(self):
- package = SOL004Package("osm_common/tests/packages/invalid_package_vnf")
- with self.assertRaises(SOL004PackageException):
- package.validate_package_file_hash("Scripts/charms/simple/hooks/start")
-
- def test_validate_package_file_hash_on_unsupported_hash_algorithm(self):
- package = SOL004Package("osm_common/tests/packages/invalid_package_vnf")
- with self.assertRaises(SOL004PackageException):
- package.validate_package_file_hash("Scripts/charms/simple/src/charm.py")
-
- def test_validate_package_hashes_with_metadata_dir(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_with_metadata_dir_vnf"
- )
- package.validate_package_hashes()
-
- def test_validate_package_hashes_without_metadata_dir(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_without_metadata_dir_vnf"
- )
- package.validate_package_hashes()
-
- def test_validate_package_hashes_on_invalid_package(self):
- package = SOL004Package("osm_common/tests/packages/invalid_package_vnf")
- with self.assertRaises(SOL004PackageException):
- package.validate_package_hashes()
-
- def test_get_descriptor_location_with_metadata_dir(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_with_metadata_dir_vnf"
- )
- descriptor_path = package.get_descriptor_location()
- self.assertEqual(descriptor_path, "Definitions/native_charm_vnfd.yaml")
-
- def test_get_descriptor_location_without_metadata_dir(self):
- package = SOL004Package(
- "osm_common/tests/packages/native_charm_without_metadata_dir_vnf"
- )
- descriptor_path = package.get_descriptor_location()
- self.assertEqual(descriptor_path, "native_charm_vnfd.yaml")