update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / ra / pytest / ns / rbac / test_rbac_usages.py
1 #!/usr/bin/env python3
2 """
3 #
4 # Copyright 2017 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18 """
19
20 import gi
21 import pytest
22 import time
23 import random
24 import rift.auto.mano
25 import rift.auto.descriptor
26
27 gi.require_version('RwConmanYang', '1.0')
28 gi.require_version('RwProjectVnfdYang', '1.0')
29 gi.require_version('RwProjectNsdYang', '1.0')
30 gi.require_version('RwNsrYang', '1.0')
31 gi.require_version('RwVnfrYang', '1.0')
32 gi.require_version('RwRbacInternalYang', '1.0')
33 gi.require_version('RwRbacPlatformYang', '1.0')
34 gi.require_version('RwProjectYang', '1.0')
35 gi.require_version('RwUserYang', '1.0')
36 gi.require_version('RwOpenidcProviderYang', '1.0')
37 from gi.repository import (
38 RwConmanYang,
39 RwProjectVnfdYang,
40 RwProjectNsdYang,
41 RwNsrYang,
42 RwVnfrYang,
43 RwVlrYang,
44 RwRbacInternalYang,
45 RwRbacPlatformYang,
46 RwProjectYang,
47 RwUserYang,
48 RwOpenidcProviderYang,
49 )
50 gi.require_version('RwKeyspec', '1.0')
51 from gi.repository.RwKeyspec import quoted_key
52
@pytest.fixture(scope='session')
def complex_scaling_factor():
    """Session-scoped multiplier consumed by the complex scaling tests.

    Returns:
        int: the scaling factor (10).
    """
    scaling_factor = 10
    return scaling_factor
56
57 @pytest.mark.incremental
58 class TestRbacSetup(object):
def test_onboarded_vnfds_project_independent(self, descriptors, logger, rbac_platform_proxy, rw_conman_proxy, rw_user_proxy,
        rw_project_proxy, rbac_user_passwd, user_domain, fmt_vnfd_catalog_xpath, session_class, confd_host, fmt_vnfd_id_xpath, rw_rbac_int_proxy):
    """Same VNFDs onboarded in two different projects. VNFD changes in one project shouldn't affect another.

    Two users are created, each bound to its own project. Both onboard the
    same descriptors. 'user1' then renames every VNFD in its project and the
    test verifies that 'user2' still sees the original names in its project.
    All user sessions opened by the test are closed on exit, pass or fail.
    """
    map_project_user_roles = {
        'user1': ('project_test_onboarded_vnfds_project_independent_1', 'rw-project-mano:catalog-admin'),
        'user2': ('project_test_onboarded_vnfds_project_independent_2', 'rw-project:project-admin'),
    }
    user_to_modify_vnfds, user_not_supposed_to_see_vnfd_changes = 'user1', 'user2'

    modified_vnfd_name = 'test_rbac_vnfd'
    user_sessions = {}
    logger.debug('descriptors being used: {}'.format(descriptors))

    def project_key_of(user):
        """Return the quoted xpath key of the project the user belongs to."""
        return quoted_key(map_project_user_roles[user][0])

    def vnfd_list_xpath_of(user):
        """Return the xpath of the VNFD list in the user's project catalog."""
        return '{}/vnfd'.format(fmt_vnfd_catalog_xpath.format(project=project_key_of(user)))

    try:
        for user, project_role_tuple in map_project_user_roles.items():
            project_name, role = project_role_tuple
            logger.debug('Creating user {} with {}'.format(user, project_role_tuple))

            rift.auto.mano.create_project(rw_conman_proxy, project_name)
            rift.auto.mano.create_user(rw_user_proxy, user, rbac_user_passwd, user_domain)
            if 'platform' in role:
                rift.auto.mano.assign_platform_role_to_user(rbac_platform_proxy, role, user, user_domain, rw_rbac_int_proxy)
            else:
                rift.auto.mano.assign_project_role_to_user(rw_project_proxy, role, user,
                                                           project_name, user_domain, rw_rbac_int_proxy)

            logger.debug('User {} onboarding the packages'.format(user))
            user_session = rift.auto.mano.get_session(session_class, confd_host, user, rbac_user_passwd)
            # Register the session right away so it gets closed even if onboarding fails.
            user_sessions[user] = user_session
            for descriptor in descriptors:
                rift.auto.descriptor.onboard(user_session, descriptor, project=project_name)

        # user1 renames every VNFD in its own project.
        vnfd_pxy = user_sessions[user_to_modify_vnfds].proxy(RwProjectVnfdYang)
        vnfd_xpath = vnfd_list_xpath_of(user_to_modify_vnfds)
        for vnfd in vnfd_pxy.get(vnfd_xpath, list_obj=True).vnfd:
            logger.debug('Changing the vnfd name from {} to {} for user {}'.format(vnfd.name, modified_vnfd_name, user_to_modify_vnfds))
            vnfd.name = modified_vnfd_name
            vnfd_pxy.replace_config(fmt_vnfd_id_xpath.format(
                project=project_key_of(user_to_modify_vnfds), vnfd_id=quoted_key(vnfd.id)), vnfd)

        # The rename must be visible in user1's project ...
        for vnfd in vnfd_pxy.get(vnfd_xpath, list_obj=True).vnfd:
            assert vnfd.name == modified_vnfd_name

        # ... and must not have leaked into user2's project.
        vnfd_pxy = user_sessions[user_not_supposed_to_see_vnfd_changes].proxy(RwProjectVnfdYang)
        vnfd_xpath = vnfd_list_xpath_of(user_not_supposed_to_see_vnfd_changes)
        for vnfd in vnfd_pxy.get(vnfd_xpath, list_obj=True).vnfd:
            logger.debug('Verifying the vnfd name {} for user {} did not change to {}'.format(
                vnfd.name, user_not_supposed_to_see_vnfd_changes, modified_vnfd_name))
            assert vnfd.name != modified_vnfd_name
    finally:
        for user_session in user_sessions.values():
            rift.auto.mano.close_session(user_session)
107
def test_multi_projects_multi_vnf(
        self, rw_project_proxy, rw_conman_proxy, cloud_account,
        cloud_module, descriptors, session_class,
        confd_host, user_domain, mgmt_session, fmt_nsd_catalog_xpath,
        logger, rw_rbac_int_proxy):
    """Creates multiple projects, cloud accounts and then
    instantiates them. Then it lets the instantiated NS's run for a minute
    after which gets terminated. Use the SCALE_FACTOR to adjust the number
    of instantiations."""

    def instantiate_nsr_not_wait(nsr, rwnsr_proxy, project='default'):
        """Push the NSR config and check it shows up in opdata, without waiting for it to run."""
        ns_instance_opdata_xpath = '/project[name={}]/ns-instance-opdata'.format(quoted_key(project))
        rwnsr_proxy.create_config(
            '/rw-project:project[rw-project:name={}]/nsr:ns-instance-config/nsr:nsr'.format(quoted_key(project)), nsr)
        nsr_opdata = rwnsr_proxy.get('{}/nsr[ns-instance-config-ref={}]'.format(ns_instance_opdata_xpath, quoted_key(nsr.id)))
        assert nsr_opdata is not None

        # The new record must also be listed in the project-wide opdata.
        nsr_opdata = rwnsr_proxy.get(ns_instance_opdata_xpath)
        assert any(record.ns_instance_config_ref == nsr.id for record in nsr_opdata.nsr)

    # Admin proxies are reused by every phase below, so create them once.
    admin_nsd_pxy = mgmt_session.proxy(RwProjectNsdYang)
    admin_rwnsr_pxy = mgmt_session.proxy(RwNsrYang)
    admin_rwvnfr_pxy = mgmt_session.proxy(RwVnfrYang)
    admin_rwvlr_pxy = mgmt_session.proxy(RwVlrYang)

    # Creating multiple projects according to the scale factor
    SCALE_FACTOR = 5
    project_nsrs = {}  # project name -> NSR config to instantiate
    for idx in range(1, SCALE_FACTOR + 1):
        project_name = 'cloud_project_{}'.format(idx)
        rift.auto.mano.create_project(rw_conman_proxy, project_name)
        project_nsrs[project_name] = None
        rift.auto.mano.assign_project_role_to_user(
            rw_project_proxy, 'rw-project:project-admin', 'admin', project_name, 'system', rw_rbac_int_proxy)

    # Creating cloud accounts, uploading descriptors, preparing the NSRs
    for project_name in project_nsrs:
        rift.auto.mano.create_cloud_account(mgmt_session, cloud_account, project_name)
        for descriptor in descriptors:
            rift.auto.descriptor.onboard(mgmt_session, descriptor, project=project_name)
        nsd_catalog = admin_nsd_pxy.get_config(fmt_nsd_catalog_xpath.format(project=quoted_key(project_name)))
        assert nsd_catalog
        nsd = nsd_catalog.nsd[0]
        project_nsrs[project_name] = rift.auto.descriptor.create_nsr(cloud_account.name, nsd.name, nsd)

    # Kick off all instantiations before waiting on any of them.
    for project_name, project_nsr in project_nsrs.items():
        instantiate_nsr_not_wait(project_nsr, admin_rwnsr_pxy, project=project_name)

    # Waiting for NS's to get started and configured.
    for project_name in project_nsrs:
        nsr_opdata = admin_rwnsr_pxy.get('/rw-project:project[rw-project:name={}]/ns-instance-opdata'.format(quoted_key(project_name)))
        nsrs = nsr_opdata.nsr

        for nsr in nsrs:
            xpath = "/rw-project:project[rw-project:name={}]/ns-instance-opdata/nsr[ns-instance-config-ref={}]/operational-status".format(
                quoted_key(project_name), quoted_key(nsr.ns_instance_config_ref))
            admin_rwnsr_pxy.wait_for(xpath, "running", fail_on=['failed'], timeout=400)

        for nsr in nsrs:
            xpath = "/rw-project:project[rw-project:name={}]/ns-instance-opdata/nsr[ns-instance-config-ref={}]/config-status".format(
                quoted_key(project_name), quoted_key(nsr.ns_instance_config_ref))
            admin_rwnsr_pxy.wait_for(xpath, "configured", fail_on=['failed'], timeout=400)

    # Letting the started NS's run for a minute after which is terminated
    time.sleep(60)
    for project_name in project_nsrs:
        rift.auto.descriptor.terminate_nsr(
            admin_rwvnfr_pxy, admin_rwnsr_pxy, admin_rwvlr_pxy, logger,
            project=project_name)
178
def test_descriptor_nsr_persistence_check(
        self, rw_conman_proxy, rw_user_proxy, rw_project_proxy,
        cloud_account, cloud_module, mgmt_session, descriptors, logger,
        user_domain, session_class, confd_host, rbac_user_passwd,
        fmt_nsd_catalog_xpath, rw_rbac_int_proxy):
    """Creates a project and cloud account for it. Uploads descriptors.
    Logs in as project-admin and checks if the uploaded descriptors
    are still there, after which he logs out.
    Then instantiates nsr. Again logs in as project admin and checks
    if the instantiated nsr is still there.

    The sequence is repeated for five projects. Each project-admin session
    is closed even when a check inside it fails.
    """
    for idx in range(1, 6):
        project_name = 'xcloud_project_{}'.format(idx)
        project_admin = 'project_admin_{}'.format(idx)

        # Creating a project, assigning project admin and creating
        # a cloud account for the project
        rift.auto.mano.create_project(rw_conman_proxy, project_name)
        rift.auto.mano.create_user(rw_user_proxy, project_admin, rbac_user_passwd, user_domain)
        rift.auto.mano.assign_project_role_to_user(
            rw_project_proxy, 'rw-project:project-admin', project_admin, project_name, user_domain, rw_rbac_int_proxy)
        rift.auto.mano.create_cloud_account(mgmt_session, cloud_account, project_name)

        # Uploading descriptors and verifying its existence from another user(project admin)
        for descriptor in descriptors:
            rift.auto.descriptor.onboard(mgmt_session, descriptor, project=project_name)
        user_session = rift.auto.mano.get_session(session_class, confd_host, project_admin, rbac_user_passwd)
        try:
            project_admin_nsd_pxy = user_session.proxy(RwProjectNsdYang)
            nsd_catalog = project_admin_nsd_pxy.get_config(fmt_nsd_catalog_xpath.format(project=quoted_key(project_name)))
            assert nsd_catalog, "Descriptor Not found on try no: {}".format(idx)
            nsd = nsd_catalog.nsd[0]
            nsr_config = rift.auto.descriptor.create_nsr(cloud_account.name, nsd.name, nsd)
        finally:
            rift.auto.mano.close_session(user_session)

        # Instantiating the nsr and verifying its existence from another user(project admin), after which it gets terminated
        admin_rwnsr_pxy = mgmt_session.proxy(RwNsrYang)
        admin_rwvnfr_pxy = mgmt_session.proxy(RwVnfrYang)
        admin_rwvlr_pxy = mgmt_session.proxy(RwVlrYang)

        rift.auto.descriptor.instantiate_nsr(nsr_config, admin_rwnsr_pxy, logger, project=project_name)
        user_session = rift.auto.mano.get_session(session_class, confd_host, project_admin, rbac_user_passwd)
        try:
            pxy = user_session.proxy(RwNsrYang)
            nsr_opdata = pxy.get('/rw-project:project[rw-project:name={}]/ns-instance-opdata'.format(quoted_key(project_name)))
            for nsr_record in nsr_opdata.nsr:
                xpath = "/rw-project:project[rw-project:name={}]/ns-instance-opdata/nsr[ns-instance-config-ref={}]/config-status".format(
                    quoted_key(project_name), quoted_key(nsr_record.ns_instance_config_ref))
                pxy.wait_for(xpath, "configured", fail_on=['failed'], timeout=60)
        finally:
            rift.auto.mano.close_session(user_session)
        rift.auto.descriptor.terminate_nsr(
            admin_rwvnfr_pxy, admin_rwnsr_pxy, admin_rwvlr_pxy, logger,
            project=project_name)
225
def delete_records(self, nsd_proxy, vnfd_proxy, project_name='default'):
    """Delete the NSD & VNFD records of a project and verify both catalogs are empty.

    Args:
        nsd_proxy: proxy used to read and delete NSD catalog entries.
        vnfd_proxy: proxy used to read and delete VNFD catalog entries.
        project_name: name of the project whose catalogs are emptied.

    Raises:
        AssertionError: if any NSD or VNFD is still listed after deletion.
    """
    project_xpath = "/rw-project:project[rw-project:name={}]".format(
        quoted_key(project_name))
    nsd_list_xpath = project_xpath + "/nsd-catalog/nsd"
    vnfd_list_xpath = project_xpath + "/vnfd-catalog/vnfd"

    # get() may return None for an empty catalog (the checks below allow
    # for it), so guard before iterating.
    nsds = nsd_proxy.get(nsd_list_xpath, list_obj=True)
    if nsds is not None:
        for nsd in nsds.nsd:
            nsd_proxy.delete_config(
                "{}[id={}]".format(nsd_list_xpath, quoted_key(nsd.id)))

    nsds = nsd_proxy.get(nsd_list_xpath, list_obj=True)
    assert nsds is None or len(nsds.nsd) == 0

    vnfds = vnfd_proxy.get(vnfd_list_xpath, list_obj=True)
    if vnfds is not None:
        for vnfd_record in vnfds.vnfd:
            vnfd_proxy.delete_config(
                "{}[id={}]".format(vnfd_list_xpath, quoted_key(vnfd_record.id)))

    vnfds = vnfd_proxy.get(vnfd_list_xpath, list_obj=True)
    assert vnfds is None or len(vnfds.vnfd) == 0
263
def test_delete_project_and_vim_accounts(
        self, rw_conman_proxy, rw_user_proxy, logger,
        rbac_user_passwd, user_domain, rw_project_proxy, rw_rbac_int_proxy,
        mgmt_session, cloud_module, cloud_account, descriptors,
        fmt_nsd_catalog_xpath, session_class, confd_host):
    """Testing vim accounts.

    A project with three cloud accounts is created and an NS is instantiated
    on the first account. While the NS is up, deleting the project or the
    cloud account in use must fail. After the NS is terminated, the cloud
    accounts, descriptors and project are deleted, and rw-rbac-internal is
    checked for leftover role keys mentioning the project.
    """
    # Create a project and three cloud accounts for it.
    rift.auto.mano.create_project(rw_conman_proxy, 'vim_project')
    rift.auto.mano.assign_project_role_to_user(
        rw_project_proxy, 'rw-project:project-admin', 'admin',
        'vim_project', 'system', rw_rbac_int_proxy)
    for idx in range(1, 4):
        rift.auto.mano.create_cloud_account(
            mgmt_session, cloud_account,
            'vim_project', 'cloud_account_{}'.format(idx))
    # Uploading descriptors
    for descriptor in descriptors:
        rift.auto.descriptor.onboard(
            mgmt_session, descriptor, project='vim_project')
    nsd_pxy = mgmt_session.proxy(RwProjectNsdYang)
    nsd_catalog = nsd_pxy.get_config(fmt_nsd_catalog_xpath.format(
        project=quoted_key('vim_project')))
    assert nsd_catalog
    nsd = nsd_catalog.nsd[0]
    nsr = rift.auto.descriptor.create_nsr(
        'cloud_account_1', nsd.name, nsd)
    # Instantiating the nsr
    rwnsr_pxy = mgmt_session.proxy(RwNsrYang)
    rift.auto.descriptor.instantiate_nsr(
        nsr, rwnsr_pxy, logger, project='vim_project')
    # Trying to delete the project before taking the instance down.
    # pytest.fail() raises an outcome exception that is not an Exception
    # subclass, so pytest.raises(Exception) lets it through and the test
    # fails with the custom message if the deletion unexpectedly succeeds.
    with pytest.raises(Exception):
        rift.auto.mano.delete_project(rw_conman_proxy, 'vim_project')
        pytest.fail("Project deletion should've failed")
    # Trying to delete the vim account before taking the instance down
    with pytest.raises(Exception):
        rift.auto.mano.delete_cloud_account(
            mgmt_session, 'cloud_account_1', 'vim_project')
        pytest.fail("Vim account deletion should've failed")
    # Terminating the nsr
    rwvnfr_pxy = mgmt_session.proxy(RwVnfrYang)
    rwvlr_pxy = mgmt_session.proxy(RwVlrYang)
    rift.auto.descriptor.terminate_nsr(
        rwvnfr_pxy, rwnsr_pxy, rwvlr_pxy, logger, project='vim_project')
    # Delete all cloud accounts for the project
    for idx in range(1, 4):
        rift.auto.mano.delete_cloud_account(
            mgmt_session, 'cloud_account_{}'.format(idx), 'vim_project')
    # Delete the uploaded descriptors
    vnfd_proxy = mgmt_session.proxy(RwProjectVnfdYang)
    self.delete_records(nsd_pxy, vnfd_proxy, 'vim_project')
    # Delete the project
    rift.auto.mano.delete_project(rw_conman_proxy, 'vim_project')
    # Check in rw-rbac-internal if project is removed
    rwinternal_xpath = '/rw-rbac-internal/role'
    response = (
        rw_rbac_int_proxy.get(
            rwinternal_xpath, list_obj=True)
    ).as_dict()['role']
    keys = [role['keys'] for role in response if 'keys' in role]
    for key in keys:
        assert 'vim_project' not in key, "Improper project deletion"
328
# The condition is a string so pytest evaluates it at test setup with its
# `config` object in scope; the module-level `pytest.config` global no longer
# exists in pytest >= 5.
@pytest.mark.skipif(
    'not config.getoption("--complex-scaling")',
    reason="need --complex-scaling option to run")
def test_complex_scaling(
        self, rw_conman_proxy, rw_user_proxy, rbac_user_passwd,
        user_domain, rw_project_proxy, rw_rbac_int_proxy, logger,
        rbac_platform_proxy, user_roles, platform_roles, mgmt_session,
        cloud_module, cloud_account, rw_ro_account_proxy,
        tbac, fmt_nsd_catalog_xpath, descriptors, complex_scaling_factor):
    """Complex scaling, sized by the complex_scaling_factor fixture (N).

    No. of projects - N (two users & two cloud accounts per project)
    No. of users - 2N (one project role & one platform role per user)
    No. of cloud accounts - 2N
    No. of RO accounts - N created here (any pre-existing default RO
    accounts are not counted).
    """
    # This test can be controlled using complex_scaling_factor fixture
    logger.debug('Creating projects')
    for idx in range(1, complex_scaling_factor + 1):
        rift.auto.mano.create_project(
            rw_conman_proxy, 'scaling_project_{}'.format(idx)
        )
    logger.debug('Create users, cloud accounts double the no. of projects')
    for idx in range(1, (2 * complex_scaling_factor) + 1):
        # Users/accounts 1,2 -> project 1; 3,4 -> project 2; and so on.
        project_index = (idx + 1) // 2
        user_name = 'scaling_user_{}'.format(idx)
        project_name = 'scaling_project_{}'.format(project_index)
        rift.auto.mano.create_user(
            rw_user_proxy, user_name, rbac_user_passwd, user_domain)
        # Each user has a project role & platform role
        pr_role = random.choice(user_roles)
        pl_role = random.choice(platform_roles)
        rift.auto.mano.assign_project_role_to_user(
            rw_project_proxy, pr_role, user_name, project_name,
            user_domain, rw_rbac_int_proxy)
        rift.auto.mano.assign_platform_role_to_user(
            rbac_platform_proxy, pl_role, user_name, user_domain,
            rw_rbac_int_proxy)
        # Creating two cloud accounts for each project
        rift.auto.mano.create_cloud_account(
            mgmt_session, cloud_account, project_name,
            'cloud_account_{}'.format(idx)
        )
    logger.debug('Creating RO accounts')
    for idx in range(1, complex_scaling_factor + 1):
        project_name = 'scaling_project_{}'.format(idx)
        rift.auto.mano.create_ro_account(
            rw_ro_account_proxy, 'ro_account_{}'.format(idx), project_name
        )
        # Uploading descriptors
        for descriptor in descriptors:
            rift.auto.descriptor.onboard(
                mgmt_session, descriptor, project=project_name
            )
        nsd_pxy = mgmt_session.proxy(RwProjectNsdYang)
        nsd_catalog = nsd_pxy.get_config(
            fmt_nsd_catalog_xpath.format(project=quoted_key(project_name))
        )
        assert nsd_catalog
392
# The condition is a string so pytest evaluates it at test setup with its
# `config` object in scope; the module-level `pytest.config` global no longer
# exists in pytest >= 5.
@pytest.mark.skipif(
    'not config.getoption("--complex-scaling")',
    reason="need --complex-scaling option to run")
def test_complex_scaling_verification(
        self, complex_scaling_factor, rw_project_proxy, rw_ro_account_proxy,
        mgmt_session, fmt_nsd_catalog_xpath, cloud_module, logger):
    """Reboot verification script for test_complex_scaling.

    Checks that every project, RO account, NSD catalog and cloud account
    created by test_complex_scaling can still be read back.
    """
    project_cm_config_xpath = '/project[name={project_name}]/project-state'
    fmt_cloud_xpath = (
        '/project[name={project}]/cloud/account[name={account_name}]'
    )

    logger.debug('Verification: projects, ro accounts started')
    logger.debug('Verification: descriptors, cloud accounts started')
    for idx in range(1, complex_scaling_factor + 1):
        # Verifying projects
        project_name = 'scaling_project_{}'.format(idx)
        project_ = rw_project_proxy.get_config(
            project_cm_config_xpath.format(
                project_name=quoted_key(project_name)
            ),
            list_obj=True
        )
        assert project_
        # Verifying RO Accounts
        ro_account_name = 'ro_account_{}'.format(idx)
        ro_obj = rw_ro_account_proxy.get_config(
            '/project[name={}]/ro-account/account[name={}]'.format(
                quoted_key(project_name), quoted_key(ro_account_name))
        )
        assert ro_obj.name == ro_account_name
        assert ro_obj.ro_account_type == 'openmano'
        # Verifying Descriptors
        nsd_pxy = mgmt_session.proxy(RwProjectNsdYang)
        nsd_catalog = nsd_pxy.get_config(
            fmt_nsd_catalog_xpath.format(
                project=quoted_key(project_name)
            )
        )
        assert nsd_catalog
    for idx in range(1, (2 * complex_scaling_factor) + 1):
        # Verifying cloud accounts; accounts 1,2 belong to project 1,
        # accounts 3,4 to project 2, and so on.
        project_index = (idx + 1) // 2
        project_name = 'scaling_project_{}'.format(project_index)
        cloud_acc_name = 'cloud_account_{}'.format(idx)
        cloud_pxy = mgmt_session.proxy(cloud_module)
        response = cloud_pxy.get(fmt_cloud_xpath.format(
            project=quoted_key(project_name),
            account_name=quoted_key(cloud_acc_name))
        )
        assert response.name == cloud_acc_name
443
444
def test_change_visibility_same_session(self, session_class, rw_conman_proxy, confd_host, logger,
        user_domain, project_keyed_xpath, rw_project_proxy, rw_rbac_int_proxy, rw_user_proxy):
    """admin makes changes which are seen by the operator already logged in for the same project.

    oper is logged in. admin assigns oper to a new project X. oper should be
    able to see the new project X in the same session without re-logging-in.
    The oper session is closed when the test ends, pass or fail.
    """
    # Outside the 'default' domain a dedicated 'oper2' user is created below;
    # otherwise the 'oper' user is used as-is. The password equals the user name.
    user = 'oper2' if user_domain != 'default' else 'oper'
    oper_user = oper_passwd = user

    if user_domain != 'default':
        rift.auto.mano.create_user(rw_user_proxy, oper_user, oper_passwd, user_domain)
        rift.auto.mano.assign_project_role_to_user(rw_project_proxy, 'rw-project:project-oper', oper_user,
                                                   'default', user_domain, rw_rbac_int_proxy)
    oper_session = rift.auto.mano.get_session(session_class, confd_host, oper_user, oper_passwd)
    try:
        oper_conman_pxy = oper_session.proxy(RwProjectYang)

        default_project_cm_config_xpath = project_keyed_xpath.format(project_name=quoted_key('default'))+'/project-state'
        assert oper_conman_pxy.get_config(default_project_cm_config_xpath, list_obj=True)

        # admin assigns oper 'project-admin' role under a new project.
        # NOTE(review): switched to rw_conman_proxy to match every other
        # create_project call in this file - confirm against rift.auto.mano.
        new_project = 'project_test_change_visibility_same_session_1'
        rift.auto.mano.create_project(rw_conman_proxy, new_project)
        rift.auto.mano.assign_project_role_to_user(rw_project_proxy, 'rw-project:project-admin', oper_user, new_project,
                                                   user_domain, rw_rbac_int_proxy)

        # Check oper user should be able to access the new project
        new_project_cm_config_xpath = project_keyed_xpath.format(project_name=quoted_key(new_project))+'/project-state'
        assert oper_conman_pxy.get_config(new_project_cm_config_xpath, list_obj=True)
    finally:
        rift.auto.mano.close_session(oper_session)
474
def test_super_admin(
        self, rw_user_proxy, rbac_platform_proxy, rw_project_proxy,
        session_class, confd_host, rbac_user_passwd, user_domain,
        rw_rbac_int_proxy):
    """Various tests on the super-admin role.

    Two super-admin users are created. The second one then logs in, removes
    the first one's platform-role entry and deletes that user. The session
    is closed even if one of the deletions fails.
    """
    # Creating two super admins and then deleting the first one.
    for super_admin_name in ('super_admin', 'super_admin_2'):
        rift.auto.mano.create_user(
            rw_user_proxy, super_admin_name, rbac_user_passwd, user_domain)
        rift.auto.mano.assign_platform_role_to_user(
            rbac_platform_proxy, 'rw-rbac-platform:super-admin',
            super_admin_name, user_domain, rw_rbac_int_proxy)

    role_keyed_path = (
        "/rbac-platform-config/" +
        "user[user-name={user}][user-domain={domain}]"
    )
    user_session = rift.auto.mano.get_session(
        session_class, confd_host, 'super_admin_2', rbac_user_passwd)
    try:
        pxy = user_session.proxy(RwRbacPlatformYang)
        pxy.delete_config(role_keyed_path.format(
            user=quoted_key('super_admin'), domain=quoted_key(user_domain))
        )
        pxy = user_session.proxy(RwUserYang)
        rift.auto.mano.delete_user(pxy, 'super_admin', user_domain)
    finally:
        rift.auto.mano.close_session(user_session)
505
# The condition is a string so pytest evaluates it at test setup with its
# `config` object in scope; the module-level `pytest.config` global no longer
# exists in pytest >= 5.
@pytest.mark.skipif('not config.getoption("--tbac")', reason="need --tbac option to run")
def test_token_expiry_timeout(self, mgmt_session, rw_user_proxy, rw_conman_proxy, rbac_user_passwd, user_domain,
        confd_host, logger, rw_project_proxy, rw_rbac_int_proxy, session_class):
    """Set 30 seconds as token-expiry-timeout; then verifies an user session is automatically expired after 30 secs"""
    test_user, role = 'user-1', 'rw-project:project-oper'
    test_proj = 'project_test_token_expiry_timeout'
    token_expiry_timeout = 30

    logger.debug('Creating user {} under project {} and assigning it {}'.format(test_user, test_proj, role))
    rift.auto.mano.create_project(rw_conman_proxy, test_proj)
    rift.auto.mano.create_user(rw_user_proxy, test_user, rbac_user_passwd, user_domain)
    rift.auto.mano.assign_project_role_to_user(rw_project_proxy, role, test_user, test_proj, user_domain, rw_rbac_int_proxy)

    # admin user setting token_expiry_timeout
    openidc_provider_xpath = '/rw-openidc-provider:openidc-provider-config'
    openidc_provider = RwOpenidcProviderYang.YangData_RwOpenidcProvider_OpenidcProviderConfig.from_dict(
        {'token_expiry_timeout': token_expiry_timeout})
    pxy = mgmt_session.proxy(RwOpenidcProviderYang)
    logger.debug('Settig token_expiry_timeout to {} secs'.format(token_expiry_timeout))
    pxy.replace_config(openidc_provider_xpath, openidc_provider)

    # Verifying if token_expiry_timeout is set in openidc-provider-config
    openidc_provider = pxy.get_config(openidc_provider_xpath)
    assert openidc_provider
    assert openidc_provider.token_expiry_timeout == token_expiry_timeout

    def project_access(user_session):
        """Assert the session can read the project-state of the test project."""
        user_conman_pxy = user_session.proxy(RwProjectYang)
        assert user_conman_pxy.get_config('/project[name={}]/project-state'.format(quoted_key(test_proj)), list_obj=True)

    # Log-in as test_user and validate operations under that user getting 'Unauthorized' after time-out
    user_session = rift.auto.mano.get_session(session_class, confd_host, test_user, rbac_user_passwd)
    project_access(user_session)

    logger.debug('Sleeping for {} secs'.format(token_expiry_timeout))
    # A few seconds of slack so the token is certainly expired.
    time.sleep(token_expiry_timeout+5)

    # pytest.fail() raises an outcome exception that is not an Exception
    # subclass, so it passes through pytest.raises(Exception) and fails the
    # test with this message if the access unexpectedly succeeds.
    with pytest.raises(Exception):
        logger.debug('User {} trying to access default project. It should fail'.format(test_user))
        project_access(user_session)
        pytest.fail('logged-in user able to access default project even after token expired')

    # log-in as same user and perform the same operation. It should pass now.
    # Only this fresh session is closed; the first one has an expired token.
    user_session = rift.auto.mano.get_session(session_class, confd_host, test_user, rbac_user_passwd)
    try:
        project_access(user_session)
    finally:
        rift.auto.mano.close_session(user_session)