1 |
|
# -*- coding: utf-8 -*- |
2 |
|
|
3 |
|
# Copyright 2021 Whitestack, LLC |
4 |
|
# ************************************************************* |
5 |
|
# |
6 |
|
# This file is part of OSM common repository. |
7 |
|
# All Rights Reserved to Whitestack, LLC |
8 |
|
# |
9 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
10 |
|
# not use this file except in compliance with the License. You may obtain |
11 |
|
# a copy of the License at |
12 |
|
# |
13 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
14 |
|
# |
15 |
|
# Unless required by applicable law or agreed to in writing, software |
16 |
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
17 |
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
18 |
|
# License for the specific language governing permissions and limitations |
19 |
|
# under the License. |
20 |
|
# |
21 |
|
# For those usages not covered by the Apache License, Version 2.0 please |
22 |
|
# contact: fbravo@whitestack.com or agarcia@whitestack.com |
23 |
|
## |
24 |
0 |
import hashlib |
25 |
0 |
import os |
26 |
|
|
27 |
0 |
import yaml |
28 |
|
|
29 |
|
|
30 |
0 |
class SOLPackageException(Exception): |
31 |
0 |
pass |
32 |
|
|
33 |
|
|
34 |
0 |
class SOLPackage: |
35 |
0 |
_METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta" |
36 |
0 |
_METADATA_DESCRIPTOR_FIELD = "Entry-Definitions" |
37 |
0 |
_METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest" |
38 |
0 |
_METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log" |
39 |
0 |
_METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses" |
40 |
0 |
_METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt" |
41 |
0 |
_METADATA_DEFAULT_LICENSES_PATH = "Licenses" |
42 |
0 |
_MANIFEST_FILE_PATH_FIELD = "Source" |
43 |
0 |
_MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm" |
44 |
0 |
_MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash" |
45 |
|
|
46 |
0 |
_MANIFEST_ALL_FIELDS = [] |
47 |
|
|
48 |
0 |
def __init__(self, package_path=""): |
49 |
0 |
self._package_path = package_path |
50 |
|
|
51 |
0 |
self._package_metadata = self._parse_package_metadata() |
52 |
|
|
53 |
0 |
try: |
54 |
0 |
self._manifest_data = self._parse_manifest_data() |
55 |
0 |
except Exception: |
56 |
0 |
self._manifest_data = None |
57 |
|
|
58 |
0 |
try: |
59 |
0 |
self._manifest_metadata = self._parse_manifest_metadata() |
60 |
0 |
except Exception: |
61 |
0 |
self._manifest_metadata = None |
62 |
|
|
63 |
0 |
def _parse_package_metadata(self): |
64 |
0 |
try: |
65 |
0 |
return self._parse_package_metadata_with_metadata_dir() |
66 |
0 |
except FileNotFoundError: |
67 |
0 |
return self._parse_package_metadata_without_metadata_dir() |
68 |
|
|
69 |
0 |
def _parse_package_metadata_with_metadata_dir(self): |
70 |
0 |
try: |
71 |
0 |
return self._parse_file_in_blocks(self._METADATA_FILE_PATH) |
72 |
0 |
except FileNotFoundError as e: |
73 |
0 |
raise e |
74 |
0 |
except (Exception, OSError) as e: |
75 |
0 |
raise SOLPackageException( |
76 |
|
"Error parsing {}: {}".format(self._METADATA_FILE_PATH, e) |
77 |
|
) |
78 |
|
|
79 |
0 |
def _parse_package_metadata_without_metadata_dir(self): |
80 |
0 |
package_root_files = {f for f in os.listdir(self._package_path)} |
81 |
0 |
package_root_yamls = [ |
82 |
|
f for f in package_root_files if f.endswith(".yml") or f.endswith(".yaml") |
83 |
|
] |
84 |
0 |
if len(package_root_yamls) != 1: |
85 |
0 |
error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}" |
86 |
0 |
raise SOLPackageException(error_msg.format(len(package_root_yamls))) |
87 |
|
|
88 |
0 |
base_manifest = [ |
89 |
|
{ |
90 |
|
SOLPackage._METADATA_DESCRIPTOR_FIELD: package_root_yamls[0], |
91 |
|
SOLPackage._METADATA_MANIFEST_FIELD: "{}.mf".format( |
92 |
|
os.path.splitext(package_root_yamls[0])[0] |
93 |
|
), |
94 |
|
SOLPackage._METADATA_CHANGELOG_FIELD: SOLPackage._METADATA_DEFAULT_CHANGELOG_PATH, |
95 |
|
SOLPackage._METADATA_LICENSES_FIELD: SOLPackage._METADATA_DEFAULT_LICENSES_PATH, |
96 |
|
} |
97 |
|
] |
98 |
|
|
99 |
0 |
return base_manifest |
100 |
|
|
101 |
0 |
def _parse_manifest_data(self): |
102 |
0 |
manifest_path = None |
103 |
0 |
for tosca_meta in self._package_metadata: |
104 |
0 |
if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta: |
105 |
0 |
manifest_path = tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD] |
106 |
0 |
break |
107 |
|
else: |
108 |
0 |
error_msg = "Error parsing {}: no {} field on path".format( |
109 |
|
self._METADATA_FILE_PATH, self._METADATA_MANIFEST_FIELD |
110 |
|
) |
111 |
0 |
raise SOLPackageException(error_msg) |
112 |
|
|
113 |
0 |
try: |
114 |
0 |
return self._parse_file_in_blocks(manifest_path) |
115 |
|
|
116 |
0 |
except (Exception, OSError) as e: |
117 |
0 |
raise SOLPackageException("Error parsing {}: {}".format(manifest_path, e)) |
118 |
|
|
119 |
0 |
def _parse_manifest_metadata(self): |
120 |
0 |
try: |
121 |
0 |
base_manifest = {} |
122 |
0 |
manifest_file = os.open( |
123 |
|
os.path.join( |
124 |
|
self._package_path, base_manifest[self._METADATA_MANIFEST_FIELD] |
125 |
|
), |
126 |
|
"rw", |
127 |
|
) |
128 |
0 |
for line in manifest_file: |
129 |
0 |
fields_in_line = line.split(":", maxsplit=1) |
130 |
0 |
fields_in_line[0] = fields_in_line[0].strip() |
131 |
0 |
fields_in_line[1] = fields_in_line[1].strip() |
132 |
0 |
if fields_in_line[0] in self._MANIFEST_ALL_FIELDS: |
133 |
0 |
base_manifest[fields_in_line[0]] = fields_in_line[1] |
134 |
0 |
return base_manifest |
135 |
0 |
except (Exception, OSError) as e: |
136 |
0 |
raise SOLPackageException( |
137 |
|
"Error parsing {}: {}".format( |
138 |
|
base_manifest[SOLPackage._METADATA_MANIFEST_FIELD], e |
139 |
|
) |
140 |
|
) |
141 |
|
|
142 |
0 |
def _get_package_file_full_path(self, file_relative_path): |
143 |
0 |
return os.path.join(self._package_path, file_relative_path) |
144 |
|
|
145 |
0 |
def _parse_file_in_blocks(self, file_relative_path): |
146 |
0 |
file_path = self._get_package_file_full_path(file_relative_path) |
147 |
0 |
with open(file_path) as f: |
148 |
0 |
blocks = f.read().split("\n\n") |
149 |
0 |
parsed_blocks = map(yaml.safe_load, blocks) |
150 |
0 |
return [block for block in parsed_blocks if block is not None] |
151 |
|
|
152 |
0 |
def _get_package_file_manifest_data(self, file_relative_path): |
153 |
0 |
for file_data in self._manifest_data: |
154 |
0 |
if ( |
155 |
|
file_data.get(SOLPackage._MANIFEST_FILE_PATH_FIELD, "") |
156 |
|
== file_relative_path |
157 |
|
): |
158 |
0 |
return file_data |
159 |
|
|
160 |
0 |
error_msg = ( |
161 |
|
"Error parsing {} manifest data: file not found on manifest file".format( |
162 |
|
file_relative_path |
163 |
|
) |
164 |
|
) |
165 |
0 |
raise SOLPackageException(error_msg) |
166 |
|
|
167 |
0 |
def get_package_file_hash_digest_from_manifest(self, file_relative_path): |
168 |
|
"""Returns the hash digest of a file inside this package as specified on the manifest file.""" |
169 |
0 |
file_manifest_data = self._get_package_file_manifest_data(file_relative_path) |
170 |
0 |
try: |
171 |
0 |
return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_DIGEST_FIELD] |
172 |
0 |
except Exception as e: |
173 |
0 |
raise SOLPackageException( |
174 |
|
"Error parsing {} hash digest: {}".format(file_relative_path, e) |
175 |
|
) |
176 |
|
|
177 |
0 |
def get_package_file_hash_algorithm_from_manifest(self, file_relative_path): |
178 |
|
"""Returns the hash algorithm of a file inside this package as specified on the manifest file.""" |
179 |
0 |
file_manifest_data = self._get_package_file_manifest_data(file_relative_path) |
180 |
0 |
try: |
181 |
0 |
return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_ALGORITHM_FIELD] |
182 |
0 |
except Exception as e: |
183 |
0 |
raise SOLPackageException( |
184 |
|
"Error parsing {} hash digest: {}".format(file_relative_path, e) |
185 |
|
) |
186 |
|
|
187 |
0 |
@staticmethod |
188 |
0 |
def _get_hash_function_from_hash_algorithm(hash_algorithm): |
189 |
0 |
function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512} |
190 |
0 |
if hash_algorithm not in function_to_algorithm: |
191 |
0 |
error_msg = ( |
192 |
|
"Error checking hash function: hash algorithm {} not supported".format( |
193 |
|
hash_algorithm |
194 |
|
) |
195 |
|
) |
196 |
0 |
raise SOLPackageException(error_msg) |
197 |
0 |
return function_to_algorithm[hash_algorithm] |
198 |
|
|
199 |
0 |
def _calculate_file_hash(self, file_relative_path, hash_algorithm): |
200 |
0 |
file_path = self._get_package_file_full_path(file_relative_path) |
201 |
0 |
hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm) |
202 |
0 |
try: |
203 |
0 |
with open(file_path, "rb") as f: |
204 |
0 |
return hash_function(f.read()).hexdigest() |
205 |
0 |
except Exception as e: |
206 |
0 |
raise SOLPackageException( |
207 |
|
"Error hashing {}: {}".format(file_relative_path, e) |
208 |
|
) |
209 |
|
|
210 |
0 |
def validate_package_file_hash(self, file_relative_path): |
211 |
|
"""Validates the integrity of a file using the hash algorithm and digest on the package manifest.""" |
212 |
0 |
hash_algorithm = self.get_package_file_hash_algorithm_from_manifest( |
213 |
|
file_relative_path |
214 |
|
) |
215 |
0 |
file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm) |
216 |
0 |
expected_file_hash = self.get_package_file_hash_digest_from_manifest( |
217 |
|
file_relative_path |
218 |
|
) |
219 |
0 |
if file_hash != expected_file_hash: |
220 |
0 |
error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}" |
221 |
0 |
raise SOLPackageException( |
222 |
|
error_msg.format(file_relative_path, file_hash, expected_file_hash) |
223 |
|
) |
224 |
|
|
225 |
0 |
def validate_package_hashes(self): |
226 |
|
"""Validates the integrity of all files listed on the package manifest.""" |
227 |
0 |
for file_data in self._manifest_data: |
228 |
0 |
if SOLPackage._MANIFEST_FILE_PATH_FIELD in file_data: |
229 |
0 |
file_relative_path = file_data[SOLPackage._MANIFEST_FILE_PATH_FIELD] |
230 |
0 |
self.validate_package_file_hash(file_relative_path) |
231 |
|
|
232 |
0 |
def create_or_update_metadata_file(self): |
233 |
|
""" |
234 |
|
Creates or updates the metadata file with the hashes calculated for each one of the package's files |
235 |
|
""" |
236 |
0 |
if not self._manifest_metadata: |
237 |
0 |
self.generate_manifest_data_from_descriptor() |
238 |
|
|
239 |
0 |
self.write_manifest_data_into_file() |
240 |
|
|
241 |
0 |
def generate_manifest_data_from_descriptor(self): |
242 |
0 |
pass |
243 |
|
|
244 |
0 |
def write_manifest_data_into_file(self): |
245 |
0 |
with open(self.get_manifest_location(), "w") as metadata_file: |
246 |
|
# Write manifest metadata |
247 |
0 |
for metadata_entry in self._manifest_metadata: |
248 |
0 |
metadata_file.write( |
249 |
|
"{}: {}\n".format( |
250 |
|
metadata_entry, self._manifest_metadata[metadata_entry] |
251 |
|
) |
252 |
|
) |
253 |
|
|
254 |
|
# Write package's files hashes |
255 |
0 |
file_hashes = {} |
256 |
0 |
for root, dirs, files in os.walk(self._package_path): |
257 |
0 |
for a_file in files: |
258 |
0 |
file_path = os.path.join(root, a_file) |
259 |
0 |
file_relative_path = file_path[len(self._package_path) :] |
260 |
0 |
if file_relative_path.startswith("/"): |
261 |
0 |
file_relative_path = file_relative_path[1:] |
262 |
0 |
file_hashes[file_relative_path] = self._calculate_file_hash( |
263 |
|
file_relative_path, "SHA-512" |
264 |
|
) |
265 |
|
|
266 |
0 |
for file, hash in file_hashes.items(): |
267 |
0 |
file_block = "Source: {}\nAlgorithm: SHA-512\nHash: {}\n\n".format( |
268 |
|
file, hash |
269 |
|
) |
270 |
0 |
metadata_file.write(file_block) |
271 |
|
|
272 |
0 |
def get_descriptor_location(self): |
273 |
|
"""Returns this package descriptor location as a relative path from the package root.""" |
274 |
0 |
for tosca_meta in self._package_metadata: |
275 |
0 |
if SOLPackage._METADATA_DESCRIPTOR_FIELD in tosca_meta: |
276 |
0 |
return tosca_meta[SOLPackage._METADATA_DESCRIPTOR_FIELD] |
277 |
|
|
278 |
0 |
error_msg = "Error: no {} entry found on {}".format( |
279 |
|
SOLPackage._METADATA_DESCRIPTOR_FIELD, SOLPackage._METADATA_FILE_PATH |
280 |
|
) |
281 |
0 |
raise SOLPackageException(error_msg) |
282 |
|
|
283 |
0 |
def get_manifest_location(self): |
284 |
|
"""Return the VNF/NS manifest location as a relative path from the package root.""" |
285 |
0 |
for tosca_meta in self._package_metadata: |
286 |
0 |
if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta: |
287 |
0 |
return tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD] |
288 |
|
|
289 |
0 |
raise SOLPackageException("No manifest file defined for this package") |