Bug 210 - Disable NFVI metrics
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / test / utest_package.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19
20 import argparse
21 import logging
22 import io
23 import json
24 import os
25 import sys
26 import tarfile
27 import tempfile
28 import unittest
29 import xmlrunner
30 import yaml
31
32 import rift.package.archive
33 import rift.package.package
34 import rift.package.charm
35 import rift.package.icon
36 import rift.package.script
37 import rift.package.config
38 import rift.package.store
39 import rift.package.checksums
40 import rift.package.cloud_init
41
42
43 import gi
44 gi.require_version('RwpersonDbYang', '1.0')
45 gi.require_version('RwYang', '1.0')
46
47 from gi.repository import (
48 RwpersonDbYang,
49 RwYang,
50 )
51
52
# Minimal NSD catalog holding a single descriptor entry.  The id, name and
# description must be nested under the list entry of the catalog; without
# that nesting they parse as unrelated top-level keys.
nsd_yaml = b"""nsd:nsd-catalog:
  nsd:nsd:
  - nsd:id: gw_corpA
    nsd:name: gw_corpA
    nsd:description: Gateways to access as corpA to PE1 and PE2
"""

# Minimal VNFD catalog holding a single descriptor entry (same layout as the
# NSD document above).
vnfd_yaml = b"""vnfd:vnfd-catalog:
  vnfd:vnfd:
  - vnfd:id: gw_corpA_vnfd
    vnfd:name: gw_corpA_vnfd
    vnfd:description: Gateways to access as corpA to PE1 and PE2
"""

# Archive member names under which the two descriptor documents are stored.
nsd_filename = "gw_corpA__nsd.yaml"
vnfd_filename = "gw_corpA__vnfd.yaml"
69
70
def file_hdl_md5(file_hdl):
    """Return the value of ``rift.package.checksums.checksum`` for *file_hdl*.

    Thin convenience wrapper so tests can compute the checksum of an open
    file handle with a short name.
    """
    digest = rift.package.checksums.checksum(file_hdl)
    return digest
73
74
class ArchiveTestCase(unittest.TestCase):
    """Base test case that assembles a gzip-compressed tar archive in memory.

    ``setUp`` opens a streaming tar writer on top of an ``io.BytesIO`` buffer.
    Tests add members through the ``add_*`` helpers and then call
    ``create_tar_package_archive`` to finalize the stream and wrap the buffer
    in a ``TarPackageArchive``.
    """

    def setUp(self):
        self._log = logging.getLogger()

        # The archive is written as a stream into an in-memory buffer.
        self._tar_file_hdl = io.BytesIO()
        self._tar = tarfile.open(fileobj=self._tar_file_hdl, mode="w|gz")

        self._nsd_yaml_hdl = io.BytesIO(nsd_yaml)
        self._vnfd_yaml_hdl = io.BytesIO(vnfd_yaml)

    def tearDown(self):
        self._nsd_yaml_hdl.close()
        self._vnfd_yaml_hdl.close()
        # Closing an already closed TarFile is a no-op, so this is safe even
        # when create_tar_package_archive() has finalized the stream.
        self._tar.close()
        self._tar_file_hdl.close()

    def create_tar_package_archive(self):
        """Finalize the tar stream and return a TarPackageArchive reading it.

        After this call no further members can be added to the archive.
        """
        # Closing the writer emits the trailing tar/gzip bytes into the buffer.
        self._tar.close()
        self._tar_file_hdl.flush()
        self._tar_file_hdl.seek(0)
        archive = rift.package.package.TarPackageArchive(
            log=self._log,
            tar_file_hdl=self._tar_file_hdl,
        )

        return archive

    def add_tarinfo(self, name, file_hdl, mode=0o777):
        """Add the contents of *file_hdl* to the archive as regular file *name*.

        Arguments:
            name - member path inside the archive
            file_hdl - readable, seekable binary handle with at least one byte
            mode - permission bits recorded in the member header
        """
        tarinfo = tarfile.TarInfo(name)
        # Previously the mode argument was accepted but silently ignored, so
        # every member was written with the TarInfo default mode.
        tarinfo.mode = mode
        tarinfo.size = len(file_hdl.read())
        assert tarinfo.size > 0
        # Rewind so addfile() copies the bytes that were just measured.
        file_hdl.seek(0)
        self._tar.addfile(tarinfo, file_hdl)

    def add_tarinfo_dir(self, name):
        """Add an empty directory member called *name* to the archive."""
        tarinfo = tarfile.TarInfo(name)
        tarinfo.type = tarfile.DIRTYPE
        self._tar.addfile(tarinfo)

    def add_nsd_yaml(self):
        """Add the sample NSD descriptor file to the archive."""
        self.add_tarinfo(nsd_filename, io.BytesIO(nsd_yaml))

    def add_vnfd_yaml(self):
        """Add the sample VNFD descriptor file to the archive."""
        self.add_tarinfo(vnfd_filename, io.BytesIO(vnfd_yaml))
119
120
class PackageTestCase(ArchiveTestCase):
    """Archive test case that can turn the in-memory archive into a package."""

    def create_nsd_package(self):
        """Add the sample NSD descriptor, finalize the archive and build a package."""
        self.add_nsd_yaml()
        return self.create_tar_package_archive().create_package()

    def create_vnfd_package(self):
        """Add the sample VNFD descriptor, finalize the archive and build a package."""
        self.add_vnfd_yaml()
        return self.create_tar_package_archive().create_package()
135
136
class TestCreateArchive(ArchiveTestCase):
    """Checks that files written to the tar stream are visible in the archive."""

    def test_create_tar_archive(self):
        self.add_nsd_yaml()
        archive = self.create_tar_package_archive()
        # assertEquals is a deprecated alias that no longer exists in
        # Python 3.12; assertEqual is the supported spelling.
        self.assertEqual(set(archive.filenames), {nsd_filename})

    def test_nsd_tar_archive(self):
        # Write the NSD YAML to the tar file
        self.add_nsd_yaml()

        archive = self.create_tar_package_archive()
        with archive.open_file(nsd_filename) as nsd_hdl:
            nsd_bytes = nsd_hdl.read()

        # The bytes read back must match what was written.
        self.assertEqual(nsd_bytes, nsd_yaml)
152
153
class TestPackage(PackageTestCase):
    """Exercises package creation from archives and descriptor files, and
    archive creation from packages."""

    def create_vnfd_package_archive(self, package, hdl):
        """Write *package* as a tar archive into *hdl* and rewind *hdl*.

        Returns the (already closed) archive object.
        """
        # Create an archive from a package
        archive = rift.package.archive.TarPackageArchive.from_package(
            self._log, package, hdl,
        )
        # Closing the archive writes any closing bytes to the file handle
        archive.close()
        hdl.seek(0)

        return archive

    def test_create_nsd_package_from_archive(self):
        package = self.create_nsd_package()
        self.assertIsInstance(package, rift.package.package.NsdPackage)

        json_str = package.json_descriptor
        desc_dict = json.loads(json_str)
        self.assertIn("nsd:nsd-catalog", desc_dict)

    def test_create_vnfd_package_from_archive(self):
        package = self.create_vnfd_package()
        self.assertIsInstance(package, rift.package.package.VnfdPackage)

        json_str = package.json_descriptor
        desc_dict = json.loads(json_str)
        self.assertIn("vnfd:vnfd-catalog", desc_dict)

    def test_create_vnfd_archive_from_package(self):
        package = self.create_vnfd_package()
        hdl = io.BytesIO()
        self.create_vnfd_package_archive(package, hdl)

        # Ensure that the archive created was valid
        with tarfile.open(fileobj=hdl, mode='r|gz'):
            pass

    def test_round_trip_vnfd_package_from_archive(self):
        package = self.create_vnfd_package()
        hdl = io.BytesIO()
        self.create_vnfd_package_archive(package, hdl)

        archive = rift.package.archive.TarPackageArchive(self._log, hdl)
        checksum = rift.package.checksums.checksum

        # Create the package from the archive and validate file checksums and modes
        new_package = archive.create_package()

        self.assertEqual(package.files, new_package.files)
        self.assertEqual(type(package), type(new_package))

        for filename in package.files:
            # Open both copies in a with-block so the handles are closed
            # again instead of leaking one pair per packaged file.
            with package.open(filename) as pkg_file, \
                    new_package.open(filename) as new_pkg_file:
                self.assertEqual(checksum(pkg_file), checksum(new_pkg_file))

    def test_create_nsd_package_from_file(self):
        nsd_file_name = "asdf_nsd.yaml"
        hdl = io.BytesIO(nsd_yaml)
        hdl.name = nsd_file_name

        package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
            self._log, hdl
        )
        self.assertIsInstance(package, rift.package.package.NsdPackage)

        with package.open(nsd_file_name) as nsd_hdl:
            nsd_data = nsd_hdl.read()

        # yaml.load() without an explicit Loader is deprecated and rejected
        # by PyYAML 6; safe_load() is sufficient for these plain documents.
        self.assertEqual(yaml.safe_load(nsd_data), yaml.safe_load(nsd_yaml))

    def test_create_vnfd_package_from_file(self):
        vnfd_file_name = "asdf_vnfd.yaml"
        hdl = io.BytesIO(vnfd_yaml)
        hdl.name = vnfd_file_name

        package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
            self._log, hdl
        )
        self.assertIsInstance(package, rift.package.package.VnfdPackage)

        with package.open(vnfd_file_name) as vnfd_hdl:
            vnfd_data = vnfd_hdl.read()

        self.assertEqual(yaml.safe_load(vnfd_data), yaml.safe_load(vnfd_yaml))
238
239
class TestPackageCharmExtractor(PackageTestCase):
    """Checks that a charm directory inside a VNFD package gets extracted."""

    def add_charm_dir(self, charm_name):
        """Add a charm directory containing one placeholder actions.yaml file."""
        directory = "charms/trusty/{name}".format(name=charm_name)
        actions_path = "{parent}/actions.yaml".format(parent=directory)
        self.add_tarinfo_dir(directory)
        self.add_tarinfo(actions_path, io.BytesIO(b"THIS IS A FAKE CHARM"))

    def test_extract_charm(self):
        name = "charm_a"
        self.add_charm_dir(name)
        vnfd_package = self.create_vnfd_package()

        with tempfile.TemporaryDirectory() as workdir:
            charm_extractor = rift.package.charm.PackageCharmExtractor(
                self._log, workdir
            )
            charm_extractor.extract_charms(vnfd_package)

            # The extracted charm must exist on disk and be a directory.
            extracted = charm_extractor.get_extracted_charm_dir(
                vnfd_package.descriptor_id, name
            )
            self.assertTrue(os.path.exists(extracted))
            self.assertTrue(os.path.isdir(extracted))
259
260
class TestPackageIconExtractor(PackageTestCase):
    """Checks that an icon file inside a VNFD package gets extracted."""

    def add_icon_file(self, icon_name):
        """Add a placeholder icon file under the archive's icons/ directory."""
        self.add_tarinfo(
            "icons/{name}".format(name=icon_name),
            io.BytesIO(b"png file bytes"),
        )

    def test_extract_icon(self):
        name = "icon_a"
        self.add_icon_file(name)
        vnfd_package = self.create_vnfd_package()

        with tempfile.TemporaryDirectory() as workdir:
            icon_extractor = rift.package.icon.PackageIconExtractor(
                self._log, workdir
            )
            icon_extractor.extract_icons(vnfd_package)

            # The extracted icon must exist on disk and be a regular file.
            extracted = icon_extractor.get_extracted_icon_path(
                vnfd_package.descriptor_type,
                vnfd_package.descriptor_id,
                name,
            )
            self.assertTrue(os.path.exists(extracted))
            self.assertTrue(os.path.isfile(extracted))
280
281
class TestPackageScriptExtractor(PackageTestCase):
    """Checks that a script inside a VNFD package gets extracted."""

    def add_script_file(self, script_name):
        """Add a tiny Python script under the archive's scripts/ directory."""
        contents = b"""#!/usr/bin/python
print("hi")
"""
        member_path = "scripts/{name}".format(name=script_name)
        self.add_tarinfo(member_path, io.BytesIO(contents), mode=0o666)

    def test_extract_script(self):
        name = "add_corporation.py"
        self.add_script_file(name)
        vnfd_package = self.create_vnfd_package()

        with tempfile.TemporaryDirectory() as workdir:
            script_extractor = rift.package.script.PackageScriptExtractor(
                self._log, workdir
            )
            script_extractor.extract_scripts(vnfd_package)

            # The extracted script must exist on disk and be a regular file.
            extracted = script_extractor.get_extracted_script_path(
                vnfd_package.descriptor_id, name
            )
            self.assertTrue(os.path.exists(extracted))
            self.assertTrue(os.path.isfile(extracted))
301
class TestPackageCloudInitExtractor(PackageTestCase):
    """Checks reading cloud-init files out of a VNFD package."""

    def add_cloud_init_file(self, cloud_init_filename):
        """Add a minimal cloud-init file under the archive's cloud_init/ directory."""
        script_file = "cloud_init/{}".format(cloud_init_filename)
        script_text = b"""#cloud-config"""
        self.add_tarinfo(script_file, io.BytesIO(script_text), mode=0o666)

    def test_read_cloud_init(self):
        script_name = "testVM_cloud_init.cfg"
        valid_script_text = "#cloud-config"
        self.add_cloud_init_file(script_name)
        package = self.create_vnfd_package()

        extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
        cloud_init_contents = extractor.read_script(package, script_name)

        # assertEquals is a deprecated alias that no longer exists in
        # Python 3.12; assertEqual is the supported spelling.
        self.assertEqual(cloud_init_contents, valid_script_text)

    def test_cloud_init_file_missing(self):
        script_name = "testVM_cloud_init.cfg"
        # The package is built without any cloud_init/ member.
        package = self.create_vnfd_package()

        extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)

        with self.assertRaises(rift.package.cloud_init.CloudInitExtractionError):
            extractor.read_script(package, script_name)
327
class TestPackageConfigExtractor(PackageTestCase):
    """Checks that NS and VNF config files in an NSD package get extracted."""

    def add_ns_config_file(self, nsd_id):
        """Add a placeholder NS config file and return its archive path."""
        member_path = "ns_config/{}.yaml".format(nsd_id)
        self.add_tarinfo(member_path, io.BytesIO(b""" ns_config """), mode=0o666)
        return member_path

    def add_vnf_config_file(self, vnfd_id, member_vnf_index):
        """Add a placeholder VNF config file and return its archive path."""
        member_path = "vnf_config/{}_{}.yaml".format(vnfd_id, member_vnf_index)
        self.add_tarinfo(member_path, io.BytesIO(b""" vnf_config """), mode=0o666)
        return member_path

    def test_extract_config(self):
        ns_member = self.add_ns_config_file("nsd_id")
        vnf_member = self.add_vnf_config_file("vnfd_id", 1)
        nsd_package = self.create_nsd_package()

        with tempfile.TemporaryDirectory() as workdir:
            config_extractor = rift.package.config.PackageConfigExtractor(
                self._log, workdir
            )
            config_extractor.extract_configs(nsd_package)

            # Both config files must have been written out as regular files.
            extracted_ns = config_extractor.get_extracted_config_path(
                nsd_package.descriptor_id, ns_member
            )
            extracted_vnf = config_extractor.get_extracted_config_path(
                nsd_package.descriptor_id, vnf_member
            )
            self.assertTrue(os.path.isfile(extracted_ns))
            self.assertTrue(os.path.isfile(extracted_vnf))
355
356
class TestPackageValidator(PackageTestCase):
    """Checks PackageChecksumValidator against packages with and without a
    checksums.txt member."""

    def setUp(self):
        super().setUp()
        self._validator = rift.package.package.PackageChecksumValidator(self._log)

    def create_checksum_file(self, file_md5_map):
        """Add a checksums.txt member built from *file_md5_map*.

        Arguments:
            file_md5_map - mapping of archive file name to checksum string;
                           one "<checksum> <file name>" line is written per
                           entry
        """
        checksum_hdl = io.BytesIO()
        for file_name, md5 in file_md5_map.items():
            checksum_hdl.write("{} {}\n".format(md5, file_name).encode())

        checksum_hdl.flush()
        checksum_hdl.seek(0)

        self.add_tarinfo("checksums.txt", checksum_hdl)
        # NOTE(review): this appends a second, zero-length member with an
        # empty name after checksums.txt has already been written above.  It
        # looks like a leftover; confirm nothing depends on it before removing.
        self._tar.addfile(tarfile.TarInfo(), checksum_hdl)

    def create_nsd_package_with_checksum(self):
        """Build an NSD package whose checksums.txt covers the NSD descriptor."""
        self.create_checksum_file(
            {nsd_filename: file_hdl_md5(io.BytesIO(nsd_yaml))}
        )
        package = self.create_nsd_package()
        return package

    def test_package_no_checksum(self):
        package = self.create_nsd_package()

        # For now, a missing checksum file will be supported.
        # No files will be validated.
        validated_files = self._validator.validate(package)
        # assertEquals is a deprecated alias that no longer exists in
        # Python 3.12; assertEqual is the supported spelling.
        self.assertEqual(validated_files, {})

    def test_package_with_checksum(self):
        package = self.create_nsd_package_with_checksum()
        validated_files = self._validator.validate(package)
        self.assertEqual(list(validated_files), [nsd_filename])
392
393
class TestPackageStore(PackageTestCase):
    """Exercises storing, reloading and deleting packages in a
    PackageFilesystemStore rooted in a temporary directory."""

    def create_store(self, root_dir):
        """Return a PackageFilesystemStore rooted at *root_dir*."""
        store = rift.package.store.PackageFilesystemStore(self._log, root_dir)
        return store

    def create_and_store_package(self, store):
        """Create the sample NSD package, store it in *store* and return it."""
        package = self.create_nsd_package()
        store.store_package(package)

        return package

    def test_store_package(self):
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)
            new_package = store.get_package(package.descriptor_id)
            # assertEquals is a deprecated alias that no longer exists in
            # Python 3.12; assertEqual is the supported spelling.
            self.assertEqual(new_package.files, package.files)
            self.assertEqual(type(new_package), type(package))

    def test_store_reload_package(self):
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)

            # A second store over the same directory must find the package.
            new_store = self.create_store(root_dir)
            new_package = new_store.get_package(package.descriptor_id)

            self.assertEqual(new_package.files, package.files)
            self.assertEqual(type(new_package), type(package))

    def test_delete_package(self):
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)

            # The lookup succeeds before the delete and must fail afterwards.
            store.get_package(package.descriptor_id)
            store.delete_package(package.descriptor_id)

            with self.assertRaises(rift.package.store.PackageStoreError):
                store.get_package(package.descriptor_id)

    def test_store_exist_package(self):
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)

            # Storing the same package a second time must be rejected.
            with self.assertRaises(rift.package.store.PackageStoreError):
                store.store_package(package)
442
443
class TestTemporaryPackage(PackageTestCase):
    """Checks that TemporaryPackage removes its backing file on exit."""

    @staticmethod
    def _remove_if_present(path):
        """Delete *path*, ignoring the case where it is already gone."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    def test_temp_package(self):
        # Release the in-memory archive created by setUp() before swapping in
        # a file-backed one; otherwise those handles are never closed.
        self._tar.close()
        self._tar_file_hdl.close()

        self._tar_file_hdl = tempfile.NamedTemporaryFile(delete=False)
        # delete=False leaves the file behind if the test fails before
        # TemporaryPackage gets to remove it, so always clean up.
        self.addCleanup(self._remove_if_present, self._tar_file_hdl.name)
        self._tar = tarfile.open(fileobj=self._tar_file_hdl, mode="w|gz")

        self.assertTrue(os.path.exists(self._tar_file_hdl.name))

        package = self.create_nsd_package()
        with rift.package.package.TemporaryPackage(self._log, package, self._tar_file_hdl) as temp_pkg:
            self.assertIs(package, temp_pkg)
            self.assertEqual(package.files, temp_pkg.files)

        # Leaving the context must have deleted the backing file.
        self.assertFalse(os.path.exists(self._tar_file_hdl.name))
457
458
def main(argv=None):
    """Run the unit tests in this file.

    Arguments:
        argv - command line arguments without the program name; defaults to
               the process arguments at call time.  ``-v/--verbose`` enables
               debug logging, ``-n/--no-runner`` uses the default unittest
               runner instead of the XML runner, and anything else is passed
               through to unittest.

    Raises:
        KeyError - when the XML runner is used and the RIFT_MODULE_TEST
                   environment variable (its output location) is not set
    """
    # Resolve the default at call time; a default of sys.argv[1:] in the
    # signature would be frozen when the module is imported.
    if argv is None:
        argv = sys.argv[1:]

    logging.basicConfig(format='TEST %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')

    args, unknown = parser.parse_known_args(argv)

    # Only build the XML runner when it is wanted, so --no-runner works
    # without RIFT_MODULE_TEST being set.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    # Set the global logging level
    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)

    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)


if __name__ == '__main__':
    main()