Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / test / utest_package.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19
20 import argparse
21 import logging
22 import io
23 import json
24 import os
25 import sys
26 import tarfile
27 import tempfile
28 import unittest
29 import xmlrunner
30 import yaml
31
32 import rift.package.archive
33 import rift.package.package
34 import rift.package.charm
35 import rift.package.icon
36 import rift.package.script
37 import rift.package.config
38 import rift.package.store
39 import rift.package.checksums
40 import rift.package.cloud_init
41
42
# Minimal NSD catalog used as the descriptor fixture for the NSD package tests.
# The id/name/description keys are nested under the single list entry so the
# document describes one NSD rather than a flat set of top-level keys.
nsd_yaml = b"""nsd:nsd-catalog:
  nsd:nsd:
  - nsd:id: gw_corpA
    nsd:name: gw_corpA
    nsd:description: Gateways to access as corpA to PE1 and PE2
"""

# Minimal VNFD catalog used as the descriptor fixture for the VNFD package tests.
vnfd_yaml = b"""vnfd:vnfd-catalog:
  vnfd:vnfd:
  - vnfd:id: gw_corpA_vnfd
    vnfd:name: gw_corpA_vnfd
    vnfd:description: Gateways to access as corpA to PE1 and PE2
"""

# Member names under which the fixtures above are stored in the test archives.
nsd_filename = "gw_corpA__nsd.yaml"
vnfd_filename = "gw_corpA__vnfd.yaml"
59
60
def file_hdl_md5(file_hdl):
    """Return the checksum of an open file handle.

    Delegates to ``rift.package.checksums.checksum``. The function name
    suggests an MD5 digest, but the algorithm is decided by that helper.
    """
    digest = rift.package.checksums.checksum(file_hdl)
    return digest
63
64
class ArchiveTestCase(unittest.TestCase):
    """Base fixture that builds a gzip-compressed tar archive in memory.

    Subclasses add members with the ``add_*`` helpers and then call
    ``create_tar_package_archive`` to finish the tar stream and wrap it in a
    package archive object.
    """

    def setUp(self):
        self._log = logging.getLogger()

        # The tar is written as a gzip stream ("w|gz") into an in-memory buffer.
        self._tar_file_hdl = io.BytesIO()
        self._tar = tarfile.open(fileobj=self._tar_file_hdl, mode="w|gz")

        self._nsd_yaml_hdl = io.BytesIO(nsd_yaml)
        self._vnfd_yaml_hdl = io.BytesIO(vnfd_yaml)

    def tearDown(self):
        self._nsd_yaml_hdl.close()
        self._vnfd_yaml_hdl.close()
        # Close the tar before its backing buffer so that any trailing bytes
        # can still be written; closing an already closed tar is harmless.
        self._tar.close()
        self._tar_file_hdl.close()

    def create_tar_package_archive(self):
        """Finish the tar stream and return a package archive reading from it."""
        self._tar.close()
        self._tar_file_hdl.flush()
        self._tar_file_hdl.seek(0)
        # NOTE(review): TestPackage reaches this class through
        # rift.package.archive instead of rift.package.package - confirm that
        # both module paths are intended to expose TarPackageArchive.
        archive = rift.package.package.TarPackageArchive(
            log=self._log,
            tar_file_hdl=self._tar_file_hdl,
        )

        return archive

    def add_tarinfo(self, name, file_hdl, mode=0o777):
        """Add the remaining contents of *file_hdl* to the tar as file *name*.

        Arguments:
            name - member name (path) inside the archive
            file_hdl - seekable binary file object holding the member data
            mode - permission bits recorded for the member
        """
        tarinfo = tarfile.TarInfo(name)
        tarinfo.size = len(file_hdl.read())
        # The permission bits were previously accepted but never recorded,
        # which left every member at the TarInfo default mode.
        tarinfo.mode = mode
        self.assertGreater(tarinfo.size, 0, "refusing to add empty file {!r}".format(name))
        # Rewind so that addfile() copies the bytes that were just measured.
        file_hdl.seek(0)
        self._tar.addfile(tarinfo, file_hdl)

    def add_tarinfo_dir(self, name):
        """Add an empty directory member called *name* to the tar."""
        tarinfo = tarfile.TarInfo(name)
        tarinfo.type = tarfile.DIRTYPE
        self._tar.addfile(tarinfo)

    def add_nsd_yaml(self):
        """Add the NSD fixture descriptor to the tar."""
        self.add_tarinfo(nsd_filename, io.BytesIO(nsd_yaml))

    def add_vnfd_yaml(self):
        """Add the VNFD fixture descriptor to the tar."""
        self.add_tarinfo(vnfd_filename, io.BytesIO(vnfd_yaml))
109
110
class PackageTestCase(ArchiveTestCase):
    """Fixture helpers that turn the in-memory tar into a package object."""

    def create_nsd_package(self):
        """Add the NSD descriptor, finish the archive and build its package."""
        self.add_nsd_yaml()
        return self.create_tar_package_archive().create_package()

    def create_vnfd_package(self):
        """Add the VNFD descriptor, finish the archive and build its package."""
        self.add_vnfd_yaml()
        return self.create_tar_package_archive().create_package()
125
126
class TestCreateArchive(ArchiveTestCase):
    """Checks that the tar fixture can be read back through the package archive."""

    def test_create_tar_archive(self):
        self.add_nsd_yaml()
        archive = self.create_tar_package_archive()
        # assertEqual replaces the deprecated assertEquals alias, which no
        # longer exists in Python 3.12.
        self.assertEqual(set(archive.filenames), {nsd_filename})

    def test_nsd_tar_archive(self):
        # Write the NSD YAML to the tar file
        self.add_nsd_yaml()

        archive = self.create_tar_package_archive()
        with archive.open_file(nsd_filename) as nsd_hdl:
            nsd_bytes = nsd_hdl.read()

        # The bytes read back must match what was written.
        self.assertEqual(nsd_bytes, nsd_yaml)
142
143
class TestPackage(PackageTestCase):
    """Tests converting between tar archives, descriptor files and packages."""

    def create_vnfd_package_archive(self, package, hdl):
        """Write *package* into *hdl* as a tar archive and rewind *hdl*.

        Returns the archive object, which has already been closed.
        """
        # Create an archive from a package
        archive = rift.package.archive.TarPackageArchive.from_package(
            self._log, package, hdl,
        )
        # Closing the archive writes any closing bytes to the file handle
        archive.close()
        hdl.seek(0)

        return archive

    def test_create_nsd_package_from_archive(self):
        package = self.create_nsd_package()
        self.assertIsInstance(package, rift.package.package.NsdPackage)

        json_str = package.json_descriptor
        desc_dict = json.loads(json_str)
        self.assertIn("nsd:nsd-catalog", desc_dict)

    def test_create_vnfd_package_from_archive(self):
        package = self.create_vnfd_package()
        self.assertIsInstance(package, rift.package.package.VnfdPackage)

        json_str = package.json_descriptor
        desc_dict = json.loads(json_str)
        self.assertIn("vnfd:vnfd-catalog", desc_dict)

    def test_create_vnfd_archive_from_package(self):
        package = self.create_vnfd_package()
        hdl = io.BytesIO()
        self.create_vnfd_package_archive(package, hdl)

        # Ensure that the archive created was valid
        with tarfile.open(fileobj=hdl, mode='r|gz'):
            pass

    def test_round_trip_vnfd_package_from_archive(self):
        package = self.create_vnfd_package()
        hdl = io.BytesIO()
        self.create_vnfd_package_archive(package, hdl)

        archive = rift.package.archive.TarPackageArchive(self._log, hdl)

        # Create the package from the archive and validate file checksums and modes
        new_package = archive.create_package()

        self.assertEqual(package.files, new_package.files)
        self.assertEqual(type(package), type(new_package))

        for filename in package.files:
            # Open both copies in a with-block so the handles are released
            # even when the comparison fails.
            with package.open(filename) as pkg_file, \
                    new_package.open(filename) as new_pkg_file:
                self.assertEqual(
                    file_hdl_md5(pkg_file), file_hdl_md5(new_pkg_file)
                )

    def test_create_nsd_package_from_file(self):
        nsd_file_name = "asdf_nsd.yaml"
        hdl = io.BytesIO(nsd_yaml)
        # The descriptor file name is supplied through the handle's name attribute.
        hdl.name = nsd_file_name

        package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
            self._log, hdl
        )
        self.assertIsInstance(package, rift.package.package.NsdPackage)

        with package.open(nsd_file_name) as nsd_hdl:
            nsd_data = nsd_hdl.read()

        # safe_load: yaml.load() without an explicit Loader is unsafe and is
        # rejected by PyYAML 6 and later.
        self.assertEqual(yaml.safe_load(nsd_data), yaml.safe_load(nsd_yaml))

    def test_create_vnfd_package_from_file(self):
        vnfd_file_name = "asdf_vnfd.yaml"
        hdl = io.BytesIO(vnfd_yaml)
        hdl.name = vnfd_file_name

        package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
            self._log, hdl
        )
        self.assertIsInstance(package, rift.package.package.VnfdPackage)

        with package.open(vnfd_file_name) as vnfd_hdl:
            vnfd_data = vnfd_hdl.read()

        self.assertEqual(yaml.safe_load(vnfd_data), yaml.safe_load(vnfd_yaml))
228
229
class TestPackageCharmExtractor(PackageTestCase):
    """Exercises charm extraction from a VNFD package."""

    def add_charm_dir(self, charm_name):
        """Add a charm directory with one placeholder ``actions.yaml`` to the tar."""
        directory = "charms/trusty/{}".format(charm_name)
        actions_path = "{}/actions.yaml".format(directory)
        self.add_tarinfo_dir(directory)
        self.add_tarinfo(actions_path, io.BytesIO(b"THIS IS A FAKE CHARM"))

    def test_extract_charm(self):
        name = "charm_a"
        self.add_charm_dir(name)
        vnfd_package = self.create_vnfd_package()

        with tempfile.TemporaryDirectory() as extract_root:
            charm_extractor = rift.package.charm.PackageCharmExtractor(
                self._log, extract_root
            )
            charm_extractor.extract_charms(vnfd_package)

            extracted_dir = charm_extractor.get_extracted_charm_dir(
                vnfd_package.descriptor_id, name
            )
            # The extracted charm must be present on disk and be a directory.
            for check in (os.path.exists, os.path.isdir):
                self.assertTrue(check(extracted_dir))
249
250
class TestPackageIconExtractor(PackageTestCase):
    """Exercises icon extraction from a VNFD package."""

    def add_icon_file(self, icon_name):
        """Add a placeholder icon called *icon_name* under ``icons/`` to the tar."""
        member_path = "icons/{}".format(icon_name)
        self.add_tarinfo(member_path, io.BytesIO(b"png file bytes"))

    def test_extract_icon(self):
        name = "icon_a"
        self.add_icon_file(name)
        vnfd_package = self.create_vnfd_package()

        with tempfile.TemporaryDirectory() as extract_root:
            icon_extractor = rift.package.icon.PackageIconExtractor(
                self._log, extract_root
            )
            icon_extractor.extract_icons(vnfd_package)

            extracted_path = icon_extractor.get_extracted_icon_path(
                vnfd_package.descriptor_type, vnfd_package.descriptor_id, name
            )
            # The extracted icon must be present on disk and be a regular file.
            for check in (os.path.exists, os.path.isfile):
                self.assertTrue(check(extracted_path))
270
271
class TestPackageScriptExtractor(PackageTestCase):
    """Exercises script extraction from a VNFD package."""

    def add_script_file(self, script_name):
        """Add a tiny Python script called *script_name* under ``scripts/``."""
        member_path = "scripts/{}".format(script_name)
        body = b"""#!/usr/bin/python
print("hi")
"""
        self.add_tarinfo(member_path, io.BytesIO(body), mode=0o666)

    def test_extract_script(self):
        name = "add_corporation.py"
        self.add_script_file(name)
        vnfd_package = self.create_vnfd_package()

        with tempfile.TemporaryDirectory() as extract_root:
            script_extractor = rift.package.script.PackageScriptExtractor(
                self._log, extract_root
            )
            script_extractor.extract_scripts(vnfd_package)

            extracted_path = script_extractor.get_extracted_script_path(
                vnfd_package.descriptor_id, name
            )
            # The extracted script must be present on disk and be a regular file.
            for check in (os.path.exists, os.path.isfile):
                self.assertTrue(check(extracted_path))
291
class TestPackageCloudInitExtractor(PackageTestCase):
    """Exercises reading cloud-init scripts out of a VNFD package."""

    def add_cloud_init_file(self, cloud_init_filename):
        """Add a one-line cloud-init file under ``cloud_init/`` to the tar."""
        script_file = "cloud_init/{}".format(cloud_init_filename)
        script_text = b"""#cloud-config"""
        self.add_tarinfo(script_file, io.BytesIO(script_text), mode=0o666)

    def test_read_cloud_init(self):
        script_name = "testVM_cloud_init.cfg"
        valid_script_text = "#cloud-config"
        self.add_cloud_init_file(script_name)
        package = self.create_vnfd_package()

        extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
        cloud_init_contents = extractor.read_script(package, script_name)

        # assertEqual replaces the deprecated assertEquals alias, which no
        # longer exists in Python 3.12.
        self.assertEqual(cloud_init_contents, valid_script_text)

    def test_cloud_init_file_missing(self):
        script_name = "testVM_cloud_init.cfg"
        # No cloud-init file is added, so reading it is expected to fail.
        package = self.create_vnfd_package()

        extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)

        with self.assertRaises(rift.package.cloud_init.CloudInitExtractionError):
            extractor.read_script(package, script_name)
317
class TestPackageConfigExtractor(PackageTestCase):
    """Exercises extraction of NS and VNF config files from an NSD package."""

    def add_ns_config_file(self, nsd_id):
        """Add ``ns_config/<nsd_id>.yaml`` to the tar and return its member path."""
        member_path = "ns_config/{}.yaml".format(nsd_id)
        self.add_tarinfo(member_path, io.BytesIO(b""" ns_config """), mode=0o666)
        return member_path

    def add_vnf_config_file(self, vnfd_id, member_vnf_index):
        """Add ``vnf_config/<vnfd_id>_<index>.yaml`` and return its member path."""
        member_path = "vnf_config/{}_{}.yaml".format(vnfd_id, member_vnf_index)
        self.add_tarinfo(member_path, io.BytesIO(b""" vnf_config """), mode=0o666)
        return member_path

    def test_extract_config(self):
        added_files = [
            self.add_ns_config_file("nsd_id"),
            self.add_vnf_config_file("vnfd_id", 1),
        ]
        nsd_package = self.create_nsd_package()

        with tempfile.TemporaryDirectory() as extract_root:
            config_extractor = rift.package.config.PackageConfigExtractor(
                self._log, extract_root
            )
            config_extractor.extract_configs(nsd_package)

            # Resolve both destinations before asserting on either of them.
            extracted_paths = [
                config_extractor.get_extracted_config_path(
                    nsd_package.descriptor_id, member_path
                )
                for member_path in added_files
            ]
            for extracted_path in extracted_paths:
                self.assertTrue(os.path.isfile(extracted_path))
345
346
class TestPackageValidator(PackageTestCase):
    """Exercises checksum validation of package contents."""

    def setUp(self):
        super().setUp()
        self._validator = rift.package.package.PackageChecksumValidator(self._log)

    def create_checksum_file(self, file_md5_map):
        """Add a ``checksums.txt`` member built from *file_md5_map* to the tar.

        Arguments:
            file_md5_map - mapping of file name to checksum string; one
                           "<checksum> <file name>" line is written per entry
        """
        checksum_hdl = io.BytesIO()
        for file_name, md5 in file_md5_map.items():
            checksum_hdl.write("{} {}\n".format(md5, file_name).encode())

        checksum_hdl.flush()
        checksum_hdl.seek(0)

        # A second addfile() call with an unnamed, zero-length TarInfo used to
        # follow here; it only added a bogus empty member and has been dropped.
        self.add_tarinfo("checksums.txt", checksum_hdl)

    def create_nsd_package_with_checksum(self):
        """Build an NSD package whose checksum file covers the NSD descriptor."""
        self.create_checksum_file(
            {nsd_filename: file_hdl_md5(io.BytesIO(nsd_yaml))}
        )
        package = self.create_nsd_package()
        return package

    def test_package_no_checksum(self):
        package = self.create_nsd_package()

        # For now, a missing checksum file will be supported.
        # No files will be validated.
        validated_files = self._validator.validate(package)
        self.assertEqual(validated_files, {})

    def test_package_with_checksum(self):
        package = self.create_nsd_package_with_checksum()
        validated_files = self._validator.validate(package)
        self.assertEqual(list(validated_files), [nsd_filename])
382
383
class TestPackageStore(PackageTestCase):
    """Exercises storing, reloading and deleting packages in a filesystem store."""

    def create_store(self, root_dir):
        """Return a filesystem package store rooted at *root_dir*."""
        store = rift.package.store.PackageFilesystemStore(self._log, root_dir)
        return store

    def create_and_store_package(self, store):
        """Build the NSD fixture package, put it in *store* and return it."""
        package = self.create_nsd_package()
        store.store_package(package)

        return package

    def test_store_package(self):
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)
            new_package = store.get_package(package.descriptor_id)
            # assertEqual replaces the deprecated assertEquals alias, which no
            # longer exists in Python 3.12.
            self.assertEqual(new_package.files, package.files)
            self.assertEqual(type(new_package), type(package))

    def test_store_reload_package(self):
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)

            # A second store over the same directory must find the package.
            new_store = self.create_store(root_dir)
            new_package = new_store.get_package(package.descriptor_id)

            self.assertEqual(new_package.files, package.files)
            self.assertEqual(type(new_package), type(package))

    def test_delete_package(self):
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)

            # The lookup must succeed before the delete and fail after it.
            store.get_package(package.descriptor_id)
            store.delete_package(package.descriptor_id)

            with self.assertRaises(rift.package.store.PackageStoreError):
                store.get_package(package.descriptor_id)

    def test_store_exist_package(self):
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)

            # Storing the same package a second time is expected to be rejected.
            with self.assertRaises(rift.package.store.PackageStoreError):
                store.store_package(package)
432
433
class TestTemporaryPackage(PackageTestCase):
    """Exercises the temporary-package context manager with a file-backed tar."""

    @staticmethod
    def _remove_if_exists(path):
        """Delete *path*, ignoring the case where it is already gone."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    def test_temp_package(self):
        # Release the in-memory archive created by setUp before replacing it
        # with a file-backed one, so neither object is left dangling.
        self._tar.close()
        self._tar_file_hdl.close()

        self._tar_file_hdl = tempfile.NamedTemporaryFile(delete=False)
        tmp_path = self._tar_file_hdl.name
        # delete=False means nothing removes the file automatically; make sure
        # it is cleaned up even when the test fails before reaching the end.
        self.addCleanup(self._remove_if_exists, tmp_path)
        self._tar = tarfile.open(fileobj=self._tar_file_hdl, mode="w|gz")

        self.assertTrue(os.path.exists(tmp_path))

        package = self.create_nsd_package()
        with rift.package.package.TemporaryPackage(self._log, package, self._tar_file_hdl) as temp_pkg:
            self.assertTrue(package is temp_pkg)
            self.assertEqual(package.files, temp_pkg.files)

        # Leaving the context is expected to remove the backing file.
        self.assertFalse(os.path.exists(tmp_path))
447
448
def main(argv=sys.argv[1:]):
    """Parse the command line and run this module's unit tests.

    Arguments:
        argv - command line arguments without the program name. Options not
               recognised here are handed on to unittest.

    Options:
        -v / --verbose    log at DEBUG level instead of ERROR
        -n / --no-runner  use unittest's default runner instead of the XML runner

    Raises:
        KeyError - if the XML runner is wanted but the RIFT_MODULE_TEST
                   environment variable is not set
    """
    logging.basicConfig(format='TEST %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')

    args, unknown = parser.parse_known_args(argv)

    # Only touch RIFT_MODULE_TEST when the XML runner is actually used, so
    # that --no-runner (and --help) work without the variable being set.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    # Set the global logging level
    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)

    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
468
# Run the test suite only when this file is executed as a script.
if __name__ == "__main__":
    main()