# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class SOLPackageException(Exception):
    """Raised when a SOL package cannot be parsed or fails hash validation."""
# NOTE(review): the class header line was absent from the reviewed source; the
# name is taken from the `SOLPackage._...` references used by the methods.
class SOLPackage:
    """Reader and validator for a SOL package stored in a directory.

    Parses the TOSCA metadata and the manifest of the package, validates the
    file hashes listed on the manifest and can rewrite the manifest file.
    """

    # Location of the TOSCA metadata file, relative to the package root.
    _METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta"
    # Keys looked up in the TOSCA metadata blocks.
    _METADATA_DESCRIPTOR_FIELD = "Entry-Definitions"
    _METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest"
    _METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log"
    _METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses"
    # Values used when the package has no TOSCA metadata file.
    _METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt"
    _METADATA_DEFAULT_LICENSES_PATH = "Licenses"
    # Keys of the per-file blocks of the manifest.
    _MANIFEST_FILE_PATH_FIELD = "Source"
    _MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm"
    _MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash"

    # Manifest metadata keys kept by _parse_manifest_metadata; empty here
    # (presumably overridden by subclasses -- TODO confirm).
    _MANIFEST_ALL_FIELDS = []
39 def __init__(self
, package_path
=""):
40 self
._package
_path
= package_path
42 self
._package
_metadata
= self
._parse
_package
_metadata
()
45 self
._manifest
_data
= self
._parse
_manifest
_data
()
47 self
._manifest
_data
= None
50 self
._manifest
_metadata
= self
._parse
_manifest
_metadata
()
52 self
._manifest
_metadata
= None
54 def _parse_package_metadata(self
):
56 return self
._parse
_package
_metadata
_with
_metadata
_dir
()
57 except FileNotFoundError
:
58 return self
._parse
_package
_metadata
_without
_metadata
_dir
()
60 def _parse_package_metadata_with_metadata_dir(self
):
62 return self
._parse
_file
_in
_blocks
(self
._METADATA
_FILE
_PATH
)
63 except FileNotFoundError
as e
:
65 except (Exception, OSError) as e
:
66 raise SOLPackageException(
67 "Error parsing {}: {}".format(self
._METADATA
_FILE
_PATH
, e
)
70 def _parse_package_metadata_without_metadata_dir(self
):
71 package_root_files
= {f
for f
in os
.listdir(self
._package
_path
)}
72 package_root_yamls
= [
73 f
for f
in package_root_files
if f
.endswith(".yml") or f
.endswith(".yaml")
75 if len(package_root_yamls
) != 1:
76 error_msg
= "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}"
77 raise SOLPackageException(error_msg
.format(len(package_root_yamls
)))
81 SOLPackage
._METADATA
_DESCRIPTOR
_FIELD
: package_root_yamls
[0],
82 SOLPackage
._METADATA
_MANIFEST
_FIELD
: "{}.mf".format(
83 os
.path
.splitext(package_root_yamls
[0])[0]
85 SOLPackage
._METADATA
_CHANGELOG
_FIELD
: SOLPackage
._METADATA
_DEFAULT
_CHANGELOG
_PATH
,
86 SOLPackage
._METADATA
_LICENSES
_FIELD
: SOLPackage
._METADATA
_DEFAULT
_LICENSES
_PATH
,
92 def _parse_manifest_data(self
):
94 for tosca_meta
in self
._package
_metadata
:
95 if SOLPackage
._METADATA
_MANIFEST
_FIELD
in tosca_meta
:
96 manifest_path
= tosca_meta
[SOLPackage
._METADATA
_MANIFEST
_FIELD
]
99 error_msg
= "Error parsing {}: no {} field on path".format(
100 self
._METADATA
_FILE
_PATH
, self
._METADATA
_MANIFEST
_FIELD
102 raise SOLPackageException(error_msg
)
105 return self
._parse
_file
_in
_blocks
(manifest_path
)
107 except (Exception, OSError) as e
:
108 raise SOLPackageException("Error parsing {}: {}".format(manifest_path
, e
))
110 def _parse_manifest_metadata(self
):
113 manifest_file
= os
.open(
116 base_manifest
[SOLPackage
._METADATA
_MANIFEST
_FIELD
],
120 for line
in manifest_file
:
121 fields_in_line
= line
.split(":", maxsplit
=1)
122 fields_in_line
[0] = fields_in_line
[0].strip()
123 fields_in_line
[1] = fields_in_line
[1].strip()
124 if fields_in_line
[0] in self
._MANIFEST
_ALL
_FIELDS
:
125 base_manifest
[fields_in_line
[0]] = fields_in_line
[1]
127 except (Exception, OSError) as e
:
128 raise SOLPackageException(
129 "Error parsing {}: {}".format(
130 base_manifest
[SOLPackage
._METADATA
_MANIFEST
_FIELD
], e
134 def _get_package_file_full_path(self
, file_relative_path
):
135 return os
.path
.join(self
._package
_path
, file_relative_path
)
137 def _parse_file_in_blocks(self
, file_relative_path
):
138 file_path
= self
._get
_package
_file
_full
_path
(file_relative_path
)
139 with
open(file_path
) as f
:
140 blocks
= f
.read().split("\n\n")
141 parsed_blocks
= map(yaml
.safe_load
, blocks
)
142 return [block
for block
in parsed_blocks
if block
is not None]
144 def _get_package_file_manifest_data(self
, file_relative_path
):
145 for file_data
in self
._manifest
_data
:
147 file_data
.get(SOLPackage
._MANIFEST
_FILE
_PATH
_FIELD
, "")
148 == file_relative_path
153 "Error parsing {} manifest data: file not found on manifest file".format(
157 raise SOLPackageException(error_msg
)
159 def get_package_file_hash_digest_from_manifest(self
, file_relative_path
):
160 """Returns the hash digest of a file inside this package as specified on the manifest file."""
161 file_manifest_data
= self
._get
_package
_file
_manifest
_data
(file_relative_path
)
163 return file_manifest_data
[SOLPackage
._MANIFEST
_FILE
_HASH
_DIGEST
_FIELD
]
164 except Exception as e
:
165 raise SOLPackageException(
166 "Error parsing {} hash digest: {}".format(file_relative_path
, e
)
169 def get_package_file_hash_algorithm_from_manifest(self
, file_relative_path
):
170 """Returns the hash algorithm of a file inside this package as specified on the manifest file."""
171 file_manifest_data
= self
._get
_package
_file
_manifest
_data
(file_relative_path
)
173 return file_manifest_data
[SOLPackage
._MANIFEST
_FILE
_HASH
_ALGORITHM
_FIELD
]
174 except Exception as e
:
175 raise SOLPackageException(
176 "Error parsing {} hash digest: {}".format(file_relative_path
, e
)
180 def _get_hash_function_from_hash_algorithm(hash_algorithm
):
181 function_to_algorithm
= {"SHA-256": hashlib
.sha256
, "SHA-512": hashlib
.sha512
}
182 if hash_algorithm
not in function_to_algorithm
:
184 "Error checking hash function: hash algorithm {} not supported".format(
188 raise SOLPackageException(error_msg
)
189 return function_to_algorithm
[hash_algorithm
]
191 def _calculate_file_hash(self
, file_relative_path
, hash_algorithm
):
192 file_path
= self
._get
_package
_file
_full
_path
(file_relative_path
)
193 hash_function
= self
._get
_hash
_function
_from
_hash
_algorithm
(hash_algorithm
)
195 with
open(file_path
, "rb") as f
:
196 return hash_function(f
.read()).hexdigest()
197 except Exception as e
:
198 raise SOLPackageException(
199 "Error hashing {}: {}".format(file_relative_path
, e
)
202 def validate_package_file_hash(self
, file_relative_path
):
203 """Validates the integrity of a file using the hash algorithm and digest on the package manifest."""
204 hash_algorithm
= self
.get_package_file_hash_algorithm_from_manifest(
207 file_hash
= self
._calculate
_file
_hash
(file_relative_path
, hash_algorithm
)
208 expected_file_hash
= self
.get_package_file_hash_digest_from_manifest(
211 if file_hash
!= expected_file_hash
:
212 error_msg
= "Error validating {} hash: calculated hash {} is different than manifest hash {}"
213 raise SOLPackageException(
214 error_msg
.format(file_relative_path
, file_hash
, expected_file_hash
)
217 def validate_package_hashes(self
):
218 """Validates the integrity of all files listed on the package manifest."""
219 for file_data
in self
._manifest
_data
:
220 if SOLPackage
._MANIFEST
_FILE
_PATH
_FIELD
in file_data
:
221 file_relative_path
= file_data
[SOLPackage
._MANIFEST
_FILE
_PATH
_FIELD
]
222 self
.validate_package_file_hash(file_relative_path
)
224 def create_or_update_metadata_file(self
):
226 Creates or updates the metadata file with the hashes calculated for each one of the package's files
228 if not self
._manifest
_metadata
:
229 self
.generate_manifest_data_from_descriptor()
231 self
.write_manifest_data_into_file()
233 def generate_manifest_data_from_descriptor(self
):
236 def write_manifest_data_into_file(self
):
237 with
open(self
.get_manifest_location(), "w") as metadata_file
:
238 # Write manifest metadata
239 for metadata_entry
in self
._manifest
_metadata
:
242 metadata_entry
, self
._manifest
_metadata
[metadata_entry
]
246 # Write package's files hashes
248 for root
, dirs
, files
in os
.walk(self
._package
_path
):
250 file_path
= os
.path
.join(root
, a_file
)
251 file_relative_path
= file_path
[len(self
._package
_path
) :]
252 if file_relative_path
.startswith("/"):
253 file_relative_path
= file_relative_path
[1:]
254 file_hashes
[file_relative_path
] = self
._calculate
_file
_hash
(
255 file_relative_path
, "SHA-512"
258 for file, hash in file_hashes
.items():
259 file_block
= "Source: {}\nAlgorithm: SHA-512\nHash: {}\n\n".format(
262 metadata_file
.write(file_block
)
264 def get_descriptor_location(self
):
265 """Returns this package descriptor location as a relative path from the package root."""
266 for tosca_meta
in self
._package
_metadata
:
267 if SOLPackage
._METADATA
_DESCRIPTOR
_FIELD
in tosca_meta
:
268 return tosca_meta
[SOLPackage
._METADATA
_DESCRIPTOR
_FIELD
]
270 error_msg
= "Error: no {} entry found on {}".format(
271 SOLPackage
._METADATA
_DESCRIPTOR
_FIELD
, SOLPackage
._METADATA
_FILE
_PATH
273 raise SOLPackageException(error_msg
)
275 def get_manifest_location(self
):
276 """Return the VNF/NS manifest location as a relative path from the package root."""
277 for tosca_meta
in self
._package
_metadata
:
278 if SOLPackage
._METADATA
_MANIFEST
_FIELD
in tosca_meta
:
279 return tosca_meta
[SOLPackage
._METADATA
_MANIFEST
_FIELD
]
281 raise SOLPackageException("No manifest file defined for this package")