826871c3d1a4f749f1bdc5bfadb0262c5c196a88
4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
32 import rift
.package
.archive
33 import rift
.package
.package
34 import rift
.package
.charm
35 import rift
.package
.icon
36 import rift
.package
.script
37 import rift
.package
.config
38 import rift
.package
.store
39 import rift
.package
.checksums
40 import rift
.package
.cloud_init
43 nsd_yaml
= b
"""nsd:nsd-catalog:
47 nsd:description: Gateways to access as corpA to PE1 and PE2
50 vnfd_yaml
= b
"""vnfd:vnfd-catalog:
52 - vnfd:id: gw_corpA_vnfd
53 vnfd:name: gw_corpA_vnfd
54 vnfd:description: Gateways to access as corpA to PE1 and PE2
57 nsd_filename
= "gw_corpA__nsd.yaml"
58 vnfd_filename
= "gw_corpA__vnfd.yaml"
61 def file_hdl_md5(file_hdl
):
62 return rift
.package
.checksums
.checksum(file_hdl
)
65 class ArchiveTestCase(unittest
.TestCase
):
67 self
._log
= logging
.getLogger()
69 self
._tar
_file
_hdl
= io
.BytesIO()
70 self
._tar
= tarfile
.open(fileobj
=self
._tar
_file
_hdl
, mode
="w|gz")
72 self
._nsd
_yaml
_hdl
= io
.BytesIO(nsd_yaml
)
73 self
._vnfd
_yaml
_hdl
= io
.BytesIO(vnfd_yaml
)
76 self
._nsd
_yaml
_hdl
.close()
77 self
._vnfd
_yaml
_hdl
.close()
79 self
._tar
_file
_hdl
.close()
81 def create_tar_package_archive(self
):
83 self
._tar
_file
_hdl
.flush()
84 self
._tar
_file
_hdl
.seek(0)
85 archive
= rift
.package
.package
.TarPackageArchive(
87 tar_file_hdl
=self
._tar
_file
_hdl
,
92 def add_tarinfo(self
, name
, file_hdl
, mode
=0o777):
93 tarinfo
= tarfile
.TarInfo(name
)
94 tarinfo
.size
= len(file_hdl
.read())
95 assert tarinfo
.size
> 0
97 self
._tar
.addfile(tarinfo
, file_hdl
)
99 def add_tarinfo_dir(self
, name
):
100 tarinfo
= tarfile
.TarInfo(name
)
101 tarinfo
.type = tarfile
.DIRTYPE
102 self
._tar
.addfile(tarinfo
)
104 def add_nsd_yaml(self
):
105 self
.add_tarinfo(nsd_filename
, io
.BytesIO(nsd_yaml
))
107 def add_vnfd_yaml(self
):
108 self
.add_tarinfo(vnfd_filename
, io
.BytesIO(vnfd_yaml
))
111 class PackageTestCase(ArchiveTestCase
):
112 def create_nsd_package(self
):
114 archive
= self
.create_tar_package_archive()
115 package
= archive
.create_package()
119 def create_vnfd_package(self
):
121 archive
= self
.create_tar_package_archive()
122 package
= archive
.create_package()
127 class TestCreateArchive(ArchiveTestCase
):
128 def test_create_tar_archive(self
):
130 archive
= self
.create_tar_package_archive()
131 self
.assertEquals(set(archive
.filenames
), {nsd_filename}
)
133 def test_nsd_tar_archive(self
):
134 #Write the NSD YAML to the tar file
137 archive
= self
.create_tar_package_archive()
138 with archive
.open_file(nsd_filename
) as nsd_hdl
:
139 nsd_bytes
= nsd_hdl
.read()
141 self
.assertEquals(nsd_bytes
, nsd_yaml
)
144 class TestPackage(PackageTestCase
):
145 def create_vnfd_package_archive(self
, package
, hdl
):
146 # Create an archive from a package
147 archive
= rift
.package
.archive
.TarPackageArchive
.from_package(
148 self
._log
, package
, hdl
,
150 # Closing the archive writes any closing bytes to the file handle
156 def test_create_nsd_package_from_archive(self
):
157 package
= self
.create_nsd_package()
158 self
.assertTrue(isinstance(package
, rift
.package
.package
.NsdPackage
))
160 json_str
= package
.json_descriptor
161 desc_dict
= json
.loads(json_str
)
162 self
.assertIn("nsd:nsd-catalog", desc_dict
)
164 def test_create_vnfd_package_from_archive(self
):
165 package
= self
.create_vnfd_package()
166 self
.assertTrue(isinstance(package
, rift
.package
.package
.VnfdPackage
))
168 json_str
= package
.json_descriptor
169 desc_dict
= json
.loads(json_str
)
170 self
.assertIn("vnfd:vnfd-catalog", desc_dict
)
172 def test_create_vnfd_archive_from_package(self
):
173 package
= self
.create_vnfd_package()
175 self
.create_vnfd_package_archive(package
, hdl
)
177 # Ensure that the archive created was valid
178 with tarfile
.open(fileobj
=hdl
, mode
='r|gz'):
181 def test_round_trip_vnfd_package_from_archive(self
):
182 package
= self
.create_vnfd_package()
184 self
.create_vnfd_package_archive(package
, hdl
)
186 archive
= rift
.package
.archive
.TarPackageArchive(self
._log
, hdl
)
188 return rift
.package
.checksums
.checksum(file_hdl
)
190 # Create the package from the archive and validate file checksums and modes
191 new_package
= archive
.create_package()
193 self
.assertEqual(package
.files
, new_package
.files
)
194 self
.assertEqual(type(package
), type(new_package
))
196 for filename
in package
.files
:
197 pkg_file
= package
.open(filename
)
198 new_pkg_file
= new_package
.open(filename
)
199 self
.assertEqual(md5(pkg_file
), md5(new_pkg_file
))
201 def test_create_nsd_package_from_file(self
):
202 nsd_file_name
= "asdf_nsd.yaml"
203 hdl
= io
.BytesIO(nsd_yaml
)
204 hdl
.name
= nsd_file_name
206 package
= rift
.package
.package
.DescriptorPackage
.from_descriptor_file_hdl(
209 self
.assertTrue(isinstance(package
, rift
.package
.package
.NsdPackage
))
211 with package
.open(nsd_file_name
) as nsd_hdl
:
212 nsd_data
= nsd_hdl
.read()
213 self
.assertEquals(yaml
.load(nsd_data
), yaml
.load(nsd_yaml
))
215 def test_create_vnfd_package_from_file(self
):
216 vnfd_file_name
= "asdf_vnfd.yaml"
217 hdl
= io
.BytesIO(vnfd_yaml
)
218 hdl
.name
= vnfd_file_name
220 package
= rift
.package
.package
.DescriptorPackage
.from_descriptor_file_hdl(
223 self
.assertTrue(isinstance(package
, rift
.package
.package
.VnfdPackage
))
225 with package
.open(vnfd_file_name
) as vnfd_hdl
:
226 vnfd_data
= vnfd_hdl
.read()
227 self
.assertEquals(yaml
.load(vnfd_data
), yaml
.load(vnfd_yaml
))
230 class TestPackageCharmExtractor(PackageTestCase
):
231 def add_charm_dir(self
, charm_name
):
232 charm_dir
= "charms/trusty/{}".format(charm_name
)
233 charm_file
= "{}/actions.yaml".format(charm_dir
)
234 charm_text
= b
"THIS IS A FAKE CHARM"
235 self
.add_tarinfo_dir(charm_dir
)
236 self
.add_tarinfo(charm_file
, io
.BytesIO(charm_text
))
238 def test_extract_charm(self
):
239 charm_name
= "charm_a"
240 self
.add_charm_dir(charm_name
)
241 package
= self
.create_vnfd_package()
242 with tempfile
.TemporaryDirectory() as tmp_dir
:
243 extractor
= rift
.package
.charm
.PackageCharmExtractor(self
._log
, tmp_dir
)
244 extractor
.extract_charms(package
)
246 charm_dir
= extractor
.get_extracted_charm_dir(package
.descriptor_id
, charm_name
)
247 self
.assertTrue(os
.path
.exists(charm_dir
))
248 self
.assertTrue(os
.path
.isdir(charm_dir
))
251 class TestPackageIconExtractor(PackageTestCase
):
252 def add_icon_file(self
, icon_name
):
253 icon_file
= "icons/{}".format(icon_name
)
254 icon_text
= b
"png file bytes"
255 self
.add_tarinfo(icon_file
, io
.BytesIO(icon_text
))
257 def test_extract_icon(self
):
259 self
.add_icon_file(icon_name
)
260 package
= self
.create_vnfd_package()
261 with tempfile
.TemporaryDirectory() as tmp_dir
:
262 extractor
= rift
.package
.icon
.PackageIconExtractor(self
._log
, tmp_dir
)
263 extractor
.extract_icons(package
)
265 icon_file
= extractor
.get_extracted_icon_path(
266 package
.descriptor_type
, package
.descriptor_id
, icon_name
268 self
.assertTrue(os
.path
.exists(icon_file
))
269 self
.assertTrue(os
.path
.isfile(icon_file
))
272 class TestPackageScriptExtractor(PackageTestCase
):
273 def add_script_file(self
, script_name
):
274 script_file
= "scripts/{}".format(script_name
)
275 script_text
= b
"""#!/usr/bin/python
278 self
.add_tarinfo(script_file
, io
.BytesIO(script_text
), mode
=0o666)
280 def test_extract_script(self
):
281 script_name
= "add_corporation.py"
282 self
.add_script_file(script_name
)
283 package
= self
.create_vnfd_package()
284 with tempfile
.TemporaryDirectory() as tmp_dir
:
285 extractor
= rift
.package
.script
.PackageScriptExtractor(self
._log
, tmp_dir
)
286 extractor
.extract_scripts(package
)
288 script_dir
= extractor
.get_extracted_script_path(package
.descriptor_id
, script_name
)
289 self
.assertTrue(os
.path
.exists(script_dir
))
290 self
.assertTrue(os
.path
.isfile(script_dir
))
292 class TestPackageCloudInitExtractor(PackageTestCase
):
293 def add_cloud_init_file(self
, cloud_init_filename
):
294 script_file
= "cloud_init/{}".format(cloud_init_filename
)
295 script_text
= b
"""#cloud-config"""
296 self
.add_tarinfo(script_file
, io
.BytesIO(script_text
), mode
=0o666)
298 def test_read_cloud_init(self
):
299 script_name
= "testVM_cloud_init.cfg"
300 valid_script_text
= "#cloud-config"
301 self
.add_cloud_init_file(script_name
)
302 package
= self
.create_vnfd_package()
304 extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
305 cloud_init_contents
= extractor
.read_script(package
, script_name
)
307 self
.assertEquals(cloud_init_contents
, valid_script_text
)
309 def test_cloud_init_file_missing(self
):
310 script_name
= "testVM_cloud_init.cfg"
311 package
= self
.create_vnfd_package()
313 extractor
= rift
.package
.cloud_init
.PackageCloudInitExtractor(self
._log
)
315 with self
.assertRaises(rift
.package
.cloud_init
.CloudInitExtractionError
):
316 extractor
.read_script(package
, script_name
)
318 class TestPackageConfigExtractor(PackageTestCase
):
319 def add_ns_config_file(self
, nsd_id
):
320 config_file
= "ns_config/{}.yaml".format(nsd_id
)
321 config_text
= b
""" ns_config """
322 self
.add_tarinfo(config_file
, io
.BytesIO(config_text
), mode
=0o666)
326 def add_vnf_config_file(self
, vnfd_id
, member_vnf_index
):
327 config_file
= "vnf_config/{}_{}.yaml".format(vnfd_id
, member_vnf_index
)
328 config_text
= b
""" vnf_config """
329 self
.add_tarinfo(config_file
, io
.BytesIO(config_text
), mode
=0o666)
333 def test_extract_config(self
):
334 ns_config_file
= self
.add_ns_config_file("nsd_id")
335 vnf_config_file
= self
.add_vnf_config_file("vnfd_id", 1)
336 package
= self
.create_nsd_package()
337 with tempfile
.TemporaryDirectory() as tmp_dir
:
338 extractor
= rift
.package
.config
.PackageConfigExtractor(self
._log
, tmp_dir
)
339 extractor
.extract_configs(package
)
341 dest_ns_config_file
= extractor
.get_extracted_config_path(package
.descriptor_id
, ns_config_file
)
342 dest_vnf_config_file
= extractor
.get_extracted_config_path(package
.descriptor_id
, vnf_config_file
)
343 self
.assertTrue(os
.path
.isfile(dest_ns_config_file
))
344 self
.assertTrue(os
.path
.isfile(dest_vnf_config_file
))
347 class TestPackageValidator(PackageTestCase
):
350 self
._validator
= rift
.package
.package
.PackageChecksumValidator(self
._log
)
352 def create_checksum_file(self
, file_md5_map
):
353 checksum_hdl
= io
.BytesIO()
354 for file_name
, md5
in file_md5_map
.items():
355 checksum_hdl
.write("{} {}\n".format(md5
, file_name
).encode())
360 self
.add_tarinfo("checksums.txt", checksum_hdl
)
361 self
._tar
.addfile(tarfile
.TarInfo(), checksum_hdl
)
363 def create_nsd_package_with_checksum(self
):
364 self
.create_checksum_file(
365 {nsd_filename
: file_hdl_md5(io
.BytesIO(nsd_yaml
))}
367 package
= self
.create_nsd_package()
370 def test_package_no_checksum(self
):
371 package
= self
.create_nsd_package()
373 # For now, a missing checksum file will be supported.
374 # No files will be validated.
375 validated_files
= self
._validator
.validate(package
)
376 self
.assertEquals(validated_files
, {})
378 def test_package_with_checksum(self
):
379 package
= self
.create_nsd_package_with_checksum()
380 validated_files
= self
._validator
.validate(package
)
381 self
.assertEquals(list(validated_files
.keys()), [nsd_filename
])
384 class TestPackageStore(PackageTestCase
):
385 def create_store(self
, root_dir
):
386 store
= rift
.package
.store
.PackageFilesystemStore(self
._log
, root_dir
)
389 def create_and_store_package(self
, store
):
390 package
= self
.create_nsd_package()
391 store
.store_package(package
)
395 def test_store_package(self
):
396 with tempfile
.TemporaryDirectory() as root_dir
:
397 store
= self
.create_store(root_dir
)
398 package
= self
.create_and_store_package(store
)
399 new_package
= store
.get_package(package
.descriptor_id
)
400 self
.assertEquals(new_package
.files
, package
.files
)
401 self
.assertEquals(type(new_package
), type(package
))
403 def test_store_reload_package(self
):
404 with tempfile
.TemporaryDirectory() as root_dir
:
405 store
= self
.create_store(root_dir
)
406 package
= self
.create_and_store_package(store
)
408 new_store
= self
.create_store(root_dir
)
409 new_package
= new_store
.get_package(package
.descriptor_id
)
411 self
.assertEquals(new_package
.files
, package
.files
)
412 self
.assertEquals(type(new_package
), type(package
))
414 def test_delete_package(self
):
415 with tempfile
.TemporaryDirectory() as root_dir
:
416 store
= self
.create_store(root_dir
)
417 package
= self
.create_and_store_package(store
)
419 store
.get_package(package
.descriptor_id
)
420 store
.delete_package(package
.descriptor_id
)
422 with self
.assertRaises(rift
.package
.store
.PackageStoreError
):
423 store
.get_package(package
.descriptor_id
)
425 def test_store_exist_package(self
):
426 with tempfile
.TemporaryDirectory() as root_dir
:
427 store
= self
.create_store(root_dir
)
428 package
= self
.create_and_store_package(store
)
430 with self
.assertRaises(rift
.package
.store
.PackageStoreError
):
431 store
.store_package(package
)
434 class TestTemporaryPackage(PackageTestCase
):
435 def test_temp_package(self
):
436 self
._tar
_file
_hdl
= tempfile
.NamedTemporaryFile(delete
=False)
437 self
._tar
= tarfile
.open(fileobj
=self
._tar
_file
_hdl
, mode
="w|gz")
439 self
.assertTrue(os
.path
.exists(self
._tar
_file
_hdl
.name
))
441 package
= self
.create_nsd_package()
442 with rift
.package
.package
.TemporaryPackage(self
._log
, package
, self
._tar
_file
_hdl
) as temp_pkg
:
443 self
.assertTrue(package
is temp_pkg
)
444 self
.assertEquals(package
.files
, temp_pkg
.files
)
446 self
.assertFalse(os
.path
.exists(self
._tar
_file
_hdl
.name
))
449 def main(argv
=sys
.argv
[1:]):
450 logging
.basicConfig(format
='TEST %(message)s')
452 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
453 parser
= argparse
.ArgumentParser()
454 parser
.add_argument('-v', '--verbose', action
='store_true')
455 parser
.add_argument('-n', '--no-runner', action
='store_true')
457 args
, unknown
= parser
.parse_known_args(argv
)
461 # Set the global logging level
462 logging
.getLogger().setLevel(logging
.DEBUG
if args
.verbose
else logging
.ERROR
)
464 # The unittest framework requires a program name, so use the name of this
465 # file instead (we do not want to have to pass a fake program name to main
466 # when this is called from the interpreter).
467 unittest
.main(argv
=[__file__
] + unknown
+ ["-v"], testRunner
=runner
)
469 if __name__
== '__main__':