X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fra%2Fpytest%2Fns%2Frbac%2Ftest_rbac_usages.py;fp=rwlaunchpad%2Fra%2Fpytest%2Fns%2Frbac%2Ftest_rbac_usages.py;h=cff1c9ce3d3b49754bc7a529b09e2a4e0e6b7c07;hb=4870d0ee29789b859931e4e2c73e13dcb29537d5;hp=0000000000000000000000000000000000000000;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/ra/pytest/ns/rbac/test_rbac_usages.py b/rwlaunchpad/ra/pytest/ns/rbac/test_rbac_usages.py new file mode 100644 index 00000000..cff1c9ce --- /dev/null +++ b/rwlaunchpad/ra/pytest/ns/rbac/test_rbac_usages.py @@ -0,0 +1,549 @@ +#!/usr/bin/env python3 +""" +# +# Copyright 2017 RIFT.IO Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" + +import gi +import pytest +import time +import random +import rift.auto.mano +import rift.auto.descriptor + +gi.require_version('RwConmanYang', '1.0') +gi.require_version('RwProjectVnfdYang', '1.0') +gi.require_version('RwProjectNsdYang', '1.0') +gi.require_version('RwNsrYang', '1.0') +gi.require_version('RwVnfrYang', '1.0') +gi.require_version('RwRbacInternalYang', '1.0') +gi.require_version('RwRbacPlatformYang', '1.0') +gi.require_version('RwProjectYang', '1.0') +gi.require_version('RwUserYang', '1.0') +gi.require_version('RwOpenidcProviderYang', '1.0') +from gi.repository import ( + RwConmanYang, + RwProjectVnfdYang, + RwProjectNsdYang, + RwNsrYang, + RwVnfrYang, + RwVlrYang, + RwRbacInternalYang, + RwRbacPlatformYang, + RwProjectYang, + RwUserYang, + RwOpenidcProviderYang, +) +gi.require_version('RwKeyspec', '1.0') +from gi.repository.RwKeyspec import quoted_key + +@pytest.fixture(scope='session') +def complex_scaling_factor(): + return 10 + +@pytest.mark.incremental +class TestRbacSetup(object): + def test_onboarded_vnfds_project_independent(self, descriptors, logger, rbac_platform_proxy, rw_conman_proxy, rw_user_proxy, + rw_project_proxy, rbac_user_passwd, user_domain, fmt_vnfd_catalog_xpath, session_class, confd_host, fmt_vnfd_id_xpath, rw_rbac_int_proxy): + """Same VNFDs on boarded in two different projects. VNFD changes in one project shouldn't affect another.""" + map_project_user_roles = { + 'user1': ('project_test_onboarded_vnfds_project_independent_1', 'rw-project-mano:catalog-admin'), + 'user2': ('project_test_onboarded_vnfds_project_independent_2', 'rw-project:project-admin'), + } + user_to_modify_vnfds, user_not_supposed_to_see_vnfd_changes = 'user1', 'user2' + + modified_vnfd_name = 'test_rbac_vnfd' + user_sessions = {} + logger.debug('descriptors being used: {}'.format(descriptors)) + + for user, project_role_tuple in map_project_user_roles.items(): + project_name, role = project_role_tuple + logger.debug('Creating user {} with {}'.format(user, project_role_tuple)) + + rift.auto.mano.create_project(rw_conman_proxy, project_name) + rift.auto.mano.create_user(rw_user_proxy, user, rbac_user_passwd, user_domain) + if 'platform' in role: + rift.auto.mano.assign_platform_role_to_user(rbac_platform_proxy, role, user, user_domain, rw_rbac_int_proxy) + else: + rift.auto.mano.assign_project_role_to_user(rw_project_proxy, role, user, + project_name, user_domain, rw_rbac_int_proxy) + + logger.debug('User {} onboarding the packages'.format(user)) + user_session = rift.auto.mano.get_session(session_class, confd_host, user, rbac_user_passwd) + user_sessions[user] = user_session + for descriptor in descriptors: + rift.auto.descriptor.onboard(user_session, descriptor, project=project_name) + + vnfd_pxy = user_sessions[user_to_modify_vnfds].proxy(RwProjectVnfdYang) + vnfd_xpath = '{}/vnfd'.format(fmt_vnfd_catalog_xpath.format(project=quoted_key(map_project_user_roles[user_to_modify_vnfds][0]))) + for vnfd in vnfd_pxy.get(vnfd_xpath, list_obj=True).vnfd: + logger.debug('Changing the vnfd name from {} to {} for user {}'.format(vnfd.name, modified_vnfd_name, user_to_modify_vnfds)) + vnfd.name = modified_vnfd_name + vnfd_pxy.replace_config(fmt_vnfd_id_xpath.format( + project=quoted_key(map_project_user_roles[user_to_modify_vnfds][0]), vnfd_id=quoted_key(vnfd.id)), vnfd) + + for vnfd in vnfd_pxy.get(vnfd_xpath, list_obj=True).vnfd: + assert vnfd.name == modified_vnfd_name + + vnfd_pxy = user_sessions[user_not_supposed_to_see_vnfd_changes].proxy(RwProjectVnfdYang) + vnfd_xpath = '{}/vnfd'.format(fmt_vnfd_catalog_xpath.format(project=quoted_key(map_project_user_roles[user_not_supposed_to_see_vnfd_changes][0]))) + for vnfd in vnfd_pxy.get(vnfd_xpath, list_obj=True).vnfd: + logger.debug('Verifying the vnfd name {} for user {} did not change to {}'.format( + vnfd.name, user_not_supposed_to_see_vnfd_changes, modified_vnfd_name)) + assert vnfd.name != modified_vnfd_name + + def test_multi_projects_multi_vnf( + self, rw_project_proxy, rw_conman_proxy, cloud_account, + cloud_module, descriptors, session_class, + confd_host, user_domain, mgmt_session, fmt_nsd_catalog_xpath, + logger, rw_rbac_int_proxy): + """Creates multiple projects, cloud accounts and then + instantiates them. Then it lets the instantiated NS's run for a minute + after which gets terminated. Use the SCALE_FACTOR to adjust the number + of instantiations.""" + + def instantiate_nsr_not_wait(nsr, rwnsr_proxy, project='default'): + ns_instance_opdata_xpath = '/project[name={}]/ns-instance-opdata'.format(quoted_key(project)) + rwnsr_proxy.create_config('/rw-project:project[rw-project:name={}]/nsr:ns-instance-config/nsr:nsr'.format(quoted_key(project)), nsr) + nsr_opdata = rwnsr_proxy.get('{}/nsr[ns-instance-config-ref={}]'.format(ns_instance_opdata_xpath, quoted_key(nsr.id))) + assert nsr_opdata is not None + + nsr_opdata = rwnsr_proxy.get(ns_instance_opdata_xpath) + nsr_ = [nsr_ for nsr_ in nsr_opdata.nsr if nsr_.ns_instance_config_ref==nsr.id][0] + + #Creating multiple projects according to the scale factor + SCALE_FACTOR = 5 + PROJECT_LIST = {} + for idx in range(1,SCALE_FACTOR+1): + rift.auto.mano.create_project(rw_conman_proxy, 'cloud_project_{}'.format(idx)) + PROJECT_LIST['cloud_project_{}'.format(idx)] = None + rift.auto.mano.assign_project_role_to_user(rw_project_proxy, 'rw-project:project-admin', 'admin', 'cloud_project_{}' + .format(idx), 'system', rw_rbac_int_proxy) + #Creating cloud accounts, uploading descriptors, instantiating NS + for project_name in PROJECT_LIST: + rift.auto.mano.create_cloud_account(mgmt_session, cloud_account, project_name) + for descriptor in descriptors: + rift.auto.descriptor.onboard(mgmt_session, descriptor, project=project_name) + admin_nsd_pxy = mgmt_session.proxy(RwProjectNsdYang) + nsd_catalog = admin_nsd_pxy.get_config(fmt_nsd_catalog_xpath.format(project=quoted_key(project_name))) + assert nsd_catalog + nsd = nsd_catalog.nsd[0] + nsr = rift.auto.descriptor.create_nsr(cloud_account.name, nsd.name, nsd) + PROJECT_LIST[project_name] = nsr + + for project_name, NSR in PROJECT_LIST.items(): + admin_rwnsr_pxy = mgmt_session.proxy(RwNsrYang) + admin_rwvnfr_pxy = mgmt_session.proxy(RwVnfrYang) + admin_rwvlr_pxy = mgmt_session.proxy(RwVlrYang) + instantiate_nsr_not_wait(NSR, admin_rwnsr_pxy, + project=project_name) + + # Waiting for NS's to get started and configured. + for project_name in PROJECT_LIST: + admin_rwnsr_pxy = mgmt_session.proxy(RwNsrYang) + nsr_opdata = admin_rwnsr_pxy.get('/rw-project:project[rw-project:name={}]/ns-instance-opdata'.format(quoted_key(project_name))) + nsrs = nsr_opdata.nsr + + for nsr in nsrs: + xpath = "/rw-project:project[rw-project:name={}]/ns-instance-opdata/nsr[ns-instance-config-ref={}]/operational-status".format( + quoted_key(project_name), quoted_key(nsr.ns_instance_config_ref)) + admin_rwnsr_pxy.wait_for(xpath, "running", fail_on=['failed'], timeout=400) + + for nsr in nsrs: + xpath = "/rw-project:project[rw-project:name={}]/ns-instance-opdata/nsr[ns-instance-config-ref={}]/config-status".format( + quoted_key(project_name), quoted_key(nsr.ns_instance_config_ref)) + admin_rwnsr_pxy.wait_for(xpath, "configured", fail_on=['failed'], timeout=400) + + # Letting the started NS's run for a minute after which is terminated + start_time = time.time() + while (time.time() - start_time) < 60: + time.sleep(2) + for project_name in PROJECT_LIST: + rift.auto.descriptor.terminate_nsr( + admin_rwvnfr_pxy, admin_rwnsr_pxy, admin_rwvlr_pxy, logger, + project=project_name) + + def test_descriptor_nsr_persistence_check( + self, rw_conman_proxy, rw_user_proxy, rw_project_proxy, + cloud_account, cloud_module, mgmt_session, descriptors, logger, + user_domain, session_class, confd_host, rbac_user_passwd, + fmt_nsd_catalog_xpath, rw_rbac_int_proxy): + """Creates a project and cloud account for it. Uploads descriptors. + Logs in as project-admin and checks if the uploaded descriptors + are still there, after which he logs out. + Then instantiates nsr. Again logs in as project admin and checks + if the instantiated nsr is still there.""" + # Creating a project, assigning project admin and creating + # a cloud account for the project + for idx in range(1,6): + rift.auto.mano.create_project(rw_conman_proxy, 'xcloud_project_{}'.format(idx)) + rift.auto.mano.create_user(rw_user_proxy, 'project_admin_{}'.format(idx), rbac_user_passwd, user_domain) + rift.auto.mano.assign_project_role_to_user(rw_project_proxy, 'rw-project:project-admin', 'project_admin_{}' + .format(idx), 'xcloud_project_{}'.format(idx), user_domain, rw_rbac_int_proxy) + rift.auto.mano.create_cloud_account(mgmt_session, cloud_account, 'xcloud_project_{}'.format(idx)) + #Uploading descriptors and verifying its existence from another user(project admin) + for descriptor in descriptors: + rift.auto.descriptor.onboard(mgmt_session, descriptor, project='xcloud_project_{}'.format(idx)) + user_session = rift.auto.mano.get_session(session_class, confd_host, 'project_admin_{}'.format(idx), rbac_user_passwd) + project_admin_nsd_pxy = user_session.proxy(RwProjectNsdYang) + nsd_catalog = project_admin_nsd_pxy.get_config(fmt_nsd_catalog_xpath.format(project=quoted_key('xcloud_project_{}'.format(idx)))) + assert nsd_catalog, "Descriptor Not found on try no: {}".format(idx) + nsd = nsd_catalog.nsd[0] + nsr = rift.auto.descriptor.create_nsr(cloud_account.name, nsd.name, nsd) + rift.auto.mano.close_session(user_session) + #Instantiating the nsr and verifying its existence from another user(project admin), after which it gets terminated + admin_rwnsr_pxy = mgmt_session.proxy(RwNsrYang) + admin_rwvnfr_pxy = mgmt_session.proxy(RwVnfrYang) + admin_rwvlr_pxy = mgmt_session.proxy(RwVlrYang) + + rift.auto.descriptor.instantiate_nsr(nsr, admin_rwnsr_pxy, logger, project='xcloud_project_{}'.format(idx)) + user_session = rift.auto.mano.get_session(session_class, confd_host, 'project_admin_{}'.format(idx), rbac_user_passwd) + pxy = user_session.proxy(RwNsrYang) + nsr_opdata = pxy.get('/rw-project:project[rw-project:name={}]/ns-instance-opdata'.format(quoted_key('xcloud_project_{}'.format(idx)))) + nsrs = nsr_opdata.nsr + for nsr in nsrs: + xpath = "/rw-project:project[rw-project:name={}]/ns-instance-opdata/nsr[ns-instance-config-ref={}]/config-status".format( + quoted_key('xcloud_project_{}'.format(idx)), quoted_key(nsr.ns_instance_config_ref)) + pxy.wait_for(xpath, "configured", fail_on=['failed'], timeout=60) + rift.auto.mano.close_session(user_session) + rift.auto.descriptor.terminate_nsr( + admin_rwvnfr_pxy, admin_rwnsr_pxy, admin_rwvlr_pxy, logger, + project='xcloud_project_{}'.format(idx)) + + def delete_records(self, nsd_proxy, vnfd_proxy, project_name='default'): + """Delete the NSD & VNFD records.""" + nsds = nsd_proxy.get( + "/rw-project:project[rw-project:name={}]/nsd-catalog/nsd".format( + quoted_key(project_name)), + list_obj=True) + for nsd in nsds.nsd: + xpath = ( + "/rw-project:project[rw-project:name={}]".format( + quoted_key(project_name)) + + "/nsd-catalog/nsd[id={}]".format(quoted_key(nsd.id)) + ) + nsd_proxy.delete_config(xpath) + + nsds = nsd_proxy.get( + "/rw-project:project[rw-project:name={}]/nsd-catalog/nsd".format( + quoted_key(project_name)), + list_obj=True) + assert nsds is None or len(nsds.nsd) == 0 + + vnfds = vnfd_proxy.get( + "/rw-project:project[rw-project:name={}]/vnfd-catalog/vnfd".format( + quoted_key(project_name)), + list_obj=True) + for vnfd_record in vnfds.vnfd: + xpath = ( + "/rw-project:project[rw-project:name={}]/".format( + quoted_key(project_name)) + + "vnfd-catalog/vnfd[id={}]".format(quoted_key(vnfd_record.id)) + ) + vnfd_proxy.delete_config(xpath) + + vnfds = vnfd_proxy.get( + "/rw-project:project[rw-project:name={}]/vnfd-catalog/vnfd".format( + quoted_key(project_name)), + list_obj=True) + assert vnfds is None or len(vnfds.vnfd) == 0 + + def test_delete_project_and_vim_accounts( + self, rw_conman_proxy, rw_user_proxy, logger, + rbac_user_passwd, user_domain, rw_project_proxy, rw_rbac_int_proxy, + mgmt_session, cloud_module, cloud_account, descriptors, + fmt_nsd_catalog_xpath, session_class, confd_host): + """Testing vim accounts.""" + # Create a project and three cloud accounts for it. + rift.auto.mano.create_project(rw_conman_proxy, 'vim_project') + rift.auto.mano.assign_project_role_to_user( + rw_project_proxy, 'rw-project:project-admin', 'admin', + 'vim_project', 'system', rw_rbac_int_proxy) + for idx in range(1, 4): + rift.auto.mano.create_cloud_account( + mgmt_session, cloud_account, + 'vim_project', 'cloud_account_{}'.format(idx)) + # Uploading descriptors + for descriptor in descriptors: + rift.auto.descriptor.onboard( + mgmt_session, descriptor, project='vim_project') + nsd_pxy = mgmt_session.proxy(RwProjectNsdYang) + nsd_catalog = nsd_pxy.get_config(fmt_nsd_catalog_xpath.format( + project=quoted_key('vim_project'))) + assert nsd_catalog + nsd = nsd_catalog.nsd[0] + nsr = rift.auto.descriptor.create_nsr( + 'cloud_account_1', nsd.name, nsd) + # Instantiating the nsr + rwnsr_pxy = mgmt_session.proxy(RwNsrYang) + rift.auto.descriptor.instantiate_nsr( + nsr, rwnsr_pxy, logger, project='vim_project') + # Trying to delete the project before taking the instance down + with pytest.raises( + Exception, + message="Project deletion should've failed"): + rift.auto.mano.delete_project(rw_conman_proxy, 'vim_project') + # Trying to delete the vim account before taking the instance down + with pytest.raises( + Exception, + message="Vim account deletion should've failed"): + rift.auto.mano.delete_cloud_account( + mgmt_session, 'cloud_account_1', 'vim_project') + # Terminating the nsr + rwvnfr_pxy = mgmt_session.proxy(RwVnfrYang) + rwvlr_pxy = mgmt_session.proxy(RwVlrYang) + rift.auto.descriptor.terminate_nsr( + rwvnfr_pxy, rwnsr_pxy, rwvlr_pxy, logger, project='vim_project') + # Delete all cloud accounts for the project + for idx in range(1, 4): + rift.auto.mano.delete_cloud_account( + mgmt_session, 'cloud_account_{}'.format(idx), 'vim_project') + # Delete the uploaded descriptors + vnfd_proxy = mgmt_session.proxy(RwProjectVnfdYang) + self.delete_records(nsd_pxy, vnfd_proxy, 'vim_project') + # Delete the project + rift.auto.mano.delete_project(rw_conman_proxy, 'vim_project') + # Check in rw-rbac-internal if project is removed + rwinternal_xpath = '/rw-rbac-internal/role' + response = ( + rw_rbac_int_proxy.get( + rwinternal_xpath, list_obj=True) + ).as_dict()['role'] + keys = [role['keys'] for role in response if 'keys' in role] + for key in keys: + assert 'vim_project' not in key, "Improper project deletion" + + @pytest.mark.skipif( + not pytest.config.getoption("--complex-scaling"), + reason="need --complex-scaling option to run") + def test_complex_scaling( + self, rw_conman_proxy, rw_user_proxy, rbac_user_passwd, + user_domain, rw_project_proxy, rw_rbac_int_proxy, logger, + rbac_platform_proxy, user_roles, platform_roles, mgmt_session, + cloud_module, cloud_account, rw_ro_account_proxy, + tbac, fmt_nsd_catalog_xpath, descriptors, complex_scaling_factor): + """Complex scaling - Default values. + + No. of projects - 25 (Two users & two cloud accounts per project) + No. of users - 50 (Two roles per user) + No. of cloud accounts - 50 + No. of RO accounts - 25 (50 if you are considering the default 'rift'). + """ + # This test can be controlled using complex_scaling_factor fixture + logger.debug('Creating projects') + for idx in range(1, complex_scaling_factor + 1): + rift.auto.mano.create_project( + rw_conman_proxy, 'scaling_project_{}'.format(idx) + ) + logger.debug('Create users, cloud accounts double the no. of projects') + for idx in range(1, (2 * complex_scaling_factor) + 1): + project_index = int((idx + 1) / 2) + rift.auto.mano.create_user( + rw_user_proxy, 'scaling_user_{}'.format(idx), + rbac_user_passwd, user_domain) + # Each user has a project role & platform role + pr_role = random.choice(user_roles) + pl_role = random.choice(platform_roles) + rift.auto.mano.assign_project_role_to_user( + rw_project_proxy, pr_role, 'scaling_user_{}'.format(idx), + 'scaling_project_{}'.format(project_index), user_domain, + rw_rbac_int_proxy) + rift.auto.mano.assign_platform_role_to_user( + rbac_platform_proxy, pl_role, + 'scaling_user_{}'.format(idx), user_domain, rw_rbac_int_proxy) + # Creating two cloud accounts for each project + rift.auto.mano.create_cloud_account( + mgmt_session, cloud_account, + 'scaling_project_{}'.format(project_index), + 'cloud_account_{}'.format(idx) + ) + logger.debug('Creating RO accounts') + for idx in range(1, complex_scaling_factor + 1): + rift.auto.mano.create_ro_account( + rw_ro_account_proxy, 'ro_account_{}'.format(idx), + 'scaling_project_{}'.format(idx) + ) + # Uploading descriptors + for descriptor in descriptors: + rift.auto.descriptor.onboard( + mgmt_session, descriptor, + project='scaling_project_{}'.format(idx) + ) + nsd_pxy = mgmt_session.proxy(RwProjectNsdYang) + nsd_catalog = nsd_pxy.get_config( + fmt_nsd_catalog_xpath.format( + project=quoted_key('scaling_project_{}'.format(idx)) + ) + ) + assert nsd_catalog + + @pytest.mark.skipif( + not pytest.config.getoption("--complex-scaling"), + reason="need --complex-scaling option to run") + def test_complex_scaling_verification( + self, complex_scaling_factor, rw_project_proxy, rw_ro_account_proxy, + mgmt_session, fmt_nsd_catalog_xpath, cloud_module, logger): + """Reboot verification script for test_complex_scaling.""" + for idx in range(1, complex_scaling_factor + 1): + # Verifying projects + logger.debug('Verification: projects, ro accounts started') + project_name = 'scaling_project_{}'.format(idx) + project_cm_config_xpath = '/project[name={project_name}]/project-state' + project_ = rw_project_proxy.get_config( + project_cm_config_xpath.format( + project_name=quoted_key(project_name) + ), + list_obj=True + ) + assert project_ + # Verifying RO Accounts + ro_account_name = 'ro_account_{}'.format(idx) + ro_obj = rw_ro_account_proxy.get_config( + '/project[name={}]/ro-account/account[name={}]'.format( + quoted_key(project_name), quoted_key(ro_account_name)) + ) + assert ro_obj.name == ro_account_name + assert ro_obj.ro_account_type == 'openmano' + logger.debug('Verification: descriptors, cloud accounts started') + # Verifying Descriptors + nsd_pxy = mgmt_session.proxy(RwProjectNsdYang) + nsd_catalog = nsd_pxy.get_config( + fmt_nsd_catalog_xpath.format( + project=quoted_key(project_name) + ) + ) + assert nsd_catalog + for idx in range(1, (2 * complex_scaling_factor) + 1): + # Verifying cloud accounts + project_index = int((idx + 1) / 2) + project_name = 'scaling_project_{}'.format(project_index) + cloud_acc_name = 'cloud_account_{}'.format(idx) + fmt_cloud_xpath = ( + '/project[name={project}]/cloud/account[name={account_name}]' + ) + cloud_pxy = mgmt_session.proxy(cloud_module) + response = cloud_pxy.get(fmt_cloud_xpath.format( + project=quoted_key(project_name), + account_name=quoted_key(cloud_acc_name)) + ) + assert response.name == cloud_acc_name + + + def test_change_visibility_same_session(self, session_class, rw_conman_proxy, confd_host, logger, + user_domain, project_keyed_xpath, rw_project_proxy, rw_rbac_int_proxy, rw_user_proxy): + """admin make changes which is seen by the operator already logged in for the same project. + + oper is logged in. admin assigns oper to a new project X. oper should be able to see the new project X being \ + in the same session without re-logging-in. + """ + user = 'oper2' if user_domain != 'default' else 'oper' + oper_user, oper_passwd = [user]*2 + + if user_domain != 'default': + rift.auto.mano.create_user(rw_user_proxy, oper_user, oper_passwd, user_domain) + rift.auto.mano.assign_project_role_to_user(rw_project_proxy, 'rw-project:project-oper', oper_user, + 'default', user_domain, rw_rbac_int_proxy) + oper_session = rift.auto.mano.get_session(session_class, confd_host, oper_user, oper_passwd) + oper_conman_pxy = oper_session.proxy(RwProjectYang) + + default_project_cm_config_xpath = project_keyed_xpath.format(project_name=quoted_key('default'))+'/project-state' + assert oper_conman_pxy.get_config(default_project_cm_config_xpath, list_obj=True) + + # admin assigns oper 'project-admin' role under a new project + new_project = 'project_test_change_visibility_same_session_1' + rift.auto.mano.create_project(rw_project_proxy, new_project) + rift.auto.mano.assign_project_role_to_user(rw_project_proxy, 'rw-project:project-admin', oper_user, new_project, + user_domain, rw_rbac_int_proxy) + + # Check oper user should be able to access the new project + new_project_cm_config_xpath = project_keyed_xpath.format(project_name=quoted_key(new_project))+'/project-state' + assert oper_conman_pxy.get_config(new_project_cm_config_xpath, list_obj=True) + + def test_super_admin( + self, rw_user_proxy, rbac_platform_proxy, rw_project_proxy, + session_class, confd_host, rbac_user_passwd, user_domain, + rw_rbac_int_proxy): + """Variou tests on the super-admin role.""" + # Creating two super admins and then deleting the first one. + rift.auto.mano.create_user( + rw_user_proxy, 'super_admin', rbac_user_passwd, user_domain) + rift.auto.mano.assign_platform_role_to_user( + rbac_platform_proxy, 'rw-rbac-platform:super-admin', + 'super_admin', user_domain, rw_rbac_int_proxy) + rift.auto.mano.create_user( + rw_user_proxy, 'super_admin_2', rbac_user_passwd, user_domain) + rift.auto.mano.assign_platform_role_to_user( + rbac_platform_proxy, 'rw-rbac-platform:super-admin', + 'super_admin_2', user_domain, rw_rbac_int_proxy) + + user_session = rift.auto.mano.get_session( + session_class, confd_host, 'super_admin_2', rbac_user_passwd) + pxy = user_session.proxy(RwRbacPlatformYang) + role_keyed_path = ( + "/rbac-platform-config/" + + "user[user-name={user}][user-domain={domain}]" + ) + pxy.delete_config(role_keyed_path.format( + user=quoted_key('super_admin'), domain=quoted_key(user_domain)) + ) + pxy = user_session.proxy(RwUserYang) + rift.auto.mano.delete_user(pxy, 'super_admin', user_domain) + rift.auto.mano.close_session(user_session) + + @pytest.mark.skipif(not pytest.config.getoption("--tbac"), reason="need --tbac option to run") + def test_token_expiry_timeout(self, mgmt_session, rw_user_proxy, rw_conman_proxy, rbac_user_passwd, user_domain, + confd_host, logger, rw_project_proxy, rw_rbac_int_proxy, session_class): + """Set 30 seconds as token-expiry-timeout; then verifies an user session is automatically expired after 30 secs""" + test_user, role = 'user-1', 'rw-project:project-oper' + test_proj = 'project_test_token_expiry_timeout' + token_expiry_timeout = 30 + + logger.debug('Creating user {} under project {} and assigning it {}'.format(test_user, test_proj, role)) + rift.auto.mano.create_project(rw_conman_proxy, test_proj) + rift.auto.mano.create_user(rw_user_proxy, test_user, rbac_user_passwd, user_domain) + rift.auto.mano.assign_project_role_to_user(rw_project_proxy, role, test_user, test_proj, user_domain, rw_rbac_int_proxy) + + # admin user setting token_expiry_timeout + openidc_provider_xpath = '/rw-openidc-provider:openidc-provider-config' + openidc_provider = RwOpenidcProviderYang.YangData_RwOpenidcProvider_OpenidcProviderConfig.from_dict( + {'token_expiry_timeout': 30}) + pxy = mgmt_session.proxy(RwOpenidcProviderYang) + logger.debug('Settig token_expiry_timeout to {} secs'.format(token_expiry_timeout)) + pxy.replace_config(openidc_provider_xpath, openidc_provider) + + # Verifying if token_expiry_timeout is set in openidc-provider-config + openidc_provider = pxy.get_config(openidc_provider_xpath) + assert openidc_provider + assert openidc_provider.token_expiry_timeout == token_expiry_timeout + + def project_access(user_session): + user_conman_pxy = user_session.proxy(RwProjectYang) + assert user_conman_pxy.get_config('/project[name={}]/project-state'.format(quoted_key(test_proj)), list_obj=True) + + # Log-in as test_user and validate operations under that user getting 'Unauthorized' after time-out + user_session = rift.auto.mano.get_session(session_class, confd_host, test_user, rbac_user_passwd) + project_access(user_session) + + logger.debug('Sleeping for {} secs'.format(token_expiry_timeout)) + time.sleep(token_expiry_timeout+5) + + with pytest.raises(Exception, message='logged-in user able to access default project even after token expired'): + logger.debug('User {} trying to access default project. It should fail') + project_access(user_session) + + # log-in as same user and perform the same operation. It should pass now. + user_session = rift.auto.mano.get_session(session_class, confd_host, test_user, rbac_user_passwd) + project_access(user_session)