#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
@author Austin Cormier (Austin.Cormier@riftio.com)
@brief Launchpad Module Test
"""
# Pin the GObject-introspection namespace versions used by this module.
# These calls sit ahead of the `from gi.repository import (...)` statement.
# 'RwNsrYang' was pinned twice in a row; the redundant call is dropped.
gi.require_version('RwlogMgmtYang', '1.0')
gi.require_version('RwBaseYang', '1.0')
gi.require_version('RwCloudYang', '1.0')
gi.require_version('RwIwpYang', '1.0')
gi.require_version('RwNsmYang', '1.0')
gi.require_version('RwNsrYang', '1.0')
gi.require_version('RwResourceMgrYang', '1.0')
gi.require_version('RwConmanYang', '1.0')
gi.require_version('RwVnfdYang', '1.0')
47 from gi
.repository
import (
# Configure the root logger at DEBUG level for the whole module.
logging.basicConfig(level=logging.DEBUG)

# Installation root taken from the RIFT_INSTALL environment variable; the
# lookup raises KeyError at import time when the variable is not set.
_RIFT_INSTALL_DIR = os.environ["RIFT_INSTALL"]

# Directories (below the installation root) holding the descriptor packages.
RW_KT_UTM_PKG_INSTALL_DIR = os.path.join(_RIFT_INSTALL_DIR, "usr/rift/mano/vnfds/kt_utm")
RW_KT_WIMS_PKG_INSTALL_DIR = os.path.join(_RIFT_INSTALL_DIR, "usr/rift/mano/vnfds/kt_wims")
RW_KT_UTM_WIMS_NSD_PKG_INSTALL_DIR = os.path.join(_RIFT_INSTALL_DIR, "usr/rift/mano/nsds/utm_wims")
class PackageError(Exception):
    """Error raised by raise_package_error() when a package cannot be found."""


def raise_package_error():
    """Raise PackageError with the fixed "Could not find ns packages" message.

    Raises:
        PackageError: always.
    """
    raise PackageError("Could not find ns packages")
@pytest.fixture(scope='module')
def iwp_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for RwIwpYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(RwIwpYang)
    return proxy
@pytest.fixture(scope='module')
def rwlog_mgmt_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for RwlogMgmtYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(RwlogMgmtYang)
    return proxy
@pytest.fixture(scope='module')
def resource_mgr_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for RwResourceMgrYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(RwResourceMgrYang)
    return proxy
@pytest.fixture(scope='module')
def cloud_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for RwCloudYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(RwCloudYang)
    return proxy
@pytest.fixture(scope='module')
def vnfd_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for RwVnfdYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(RwVnfdYang)
    return proxy
@pytest.fixture(scope='module')
def vld_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for VldYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(VldYang)
    return proxy
@pytest.fixture(scope='module')
def nsd_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for NsdYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(NsdYang)
    return proxy
@pytest.fixture(scope='module')
def nsr_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for NsrYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(NsrYang)
    return proxy
@pytest.fixture(scope='module')
def rwnsr_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for RwNsrYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(RwNsrYang)
    return proxy
@pytest.fixture(scope='module')
def base_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for RwBaseYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(RwBaseYang)
    return proxy
@pytest.fixture(scope='module')
def so_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for RwConmanYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(RwConmanYang)
    return proxy
@pytest.fixture(scope='module')
def nsm_proxy(request, mgmt_session):
    """Module-scoped fixture: the mgmt_session proxy for RwNsmYang.

    `request` is accepted but not used.
    """
    proxy = mgmt_session.proxy(RwNsmYang)
    return proxy
@pytest.fixture(scope='session')
def kt_utm_vnfd_package_file():
    """Session-scoped fixture: path of the kt_utm VNFD package archive.

    The path is "kt_utm_vnfd.tar.gz" inside RW_KT_UTM_PKG_INSTALL_DIR.
    raise_package_error() is called when nothing exists at that path.
    """
    package_path = os.path.join(RW_KT_UTM_PKG_INSTALL_DIR, "kt_utm_vnfd.tar.gz")

    package_present = os.path.exists(package_path)
    if not package_present:
        raise_package_error()

    return package_path
@pytest.fixture(scope='session')
def kt_wims_vnfd_package_file():
    """Session-scoped fixture: path of the kt_wims VNFD package archive.

    The path is "kt_wims_vnfd.tar.gz" inside RW_KT_WIMS_PKG_INSTALL_DIR.
    raise_package_error() is called when nothing exists at that path.
    """
    package_path = os.path.join(RW_KT_WIMS_PKG_INSTALL_DIR, "kt_wims_vnfd.tar.gz")

    package_present = os.path.exists(package_path)
    if not package_present:
        raise_package_error()

    return package_path
@pytest.fixture(scope='session')
def utm_wims_nsd_package_file():
    """Session-scoped fixture: path of the utm_wims NSD package archive.

    The path is "utm_wims_nsd.tar.gz" inside
    RW_KT_UTM_WIMS_NSD_PKG_INSTALL_DIR. raise_package_error() is called when
    nothing exists at that path.
    """
    package_path = os.path.join(RW_KT_UTM_WIMS_NSD_PKG_INSTALL_DIR, "utm_wims_nsd.tar.gz")

    package_present = os.path.exists(package_path)
    if not package_present:
        raise_package_error()

    return package_path
def upload_descriptor(logger, descriptor_file, host="127.0.0.1"):
    """Upload a descriptor package with curl and return its transaction id.

    Runs curl with ``-F descriptor=@<descriptor_file>`` against
    ``http://<host>:4567/api/upload`` and parses curl's stdout as JSON.

    Args:
        logger: Logger that receives the debug record of the command.
        descriptor_file: Path of the package file handed to curl.
        host: Address of the upload server (default "127.0.0.1").

    Returns:
        The value stored under "transaction_id" in the JSON reply.

    Raises:
        subprocess.CalledProcessError: curl exited with a non-zero status.
        ValueError: curl's output was not valid JSON.
        KeyError: the reply contains no "transaction_id" key.
    """
    form_field = "descriptor=@{file}".format(file=descriptor_file)
    upload_url = "http://{host}:4567/api/upload".format(host=host)

    # Same text the command string had before, kept for the log record only.
    curl_cmd = 'curl -F "{field}" {url}'.format(field=form_field, url=upload_url)
    logger.debug("Uploading descriptor %s using cmd: %s", descriptor_file, curl_cmd)

    # Pass curl an argument vector instead of re-splitting the formatted
    # string: quotes or backslashes in the path then reach curl unchanged.
    curl_argv = ["curl", "-F", form_field, upload_url]
    stdout = subprocess.check_output(curl_argv, universal_newlines=True)

    json_out = json.loads(stdout)
    return json_out["transaction_id"]
class DescriptorOnboardError(Exception):
    """Raised when a descriptor onboarding transaction does not report success."""
# Wait for a descriptor onboarding transaction to finish.
#
# Visible behaviour: logs the transaction id, then loops for as long as fewer
# than timeout_secs seconds have passed since start_time, working with the URL
# http://<host>:4567/api/upload/<transaction_id>/state. The "status" entry of
# `state` is compared against "pending" and "success"; success is logged, and
# DescriptorOnboardError(state) is raised when the status is not "success".
#
# Parameters: logger, transaction_id, timeout_secs (default 600 seconds) and
# host (default "127.0.0.1").
#
# NOTE(review): the statements that query the URL and bind `state`, and the
# bodies of the "pending" and "success" branches, are not present in the text
# under review, so how the endpoint is fetched and how each branch leaves the
# loop cannot be confirmed from here.
# NOTE(review): the final status check presumably runs after the loop; if the
# loop body never executes, `state` would not be bound at that point - confirm.
# NOTE(review): the name says "unboard" while the log text says "onboard";
# the test methods in this file call it by this name, so a rename would have
# to touch them as well.
200 def wait_unboard_transaction_finished(logger
, transaction_id
, timeout_secs
=600, host
="127.0.0.1"):
201 logger
.info("Waiting for onboard trans_id %s to complete",
203 start_time
= time
.time()
204 while (time
.time() - start_time
) < timeout_secs
:
206 'http://{host}:4567/api/upload/{t_id}/state'.format(
207 host
=host
, t_id
=transaction_id
211 if state
["status"] == "pending":
215 elif state
["status"] == "success":
216 logger
.info("Descriptor onboard was successful")
220 raise DescriptorOnboardError(state
)
222 if state
["status"] != "success":
223 raise DescriptorOnboardError(state
)
# Build the NsrYang.YangData_Nsr_NsInstanceConfig_Nsr record used by the test.
#
# The record receives a freshly generated uuid4 string as its id, the name
# and short name "UTM-WIMS", the description "2 VNFs with 4 VLs" and the
# admin_status "ENABLED".
#
# NOTE(review): no visible line uses nsd_id or returns the record. The caller
# test_instantiate_utm_wims_nsr reads `.id` from the result, so a return of
# the record is presumably among the lines missing from this view - confirm.
225 def create_nsr_from_nsd_id(nsd_id
):
226 nsr
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr()
227 nsr
.id = str(uuid
.uuid4())
228 nsr
.name
= "UTM-WIMS"
229 nsr
.short_name
= "UTM-WIMS"
230 nsr
.description
= "2 VNFs with 4 VLs"
232 nsr
.admin_status
= "ENABLED"
# Launchpad bring-up sequence, marked with pytest.mark.incremental.
# In source order the methods configure logging, a cloud account, resource
# pools and the two orchestrator endpoints, onboard two VNFD packages and one
# NSD package, and finally create an NSR from the onboarded NSD.
# NOTE(review): several statements inside the methods are missing from the
# text under review; the per-method notes below say which names are affected.
236 @pytest.mark
.incremental
237 class TestLaunchpadStartStop(object):
# Builds a RwlogMgmtYang.Logging object from a dict and merges it at
# "/rwlog-mgmt:logging".
# NOTE(review): only the "name": "rw-generic" entry of the dict is visible.
# NOTE(review): the local name `logging` presumably shadows the logging
# module inside this method - confirm that is harmless here.
238 def test_configure_logging(self
, rwlog_mgmt_proxy
):
239 logging
= RwlogMgmtYang
.Logging
.from_dict({
244 "name": "rw-generic",
250 rwlog_mgmt_proxy
.merge_config("/rwlog-mgmt:logging", logging
)
# Creates a RwCloudYang.CloudAccountConfig named "openstack" (account type
# "openstack") and merges it at "/rw-cloud:cloud-account".
# NOTE(review): key, secret, tenant and management network are hard-coded
# literals, and auth_url contains the placeholder host "10.66.4.xx", which is
# not a usable address - presumably to be edited per environment; confirm.
# NOTE(review): the `logger` argument is not used in the visible lines.
252 def test_configure_cloud_account(self
, cloud_proxy
, logger
):
253 cloud_account
= RwCloudYang
.CloudAccountConfig()
254 # cloud_account.name = "cloudsim_proxy"
255 # cloud_account.account_type = "cloudsim_proxy"
256 cloud_account
.name
= "openstack"
257 cloud_account
.account_type
= "openstack"
258 cloud_account
.openstack
.key
= 'pluto'
259 cloud_account
.openstack
.secret
= 'mypasswd'
260 cloud_account
.openstack
.auth_url
= 'http://10.66.4.xx:5000/v3/'
261 cloud_account
.openstack
.tenant
= 'demo'
262 cloud_account
.openstack
.mgmt_network
= 'private'
264 cloud_proxy
.merge_config("/rw-cloud:cloud-account", cloud_account
)
# Merges two dynamic pools - "vm_pool_a" (compute) and "network_pool_a"
# (network) - into the resource manager's resource-pools configuration.
266 def test_configure_pools(self
, resource_mgr_proxy
):
267 pools
= RwResourceMgrYang
.ResourcePools
.from_dict({
268 "pools": [{ "name": "vm_pool_a",
269 "resource_type": "compute",
270 "pool_type" : "dynamic"},
271 {"name": "network_pool_a",
272 "resource_type": "network",
273 "pool_type" : "dynamic",}]})
275 resource_mgr_proxy
.merge_config('/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools', pools
)
# Builds a RwConmanYang.RoEndpoint (address 127.0.0.1, user and password
# "admin") and merges it at '/rw-conman:cm-config'.
# NOTE(review): one dict entry between the address and the username is not
# visible in this view.
277 def test_configure_resource_orchestrator(self
, so_proxy
):
278 cfg
= RwConmanYang
.RoEndpoint
.from_dict({'ro_ip_address': '127.0.0.1',
280 'ro_username' : 'admin',
281 'ro_password' : 'admin'})
282 so_proxy
.merge_config('/rw-conman:cm-config', cfg
)
# Builds a RwNsmYang.SoEndpoint (address 127.0.0.1, user and password
# "admin") and merges it at '/rw-nsm:ro-config/rw-nsm:cm-endpoint'.
# NOTE(review): one dict entry between the address and the username is not
# visible in this view.
284 def test_configure_service_orchestrator(self
, nsm_proxy
):
285 cfg
= RwNsmYang
.SoEndpoint
.from_dict({'cm_ip_address': '127.0.0.1',
287 'cm_username' : 'admin',
288 'cm_password' : 'admin'})
289 nsm_proxy
.merge_config('/rw-nsm:ro-config/rw-nsm:cm-endpoint', cfg
)
# Uploads the kt_utm VNFD package, waits for the onboarding transaction, then
# reads '/vnfd-catalog' and asserts that exactly one VNFD exists and that it
# is named "kt_utm_vnfd".
# NOTE(review): the lines binding `vnfds` and `vnfd` are not visible.
292 def test_onboard_ktutm_vnfd(self
, logger
, vnfd_proxy
, kt_utm_vnfd_package_file
):
293 logger
.info("Onboarding kt_utm_vnfd package: %s", kt_utm_vnfd_package_file
)
294 trans_id
= upload_descriptor(logger
, kt_utm_vnfd_package_file
)
295 wait_unboard_transaction_finished(logger
, trans_id
)
297 catalog
= vnfd_proxy
.get_config('/vnfd-catalog')
299 assert len(vnfds
) == 1, "There should only be a single vnfd"
301 assert vnfd
.name
== "kt_utm_vnfd"
# Uploads the kt_wims VNFD package, waits for the onboarding transaction, then
# asserts that the catalog holds two VNFDs and that one of the first two is
# named "kt_wims_vnfd".
# NOTE(review): the line binding `vnfds` is not visible.
303 def test_onboard_ktwims_vnfd(self
, logger
, vnfd_proxy
, kt_wims_vnfd_package_file
):
304 logger
.info("Onboarding kt_wims_vnfd package: %s", kt_wims_vnfd_package_file
)
305 trans_id
= upload_descriptor(logger
, kt_wims_vnfd_package_file
)
306 wait_unboard_transaction_finished(logger
, trans_id
)
308 catalog
= vnfd_proxy
.get_config('/vnfd-catalog')
310 assert len(vnfds
) == 2, "There should only be two vnfd"
311 assert "kt_wims_vnfd" in [vnfds
[0].name
, vnfds
[1].name
]
# Uploads the utm_wims NSD package, waits for the onboarding transaction, then
# reads '/nsd-catalog' and asserts that exactly one NSD exists.
# NOTE(review): the line binding `nsds` is not visible.
313 def test_onboard_utm_wims_nsd(self
, logger
, nsd_proxy
, utm_wims_nsd_package_file
):
314 logger
.info("Onboarding utm_wims_nsd package: %s", utm_wims_nsd_package_file
)
315 trans_id
= upload_descriptor(logger
, utm_wims_nsd_package_file
)
316 wait_unboard_transaction_finished(logger
, trans_id
)
318 catalog
= nsd_proxy
.get_config('/nsd-catalog')
320 assert len(nsds
) == 1, "There should only be a single nsd"
# Reads '/nsd-catalog', creates an NSR from `nsd.id`, merges it at
# '/ns-instance-config', then reads '/ns-instance-opdata' and asserts that a
# single NSR exists whose ns_instance_config_ref equals the new NSR's id.
# NOTE(review): the line binding `nsd` is not visible, and `logger` and
# `base_proxy` are not used in the visible lines.
323 def test_instantiate_utm_wims_nsr(self
, logger
, nsd_proxy
, nsr_proxy
, rwnsr_proxy
, base_proxy
):
324 catalog
= nsd_proxy
.get_config('/nsd-catalog')
327 nsr
= create_nsr_from_nsd_id(nsd
.id)
328 nsr_proxy
.merge_config('/ns-instance-config', nsr
)
330 nsr_opdata
= rwnsr_proxy
.get('/ns-instance-opdata')
331 nsrs
= nsr_opdata
.nsr
332 assert len(nsrs
) == 1
333 assert nsrs
[0].ns_instance_config_ref
== nsr
.id