Code Coverage

Cobertura Coverage Report > osmclient.common >

sol_package.py

Trend

File Coverage summary

NameClassesLinesConditionals
sol_package.py
100%
1/1
24%
38/158
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
sol_package.py
24%
38/158
N/A

Source

osmclient/common/sol_package.py
1 # Copyright ETSI Contributors and Others.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 1 import os
17 1 import yaml
18 1 import hashlib
19
20
21 1 class SOLPackageException(Exception):
22 1     pass
23
24
25 1 class SOLPackage:
26 1     _METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta"
27 1     _METADATA_DESCRIPTOR_FIELD = "Entry-Definitions"
28 1     _METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest"
29 1     _METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log"
30 1     _METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses"
31 1     _METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt"
32 1     _METADATA_DEFAULT_LICENSES_PATH = "Licenses"
33 1     _MANIFEST_FILE_PATH_FIELD = "Source"
34 1     _MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm"
35 1     _MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash"
36
37 1     _MANIFEST_ALL_FIELDS = []
38
39 1     def __init__(self, package_path=""):
40 0         self._package_path = package_path
41
42 0         self._package_metadata = self._parse_package_metadata()
43
44 0         try:
45 0             self._manifest_data = self._parse_manifest_data()
46 0         except Exception:
47 0             self._manifest_data = None
48
49 0         try:
50 0             self._manifest_metadata = self._parse_manifest_metadata()
51 0         except Exception:
52 0             self._manifest_metadata = None
53
54 1     def _parse_package_metadata(self):
55 0         try:
56 0             return self._parse_package_metadata_with_metadata_dir()
57 0         except FileNotFoundError:
58 0             return self._parse_package_metadata_without_metadata_dir()
59
60 1     def _parse_package_metadata_with_metadata_dir(self):
61 0         try:
62 0             return self._parse_file_in_blocks(self._METADATA_FILE_PATH)
63 0         except FileNotFoundError as e:
64 0             raise e
65 0         except (Exception, OSError) as e:
66 0             raise SOLPackageException(
67                 "Error parsing {}: {}".format(self._METADATA_FILE_PATH, e)
68             )
69
70 1     def _parse_package_metadata_without_metadata_dir(self):
71 0         package_root_files = {f for f in os.listdir(self._package_path)}
72 0         package_root_yamls = [
73             f for f in package_root_files if f.endswith(".yml") or f.endswith(".yaml")
74         ]
75 0         if len(package_root_yamls) != 1:
76 0             error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}"
77 0             raise SOLPackageException(error_msg.format(len(package_root_yamls)))
78
79 0         base_manifest = [
80             {
81                 SOLPackage._METADATA_DESCRIPTOR_FIELD: package_root_yamls[0],
82                 SOLPackage._METADATA_MANIFEST_FIELD: "{}.mf".format(
83                     os.path.splitext(package_root_yamls[0])[0]
84                 ),
85                 SOLPackage._METADATA_CHANGELOG_FIELD: SOLPackage._METADATA_DEFAULT_CHANGELOG_PATH,
86                 SOLPackage._METADATA_LICENSES_FIELD: SOLPackage._METADATA_DEFAULT_LICENSES_PATH,
87             }
88         ]
89
90 0         return base_manifest
91
92 1     def _parse_manifest_data(self):
93 0         manifest_path = None
94 0         for tosca_meta in self._package_metadata:
95 0             if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta:
96 0                 manifest_path = tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD]
97 0                 break
98         else:
99 0             error_msg = "Error parsing {}: no {} field on path".format(
100                 self._METADATA_FILE_PATH, self._METADATA_MANIFEST_FIELD
101             )
102 0             raise SOLPackageException(error_msg)
103
104 0         try:
105 0             return self._parse_file_in_blocks(manifest_path)
106
107 0         except (Exception, OSError) as e:
108 0             raise SOLPackageException("Error parsing {}: {}".format(manifest_path, e))
109
110 1     def _parse_manifest_metadata(self):
111 0         try:
112 0             base_manifest = {}
113 0             manifest_file = os.open(
114                 os.path.join(
115                     self._package_path,
116                     base_manifest[SOLPackage._METADATA_MANIFEST_FIELD],
117                 ),
118                 "rw",
119             )
120 0             for line in manifest_file:
121 0                 fields_in_line = line.split(":", maxsplit=1)
122 0                 fields_in_line[0] = fields_in_line[0].strip()
123 0                 fields_in_line[1] = fields_in_line[1].strip()
124 0                 if fields_in_line[0] in self._MANIFEST_ALL_FIELDS:
125 0                     base_manifest[fields_in_line[0]] = fields_in_line[1]
126 0             return base_manifest
127 0         except (Exception, OSError) as e:
128 0             raise SOLPackageException(
129                 "Error parsing {}: {}".format(
130                     base_manifest[SOLPackage._METADATA_MANIFEST_FIELD], e
131                 )
132             )
133
134 1     def _get_package_file_full_path(self, file_relative_path):
135 0         return os.path.join(self._package_path, file_relative_path)
136
137 1     def _parse_file_in_blocks(self, file_relative_path):
138 0         file_path = self._get_package_file_full_path(file_relative_path)
139 0         with open(file_path) as f:
140 0             blocks = f.read().split("\n\n")
141 0         parsed_blocks = map(yaml.safe_load, blocks)
142 0         return [block for block in parsed_blocks if block is not None]
143
144 1     def _get_package_file_manifest_data(self, file_relative_path):
145 0         for file_data in self._manifest_data:
146 0             if (
147                 file_data.get(SOLPackage._MANIFEST_FILE_PATH_FIELD, "")
148                 == file_relative_path
149             ):
150 0                 return file_data
151
152 0         error_msg = (
153             "Error parsing {} manifest data: file not found on manifest file".format(
154                 file_relative_path
155             )
156         )
157 0         raise SOLPackageException(error_msg)
158
159 1     def get_package_file_hash_digest_from_manifest(self, file_relative_path):
160         """Returns the hash digest of a file inside this package as specified on the manifest file."""
161 0         file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
162 0         try:
163 0             return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_DIGEST_FIELD]
164 0         except Exception as e:
165 0             raise SOLPackageException(
166                 "Error parsing {} hash digest: {}".format(file_relative_path, e)
167             )
168
169 1     def get_package_file_hash_algorithm_from_manifest(self, file_relative_path):
170         """Returns the hash algorithm of a file inside this package as specified on the manifest file."""
171 0         file_manifest_data = self._get_package_file_manifest_data(file_relative_path)
172 0         try:
173 0             return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_ALGORITHM_FIELD]
174 0         except Exception as e:
175 0             raise SOLPackageException(
176                 "Error parsing {} hash digest: {}".format(file_relative_path, e)
177             )
178
179 1     @staticmethod
180 1     def _get_hash_function_from_hash_algorithm(hash_algorithm):
181 0         function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512}
182 0         if hash_algorithm not in function_to_algorithm:
183 0             error_msg = (
184                 "Error checking hash function: hash algorithm {} not supported".format(
185                     hash_algorithm
186                 )
187             )
188 0             raise SOLPackageException(error_msg)
189 0         return function_to_algorithm[hash_algorithm]
190
191 1     def _calculate_file_hash(self, file_relative_path, hash_algorithm):
192 0         file_path = self._get_package_file_full_path(file_relative_path)
193 0         hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm)
194 0         try:
195 0             with open(file_path, "rb") as f:
196 0                 return hash_function(f.read()).hexdigest()
197 0         except Exception as e:
198 0             raise SOLPackageException(
199                 "Error hashing {}: {}".format(file_relative_path, e)
200             )
201
202 1     def validate_package_file_hash(self, file_relative_path):
203         """Validates the integrity of a file using the hash algorithm and digest on the package manifest."""
204 0         hash_algorithm = self.get_package_file_hash_algorithm_from_manifest(
205             file_relative_path
206         )
207 0         file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm)
208 0         expected_file_hash = self.get_package_file_hash_digest_from_manifest(
209             file_relative_path
210         )
211 0         if file_hash != expected_file_hash:
212 0             error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}"
213 0             raise SOLPackageException(
214                 error_msg.format(file_relative_path, file_hash, expected_file_hash)
215             )
216
217 1     def validate_package_hashes(self):
218         """Validates the integrity of all files listed on the package manifest."""
219 0         for file_data in self._manifest_data:
220 0             if SOLPackage._MANIFEST_FILE_PATH_FIELD in file_data:
221 0                 file_relative_path = file_data[SOLPackage._MANIFEST_FILE_PATH_FIELD]
222 0                 self.validate_package_file_hash(file_relative_path)
223
224 1     def create_or_update_metadata_file(self):
225         """
226         Creates or updates the metadata file with the hashes calculated for each one of the package's files
227         """
228 0         if not self._manifest_metadata:
229 0             self.generate_manifest_data_from_descriptor()
230
231 0         self.write_manifest_data_into_file()
232
233 1     def generate_manifest_data_from_descriptor(self):
234 0         pass
235
236 1     def write_manifest_data_into_file(self):
237 0         with open(self.get_manifest_location(), "w") as metadata_file:
238             # Write manifest metadata
239 0             for metadata_entry in self._manifest_metadata:
240 0                 metadata_file.write(
241                     "{}: {}\n".format(
242                         metadata_entry, self._manifest_metadata[metadata_entry]
243                     )
244                 )
245
246             # Write package's files hashes
247 0             file_hashes = {}
248 0             for root, dirs, files in os.walk(self._package_path):
249 0                 for a_file in files:
250 0                     file_path = os.path.join(root, a_file)
251 0                     file_relative_path = file_path[len(self._package_path) :]
252 0                     if file_relative_path.startswith("/"):
253 0                         file_relative_path = file_relative_path[1:]
254 0                     file_hashes[file_relative_path] = self._calculate_file_hash(
255                         file_relative_path, "SHA-512"
256                     )
257
258 0             for file, hash in file_hashes.items():
259 0                 file_block = "Source: {}\nAlgorithm: SHA-512\nHash: {}\n\n".format(
260                     file, hash
261                 )
262 0                 metadata_file.write(file_block)
263
264 1     def get_descriptor_location(self):
265         """Returns this package descriptor location as a relative path from the package root."""
266 0         for tosca_meta in self._package_metadata:
267 0             if SOLPackage._METADATA_DESCRIPTOR_FIELD in tosca_meta:
268 0                 return tosca_meta[SOLPackage._METADATA_DESCRIPTOR_FIELD]
269
270 0         error_msg = "Error: no {} entry found on {}".format(
271             SOLPackage._METADATA_DESCRIPTOR_FIELD, SOLPackage._METADATA_FILE_PATH
272         )
273 0         raise SOLPackageException(error_msg)
274
275 1     def get_manifest_location(self):
276         """Return the VNF/NS manifest location as a relative path from the package root."""
277 0         for tosca_meta in self._package_metadata:
278 0             if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta:
279 0                 return tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD]
280
281 0         raise SOLPackageException("No manifest file defined for this package")