Move 'launchpad' directory to 'RIFT_VAR_ROOT' from 'RIFT_ARTIFACTS'
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / test / utest_package.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19
20 import argparse
21 import logging
22 import io
23 import json
24 import os
25 import sys
26 import tarfile
27 import tempfile
28 import unittest
29 import xmlrunner
30 import yaml
31
32 #Setting RIFT_VAR_ROOT if not already set for unit test execution
33 if "RIFT_VAR_ROOT" not in os.environ:
34 os.environ['RIFT_VAR_ROOT'] = os.path.join(os.environ['RIFT_INSTALL'], 'var/rift/unittest')
35
36 import rift.package.archive
37 import rift.package.package
38 import rift.package.charm
39 import rift.package.icon
40 import rift.package.script
41 import rift.package.config
42 import rift.package.store
43 import rift.package.checksums
44 import rift.package.cloud_init
45
46
47 import gi
48 gi.require_version('RwpersonDbYang', '1.0')
49 gi.require_version('RwYang', '1.0')
50
51 from gi.repository import (
52 RwpersonDbYang,
53 RwYang,
54 )
55
56
57 nsd_yaml = b"""nsd:nsd-catalog:
58 nsd:nsd:
59 - nsd:id: gw_corpA
60 nsd:name: gw_corpA
61 nsd:description: Gateways to access as corpA to PE1 and PE2
62 """
63
64 vnfd_yaml = b"""vnfd:vnfd-catalog:
65 vnfd:vnfd:
66 - vnfd:id: gw_corpA_vnfd
67 vnfd:name: gw_corpA_vnfd
68 vnfd:description: Gateways to access as corpA to PE1 and PE2
69 """
70
71 nsd_filename = "gw_corpA__nsd.yaml"
72 vnfd_filename = "gw_corpA__vnfd.yaml"
73
74
75 def file_hdl_md5(file_hdl):
76 return rift.package.checksums.checksum(file_hdl)
77
78
79 class ArchiveTestCase(unittest.TestCase):
80 def setUp(self):
81 self._log = logging.getLogger()
82
83 self._tar_file_hdl = io.BytesIO()
84 self._tar = tarfile.open(fileobj=self._tar_file_hdl, mode="w|gz")
85
86 self._nsd_yaml_hdl = io.BytesIO(nsd_yaml)
87 self._vnfd_yaml_hdl = io.BytesIO(vnfd_yaml)
88
89 def tearDown(self):
90 self._nsd_yaml_hdl.close()
91 self._vnfd_yaml_hdl.close()
92 self._tar.close()
93 self._tar_file_hdl.close()
94
95 def create_tar_package_archive(self):
96 self._tar.close()
97 self._tar_file_hdl.flush()
98 self._tar_file_hdl.seek(0)
99 archive = rift.package.package.TarPackageArchive(
100 log=self._log,
101 tar_file_hdl=self._tar_file_hdl,
102 )
103
104 return archive
105
106 def add_tarinfo(self, name, file_hdl, mode=0o777):
107 tarinfo = tarfile.TarInfo(name)
108 tarinfo.size = len(file_hdl.read())
109 assert tarinfo.size > 0
110 file_hdl.seek(0)
111 self._tar.addfile(tarinfo, file_hdl)
112
113 def add_tarinfo_dir(self, name):
114 tarinfo = tarfile.TarInfo(name)
115 tarinfo.type = tarfile.DIRTYPE
116 self._tar.addfile(tarinfo)
117
118 def add_nsd_yaml(self):
119 self.add_tarinfo(nsd_filename, io.BytesIO(nsd_yaml))
120
121 def add_vnfd_yaml(self):
122 self.add_tarinfo(vnfd_filename, io.BytesIO(vnfd_yaml))
123
124
125 class PackageTestCase(ArchiveTestCase):
126 def create_nsd_package(self):
127 self.add_nsd_yaml()
128 archive = self.create_tar_package_archive()
129 package = archive.create_package()
130
131 return package
132
133 def create_vnfd_package(self):
134 self.add_vnfd_yaml()
135 archive = self.create_tar_package_archive()
136 package = archive.create_package()
137
138 return package
139
140
141 class TestCreateArchive(ArchiveTestCase):
142 def test_create_tar_archive(self):
143 self.add_nsd_yaml()
144 archive = self.create_tar_package_archive()
145 self.assertEquals(set(archive.filenames), {nsd_filename})
146
147 def test_nsd_tar_archive(self):
148 #Write the NSD YAML to the tar file
149 self.add_nsd_yaml()
150
151 archive = self.create_tar_package_archive()
152 with archive.open_file(nsd_filename) as nsd_hdl:
153 nsd_bytes = nsd_hdl.read()
154
155 self.assertEquals(nsd_bytes, nsd_yaml)
156
157
158 class TestPackage(PackageTestCase):
159 def create_vnfd_package_archive(self, package, hdl):
160 # Create an archive from a package
161 archive = rift.package.archive.TarPackageArchive.from_package(
162 self._log, package, hdl,
163 )
164 # Closing the archive writes any closing bytes to the file handle
165 archive.close()
166 hdl.seek(0)
167
168 return archive
169
170 def test_create_nsd_package_from_archive(self):
171 package = self.create_nsd_package()
172 self.assertTrue(isinstance(package, rift.package.package.NsdPackage))
173
174 json_str = package.json_descriptor
175 desc_dict = json.loads(json_str)
176 self.assertIn("nsd:nsd-catalog", desc_dict)
177
178 def test_create_vnfd_package_from_archive(self):
179 package = self.create_vnfd_package()
180 self.assertTrue(isinstance(package, rift.package.package.VnfdPackage))
181
182 json_str = package.json_descriptor
183 desc_dict = json.loads(json_str)
184 self.assertIn("vnfd:vnfd-catalog", desc_dict)
185
186 def test_create_vnfd_archive_from_package(self):
187 package = self.create_vnfd_package()
188 hdl = io.BytesIO()
189 self.create_vnfd_package_archive(package, hdl)
190
191 # Ensure that the archive created was valid
192 with tarfile.open(fileobj=hdl, mode='r|gz'):
193 pass
194
195 def test_round_trip_vnfd_package_from_archive(self):
196 package = self.create_vnfd_package()
197 hdl = io.BytesIO()
198 self.create_vnfd_package_archive(package, hdl)
199
200 archive = rift.package.archive.TarPackageArchive(self._log, hdl)
201 def md5(file_hdl):
202 return rift.package.checksums.checksum(file_hdl)
203
204 # Create the package from the archive and validate file checksums and modes
205 new_package = archive.create_package()
206
207 self.assertEqual(package.files, new_package.files)
208 self.assertEqual(type(package), type(new_package))
209
210 for filename in package.files:
211 pkg_file = package.open(filename)
212 new_pkg_file = new_package.open(filename)
213 self.assertEqual(md5(pkg_file), md5(new_pkg_file))
214
215 def test_create_nsd_package_from_file(self):
216 nsd_file_name = "asdf_nsd.yaml"
217 hdl = io.BytesIO(nsd_yaml)
218 hdl.name = nsd_file_name
219
220 package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
221 self._log, hdl
222 )
223 self.assertTrue(isinstance(package, rift.package.package.NsdPackage))
224
225 with package.open(nsd_file_name) as nsd_hdl:
226 nsd_data = nsd_hdl.read()
227 self.assertEquals(yaml.load(nsd_data), yaml.load(nsd_yaml))
228
229 def test_create_vnfd_package_from_file(self):
230 vnfd_file_name = "asdf_vnfd.yaml"
231 hdl = io.BytesIO(vnfd_yaml)
232 hdl.name = vnfd_file_name
233
234 package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
235 self._log, hdl
236 )
237 self.assertTrue(isinstance(package, rift.package.package.VnfdPackage))
238
239 with package.open(vnfd_file_name) as vnfd_hdl:
240 vnfd_data = vnfd_hdl.read()
241 self.assertEquals(yaml.load(vnfd_data), yaml.load(vnfd_yaml))
242
243
244 class TestPackageCharmExtractor(PackageTestCase):
245 def add_charm_dir(self, charm_name):
246 charm_dir = "charms/trusty/{}".format(charm_name)
247 charm_file = "{}/actions.yaml".format(charm_dir)
248 charm_text = b"THIS IS A FAKE CHARM"
249 self.add_tarinfo_dir(charm_dir)
250 self.add_tarinfo(charm_file, io.BytesIO(charm_text))
251
252 def test_extract_charm(self):
253 charm_name = "charm_a"
254 self.add_charm_dir(charm_name)
255 package = self.create_vnfd_package()
256 with tempfile.TemporaryDirectory() as tmp_dir:
257 extractor = rift.package.charm.PackageCharmExtractor(self._log, tmp_dir)
258 extractor.extract_charms(package)
259
260 charm_dir = extractor.get_extracted_charm_dir(package.descriptor_id, charm_name)
261 self.assertTrue(os.path.exists(charm_dir))
262 self.assertTrue(os.path.isdir(charm_dir))
263
264
265 class TestPackageIconExtractor(PackageTestCase):
266 def add_icon_file(self, icon_name):
267 icon_file = "icons/{}".format(icon_name)
268 icon_text = b"png file bytes"
269 self.add_tarinfo(icon_file, io.BytesIO(icon_text))
270
271 def test_extract_icon(self):
272 icon_name = "icon_a"
273 self.add_icon_file(icon_name)
274 package = self.create_vnfd_package()
275 with tempfile.TemporaryDirectory() as tmp_dir:
276 extractor = rift.package.icon.PackageIconExtractor(self._log, tmp_dir)
277 extractor.extract_icons(package)
278
279 icon_file = extractor.get_extracted_icon_path(
280 package.descriptor_type, package.descriptor_id, icon_name
281 )
282 self.assertTrue(os.path.exists(icon_file))
283 self.assertTrue(os.path.isfile(icon_file))
284
285
286 class TestPackageScriptExtractor(PackageTestCase):
287 def add_script_file(self, script_name):
288 script_file = "scripts/{}".format(script_name)
289 script_text = b"""#!/usr/bin/python
290 print("hi")
291 """
292 self.add_tarinfo(script_file, io.BytesIO(script_text), mode=0o666)
293
294 def test_extract_script(self):
295 script_name = "add_corporation.py"
296 self.add_script_file(script_name)
297 package = self.create_vnfd_package()
298 with tempfile.TemporaryDirectory() as tmp_dir:
299 extractor = rift.package.script.PackageScriptExtractor(self._log, tmp_dir)
300 extractor.extract_scripts(package)
301
302 script_dir = extractor.get_extracted_script_path(package.descriptor_id, script_name)
303 self.assertTrue(os.path.exists(script_dir))
304 self.assertTrue(os.path.isfile(script_dir))
305
306 class TestPackageCloudInitExtractor(PackageTestCase):
307 def add_cloud_init_file(self, cloud_init_filename):
308 script_file = "cloud_init/{}".format(cloud_init_filename)
309 script_text = b"""#cloud-config"""
310 self.add_tarinfo(script_file, io.BytesIO(script_text), mode=0o666)
311
312 def test_read_cloud_init(self):
313 script_name = "testVM_cloud_init.cfg"
314 valid_script_text = "#cloud-config"
315 self.add_cloud_init_file(script_name)
316 package = self.create_vnfd_package()
317
318 extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
319 cloud_init_contents = extractor.read_script(package, script_name)
320
321 self.assertEquals(cloud_init_contents, valid_script_text)
322
323 def test_cloud_init_file_missing(self):
324 script_name = "testVM_cloud_init.cfg"
325 package = self.create_vnfd_package()
326
327 extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
328
329 with self.assertRaises(rift.package.cloud_init.CloudInitExtractionError):
330 extractor.read_script(package, script_name)
331
332 class TestPackageConfigExtractor(PackageTestCase):
333 def add_ns_config_file(self, nsd_id):
334 config_file = "ns_config/{}.yaml".format(nsd_id)
335 config_text = b""" ns_config """
336 self.add_tarinfo(config_file, io.BytesIO(config_text), mode=0o666)
337
338 return config_file
339
340 def add_vnf_config_file(self, vnfd_id, member_vnf_index):
341 config_file = "vnf_config/{}_{}.yaml".format(vnfd_id, member_vnf_index)
342 config_text = b""" vnf_config """
343 self.add_tarinfo(config_file, io.BytesIO(config_text), mode=0o666)
344
345 return config_file
346
347 def test_extract_config(self):
348 ns_config_file = self.add_ns_config_file("nsd_id")
349 vnf_config_file = self.add_vnf_config_file("vnfd_id", 1)
350 package = self.create_nsd_package()
351 with tempfile.TemporaryDirectory() as tmp_dir:
352 extractor = rift.package.config.PackageConfigExtractor(self._log, tmp_dir)
353 extractor.extract_configs(package)
354
355 dest_ns_config_file = extractor.get_extracted_config_path(package.descriptor_id, ns_config_file)
356 dest_vnf_config_file = extractor.get_extracted_config_path(package.descriptor_id, vnf_config_file)
357 self.assertTrue(os.path.isfile(dest_ns_config_file))
358 self.assertTrue(os.path.isfile(dest_vnf_config_file))
359
360
361 class TestPackageValidator(PackageTestCase):
362 def setUp(self):
363 super().setUp()
364 self._validator = rift.package.package.PackageChecksumValidator(self._log)
365
366 def create_checksum_file(self, file_md5_map):
367 checksum_hdl = io.BytesIO()
368 for file_name, md5 in file_md5_map.items():
369 checksum_hdl.write("{} {}\n".format(md5, file_name).encode())
370
371 checksum_hdl.flush()
372 checksum_hdl.seek(0)
373
374 self.add_tarinfo("checksums.txt", checksum_hdl)
375 self._tar.addfile(tarfile.TarInfo(), checksum_hdl)
376
377 def create_nsd_package_with_checksum(self):
378 self.create_checksum_file(
379 {nsd_filename: file_hdl_md5(io.BytesIO(nsd_yaml))}
380 )
381 package = self.create_nsd_package()
382 return package
383
384 def test_package_no_checksum(self):
385 package = self.create_nsd_package()
386
387 # For now, a missing checksum file will be supported.
388 # No files will be validated.
389 validated_files = self._validator.validate(package)
390 self.assertEquals(validated_files, {})
391
392 def test_package_with_checksum(self):
393 package = self.create_nsd_package_with_checksum()
394 validated_files = self._validator.validate(package)
395 self.assertEquals(list(validated_files.keys()), [nsd_filename])
396
397
398 class TestPackageStore(PackageTestCase):
399 def create_store(self, root_dir):
400 store = rift.package.store.PackageFilesystemStore(self._log, root_dir)
401 return store
402
403 def create_and_store_package(self, store):
404 package = self.create_nsd_package()
405 store.store_package(package)
406
407 return package
408
409 def test_store_package(self):
410 with tempfile.TemporaryDirectory() as root_dir:
411 store = self.create_store(root_dir)
412 package = self.create_and_store_package(store)
413 new_package = store.get_package(package.descriptor_id)
414 self.assertEquals(new_package.files, package.files)
415 self.assertEquals(type(new_package), type(package))
416
417 def test_store_reload_package(self):
418 with tempfile.TemporaryDirectory() as root_dir:
419 store = self.create_store(root_dir)
420 package = self.create_and_store_package(store)
421
422 new_store = self.create_store(root_dir)
423 new_package = new_store.get_package(package.descriptor_id)
424
425 self.assertEquals(new_package.files, package.files)
426 self.assertEquals(type(new_package), type(package))
427
428 def test_delete_package(self):
429 with tempfile.TemporaryDirectory() as root_dir:
430 store = self.create_store(root_dir)
431 package = self.create_and_store_package(store)
432
433 store.get_package(package.descriptor_id)
434 store.delete_package(package.descriptor_id)
435
436 with self.assertRaises(rift.package.store.PackageStoreError):
437 store.get_package(package.descriptor_id)
438
439 def test_store_exist_package(self):
440 with tempfile.TemporaryDirectory() as root_dir:
441 store = self.create_store(root_dir)
442 package = self.create_and_store_package(store)
443
444 with self.assertRaises(rift.package.store.PackageStoreError):
445 store.store_package(package)
446
447
448 class TestTemporaryPackage(PackageTestCase):
449 def test_temp_package(self):
450 self._tar_file_hdl = tempfile.NamedTemporaryFile(delete=False)
451 self._tar = tarfile.open(fileobj=self._tar_file_hdl, mode="w|gz")
452
453 self.assertTrue(os.path.exists(self._tar_file_hdl.name))
454
455 package = self.create_nsd_package()
456 with rift.package.package.TemporaryPackage(self._log, package, self._tar_file_hdl) as temp_pkg:
457 self.assertTrue(package is temp_pkg)
458 self.assertEquals(package.files, temp_pkg.files)
459
460 self.assertFalse(os.path.exists(self._tar_file_hdl.name))
461
462
463 def main(argv=sys.argv[1:]):
464 logging.basicConfig(format='TEST %(message)s')
465
466 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
467 parser = argparse.ArgumentParser()
468 parser.add_argument('-v', '--verbose', action='store_true')
469 parser.add_argument('-n', '--no-runner', action='store_true')
470
471 args, unknown = parser.parse_known_args(argv)
472 if args.no_runner:
473 runner = None
474
475 # Set the global logging level
476 logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
477
478 # The unittest framework requires a program name, so use the name of this
479 # file instead (we do not want to have to pass a fake program name to main
480 # when this is called from the interpreter).
481 unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
482
483 if __name__ == '__main__':
484 main()