b035e440f71774fefd12b1134902a819014a4542
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / test / utest_export.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import logging
22 import io
23 import os
24 import sys
25 import tarfile
26 import tempfile
27 import time
28 import unittest
29 import uuid
30 import xmlrunner
31
32 #Setting RIFT_VAR_ROOT if not already set for unit test execution
33 if "RIFT_VAR_ROOT" not in os.environ:
34 os.environ['RIFT_VAR_ROOT'] = os.path.join(os.environ['RIFT_INSTALL'], 'var/rift/unittest')
35
36 import rift.package.archive
37 import rift.package.charm
38 import rift.package.checksums
39 import rift.package.config
40 import rift.package.convert
41 import rift.package.icon
42 import rift.package.package
43 import rift.package.script
44 import rift.package.store
45
46 from rift.tasklets.rwlaunchpad import export
47
48 import gi
49 gi.require_version('RwVnfdYang', '1.0')
50 from gi.repository import (
51 RwVnfdYang,
52 VnfdYang,
53 )
54
55 import utest_package
56
57
58 class TestExport(utest_package.PackageTestCase):
59 def setUp(self):
60 super().setUp()
61 self._exporter = export.DescriptorPackageArchiveExporter(self._log)
62 self._rw_vnfd_serializer = rift.package.convert.RwVnfdSerializer()
63 self._vnfd_serializer = rift.package.convert.VnfdSerializer()
64
65 def test_create_archive(self):
66 rw_vnfd_msg = RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd(
67 id="new_id", name="new_name", description="new_description"
68 )
69 json_desc_str = self._rw_vnfd_serializer.to_json_string(rw_vnfd_msg)
70
71 vnfd_package = self.create_vnfd_package()
72 with io.BytesIO() as archive_hdl:
73 archive = self._exporter.create_archive(
74 archive_hdl, vnfd_package, json_desc_str, self._rw_vnfd_serializer
75 )
76
77 archive_hdl.seek(0)
78
79 # Create a new read-only archive from the archive handle and a package from that archive
80 archive = rift.package.archive.TarPackageArchive(self._log, archive_hdl)
81 package = archive.create_package()
82
83 # Ensure that the descriptor in the package has been overwritten
84 self.assertEqual(package.descriptor_msg, rw_vnfd_msg)
85
86 def test_export_package(self):
87 rw_vnfd_msg = RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd(
88 id="new_id", name="new_name", description="new_description",
89 meta="THIS FIELD IS NOT IN REGULAR VNFD"
90 )
91 vnfd_msg = VnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd()
92 vnfd_msg.from_dict(rw_vnfd_msg.as_dict(), ignore_missing_keys=True)
93
94 self.assertNotEqual(rw_vnfd_msg, vnfd_msg)
95
96 json_desc_str = self._rw_vnfd_serializer.to_json_string(rw_vnfd_msg)
97
98 with tempfile.TemporaryDirectory() as tmp_dir:
99 vnfd_package = self.create_vnfd_package()
100 pkg_id = str(uuid.uuid4())
101 exported_path = self._exporter.export_package(
102 vnfd_package, tmp_dir, pkg_id, json_desc_str, self._vnfd_serializer
103 )
104
105 self.assertTrue(os.path.isfile(exported_path))
106 self.assertTrue(tarfile.is_tarfile(exported_path))
107
108 with open(exported_path, "rb") as archive_hdl:
109 archive = rift.package.archive.TarPackageArchive(self._log, archive_hdl)
110 package = archive.create_package()
111
112 self.assertEqual(package.descriptor_msg, vnfd_msg)
113
114 def test_export_cleanup(self):
115 loop = asyncio.get_event_loop()
116 with tempfile.TemporaryDirectory() as tmp_dir:
117 archive_files = [tempfile.mkstemp(dir=tmp_dir, suffix=".tar.gz")[1] for _ in range(2)]
118
119 # Set the mtime on only one of the files to test the min_age_secs argument
120 times = (time.time(), time.time() - 10)
121 os.utime(archive_files[0], times)
122
123 task = loop.create_task(
124 export.periodic_export_cleanup(
125 self._log, loop, tmp_dir, period_secs=.01, min_age_secs=5
126 )
127 )
128 loop.run_until_complete(asyncio.sleep(.05, loop=loop))
129
130 if task.done() and task.exception() is not None:
131 raise task.exception()
132
133 self.assertFalse(task.done())
134
135 self.assertFalse(os.path.exists(archive_files[0]))
136 self.assertTrue(os.path.exists(archive_files[1]))
137
138 def main(argv=sys.argv[1:]):
139 logging.basicConfig(format='TEST %(message)s')
140
141 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
142 parser = argparse.ArgumentParser()
143 parser.add_argument('-v', '--verbose', action='store_true')
144 parser.add_argument('-n', '--no-runner', action='store_true')
145
146 args, unknown = parser.parse_known_args(argv)
147 if args.no_runner:
148 runner = None
149
150 # Set the global logging level
151 logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
152
153 # The unittest framework requires a program name, so use the name of this
154 # file instead (we do not want to have to pass a fake program name to main
155 # when this is called from the interpreter).
156 unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
157
158 if __name__ == '__main__':
159 main()