Code Coverage

Cobertura Coverage Report > osm_common >

sol_package.py

Trend

File Coverage summary

Name | Classes | Lines | Conditionals
sol_package.py
0%
0/1
0%
0/158
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
sol_package.py
0%
0/158
N/A

Source

osm_common/sol_package.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2021 Whitestack, LLC
4 # *************************************************************
5 #
6 # This file is part of OSM common repository.
7 # All Rights Reserved to Whitestack, LLC
8 #
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12 #
13 #         http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 #
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: fbravo@whitestack.com or agarcia@whitestack.com
23 ##
24 0 import hashlib
25 0 import os
26
27 0 import yaml
28
29
class SOLPackageException(Exception):
    """Raised when a package cannot be parsed, hashed or validated."""


class SOLPackage:
    """Access to the metadata, manifest and file hashes of a package directory.

    The package metadata is read from ``TOSCA-Metadata/TOSCA.meta`` when that
    file exists. Otherwise it is synthesized from the single ``.yml``/``.yaml``
    file found in the package root. The manifest named by the metadata is then
    parsed on a best-effort basis: a missing or unreadable manifest does not
    prevent construction, but hash-related methods raise
    ``SOLPackageException`` when the manifest data is unavailable.
    """

    _METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta"
    _METADATA_DESCRIPTOR_FIELD = "Entry-Definitions"
    _METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest"
    _METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log"
    _METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses"
    _METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt"
    _METADATA_DEFAULT_LICENSES_PATH = "Licenses"
    _MANIFEST_FILE_PATH_FIELD = "Source"
    _MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm"
    _MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash"

    # Names of the "key: value" manifest lines kept by _parse_manifest_metadata.
    # Empty here; presumably meant to be overridden by subclasses -- verify.
    _MANIFEST_ALL_FIELDS = []

    # Files are hashed in chunks of this many bytes to bound memory usage.
    _HASH_CHUNK_SIZE = 65536

    def __init__(self, package_path=""):
        """Load the package metadata and, when possible, its manifest.

        :param package_path: path of the package root directory; an empty
            string makes every path relative to the current directory.
        :raises SOLPackageException: if the package metadata cannot be parsed.
        """
        self._package_path = package_path

        self._package_metadata = self._parse_package_metadata()

        # The manifest is optional at construction time (it may be created
        # later by create_or_update_metadata_file), so parsing failures are
        # recorded as None instead of being propagated.
        try:
            self._manifest_data = self._parse_manifest_data()
        except Exception:
            self._manifest_data = None

        try:
            self._manifest_metadata = self._parse_manifest_metadata()
        except Exception:
            self._manifest_metadata = None

    def _parse_package_metadata(self):
        """Return the package metadata, falling back to the root descriptor."""
        try:
            return self._parse_package_metadata_with_metadata_dir()
        except FileNotFoundError:
            return self._parse_package_metadata_without_metadata_dir()

    def _parse_package_metadata_with_metadata_dir(self):
        """Parse the TOSCA metadata file into a list of blocks.

        :raises FileNotFoundError: if the metadata file does not exist, so the
            caller can fall back to the descriptor-based metadata.
        :raises SOLPackageException: on any other read or parse error.
        """
        try:
            return self._parse_file_in_blocks(self._METADATA_FILE_PATH)
        except FileNotFoundError:
            raise
        except Exception as e:
            raise SOLPackageException(
                "Error parsing {}: {}".format(self._METADATA_FILE_PATH, e)
            ) from e

    def _parse_package_metadata_without_metadata_dir(self):
        """Build default metadata from the only YAML file in the package root.

        :raises SOLPackageException: unless exactly one YAML file is found.
        """
        # os.listdir("") fails, so an empty package path means the current dir.
        package_root = self._package_path or os.curdir
        package_root_yamls = [
            f for f in os.listdir(package_root) if f.endswith((".yml", ".yaml"))
        ]
        if len(package_root_yamls) != 1:
            error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}"
            raise SOLPackageException(error_msg.format(len(package_root_yamls)))

        descriptor = package_root_yamls[0]
        return [
            {
                SOLPackage._METADATA_DESCRIPTOR_FIELD: descriptor,
                # The manifest is expected next to the descriptor, same stem.
                SOLPackage._METADATA_MANIFEST_FIELD: "{}.mf".format(
                    os.path.splitext(descriptor)[0]
                ),
                SOLPackage._METADATA_CHANGELOG_FIELD: SOLPackage._METADATA_DEFAULT_CHANGELOG_PATH,
                SOLPackage._METADATA_LICENSES_FIELD: SOLPackage._METADATA_DEFAULT_LICENSES_PATH,
            }
        ]

    def _parse_manifest_data(self):
        """Parse the manifest file named by the metadata into a list of blocks.

        :raises SOLPackageException: if the metadata names no manifest or the
            manifest cannot be read or parsed.
        """
        for tosca_meta in self._package_metadata:
            if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta:
                manifest_path = tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD]
                break
        else:
            error_msg = "Error parsing {}: no {} field on path".format(
                self._METADATA_FILE_PATH, self._METADATA_MANIFEST_FIELD
            )
            raise SOLPackageException(error_msg)

        try:
            return self._parse_file_in_blocks(manifest_path)
        except Exception as e:
            raise SOLPackageException(
                "Error parsing {}: {}".format(manifest_path, e)
            ) from e

    def _parse_manifest_metadata(self):
        """Collect the manifest's ``key: value`` lines listed in _MANIFEST_ALL_FIELDS.

        :return: dict mapping each recognized field name to its stripped value.
        :raises SOLPackageException: if no manifest is defined or it cannot be
            read.
        """
        manifest_location = self.get_manifest_location()
        try:
            manifest_metadata = {}
            manifest_full_path = self._get_package_file_full_path(manifest_location)
            with open(manifest_full_path) as manifest_file:
                for line in manifest_file:
                    key, separator, value = line.partition(":")
                    if not separator:
                        # Blank lines and lines without a colon carry no field.
                        continue
                    key = key.strip()
                    if key in self._MANIFEST_ALL_FIELDS:
                        manifest_metadata[key] = value.strip()
            return manifest_metadata
        except Exception as e:
            raise SOLPackageException(
                "Error parsing {}: {}".format(manifest_location, e)
            ) from e

    def _get_package_file_full_path(self, file_relative_path):
        """Join a package-relative path with the package root."""
        return os.path.join(self._package_path, file_relative_path)

    def _parse_file_in_blocks(self, file_relative_path):
        """Split a file on blank lines and YAML-parse every non-empty block."""
        file_path = self._get_package_file_full_path(file_relative_path)
        with open(file_path) as f:
            blocks = f.read().split("\n\n")
        parsed_blocks = map(yaml.safe_load, blocks)
        return [block for block in parsed_blocks if block is not None]

    def _get_manifest_data_or_raise(self):
        """Return the parsed manifest blocks, or raise if parsing had failed."""
        if self._manifest_data is None:
            raise SOLPackageException(
                "Error reading manifest data: the manifest file could not be parsed"
            )
        return self._manifest_data

    def _get_package_file_manifest_data(self, file_relative_path):
        """Return the manifest block whose ``Source`` equals the given path.

        :raises SOLPackageException: if the manifest is unavailable or does not
            list the file.
        """
        for file_data in self._get_manifest_data_or_raise():
            if (
                file_data.get(SOLPackage._MANIFEST_FILE_PATH_FIELD, "")
                == file_relative_path
            ):
                return file_data

        error_msg = (
            "Error parsing {} manifest data: file not found on manifest file".format(
                file_relative_path
            )
        )
        raise SOLPackageException(error_msg)

    def get_package_file_hash_digest_from_manifest(self, file_relative_path):
        """Returns the hash digest of a file inside this package as specified on the manifest file."""
        file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
        try:
            return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_DIGEST_FIELD]
        except Exception as e:
            raise SOLPackageException(
                "Error parsing {} hash digest: {}".format(file_relative_path, e)
            ) from e

    def get_package_file_hash_algorithm_from_manifest(self, file_relative_path):
        """Returns the hash algorithm of a file inside this package as specified on the manifest file."""
        file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
        try:
            return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_ALGORITHM_FIELD]
        except Exception as e:
            raise SOLPackageException(
                "Error parsing {} hash algorithm: {}".format(file_relative_path, e)
            ) from e

    @staticmethod
    def _get_hash_function_from_hash_algorithm(hash_algorithm):
        """Map a manifest algorithm name to its hashlib constructor.

        :raises SOLPackageException: if the algorithm is not SHA-256 or SHA-512.
        """
        function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512}
        if hash_algorithm not in function_to_algorithm:
            error_msg = (
                "Error checking hash function: hash algorithm {} not supported".format(
                    hash_algorithm
                )
            )
            raise SOLPackageException(error_msg)
        return function_to_algorithm[hash_algorithm]

    def _calculate_file_hash(self, file_relative_path, hash_algorithm):
        """Return the hex digest of a package file using the named algorithm.

        :raises SOLPackageException: if the algorithm is unsupported or the
            file cannot be read.
        """
        file_path = self._get_package_file_full_path(file_relative_path)
        hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm)
        try:
            digest = hash_function()
            with open(file_path, "rb") as f:
                # Feed the file in chunks so large artifacts are never fully
                # held in memory.
                for chunk in iter(lambda: f.read(self._HASH_CHUNK_SIZE), b""):
                    digest.update(chunk)
            return digest.hexdigest()
        except Exception as e:
            raise SOLPackageException(
                "Error hashing {}: {}".format(file_relative_path, e)
            ) from e

    def validate_package_file_hash(self, file_relative_path):
        """Validates the integrity of a file using the hash algorithm and digest on the package manifest."""
        hash_algorithm = self.get_package_file_hash_algorithm_from_manifest(
            file_relative_path
        )
        file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm)
        expected_file_hash = self.get_package_file_hash_digest_from_manifest(
            file_relative_path
        )
        if file_hash != expected_file_hash:
            error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}"
            raise SOLPackageException(
                error_msg.format(file_relative_path, file_hash, expected_file_hash)
            )

    def validate_package_hashes(self):
        """Validates the integrity of all files listed on the package manifest."""
        for file_data in self._get_manifest_data_or_raise():
            if SOLPackage._MANIFEST_FILE_PATH_FIELD in file_data:
                file_relative_path = file_data[SOLPackage._MANIFEST_FILE_PATH_FIELD]
                self.validate_package_file_hash(file_relative_path)

    def create_or_update_metadata_file(self):
        """
        Creates or updates the manifest file with the hashes calculated for each one of the package's files
        """
        if not self._manifest_metadata:
            self.generate_manifest_data_from_descriptor()

        self.write_manifest_data_into_file()

    def generate_manifest_data_from_descriptor(self):
        """Do nothing in this class.

        NOTE(review): presumably a hook for subclasses to populate
        ``_manifest_metadata`` from the descriptor -- confirm.
        """
        pass

    def write_manifest_data_into_file(self):
        """Rewrite the manifest with its metadata and a SHA-512 entry per file.

        Every file under the package root except the manifest itself gets a
        ``Source``/``Algorithm``/``Hash`` block. Hashes are computed before the
        manifest is opened for writing, so a hashing failure leaves the
        existing manifest untouched.

        :raises SOLPackageException: if no manifest is defined or a file
            cannot be hashed.
        """
        manifest_location = self.get_manifest_location()
        # The manifest location is relative to the package root, exactly as
        # when it is read by _parse_manifest_data.
        manifest_full_path = self._get_package_file_full_path(manifest_location)
        normalized_manifest_location = os.path.normpath(manifest_location)

        package_root = self._package_path or os.curdir
        file_hashes = {}
        for root, _dirs, files in os.walk(package_root):
            for a_file in files:
                file_relative_path = os.path.relpath(
                    os.path.join(root, a_file), package_root
                )
                if os.path.normpath(file_relative_path) == normalized_manifest_location:
                    # A manifest cannot contain a valid hash of itself.
                    continue
                file_hashes[file_relative_path] = self._calculate_file_hash(
                    file_relative_path, "SHA-512"
                )

        manifest_metadata = self._manifest_metadata or {}
        with open(manifest_full_path, "w") as metadata_file:
            # Write manifest metadata
            for metadata_entry, metadata_value in manifest_metadata.items():
                metadata_file.write("{}: {}\n".format(metadata_entry, metadata_value))

            # Write package's files hashes, sorted for a reproducible manifest
            for file_relative_path in sorted(file_hashes):
                file_block = "Source: {}\nAlgorithm: SHA-512\nHash: {}\n\n".format(
                    file_relative_path, file_hashes[file_relative_path]
                )
                metadata_file.write(file_block)

    def get_descriptor_location(self):
        """Returns this package descriptor location as a relative path from the package root."""
        for tosca_meta in self._package_metadata:
            if SOLPackage._METADATA_DESCRIPTOR_FIELD in tosca_meta:
                return tosca_meta[SOLPackage._METADATA_DESCRIPTOR_FIELD]

        error_msg = "Error: no {} entry found on {}".format(
            SOLPackage._METADATA_DESCRIPTOR_FIELD, SOLPackage._METADATA_FILE_PATH
        )
        raise SOLPackageException(error_msg)

    def get_manifest_location(self):
        """Return the VNF/NS manifest location as a relative path from the package root."""
        for tosca_meta in self._package_metadata:
            if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta:
                return tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD]

        raise SOLPackageException("No manifest file defined for this package")