e336cd5b7e6b66a30d4a9b38a0c5c2efbf04ef8c
[osm/common.git] / osm_common / sol_package.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2021 Whitestack, LLC
4 # *************************************************************
5 #
6 # This file is part of OSM common repository.
7 # All Rights Reserved to Whitestack, LLC
8 #
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 #
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: fbravo@whitestack.com or agarcia@whitestack.com
23 ##
24
25 import os
26 import yaml
27 import hashlib
28
29
class SOLPackageException(Exception):
    """Raised when a package cannot be parsed, located or validated."""
32
33
class SOLPackage:
    """Access to the metadata, manifest and file hashes of a package directory.

    The package is a directory on disk. Its metadata is read from
    ``TOSCA-Metadata/TOSCA.meta`` when that file exists; otherwise it is
    inferred from the single YAML descriptor found in the package root.
    The manifest file referenced by the metadata lists, per packaged file,
    a ``Source`` path, a hash ``Algorithm`` and the expected ``Hash`` digest.
    """

    _METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta"
    _METADATA_DESCRIPTOR_FIELD = "Entry-Definitions"
    _METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest"
    _METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log"
    _METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses"
    _METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt"
    _METADATA_DEFAULT_LICENSES_PATH = "Licenses"
    _MANIFEST_FILE_PATH_FIELD = "Source"
    _MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm"
    _MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash"

    # Names of the manifest metadata fields kept by _parse_manifest_metadata.
    # The list is empty, so no field is currently extracted.
    _MANIFEST_ALL_FIELDS = []

    # Number of bytes read per iteration while hashing a file.
    _HASH_CHUNK_SIZE = 65536

    def __init__(self, package_path=""):
        """Load the package metadata and, on a best-effort basis, its manifest.

        :param package_path: path of the package root directory.
        :raises SOLPackageException: if the package metadata cannot be parsed.
        """
        self._package_path = package_path

        self._package_metadata = self._parse_package_metadata()

        # The manifest is optional at construction time (it can be generated
        # later with create_or_update_metadata_file), so failures here are
        # recorded as None instead of aborting the construction.
        try:
            self._manifest_data = self._parse_manifest_data()
        except Exception:
            self._manifest_data = None

        try:
            self._manifest_metadata = self._parse_manifest_metadata()
        except Exception:
            self._manifest_metadata = None

    def _parse_package_metadata(self):
        """Return the package metadata blocks, falling back to inferred ones.

        The TOSCA metadata file is used when present; when it does not exist
        the metadata is derived from the contents of the package root.
        """
        try:
            return self._parse_package_metadata_with_metadata_dir()
        except FileNotFoundError:
            return self._parse_package_metadata_without_metadata_dir()

    def _parse_package_metadata_with_metadata_dir(self):
        """Parse ``TOSCA-Metadata/TOSCA.meta`` into a list of blocks.

        :raises FileNotFoundError: if the metadata file does not exist, so the
            caller can fall back to the directory-less layout.
        :raises SOLPackageException: on any other read or parse error.
        """
        try:
            return self._parse_file_in_blocks(self._METADATA_FILE_PATH)
        except FileNotFoundError:
            raise
        except Exception as e:
            raise SOLPackageException(
                "Error parsing {}: {}".format(self._METADATA_FILE_PATH, e)
            ) from e

    def _parse_package_metadata_without_metadata_dir(self):
        """Build default metadata from the only YAML file in the package root.

        The manifest is assumed to share the descriptor's base name with a
        ``.mf`` extension; change log and licenses use their default paths.

        :raises SOLPackageException: if the root does not hold exactly one
            ``.yml``/``.yaml`` entry.
        """
        package_root_yamls = [
            entry
            for entry in os.listdir(self._package_path)
            if entry.endswith((".yml", ".yaml"))
        ]
        if len(package_root_yamls) != 1:
            error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}"
            raise SOLPackageException(error_msg.format(len(package_root_yamls)))

        descriptor = package_root_yamls[0]
        return [
            {
                SOLPackage._METADATA_DESCRIPTOR_FIELD: descriptor,
                SOLPackage._METADATA_MANIFEST_FIELD: "{}.mf".format(
                    os.path.splitext(descriptor)[0]
                ),
                SOLPackage._METADATA_CHANGELOG_FIELD: SOLPackage._METADATA_DEFAULT_CHANGELOG_PATH,
                SOLPackage._METADATA_LICENSES_FIELD: SOLPackage._METADATA_DEFAULT_LICENSES_PATH,
            }
        ]

    def _parse_manifest_data(self):
        """Parse the manifest referenced by the metadata into a list of blocks.

        :raises SOLPackageException: if no metadata block names a manifest or
            the manifest cannot be read or parsed.
        """
        for tosca_meta in self._package_metadata:
            if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta:
                manifest_path = tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD]
                break
        else:
            error_msg = "Error parsing {}: no {} field on path".format(
                self._METADATA_FILE_PATH, self._METADATA_MANIFEST_FIELD
            )
            raise SOLPackageException(error_msg)

        try:
            return self._parse_file_in_blocks(manifest_path)
        except Exception as e:
            raise SOLPackageException(
                "Error parsing {}: {}".format(manifest_path, e)
            ) from e

    def _parse_manifest_metadata(self):
        """Collect the ``field: value`` manifest lines listed in _MANIFEST_ALL_FIELDS.

        :return: dict mapping each recognised field name to its stripped value.
        :raises SOLPackageException: if the package defines no manifest or the
            manifest file cannot be read.
        """
        manifest_location = self.get_manifest_location()
        try:
            manifest_metadata = {}
            manifest_path = self._get_package_file_full_path(manifest_location)
            with open(manifest_path) as manifest_file:
                for line in manifest_file:
                    field, separator, value = line.partition(":")
                    if not separator:
                        # Blank or free-form line: not a "field: value" entry.
                        continue
                    field = field.strip()
                    if field in self._MANIFEST_ALL_FIELDS:
                        manifest_metadata[field] = value.strip()
            return manifest_metadata
        except Exception as e:
            raise SOLPackageException(
                "Error parsing {}: {}".format(manifest_location, e)
            ) from e

    def _get_package_file_full_path(self, file_relative_path):
        """Join a package-relative path onto the package root path."""
        return os.path.join(self._package_path, file_relative_path)

    def _parse_file_in_blocks(self, file_relative_path):
        """Parse a package file as blank-line separated YAML blocks.

        :return: list with the parsed value of every non-empty block.
        """
        file_path = self._get_package_file_full_path(file_relative_path)
        with open(file_path) as f:
            blocks = f.read().split("\n\n")
        parsed_blocks = map(yaml.safe_load, blocks)
        return [block for block in parsed_blocks if block is not None]

    def _get_package_file_manifest_data(self, file_relative_path):
        """Return the manifest block whose ``Source`` equals the given path.

        :raises SOLPackageException: if the file is not listed on the manifest
            (including when no manifest could be loaded).
        """
        for file_data in self._manifest_data or []:
            if (
                isinstance(file_data, dict)
                and file_data.get(SOLPackage._MANIFEST_FILE_PATH_FIELD, "")
                == file_relative_path
            ):
                return file_data

        error_msg = (
            "Error parsing {} manifest data: file not found on manifest file".format(
                file_relative_path
            )
        )
        raise SOLPackageException(error_msg)

    def get_package_file_hash_digest_from_manifest(self, file_relative_path):
        """Returns the hash digest of a file inside this package as specified on the manifest file."""
        file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
        try:
            return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_DIGEST_FIELD]
        except KeyError as e:
            raise SOLPackageException(
                "Error parsing {} hash digest: {}".format(file_relative_path, e)
            ) from e

    def get_package_file_hash_algorithm_from_manifest(self, file_relative_path):
        """Returns the hash algorithm of a file inside this package as specified on the manifest file."""
        file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
        try:
            return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_ALGORITHM_FIELD]
        except KeyError as e:
            raise SOLPackageException(
                "Error parsing {} hash algorithm: {}".format(file_relative_path, e)
            ) from e

    @staticmethod
    def _get_hash_function_from_hash_algorithm(hash_algorithm):
        """Map a manifest algorithm name to its ``hashlib`` constructor.

        :raises SOLPackageException: if the algorithm is not SHA-256 or SHA-512.
        """
        function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512}
        if hash_algorithm not in function_to_algorithm:
            error_msg = (
                "Error checking hash function: hash algorithm {} not supported".format(
                    hash_algorithm
                )
            )
            raise SOLPackageException(error_msg)
        return function_to_algorithm[hash_algorithm]

    def _calculate_file_hash(self, file_relative_path, hash_algorithm):
        """Return the hex digest of a package file using the given algorithm.

        The file is read in fixed-size chunks so large artifacts are not
        loaded into memory at once.

        :raises SOLPackageException: if the algorithm is unsupported or the
            file cannot be read.
        """
        file_path = self._get_package_file_full_path(file_relative_path)
        hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm)
        try:
            hasher = hash_function()
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(self._HASH_CHUNK_SIZE), b""):
                    hasher.update(chunk)
            return hasher.hexdigest()
        except Exception as e:
            raise SOLPackageException(
                "Error hashing {}: {}".format(file_relative_path, e)
            ) from e

    def validate_package_file_hash(self, file_relative_path):
        """Validates the integrity of a file using the hash algorithm and digest on the package manifest.

        :raises SOLPackageException: if the file is not on the manifest, the
            algorithm is unsupported, the file cannot be read, or the
            calculated digest differs from the manifest one.
        """
        hash_algorithm = self.get_package_file_hash_algorithm_from_manifest(
            file_relative_path
        )
        expected_file_hash = self.get_package_file_hash_digest_from_manifest(
            file_relative_path
        )
        file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm)
        # Hex digests are case-insensitive, and the YAML loader may return a
        # non-string scalar, so both sides are normalised before comparing.
        if file_hash.lower() != str(expected_file_hash).strip().lower():
            error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}"
            raise SOLPackageException(
                error_msg.format(file_relative_path, file_hash, expected_file_hash)
            )

    def validate_package_hashes(self):
        """Validates the integrity of all files listed on the package manifest.

        :raises SOLPackageException: if the manifest could not be loaded or any
            listed file fails validation.
        """
        if self._manifest_data is None:
            # Without a manifest nothing can be verified; failing loudly is
            # safer than reporting an unchecked package as valid.
            raise SOLPackageException(
                "Error validating package hashes: manifest data is not available"
            )
        for file_data in self._manifest_data:
            if (
                isinstance(file_data, dict)
                and SOLPackage._MANIFEST_FILE_PATH_FIELD in file_data
            ):
                file_relative_path = file_data[SOLPackage._MANIFEST_FILE_PATH_FIELD]
                self.validate_package_file_hash(file_relative_path)

    def create_or_update_metadata_file(self):
        """
        Creates or updates the metadata file with the hashes calculated for each one of the package's files
        """
        if not self._manifest_metadata:
            self.generate_manifest_data_from_descriptor()

        self.write_manifest_data_into_file()

    def generate_manifest_data_from_descriptor(self):
        """Placeholder hook: currently does nothing."""
        pass

    def write_manifest_data_into_file(self):
        """Write the manifest: metadata entries followed by one block per file.

        Every file under the package root except the manifest itself is hashed
        with SHA-512. Hashes are computed before the manifest is opened for
        writing, and the manifest is written inside the package directory.

        :raises SOLPackageException: if the package defines no manifest or a
            file cannot be hashed.
        """
        manifest_path = self._get_package_file_full_path(self.get_manifest_location())
        manifest_abs_path = os.path.abspath(manifest_path)

        # Calculate package's files hashes
        file_hashes = {}
        for root, _dirs, files in os.walk(self._package_path):
            for file_name in files:
                file_path = os.path.join(root, file_name)
                if os.path.abspath(file_path) == manifest_abs_path:
                    # The manifest cannot carry a digest of its own content.
                    continue
                file_relative_path = os.path.relpath(
                    file_path, self._package_path
                ).replace(os.sep, "/")
                file_hashes[file_relative_path] = self._calculate_file_hash(
                    file_relative_path, "SHA-512"
                )

        with open(manifest_path, "w") as metadata_file:
            # Write manifest metadata (may be None when it was never parsed)
            for field, value in (self._manifest_metadata or {}).items():
                metadata_file.write("{}: {}\n".format(field, value))

            # Write package's files hashes, sorted for a reproducible manifest
            for file_relative_path in sorted(file_hashes):
                file_block = "Source: {}\nAlgorithm: SHA-512\nHash: {}\n\n".format(
                    file_relative_path, file_hashes[file_relative_path]
                )
                metadata_file.write(file_block)

    def get_descriptor_location(self):
        """Returns this package descriptor location as a relative path from the package root."""
        for tosca_meta in self._package_metadata:
            if SOLPackage._METADATA_DESCRIPTOR_FIELD in tosca_meta:
                return tosca_meta[SOLPackage._METADATA_DESCRIPTOR_FIELD]

        error_msg = "Error: no {} entry found on {}".format(
            SOLPackage._METADATA_DESCRIPTOR_FIELD, SOLPackage._METADATA_FILE_PATH
        )
        raise SOLPackageException(error_msg)

    def get_manifest_location(self):
        """Return the VNF/NS manifest location as a relative path from the package root."""
        for tosca_meta in self._package_metadata:
            if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta:
                return tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD]

        raise SOLPackageException("No manifest file defined for this package")