4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
34 import tornado
.httputil
37 from tornado
.platform
.asyncio
import AsyncIOMainLoop
38 from tornado
.ioloop
import IOLoop
39 from concurrent
.futures
.thread
import ThreadPoolExecutor
40 from concurrent
.futures
.process
import ProcessPoolExecutor
43 gi
.require_version('RwDts', '1.0')
44 gi
.require_version('RwPkgMgmtYang', '1.0')
45 gi
.require_version('RwProjectVnfdYang', '1.0')
46 from gi
.repository
import (
49 RwProjectVnfdYang
as RwVnfdYang
,
52 import rift
.tasklets
.rwlaunchpad
.uploader
as uploader
53 import rift
.tasklets
.rwlaunchpad
.message
as message
54 import rift
.tasklets
.rwlaunchpad
.export
as export
55 from rift
.mano
.utils
.project
import DEFAULT_PROJECT
60 TEST_STRING
= "foobar"
62 class TestCase(rift
.test
.dts
.AbstractDTSTest
):
64 def configure_schema(cls
):
65 return RwPkgMgmtYang
.get_schema()
68 def configure_timeout(cls
):
71 def configure_test(self
, loop
, test_id
):
72 self
.log
.debug("STARTING - %s", test_id
)
73 self
.tinfo
= self
.new_tinfo(str(test_id
))
74 self
.dts
= rift
.tasklets
.DTS(self
.tinfo
, self
.schema
, self
.loop
)
77 mock_vnfd_catalog
= mock
.MagicMock()
78 self
.uid
, path
= self
.create_mock_package()
80 mock_vnfd
= RwVnfdYang
.YangData_RwProject_Project_VnfdCatalog_Vnfd
.from_dict({
83 mock_vnfd_catalog
= {self
.uid
: mock_vnfd
}
87 def get_vnfd_catalog(project
=DEFAULT_PROJECT
):
88 return mock_vnfd_catalog
93 cls
.get_vnfd_catalog
= get_vnfd_catalog
94 cls
.get_nsd_catalog
= None
96 self
.app
= uploader
.UploaderApplication(MockTasklet())
98 AsyncIOMainLoop().install()
99 self
.server
= tornado
.httpserver
.HTTPServer(
101 io_loop
=IOLoop
.current(),
107 def create_mock_package(self
):
108 uid
= str(uuid
.uuid4())
110 os
.getenv('RIFT_ARTIFACTS'),
111 "launchpad/packages/vnfd",
114 package_path
= os
.path
.join(path
, "pong_vnfd")
116 os
.makedirs(package_path
)
117 open(os
.path
.join(path
, "pong_vnfd.xml"), "wb").close()
118 open(os
.path
.join(path
, "logo.png"), "wb").close()
122 @rift.test
.dts
.async_test
123 def test_package_create_rpc(self
):
125 1. Verify the package-create RPC handler
126 2. Check if the log messages are updated which will be used by UI
128 3. Verify the package-update RPC handler
129 4. Check if the log messages are updated which will be used by UI
132 yield from self
.app
.register()
133 ip
= RwPkgMgmtYang
.YangInput_RwPkgMgmt_PackageCreate
.from_dict({
134 "package_type": "VNFD",
135 "external_url": "http://repo.riftio.com/releases/open.riftio.com/4.2.1/VNFS/ping_vnfd.tar.gz",
136 "project_name": DEFAULT_PROJECT
139 rpc_out
= yield from self
.dts
.query_rpc(
140 "I,/rw-pkg-mgmt:package-create",
141 rwdts
.XactFlag
.TRACE
,
146 result
= yield from itr
147 trans_id
= result
.result
.transaction_id
149 assert trans_id
is not None
151 yield from asyncio
.sleep(5, loop
=self
.loop
)
152 # Verify the message logs
153 data
= self
.app
.messages
[trans_id
]
154 assert data
is not None
156 assert type(data
) is message
.DownloadSuccess
159 ip
= RwPkgMgmtYang
.YangInput_RwPkgMgmt_PackageUpdate
.from_dict({
160 "package_type": "VNFD",
161 "external_url": "http://repo.riftio.com/releases/open.riftio.com/4.2.1/VNFS/ping_vnfd.tar.gz",
162 "project_name": DEFAULT_PROJECT
164 rpc_out
= yield from self
.dts
.query_rpc(
165 "I,/rw-pkg-mgmt:package-update",
166 rwdts
.XactFlag
.TRACE
,
171 result
= yield from itr
172 trans_id
= result
.result
.transaction_id
174 assert trans_id
is not None
175 yield from asyncio
.sleep(5, loop
=self
.loop
)
176 # Verify the message logs
177 data
= self
.app
.messages
[trans_id
]
178 assert data
is not None
180 assert type(data
) is message
.DownloadSuccess
183 @rift.test
.dts
.async_test
184 def test_package_export(self
):
186 1. Verify if the package export RPC handler work
187 2. A file is actually generated in the exports dir.
189 yield from self
.app
.register()
190 ip
= RwPkgMgmtYang
.YangInput_RwPkgMgmt_PackageExport
.from_dict({
191 "package_type": "VNFD",
192 "package_id": self
.uid
195 rpc_out
= yield from self
.dts
.query_rpc(
196 "I,/rw-pkg-mgmt:package-export",
197 rwdts
.XactFlag
.TRACE
,
203 result
= yield from itr
204 trans_id
= result
.result
.transaction_id
205 filename
= result
.result
.filename
207 assert trans_id
is not None
209 # Verify the message logs
210 data
= self
.app
.messages
[trans_id
]
211 assert data
is not None
213 assert type(data
) is export
.ExportSuccess
215 os
.getenv("RIFT_ARTIFACTS"),
221 assert os
.path
.isfile(path
)
225 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
227 parser
= argparse
.ArgumentParser()
228 parser
.add_argument('-v', '--verbose', action
='store_true')
229 parser
.add_argument('-n', '--no-runner', action
='store_true')
230 args
, unittest_args
= parser
.parse_known_args()
233 logging
.basicConfig(format
='TEST %(message)s')
234 logging
.getLogger().setLevel(logging
.DEBUG
)
237 unittest
.main(testRunner
=runner
, argv
=[sys
.argv
[0]] + unittest_args
)
239 if __name__
== '__main__':