4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
34 import tornado
.httputil
# Unit tests may run without RIFT_VAR_ROOT exported. When it is absent,
# point it at the unittest area underneath the RIFT_INSTALL tree.
if os.getenv("RIFT_VAR_ROOT") is None:
    os.environ["RIFT_VAR_ROOT"] = os.path.join(
        os.environ["RIFT_INSTALL"],
        "var/rift/unittest",
    )
42 from tornado
.platform
.asyncio
import AsyncIOMainLoop
43 from tornado
.ioloop
import IOLoop
44 from concurrent
.futures
.thread
import ThreadPoolExecutor
45 from concurrent
.futures
.process
import ProcessPoolExecutor
46 gi
.require_version('RwDts', '1.0')
47 gi
.require_version('RwPkgMgmtYang', '1.0')
48 from gi
.repository
import (
55 import rift
.tasklets
.rwlaunchpad
.uploader
as uploader
56 import rift
.tasklets
.rwlaunchpad
.message
as message
57 import rift
.tasklets
.rwlaunchpad
.export
as export
# Module-level string constant. It is not referenced anywhere in the visible
# portion of this file.
61 TEST_STRING
= "foobar"
63 class TestCase(rift
.test
.dts
.AbstractDTSTest
):
def configure_schema(cls):
    """Return the schema object obtained from RwPkgMgmtYang.get_schema()."""
    schema = RwPkgMgmtYang.get_schema()
    return schema
# configure_timeout takes `cls` as its only parameter.
# NOTE(review): the method body is not present in this view, so the value it
# returns cannot be stated here -- confirm against the complete file.
69 def configure_timeout(cls
):
# configure_test(self, loop, test_id): per-test setup.
#   - logs the test id and creates self.tinfo and self.dts for it
#   - creates a mock package on disk and a mock VNFD catalog keyed by its uid
#   - builds self.app (uploader.UploaderApplication) and self.server
#     (tornado HTTPServer bound to the current IOLoop)
# Note that the DTS handle is built from self.loop; the `loop` argument is not
# used in the visible lines.
# NOTE(review): several argument lines of the from_dict(...),
# UploaderApplication(...) and HTTPServer(...) calls are not present in this
# view; only the visible arguments are described here.
72 def configure_test(self
, loop
, test_id
):
73 self
.log
.debug("STARTING - %s", test_id
)
74 self
.tinfo
= self
.new_tinfo(str(test_id
))
75 self
.dts
= rift
.tasklets
.DTS(self
.tinfo
, self
.schema
, self
.loop
)
# The MagicMock bound here is overwritten further down by a plain dict
# ({self.uid: mock_vnfd}) before it is handed to the application.
78 mock_vnfd_catalog
= mock
.MagicMock()
# create_mock_package() is unpacked into two values: the package uid and a path.
79 self
.uid
, path
= self
.create_mock_package()
81 mock_vnfd
= RwVnfdYang
.YangData_Vnfd_VnfdCatalog_Vnfd
.from_dict({
84 mock_vnfd_catalog
= {self
.uid
: mock_vnfd
}
86 self
.app
= uploader
.UploaderApplication(
90 vnfd_catalog
=mock_vnfd_catalog
)
# Install the asyncio-based Tornado main loop before the HTTP server is
# created with io_loop=IOLoop.current().
92 AsyncIOMainLoop().install()
93 self
.server
= tornado
.httpserver
.HTTPServer(
95 io_loop
=IOLoop
.current(),
# create_mock_package(self): lays down a minimal fake VNFD package on disk.
#   - generates a fresh uuid4 string as the package uid
#   - creates the directory <path>/pong_vnfd
#   - creates two empty files, <path>/pong_vnfd.xml and <path>/logo.png
# NOTE(review): the statement that assigns `path` is only partly visible (its
# RIFT_VAR_ROOT and "launchpad/packages/vnfd" components are), and the return
# statement is not present in this view. configure_test unpacks the result as
# (uid, path) -- confirm against the complete file.
101 def create_mock_package(self
):
102 uid
= str(uuid
.uuid4())
104 os
.getenv('RIFT_VAR_ROOT'),
105 "launchpad/packages/vnfd",
108 package_path
= os
.path
.join(path
, "pong_vnfd")
# os.makedirs is called without exist_ok, so it raises if the directory
# already exists.
110 os
.makedirs(package_path
)
# open(..., "wb").close() just creates empty placeholder files.
111 open(os
.path
.join(path
, "pong_vnfd.xml"), "wb").close()
112 open(os
.path
.join(path
, "logo.png"), "wb").close()
# test_package_create_rpc: exercises the package-create RPC and then the
# package-update RPC. For each one it issues the RPC through self.dts, checks
# that a transaction id comes back, waits, and then checks that
# self.app.messages[trans_id] holds a message.DownloadSuccess.
# Both RPC inputs point at an external URL on repo.riftio.com.
# NOTE(review): the docstring delimiters, the closing lines of the from_dict /
# query_rpc calls and the statement that binds `itr` are not present in this
# view.
116 @rift.test
.dts
.async_test
117 def test_package_create_rpc(self
):
119 1. Verify the package-create RPC handler
120 2. Check if the log messages are updated which will be used by UI
122 3. Verify the package-update RPC handler
123 4. Check if the log messages are updated which will be used by UI
126 yield from self
.app
.register()
# First half: package-create.
127 ip
= RwPkgMgmtYang
.YangInput_RwPkgMgmt_PackageCreate
.from_dict({
128 "package_type": "VNFD",
129 "external_url": "http://repo.riftio.com/releases/open.riftio.com/4.2.1/VNFS/ping_vnfd.tar.gz"
132 rpc_out
= yield from self
.dts
.query_rpc(
133 "I,/rw-pkg-mgmt:package-create",
134 rwdts
.XactFlag
.TRACE
,
139 result
= yield from itr
140 trans_id
= result
.result
.transaction_id
142 assert trans_id
is not None
# Fixed 5-second wait before the message log is inspected.
# NOTE(review): presumably this gives the download time to finish -- confirm.
144 yield from asyncio
.sleep(5, loop
=self
.loop
)
145 # Verify the message logs
146 data
= self
.app
.messages
[trans_id
]
147 assert data
is not None
149 assert type(data
) is message
.DownloadSuccess
# Second half: the same flow against package-update, using the same URL.
152 ip
= RwPkgMgmtYang
.YangInput_RwPkgMgmt_PackageUpdate
.from_dict({
153 "package_type": "VNFD",
154 "external_url": "http://repo.riftio.com/releases/open.riftio.com/4.2.1/VNFS/ping_vnfd.tar.gz"
156 rpc_out
= yield from self
.dts
.query_rpc(
157 "I,/rw-pkg-mgmt:package-update",
158 rwdts
.XactFlag
.TRACE
,
163 result
= yield from itr
164 trans_id
= result
.result
.transaction_id
166 assert trans_id
is not None
167 yield from asyncio
.sleep(5, loop
=self
.loop
)
168 # Verify the message logs
169 data
= self
.app
.messages
[trans_id
]
170 assert data
is not None
172 assert type(data
) is message
.DownloadSuccess
# test_package_export: issues the package-export RPC for the mock package
# (package_id = self.uid), checks that a transaction id is returned and that
# self.app.messages[trans_id] is an export.ExportSuccess, then asserts that
# the file at `path` exists.
# NOTE(review): the docstring delimiters, the closers of the from_dict /
# query_rpc calls, the binding of `itr`, and the statement that builds `path`
# are not present in this view. Only its RIFT_VAR_ROOT component is visible,
# and how `filename` feeds into it cannot be confirmed from here.
175 @rift.test
.dts
.async_test
176 def test_package_export(self
):
178 1. Verify if the package export RPC handler work
179 2. A file is actually generated in the exports dir.
181 yield from self
.app
.register()
182 ip
= RwPkgMgmtYang
.YangInput_RwPkgMgmt_PackageExport
.from_dict({
183 "package_type": "VNFD",
184 "package_id": self
.uid
187 rpc_out
= yield from self
.dts
.query_rpc(
188 "I,/rw-pkg-mgmt:package-export",
189 rwdts
.XactFlag
.TRACE
,
195 result
= yield from itr
# The RPC result carries both the transaction id and the exported filename.
196 trans_id
= result
.result
.transaction_id
197 filename
= result
.result
.filename
199 assert trans_id
is not None
201 # Verify the message logs
202 data
= self
.app
.messages
[trans_id
]
203 assert data
is not None
205 assert type(data
) is export
.ExportSuccess
207 os
.getenv("RIFT_VAR_ROOT"),
213 assert os
.path
.isfile(path
)
# Script entry logic.
#   - builds an xmlrunner.XMLTestRunner whose output location comes from the
#     RIFT_MODULE_TEST environment variable (KeyError if it is unset)
#   - parses -v/--verbose and -n/--no-runner; unrecognised arguments are kept
#     in unittest_args and forwarded to unittest.main
#   - configures logging with the 'TEST %(message)s' format
# NOTE(review): the enclosing `def` line, the code that consumes
# args.verbose / args.no_runner, and the body of the __main__ guard are not
# present in this view -- confirm against the complete file.
217 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
219 parser
= argparse
.ArgumentParser()
220 parser
.add_argument('-v', '--verbose', action
='store_true')
221 parser
.add_argument('-n', '--no-runner', action
='store_true')
# parse_known_args leaves options this parser does not define in unittest_args.
222 args
, unittest_args
= parser
.parse_known_args()
225 logging
.basicConfig(format
='TEST %(message)s')
# Root logger level is set to DEBUG; any condition guarding this line is not
# visible here.
226 logging
.getLogger().setLevel(logging
.DEBUG
)
# argv is rebuilt from the program name plus the arguments argparse did not
# consume.
229 unittest
.main(testRunner
=runner
, argv
=[sys
.argv
[0]] + unittest_args
)
231 if __name__
== '__main__':