c57c4a0f564b51a30f8d995c77bfdc75ec1058a4
4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
# Unit tests may be launched without RIFT_VAR_ROOT exported; in that case
# point it at the unittest directory underneath RIFT_INSTALL.
if os.environ.get("RIFT_VAR_ROOT") is None:
    os.environ['RIFT_VAR_ROOT'] = os.path.join(
        os.environ['RIFT_INSTALL'],
        'var/rift/unittest',
    )
36 import rift
.package
.archive
37 import rift
.package
.package
38 import rift
.package
.icon
39 import rift
.package
.script
40 import rift
.package
.store
41 import rift
.package
.checksums
42 import rift
.package
.cloud_init
45 nsd_yaml
= b
"""nsd:nsd-catalog:
49 nsd:description: Gateways to access as corpA to PE1 and PE2
52 vnfd_yaml
= b
"""vnfd:vnfd-catalog:
54 - vnfd:id: gw_corpA_vnfd
55 vnfd:name: gw_corpA_vnfd
56 vnfd:description: Gateways to access as corpA to PE1 and PE2
# Archive member names used for the sample NSD and VNFD descriptors.
nsd_filename = "gw_corpA__nsd.yaml"
vnfd_filename = "gw_corpA__vnfd.yaml"
def file_hdl_md5(file_hdl):
    """Return the checksum of `file_hdl` as computed by rift.package.checksums.checksum."""
    compute_checksum = rift.package.checksums.checksum
    return compute_checksum(file_hdl)
67 class ArchiveTestCase(unittest
.TestCase
):
69 self
._log
= logging
.getLogger()
71 self
._tar
_file
_hdl
= io
.BytesIO()
72 self
._tar
= tarfile
.open(fileobj
=self
._tar
_file
_hdl
, mode
="w|gz")
74 self
._nsd
_yaml
_hdl
= io
.BytesIO(nsd_yaml
)
75 self
._vnfd
_yaml
_hdl
= io
.BytesIO(vnfd_yaml
)
78 self
._nsd
_yaml
_hdl
.close()
79 self
._vnfd
_yaml
_hdl
.close()
81 self
._tar
_file
_hdl
.close()
83 def create_tar_package_archive(self
):
85 self
._tar
_file
_hdl
.flush()
86 self
._tar
_file
_hdl
.seek(0)
87 archive
= rift
.package
.package
.TarPackageArchive(
89 tar_file_hdl
=self
._tar
_file
_hdl
,
def add_tarinfo(self, name, file_hdl, mode=0o777):
    """Add the full contents of a file handle to the test tar archive.

    Arguments:
        name - member path to record in the archive
        file_hdl - seekable binary file handle; must hold at least one byte
        mode - permission bits recorded on the archive member

    Raises:
        AssertionError - if the file handle is empty
    """
    # Measure the whole handle regardless of where the caller left the
    # cursor (e.g. at EOF right after writing to it).
    file_hdl.seek(0)
    payload_size = len(file_hdl.read())
    assert payload_size > 0

    # read() consumed the handle; rewind so addfile() can copy every byte
    # that tarinfo.size promises.
    file_hdl.seek(0)

    tarinfo = tarfile.TarInfo(name)
    tarinfo.size = payload_size
    tarinfo.mode = mode
    self._tar.addfile(tarinfo, file_hdl)
def add_tarinfo_dir(self, name):
    """Add a directory entry called `name` to the test tar archive."""
    dir_entry = tarfile.TarInfo(name)
    dir_entry.type = tarfile.DIRTYPE
    self._tar.addfile(dir_entry)
def add_nsd_yaml(self):
    """Add the sample NSD YAML to the test tar archive under `nsd_filename`."""
    nsd_hdl = io.BytesIO(nsd_yaml)
    self.add_tarinfo(nsd_filename, nsd_hdl)
def add_vnfd_yaml(self):
    """Add the sample VNFD YAML to the test tar archive under `vnfd_filename`."""
    vnfd_hdl = io.BytesIO(vnfd_yaml)
    self.add_tarinfo(vnfd_filename, vnfd_hdl)
class PackageTestCase(ArchiveTestCase):
    """Archive test case that can also turn the test archive into a package."""

    def create_nsd_package(self):
        """Build a package from the test archive after adding the sample NSD.

        Returns:
            The package produced by the archive's create_package().
        """
        # The archive needs a descriptor before a package can be created
        self.add_nsd_yaml()
        archive = self.create_tar_package_archive()
        package = archive.create_package()

        # Callers assert on the package, so it has to be handed back
        return package

    def create_vnfd_package(self):
        """Build a package from the test archive after adding the sample VNFD.

        Returns:
            The package produced by the archive's create_package().
        """
        # The archive needs a descriptor before a package can be created
        self.add_vnfd_yaml()
        archive = self.create_tar_package_archive()
        package = archive.create_package()

        # Callers assert on the package, so it has to be handed back
        return package
class TestCreateArchive(ArchiveTestCase):
    """Tests for building a TarPackageArchive from the test tar file."""

    def test_create_tar_archive(self):
        # The archive must list exactly the one member that was added
        self.add_nsd_yaml()
        archive = self.create_tar_package_archive()
        self.assertEqual(set(archive.filenames), {nsd_filename})

    def test_nsd_tar_archive(self):
        # Write the NSD YAML to the tar file
        self.add_nsd_yaml()

        archive = self.create_tar_package_archive()
        with archive.open_file(nsd_filename) as nsd_hdl:
            nsd_bytes = nsd_hdl.read()

        # Reading the member back must give the bytes that were written
        self.assertEqual(nsd_bytes, nsd_yaml)
146 class TestPackage(PackageTestCase
):
147 def create_vnfd_package_archive(self
, package
, hdl
):
148 # Create an archive from a package
149 archive
= rift
.package
.archive
.TarPackageArchive
.from_package(
150 self
._log
, package
, hdl
,
152 # Closing the archive writes any closing bytes to the file handle
def test_create_nsd_package_from_archive(self):
    # The package built by create_nsd_package() must be an NsdPackage
    package = self.create_nsd_package()
    self.assertIsInstance(package, rift.package.package.NsdPackage)

    # Its JSON descriptor must parse and be rooted at the NSD catalog key
    desc_dict = json.loads(package.json_descriptor)
    self.assertIn("nsd:nsd-catalog", desc_dict)
def test_create_vnfd_package_from_archive(self):
    # The package built by create_vnfd_package() must be a VnfdPackage
    package = self.create_vnfd_package()
    self.assertIsInstance(package, rift.package.package.VnfdPackage)

    # Its JSON descriptor must parse and be rooted at the VNFD catalog key
    desc_dict = json.loads(package.json_descriptor)
    self.assertIn("vnfd:vnfd-catalog", desc_dict)
174 def test_create_vnfd_archive_from_package(self
):
175 package
= self
.create_vnfd_package()
177 self
.create_vnfd_package_archive(package
, hdl
)
179 # Ensure that the archive created was valid
180 with tarfile
.open(fileobj
=hdl
, mode
='r|gz'):
183 def test_round_trip_vnfd_package_from_archive(self
):
184 package
= self
.create_vnfd_package()
186 self
.create_vnfd_package_archive(package
, hdl
)
188 archive
= rift
.package
.archive
.TarPackageArchive(self
._log
, hdl
)
190 return rift
.package
.checksums
.checksum(file_hdl
)
192 # Create the package from the archive and validate file checksums and modes
193 new_package
= archive
.create_package()
195 self
.assertEqual(package
.files
, new_package
.files
)
196 self
.assertEqual(type(package
), type(new_package
))
198 for filename
in package
.files
:
199 pkg_file
= package
.open(filename
)
200 new_pkg_file
= new_package
.open(filename
)
201 self
.assertEqual(md5(pkg_file
), md5(new_pkg_file
))
203 def test_create_nsd_package_from_file(self
):
204 nsd_file_name
= "asdf_nsd.yaml"
205 hdl
= io
.BytesIO(nsd_yaml
)
206 hdl
.name
= nsd_file_name
208 package
= rift
.package
.package
.DescriptorPackage
.from_descriptor_file_hdl(
211 self
.assertTrue(isinstance(package
, rift
.package
.package
.NsdPackage
))
213 with package
.open(nsd_file_name
) as nsd_hdl
:
214 nsd_data
= nsd_hdl
.read()
215 self
.assertEquals(yaml
.load(nsd_data
), yaml
.load(nsd_yaml
))
217 def test_create_vnfd_package_from_file(self
):
218 vnfd_file_name
= "asdf_vnfd.yaml"
219 hdl
= io
.BytesIO(vnfd_yaml
)
220 hdl
.name
= vnfd_file_name
222 package
= rift
.package
.package
.DescriptorPackage
.from_descriptor_file_hdl(
225 self
.assertTrue(isinstance(package
, rift
.package
.package
.VnfdPackage
))
227 with package
.open(vnfd_file_name
) as vnfd_hdl
:
228 vnfd_data
= vnfd_hdl
.read()
229 self
.assertEquals(yaml
.load(vnfd_data
), yaml
.load(vnfd_yaml
))
233 class TestPackageIconExtractor(PackageTestCase
):
def add_icon_file(self, icon_name):
    """Add a placeholder icon called `icon_name` under icons/ in the test tar archive."""
    icon_hdl = io.BytesIO(b"png file bytes")
    member_path = "icons/{}".format(icon_name)
    self.add_tarinfo(member_path, icon_hdl)
239 def test_extract_icon(self
):
241 self
.add_icon_file(icon_name
)
242 package
= self
.create_vnfd_package()
243 with tempfile
.TemporaryDirectory() as tmp_dir
:
244 extractor
= rift
.package
.icon
.PackageIconExtractor(self
._log
, tmp_dir
)
245 extractor
.extract_icons(package
)
247 icon_file
= extractor
.get_extracted_icon_path(
248 package
.descriptor_type
, package
.descriptor_id
, icon_name
250 self
.assertTrue(os
.path
.exists(icon_file
))
251 self
.assertTrue(os
.path
.isfile(icon_file
))
254 class TestPackageScriptExtractor(PackageTestCase
):
255 def add_script_file(self
, script_name
):
256 script_file
= "scripts/{}".format(script_name
)
257 script_text
= b
"""#!/usr/bin/python
260 self
.add_tarinfo(script_file
, io
.BytesIO(script_text
), mode
=0o666)
def test_extract_script(self):
    # Put one script into the archive and build the VNFD package from it
    script_name = "add_corporation.py"
    self.add_script_file(script_name)
    vnfd_package = self.create_vnfd_package()

    with tempfile.TemporaryDirectory() as tmp_dir:
        script_extractor = rift.package.script.PackageScriptExtractor(
            self._log, tmp_dir
        )
        script_extractor.extract_scripts(vnfd_package)

        # The extracted script must exist on disk as a regular file
        extracted_path = script_extractor.get_extracted_script_path(
            vnfd_package.descriptor_id, script_name
        )
        self.assertTrue(os.path.exists(extracted_path))
        self.assertTrue(os.path.isfile(extracted_path))
class TestPackageCloudInitExtractor(PackageTestCase):
    """Tests for reading cloud-init files out of a package."""

    def add_cloud_init_file(self, cloud_init_filename):
        """Add a minimal cloud-init file under cloud_init/ in the test tar archive."""
        script_file = "cloud_init/{}".format(cloud_init_filename)
        script_text = b"""#cloud-config"""
        self.add_tarinfo(script_file, io.BytesIO(script_text), mode=0o666)

    def test_read_cloud_init(self):
        script_name = "testVM_cloud_init.cfg"
        valid_script_text = "#cloud-config"
        self.add_cloud_init_file(script_name)
        package = self.create_vnfd_package()

        extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
        cloud_init_contents = extractor.read_script(package, script_name)

        # assertEqual: the assertEquals alias was removed in Python 3.12
        self.assertEqual(cloud_init_contents, valid_script_text)

    def test_cloud_init_file_missing(self):
        # No cloud-init file is added, so reading one must fail
        script_name = "testVM_cloud_init.cfg"
        package = self.create_vnfd_package()

        extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)

        with self.assertRaises(rift.package.cloud_init.CloudInitExtractionError):
            extractor.read_script(package, script_name)
300 class TestPackageValidator(PackageTestCase
):
303 self
._validator
= rift
.package
.package
.PackageChecksumValidator(self
._log
)
305 def create_checksum_file(self
, file_md5_map
):
306 checksum_hdl
= io
.BytesIO()
307 for file_name
, md5
in file_md5_map
.items():
308 checksum_hdl
.write("{} {}\n".format(md5
, file_name
).encode())
313 self
.add_tarinfo("checksums.txt", checksum_hdl
)
314 self
._tar
.addfile(tarfile
.TarInfo(), checksum_hdl
)
316 def create_nsd_package_with_checksum(self
):
317 self
.create_checksum_file(
318 {nsd_filename
: file_hdl_md5(io
.BytesIO(nsd_yaml
))}
320 package
= self
.create_nsd_package()
def test_package_no_checksum(self):
    package = self.create_nsd_package()

    # For now, a missing checksum file will be supported.
    # No files will be validated.
    self._validator.validate(package)
    validated_files = self._validator.checksums

    # assertEqual: the assertEquals alias was removed in Python 3.12
    self.assertEqual(validated_files, {})
def test_package_with_checksum(self):
    package = self.create_nsd_package_with_checksum()
    self._validator.validate(package)
    validated_files = self._validator.checksums

    # Only the NSD descriptor is listed in the checksum file.
    # assertEqual: the assertEquals alias was removed in Python 3.12
    self.assertEqual(list(validated_files.keys()), [nsd_filename])
339 class TestPackageStore(PackageTestCase
):
def create_store(self, root_dir):
    """Create a PackageFilesystemStore rooted at `root_dir`.

    Arguments:
        root_dir - directory path handed to the store as its root

    Returns:
        The newly created rift.package.store.PackageFilesystemStore.
    """
    store = rift.package.store.PackageFilesystemStore(self._log, root_dir)

    # Callers operate on the store, so it has to be handed back
    return store
def create_and_store_package(self, store):
    """Create an NSD package, store it in `store` and return it.

    Arguments:
        store - object exposing store_package(package)

    Returns:
        The package that was stored.
    """
    package = self.create_nsd_package()
    store.store_package(package)

    # Callers read package.descriptor_id, so it has to be handed back
    return package
def test_store_package(self):
    with tempfile.TemporaryDirectory() as root_dir:
        store = self.create_store(root_dir)
        package = self.create_and_store_package(store)
        new_package = store.get_package(package.descriptor_id)

        # The fetched package must match what was stored.
        # assertEqual: the assertEquals alias was removed in Python 3.12
        self.assertEqual(new_package.files, package.files)
        self.assertEqual(type(new_package), type(package))
def test_store_reload_package(self):
    with tempfile.TemporaryDirectory() as root_dir:
        store = self.create_store(root_dir)
        package = self.create_and_store_package(store)

        # A second store over the same root must see the stored package
        new_store = self.create_store(root_dir)
        new_package = new_store.get_package(package.descriptor_id)

        # assertEqual: the assertEquals alias was removed in Python 3.12
        self.assertEqual(new_package.files, package.files)
        self.assertEqual(type(new_package), type(package))
def test_delete_package(self):
    with tempfile.TemporaryDirectory() as root_dir:
        pkg_store = self.create_store(root_dir)
        stored_pkg = self.create_and_store_package(pkg_store)

        # The package is retrievable up to the point it is deleted
        pkg_store.get_package(stored_pkg.descriptor_id)
        pkg_store.delete_package(stored_pkg.descriptor_id)

        # After deletion the same lookup has to fail
        expected_error = rift.package.store.PackageStoreError
        with self.assertRaises(expected_error):
            pkg_store.get_package(stored_pkg.descriptor_id)
def test_store_exist_package(self):
    with tempfile.TemporaryDirectory() as root_dir:
        pkg_store = self.create_store(root_dir)
        stored_pkg = self.create_and_store_package(pkg_store)

        # Storing the same package a second time has to be rejected
        expected_error = rift.package.store.PackageStoreError
        with self.assertRaises(expected_error):
            pkg_store.store_package(stored_pkg)
class TestTemporaryPackage(PackageTestCase):
    """Tests for the rift.package.package.TemporaryPackage context manager."""

    @staticmethod
    def _remove_if_present(path):
        """Delete `path` if it still exists; used as a cleanup hook."""
        if os.path.exists(path):
            os.remove(path)

    def test_temp_package(self):
        # Back the archive with a real on-disk file instead of a memory buffer
        self._tar_file_hdl = tempfile.NamedTemporaryFile(delete=False)
        self._tar = tarfile.open(fileobj=self._tar_file_hdl, mode="w|gz")

        # delete=False means nothing removes the file if the test fails
        # before TemporaryPackage does, so register a fallback cleanup.
        self.addCleanup(self._remove_if_present, self._tar_file_hdl.name)

        self.assertTrue(os.path.exists(self._tar_file_hdl.name))

        package = self.create_nsd_package()
        with rift.package.package.TemporaryPackage(self._log, package, self._tar_file_hdl) as temp_pkg:
            self.assertIs(package, temp_pkg)
            # assertEqual: the assertEquals alias was removed in Python 3.12
            self.assertEqual(package.files, temp_pkg.files)

        # Leaving the context must have removed the backing file
        self.assertFalse(os.path.exists(self._tar_file_hdl.name))
def main(argv=sys.argv[1:]):
    """Parse the test command line and run this module's unit tests.

    Arguments:
        argv - command line arguments without the program name. The
               -v/--verbose and -n/--no-runner flags are consumed here;
               everything else is forwarded to unittest.main().
    """
    logging.basicConfig(format='TEST %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')

    args, unknown = parser.parse_known_args(argv)

    # Only build the XML runner (and require RIFT_MODULE_TEST) when it is
    # actually wanted; with --no-runner unittest uses its default runner.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    # Set the global logging level
    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)

    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
424 if __name__
== '__main__':