Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / test / utest_export.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import logging
22 import io
23 import os
24 import sys
25 import tarfile
26 import tempfile
27 import time
28 import unittest
29 import uuid
30 import xmlrunner
31
32 import rift.package.archive
33 import rift.package.charm
34 import rift.package.checksums
35 import rift.package.config
36 import rift.package.convert
37 import rift.package.icon
38 import rift.package.package
39 import rift.package.script
40 import rift.package.store
41
42 from rift.tasklets.rwlaunchpad import export
43
44 import gi
45 gi.require_version('ProjectVnfdYang', '1.0')
46 gi.require_version('RwProjectVnfdYang', '1.0')
47 from gi.repository import (
48 RwProjectVnfdYang as RwVnfdYang,
49 ProjectVnfdYang as VnfdYang,
50 )
51
52 import utest_package
53
54
class TestExport(utest_package.PackageTestCase):
    """Tests for the descriptor package export helpers in ``export``.

    The logger (``self._log``) and ``create_vnfd_package()`` are not defined
    in this class; presumably they are provided by
    ``utest_package.PackageTestCase`` -- confirm against that module.
    """

    def setUp(self):
        """Build the exporter under test plus both VNFD serializers."""
        super().setUp()
        self._exporter = export.DescriptorPackageArchiveExporter(self._log)
        self._rw_vnfd_serializer = rift.package.convert.RwVnfdSerializer()
        self._vnfd_serializer = rift.package.convert.VnfdSerializer()

    def test_create_archive(self):
        """create_archive() must store the supplied descriptor in the archive."""
        rw_vnfd_msg = RwVnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd(
            id="new_id", name="new_name", description="new_description"
        )
        json_desc_str = self._rw_vnfd_serializer.to_json_string(rw_vnfd_msg)

        vnfd_package = self.create_vnfd_package()
        with io.BytesIO() as archive_hdl:
            archive = self._exporter.create_archive(
                archive_hdl, vnfd_package, json_desc_str, self._rw_vnfd_serializer
            )

            # Rewind so the bytes just written can be read back.
            archive_hdl.seek(0)

            # Create a new read-only archive from the archive handle and a
            # package from that archive
            archive = rift.package.archive.TarPackageArchive(self._log, archive_hdl)
            package = archive.create_package()

            # Ensure that the descriptor in the package has been overwritten
            self.assertEqual(package.descriptor_msg, rw_vnfd_msg)

    def test_export_package(self):
        """export_package() must write a tar file holding the plain VNFD.

        The descriptor JSON is produced from an Rw VNFD message that carries
        an extra ``meta`` field, while the export is done with the plain VNFD
        serializer; the descriptor read back from the exported archive is
        expected to equal the plain VNFD message.
        """
        rw_vnfd_msg = RwVnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd(
            id="new_id", name="new_name", description="new_description",
            meta="THIS FIELD IS NOT IN REGULAR VNFD"
        )
        vnfd_msg = VnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd()
        vnfd_msg.from_dict(rw_vnfd_msg.as_dict(), ignore_missing_keys=True)

        # Sanity check: the two messages really differ before exporting.
        self.assertNotEqual(rw_vnfd_msg, vnfd_msg)

        json_desc_str = self._rw_vnfd_serializer.to_json_string(rw_vnfd_msg)

        with tempfile.TemporaryDirectory() as tmp_dir:
            vnfd_package = self.create_vnfd_package()
            pkg_id = str(uuid.uuid4())
            exported_path = self._exporter.export_package(
                vnfd_package, tmp_dir, pkg_id, json_desc_str, self._vnfd_serializer
            )

            self.assertTrue(os.path.isfile(exported_path))
            self.assertTrue(tarfile.is_tarfile(exported_path))

            with open(exported_path, "rb") as archive_hdl:
                archive = rift.package.archive.TarPackageArchive(self._log, archive_hdl)
                package = archive.create_package()

                self.assertEqual(package.descriptor_msg, vnfd_msg)

    def test_export_cleanup(self):
        """periodic_export_cleanup() must remove only sufficiently old files.

        Two archive files are created; one gets an mtime 10 seconds in the
        past.  With ``min_age_secs=5`` that file is expected to be deleted
        while the fresh one is kept, and the cleanup task must still be
        running after the wait.
        """
        loop = asyncio.get_event_loop()
        with tempfile.TemporaryDirectory() as tmp_dir:
            archive_files = []
            for _ in range(2):
                # mkstemp() returns an already-open descriptor; close it so
                # the test does not leak file descriptors.
                fd, path = tempfile.mkstemp(dir=tmp_dir, suffix=".tar.gz")
                os.close(fd)
                archive_files.append(path)

            # Set the mtime on only one of the files to test the min_age_secs
            # argument.  os.utime() takes (atime, mtime).
            times = (time.time(), time.time() - 10)
            os.utime(archive_files[0], times)

            task = loop.create_task(
                export.periodic_export_cleanup(
                    self._log, loop, tmp_dir, period_secs=.01, min_age_secs=5
                )
            )
            try:
                # The ``loop`` keyword of asyncio.sleep() was removed in
                # Python 3.10; run_until_complete() already binds the sleep
                # to ``loop``.
                loop.run_until_complete(asyncio.sleep(.05))

                # Surface a failure inside the cleanup coroutine directly.
                if task.done() and task.exception() is not None:
                    raise task.exception()

                self.assertFalse(task.done())

                self.assertFalse(os.path.exists(archive_files[0]))
                self.assertTrue(os.path.exists(archive_files[1]))
            finally:
                # Stop the periodic task before the directory disappears so it
                # is not left pending on the shared event loop.
                task.cancel()
                loop.run_until_complete(asyncio.sleep(0))
134
def main(argv=sys.argv[1:]):
    """Run this module's unit tests.

    Recognised options (anything else is forwarded to ``unittest``):

    * ``-v`` / ``--verbose``: set the root logger to DEBUG instead of ERROR.
    * ``-n`` / ``--no-runner``: use unittest's default runner instead of the
      XML runner.

    Args:
        argv: command line arguments without the program name.  The default
            is captured from ``sys.argv`` when this function is defined.

    Raises:
        KeyError: if the XML runner is requested but the ``RIFT_MODULE_TEST``
            environment variable is not set.
    """
    logging.basicConfig(format='TEST %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')

    args, unknown = parser.parse_known_args(argv)

    # Only build the XML runner when it is actually wanted, so that
    # --no-runner works without RIFT_MODULE_TEST being set.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    # Set the global logging level
    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)

    # The unittest framework requires a program name, so use the name of this
    # file instead (we do not want to have to pass a fake program name to main
    # when this is called from the interpreter).
    unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
154
# Script entry point: run the tests only when executed directly, not on import.
if __name__ == "__main__":
    main()