update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / test / utest_package.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19
20 import argparse
21 import logging
22 import io
23 import json
24 import os
25 import sys
26 import tarfile
27 import tempfile
28 import unittest
29 import xmlrunner
30 import yaml
31
# Unit tests need RIFT_VAR_ROOT to be defined. When the caller has not
# exported it, fall back to a directory underneath RIFT_INSTALL (which must
# then be present in the environment).
if os.environ.get("RIFT_VAR_ROOT") is None:
    os.environ["RIFT_VAR_ROOT"] = os.path.join(
        os.environ["RIFT_INSTALL"],
        "var/rift/unittest",
    )
35
36 import rift.package.archive
37 import rift.package.package
38 import rift.package.icon
39 import rift.package.script
40 import rift.package.store
41 import rift.package.checksums
42 import rift.package.cloud_init
43
44
# Minimal NSD descriptor document used as the payload of test packages.
# NOTE(review): the indentation inside both YAML literals was reconstructed
# from a whitespace-stripped listing -- confirm it against the upstream file.
nsd_yaml = b"""nsd:nsd-catalog:
  nsd:nsd:
  - nsd:id: gw_corpA
    nsd:name: gw_corpA
    nsd:description: Gateways to access as corpA to PE1 and PE2
"""

# Minimal VNFD descriptor document used as the payload of test packages.
vnfd_yaml = b"""vnfd:vnfd-catalog:
  vnfd:vnfd:
  - vnfd:id: gw_corpA_vnfd
    vnfd:name: gw_corpA_vnfd
    vnfd:description: Gateways to access as corpA to PE1 and PE2
"""

# Archive member names under which the descriptors above are stored.
nsd_filename = "gw_corpA__nsd.yaml"
vnfd_filename = "gw_corpA__vnfd.yaml"
61
62
def file_hdl_md5(file_hdl):
    """Return ``rift.package.checksums.checksum`` evaluated on ``file_hdl``."""
    digest = rift.package.checksums.checksum(file_hdl)
    return digest
65
66
class ArchiveTestCase(unittest.TestCase):
    """Base fixture that builds a gzip-compressed tar archive in memory.

    ``setUp`` opens a streaming tar writer on top of a ``BytesIO`` buffer.
    Tests add members through the ``add_*`` helpers and then call
    ``create_tar_package_archive`` to finalize the tar and wrap the buffer
    in a ``TarPackageArchive``.
    """

    def setUp(self):
        self._log = logging.getLogger()

        self._tar_file_hdl = io.BytesIO()
        self._tar = tarfile.open(fileobj=self._tar_file_hdl, mode="w|gz")

        self._nsd_yaml_hdl = io.BytesIO(nsd_yaml)
        self._vnfd_yaml_hdl = io.BytesIO(vnfd_yaml)

    def tearDown(self):
        self._nsd_yaml_hdl.close()
        self._vnfd_yaml_hdl.close()
        # The tar writer may already have been closed by
        # create_tar_package_archive(); closing it again is harmless.
        self._tar.close()
        self._tar_file_hdl.close()

    def create_tar_package_archive(self):
        """Finalize the tar stream and return a TarPackageArchive reading it."""
        # Closing the writer flushes the remaining tar/gzip bytes to the buffer.
        self._tar.close()
        self._tar_file_hdl.flush()
        self._tar_file_hdl.seek(0)
        archive = rift.package.package.TarPackageArchive(
            log=self._log,
            tar_file_hdl=self._tar_file_hdl,
        )

        return archive

    def add_tarinfo(self, name, file_hdl, mode=0o777):
        """Add the remaining contents of ``file_hdl`` to the tar as ``name``.

        Arguments:
            name - member name inside the archive
            file_hdl - readable, seekable binary handle; must not be empty
            mode - permission bits recorded on the tar member

        Raises:
            AssertionError if ``file_hdl`` yields no data
        """
        tarinfo = tarfile.TarInfo(name)
        tarinfo.size = len(file_hdl.read())
        # Previously the mode argument was accepted but never applied, so
        # every member silently got TarInfo's default mode.
        tarinfo.mode = mode
        self.assertGreater(tarinfo.size, 0, "refusing to add an empty file")
        file_hdl.seek(0)
        self._tar.addfile(tarinfo, file_hdl)

    def add_tarinfo_dir(self, name):
        """Add a directory member called ``name`` to the tar."""
        tarinfo = tarfile.TarInfo(name)
        tarinfo.type = tarfile.DIRTYPE
        self._tar.addfile(tarinfo)

    def add_nsd_yaml(self):
        """Add the module-level NSD descriptor under ``nsd_filename``."""
        self.add_tarinfo(nsd_filename, io.BytesIO(nsd_yaml))

    def add_vnfd_yaml(self):
        """Add the module-level VNFD descriptor under ``vnfd_filename``."""
        self.add_tarinfo(vnfd_filename, io.BytesIO(vnfd_yaml))
111
112
class PackageTestCase(ArchiveTestCase):
    """Fixture layer that turns the in-memory tar archive into a package."""

    def create_nsd_package(self):
        """Add the NSD descriptor to the tar and build a package from it."""
        self.add_nsd_yaml()
        return self.create_tar_package_archive().create_package()

    def create_vnfd_package(self):
        """Add the VNFD descriptor to the tar and build a package from it."""
        self.add_vnfd_yaml()
        return self.create_tar_package_archive().create_package()
127
128
class TestCreateArchive(ArchiveTestCase):
    """Checks that an archive built by the fixture can be read back."""

    def test_create_tar_archive(self):
        """The archive lists exactly the one file that was added."""
        self.add_nsd_yaml()
        archive = self.create_tar_package_archive()
        # assertEqual: the assertEquals alias is deprecated and was removed
        # in Python 3.12.
        self.assertEqual(set(archive.filenames), {nsd_filename})

    def test_nsd_tar_archive(self):
        """Bytes read back from the archive match the bytes written."""
        # Write the NSD YAML to the tar file
        self.add_nsd_yaml()

        archive = self.create_tar_package_archive()
        with archive.open_file(nsd_filename) as nsd_hdl:
            nsd_bytes = nsd_hdl.read()

        self.assertEqual(nsd_bytes, nsd_yaml)
144
145
class TestPackage(PackageTestCase):
    """Package creation from archives and descriptor files, and back again."""

    def create_vnfd_package_archive(self, package, hdl):
        """Write ``package`` as a tar archive into ``hdl`` and rewind ``hdl``.

        Returns the (already closed) archive object.
        """
        # Create an archive from a package
        archive = rift.package.archive.TarPackageArchive.from_package(
            self._log, package, hdl,
        )
        # Closing the archive writes any closing bytes to the file handle
        archive.close()
        hdl.seek(0)

        return archive

    def test_create_nsd_package_from_archive(self):
        """An archive holding an NSD descriptor yields an NsdPackage."""
        package = self.create_nsd_package()
        self.assertIsInstance(package, rift.package.package.NsdPackage)

        json_str = package.json_descriptor
        desc_dict = json.loads(json_str)
        self.assertIn("nsd:nsd-catalog", desc_dict)

    def test_create_vnfd_package_from_archive(self):
        """An archive holding a VNFD descriptor yields a VnfdPackage."""
        package = self.create_vnfd_package()
        self.assertIsInstance(package, rift.package.package.VnfdPackage)

        json_str = package.json_descriptor
        desc_dict = json.loads(json_str)
        self.assertIn("vnfd:vnfd-catalog", desc_dict)

    def test_create_vnfd_archive_from_package(self):
        """An archive generated from a package is a readable gzip tar."""
        package = self.create_vnfd_package()
        hdl = io.BytesIO()
        self.create_vnfd_package_archive(package, hdl)

        # Ensure that the archive created was valid
        with tarfile.open(fileobj=hdl, mode='r|gz'):
            pass

    def test_round_trip_vnfd_package_from_archive(self):
        """package -> archive -> package keeps file list, type and checksums."""
        package = self.create_vnfd_package()
        hdl = io.BytesIO()
        self.create_vnfd_package_archive(package, hdl)

        archive = rift.package.archive.TarPackageArchive(self._log, hdl)

        # Create the package from the archive and validate file checksums
        new_package = archive.create_package()

        self.assertEqual(package.files, new_package.files)
        self.assertEqual(type(package), type(new_package))

        for filename in package.files:
            # Open both copies in with-blocks so the handles are closed even
            # when the comparison fails.
            with package.open(filename) as pkg_file:
                with new_package.open(filename) as new_pkg_file:
                    self.assertEqual(
                        file_hdl_md5(pkg_file), file_hdl_md5(new_pkg_file)
                    )

    def test_create_nsd_package_from_file(self):
        """A bare NSD descriptor file handle yields an NsdPackage."""
        nsd_file_name = "asdf_nsd.yaml"
        hdl = io.BytesIO(nsd_yaml)
        hdl.name = nsd_file_name

        package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
            self._log, hdl
        )
        self.assertIsInstance(package, rift.package.package.NsdPackage)

        with package.open(nsd_file_name) as nsd_hdl:
            nsd_data = nsd_hdl.read()
            # safe_load: yaml.load() without an explicit Loader is deprecated
            # and is an error on PyYAML >= 6.
            self.assertEqual(yaml.safe_load(nsd_data), yaml.safe_load(nsd_yaml))

    def test_create_vnfd_package_from_file(self):
        """A bare VNFD descriptor file handle yields a VnfdPackage."""
        vnfd_file_name = "asdf_vnfd.yaml"
        hdl = io.BytesIO(vnfd_yaml)
        hdl.name = vnfd_file_name

        package = rift.package.package.DescriptorPackage.from_descriptor_file_hdl(
            self._log, hdl
        )
        self.assertIsInstance(package, rift.package.package.VnfdPackage)

        with package.open(vnfd_file_name) as vnfd_hdl:
            vnfd_data = vnfd_hdl.read()
            self.assertEqual(yaml.safe_load(vnfd_data), yaml.safe_load(vnfd_yaml))
230
231
232
class TestPackageIconExtractor(PackageTestCase):
    """Exercises PackageIconExtractor against a package carrying one icon."""

    def add_icon_file(self, icon_name):
        """Place a dummy icon payload under ``icons/<icon_name>`` in the tar."""
        payload = io.BytesIO(b"png file bytes")
        self.add_tarinfo("icons/{}".format(icon_name), payload)

    def test_extract_icon(self):
        """After extraction the reported icon path is an existing regular file."""
        icon_name = "icon_a"
        self.add_icon_file(icon_name)
        package = self.create_vnfd_package()

        with tempfile.TemporaryDirectory() as tmp_dir:
            extractor = rift.package.icon.PackageIconExtractor(self._log, tmp_dir)
            extractor.extract_icons(package)

            extracted_path = extractor.get_extracted_icon_path(
                package.descriptor_type,
                package.descriptor_id,
                icon_name,
            )
            self.assertTrue(os.path.exists(extracted_path))
            self.assertTrue(os.path.isfile(extracted_path))
252
253
class TestPackageScriptExtractor(PackageTestCase):
    """Exercises PackageScriptExtractor against a package carrying one script."""

    def add_script_file(self, script_name):
        """Place a tiny Python script under ``scripts/<script_name>`` in the tar."""
        script_text = b"""#!/usr/bin/python
print("hi")
"""
        member_name = "scripts/{}".format(script_name)
        self.add_tarinfo(member_name, io.BytesIO(script_text), mode=0o666)

    def test_extract_script(self):
        """After extraction the reported script path is an existing regular file."""
        script_name = "add_corporation.py"
        self.add_script_file(script_name)
        package = self.create_vnfd_package()

        with tempfile.TemporaryDirectory() as tmp_dir:
            extractor = rift.package.script.PackageScriptExtractor(self._log, tmp_dir)
            extractor.extract_scripts(package)

            extracted_path = extractor.get_extracted_script_path(
                package.descriptor_id, script_name
            )
            self.assertTrue(os.path.exists(extracted_path))
            self.assertTrue(os.path.isfile(extracted_path))
273
class TestPackageCloudInitExtractor(PackageTestCase):
    """Exercises PackageCloudInitExtractor.read_script."""

    def add_cloud_init_file(self, cloud_init_filename):
        """Place a cloud-init payload under ``cloud_init/<name>`` in the tar."""
        script_file = "cloud_init/{}".format(cloud_init_filename)
        script_text = b"""#cloud-config"""
        self.add_tarinfo(script_file, io.BytesIO(script_text), mode=0o666)

    def test_read_cloud_init(self):
        """read_script returns the stored cloud-init contents as text."""
        script_name = "testVM_cloud_init.cfg"
        valid_script_text = "#cloud-config"
        self.add_cloud_init_file(script_name)
        package = self.create_vnfd_package()

        extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)
        cloud_init_contents = extractor.read_script(package, script_name)

        # assertEqual: the assertEquals alias is deprecated and was removed
        # in Python 3.12.
        self.assertEqual(cloud_init_contents, valid_script_text)

    def test_cloud_init_file_missing(self):
        """read_script raises CloudInitExtractionError for an absent file."""
        script_name = "testVM_cloud_init.cfg"
        package = self.create_vnfd_package()

        extractor = rift.package.cloud_init.PackageCloudInitExtractor(self._log)

        with self.assertRaises(rift.package.cloud_init.CloudInitExtractionError):
            extractor.read_script(package, script_name)
299
class TestPackageValidator(PackageTestCase):
    """Exercises PackageChecksumValidator with and without a checksum file."""

    def setUp(self):
        super().setUp()
        self._validator = rift.package.package.PackageChecksumValidator(self._log)

    def create_checksum_file(self, file_md5_map):
        """Add a ``checksums.txt`` member built from ``file_md5_map``.

        Arguments:
            file_md5_map - mapping of file name to checksum string; one
                           "<checksum> <file name>" line is written per entry
        """
        checksum_hdl = io.BytesIO()
        for file_name, md5 in file_md5_map.items():
            checksum_hdl.write("{} {}\n".format(md5, file_name).encode())

        checksum_hdl.flush()
        checksum_hdl.seek(0)

        self.add_tarinfo("checksums.txt", checksum_hdl)
        # NOTE(review): this adds a second, unnamed, zero-size member after
        # the checksum file. It looks like a leftover, but it is kept so the
        # archive contents seen by the package code stay unchanged -- confirm
        # whether it can be removed.
        self._tar.addfile(tarfile.TarInfo(), checksum_hdl)

    def create_nsd_package_with_checksum(self):
        """Build an NSD package whose archive also carries a checksum file."""
        self.create_checksum_file(
            {nsd_filename: file_hdl_md5(io.BytesIO(nsd_yaml))}
        )
        package = self.create_nsd_package()
        return package

    def test_package_no_checksum(self):
        """Validation succeeds and validates nothing without a checksum file."""
        package = self.create_nsd_package()

        # For now, a missing checksum file will be supported.
        # No files will be validated.
        self._validator.validate(package)
        validated_files = self._validator.checksums
        # assertEqual: the assertEquals alias is deprecated and was removed
        # in Python 3.12.
        self.assertEqual(validated_files, {})

    def test_package_with_checksum(self):
        """Only the descriptor listed in the checksum file is validated."""
        package = self.create_nsd_package_with_checksum()
        self._validator.validate(package)
        validated_files = self._validator.checksums
        self.assertEqual(list(validated_files.keys()), [nsd_filename])
337
338
class TestPackageStore(PackageTestCase):
    """Exercises PackageFilesystemStore: store, reload, delete, duplicates."""

    def create_store(self, root_dir):
        """Return a PackageFilesystemStore rooted at ``root_dir``."""
        store = rift.package.store.PackageFilesystemStore(self._log, root_dir)
        return store

    def create_and_store_package(self, store):
        """Build the NSD test package, store it in ``store`` and return it."""
        package = self.create_nsd_package()
        store.store_package(package)

        return package

    def test_store_package(self):
        """A stored package can be fetched back with the same files and type."""
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)
            new_package = store.get_package(package.descriptor_id)
            # assertEqual: the assertEquals alias is deprecated and was
            # removed in Python 3.12.
            self.assertEqual(new_package.files, package.files)
            self.assertEqual(type(new_package), type(package))

    def test_store_reload_package(self):
        """A second store over the same directory sees the stored package."""
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)

            new_store = self.create_store(root_dir)
            new_package = new_store.get_package(package.descriptor_id)

            self.assertEqual(new_package.files, package.files)
            self.assertEqual(type(new_package), type(package))

    def test_delete_package(self):
        """Fetching a deleted package raises PackageStoreError."""
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)

            # The package must be retrievable before it is deleted.
            store.get_package(package.descriptor_id)
            store.delete_package(package.descriptor_id)

            with self.assertRaises(rift.package.store.PackageStoreError):
                store.get_package(package.descriptor_id)

    def test_store_exist_package(self):
        """Storing the same package twice raises PackageStoreError."""
        with tempfile.TemporaryDirectory() as root_dir:
            store = self.create_store(root_dir)
            package = self.create_and_store_package(store)

            with self.assertRaises(rift.package.store.PackageStoreError):
                store.store_package(package)
387
388
class TestTemporaryPackage(PackageTestCase):
    """Exercises the TemporaryPackage context manager."""

    def test_temp_package(self):
        """The backing file exists before the with-block and is gone after it."""
        # Release the in-memory handles opened by setUp() before replacing
        # them; otherwise they are never closed.
        self._tar.close()
        self._tar_file_hdl.close()

        self._tar_file_hdl = tempfile.NamedTemporaryFile(delete=False)
        self._tar = tarfile.open(fileobj=self._tar_file_hdl, mode="w|gz")

        tar_path = self._tar_file_hdl.name

        def remove_leftover_file():
            # delete=False means nothing removes the file if the test fails
            # before TemporaryPackage gets the chance to clean it up.
            try:
                os.remove(tar_path)
            except FileNotFoundError:
                pass

        self.addCleanup(remove_leftover_file)

        self.assertTrue(os.path.exists(tar_path))

        package = self.create_nsd_package()
        with rift.package.package.TemporaryPackage(self._log, package, self._tar_file_hdl) as temp_pkg:
            self.assertIs(package, temp_pkg)
            # assertEqual: the assertEquals alias is deprecated and was
            # removed in Python 3.12.
            self.assertEqual(package.files, temp_pkg.files)

        self.assertFalse(os.path.exists(tar_path))
402
403
def main(argv=sys.argv[1:]):
    """Run this module's unit tests.

    Arguments:
        argv - command line arguments without the program name. ``-v`` /
               ``--verbose`` sets the root logger to DEBUG (ERROR otherwise),
               ``-n`` / ``--no-runner`` uses unittest's default runner
               instead of the XML runner. Unrecognized arguments are passed
               on to ``unittest.main``.

    Raises:
        KeyError if the XML runner is requested and RIFT_MODULE_TEST is not
        set in the environment.
    """
    logging.basicConfig(format='TEST %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')

    args, unknown = parser.parse_known_args(argv)

    # Only build the XML runner when it is going to be used, so that
    # --no-runner works without RIFT_MODULE_TEST being set.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    # Set the global logging level
    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)

    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)


if __name__ == '__main__':
    main()