4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
32 import rift
.package
.archive
33 import rift
.package
.package
34 import rift
.package
.charm
35 import rift
.package
.icon
36 import rift
.package
.script
37 import rift
.package
.config
38 import rift
.package
.store
39 import rift
.package
.checksums
40 import rift
.package
.cloud_init
44 gi
.require_version('RwpersonDbYang', '1.0')
45 gi
.require_version('RwYang', '1.0')
47 from gi
.repository
import (
53 nsd_yaml
= b
"""nsd:nsd-catalog:
57 nsd:description: Gateways to access as corpA to PE1 and PE2
60 vnfd_yaml
= b
"""vnfd:vnfd-catalog:
62 - vnfd:id: gw_corpA_vnfd
63 vnfd:name: gw_corpA_vnfd
64 vnfd:description: Gateways to access as corpA to PE1 and PE2
67 nsd_filename
= "gw_corpA__nsd.yaml"
68 vnfd_filename
= "gw_corpA__vnfd.yaml"
71 def file_hdl_md5(file_hdl
):
72 return rift
.package
.checksums
.checksum(file_hdl
)
75 class ArchiveTestCase(unittest
.TestCase
):
77 self
._log
= logging
.getLogger()
79 self
._tar
_file
_hdl
= io
.BytesIO()
80 self
._tar
= tarfile
.open(fileobj
=self
._tar
_file
_hdl
, mode
="w|gz")
82 self
._nsd
_yaml
_hdl
= io
.BytesIO(nsd_yaml
)
83 self
._vnfd
_yaml
_hdl
= io
.BytesIO(vnfd_yaml
)
86 self
._nsd
_yaml
_hdl
.close()
87 self
._vnfd
_yaml
_hdl
.close()
89 self
._tar
_file
_hdl
.close()
91 def create_tar_package_archive(self
):
93 self
._tar
_file
_hdl
.flush()
94 self
._tar
_file
_hdl
.seek(0)
95 archive
= rift
.package
.package
.TarPackageArchive(
97 tar_file_hdl
=self
._tar
_file
_hdl
,
102 def add_tarinfo(self
, name
, file_hdl
, mode
=0o777):
103 tarinfo
= tarfile
.TarInfo(name
)
104 tarinfo
.size
= len(file_hdl
.read())
105 assert tarinfo
.size
> 0
107 self
._tar
.addfile(tarinfo
, file_hdl
)
109 def add_tarinfo_dir(self
, name
):
110 tarinfo
= tarfile
.TarInfo(name
)
111 tarinfo
.type = tarfile
.DIRTYPE
112 self
._tar
.addfile(tarinfo
)
114 def add_nsd_yaml(self
):
115 self
.add_tarinfo(nsd_filename
, io
.BytesIO(nsd_yaml
))
117 def add_vnfd_yaml(self
):
118 self
.add_tarinfo(vnfd_filename
, io
.BytesIO(vnfd_yaml
))
121 class PackageTestCase(ArchiveTestCase
):
122 def create_nsd_package(self
):
124 archive
= self
.create_tar_package_archive()
125 package
= archive
.create_package()
129 def create_vnfd_package(self
):
131 archive
= self
.create_tar_package_archive()
132 package
= archive
.create_package()
137 class TestCreateArchive(ArchiveTestCase
):
138 def test_create_tar_archive(self
):
140 archive
= self
.create_tar_package_archive()
141 self
.assertEquals(set(archive
.filenames
), {nsd_filename}
)
143 def test_nsd_tar_archive(self
):
144 #Write the NSD YAML to the tar file
147 archive
= self
.create_tar_package_archive()
148 with archive
.open_file(nsd_filename
) as nsd_hdl
:
149 nsd_bytes
= nsd_hdl
.read()
151 self
.assertEquals(nsd_bytes
, nsd_yaml
)
154 class TestPackage(PackageTestCase
):
155 def create_vnfd_package_archive(self
, package
, hdl
):
156 # Create an archive from a package
157 archive
= rift
.package
.archive
.TarPackageArchive
.from_package(
158 self
._log
, package
, hdl
,
160 # Closing the archive writes any closing bytes to the file handle
166 def test_create_nsd_package_from_archive(self
):
167 package
= self
.create_nsd_package()
168 self
.assertTrue(isinstance(package
, rift
.package
.package
.NsdPackage
))
170 json_str
= package
.json_descriptor
171 desc_dict
= json
.loads(json_str
)
172 self
.assertIn("nsd:nsd-catalog", desc_dict
)
174 def test_create_vnfd_package_from_archive(self
):
175 package
= self
.create_vnfd_package()
176 self
.assertTrue(isinstance(package
, rift
.package
.package
.VnfdPackage
))
178 json_str
= package
.json_descriptor
179 desc_dict
= json
.loads(json_str
)
180 self
.assertIn("vnfd:vnfd-catalog", desc_dict
)
182 def test_create_vnfd_archive_from_package(self
):
183 package
= self
.create_vnfd_package()
185 self
.create_vnfd_package_archive(package
, hdl
)
187 # Ensure that the archive created was valid
188 with tarfile
.open(fileobj
=hdl
, mode
='r|gz'):
191 def test_round_trip_vnfd_package_from_archive(self
):
192 package
= self
.create_vnfd_package()
194 self
.create_vnfd_package_archive(package
, hdl
)
196 archive
= rift
.package
.archive
.TarPackageArchive(self
._log
, hdl
)
198 return rift
.package
.checksums
.checksum(file_hdl
)
200 # Create the package from the archive and validate file checksums and modes
201 new_package
= archive
.create_package()
203 self
.assertEqual(package
.files
, new_package
.files
)
204 self
.assertEqual(type(package
), type(new_package
))
206 for filename
in package
.files
:
207 pkg_file
= package
.open(filename
)
208 new_pkg_file
= new_package
.open(filename
)
209 self
.assertEqual(md5(pkg_file
), md5(new_pkg_file
))
211 def test_create_nsd_package_from_file(self
):
212 nsd_file_name
= "asdf_nsd.yaml"
213 hdl
= io
.BytesIO(nsd_yaml
)
214 hdl
.name
= nsd_file_name
216 package
= rift
.package
.package
.DescriptorPackage
.from_descriptor_file_hdl(
219 self
.assertTrue(isinstance(package
, rift
.package
.package
.NsdPackage
))
221 with package
.open(nsd_file_name
) as nsd_hdl
:
222 nsd_data
= nsd_hdl
.read()
223 self
.assertEquals(yaml
.load(nsd_data
), yaml
.load(nsd_yaml
))
225 def test_create_vnfd_package_from_file(self
):
226 vnfd_file_name
= "asdf_vnfd.yaml"
227 hdl
= io
.BytesIO(vnfd_yaml
)
228 hdl
.name
= vnfd_file_name
230 package
= rift
.package
.package
.DescriptorPackage
.from_descriptor_file_hdl(
233 self
.assertTrue(isinstance(package
, rift
.package
.package
.VnfdPackage
))
235 with package
.open(vnfd_file_name
) as vnfd_hdl
:
236 vnfd_data
= vnfd_hdl
.read()
237 self
.assertEquals(yaml
.load(vnfd_data
), yaml
.load(vnfd_yaml
))
240 class TestPackageCharmExtractor(PackageTestCase
):
241 def add_charm_dir(self
, charm_name
):
242 charm_dir
= "charms/trusty/{}".format(charm_name
)
243 charm_file
= "{}/actions.yaml".format(charm_dir
)
244 charm_text
= b
"THIS IS A FAKE CHARM"
245 self
.add_tarinfo_dir(charm_dir
)
246 self
.add_tarinfo(charm_file
, io
.BytesIO(charm_text
))
248 def test_extract_charm(self
):
249 charm_name
= "charm_a"
250 self
.add_charm_dir(charm_name
)
251 package
= self
.create_vnfd_package()
252 with tempfile
.TemporaryDirectory() as tmp_dir
:
253 extractor
= rift
.package
.charm
.PackageCharmExtractor(self
._log
, tmp_dir
)
254 extractor
.extract_charms(package
)
256 charm_dir
= extractor
.get_extracted_charm_dir(package
.descriptor_id
, charm_name
)
257 self
.assertTrue(os
.path
.exists(charm_dir
))
258 self
.assertTrue(os
.path
.isdir(charm_dir
))
261 class TestPackageIconExtractor(PackageTestCase
):
262 def add_icon_file(self
, icon_name
):
263 icon_file
= "icons/{}".format(icon_name
)
264 icon_text
= b
"png file bytes"
265 self
.add_tarinfo(icon_file
, io
.BytesIO(icon_text
))
267 def test_extract_icon(self
):
269 self
.add_icon_file(icon_name
)
270 package
= self
.create_vnfd_package()
271 with tempfile
.TemporaryDirectory() as tmp_dir
:
272 extractor
= rift
.package
.icon
.PackageIconExtractor(self
._log
, tmp_dir
)
273 extractor
.extract_icons(package
)
275 icon_file
= extractor
.get_extracted_icon_path(
276 package
.descriptor_type
, package
.descriptor_id
, icon_name
278 self
.assertTrue(os
.path
.exists(icon_file
))
279 self
.assertTrue(os
.path
.isfile(icon_file
))
282 class TestPackageScriptExtractor(PackageTestCase
):
283 def add_script_file(self
, script_name
):
284 script_file
= "scripts/{}".format(script_name
)
285 script_text
= b
"""#!/usr/bin/python
288 self
.add_tarinfo(script_file
, io
.BytesIO(script_text
), mode
=0o666)
290 def test_extract_script(self
):
291 script_name
= "add_corporation.py"
292 self
.add_script_file(script_name
)
293 package
= self
.create_vnfd_package()
294 with tempfile
.TemporaryDirectory() as tmp_dir
:
295 extractor
= rift
.package
.script
.PackageScriptExtractor(self
._log
, tmp_dir
)
296 extractor
.extract_scripts(package
)
298 script_dir
= extractor
.get_extracted_script_path(package
.descriptor_id
, script_name
)
299 self
.assertTrue(os
.path
.exists(script_dir
))
300 self
.assertTrue(os
.path
.isfile(script_dir
))
302 class TestPackageCloudInitExtractor(PackageTestCase
):
303 def add_cloud_init_file(self
, cloud_init_filename
):
304 script_file
= "cloud_init/{}".format(cloud_init_filename
)
305 script_text
= b
"""#cloud-config"""
306 self
.add_tarinfo(script_file
, io
.BytesIO(script_text
), mode
=0o666)
308 def test_read_cloud_init(self
):
309 script_name
= "testVM_cloud_init.cfg"
310 valid_script_text
= "#cloud-config"
311 self
.add_cloud_init_file(script_name
)
312 package
= self
.create_vnfd_package()
314 extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
315 cloud_init_contents
= extractor
.read_script(package
, script_name
)
317 self
.assertEquals(cloud_init_contents
, valid_script_text
)
319 def test_cloud_init_file_missing(self
):
320 script_name
= "testVM_cloud_init.cfg"
321 package
= self
.create_vnfd_package()
323 extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
325 with self
.assertRaises(rift
.package
.cloud_init
.CloudInitExtractionError
):
326 extractor
.read_script(package
, script_name
)
328 class TestPackageConfigExtractor(PackageTestCase
):
329 def add_ns_config_file(self
, nsd_id
):
330 config_file
= "ns_config/{}.yaml".format(nsd_id
)
331 config_text
= b
""" ns_config """
332 self
.add_tarinfo(config_file
, io
.BytesIO(config_text
), mode
=0o666)
336 def add_vnf_config_file(self
, vnfd_id
, member_vnf_index
):
337 config_file
= "vnf_config/{}_{}.yaml".format(vnfd_id
, member_vnf_index
)
338 config_text
= b
""" vnf_config """
339 self
.add_tarinfo(config_file
, io
.BytesIO(config_text
), mode
=0o666)
343 def test_extract_config(self
):
344 ns_config_file
= self
.add_ns_config_file("nsd_id")
345 vnf_config_file
= self
.add_vnf_config_file("vnfd_id", 1)
346 package
= self
.create_nsd_package()
347 with tempfile
.TemporaryDirectory() as tmp_dir
:
348 extractor
= rift
.package
.config
.PackageConfigExtractor(self
._log
, tmp_dir
)
349 extractor
.extract_configs(package
)
351 dest_ns_config_file
= extractor
.get_extracted_config_path(package
.descriptor_id
, ns_config_file
)
352 dest_vnf_config_file
= extractor
.get_extracted_config_path(package
.descriptor_id
, vnf_config_file
)
353 self
.assertTrue(os
.path
.isfile(dest_ns_config_file
))
354 self
.assertTrue(os
.path
.isfile(dest_vnf_config_file
))
357 class TestPackageValidator(PackageTestCase
):
360 self
._validator
= rift
.package
.package
.PackageChecksumValidator(self
._log
)
362 def create_checksum_file(self
, file_md5_map
):
363 checksum_hdl
= io
.BytesIO()
364 for file_name
, md5
in file_md5_map
.items():
365 checksum_hdl
.write("{} {}\n".format(md5
, file_name
).encode())
370 self
.add_tarinfo("checksums.txt", checksum_hdl
)
371 self
._tar
.addfile(tarfile
.TarInfo(), checksum_hdl
)
373 def create_nsd_package_with_checksum(self
):
374 self
.create_checksum_file(
375 {nsd_filename
: file_hdl_md5(io
.BytesIO(nsd_yaml
))}
377 package
= self
.create_nsd_package()
380 def test_package_no_checksum(self
):
381 package
= self
.create_nsd_package()
383 # For now, a missing checksum file will be supported.
384 # No files will be validated.
385 validated_files
= self
._validator
.validate(package
)
386 self
.assertEquals(validated_files
, {})
388 def test_package_with_checksum(self
):
389 package
= self
.create_nsd_package_with_checksum()
390 validated_files
= self
._validator
.validate(package
)
391 self
.assertEquals(list(validated_files
.keys()), [nsd_filename
])
394 class TestPackageStore(PackageTestCase
):
395 def create_store(self
, root_dir
):
396 store
= rift
.package
.store
.PackageFilesystemStore(self
._log
, root_dir
)
399 def create_and_store_package(self
, store
):
400 package
= self
.create_nsd_package()
401 store
.store_package(package
)
405 def test_store_package(self
):
406 with tempfile
.TemporaryDirectory() as root_dir
:
407 store
= self
.create_store(root_dir
)
408 package
= self
.create_and_store_package(store
)
409 new_package
= store
.get_package(package
.descriptor_id
)
410 self
.assertEquals(new_package
.files
, package
.files
)
411 self
.assertEquals(type(new_package
), type(package
))
413 def test_store_reload_package(self
):
414 with tempfile
.TemporaryDirectory() as root_dir
:
415 store
= self
.create_store(root_dir
)
416 package
= self
.create_and_store_package(store
)
418 new_store
= self
.create_store(root_dir
)
419 new_package
= new_store
.get_package(package
.descriptor_id
)
421 self
.assertEquals(new_package
.files
, package
.files
)
422 self
.assertEquals(type(new_package
), type(package
))
424 def test_delete_package(self
):
425 with tempfile
.TemporaryDirectory() as root_dir
:
426 store
= self
.create_store(root_dir
)
427 package
= self
.create_and_store_package(store
)
429 store
.get_package(package
.descriptor_id
)
430 store
.delete_package(package
.descriptor_id
)
432 with self
.assertRaises(rift
.package
.store
.PackageStoreError
):
433 store
.get_package(package
.descriptor_id
)
435 def test_store_exist_package(self
):
436 with tempfile
.TemporaryDirectory() as root_dir
:
437 store
= self
.create_store(root_dir
)
438 package
= self
.create_and_store_package(store
)
440 with self
.assertRaises(rift
.package
.store
.PackageStoreError
):
441 store
.store_package(package
)
444 class TestTemporaryPackage(PackageTestCase
):
445 def test_temp_package(self
):
446 self
._tar
_file
_hdl
= tempfile
.NamedTemporaryFile(delete
=False)
447 self
._tar
= tarfile
.open(fileobj
=self
._tar
_file
_hdl
, mode
="w|gz")
449 self
.assertTrue(os
.path
.exists(self
._tar
_file
_hdl
.name
))
451 package
= self
.create_nsd_package()
452 with rift
.package
.package
.TemporaryPackage(self
._log
, package
, self
._tar
_file
_hdl
) as temp_pkg
:
453 self
.assertTrue(package
is temp_pkg
)
454 self
.assertEquals(package
.files
, temp_pkg
.files
)
456 self
.assertFalse(os
.path
.exists(self
._tar
_file
_hdl
.name
))
459 def main(argv
=sys
.argv
[1:]):
460 logging
.basicConfig(format
='TEST %(message)s')
462 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
463 parser
= argparse
.ArgumentParser()
464 parser
.add_argument('-v', '--verbose', action
='store_true')
465 parser
.add_argument('-n', '--no-runner', action
='store_true')
467 args
, unknown
= parser
.parse_known_args(argv
)
471 # Set the global logging level
472 logging
.getLogger().setLevel(logging
.DEBUG
if args
.verbose
else logging
.ERROR
)
474 # The unittest framework requires a program name, so use the name of this
475 # file instead (we do not want to have to pass a fake program name to main
476 # when this is called from the interpreter).
477 unittest
.main(argv
=[__file__
] + unknown
+ ["-v"], testRunner
=runner
)
479 if __name__
== '__main__':