| #!/usr/bin/env python3 |
| |
| # |
| # Copyright 2016 RIFT.IO Inc |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| |
| import argparse |
| import logging |
| import io |
| import json |
| import os |
| import sys |
| import tarfile |
| import tempfile |
| import unittest |
| import xmlrunner |
| import yaml |
| |
| import rift.package.archive |
| import rift.package.package |
| import rift.package.charm |
| import rift.package.icon |
| import rift.package.script |
| import rift.package.config |
| import rift.package.store |
| import rift.package.checksums |
| import rift.package.cloud_init |
| |
| |
| import gi |
| gi.require_version('RwpersonDbYang', '1.0') |
| gi.require_version('RwYang', '1.0') |
| |
| from gi.repository import ( |
| RwpersonDbYang, |
| RwYang, |
| ) |
| |
| |
| nsd_yaml = b"""nsd:nsd-catalog: |
| nsd:nsd: |
| - nsd:id: gw_corpA |
| nsd:name: gw_corpA |
| nsd:description: Gateways to access as corpA to PE1 and PE2 |
| """ |
| |
| vnfd_yaml = b"""vnfd:vnfd-catalog: |
| vnfd:vnfd: |
| - vnfd:id: gw_corpA_vnfd |
| vnfd:name: gw_corpA_vnfd |
| vnfd:description: Gateways to access as corpA to PE1 and PE2 |
| """ |
| |
| nsd_filename = "gw_corpA__nsd.yaml" |
| vnfd_filename = "gw_corpA__vnfd.yaml" |
| |
| |
| def file_hdl_md5(file_hdl): |
| return rift.package.checksums.checksum(file_hdl) |
| |
| |
| class ArchiveTestCase(unittest.TestCase): |
| def setUp(self): |
| self._log = logging.getLogger() |
| |
| self._tar_file_hdl = io.BytesIO() |
| self._tar = tarfile.open(fileobj=self._tar_file_hdl, mode="w|gz") |
| |
| self._nsd_yaml_hdl = io.BytesIO(nsd_yaml) |
| self._vnfd_yaml_hdl = io.BytesIO(vnfd_yaml) |
| |
| def tearDown(self): |
| self._nsd_yaml_hdl.close() |
| self._vnfd_yaml_hdl.close() |
| self._tar.close() |
| self._tar_file_hdl.close() |
| |
| def create_tar_package_archive(self): |
| self._tar.close() |
| self._tar_file_hdl.flush() |
| self._tar_file_hdl.seek(0) |
| archive = rift.package.package.TarPackageArchive( |
| log=self._log, |
| tar_file_hdl=self._tar_file_hdl, |
| ) |
| |
| return archive |
| |
| def add_tarinfo(self, name, file_hdl, mode=0o777): |
| tarinfo = tarfile.TarInfo(name) |
| tarinfo.size = len(file_hdl.read()) |
| assert tarinfo.size > 0 |
| file_hdl.seek(0) |
| self._tar.addfile(tarinfo, file_hdl) |
| |
| def add_tarinfo_dir(self, name): |
| tarinfo = tarfile.TarInfo(name) |
| tarinfo.type = tarfile.DIRTYPE |
| self._tar.addfile(tarinfo) |
| |
| def add_nsd_yaml(self): |
| self.add_tarinfo(nsd_filename, io.BytesIO(nsd_yaml)) |
| |
| def add_vnfd_yaml(self): |
| self.add_tarinfo(vnfd_filename, io.BytesIO(vnfd_yaml)) |
| |
| |
| class PackageTestCase(ArchiveTestCase): |
| def create_nsd_package(self): |
| self.add_nsd_yaml() |
| archive = self.create_tar_package_archive() |
| package = archive.create_package() |
| |
| return package |
| |
| def create_vnfd_package(self): |
| self.add_vnfd_yaml() |
| archive = self.create_tar_package_archive() |
| package = archive.create_package() |
| |
| return package |
| |
| |
| class TestCreateArchive(ArchiveTestCase): |
| def test_create_tar_archive(self): |
| self.add_nsd_yaml() |
| archive = self.create_tar_package_archive() |
| self.assertEquals(set(archive.filenames), {nsd_filename}) |
| |
| def test_nsd_tar_archive(self): |
| #Write the NSD YAML to the tar file |
| self.add_nsd_yaml() |
| |
| archive = self.create_tar_package_archive() |
| with archive.open_file(nsd_filename) as nsd_hdl: |
| nsd_bytes = nsd_hdl.read() |
| |
| self.assertEquals(nsd_bytes, nsd_yaml) |
| |
| |
| class TestPackage(PackageTestCase): |
| def create_vnfd_package_archive(self, package, hdl): |
| # Create an archive from a package |
| archive = rift.package.archive.TarPackageArchive.from_package( |
| self._log, package, hdl, |
| ) |
| # Closing the archive writes any closing bytes to the file handle |
| archive.close() |
| hdl.seek(0) |
| |
| return archive |
| |
| def test_create_nsd_package_from_archive(self): |
| package = self.create_nsd_package() |
| self.assertTrue(isinstance(package, rift.package.package.NsdPackage)) |
| |
| json_str = package.json_descriptor |
| desc_dict = json.loads(json_str) |
| self.assertIn("nsd:nsd-catalog", desc_dict) |
| |
| def test_create_vnfd_package_from_archive(self): |
| package = self.create_vnfd_package() |
| self.assertTrue(isinstance(package, rift.package.package.VnfdPackage)) |
| |
| json_str = package.json_descriptor |
| desc_dict = json.loads(json_str) |
| self.assertIn("vnfd:vnfd-catalog", desc_dict) |
| |
| def test_create_vnfd_archive_from_package(self): |
| package = self.create_vnfd_package() |
| hdl = io.BytesIO() |
| self.create_vnfd_package_archive(package, hdl) |
| |
| # Ensure that the archive created was valid |
| with tarfile.open(fileobj=hdl, mode='r|gz'): |
| pass |
| |
| def test_round_trip_vnfd_package_from_archive(self): |
| package = self.create_vnfd_package() |
| hdl = io.BytesIO() |
| self.create_vnfd_package_archive(package, hdl) |
| |
| archive = rift.package.archive.TarPackageArchive(self._log, hdl) |
| def md5(file_hdl): |
| return rift.package.checksums.checksum(file_hdl) |
| |
| # Create the package from the archive and validate file checksums and modes |
| new_package = archive.create_package() |
| |
| self.assertEqual(package.files, new_package.files) |
| self.assertEqual(type(package), type(new_package)) |
| |
| for filename in package.files: |
| pkg_file = package.open(filename) |
| new_pkg_file = new_package.open(filename) |
| self.assertEqual(md5(pkg_file), md5(new_pkg_file)) |
| |
| def test_create_nsd_package_from_file(self): |
| nsd_file_name = "asdf_nsd.yaml" |
| hdl = io.BytesIO(nsd_yaml) |
| hdl.name = nsd_file_name |
| |
| package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl( |
| self._log, hdl |
| ) |
| self.assertTrue(isinstance(package, rift.package.package.NsdPackage)) |
| |
| with package.open(nsd_file_name) as nsd_hdl: |
| nsd_data = nsd_hdl.read() |
| self.assertEquals(yaml.load(nsd_data), yaml.load(nsd_yaml)) |
| |
| def test_create_vnfd_package_from_file(self): |
| vnfd_file_name = "asdf_vnfd.yaml" |
| hdl = io.BytesIO(vnfd_yaml) |
| hdl.name = vnfd_file_name |
| |
| package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl( |
| self._log, hdl |
| ) |
| self.assertTrue(isinstance(package, rift.package.package.VnfdPackage)) |
| |
| with package.open(vnfd_file_name) as vnfd_hdl: |
| vnfd_data = vnfd_hdl.read() |
| self.assertEquals(yaml.load(vnfd_data), yaml.load(vnfd_yaml)) |
| |
| |
| class TestPackageCharmExtractor(PackageTestCase): |
| def add_charm_dir(self, charm_name): |
| charm_dir = "charms/trusty/{}".format(charm_name) |
| charm_file = "{}/actions.yaml".format(charm_dir) |
| charm_text = b"THIS IS A FAKE CHARM" |
| self.add_tarinfo_dir(charm_dir) |
| self.add_tarinfo(charm_file, io.BytesIO(charm_text)) |
| |
| def test_extract_charm(self): |
| charm_name = "charm_a" |
| self.add_charm_dir(charm_name) |
| package = self.create_vnfd_package() |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| extractor = rift.package.charm.PackageCharmExtractor(self._log, tmp_dir) |
| extractor.extract_charms(package) |
| |
| charm_dir = extractor.get_extracted_charm_dir(package.descriptor_id, charm_name) |
| self.assertTrue(os.path.exists(charm_dir)) |
| self.assertTrue(os.path.isdir(charm_dir)) |
| |
| |
| class TestPackageIconExtractor(PackageTestCase): |
| def add_icon_file(self, icon_name): |
| icon_file = "icons/{}".format(icon_name) |
| icon_text = b"png file bytes" |
| self.add_tarinfo(icon_file, io.BytesIO(icon_text)) |
| |
| def test_extract_icon(self): |
| icon_name = "icon_a" |
| self.add_icon_file(icon_name) |
| package = self.create_vnfd_package() |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| extractor = rift.package.icon.PackageIconExtractor(self._log, tmp_dir) |
| extractor.extract_icons(package) |
| |
| icon_file = extractor.get_extracted_icon_path( |
| package.descriptor_type, package.descriptor_id, icon_name |
| ) |
| self.assertTrue(os.path.exists(icon_file)) |
| self.assertTrue(os.path.isfile(icon_file)) |
| |
| |
| class TestPackageScriptExtractor(PackageTestCase): |
| def add_script_file(self, script_name): |
| script_file = "scripts/{}".format(script_name) |
| script_text = b"""#!/usr/bin/python |
| print("hi") |
| """ |
| self.add_tarinfo(script_file, io.BytesIO(script_text), mode=0o666) |
| |
| def test_extract_script(self): |
| script_name = "add_corporation.py" |
| self.add_script_file(script_name) |
| package = self.create_vnfd_package() |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| extractor = rift.package.script.PackageScriptExtractor(self._log, tmp_dir) |
| extractor.extract_scripts(package) |
| |
| script_dir = extractor.get_extracted_script_path(package.descriptor_id, script_name) |
| self.assertTrue(os.path.exists(script_dir)) |
| self.assertTrue(os.path.isfile(script_dir)) |
| |
| class TestPackageCloudInitExtractor(PackageTestCase): |
| def add_cloud_init_file(self, cloud_init_filename): |
| script_file = "cloud_init/{}".format(cloud_init_filename) |
| script_text = b"""#cloud-config""" |
| self.add_tarinfo(script_file, io.BytesIO(script_text), mode=0o666) |
| |
| def test_read_cloud_init(self): |
| script_name = "testVM_cloud_init.cfg" |
| valid_script_text = "#cloud-config" |
| self.add_cloud_init_file(script_name) |
| package = self.create_vnfd_package() |
| |
| extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log) |
| cloud_init_contents = extractor.read_script(package, script_name) |
| |
| self.assertEquals(cloud_init_contents, valid_script_text) |
| |
| def test_cloud_init_file_missing(self): |
| script_name = "testVM_cloud_init.cfg" |
| package = self.create_vnfd_package() |
| |
| extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log) |
| |
| with self.assertRaises(rift.package.cloud_init.CloudInitExtractionError): |
| extractor.read_script(package, script_name) |
| |
| class TestPackageConfigExtractor(PackageTestCase): |
| def add_ns_config_file(self, nsd_id): |
| config_file = "ns_config/{}.yaml".format(nsd_id) |
| config_text = b""" ns_config """ |
| self.add_tarinfo(config_file, io.BytesIO(config_text), mode=0o666) |
| |
| return config_file |
| |
| def add_vnf_config_file(self, vnfd_id, member_vnf_index): |
| config_file = "vnf_config/{}_{}.yaml".format(vnfd_id, member_vnf_index) |
| config_text = b""" vnf_config """ |
| self.add_tarinfo(config_file, io.BytesIO(config_text), mode=0o666) |
| |
| return config_file |
| |
| def test_extract_config(self): |
| ns_config_file = self.add_ns_config_file("nsd_id") |
| vnf_config_file = self.add_vnf_config_file("vnfd_id", 1) |
| package = self.create_nsd_package() |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| extractor = rift.package.config.PackageConfigExtractor(self._log, tmp_dir) |
| extractor.extract_configs(package) |
| |
| dest_ns_config_file = extractor.get_extracted_config_path(package.descriptor_id, ns_config_file) |
| dest_vnf_config_file = extractor.get_extracted_config_path(package.descriptor_id, vnf_config_file) |
| self.assertTrue(os.path.isfile(dest_ns_config_file)) |
| self.assertTrue(os.path.isfile(dest_vnf_config_file)) |
| |
| |
| class TestPackageValidator(PackageTestCase): |
| def setUp(self): |
| super().setUp() |
| self._validator = rift.package.package.PackageChecksumValidator(self._log) |
| |
| def create_checksum_file(self, file_md5_map): |
| checksum_hdl = io.BytesIO() |
| for file_name, md5 in file_md5_map.items(): |
| checksum_hdl.write("{} {}\n".format(md5, file_name).encode()) |
| |
| checksum_hdl.flush() |
| checksum_hdl.seek(0) |
| |
| self.add_tarinfo("checksums.txt", checksum_hdl) |
| self._tar.addfile(tarfile.TarInfo(), checksum_hdl) |
| |
| def create_nsd_package_with_checksum(self): |
| self.create_checksum_file( |
| {nsd_filename: file_hdl_md5(io.BytesIO(nsd_yaml))} |
| ) |
| package = self.create_nsd_package() |
| return package |
| |
| def test_package_no_checksum(self): |
| package = self.create_nsd_package() |
| |
| # For now, a missing checksum file will be supported. |
| # No files will be validated. |
| self._validator.validate(package) |
| validated_files = self._validator.checksums |
| self.assertEquals(validated_files, {}) |
| |
| def test_package_with_checksum(self): |
| package = self.create_nsd_package_with_checksum() |
| self._validator.validate(package) |
| validated_files = self._validator.checksums |
| self.assertEquals(list(validated_files.keys()), [nsd_filename]) |
| |
| |
| class TestPackageStore(PackageTestCase): |
| def create_store(self, root_dir): |
| store = rift.package.store.PackageFilesystemStore(self._log, root_dir) |
| return store |
| |
| def create_and_store_package(self, store): |
| package = self.create_nsd_package() |
| store.store_package(package) |
| |
| return package |
| |
| def test_store_package(self): |
| with tempfile.TemporaryDirectory() as root_dir: |
| store = self.create_store(root_dir) |
| package = self.create_and_store_package(store) |
| new_package = store.get_package(package.descriptor_id) |
| self.assertEquals(new_package.files, package.files) |
| self.assertEquals(type(new_package), type(package)) |
| |
| def test_store_reload_package(self): |
| with tempfile.TemporaryDirectory() as root_dir: |
| store = self.create_store(root_dir) |
| package = self.create_and_store_package(store) |
| |
| new_store = self.create_store(root_dir) |
| new_package = new_store.get_package(package.descriptor_id) |
| |
| self.assertEquals(new_package.files, package.files) |
| self.assertEquals(type(new_package), type(package)) |
| |
| def test_delete_package(self): |
| with tempfile.TemporaryDirectory() as root_dir: |
| store = self.create_store(root_dir) |
| package = self.create_and_store_package(store) |
| |
| store.get_package(package.descriptor_id) |
| store.delete_package(package.descriptor_id) |
| |
| with self.assertRaises(rift.package.store.PackageStoreError): |
| store.get_package(package.descriptor_id) |
| |
| def test_store_exist_package(self): |
| with tempfile.TemporaryDirectory() as root_dir: |
| store = self.create_store(root_dir) |
| package = self.create_and_store_package(store) |
| |
| with self.assertRaises(rift.package.store.PackageStoreError): |
| store.store_package(package) |
| |
| |
| class TestTemporaryPackage(PackageTestCase): |
| def test_temp_package(self): |
| self._tar_file_hdl = tempfile.NamedTemporaryFile(delete=False) |
| self._tar = tarfile.open(fileobj=self._tar_file_hdl, mode="w|gz") |
| |
| self.assertTrue(os.path.exists(self._tar_file_hdl.name)) |
| |
| package = self.create_nsd_package() |
| with rift.package.package.TemporaryPackage(self._log, package, self._tar_file_hdl) as temp_pkg: |
| self.assertTrue(package is temp_pkg) |
| self.assertEquals(package.files, temp_pkg.files) |
| |
| self.assertFalse(os.path.exists(self._tar_file_hdl.name)) |
| |
| |
| def main(argv=sys.argv[1:]): |
| logging.basicConfig(format='TEST %(message)s') |
| |
| runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"]) |
| parser = argparse.ArgumentParser() |
| parser.add_argument('-v', '--verbose', action='store_true') |
| parser.add_argument('-n', '--no-runner', action='store_true') |
| |
| args, unknown = parser.parse_known_args(argv) |
| if args.no_runner: |
| runner = None |
| |
| # Set the global logging level |
| logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR) |
| |
| # The unittest framework requires a program name, so use the name of this |
| # file instead (we do not want to have to pass a fake program name to main |
| # when this is called from the interpreter). |
| unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner) |
| |
| if __name__ == '__main__': |
| main() |