1 # -*- coding: utf-8 -*-
3 # Copyright 2021 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM common repository.
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: fbravo@whitestack.com or agarcia@whitestack.com
30 class SOLPackageException(Exception):
35 _METADATA_FILE_PATH
= "TOSCA-Metadata/TOSCA.meta"
36 _METADATA_DESCRIPTOR_FIELD
= "Entry-Definitions"
37 _METADATA_MANIFEST_FIELD
= "ETSI-Entry-Manifest"
38 _METADATA_CHANGELOG_FIELD
= "ETSI-Entry-Change-Log"
39 _METADATA_LICENSES_FIELD
= "ETSI-Entry-Licenses"
40 _METADATA_DEFAULT_CHANGELOG_PATH
= "ChangeLog.txt"
41 _METADATA_DEFAULT_LICENSES_PATH
= "Licenses"
42 _MANIFEST_FILE_PATH_FIELD
= "Source"
43 _MANIFEST_FILE_HASH_ALGORITHM_FIELD
= "Algorithm"
44 _MANIFEST_FILE_HASH_DIGEST_FIELD
= "Hash"
46 _MANIFEST_ALL_FIELDS
= []
48 def __init__(self
, package_path
=""):
49 self
._package
_path
= package_path
51 self
._package
_metadata
= self
._parse
_package
_metadata
()
54 self
._manifest
_data
= self
._parse
_manifest
_data
()
56 self
._manifest
_data
= None
59 self
._manifest
_metadata
= self
._parse
_manifest
_metadata
()
61 self
._manifest
_metadata
= None
63 def _parse_package_metadata(self
):
65 return self
._parse
_package
_metadata
_with
_metadata
_dir
()
66 except FileNotFoundError
:
67 return self
._parse
_package
_metadata
_without
_metadata
_dir
()
69 def _parse_package_metadata_with_metadata_dir(self
):
71 return self
._parse
_file
_in
_blocks
(self
._METADATA
_FILE
_PATH
)
72 except FileNotFoundError
as e
:
74 except (Exception, OSError) as e
:
75 raise SOLPackageException(
76 "Error parsing {}: {}".format(self
._METADATA
_FILE
_PATH
, e
)
79 def _parse_package_metadata_without_metadata_dir(self
):
80 package_root_files
= {f
for f
in os
.listdir(self
._package
_path
)}
81 package_root_yamls
= [
82 f
for f
in package_root_files
if f
.endswith(".yml") or f
.endswith(".yaml")
84 if len(package_root_yamls
) != 1:
85 error_msg
= "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}"
86 raise SOLPackageException(error_msg
.format(len(package_root_yamls
)))
90 SOLPackage
._METADATA
_DESCRIPTOR
_FIELD
: package_root_yamls
[0],
91 SOLPackage
._METADATA
_MANIFEST
_FIELD
: "{}.mf".format(
92 os
.path
.splitext(package_root_yamls
[0])[0]
94 SOLPackage
._METADATA
_CHANGELOG
_FIELD
: SOLPackage
._METADATA
_DEFAULT
_CHANGELOG
_PATH
,
95 SOLPackage
._METADATA
_LICENSES
_FIELD
: SOLPackage
._METADATA
_DEFAULT
_LICENSES
_PATH
,
101 def _parse_manifest_data(self
):
103 for tosca_meta
in self
._package
_metadata
:
104 if SOLPackage
._METADATA
_MANIFEST
_FIELD
in tosca_meta
:
105 manifest_path
= tosca_meta
[SOLPackage
._METADATA
_MANIFEST
_FIELD
]
108 error_msg
= "Error parsing {}: no {} field on path".format(
109 self
._METADATA
_FILE
_PATH
, self
._METADATA
_MANIFEST
_FIELD
111 raise SOLPackageException(error_msg
)
114 return self
._parse
_file
_in
_blocks
(manifest_path
)
116 except (Exception, OSError) as e
:
117 raise SOLPackageException("Error parsing {}: {}".format(manifest_path
, e
))
119 def _parse_manifest_metadata(self
):
122 manifest_file
= os
.open(
124 self
._package
_path
, base_manifest
[self
._METADATA
_MANIFEST
_FIELD
]
128 for line
in manifest_file
:
129 fields_in_line
= line
.split(":", maxsplit
=1)
130 fields_in_line
[0] = fields_in_line
[0].strip()
131 fields_in_line
[1] = fields_in_line
[1].strip()
132 if fields_in_line
[0] in self
._MANIFEST
_ALL
_FIELDS
:
133 base_manifest
[fields_in_line
[0]] = fields_in_line
[1]
135 except (Exception, OSError) as e
:
136 raise SOLPackageException(
137 "Error parsing {}: {}".format(
138 base_manifest
[SOLPackage
._METADATA
_MANIFEST
_FIELD
], e
142 def _get_package_file_full_path(self
, file_relative_path
):
143 return os
.path
.join(self
._package
_path
, file_relative_path
)
145 def _parse_file_in_blocks(self
, file_relative_path
):
146 file_path
= self
._get
_package
_file
_full
_path
(file_relative_path
)
147 with
open(file_path
) as f
:
148 blocks
= f
.read().split("\n\n")
149 parsed_blocks
= map(yaml
.safe_load
, blocks
)
150 return [block
for block
in parsed_blocks
if block
is not None]
152 def _get_package_file_manifest_data(self
, file_relative_path
):
153 for file_data
in self
._manifest
_data
:
155 file_data
.get(SOLPackage
._MANIFEST
_FILE
_PATH
_FIELD
, "")
156 == file_relative_path
161 "Error parsing {} manifest data: file not found on manifest file".format(
165 raise SOLPackageException(error_msg
)
167 def get_package_file_hash_digest_from_manifest(self
, file_relative_path
):
168 """Returns the hash digest of a file inside this package as specified on the manifest file."""
169 file_manifest_data
= self
._get
_package
_file
_manifest
_data
(file_relative_path
)
171 return file_manifest_data
[SOLPackage
._MANIFEST
_FILE
_HASH
_DIGEST
_FIELD
]
172 except Exception as e
:
173 raise SOLPackageException(
174 "Error parsing {} hash digest: {}".format(file_relative_path
, e
)
177 def get_package_file_hash_algorithm_from_manifest(self
, file_relative_path
):
178 """Returns the hash algorithm of a file inside this package as specified on the manifest file."""
179 file_manifest_data
= self
._get
_package
_file
_manifest
_data
(file_relative_path
)
181 return file_manifest_data
[SOLPackage
._MANIFEST
_FILE
_HASH
_ALGORITHM
_FIELD
]
182 except Exception as e
:
183 raise SOLPackageException(
184 "Error parsing {} hash digest: {}".format(file_relative_path
, e
)
188 def _get_hash_function_from_hash_algorithm(hash_algorithm
):
189 function_to_algorithm
= {"SHA-256": hashlib
.sha256
, "SHA-512": hashlib
.sha512
}
190 if hash_algorithm
not in function_to_algorithm
:
192 "Error checking hash function: hash algorithm {} not supported".format(
196 raise SOLPackageException(error_msg
)
197 return function_to_algorithm
[hash_algorithm
]
199 def _calculate_file_hash(self
, file_relative_path
, hash_algorithm
):
200 file_path
= self
._get
_package
_file
_full
_path
(file_relative_path
)
201 hash_function
= self
._get
_hash
_function
_from
_hash
_algorithm
(hash_algorithm
)
203 with
open(file_path
, "rb") as f
:
204 return hash_function(f
.read()).hexdigest()
205 except Exception as e
:
206 raise SOLPackageException(
207 "Error hashing {}: {}".format(file_relative_path
, e
)
210 def validate_package_file_hash(self
, file_relative_path
):
211 """Validates the integrity of a file using the hash algorithm and digest on the package manifest."""
212 hash_algorithm
= self
.get_package_file_hash_algorithm_from_manifest(
215 file_hash
= self
._calculate
_file
_hash
(file_relative_path
, hash_algorithm
)
216 expected_file_hash
= self
.get_package_file_hash_digest_from_manifest(
219 if file_hash
!= expected_file_hash
:
220 error_msg
= "Error validating {} hash: calculated hash {} is different than manifest hash {}"
221 raise SOLPackageException(
222 error_msg
.format(file_relative_path
, file_hash
, expected_file_hash
)
225 def validate_package_hashes(self
):
226 """Validates the integrity of all files listed on the package manifest."""
227 for file_data
in self
._manifest
_data
:
228 if SOLPackage
._MANIFEST
_FILE
_PATH
_FIELD
in file_data
:
229 file_relative_path
= file_data
[SOLPackage
._MANIFEST
_FILE
_PATH
_FIELD
]
230 self
.validate_package_file_hash(file_relative_path
)
232 def create_or_update_metadata_file(self
):
234 Creates or updates the metadata file with the hashes calculated for each one of the package's files
236 if not self
._manifest
_metadata
:
237 self
.generate_manifest_data_from_descriptor()
239 self
.write_manifest_data_into_file()
241 def generate_manifest_data_from_descriptor(self
):
244 def write_manifest_data_into_file(self
):
245 with
open(self
.get_manifest_location(), "w") as metadata_file
:
246 # Write manifest metadata
247 for metadata_entry
in self
._manifest
_metadata
:
250 metadata_entry
, self
._manifest
_metadata
[metadata_entry
]
254 # Write package's files hashes
256 for root
, dirs
, files
in os
.walk(self
._package
_path
):
258 file_path
= os
.path
.join(root
, a_file
)
259 file_relative_path
= file_path
[len(self
._package
_path
) :]
260 if file_relative_path
.startswith("/"):
261 file_relative_path
= file_relative_path
[1:]
262 file_hashes
[file_relative_path
] = self
._calculate
_file
_hash
(
263 file_relative_path
, "SHA-512"
266 for file, hash in file_hashes
.items():
267 file_block
= "Source: {}\nAlgorithm: SHA-512\nHash: {}\n\n".format(
270 metadata_file
.write(file_block
)
272 def get_descriptor_location(self
):
273 """Returns this package descriptor location as a relative path from the package root."""
274 for tosca_meta
in self
._package
_metadata
:
275 if SOLPackage
._METADATA
_DESCRIPTOR
_FIELD
in tosca_meta
:
276 return tosca_meta
[SOLPackage
._METADATA
_DESCRIPTOR
_FIELD
]
278 error_msg
= "Error: no {} entry found on {}".format(
279 SOLPackage
._METADATA
_DESCRIPTOR
_FIELD
, SOLPackage
._METADATA
_FILE
_PATH
281 raise SOLPackageException(error_msg
)
283 def get_manifest_location(self
):
284 """Return the VNF/NS manifest location as a relative path from the package root."""
285 for tosca_meta
in self
._package
_metadata
:
286 if SOLPackage
._METADATA
_MANIFEST
_FIELD
in tosca_meta
:
287 return tosca_meta
[SOLPackage
._METADATA
_MANIFEST
_FIELD
]
289 raise SOLPackageException("No manifest file defined for this package")