update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / ra / pytest / ns / rbac / test_rbac.py
diff --git a/rwlaunchpad/ra/pytest/ns/rbac/test_rbac.py b/rwlaunchpad/ra/pytest/ns/rbac/test_rbac.py
new file mode 100644 (file)
index 0000000..30c3261
--- /dev/null
@@ -0,0 +1,258 @@
+#!/usr/bin/env python3
+"""
+# 
+#   Copyright 2017 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+"""
+
+import gi
+import pytest
+
+from rift.auto.session import NetconfSession, RestconfSession
+import rift.auto.mano
+
+gi.require_version('RwUserYang', '1.0')
+gi.require_version('RwProjectYang', '1.0')
+gi.require_version('RwRbacPlatformYang', '1.0')
+gi.require_version('RwRbacInternalYang', '1.0')
+from gi.repository import (
+    RwUserYang,
+    RwProjectYang,
+    RwRbacPlatformYang,
+    RwRbacInternalYang,
+)
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
+
+@pytest.fixture(scope='session')
+def rbac_test_data():
+    """Fixture which returns rbac test data: users, roles, projects being used in the test.
+    users: tuple of user names
+    projects: tuple of project names
+    map_platform_roles: mapping of a user to multiple platform roles
+    map_project_roles: mapping of a user to multiple projects (project, list of roles in that project)"""
+    users = ('admin3', 'user1', 'user2', )
+
+    projects = ('project1', 'project2', )
+
+    map_platform_roles = {
+                            'admin3': ['rw-rbac-platform:platform-admin'],
+                            }
+
+    map_project_roles = {
+                            'user1': [
+                                        ('project1', ['rw-project:project-admin']),
+                                        ('project2', ['rw-project:project-oper']),
+                                     ], 
+
+                            'user2': [
+                                        ('project1', ['rw-project:project-admin']),
+                                     ], 
+
+                            'admin3': [],
+                            }
+
+    return {'users': users, 'projects': projects, 'roles': (map_platform_roles, map_project_roles)}
+
+
+@pytest.mark.setup('rbac_setup')
+@pytest.mark.incremental
+class TestRbacSetup(object):
+    def test_create_users(self, rbac_test_data, rw_user_proxy, user_domain, rbac_user_passwd, logger):
+        """Creates all users as per rbac test-data  and verify if they are successfully created."""
+        users_test_data =  rbac_test_data['users']
+
+        # Create all users mentioned in users_test_data
+        for user in users_test_data:
+            rift.auto.mano.create_user(rw_user_proxy, user, rbac_user_passwd, user_domain)
+
+        # Verify users are created
+        user_config = rw_user_proxy.get_config('/user-config')
+        assert user_config
+
+        user_config_test_data = [user.user_name for user in user_config.user if user.user_name in users_test_data]
+        logger.debug('Users: {} have been successfully created'.format(user_config_test_data))
+
+        assert len(user_config_test_data) == len(users_test_data)
+
+    def test_create_projects(self, logger, rw_conman_proxy, rbac_test_data):
+        """Creates all projects as per rbac test-data and verify them."""
+        projects_test_data = rbac_test_data['projects']
+
+        # Create all projects mentioned in projects_test_data and verify if they are created
+        for project in projects_test_data:
+            logger.debug('Creating project {}'.format(project))
+            rift.auto.mano.create_project(rw_conman_proxy, project)
+
+    def test_assign_platform_roles_to_users(self, rbac_platform_proxy, logger, rbac_test_data, user_domain, rw_rbac_int_proxy):
+        """Assign platform roles to an user as per test data mapping and verify them."""
+        platform_roles_test_data, _ = rbac_test_data['roles']
+
+        # Loop through the user & platform-roles mapping and assign roles to the user
+        for user, roles in platform_roles_test_data.items():
+            for role in roles:
+                rift.auto.mano.assign_platform_role_to_user(rbac_platform_proxy, role, user, user_domain, rw_rbac_int_proxy)
+
+        # Verify if the roles are assigned as per test data mapping
+        platform_config = rbac_platform_proxy.get_config('/rbac-platform-config')
+
+        platform_config_test_data_match = 0
+        logger.debug('Matching platform_roles_test_data with rbac-platform-config')
+        for user in platform_config.user:
+            if user.user_name in platform_roles_test_data:
+                logger.debug('Matched user: {}'.format(user.as_dict()))
+                platform_config_test_data_match += 1
+
+                test_data_user_platform_roles = platform_roles_test_data[user.user_name]
+                assert len(test_data_user_platform_roles) == len(user.role)
+                assert len(test_data_user_platform_roles) == len([role for role in user.role if role.role in test_data_user_platform_roles])
+
+        assert platform_config_test_data_match == len(platform_roles_test_data)
+
+    def test_assign_users_to_projects_roles(self, rbac_test_data, rw_project_proxy, user_domain, rw_rbac_int_proxy):
+        """Assign projects and roles to an user as per test data mapping."""
+        _, project_roles_test_data = rbac_test_data['roles']
+
+        # Loop through the user & (project, role) mapping and asign the project, role to the user
+        for user, project_role_tuple in project_roles_test_data.items():
+            for project, role_list in project_role_tuple:
+                for role in role_list:
+                    rift.auto.mano.assign_project_role_to_user(rw_project_proxy, role, user, project, user_domain, rw_rbac_int_proxy)
+
+
+@pytest.mark.depends('rbac_setup')
+@pytest.mark.incremental
+class TestRbacVerification(object):
+    def test_match_rbac_internal(self, mgmt_session, logger, rbac_test_data):
+        """Verifies the test data with rw-rbac-internal"""
+        rbac_intl_proxy = mgmt_session.proxy(RwRbacInternalYang)
+        rbac_intl = rbac_intl_proxy.get('/rw-rbac-internal')
+
+        # Verify users in show rw-rbac-internal
+        users_test_data =  rbac_test_data['users']
+        assert len(rbac_intl.user) == len(users_test_data) + 2   # 'admin', 'oper' are two default users
+        users_match = 0
+        for user in rbac_intl.user:
+            if user.user_name in users_test_data:
+                logger.info('User matched: {}'.format(user.as_dict()))
+                users_match += 1
+        assert users_match == len(users_test_data)
+
+        # Verify roles (only project roles mapping, not the platform roles mapping)
+        # Each role in rw-rbac-internal is associated with a project through the field 'keys'. All mapping from users to project 
+        # is part of project roles mapping.
+        _, project_roles_test_data = rbac_test_data['roles']
+        for user, project_role_tuple in project_roles_test_data.items():
+            for project, role_list in project_role_tuple:
+                for role in role_list:
+                    logger.debug("Matching user: '{}' and its role '{}' in project '{}'".format(user, role, project))
+                    
+                    # Verify there exists a role entry in rw-rbac-internal which matches 'role', 'project'
+                    rbac_intl_role = [role_ for role_ in rbac_intl.role if (role_.role==role and role_.keys==project)]
+
+                    # Each role is identified through its key 'project'. So there can be only one such role which matches 
+                    # the above 'role.role==role and role.keys=project'
+                    assert len(rbac_intl_role) == 1
+                    logger.info('Matched role in rw-rbac-internal: {}'.format(rbac_intl_role[0].as_dict()))
+
+                    # Verify the user list in this rw-rbac-internal role carries 'user'
+                    assert len([user_ for user_ in rbac_intl_role[0].user if user_.user_name==user]) == 1
+
+    def test_role_access(self, logger, session_class, confd_host, rbac_test_data, rbac_user_passwd, project_keyed_xpath):
+        """Verifies the roles assigned to users for a project. Login as each user and verify the user can only access 
+        the projects linked to it."""
+        _, project_roles_test_data = rbac_test_data['roles']
+        projects_test_data = rbac_test_data['projects']
+
+        for user, project_role_tuple in project_roles_test_data.items():
+            logger.debug('Verifying user: {}'.format(user))
+            projects_not_accessible = list(projects_test_data)
+
+            # Establish a session with this current user
+            user_session = rift.auto.mano.get_session(session_class, confd_host, user, rbac_user_passwd)
+            print ("Connected using username {} password {}".format(user, rbac_user_passwd))
+
+            rw_project_proxy_ = user_session.proxy(RwProjectYang)
+            
+            if project_role_tuple:  # Skip the for loop for users who are not associated with any project e.g admin3
+                for project, role_list in project_role_tuple:
+                    projects_not_accessible.remove(project)
+                    project_config = rw_project_proxy_.get_config(project_keyed_xpath.format(project_name=quoted_key(project))+'/project-config')
+                    user_ = [user_ for user_ in project_config.user if user_.user_name==user]
+                    logger.debug('User: {}'.format(user_[0].as_dict()))
+                    assert len(user_) == 1
+
+                    # Match the roles for this user
+                    assert set(role_list) == set([role_.role for role_ in user_[0].role])
+
+            # It can't access any other project.
+            for project in projects_not_accessible:
+                assert rw_project_proxy_.get_config(project_keyed_xpath.format(project_name=quoted_key(project))+'/project-config') is None # It should 
+                # return None as the project is not mapped to this user.
+
+    def test_admin_user(self, logger, rw_project_proxy, project_keyed_xpath, rbac_test_data):
+        """Verify admin can see all projects as part of test-data as well as the default project"""
+        projects_test_data = rbac_test_data['projects']
+        projects_test_data = projects_test_data + ('default', )
+
+        # Verify admin user can see all projects including default
+        # If it is post-reboot verification, then check default project should not be listed
+        for project in projects_test_data:
+            project_ = rw_project_proxy.get_config(project_keyed_xpath.format(project_name=quoted_key(project))+'/project-state', list_obj=True)
+            if project=='default' and pytest.config.getoption('--default-project-deleted'):
+                assert project_ is None
+                continue
+            assert project_     # If the project doesn't exist, it returns None
+
+
+@pytest.mark.depends('rbac_setup')
+@pytest.mark.teardown('rbac_setup')
+@pytest.mark.incremental
+class TestRbacTeardown(object):
+    def test_delete_default_project(self, logger, rw_conman_proxy):
+        """Only deletes the default project"""
+        logger.debug('Deleting the default project')
+        rift.auto.mano.delete_project(rw_conman_proxy, 'default')
+    
+    def test_delete_projects(self, logger, rbac_test_data, rw_conman_proxy):
+        """Deletes the projects which are part of rbac test-data and verify their deletion"""
+        projects_test_data = rbac_test_data['projects']
+
+        # Delete the projects
+        for project in projects_test_data:
+            logger.debug('Deleting project {}'.format(project))
+            rift.auto.mano.delete_project(rw_conman_proxy, project)
+
+    def test_delete_users(self, logger, rw_user_proxy, rbac_platform_proxy, platform_config_keyed_xpath, 
+                                    user_keyed_xpath, rbac_test_data, user_domain):
+        """Deletes the users which are part of rbac test-data and verify their deletion"""
+        users_test_data = rbac_test_data['users']
+        map_platform_roles, _ = rbac_test_data['roles']
+
+        # Deletes the users
+        # If an user is associated with a platform role, at first it needs be removed from rbac-platform-config
+        # before deleting it from user-config
+        for user in users_test_data:
+            if user in map_platform_roles:
+                rbac_platform_proxy.delete_config(platform_config_keyed_xpath.format(user=quoted_key(user), domain=quoted_key(user_domain)))
+            rw_user_proxy.delete_config(user_keyed_xpath.format(user=quoted_key(user), domain=quoted_key(user_domain)))
+
+        # Verify if the users are deleted
+        user_config = rw_user_proxy.get_config('/user-config')
+        default_users = [user.user_name for user in user_config.user]
+
+        logger.debug('Default users list: {}'.format(default_users))
+        expected_empty_user_list = [user for user in users_test_data if user in default_users]
+        assert not expected_empty_user_list