#
#   Copyright 2016 RIFT.IO Inc
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
"""
@author Austin Cormier (Austin.Cormier@riftio.com)
@brief Launchpad Module Test
"""
# Pin every Yang namespace used by this module to version 1.0.  These calls
# have to run ahead of the `from gi.repository import ...` statement below.
for _gi_namespace in (
        'RwlogMgmtYang',
        'RwBaseYang',
        'RwCloudYang',
        'RwNsmYang',
        'RwIwpYang',
        'RwNsrYang',
        'RwResourceMgrYang',
        'RwConmanYang',
        'RwVnfdYang',
):
    gi.require_version(_gi_namespace, '1.0')
46 from gi
.repository
import (
# Configure the root logger so DEBUG-level records are emitted.
logging.basicConfig(level=logging.DEBUG)

# Directory (under $RIFT_INSTALL) that holds the kt_utm VNFD package.
RW_KT_UTM_PKG_INSTALL_DIR = os.path.join(
    os.environ["RIFT_INSTALL"], "usr/rift/mano/vnfds/kt_utm"
)

# Directory (under $RIFT_INSTALL) that holds the utm_only NSD package.
RW_KT_UTM_NSD_PKG_INSTALL_DIR = os.path.join(
    os.environ["RIFT_INSTALL"], "usr/rift/mano/nsds/utm_only"
)
class PackageError(Exception):
    """Signals that a descriptor package file could not be found."""
def raise_package_error():
    """Raise PackageError with the fixed "Could not find ns packages" message."""
    error = PackageError("Could not find ns packages")
    raise error
@pytest.fixture(scope='module')
def iwp_proxy(request, mgmt_session):
    """Module-scoped fixture: the RwIwpYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(RwIwpYang)
    return proxy
@pytest.fixture(scope='module')
def rwlog_mgmt_proxy(request, mgmt_session):
    """Module-scoped fixture: the RwlogMgmtYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(RwlogMgmtYang)
    return proxy
@pytest.fixture(scope='module')
def resource_mgr_proxy(request, mgmt_session):
    """Module-scoped fixture: the RwResourceMgrYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(RwResourceMgrYang)
    return proxy
@pytest.fixture(scope='module')
def cloud_proxy(request, mgmt_session):
    """Module-scoped fixture: the RwCloudYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(RwCloudYang)
    return proxy
@pytest.fixture(scope='module')
def vnfd_proxy(request, mgmt_session):
    """Module-scoped fixture: the RwVnfdYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(RwVnfdYang)
    return proxy
@pytest.fixture(scope='module')
def vld_proxy(request, mgmt_session):
    """Module-scoped fixture: the VldYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(VldYang)
    return proxy
@pytest.fixture(scope='module')
def nsd_proxy(request, mgmt_session):
    """Module-scoped fixture: the NsdYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(NsdYang)
    return proxy
@pytest.fixture(scope='module')
def nsr_proxy(request, mgmt_session):
    """Module-scoped fixture: the NsrYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(NsrYang)
    return proxy
@pytest.fixture(scope='module')
def rwnsr_proxy(request, mgmt_session):
    """Module-scoped fixture: the RwNsrYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(RwNsrYang)
    return proxy
@pytest.fixture(scope='module')
def base_proxy(request, mgmt_session):
    """Module-scoped fixture: the RwBaseYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(RwBaseYang)
    return proxy
@pytest.fixture(scope='module')
def so_proxy(request, mgmt_session):
    """Module-scoped fixture: the RwConmanYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(RwConmanYang)
    return proxy
@pytest.fixture(scope='module')
def nsm_proxy(request, mgmt_session):
    """Module-scoped fixture: the RwNsmYang proxy created from mgmt_session."""
    proxy = mgmt_session.proxy(RwNsmYang)
    return proxy
@pytest.fixture(scope='session')
def kt_utm_vnfd_package_file():
    """Session-scoped fixture: path of kt_utm_vnfd.tar.gz in the VNFD install dir.

    Calls raise_package_error() when no file exists at that path.
    """
    package_path = os.path.join(RW_KT_UTM_PKG_INSTALL_DIR, "kt_utm_vnfd.tar.gz")

    package_missing = not os.path.exists(package_path)
    if package_missing:
        raise_package_error()

    return package_path
@pytest.fixture(scope='session')
def utm_only_nsd_package_file():
    """Session-scoped fixture: path of utm_only_nsd.tar.gz in the NSD install dir.

    Calls raise_package_error() when no file exists at that path.
    """
    package_path = os.path.join(RW_KT_UTM_NSD_PKG_INSTALL_DIR, "utm_only_nsd.tar.gz")

    package_missing = not os.path.exists(package_path)
    if package_missing:
        raise_package_error()

    return package_path
def upload_descriptor(logger, descriptor_file, host="127.0.0.1"):
    """Upload a descriptor package with curl and return its transaction id.

    Args:
        logger: Logger that receives a debug record of the curl invocation.
        descriptor_file: Path of the descriptor package to send as the
            "descriptor" form field.
        host: Address of the server whose upload API is reached on port 4567.

    Returns:
        The "transaction_id" entry of the JSON document printed by curl.

    Raises:
        subprocess.CalledProcessError: curl exited with a non-zero status.
        ValueError: curl's output could not be parsed as JSON.
        KeyError: The parsed reply has no "transaction_id" entry.
    """
    # Build the argument vector directly instead of formatting one command
    # string and re-tokenising it with shlex.split(): a path containing
    # quotes or backslashes then stays a single, unmodified argument.
    curl_args = [
        "curl",
        "-F", "descriptor=@{file}".format(file=descriptor_file),
        "http://{host}:4567/api/upload".format(host=host),
    ]

    # Shell-quoted rendering of the command, used for the log record only.
    curl_cmd = " ".join(shlex.quote(arg) for arg in curl_args)
    logger.debug("Uploading descriptor %s using cmd: %s", descriptor_file, curl_cmd)

    stdout = subprocess.check_output(curl_args, universal_newlines=True)

    json_out = json.loads(stdout)
    return json_out["transaction_id"]
class DescriptorOnboardError(Exception):
    """Signals that a descriptor onboarding transaction did not succeed."""
183 def wait_unboard_transaction_finished(logger
, transaction_id
, timeout_secs
=600, host
="127.0.0.1"):
184 logger
.info("Waiting for onboard trans_id %s to complete",
186 start_time
= time
.time()
187 while (time
.time() - start_time
) < timeout_secs
:
189 'http://{host}:4567/api/upload/{t_id}/state'.format(
190 host
=host
, t_id
=transaction_id
194 if state
["status"] == "pending":
198 elif state
["status"] == "success":
199 logger
.info("Descriptor onboard was successful")
203 raise DescriptorOnboardError(state
)
205 if state
["status"] != "success":
206 raise DescriptorOnboardError(state
)
208 def create_nsr_from_nsd_id(nsd_id
):
209 nsr
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr()
210 nsr
.id = str(uuid
.uuid4())
211 nsr
.name
= "UTM-only"
212 nsr
.short_name
= "UTM-only"
213 nsr
.description
= "1 VNFs with 5 VLs"
215 nsr
.admin_status
= "ENABLED"
219 @pytest.mark
.incremental
220 class TestLaunchpadStartStop(object):
221 def test_configure_logging(self
, rwlog_mgmt_proxy
):
222 logging
= RwlogMgmtYang
.Logging
.from_dict({
227 "name": "rw-generic",
233 rwlog_mgmt_proxy
.merge_config("/rwlog-mgmt:logging", logging
)
235 def test_configure_cloud_account(self
, cloud_proxy
, logger
):
236 cloud_account
= RwCloudYang
.CloudAccountConfig()
237 # cloud_account.name = "cloudsim_proxy"
238 # cloud_account.account_type = "cloudsim_proxy"
239 cloud_account
.name
= "openstack"
240 cloud_account
.account_type
= "openstack"
241 cloud_account
.openstack
.key
= 'pluto'
242 cloud_account
.openstack
.secret
= 'mypasswd'
243 cloud_account
.openstack
.auth_url
= 'http://10.66.4.13:5000/v3/'
244 cloud_account
.openstack
.tenant
= 'demo'
245 cloud_account
.openstack
.mgmt_network
= 'private'
247 cloud_proxy
.merge_config("/rw-cloud:cloud-account", cloud_account
)
249 def test_configure_pools(self
, resource_mgr_proxy
):
250 pools
= RwResourceMgrYang
.ResourcePools
.from_dict({
251 "pools": [{ "name": "vm_pool_a",
252 "resource_type": "compute",
253 "pool_type" : "dynamic"},
254 {"name": "network_pool_a",
255 "resource_type": "network",
256 "pool_type" : "dynamic",}]})
258 resource_mgr_proxy
.merge_config('/rw-resource-mgr:resource-mgr-config/rw-resource-mgr:resource-pools', pools
)
260 def test_configure_resource_orchestrator(self
, so_proxy
):
261 cfg
= RwConmanYang
.RoEndpoint
.from_dict({'ro_ip_address': '127.0.0.1',
263 'ro_username' : 'admin',
264 'ro_password' : 'admin'})
265 so_proxy
.merge_config('/rw-conman:cm-config', cfg
)
267 def test_configure_service_orchestrator(self
, nsm_proxy
):
268 cfg
= RwNsmYang
.SoEndpoint
.from_dict({'cm_ip_address': '127.0.0.1',
270 'cm_username' : 'admin',
271 'cm_password' : 'admin'})
272 nsm_proxy
.merge_config('/rw-nsm:ro-config/rw-nsm:cm-endpoint', cfg
)
275 def test_onboard_ktutm_vnfd(self
, logger
, vnfd_proxy
, kt_utm_vnfd_package_file
):
276 logger
.info("Onboarding kt_utm_vnfd package: %s", kt_utm_vnfd_package_file
)
277 trans_id
= upload_descriptor(logger
, kt_utm_vnfd_package_file
)
278 wait_unboard_transaction_finished(logger
, trans_id
)
280 catalog
= vnfd_proxy
.get_config('/vnfd-catalog')
282 assert len(vnfds
) == 1, "There should only be a single vnfd"
284 assert vnfd
.name
== "kt_utm_vnfd"
286 def test_onboard_utm_only_nsd(self
, logger
, nsd_proxy
, utm_only_nsd_package_file
):
287 logger
.info("Onboarding utm_onlynsd package: %s", utm_only_nsd_package_file
)
288 trans_id
= upload_descriptor(logger
, utm_only_nsd_package_file
)
289 wait_unboard_transaction_finished(logger
, trans_id
)
291 catalog
= nsd_proxy
.get_config('/nsd-catalog')
293 assert len(nsds
) == 1, "There should only be a single nsd"
296 def test_instantiate_utm_only_nsr(self
, logger
, nsd_proxy
, nsr_proxy
, rwnsr_proxy
, base_proxy
):
297 catalog
= nsd_proxy
.get_config('/nsd-catalog')
300 nsr
= create_nsr_from_nsd_id(nsd
.id)
301 nsr_proxy
.merge_config('/ns-instance-config', nsr
)
303 nsr_opdata
= rwnsr_proxy
.get('/ns-instance-opdata')
304 nsrs
= nsr_opdata
.nsr
305 assert len(nsrs
) == 1
306 assert nsrs
[0].ns_instance_config_ref
== nsr
.id