Merge "CLI for OSM"
[osm/SO.git] / rwlaunchpad / plugins / rwlaunchpadtasklet / test / utest_export.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import logging
22 import io
23 import os
24 import sys
25 import tarfile
26 import tempfile
27 import time
28 import unittest
29 import uuid
30 import xmlrunner
31
32 import rift.package.archive
33 import rift.package.charm
34 import rift.package.checksums
35 import rift.package.config
36 import rift.package.convert
37 import rift.package.icon
38 import rift.package.package
39 import rift.package.script
40 import rift.package.store
41
42 from rift.tasklets.rwlaunchpad import export
43
44 import gi
45 gi.require_version('RwVnfdYang', '1.0')
46 from gi.repository import (
47 RwVnfdYang,
48 VnfdYang,
49 )
50
51 import utest_package
52
53
54 class TestExport(utest_package.PackageTestCase):
55 def setUp(self):
56 super().setUp()
57 self._exporter = export.DescriptorPackageArchiveExporter(self._log)
58 self._rw_vnfd_serializer = rift.package.convert.RwVnfdSerializer()
59 self._vnfd_serializer = rift.package.convert.VnfdSerializer()
60
61 def test_create_archive(self):
62 rw_vnfd_msg = RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd(
63 id="new_id", name="new_name", description="new_description"
64 )
65 json_desc_str = self._rw_vnfd_serializer.to_json_string(rw_vnfd_msg)
66
67 vnfd_package = self.create_vnfd_package()
68 with io.BytesIO() as archive_hdl:
69 archive = self._exporter.create_archive(
70 archive_hdl, vnfd_package, json_desc_str, self._rw_vnfd_serializer
71 )
72
73 archive_hdl.seek(0)
74
75 # Create a new read-only archive from the archive handle and a package from that archive
76 archive = rift.package.archive.TarPackageArchive(self._log, archive_hdl)
77 package = archive.create_package()
78
79 # Ensure that the descriptor in the package has been overwritten
80 self.assertEqual(package.descriptor_msg, rw_vnfd_msg)
81
82 def test_export_package(self):
83 rw_vnfd_msg = RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd(
84 id="new_id", name="new_name", description="new_description",
85 meta="THIS FIELD IS NOT IN REGULAR VNFD"
86 )
87 vnfd_msg = VnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd()
88 vnfd_msg.from_dict(rw_vnfd_msg.as_dict(), ignore_missing_keys=True)
89
90 self.assertNotEqual(rw_vnfd_msg, vnfd_msg)
91
92 json_desc_str = self._rw_vnfd_serializer.to_json_string(rw_vnfd_msg)
93
94 with tempfile.TemporaryDirectory() as tmp_dir:
95 vnfd_package = self.create_vnfd_package()
96 pkg_id = str(uuid.uuid4())
97 exported_path = self._exporter.export_package(
98 vnfd_package, tmp_dir, pkg_id, json_desc_str, self._vnfd_serializer
99 )
100
101 self.assertTrue(os.path.isfile(exported_path))
102 self.assertTrue(tarfile.is_tarfile(exported_path))
103
104 with open(exported_path, "rb") as archive_hdl:
105 archive = rift.package.archive.TarPackageArchive(self._log, archive_hdl)
106 package = archive.create_package()
107
108 self.assertEqual(package.descriptor_msg, vnfd_msg)
109
110 def test_export_cleanup(self):
111 loop = asyncio.get_event_loop()
112 with tempfile.TemporaryDirectory() as tmp_dir:
113 archive_files = [tempfile.mkstemp(dir=tmp_dir, suffix=".tar.gz")[1] for _ in range(2)]
114
115 # Set the mtime on only one of the files to test the min_age_secs argument
116 times = (time.time(), time.time() - 10)
117 os.utime(archive_files[0], times)
118
119 task = loop.create_task(
120 export.periodic_export_cleanup(
121 self._log, loop, tmp_dir, period_secs=.01, min_age_secs=5
122 )
123 )
124 loop.run_until_complete(asyncio.sleep(.05, loop=loop))
125
126 if task.done() and task.exception() is not None:
127 raise task.exception()
128
129 self.assertFalse(task.done())
130
131 self.assertFalse(os.path.exists(archive_files[0]))
132 self.assertTrue(os.path.exists(archive_files[1]))
133
134 def main(argv=sys.argv[1:]):
135 logging.basicConfig(format='TEST %(message)s')
136
137 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
138 parser = argparse.ArgumentParser()
139 parser.add_argument('-v', '--verbose', action='store_true')
140 parser.add_argument('-n', '--no-runner', action='store_true')
141
142 args, unknown = parser.parse_known_args(argv)
143 if args.no_runner:
144 runner = None
145
146 # Set the global logging level
147 logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
148
149 # The unittest framework requires a program name, so use the name of this
150 # file instead (we do not want to have to pass a fake program name to main
151 # when this is called from the interpreter).
152 unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
153
154 if __name__ == '__main__':
155 main()