Coverage for osmclient/common/sol_package.py: 24%

158 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-22 09:01 +0000

1# Copyright ETSI Contributors and Others. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

12# implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15 

16import os 

17import yaml 

18import hashlib 

19 

20 

21class SOLPackageException(Exception): 

22 pass 

23 

24 

25class SOLPackage: 

26 _METADATA_FILE_PATH = "TOSCA-Metadata/TOSCA.meta" 

27 _METADATA_DESCRIPTOR_FIELD = "Entry-Definitions" 

28 _METADATA_MANIFEST_FIELD = "ETSI-Entry-Manifest" 

29 _METADATA_CHANGELOG_FIELD = "ETSI-Entry-Change-Log" 

30 _METADATA_LICENSES_FIELD = "ETSI-Entry-Licenses" 

31 _METADATA_DEFAULT_CHANGELOG_PATH = "ChangeLog.txt" 

32 _METADATA_DEFAULT_LICENSES_PATH = "Licenses" 

33 _MANIFEST_FILE_PATH_FIELD = "Source" 

34 _MANIFEST_FILE_HASH_ALGORITHM_FIELD = "Algorithm" 

35 _MANIFEST_FILE_HASH_DIGEST_FIELD = "Hash" 

36 

37 _MANIFEST_ALL_FIELDS = [] 

38 

39 def __init__(self, package_path=""): 

40 self._package_path = package_path 

41 

42 self._package_metadata = self._parse_package_metadata() 

43 

44 try: 

45 self._manifest_data = self._parse_manifest_data() 

46 except Exception: 

47 self._manifest_data = None 

48 

49 try: 

50 self._manifest_metadata = self._parse_manifest_metadata() 

51 except Exception: 

52 self._manifest_metadata = None 

53 

54 def _parse_package_metadata(self): 

55 try: 

56 return self._parse_package_metadata_with_metadata_dir() 

57 except FileNotFoundError: 

58 return self._parse_package_metadata_without_metadata_dir() 

59 

60 def _parse_package_metadata_with_metadata_dir(self): 

61 try: 

62 return self._parse_file_in_blocks(self._METADATA_FILE_PATH) 

63 except FileNotFoundError as e: 

64 raise e 

65 except (Exception, OSError) as e: 

66 raise SOLPackageException( 

67 "Error parsing {}: {}".format(self._METADATA_FILE_PATH, e) 

68 ) 

69 

70 def _parse_package_metadata_without_metadata_dir(self): 

71 package_root_files = {f for f in os.listdir(self._package_path)} 

72 package_root_yamls = [ 

73 f for f in package_root_files if f.endswith(".yml") or f.endswith(".yaml") 

74 ] 

75 if len(package_root_yamls) != 1: 

76 error_msg = "Error parsing package metadata: there should be exactly 1 descriptor YAML, found {}" 

77 raise SOLPackageException(error_msg.format(len(package_root_yamls))) 

78 

79 base_manifest = [ 

80 { 

81 SOLPackage._METADATA_DESCRIPTOR_FIELD: package_root_yamls[0], 

82 SOLPackage._METADATA_MANIFEST_FIELD: "{}.mf".format( 

83 os.path.splitext(package_root_yamls[0])[0] 

84 ), 

85 SOLPackage._METADATA_CHANGELOG_FIELD: SOLPackage._METADATA_DEFAULT_CHANGELOG_PATH, 

86 SOLPackage._METADATA_LICENSES_FIELD: SOLPackage._METADATA_DEFAULT_LICENSES_PATH, 

87 } 

88 ] 

89 

90 return base_manifest 

91 

92 def _parse_manifest_data(self): 

93 manifest_path = None 

94 for tosca_meta in self._package_metadata: 

95 if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta: 

96 manifest_path = tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD] 

97 break 

98 else: 

99 error_msg = "Error parsing {}: no {} field on path".format( 

100 self._METADATA_FILE_PATH, self._METADATA_MANIFEST_FIELD 

101 ) 

102 raise SOLPackageException(error_msg) 

103 

104 try: 

105 return self._parse_file_in_blocks(manifest_path) 

106 

107 except (Exception, OSError) as e: 

108 raise SOLPackageException("Error parsing {}: {}".format(manifest_path, e)) 

109 

110 def _parse_manifest_metadata(self): 

111 try: 

112 base_manifest = {} 

113 manifest_file = os.open( 

114 os.path.join( 

115 self._package_path, 

116 base_manifest[SOLPackage._METADATA_MANIFEST_FIELD], 

117 ), 

118 "rw", 

119 ) 

120 for line in manifest_file: 

121 fields_in_line = line.split(":", maxsplit=1) 

122 fields_in_line[0] = fields_in_line[0].strip() 

123 fields_in_line[1] = fields_in_line[1].strip() 

124 if fields_in_line[0] in self._MANIFEST_ALL_FIELDS: 

125 base_manifest[fields_in_line[0]] = fields_in_line[1] 

126 return base_manifest 

127 except (Exception, OSError) as e: 

128 raise SOLPackageException( 

129 "Error parsing {}: {}".format( 

130 base_manifest[SOLPackage._METADATA_MANIFEST_FIELD], e 

131 ) 

132 ) 

133 

134 def _get_package_file_full_path(self, file_relative_path): 

135 return os.path.join(self._package_path, file_relative_path) 

136 

137 def _parse_file_in_blocks(self, file_relative_path): 

138 file_path = self._get_package_file_full_path(file_relative_path) 

139 with open(file_path) as f: 

140 blocks = f.read().split("\n\n") 

141 parsed_blocks = map(yaml.safe_load, blocks) 

142 return [block for block in parsed_blocks if block is not None] 

143 

144 def _get_package_file_manifest_data(self, file_relative_path): 

145 for file_data in self._manifest_data: 

146 if ( 

147 file_data.get(SOLPackage._MANIFEST_FILE_PATH_FIELD, "") 

148 == file_relative_path 

149 ): 

150 return file_data 

151 

152 error_msg = ( 

153 "Error parsing {} manifest data: file not found on manifest file".format( 

154 file_relative_path 

155 ) 

156 ) 

157 raise SOLPackageException(error_msg) 

158 

159 def get_package_file_hash_digest_from_manifest(self, file_relative_path): 

160 """Returns the hash digest of a file inside this package as specified on the manifest file.""" 

161 file_manifest_data = self._get_package_file_manifest_data(file_relative_path) 

162 try: 

163 return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_DIGEST_FIELD] 

164 except Exception as e: 

165 raise SOLPackageException( 

166 "Error parsing {} hash digest: {}".format(file_relative_path, e) 

167 ) 

168 

169 def get_package_file_hash_algorithm_from_manifest(self, file_relative_path): 

170 """Returns the hash algorithm of a file inside this package as specified on the manifest file.""" 

171 file_manifest_data = self._get_package_file_manifest_data(file_relative_path) 

172 try: 

173 return file_manifest_data[SOLPackage._MANIFEST_FILE_HASH_ALGORITHM_FIELD] 

174 except Exception as e: 

175 raise SOLPackageException( 

176 "Error parsing {} hash digest: {}".format(file_relative_path, e) 

177 ) 

178 

179 @staticmethod 

180 def _get_hash_function_from_hash_algorithm(hash_algorithm): 

181 function_to_algorithm = {"SHA-256": hashlib.sha256, "SHA-512": hashlib.sha512} 

182 if hash_algorithm not in function_to_algorithm: 

183 error_msg = ( 

184 "Error checking hash function: hash algorithm {} not supported".format( 

185 hash_algorithm 

186 ) 

187 ) 

188 raise SOLPackageException(error_msg) 

189 return function_to_algorithm[hash_algorithm] 

190 

191 def _calculate_file_hash(self, file_relative_path, hash_algorithm): 

192 file_path = self._get_package_file_full_path(file_relative_path) 

193 hash_function = self._get_hash_function_from_hash_algorithm(hash_algorithm) 

194 try: 

195 with open(file_path, "rb") as f: 

196 return hash_function(f.read()).hexdigest() 

197 except Exception as e: 

198 raise SOLPackageException( 

199 "Error hashing {}: {}".format(file_relative_path, e) 

200 ) 

201 

202 def validate_package_file_hash(self, file_relative_path): 

203 """Validates the integrity of a file using the hash algorithm and digest on the package manifest.""" 

204 hash_algorithm = self.get_package_file_hash_algorithm_from_manifest( 

205 file_relative_path 

206 ) 

207 file_hash = self._calculate_file_hash(file_relative_path, hash_algorithm) 

208 expected_file_hash = self.get_package_file_hash_digest_from_manifest( 

209 file_relative_path 

210 ) 

211 if file_hash != expected_file_hash: 

212 error_msg = "Error validating {} hash: calculated hash {} is different than manifest hash {}" 

213 raise SOLPackageException( 

214 error_msg.format(file_relative_path, file_hash, expected_file_hash) 

215 ) 

216 

217 def validate_package_hashes(self): 

218 """Validates the integrity of all files listed on the package manifest.""" 

219 for file_data in self._manifest_data: 

220 if SOLPackage._MANIFEST_FILE_PATH_FIELD in file_data: 

221 file_relative_path = file_data[SOLPackage._MANIFEST_FILE_PATH_FIELD] 

222 self.validate_package_file_hash(file_relative_path) 

223 

224 def create_or_update_metadata_file(self): 

225 """ 

226 Creates or updates the metadata file with the hashes calculated for each one of the package's files 

227 """ 

228 if not self._manifest_metadata: 

229 self.generate_manifest_data_from_descriptor() 

230 

231 self.write_manifest_data_into_file() 

232 

233 def generate_manifest_data_from_descriptor(self): 

234 pass 

235 

236 def write_manifest_data_into_file(self): 

237 with open(self.get_manifest_location(), "w") as metadata_file: 

238 # Write manifest metadata 

239 for metadata_entry in self._manifest_metadata: 

240 metadata_file.write( 

241 "{}: {}\n".format( 

242 metadata_entry, self._manifest_metadata[metadata_entry] 

243 ) 

244 ) 

245 

246 # Write package's files hashes 

247 file_hashes = {} 

248 for root, dirs, files in os.walk(self._package_path): 

249 for a_file in files: 

250 file_path = os.path.join(root, a_file) 

251 file_relative_path = file_path[len(self._package_path) :] 

252 if file_relative_path.startswith("/"): 

253 file_relative_path = file_relative_path[1:] 

254 file_hashes[file_relative_path] = self._calculate_file_hash( 

255 file_relative_path, "SHA-512" 

256 ) 

257 

258 for file, hash in file_hashes.items(): 

259 file_block = "Source: {}\nAlgorithm: SHA-512\nHash: {}\n\n".format( 

260 file, hash 

261 ) 

262 metadata_file.write(file_block) 

263 

264 def get_descriptor_location(self): 

265 """Returns this package descriptor location as a relative path from the package root.""" 

266 for tosca_meta in self._package_metadata: 

267 if SOLPackage._METADATA_DESCRIPTOR_FIELD in tosca_meta: 

268 return tosca_meta[SOLPackage._METADATA_DESCRIPTOR_FIELD] 

269 

270 error_msg = "Error: no {} entry found on {}".format( 

271 SOLPackage._METADATA_DESCRIPTOR_FIELD, SOLPackage._METADATA_FILE_PATH 

272 ) 

273 raise SOLPackageException(error_msg) 

274 

275 def get_manifest_location(self): 

276 """Return the VNF/NS manifest location as a relative path from the package root.""" 

277 for tosca_meta in self._package_metadata: 

278 if SOLPackage._METADATA_MANIFEST_FIELD in tosca_meta: 

279 return tosca_meta[SOLPackage._METADATA_MANIFEST_FIELD] 

280 

281 raise SOLPackageException("No manifest file defined for this package")