4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
32 #Setting RIFT_VAR_ROOT if not already set for unit test execution
33 if "RIFT_VAR_ROOT" not in os
.environ
:
34 os
.environ
['RIFT_VAR_ROOT'] = os
.path
.join(os
.environ
['RIFT_INSTALL'], 'var/rift/unittest')
36 import rift
.package
.archive
37 import rift
.package
.package
38 import rift
.package
.charm
39 import rift
.package
.icon
40 import rift
.package
.script
41 import rift
.package
.config
42 import rift
.package
.store
43 import rift
.package
.checksums
44 import rift
.package
.cloud_init
48 gi
.require_version('RwpersonDbYang', '1.0')
49 gi
.require_version('RwYang', '1.0')
51 from gi
.repository
import (
57 nsd_yaml
= b
"""nsd:nsd-catalog:
61 nsd:description: Gateways to access as corpA to PE1 and PE2
64 vnfd_yaml
= b
"""vnfd:vnfd-catalog:
66 - vnfd:id: gw_corpA_vnfd
67 vnfd:name: gw_corpA_vnfd
68 vnfd:description: Gateways to access as corpA to PE1 and PE2
71 nsd_filename
= "gw_corpA__nsd.yaml"
72 vnfd_filename
= "gw_corpA__vnfd.yaml"
75 def file_hdl_md5(file_hdl
):
76 return rift
.package
.checksums
.checksum(file_hdl
)
79 class ArchiveTestCase(unittest
.TestCase
):
81 self
._log
= logging
.getLogger()
83 self
._tar
_file
_hdl
= io
.BytesIO()
84 self
._tar
= tarfile
.open(fileobj
=self
._tar
_file
_hdl
, mode
="w|gz")
86 self
._nsd
_yaml
_hdl
= io
.BytesIO(nsd_yaml
)
87 self
._vnfd
_yaml
_hdl
= io
.BytesIO(vnfd_yaml
)
90 self
._nsd
_yaml
_hdl
.close()
91 self
._vnfd
_yaml
_hdl
.close()
93 self
._tar
_file
_hdl
.close()
95 def create_tar_package_archive(self
):
97 self
._tar
_file
_hdl
.flush()
98 self
._tar
_file
_hdl
.seek(0)
99 archive
= rift
.package
.package
.TarPackageArchive(
101 tar_file_hdl
=self
._tar
_file
_hdl
,
106 def add_tarinfo(self
, name
, file_hdl
, mode
=0o777):
107 tarinfo
= tarfile
.TarInfo(name
)
108 tarinfo
.size
= len(file_hdl
.read())
109 assert tarinfo
.size
> 0
111 self
._tar
.addfile(tarinfo
, file_hdl
)
113 def add_tarinfo_dir(self
, name
):
114 tarinfo
= tarfile
.TarInfo(name
)
115 tarinfo
.type = tarfile
.DIRTYPE
116 self
._tar
.addfile(tarinfo
)
118 def add_nsd_yaml(self
):
119 self
.add_tarinfo(nsd_filename
, io
.BytesIO(nsd_yaml
))
121 def add_vnfd_yaml(self
):
122 self
.add_tarinfo(vnfd_filename
, io
.BytesIO(vnfd_yaml
))
125 class PackageTestCase(ArchiveTestCase
):
126 def create_nsd_package(self
):
128 archive
= self
.create_tar_package_archive()
129 package
= archive
.create_package()
133 def create_vnfd_package(self
):
135 archive
= self
.create_tar_package_archive()
136 package
= archive
.create_package()
141 class TestCreateArchive(ArchiveTestCase
):
142 def test_create_tar_archive(self
):
144 archive
= self
.create_tar_package_archive()
145 self
.assertEquals(set(archive
.filenames
), {nsd_filename}
)
147 def test_nsd_tar_archive(self
):
148 #Write the NSD YAML to the tar file
151 archive
= self
.create_tar_package_archive()
152 with archive
.open_file(nsd_filename
) as nsd_hdl
:
153 nsd_bytes
= nsd_hdl
.read()
155 self
.assertEquals(nsd_bytes
, nsd_yaml
)
158 class TestPackage(PackageTestCase
):
159 def create_vnfd_package_archive(self
, package
, hdl
):
160 # Create an archive from a package
161 archive
= rift
.package
.archive
.TarPackageArchive
.from_package(
162 self
._log
, package
, hdl
,
164 # Closing the archive writes any closing bytes to the file handle
170 def test_create_nsd_package_from_archive(self
):
171 package
= self
.create_nsd_package()
172 self
.assertTrue(isinstance(package
, rift
.package
.package
.NsdPackage
))
174 json_str
= package
.json_descriptor
175 desc_dict
= json
.loads(json_str
)
176 self
.assertIn("nsd:nsd-catalog", desc_dict
)
178 def test_create_vnfd_package_from_archive(self
):
179 package
= self
.create_vnfd_package()
180 self
.assertTrue(isinstance(package
, rift
.package
.package
.VnfdPackage
))
182 json_str
= package
.json_descriptor
183 desc_dict
= json
.loads(json_str
)
184 self
.assertIn("vnfd:vnfd-catalog", desc_dict
)
186 def test_create_vnfd_archive_from_package(self
):
187 package
= self
.create_vnfd_package()
189 self
.create_vnfd_package_archive(package
, hdl
)
191 # Ensure that the archive created was valid
192 with tarfile
.open(fileobj
=hdl
, mode
='r|gz'):
195 def test_round_trip_vnfd_package_from_archive(self
):
196 package
= self
.create_vnfd_package()
198 self
.create_vnfd_package_archive(package
, hdl
)
200 archive
= rift
.package
.archive
.TarPackageArchive(self
._log
, hdl
)
202 return rift
.package
.checksums
.checksum(file_hdl
)
204 # Create the package from the archive and validate file checksums and modes
205 new_package
= archive
.create_package()
207 self
.assertEqual(package
.files
, new_package
.files
)
208 self
.assertEqual(type(package
), type(new_package
))
210 for filename
in package
.files
:
211 pkg_file
= package
.open(filename
)
212 new_pkg_file
= new_package
.open(filename
)
213 self
.assertEqual(md5(pkg_file
), md5(new_pkg_file
))
215 def test_create_nsd_package_from_file(self
):
216 nsd_file_name
= "asdf_nsd.yaml"
217 hdl
= io
.BytesIO(nsd_yaml
)
218 hdl
.name
= nsd_file_name
220 package
= rift
.package
.package
.DescriptorPackage
.from_descriptor_file_hdl(
223 self
.assertTrue(isinstance(package
, rift
.package
.package
.NsdPackage
))
225 with package
.open(nsd_file_name
) as nsd_hdl
:
226 nsd_data
= nsd_hdl
.read()
227 self
.assertEquals(yaml
.load(nsd_data
), yaml
.load(nsd_yaml
))
229 def test_create_vnfd_package_from_file(self
):
230 vnfd_file_name
= "asdf_vnfd.yaml"
231 hdl
= io
.BytesIO(vnfd_yaml
)
232 hdl
.name
= vnfd_file_name
234 package
= rift
.package
.package
.DescriptorPackage
.from_descriptor_file_hdl(
237 self
.assertTrue(isinstance(package
, rift
.package
.package
.VnfdPackage
))
239 with package
.open(vnfd_file_name
) as vnfd_hdl
:
240 vnfd_data
= vnfd_hdl
.read()
241 self
.assertEquals(yaml
.load(vnfd_data
), yaml
.load(vnfd_yaml
))
244 class TestPackageCharmExtractor(PackageTestCase
):
245 def add_charm_dir(self
, charm_name
):
246 charm_dir
= "charms/trusty/{}".format(charm_name
)
247 charm_file
= "{}/actions.yaml".format(charm_dir
)
248 charm_text
= b
"THIS IS A FAKE CHARM"
249 self
.add_tarinfo_dir(charm_dir
)
250 self
.add_tarinfo(charm_file
, io
.BytesIO(charm_text
))
252 def test_extract_charm(self
):
253 charm_name
= "charm_a"
254 self
.add_charm_dir(charm_name
)
255 package
= self
.create_vnfd_package()
256 with tempfile
.TemporaryDirectory() as tmp_dir
:
257 extractor
= rift
.package
.charm
.PackageCharmExtractor(self
._log
, tmp_dir
)
258 extractor
.extract_charms(package
)
260 charm_dir
= extractor
.get_extracted_charm_dir(package
.descriptor_id
, charm_name
)
261 self
.assertTrue(os
.path
.exists(charm_dir
))
262 self
.assertTrue(os
.path
.isdir(charm_dir
))
265 class TestPackageIconExtractor(PackageTestCase
):
266 def add_icon_file(self
, icon_name
):
267 icon_file
= "icons/{}".format(icon_name
)
268 icon_text
= b
"png file bytes"
269 self
.add_tarinfo(icon_file
, io
.BytesIO(icon_text
))
271 def test_extract_icon(self
):
273 self
.add_icon_file(icon_name
)
274 package
= self
.create_vnfd_package()
275 with tempfile
.TemporaryDirectory() as tmp_dir
:
276 extractor
= rift
.package
.icon
.PackageIconExtractor(self
._log
, tmp_dir
)
277 extractor
.extract_icons(package
)
279 icon_file
= extractor
.get_extracted_icon_path(
280 package
.descriptor_type
, package
.descriptor_id
, icon_name
282 self
.assertTrue(os
.path
.exists(icon_file
))
283 self
.assertTrue(os
.path
.isfile(icon_file
))
286 class TestPackageScriptExtractor(PackageTestCase
):
287 def add_script_file(self
, script_name
):
288 script_file
= "scripts/{}".format(script_name
)
289 script_text
= b
"""#!/usr/bin/python
292 self
.add_tarinfo(script_file
, io
.BytesIO(script_text
), mode
=0o666)
294 def test_extract_script(self
):
295 script_name
= "add_corporation.py"
296 self
.add_script_file(script_name
)
297 package
= self
.create_vnfd_package()
298 with tempfile
.TemporaryDirectory() as tmp_dir
:
299 extractor
= rift
.package
.script
.PackageScriptExtractor(self
._log
, tmp_dir
)
300 extractor
.extract_scripts(package
)
302 script_dir
= extractor
.get_extracted_script_path(package
.descriptor_id
, script_name
)
303 self
.assertTrue(os
.path
.exists(script_dir
))
304 self
.assertTrue(os
.path
.isfile(script_dir
))
306 class TestPackageCloudInitExtractor(PackageTestCase
):
307 def add_cloud_init_file(self
, cloud_init_filename
):
308 script_file
= "cloud_init/{}".format(cloud_init_filename
)
309 script_text
= b
"""#cloud-config"""
310 self
.add_tarinfo(script_file
, io
.BytesIO(script_text
), mode
=0o666)
312 def test_read_cloud_init(self
):
313 script_name
= "testVM_cloud_init.cfg"
314 valid_script_text
= "#cloud-config"
315 self
.add_cloud_init_file(script_name
)
316 package
= self
.create_vnfd_package()
318 extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
319 cloud_init_contents
= extractor
.read_script(package
, script_name
)
321 self
.assertEquals(cloud_init_contents
, valid_script_text
)
323 def test_cloud_init_file_missing(self
):
324 script_name
= "testVM_cloud_init.cfg"
325 package
= self
.create_vnfd_package()
327 extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
329 with self
.assertRaises(rift
.package
.cloud_init
.CloudInitExtractionError
):
330 extractor
.read_script(package
, script_name
)
332 class TestPackageConfigExtractor(PackageTestCase
):
333 def add_ns_config_file(self
, nsd_id
):
334 config_file
= "ns_config/{}.yaml".format(nsd_id
)
335 config_text
= b
""" ns_config """
336 self
.add_tarinfo(config_file
, io
.BytesIO(config_text
), mode
=0o666)
340 def add_vnf_config_file(self
, vnfd_id
, member_vnf_index
):
341 config_file
= "vnf_config/{}_{}.yaml".format(vnfd_id
, member_vnf_index
)
342 config_text
= b
""" vnf_config """
343 self
.add_tarinfo(config_file
, io
.BytesIO(config_text
), mode
=0o666)
347 def test_extract_config(self
):
348 ns_config_file
= self
.add_ns_config_file("nsd_id")
349 vnf_config_file
= self
.add_vnf_config_file("vnfd_id", 1)
350 package
= self
.create_nsd_package()
351 with tempfile
.TemporaryDirectory() as tmp_dir
:
352 extractor
= rift
.package
.config
.PackageConfigExtractor(self
._log
, tmp_dir
)
353 extractor
.extract_configs(package
)
355 dest_ns_config_file
= extractor
.get_extracted_config_path(package
.descriptor_id
, ns_config_file
)
356 dest_vnf_config_file
= extractor
.get_extracted_config_path(package
.descriptor_id
, vnf_config_file
)
357 self
.assertTrue(os
.path
.isfile(dest_ns_config_file
))
358 self
.assertTrue(os
.path
.isfile(dest_vnf_config_file
))
361 class TestPackageValidator(PackageTestCase
):
364 self
._validator
= rift
.package
.package
.PackageChecksumValidator(self
._log
)
366 def create_checksum_file(self
, file_md5_map
):
367 checksum_hdl
= io
.BytesIO()
368 for file_name
, md5
in file_md5_map
.items():
369 checksum_hdl
.write("{} {}\n".format(md5
, file_name
).encode())
374 self
.add_tarinfo("checksums.txt", checksum_hdl
)
375 self
._tar
.addfile(tarfile
.TarInfo(), checksum_hdl
)
377 def create_nsd_package_with_checksum(self
):
378 self
.create_checksum_file(
379 {nsd_filename
: file_hdl_md5(io
.BytesIO(nsd_yaml
))}
381 package
= self
.create_nsd_package()
384 def test_package_no_checksum(self
):
385 package
= self
.create_nsd_package()
387 # For now, a missing checksum file will be supported.
388 # No files will be validated.
389 validated_files
= self
._validator
.validate(package
)
390 self
.assertEquals(validated_files
, {})
392 def test_package_with_checksum(self
):
393 package
= self
.create_nsd_package_with_checksum()
394 validated_files
= self
._validator
.validate(package
)
395 self
.assertEquals(list(validated_files
.keys()), [nsd_filename
])
398 class TestPackageStore(PackageTestCase
):
399 def create_store(self
, root_dir
):
400 store
= rift
.package
.store
.PackageFilesystemStore(self
._log
, root_dir
)
403 def create_and_store_package(self
, store
):
404 package
= self
.create_nsd_package()
405 store
.store_package(package
)
409 def test_store_package(self
):
410 with tempfile
.TemporaryDirectory() as root_dir
:
411 store
= self
.create_store(root_dir
)
412 package
= self
.create_and_store_package(store
)
413 new_package
= store
.get_package(package
.descriptor_id
)
414 self
.assertEquals(new_package
.files
, package
.files
)
415 self
.assertEquals(type(new_package
), type(package
))
417 def test_store_reload_package(self
):
418 with tempfile
.TemporaryDirectory() as root_dir
:
419 store
= self
.create_store(root_dir
)
420 package
= self
.create_and_store_package(store
)
422 new_store
= self
.create_store(root_dir
)
423 new_package
= new_store
.get_package(package
.descriptor_id
)
425 self
.assertEquals(new_package
.files
, package
.files
)
426 self
.assertEquals(type(new_package
), type(package
))
428 def test_delete_package(self
):
429 with tempfile
.TemporaryDirectory() as root_dir
:
430 store
= self
.create_store(root_dir
)
431 package
= self
.create_and_store_package(store
)
433 store
.get_package(package
.descriptor_id
)
434 store
.delete_package(package
.descriptor_id
)
436 with self
.assertRaises(rift
.package
.store
.PackageStoreError
):
437 store
.get_package(package
.descriptor_id
)
439 def test_store_exist_package(self
):
440 with tempfile
.TemporaryDirectory() as root_dir
:
441 store
= self
.create_store(root_dir
)
442 package
= self
.create_and_store_package(store
)
444 with self
.assertRaises(rift
.package
.store
.PackageStoreError
):
445 store
.store_package(package
)
448 class TestTemporaryPackage(PackageTestCase
):
449 def test_temp_package(self
):
450 self
._tar
_file
_hdl
= tempfile
.NamedTemporaryFile(delete
=False)
451 self
._tar
= tarfile
.open(fileobj
=self
._tar
_file
_hdl
, mode
="w|gz")
453 self
.assertTrue(os
.path
.exists(self
._tar
_file
_hdl
.name
))
455 package
= self
.create_nsd_package()
456 with rift
.package
.package
.TemporaryPackage(self
._log
, package
, self
._tar
_file
_hdl
) as temp_pkg
:
457 self
.assertTrue(package
is temp_pkg
)
458 self
.assertEquals(package
.files
, temp_pkg
.files
)
460 self
.assertFalse(os
.path
.exists(self
._tar
_file
_hdl
.name
))
463 def main(argv
=sys
.argv
[1:]):
464 logging
.basicConfig(format
='TEST %(message)s')
466 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
467 parser
= argparse
.ArgumentParser()
468 parser
.add_argument('-v', '--verbose', action
='store_true')
469 parser
.add_argument('-n', '--no-runner', action
='store_true')
471 args
, unknown
= parser
.parse_known_args(argv
)
475 # Set the global logging level
476 logging
.getLogger().setLevel(logging
.DEBUG
if args
.verbose
else logging
.ERROR
)
478 # The unittest framework requires a program name, so use the name of this
479 # file instead (we do not want to have to pass a fake program name to main
480 # when this is called from the interpreter).
481 unittest
.main(argv
=[__file__
] + unknown
+ ["-v"], testRunner
=runner
)
483 if __name__
== '__main__':