# Copyright 2016 RIFT.IO Inc
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
34 import tornado
.httputil
# Set RIFT_VAR_ROOT for unit test execution if it is not already set.
# An existing value is left untouched; otherwise it is derived from
# RIFT_INSTALL. A missing RIFT_INSTALL raises KeyError here.
if "RIFT_VAR_ROOT" not in os.environ:
    os.environ['RIFT_VAR_ROOT'] = os.path.join(
        os.environ['RIFT_INSTALL'], 'var/rift/unittest')
41 from tornado
.platform
.asyncio
import AsyncIOMainLoop
42 from tornado
.ioloop
import IOLoop
43 from concurrent
.futures
.thread
import ThreadPoolExecutor
44 from concurrent
.futures
.process
import ProcessPoolExecutor
47 gi
.require_version('RwDts', '1.0')
48 gi
.require_version('RwPkgMgmtYang', '1.0')
49 gi
.require_version('RwProjectVnfdYang', '1.0')
50 from gi
.repository
import (
53 RwProjectVnfdYang
as RwVnfdYang
,
56 import rift
.tasklets
.rwlaunchpad
.uploader
as uploader
57 import rift
.tasklets
.rwlaunchpad
.message
as message
58 import rift
.tasklets
.rwlaunchpad
.export
as export
59 from rift
.mano
.utils
.project
import ManoProject
, DEFAULT_PROJECT
61 import rift
.package
.store
# Module-level string constant.
TEST_STRING = "foobar"
67 class TestCase(rift
.test
.dts
.AbstractDTSTest
):
def configure_schema(cls):
    """Return the schema provided by ``RwPkgMgmtYang.get_schema()``."""
    return RwPkgMgmtYang.get_schema()
73 def configure_timeout(cls
):
76 def configure_test(self
, loop
, test_id
):
77 self
.log
.debug("STARTING - %s", test_id
)
78 self
.tinfo
= self
.new_tinfo(str(test_id
))
79 self
.dts
= rift
.tasklets
.DTS(self
.tinfo
, self
.schema
, self
.loop
)
82 mock_vnfd_catalog
= mock
.MagicMock()
83 self
.uid
, path
= self
.create_mock_package(DEFAULT_PROJECT
)
85 mock_vnfd
= RwVnfdYang
.YangData_RwProject_Project_VnfdCatalog_Vnfd
.from_dict({
88 mock_vnfd_catalog
= {self
.uid
: mock_vnfd
}
92 def get_vnfd_catalog(project
=DEFAULT_PROJECT
):
93 return mock_vnfd_catalog
98 cls
.get_vnfd_catalog
= get_vnfd_catalog
99 cls
.get_nsd_catalog
= None
101 def _get_project(cls
, project_name
):
102 if cls
.project
is None:
103 cls
.project
= ManoProject(cls
.log
, project_name
)
106 vnfd_store
= rift
.package
.store
.VnfdPackageFilesystemStore(self
.log
, project
=DEFAULT_PROJECT
)
107 nsd_store
= rift
.package
.store
.NsdPackageFilesystemStore(self
.log
, project
=DEFAULT_PROJECT
)
109 self
.app
= uploader
.UploaderApplication(MockTasklet(), vnfd_store
=vnfd_store
, nsd_store
=nsd_store
)
110 self
.app
.onboarder
.get_updated_descriptor
= mock
.MagicMock(return_value
={'vnfd:vnfd':{'name':'mock', 'version':'mock'}})
111 self
.app
.onboarder
.onboard
= mock
.MagicMock()
112 self
.app
.onboarder
.update
= mock
.MagicMock()
114 AsyncIOMainLoop().install()
115 self
.server
= tornado
.httpserver
.HTTPServer(
117 io_loop
=IOLoop
.current(),
123 def create_mock_package(self
, project
):
124 uid
= str(uuid
.uuid4())
126 os
.getenv('RIFT_VAR_ROOT'),
127 "launchpad/packages/vnfd",
131 package_path
= os
.path
.join(path
, "pong_vnfd")
133 os
.makedirs(package_path
)
134 open(os
.path
.join(path
, "pong_vnfd.xml"), "wb").close()
135 open(os
.path
.join(path
, "logo.png"), "wb").close()
139 @rift.test
.dts
.async_test
140 def test_package_create_rpc(self
):
142 1. Verify the package-create RPC handler
143 2. Check if the log messages are updated which will be used by UI
145 3. Verify the package-update RPC handler
146 4. Check if the log messages are updated which will be used by UI
149 yield from self
.app
.register()
150 ip
= RwPkgMgmtYang
.YangInput_RwPkgMgmt_PackageCreate
.from_dict({
151 "package_type": "VNFD",
152 "external_url": "http://repo.riftio.com/releases/open.riftio.com/4.4.2/ping_vnfd.tar.gz",
153 "project_name": DEFAULT_PROJECT
156 rpc_out
= yield from self
.dts
.query_rpc(
157 "I,/rw-pkg-mgmt:package-create",
158 rwdts
.XactFlag
.TRACE
,
163 result
= yield from itr
164 trans_id
= result
.result
.transaction_id
166 assert trans_id
is not None
168 yield from asyncio
.sleep(5, loop
=self
.loop
)
169 # Verify the message logs
170 data
= self
.app
.messages
[trans_id
]
171 assert data
is not None
173 assert type(data
) is message
.DownloadSuccess
176 ip
= RwPkgMgmtYang
.YangInput_RwPkgMgmt_PackageUpdate
.from_dict({
177 "package_type": "VNFD",
178 "external_url": "http://repo.riftio.com/releases/open.riftio.com/4.4.2/ping_vnfd.tar.gz",
179 "project_name": DEFAULT_PROJECT
181 rpc_out
= yield from self
.dts
.query_rpc(
182 "I,/rw-pkg-mgmt:package-update",
183 rwdts
.XactFlag
.TRACE
,
188 result
= yield from itr
189 trans_id
= result
.result
.transaction_id
191 assert trans_id
is not None
192 yield from asyncio
.sleep(5, loop
=self
.loop
)
193 # Verify the message logs
194 data
= self
.app
.messages
[trans_id
]
195 assert data
is not None
197 assert type(data
) is message
.DownloadSuccess
199 @rift.test
.dts
.async_test
200 def test_package_export(self
):
202 1. Verify if the package export RPC handler work
203 2. A file is actually generated in the exports dir.
205 yield from self
.app
.register()
206 ip
= RwPkgMgmtYang
.YangInput_RwPkgMgmt_PackageExport
.from_dict({
207 "package_type": "VNFD",
208 "package_id": self
.uid
211 rpc_out
= yield from self
.dts
.query_rpc(
212 "I,/rw-pkg-mgmt:package-export",
213 rwdts
.XactFlag
.TRACE
,
219 result
= yield from itr
220 trans_id
= result
.result
.transaction_id
221 filename
= result
.result
.filename
223 assert trans_id
is not None
225 # Verify the message logs
226 data
= self
.app
.messages
[trans_id
]
227 assert data
is not None
229 assert type(data
) is export
.ExportSuccess
231 os
.getenv("RIFT_VAR_ROOT"),
237 assert os
.path
.isfile(path
)
241 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
243 parser
= argparse
.ArgumentParser()
244 parser
.add_argument('-v', '--verbose', action
='store_true')
245 parser
.add_argument('-n', '--no-runner', action
='store_true')
246 args
, unittest_args
= parser
.parse_known_args()
249 logging
.basicConfig(format
='TEST %(message)s')
250 logging
.getLogger().setLevel(logging
.DEBUG
)
253 unittest
.main(testRunner
=runner
, argv
=[sys
.argv
[0]] + unittest_args
)
255 if __name__
== '__main__':