4 # Copyright 2017 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
26 import rift
.auto
.descriptor
27 gi
.require_version('RwUserYang', '1.0')
28 gi
.require_version('RwProjectYang', '1.0')
29 gi
.require_version('RwConmanYang', '1.0')
30 gi
.require_version('RwProjectNsdYang', '1.0')
31 gi
.require_version('RwNsrYang', '1.0')
32 gi
.require_version('RwVnfrYang', '1.0')
33 gi
.require_version('RwlogMgmtYang', '1.0')
34 from gi
.repository
import (
47 gi
.require_version('RwKeyspec', '1.0')
48 from gi
.repository
.RwKeyspec
import quoted_key
# Timeout handed to rift.auto.mano.get_session() for log-in attempts that are
# expected to fail (e.g. connecting with a password that has just been
# replaced), so those negative checks give up quickly.
# NOTE(review): unit is presumably seconds -- confirm against get_session().
SESSION_CONNECT_TIMEOUT = 5
@pytest.fixture(scope='session')
def user_test_roles():
    """Return the role sets exercised by the user create/delete tests.

    Returns:
        tuple: ``(write_roles, read_roles)``. ``write_roles`` lists the roles
        which enable a user to create/delete another user. ``read_roles`` is
        an empty tuple: no read-only roles are listed for this scenario.
    """
    write_roles = ('rw-rbac-platform:super-admin', 'rw-rbac-platform:platform-admin')
    # ``read_roles`` must be bound before it is returned; an empty tuple keeps
    # the (write_roles, read_roles) shape shared by the sibling role fixtures.
    read_roles = ()
    return write_roles, read_roles
@pytest.fixture(scope='session')
def project_test_roles():
    """Return the role sets exercised by the project create/read/delete tests.

    Returns:
        tuple: ``(write_roles, read_roles)`` -- two tuples of role names; the
        first holds the roles allowed to create/delete a project, the second
        the roles allowed to read one.
    """
    creators = ('rw-rbac-platform:super-admin', )
    readers = (
        'rw-project:project-oper',
        'rw-project:project-admin',
    )
    return creators, readers
@pytest.fixture(scope='session')
def onboarding_test_roles():
    """Return the role sets exercised by the VNF/NS package onboarding tests.

    Returns:
        tuple: ``(write_roles, read_roles)`` -- the first tuple holds the roles
        which enable a user to onboard/modify/delete a VNF/NS package, the
        second the roles used for the read checks.
    """
    package_writers = (
        'rw-rbac-platform:super-admin',
        'rw-project-mano:catalog-admin',
        'rw-project:project-admin',
    )
    package_readers = (
        'rw-project-mano:catalog-oper',
        'rw-project-mano:lcm-admin',
    )
    return package_writers, package_readers
@pytest.fixture(scope='session')
def account_test_roles():
    """Return the role sets exercised by the VIM/SDN account tests.

    Returns:
        tuple: ``(write_roles, read_roles)`` -- the first tuple holds the roles
        which enable a user to create/update/delete a VIM or SDN account, the
        second the roles used for the read checks.
    """
    account_writers = (
        'rw-rbac-platform:super-admin',
        'rw-project-mano:account-admin',
        'rw-project:project-admin',
    )
    account_readers = (
        'rw-project-mano:account-oper',
        'rw-project-mano:lcm-admin',
    )
    return account_writers, account_readers
@pytest.fixture(scope='session')
def ns_instantiate_test_roles():
    """Return the role sets exercised by the NS instantiate/terminate tests.

    Returns:
        tuple: ``(write_roles, read_roles)``. The write roles enable a user to
        instantiate/terminate a NS; the read roles are those which can access
        vnfr-catalog, vnfr-console, ns-instance-opdata etc.
    """
    lifecycle_writers = (
        'rw-rbac-platform:super-admin',
        'rw-project-mano:lcm-admin',
        'rw-project:project-admin',
    )
    lifecycle_readers = ('rw-project-mano:lcm-oper', )
    return lifecycle_writers, lifecycle_readers
@pytest.fixture(scope='session')
def syslog_server_test_roles():
    """Return the role sets exercised by the syslog-server tests.

    Returns:
        tuple: ``(write_roles, read_roles)``. ``write_roles`` lists the roles
        which enable a user to set the syslog server_address. ``read_roles``
        is an empty tuple: no read-only roles are listed for this scenario.
    """
    write_roles = (
        'rw-rbac-platform:super-admin',
        'rw-rbac-platform:platform-admin',
        'rw-rbac-platform:platform-oper',
    )
    # ``read_roles`` must be bound before it is returned; an empty tuple keeps
    # the (write_roles, read_roles) shape shared by the sibling role fixtures.
    read_roles = ()
    return write_roles, read_roles
@pytest.fixture(scope='session')
def redundancy_config_test_roles():
    """Return the role sets used when the ``--redundancy-role-test`` option is set.

    Returns:
        tuple: ``(write_roles, read_roles)`` -- two tuples of platform role
        names. NOTE(review): presumably the write roles may change the
        redundancy configuration and the read role may only view it; confirm
        against the redundancy tests.
    """
    write_roles = ('rw-rbac-platform:super-admin', 'rw-rbac-platform:platform-admin')
    read_roles = ('rw-rbac-platform:platform-oper', )
    return write_roles, read_roles
@pytest.fixture(scope='session')
def project_acessible():
    """Pick the project to which all new test users will be associated.

    Returns:
        str: either ``'project1'`` or ``'default'``, chosen at random.
    """
    candidate_projects = ['project1', 'default']
    return random.choice(candidate_projects)
# @pytest.fixture(scope='session')
# def project_not_accessible():
#     """Returns the name of the project whose users are not supposed to access the resources under project 'project_acessible'"""
121 @pytest.fixture(scope
='session')
122 def users_test_data(rw_user_proxy
, rbac_platform_proxy
, rw_project_proxy
, all_roles
, user_test_roles
, project_test_roles
,
123 onboarding_test_roles
, account_test_roles
, ns_instantiate_test_roles
, user_domain
, project_acessible
, rw_conman_proxy
,
124 syslog_server_test_roles
, all_roles_combinations
, rw_rbac_int_proxy
, tbac
, redundancy_config_test_roles
):
125 """Creates new users required for a test and assign appropriate roles to them"""
126 if pytest
.config
.getoption("--user-creation-test"):
127 test_roles
= user_test_roles
128 elif pytest
.config
.getoption("--project-creation-test"):
129 test_roles
= project_test_roles
130 elif pytest
.config
.getoption("--onboarding-test"):
131 test_roles
= onboarding_test_roles
132 elif pytest
.config
.getoption("--account-test"):
133 test_roles
= account_test_roles
134 elif pytest
.config
.getoption("--nsr-test"):
135 test_roles
= ns_instantiate_test_roles
136 elif pytest
.config
.getoption("--syslog-server-test"):
137 test_roles
= syslog_server_test_roles
138 elif pytest
.config
.getoption("--redundancy-role-test"):
139 test_roles
= redundancy_config_test_roles
141 # Create a project to which these users will be part of
142 if project_acessible
!= 'default':
143 rift
.auto
.mano
.create_project(rw_conman_proxy
, project_acessible
)
145 def create_user_assign_role(user_name
, password
, role_set
):
146 rift
.auto
.mano
.create_user(rw_user_proxy
, user_name
, password
, user_domain
)
147 project_roles_list
, platform_roles_list
= [], []
148 for role
in role_set
:
149 if 'platform' in role
:
150 platform_roles_list
.append(role
)
152 project_roles_list
.append(role
)
153 if platform_roles_list
:
154 rift
.auto
.mano
.assign_platform_role_to_user(rbac_platform_proxy
, platform_roles_list
, user_name
, user_domain
, rw_rbac_int_proxy
)
155 if project_roles_list
:
156 rift
.auto
.mano
.assign_project_role_to_user(rw_project_proxy
, project_roles_list
, user_name
,
157 project_acessible
, user_domain
, rw_rbac_int_proxy
)
159 write_roles
, read_roles
= test_roles
160 fail_roles
= [role
for role
in all_roles
if role
not in write_roles
]
162 if False: #If its desired to run checks for all combinations, tbd on what option this will be enabled
163 write_roles_tmp
, read_roles_tmp
, fail_roles_tmp
= [], [], []
164 for role_combination
in all_roles_combinations
:
165 if bool(set(role_combination
).intersection(write_roles
)):
166 write_roles_tmp
.append(role_combination
)
168 if bool(set(role_combination
).intersection(read_roles
)):
169 read_roles_tmp
.append(role_combination
)
171 if bool(set(role_combination
).isdisjoint(write_roles
)):
172 fail_roles_tmp
.append(role_combination
)
173 write_roles
, read_roles
, fail_roles
= write_roles_tmp
, read_roles_tmp
, fail_roles_tmp
175 # Create the users with roles mapped
176 write_users
, read_users
, fail_users
= dict(), dict(), dict()
177 for idx
, role_set
in enumerate(write_roles
, 1):
178 if type(role_set
) is str:
179 role_set
= [role_set
]
180 user_name
= 'write-{}'.format(idx
)
184 password
= rift
.auto
.mano
.generate_password()
185 create_user_assign_role(user_name
, password
, role_set
)
186 write_users
[user_name
] = (role_set
, password
)
188 for idx
, role_set
in enumerate(read_roles
, 1):
189 if type(role_set
) is str:
190 role_set
= [role_set
]
191 user_name
= 'read-{}'.format(idx
)
195 password
= rift
.auto
.mano
.generate_password()
196 create_user_assign_role(user_name
, password
, role_set
)
197 read_users
[user_name
] = (role_set
, password
)
199 for idx
, role_set
in enumerate(fail_roles
, 1):
200 if type(role_set
) is str:
201 role_set
= [role_set
]
202 user_name
= 'fail-{}'.format(idx
)
206 password
= rift
.auto
.mano
.generate_password()
207 create_user_assign_role(user_name
, password
, role_set
)
208 fail_users
[user_name
] = (role_set
, password
)
209 return write_users
, read_users
, fail_users
212 @pytest.mark
.setup('test_rbac_roles_setup')
213 @pytest.mark
.incremental
214 class TestRbacVerification(object):
215 @pytest.mark
.skipif(not pytest
.config
.getoption("--project-creation-test"), reason
="need --project-creation-test option to run")
216 def test_project_create_delete_authorization(self
, logger
, users_test_data
, session_class
, confd_host
, rw_conman_proxy
,
217 project_keyed_xpath
, project_acessible
):
218 """Verifies only users with certain roles can create/delete a project"""
220 write_users
, read_users
, fail_users
= users_test_data
222 # Check users in write_users dict able to create/delete a project
223 logger
.debug('Verifying users which are authorised to create/delete a project')
224 for user
in write_users
:
225 logger
.debug('Verifying user:(role,password) {}:{}'.format(user
, write_users
[user
]))
226 user_session
= rift
.auto
.mano
.get_session(session_class
, confd_host
, user
, write_users
[user
][1])
227 pxy
= user_session
.proxy(RwProjectYang
)
229 project_name
= 'project-{}'.format(user
)
230 logger
.debug('Trying to create project {}'.format(project_name
))
231 rift
.auto
.mano
.create_project(pxy
, project_name
)
233 logger
.debug('Trying to delete project {}'.format(project_name
))
234 rift
.auto
.mano
.delete_project(pxy
, project_name
)
236 rift
.auto
.mano
.close_session(user_session
)
238 # Check users in read_users dict able to read a project
239 logger
.debug('Verifying users which are authorised to read a project')
240 for user
in read_users
:
241 logger
.debug('Verifying user:(role,password) {}:{}'.format(user
, read_users
[user
]))
242 user_session
= rift
.auto
.mano
.get_session(session_class
, confd_host
, user
, read_users
[user
][1])
243 pxy
= user_session
.proxy(RwProjectYang
)
245 logger
.debug('User {} trying to read project {}'.format(user
, project_acessible
))
246 project_
= pxy
.get_config(project_keyed_xpath
.format(project_name
=quoted_key(project_acessible
))+'/project-state', list_obj
=True)
249 rift
.auto
.mano
.close_session(user_session
)
251 # Check users in fail_users dict shouldn't be allowed to create a project or delete a project
253 # 'project-admin' user not able to create a project, but can delete a project, hence do the create/delete
254 # operation for this user at the end
255 fail_users_reordered
= collections
.OrderedDict()
256 for user
, role_passwd_tuple
in fail_users
.items():
257 if any('project-admin' in role
for role
in role_passwd_tuple
[0]):
258 project_admin_key
, project_admin_val
= user
, role_passwd_tuple
260 fail_users_reordered
[user
] = role_passwd_tuple
261 fail_users_reordered
[project_admin_key
] = project_admin_val
263 logger
.debug('Verifying users which are not supposed to create/delete a project')
264 for user
in fail_users_reordered
:
265 logger
.debug('Verifying user:(role,password) {}:{}'.format(user
, fail_users_reordered
[user
]))
266 user_session
= rift
.auto
.mano
.get_session(session_class
, confd_host
, user
, fail_users_reordered
[user
][1])
267 pxy
= user_session
.proxy(RwProjectYang
)
269 project_name
= 'project-{}'.format(user
)
271 with pytest
.raises(Exception, message
='User {} not authorised to create project {}'.format(
272 user
, project_name
)) as excinfo
:
273 logger
.debug('User {} trying to create project {}'.format(user
, project_name
))
274 rift
.auto
.mano
.create_project(pxy
, project_name
)
276 logger
.debug('User {} trying to delete project {}'.format(user
, project_acessible
))
277 if any('project-admin' in role
for role
in fail_users_reordered
[user
][0]):
278 rift
.auto
.mano
.delete_project(pxy
, project_acessible
)
280 with pytest
.raises(Exception, message
='User {} not authorised to delete project {}'.format(
281 user
, project_acessible
)) as excinfo
:
282 rift
.auto
.mano
.delete_project(pxy
, project_acessible
)
284 rift
.auto
.mano
.close_session(user_session
)
286 def delete_user_from_project(
287 self
, project_proxy
, target_user
, target_project
, user_domain
):
289 "/project[name={project}]/project-config/user" +
290 "[user-name={user}][user-domain={domain}]"
292 # Check if the user exists for the project
293 ret_val
= project_proxy
.get_config(
294 project_xpath
.format(
295 project
=quoted_key(target_project
),
296 user
=quoted_key(target_user
),
297 domain
=quoted_key(user_domain
)))
300 # Delete the target_user from the target_project
301 project_proxy
.delete_config(
302 project_xpath
.format(
303 project
=quoted_key(target_project
),
304 user
=quoted_key(target_user
),
305 domain
=quoted_key(user_domain
))
307 # Verify that he is deleted
308 ret_val
= project_proxy
.get_config(
309 project_xpath
.format(
310 project
=quoted_key(target_project
),
311 user
=quoted_key(target_user
),
312 domain
=quoted_key(user_domain
))
314 assert ret_val
is None
317 not pytest
.config
.getoption("--project-creation-test"),
318 reason
="need --project-creation-test option to run")
319 def test_project_admin_users_role_authorization(
320 self
, logger
, user_roles
, rw_user_proxy
, session_class
,
321 user_domain
, confd_host
, rw_conman_proxy
, project_keyed_xpath
,
322 rw_project_proxy
, rw_rbac_int_proxy
, tbac
):
323 """Verify project admin & oper role operations on a single project."""
325 "Create a project & 8 users each with its own project/mano role")
326 rift
.auto
.mano
.create_project(rw_conman_proxy
, 'project-vzw')
327 project_user_data
= {}
328 for idx
, role
in enumerate(user_roles
, 1):
329 user_name
= 'project_vzw_user-{}'.format(idx
)
331 password
= rift
.auto
.mano
.generate_password()
334 rift
.auto
.mano
.create_user(
335 rw_user_proxy
, user_name
, password
, user_domain
)
336 rift
.auto
.mano
.assign_project_role_to_user(
337 rw_project_proxy
, role
, user_name
, 'project-vzw',
338 user_domain
, rw_rbac_int_proxy
)
339 project_user_data
[user_name
] = {"role": role
, "password": password
}
340 if "project-admin" in role
:
341 project_admin_user
= user_name
343 logger
.debug("Project admin deleting roles from users.")
344 project_admin_session
= rift
.auto
.mano
.get_session(
345 session_class
, confd_host
, project_admin_user
,
346 project_user_data
[project_admin_user
]["password"])
347 project_admin_proxy
= project_admin_session
.proxy(RwProjectYang
)
348 for user
in project_user_data
:
349 role
= project_user_data
[user
]["role"]
350 if project_admin_user
== user
:
352 rift
.auto
.mano
.revoke_project_role_from_user(
353 project_admin_proxy
, role
, user
, 'project-vzw', user_domain
)
354 rift
.auto
.mano
.close_session(project_admin_session
)
356 logger
.debug("Verify project admin can assign another role to users")
357 project_admin_session
= rift
.auto
.mano
.get_session(
358 session_class
, confd_host
, project_admin_user
,
359 project_user_data
[project_admin_user
]["password"])
360 project_admin_proxy
= project_admin_session
.proxy(RwProjectYang
)
361 for user
in project_user_data
:
362 role
= 'rw-project:project-oper'
363 if project_admin_user
== user
:
365 rift
.auto
.mano
.assign_project_role_to_user(
366 project_admin_proxy
, role
, user
, 'project-vzw',
367 user_domain
, rw_rbac_int_proxy
)
368 rift
.auto
.mano
.close_session(project_admin_session
)
370 # Verify the user able to read project
371 for user
in project_user_data
:
372 user_session
= rift
.auto
.mano
.get_session(
373 session_class
, confd_host
, user
,
374 project_user_data
[user
]["password"])
375 user_project_pxy
= user_session
.proxy(RwProjectYang
)
376 logger
.debug("verifying user able to read project")
377 xpath
= "/project[name={project}]/project-config"
378 ret_val
= user_project_pxy
.get_config(
379 xpath
.format(project
=quoted_key('project-vzw')))
381 rift
.auto
.mano
.close_session(user_session
)
383 logger
.debug("Verify if project admin can replace roles for users")
384 project_admin_session
= rift
.auto
.mano
.get_session(
385 session_class
, confd_host
, project_admin_user
,
386 project_user_data
[project_admin_user
]["password"])
387 project_admin_proxy
= project_admin_session
.proxy(RwProjectYang
)
388 for user
in project_user_data
:
389 if project_admin_user
!= user
:
391 "/project[name={project}]/project-config/user" +
392 "[user-name={user}][user-domain={domain}]")
395 YangData_RwProject_Project_ProjectConfig_User_Role
.
397 'role': 'rw-project-mano:account-admin'})
399 project_admin_proxy
.replace_config(
401 project
=quoted_key('project-vzw'),
402 user
=quoted_key(user
),
403 domain
=quoted_key(user_domain
)), new_role
)
404 ret_val
= project_admin_proxy
.get_config(
406 project
=quoted_key('project-vzw'),
407 user
=quoted_key(user
),
408 domain
=quoted_key(user_domain
),
409 role
=quoted_key('rw-project-mano:lcm-oper')))
411 rift
.auto
.mano
.close_session(project_admin_session
)
413 logger
.debug("Verify if users able to change its own user details")
414 for user
in project_user_data
:
417 password
= project_user_data
[user
]["password"]
418 user_session
= rift
.auto
.mano
.get_session(
419 session_class
, confd_host
, user
, password
)
420 user_proxy
= user_session
.proxy(RwUserYang
)
421 rift
.auto
.mano
.update_password(
422 user_proxy
, user
, user
, user_domain
, rw_rbac_int_proxy
)
423 project_user_data
[user
]["new_password"] = user
424 rift
.auto
.mano
.close_session(user_session
)
427 "{} trying to connect ".format(user
) +
428 "with its old password {}".format(password
)
431 message
= ('{} not supposed to '.format(user
) +
432 'log-in with old passwd {}'.format(password
))
433 with pytest
.raises(Exception, message
=message
):
434 rift
.auto
.mano
.get_session(
435 session_class
, confd_host
, user
,
436 password
, timeout
=SESSION_CONNECT_TIMEOUT
)
438 # Verify the user should be able to log-in with new password
440 "User {} trying to log-in with its updated password {}".format(
441 user
, project_user_data
[user
]["new_password"]))
443 usession_updated_passwd
= rift
.auto
.mano
.get_session(
444 session_class
, confd_host
, user
,
445 project_user_data
[user
]["new_password"])
447 # project admin able to delete users from the project database
449 password
= project_user_data
[project_admin_user
]["password"]
451 password
= project_user_data
[project_admin_user
]["new_password"]
452 project_admin_session
= rift
.auto
.mano
.get_session(
453 session_class
, confd_host
, project_admin_user
, password
)
454 project_admin_proxy
= project_admin_session
.proxy(RwProjectYang
)
456 for user
in project_user_data
:
457 if user
== project_admin_user
:
459 logger
.debug('deleting user {} from project project-vzw'.format(user
))
460 self
.delete_user_from_project(
461 project_admin_proxy
, user
, 'project-vzw', user_domain
)
462 rift
.auto
.mano
.close_session(project_admin_session
)
465 not pytest
.config
.getoption("--project-creation-test"),
466 reason
="need --project-creation-test option to run")
467 def test_multi_project_multi_users_role_authorization(
468 self
, logger
, user_roles
, rw_user_proxy
, session_class
,
469 user_domain
, confd_host
, rw_conman_proxy
, project_keyed_xpath
,
470 rw_project_proxy
, rw_rbac_int_proxy
, tbac
, rbac_user_passwd
):
471 """Verify that users with roles doesn't have unauthorized access."""
473 Case 01. rbac_user2 has different roles in project1 and project2.
474 Case 02. rbac_user4 has project-admin in project3 and project4.
475 Case 03. rbac_user9 has project-oper in project5 and project6.
478 # The sample user data
479 role1
= 'rw-project:project-admin'
480 role2
= 'rw-project:project-oper'
481 project_user_data
= {
509 for idx
in range(1, 7):
510 rift
.auto
.mano
.create_project(
511 rw_conman_proxy
, 'project{}'.format(idx
))
513 for idx
in range(1, 10):
514 rift
.auto
.mano
.create_user(
515 rw_user_proxy
, 'rbac_user{}'.format(idx
),
516 rbac_user_passwd
, user_domain
)
517 # Assign roles to users according to the project_user_data
518 for idx
in range(1, 7):
519 project
= 'project{}'.format(idx
)
520 for user_name
, role
in project_user_data
[project
].items():
521 rift
.auto
.mano
.assign_project_role_to_user(
522 rw_project_proxy
, role
, user_name
, project
,
523 user_domain
, rw_rbac_int_proxy
)
526 user_name
, target_project
, session_class
,
528 """Verify if user has access to target project."""
529 password
= rbac_user_passwd
532 user_session
= rift
.auto
.mano
.get_session(
533 session_class
, confd_host
, user_name
, password
)
534 logger
.debug("{} trying to access {}".format(
535 user_name
, target_project
) +
538 pxy
= user_session
.proxy(RwProjectYang
)
539 # Verify is user has access to /project
540 project_xpath
= '/project[name={}]/project-state'.format(
541 quoted_key(target_project
)
543 response
= pxy
.get_config(project_xpath
, list_obj
=True)
545 # Verify is user has access to /project/project-config/user
546 project_user_xpath
= (
547 "/project[name={project}]/project-config/" +
548 "user[user-name={user}][user-domain={domain}]"
550 target_user
= list(project_user_data
[target_project
].keys())[0]
551 pxy
= user_session
.proxy(RwProjectYang
)
552 response
= pxy
.get_config(
553 project_user_xpath
.format(
554 project
=quoted_key(target_project
),
555 user
=quoted_key(target_user
),
556 domain
=quoted_key(user_domain
)
560 rift
.auto
.mano
.close_session(user_session
)
562 # Case 01. rbac_user2 has different roles in project1 and project2.
564 logger
.debug('Veryfy rbac_user1 of project1 has no access to project2')
567 message
="rbac_user1 accessed project2 which its not part of."):
569 'rbac_user1', 'project2', session_class
, confd_host
, logger
)
571 logger
.debug('Verify rbac_user2 has access to project1 and project2')
573 'rbac_user2', 'project1', session_class
, confd_host
, logger
)
575 'rbac_user2', 'project2', session_class
, confd_host
, logger
)
577 # Case 02. rbac_user4 has project-admin in project3 and project4.
579 logger
.debug('Verify rbac_user4 has access to project 3 & project4')
581 'rbac_user4', 'project4', session_class
, confd_host
, logger
)
583 'rbac_user4', 'project3', session_class
, confd_host
, logger
)
585 logger
.debug('Two users in project3 exchanges roles & check access')
586 rift
.auto
.mano
.revoke_project_role_from_user(
587 rw_project_proxy
, role1
, 'rbac_user4',
588 'project3', user_domain
)
589 rift
.auto
.mano
.revoke_project_role_from_user(
590 rw_project_proxy
, role2
, 'rbac_user5',
591 'project3', user_domain
)
592 rift
.auto
.mano
.assign_project_role_to_user(
593 rw_project_proxy
, role2
, 'rbac_user4',
594 'project3', user_domain
, rw_rbac_int_proxy
)
595 rift
.auto
.mano
.assign_project_role_to_user(
596 rw_project_proxy
, role1
, 'rbac_user5',
597 'project3', user_domain
, rw_rbac_int_proxy
)
599 logger
.debug('rbac_user5 trying its access on project3 and project4')
601 'rbac_user5', 'project3', session_class
,
606 message
="rbac_user5 accessed project4 which its not part of."):
608 'rbac_user5', 'project4', session_class
,
612 # 'rbac_user5'(admin role) revoking the role from rbac-user4
613 password
= rbac_user_passwd
615 password
= 'rbac_user5'
616 rbac_user2_session
= rift
.auto
.mano
.get_session(
617 session_class
, confd_host
, 'rbac_user5', password
)
618 rbac_user2_prjt_pxy
= rbac_user2_session
.proxy(RwProjectYang
)
619 self
.delete_user_from_project(
620 rbac_user2_prjt_pxy
, 'rbac_user4', 'project3', user_domain
)
622 # Create new user 'del-user'
623 rift
.auto
.mano
.create_user(
624 rw_user_proxy
, 'del-user', rbac_user_passwd
, user_domain
)
625 rift
.auto
.mano
.assign_project_role_to_user(
626 rw_project_proxy
, role2
, 'del-user', 'project3',
627 user_domain
, rw_rbac_int_proxy
)
628 # Delete 'del-user' with 'rbac_user5'(admin role)
629 self
.delete_user_from_project(
630 rbac_user2_prjt_pxy
, 'del-user', 'project3', user_domain
)
633 'rbac_user4 try to access project3 which its not a part of anymore'
637 message
="rbac_user4 accessed project3 which its not part of."):
639 'rbac_user4', 'project3', session_class
,
642 logger
.debug('rbac_user4 try to access project4 which its a part of.')
644 'rbac_user4', 'project4', session_class
,
647 # Case 03. rbac_user9 has project-oper in project5 and project6.
649 logger
.debug('rbac_user9 try to access project5 & project6')
651 'rbac_user9', 'project5', session_class
,
654 'rbac_user9', 'project6', session_class
,
658 'rbac_user8 try to access to project5 which its not part of.'
662 message
="rbac_user8 accessed project5 which its not part of."):
664 'rbac_user8', 'project5', session_class
,
668 'rbac_user7 try to access to project6 which its not part of.'
672 message
="rbac_user7 accessed project6 which its not part of."):
674 'rbac_user7', 'project6', session_class
,
678 @pytest.mark
.skipif(not pytest
.config
.getoption("--user-creation-test"), reason
="need --user-creation-test option to run")
679 def test_user_create_delete_authorization(self
, logger
, users_test_data
, session_class
, confd_host
, rw_user_proxy
,
680 rbac_user_passwd
, user_domain
, tbac
, rw_rbac_int_proxy
):
681 """Verifies only users with certain roles can create/delete users and set the password of an user"""
682 write_users
, read_users
, fail_users
= users_test_data
684 # Create a dummy user with admin/admin
685 dummy_user_name
= 'dummy-user'
686 rift
.auto
.mano
.create_user(rw_user_proxy
, dummy_user_name
, rbac_user_passwd
, user_domain
)
688 # Check users in write_users dict able to create/delete an user and able to set password for others
689 logger
.debug('Verifying users which are authorised to create/delete an user')
690 for user
in write_users
:
691 logger
.debug('Verifying user:(role,password) {}:{}'.format(user
, write_users
[user
]))
692 user_session
= rift
.auto
.mano
.get_session(session_class
, confd_host
, user
, write_users
[user
][1])
693 pxy
= user_session
.proxy(RwUserYang
)
695 user_name
= 'new-user-{}'.format(user
)
696 logger
.debug('Trying to create user {}'.format(user_name
))
697 rift
.auto
.mano
.create_user(pxy
, user_name
, rbac_user_passwd
, user_domain
)
699 logger
.debug('Trying to delete user {}'.format(user_name
))
700 rift
.auto
.mano
.delete_user(pxy
, user_name
, user_domain
)
702 if not tbac
: # password update is not allowed for external users in tbac
703 new_passwd
= rift
.auto
.mano
.generate_password()
704 # Check users in write_users dict able to set password for other user (dummy-user)
705 logger
.debug('User {} trying to update password for user {}'.format(user
, dummy_user_name
))
706 rift
.auto
.mano
.update_password(pxy
, dummy_user_name
, new_passwd
, user_domain
, rw_rbac_int_proxy
)
708 # Verify dummy_user_name able to log-in with its new password
709 logger
.debug('User {} trying to log-in with its updated password {}'.format(dummy_user_name
, new_passwd
))
710 dummy_user_session_updated_passwd
= rift
.auto
.mano
.get_session(session_class
, confd_host
, dummy_user_name
,
713 # Verify the user not able to log-in with old password
714 with pytest
.raises(Exception, message
='User {} not supposed to log-in with its old password {}'.format(
715 dummy_user_name
, rbac_user_passwd
)) as excinfo
:
716 logger
.debug('User {} trying to connect with its old password {}'.format(user
, rbac_user_passwd
))
717 rift
.auto
.mano
.get_session(session_class
, confd_host
, dummy_user_name
, rbac_user_passwd
,
718 timeout
=SESSION_CONNECT_TIMEOUT
)
720 rift
.auto
.mano
.close_session(dummy_user_session_updated_passwd
)
721 rift
.auto
.mano
.close_session(user_session
)
723 # Check users in read_users dict able to read user list (path: /user-config)
724 logger
.debug('Verifying users which are authorised to read user list')
725 for user
in read_users
:
726 logger
.debug('Verifying user:(role,password) {}:{}'.format(user
, read_users
[user
]))
727 user_session
= rift
.auto
.mano
.get_session(session_class
, confd_host
, user
, read_users
[user
][1])
728 pxy
= user_session
.proxy(RwUserYang
)
729 logger
.debug('User {} trying to access /user-config xpath'.format(user
))
730 user_config
= pxy
.get_config('/user-config')
731 assert [user
.user_name
for user
in user_config
.user
]
733 rift
.auto
.mano
.close_session(user_session
)
735 # Check users in fail_users dict not able to create/delete an user
736 logger
.debug('Verifying users which are not supposed to create/delete an user')
737 for user
in fail_users
:
738 logger
.debug('Verifying user:(role,password) {}:{}'.format(user
, fail_users
[user
]))
739 user_session
= rift
.auto
.mano
.get_session(session_class
, confd_host
, user
, fail_users
[user
][1])
740 pxy
= user_session
.proxy(RwUserYang
)
742 user_name
= 'new-user-{}'.format(user
)
744 with pytest
.raises(Exception, message
='User {} not authorised to create user {}'.format(
745 user
, user_name
)) as excinfo
:
746 logger
.debug('User {} trying to create an user {}'.format(user
, user_name
))
747 rift
.auto
.mano
.create_user(pxy
, user_name
, rbac_user_passwd
, user_domain
)
749 with pytest
.raises(Exception, message
='User {} not authorised to delete user {}'.format(
750 user
, dummy_user_name
)) as excinfo
:
751 logger
.debug('User {} trying to delete user {}'.format(user
, dummy_user_name
))
752 rift
.auto
.mano
.delete_user(pxy
, dummy_user_name
, user_domain
)
754 rift
.auto
.mano
.close_session(user_session
)
756 if not tbac
: # password update is not allowed for external users in tbac
757 # Check all users able to set their own password
758 logger
.debug('Verifying an user able to set its own password')
759 for user
, role_passwd_tuple
in dict(write_users
, **dict(read_users
, **fail_users
)).items():
760 logger
.debug('Verifying user:(role,password) {}:{}'.format(user
, role_passwd_tuple
))
761 user_session
= rift
.auto
.mano
.get_session(session_class
, confd_host
, user
, role_passwd_tuple
[1])
762 pxy
= user_session
.proxy(RwUserYang
)
764 new_passwd
= rift
.auto
.mano
.generate_password()
765 logger
.debug('User {} trying to update its password to {}'.format(user
, new_passwd
))
766 rift
.auto
.mano
.update_password(pxy
, user
, new_passwd
, user_domain
, rw_rbac_int_proxy
)
768 # Verify the user should be able to log-in with new password
769 logger
.debug('User {} trying to log-in with its updated password {}'.format(user
, new_passwd
))
770 user_session_updated_passwd
= rift
.auto
.mano
.get_session(session_class
, confd_host
, user
, new_passwd
)
772 # Verify the user not able to log-in with old password
773 with pytest
.raises(Exception, message
='User {} not supposed to log-in with its old password {}'.format(
774 user
, role_passwd_tuple
[1])) as excinfo
:
775 logger
.debug('User {} trying to connect with its old password {}'.format(user
, role_passwd_tuple
[1]))
776 rift
.auto
.mano
.get_session(session_class
, confd_host
, user
, rbac_user_passwd
, timeout
=SESSION_CONNECT_TIMEOUT
)
778 rift
.auto
.mano
.close_session(user_session
)
779 rift
.auto
.mano
.close_session(user_session_updated_passwd
)
@pytest.mark.skipif(not pytest.config.getoption("--account-test"), reason="need --account-test option to run")
def test_account_create_delete_authorization(self, users_test_data, mgmt_session, logger, cloud_module,
                                             fmt_cloud_xpath, fmt_prefixed_cloud_xpath, project_acessible,
                                             cloud_account, session_class, confd_host):
    """Check that creating, reading and deleting a cloud account is gated by role.

    ``users_test_data`` unpacks into three dicts keyed by user name
    (write, read and fail users); element ``[1]`` of each value is used as
    the login password. The test runs in five phases:

    1. every write user creates and deletes the account,
    2. the admin session (``mgmt_session``) creates the account,
    3. every read user reads it,
    4. every fail user is expected to raise when deleting it,
    5. the admin deletes it and every fail user is expected to raise when
       creating it.
    """
    write_users, read_users, fail_users = users_test_data

    # Both xpaths are keyed by the same (project, account name) pair.
    key_args = {
        'project': quoted_key(project_acessible),
        'account_name': quoted_key(cloud_account.name),
    }
    xpath_no_pfx = fmt_cloud_xpath.format(**key_args)
    xpath = fmt_prefixed_cloud_xpath.format(**key_args)

    def login(user_name, role_passwd):
        """Log the user being verified, open its session and return (session, cloud proxy)."""
        logger.debug('Verifying user:(role,password) {}:{}'.format(user_name, role_passwd))
        session = rift.auto.mano.get_session(session_class, confd_host, user_name, role_passwd[1])
        return session, session.proxy(cloud_module)

    # Phase 1: write users must be able to create and delete the account.
    logger.debug('Verifying users which are authorised to create/delete cloud accounts')
    for user, role_passwd in write_users.items():
        user_session, cloud_pxy = login(user, role_passwd)

        logger.debug('Trying to create a cloud account')
        cloud_pxy.replace_config(xpath, cloud_account)
        created = cloud_pxy.get(xpath_no_pfx)
        assert created.name == cloud_account.name
        assert created.account_type == cloud_account.account_type

        logger.debug('Trying to delete the cloud account')
        cloud_pxy.delete_config(xpath)
        assert cloud_pxy.get(xpath_no_pfx) is None

        rift.auto.mano.close_session(user_session)

    # Phase 2: the admin creates the account that read users will look at.
    logger.debug('admin user creating cloud account {}'.format(cloud_account.name))
    admin_cloud_proxy = mgmt_session.proxy(cloud_module)
    admin_cloud_proxy.replace_config(xpath, cloud_account)
    assert admin_cloud_proxy.get(xpath_no_pfx).name == cloud_account.name

    # Phase 3: read users must be able to read the account.
    logger.debug('Verifying users which are authorised to read cloud accounts')
    for user, role_passwd in read_users.items():
        user_session, cloud_pxy = login(user, role_passwd)

        seen = cloud_pxy.get(xpath_no_pfx)
        assert seen.name == cloud_account.name
        assert seen.account_type == cloud_account.account_type

        rift.auto.mano.close_session(user_session)

    # Phase 4: fail users must not be able to delete the account.
    logger.debug('Verifying users which are not authorised to read/delete cloud accounts')
    for user, role_passwd in fail_users.items():
        user_session, cloud_pxy = login(user, role_passwd)

        delete_denied_msg = 'User {} not authorised to delete cloud account {}'.format(
            user, cloud_account.name)
        with pytest.raises(Exception, message=delete_denied_msg):
            logger.debug('User {} trying to delete cloud account {}'.format(user, cloud_account.name))
            cloud_pxy.delete_config(xpath)

        # The read check for fail users is disabled:
        # logger.debug('User {} trying to access cloud account {}'.format(user, cloud_account.name))
        # assert cloud_pxy.get(xpath_no_pfx) is None
        rift.auto.mano.close_session(user_session)

    # Phase 5a: the admin removes the account again.
    logger.debug('admin user deleting cloud account {}'.format(cloud_account.name))
    admin_cloud_proxy.delete_config(xpath)
    assert admin_cloud_proxy.get(xpath_no_pfx) is None

    # Phase 5b: fail users must not be able to create the account.
    logger.debug('Verifying users which are not authorised to create cloud accounts')
    for user, role_passwd in fail_users.items():
        user_session, cloud_pxy = login(user, role_passwd)

        create_denied_msg = 'User {} not authorised to create cloud account {}'.format(
            user, cloud_account.name)
        with pytest.raises(Exception, message=create_denied_msg):
            logger.debug('User {} trying to create a cloud account {}'.format(user, cloud_account.name))
            cloud_pxy.replace_config(xpath, cloud_account)

        rift.auto.mano.close_session(user_session)
def delete_descriptors(project, vnfd_proxy, nsd_proxy, vnfd_xpath, nsd_xpath, fmt_vnfd_id_xpath, fmt_nsd_id_xpath):
    """Delete every NSD, then every VNFD, found under the given catalog xpaths.

    NSDs are removed first, then VNFDs. After each pass the catalog is read
    again and asserted to be either ``None`` or empty.

    Arguments:
        project - project name, quoted into each per-descriptor xpath
        vnfd_proxy, nsd_proxy - proxies used to read and delete descriptors
        vnfd_xpath, nsd_xpath - catalog xpaths; '/vnfd' and '/nsd' are appended
        fmt_vnfd_id_xpath, fmt_nsd_id_xpath - format strings taking ``project``
            and ``vnfd_id`` / ``nsd_id`` keyword arguments
    """
    def purge(proxy, catalog_xpath, kind, fmt_id_xpath):
        """Delete all ``kind`` ('nsd' or 'vnfd') entries and assert none are left."""
        listing_xpath = '{}/{}'.format(catalog_xpath, kind)
        id_kwarg = '{}_id'.format(kind)

        listing = proxy.get(listing_xpath, list_obj=True)
        # Attribute access is deliberately unguarded: a None listing raises here.
        for record in getattr(listing, kind):
            id_keys = {'project': quoted_key(project), id_kwarg: quoted_key(record.id)}
            proxy.delete_config(fmt_id_xpath.format(**id_keys))

        leftover = proxy.get(listing_xpath, list_obj=True)
        assert leftover is None or len(getattr(leftover, kind)) == 0

    purge(nsd_proxy, nsd_xpath, 'nsd', fmt_nsd_id_xpath)
    purge(vnfd_proxy, vnfd_xpath, 'vnfd', fmt_vnfd_id_xpath)
@pytest.mark.skipif(not pytest.config.getoption("--onboarding-test"), reason="need --onboarding-test option to run")
def test_onboarding_authorization(self, users_test_data, logger, descriptors, session_class, confd_host,
                                  fmt_vnfd_catalog_xpath, fmt_nsd_catalog_xpath, fmt_nsd_id_xpath,
                                  fmt_vnfd_id_xpath, project_acessible, mgmt_session):
    """Verifies only users with certain roles can onboard/update/delete a package.

    ``users_test_data`` unpacks into three dicts keyed by user name (write,
    read and fail users); element ``[1]`` of each value is used as the login
    password. ``descriptors`` is treated as a sequence whose last element is
    the NSD and whose other elements are VNFDs.

    Phases:
        1. each write user onboards all descriptors, verifies the catalog
           counts and deletes them again,
        2. the admin session onboards the descriptors,
        3. each read user verifies the catalog counts,
        4. each fail user is expected to raise when deleting descriptors,
        5. the admin deletes the descriptors and each fail user is expected
           to raise when onboarding them.
    """
    descriptor_vnfds = descriptors[:-1]
    write_users, read_users, fail_users = users_test_data
    logger.debug('The descriptrs being used: {}'.format(descriptors))
    nsd_xpath = fmt_nsd_catalog_xpath.format(project=quoted_key(project_acessible))
    vnfd_xpath = fmt_vnfd_catalog_xpath.format(project=quoted_key(project_acessible))

    def onboard(user_session, project):
        """Onboard every descriptor into ``project`` through ``user_session``."""
        for descriptor in descriptors:
            rift.auto.descriptor.onboard(user_session, descriptor, project=project)

    def verify_descriptors(vnfd_pxy, nsd_pxy, vnfd_count, nsd_count):
        """Assert the VNFD and NSD catalogs hold exactly the expected number of entries."""
        catalog = vnfd_pxy.get_config(vnfd_xpath)
        actual_vnfds = catalog.vnfd
        assert len(actual_vnfds) == vnfd_count, 'There should be {} vnfds'.format(vnfd_count)
        catalog = nsd_pxy.get_config(nsd_xpath)
        actual_nsds = catalog.nsd
        assert len(actual_nsds) == nsd_count, 'There should be {} nsd'.format(nsd_count)

    # Check users in write_users dict able to onboard/delete descriptors
    logger.debug('Verifying users which are authorised to onboard/delete descriptors')
    for user in write_users:
        logger.debug('Verifying user:(role,password) {}:{}'.format(user, write_users[user]))
        user_session = rift.auto.mano.get_session(session_class, confd_host, user, write_users[user][1])
        vnfd_pxy = user_session.proxy(RwProjectVnfdYang)
        nsd_pxy = user_session.proxy(RwProjectNsdYang)
        logger.debug('Trying to onboard ping-pong descriptors')
        onboard(user_session, project_acessible)
        logger.debug('Verifying if the descriptors are uploaded')
        verify_descriptors(vnfd_pxy, nsd_pxy, len(descriptor_vnfds), 1)

        logger.debug('Trying to delete descriptors')
        TestRbacVerification.delete_descriptors(project_acessible, vnfd_pxy, nsd_pxy, vnfd_xpath, nsd_xpath,
                                                fmt_vnfd_id_xpath, fmt_nsd_id_xpath)

        rift.auto.mano.close_session(user_session)

    # onboard the descriptors using mgmt_session which read_users will try to read
    logger.debug('admin user uploading the descriptors which read_users will try to read')
    onboard(mgmt_session, project_acessible)
    admin_vnfd_pxy = mgmt_session.proxy(RwProjectVnfdYang)
    admin_nsd_pxy = mgmt_session.proxy(RwProjectNsdYang)
    logger.debug('Verifying if the descriptors are uploaded')
    verify_descriptors(admin_vnfd_pxy, admin_nsd_pxy, len(descriptor_vnfds), 1)

    # Check users in read_users dict able to read already onboarded descriptors
    logger.debug('Verifying users which are authorised to read descriptors')
    for user in read_users:
        logger.debug('Verifying user:(role,password) {}:{}'.format(user, read_users[user]))
        user_session = rift.auto.mano.get_session(session_class, confd_host, user, read_users[user][1])
        vnfd_pxy = user_session.proxy(RwProjectVnfdYang)
        nsd_pxy = user_session.proxy(RwProjectNsdYang)

        logger.debug('Trying to read ping-pong descriptors')
        verify_descriptors(vnfd_pxy, nsd_pxy, len(descriptor_vnfds), 1)

        rift.auto.mano.close_session(user_session)

    # Check users in fail_users dict not able to delete descriptors
    logger.debug('Verifying users which are not supposed to delete descriptors')
    for user in fail_users:
        logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users[user]))
        user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users[user][1])
        vnfd_pxy = user_session.proxy(RwProjectVnfdYang)
        nsd_pxy = user_session.proxy(RwProjectNsdYang)

        with pytest.raises(Exception, message='User {} not authorised to delete descriptors'.format(user)):
            logger.debug('User {} trying to delete descriptors'.format(user))
            TestRbacVerification.delete_descriptors(project_acessible, vnfd_pxy, nsd_pxy, vnfd_xpath, nsd_xpath,
                                                    fmt_vnfd_id_xpath, fmt_nsd_id_xpath)

        rift.auto.mano.close_session(user_session)

    # The admin clears the catalog so the onboarding attempts below start from empty.
    logger.debug('Deleting the descriptors as fail_users trying to upload the descriptors')
    TestRbacVerification.delete_descriptors(project_acessible, admin_vnfd_pxy, admin_nsd_pxy, vnfd_xpath, nsd_xpath,
                                            fmt_vnfd_id_xpath, fmt_nsd_id_xpath)

    # Check users in fail_users dict not able to onboard descriptors
    logger.debug('Verifying users which are not supposed to create descriptors')
    for user in fail_users:
        logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users[user]))
        user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users[user][1])

        with pytest.raises(Exception, message='User {} not authorised to onboard descriptors'.format(user)):
            logger.debug('User {} trying to onboard ping-pong descriptors'.format(user))
            # The project must be passed: omitting it raises TypeError, which
            # pytest.raises(Exception) would accept without any onboarding
            # request ever being sent.
            onboard(user_session, project_acessible)

        rift.auto.mano.close_session(user_session)
@pytest.mark.skipif(not pytest.config.getoption("--nsr-test"),
                    reason="need --nsr-test option to run")
def test_nsr_authorization(self, users_test_data, logger, cloud_account,
                           cloud_module, descriptors, session_class,
                           confd_host, fmt_cloud_xpath,
                           fmt_prefixed_cloud_xpath, mgmt_session, fmt_nsd_id_xpath, fmt_vnfd_id_xpath,
                           project_acessible, fmt_nsd_catalog_xpath, fmt_vnfd_catalog_xpath):
    """Verifies only users with certain roles can
    create/read/delete nsr/vlr/vnfr

    ``users_test_data`` unpacks into three dicts keyed by user name (write,
    read and fail users); element ``[1]`` of each value is used as the login
    password.

    Phases:
        1. the admin session creates a cloud account and onboards the
           descriptors; an NSR is built from the first NSD in the catalog,
        2. each write user instantiates and terminates the NS,
        3. the admin instantiates the NS; write and read users must be able
           to read ns-instance-opdata, vnfr-catalog and vnfr-console,
        4. each fail user is expected to raise when terminating the NS,
        5. the admin terminates the NS; each fail user is expected to raise
           when instantiating a new one,
        6. the cloud account and descriptors are removed.
    """
    write_users, read_users, fail_users = users_test_data

    # Cloud account creation
    logger.debug('Creating a cloud account which will be used for NS instantiation')
    cloud_pxy = mgmt_session.proxy(cloud_module)
    cloud_pxy.replace_config(fmt_prefixed_cloud_xpath.format(project=quoted_key(project_acessible),
                                                             account_name=quoted_key(cloud_account.name)),
                             cloud_account)
    response = cloud_pxy.get(
        fmt_cloud_xpath.format(project=quoted_key(project_acessible), account_name=quoted_key(cloud_account.name)))
    assert response.name == cloud_account.name

    cloud_pxy.wait_for(fmt_cloud_xpath.format(project=quoted_key(project_acessible), account_name=quoted_key(
        cloud_account.name)) + '/connection-status/status', 'success', timeout=30, fail_on=['failure'])

    # Upload the descriptors
    nsd_xpath = fmt_nsd_catalog_xpath.format(project=quoted_key(project_acessible))
    vnfd_xpath = fmt_vnfd_catalog_xpath.format(project=quoted_key(project_acessible))
    logger.debug('Uploading descriptors {} which will be used for NS instantiation'.format(descriptors))
    for descriptor in descriptors:
        rift.auto.descriptor.onboard(mgmt_session, descriptor, project=project_acessible)
    admin_nsd_pxy = mgmt_session.proxy(RwProjectNsdYang)
    nsd_catalog = admin_nsd_pxy.get_config(nsd_xpath)
    nsd = nsd_catalog.nsd[0]
    nsr = rift.auto.descriptor.create_nsr(cloud_account.name, nsd.name, nsd)

    # Check users in write_users dict able to instantiate/delete a NS
    logger.debug('Verifying users which are authorised to instantiate/delete a NS')
    for user in write_users:
        logger.debug('Verifying user:(role,password) {}:{}'.format(user, write_users[user]))
        user_session = rift.auto.mano.get_session(session_class, confd_host, user, write_users[user][1])
        rwnsr_pxy = user_session.proxy(RwNsrYang)
        rwvnfr_pxy = user_session.proxy(RwVnfrYang)
        rwvlr_pxy = user_session.proxy(RwVlrYang)

        logger.info("Trying to instantiate the Network Service")
        rift.auto.descriptor.instantiate_nsr(nsr, rwnsr_pxy, logger,
                                             project=project_acessible)

        logger.info("Trying to terminate the Network Service")
        rift.auto.descriptor.terminate_nsr(rwvnfr_pxy, rwnsr_pxy,
                                           rwvlr_pxy, logger,
                                           project=project_acessible)

        rift.auto.mano.close_session(user_session)

    # Instantiate a NS which the read_users, fail_users will try to
    # read/delete.
    admin_rwnsr_pxy = mgmt_session.proxy(RwNsrYang)
    admin_rwvnfr_pxy = mgmt_session.proxy(RwVnfrYang)
    admin_rwvlr_pxy = mgmt_session.proxy(RwVlrYang)
    logger.debug('admin user instantiating NS which the read_users, fail_users will try to read/delete.')
    rift.auto.descriptor.instantiate_nsr(nsr, admin_rwnsr_pxy, logger, project=project_acessible)

    # Check users in read_users, write_users dict able to read vnfr-console, vnfr-catalog, ns-instance-opdata
    p_xpath = '/project[name={}]'.format(quoted_key(project_acessible))
    read_xpaths = ['/ns-instance-opdata', '/vnfr-catalog', '/vnfr-console']
    logger.debug('Verifying users which are authorised to read vnfr-catalog, ns-instance-opdata, vnfr-console etc')
    for user, role_passwd_tuple in dict(write_users, **read_users).items():
        logger.debug('Verifying user:(role,password) {}:{}'.format(user, role_passwd_tuple))
        user_session = rift.auto.mano.get_session(session_class, confd_host, user, role_passwd_tuple[1])
        rwnsr_pxy = user_session.proxy(RwNsrYang)
        rwvnfr_pxy = user_session.proxy(RwVnfrYang)
        for xpath in read_xpaths:
            logger.debug('Trying to read xpath: {}'.format(p_xpath + xpath))
            # vnfr-* paths are read through the VNFR proxy, the rest through the NSR proxy.
            proxy_ = rwvnfr_pxy if 'vnfr' in xpath else rwnsr_pxy
            assert proxy_.get(p_xpath + xpath)

        rift.auto.mano.close_session(user_session)

    # Check users in fail_users dict not able to terminate a NS
    logger.debug('Verifying users which are NOT authorised to terminate a NS')
    for user in fail_users:
        logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users[user]))
        user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users[user][1])
        rwnsr_pxy = user_session.proxy(RwNsrYang)
        rwvnfr_pxy = user_session.proxy(RwVnfrYang)

        with pytest.raises(Exception, message='User {} not authorised to terminate NS'.format(user)):
            logger.debug('User {} trying to delete NS'.format(user))
            # Argument order matches the other terminate_nsr calls in this
            # test (vnfr proxy, nsr proxy, vlr proxy, logger). Passing logger
            # and the vlr proxy swapped could raise for reasons unrelated to
            # authorization and still satisfy pytest.raises(Exception).
            rift.auto.descriptor.terminate_nsr(rwvnfr_pxy, rwnsr_pxy,
                                               admin_rwvlr_pxy, logger,
                                               project=project_acessible)
        rift.auto.mano.close_session(user_session)

    # Terminate the NS instantiated by admin user
    logger.debug('admin user terminating the NS')
    rift.auto.descriptor.terminate_nsr(admin_rwvnfr_pxy,
                                       admin_rwnsr_pxy,
                                       admin_rwvlr_pxy, logger,
                                       project=project_acessible)

    # Check users in fail_users dict not able to instantiate a NS
    nsr.id = str(uuid.uuid4())  # fresh id for the instantiation attempts below
    logger.debug('Verifying users which are NOT authorised to instantiate a NS')
    for user in fail_users:
        logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users[user]))
        user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users[user][1])
        rwnsr_pxy = user_session.proxy(RwNsrYang)

        with pytest.raises(Exception, message='User {} not authorised to instantiate NS'.format(user)):
            logger.debug('User {} trying to instantiate NS'.format(user))
            rift.auto.descriptor.instantiate_nsr(nsr, rwnsr_pxy, logger, project=project_acessible)
        rift.auto.mano.close_session(user_session)

    # delete cloud accounts and descriptors; else deleting project in teardown fails
    cloud_pxy.delete_config(fmt_prefixed_cloud_xpath.format(project=quoted_key(project_acessible),
                                                            account_name=quoted_key(cloud_account.name)))
    admin_vnfd_pxy = mgmt_session.proxy(RwProjectVnfdYang)
    TestRbacVerification.delete_descriptors(project_acessible, admin_vnfd_pxy, admin_nsd_pxy, vnfd_xpath, nsd_xpath,
                                            fmt_vnfd_id_xpath, fmt_nsd_id_xpath)
@pytest.mark.skipif(not pytest.config.getoption("--syslog-server-test"), reason="need --syslog-server-test option to run")
def test_set_syslog_server_authorization(self, mgmt_session, users_test_data, session_class, confd_host, logger):
    """Verifies only users with certain roles can set syslog server.

    ``users_test_data`` unpacks into three dicts keyed by user name (write,
    read and fail users); element ``[1]`` of each value is used as the login
    password. Write users must be able to change the server address of the
    first logging sink, fail users are expected to raise when trying, and
    read users must be able to read the logging configuration.
    """
    write_users, read_users, fail_users = users_test_data
    admin_log_mgmt_pxy = mgmt_session.proxy(RwlogMgmtYang)

    def update_syslog_server_address(user_log_mgmt_pxy):
        """Set the first sink's server address through the user's proxy and verify it as admin."""
        current_sink = admin_log_mgmt_pxy.get_config('/logging').sink[0]
        syslog_name = current_sink.name

        # Pick an address that differs from the configured one. Otherwise a
        # silently ignored update would still pass the check below.
        candidates = ['127.0.0.{}'.format(octet) for octet in range(256)]
        ip = random.choice([addr for addr in candidates if addr != current_sink.server_address])
        sink_obj = RwlogMgmtYang.Logging_Sink.from_dict({'server_address': ip})

        logger.debug('updating the syslog {} server_address to {}'.format(syslog_name, ip))
        user_log_mgmt_pxy.merge_config(
            '/logging/sink[name={sink_name}]'.format(sink_name=quoted_key(syslog_name)), sink_obj)
        assert [sink.server_address
                for sink in admin_log_mgmt_pxy.get_config('/logging').sink
                if sink.name == syslog_name][0] == ip

    for user, role_passwd_tuple in dict(write_users, **dict(read_users, **fail_users)).items():
        logger.debug('Verifying user:(role,password) {}:{}'.format(user, role_passwd_tuple))
        user_session = rift.auto.mano.get_session(session_class, confd_host, user, role_passwd_tuple[1])
        try:
            user_log_mgmt_pxy = user_session.proxy(RwlogMgmtYang)

            if user in write_users:
                logger.debug('User {} should be able to update the syslog server address'.format(user))
                update_syslog_server_address(user_log_mgmt_pxy)

            if user in fail_users:
                with pytest.raises(Exception, message='User {} not authorised to set syslog server address'.format(user)):
                    logger.debug('User {} trying to update the syslog server address. It should fail'.format(user))
                    update_syslog_server_address(user_log_mgmt_pxy)

            if user in read_users:
                logger.debug('User {} trying to read the syslog server address'.format(user))
                logging_obj = user_log_mgmt_pxy.get_config('/logging')
                assert logging_obj.sink[0]
                assert logging_obj.sink[0].server_address
        finally:
            # Close the per-user session even when a check fails.
            rift.auto.mano.close_session(user_session)
@pytest.mark.skipif(not pytest.config.getoption("--redundancy-role-test"), reason="need --redundancy-role-test option to run")
def test_redundancy_config_authorization(self, mgmt_session, users_test_data, session_class, confd_host, logger,
                                         redundancy_config_test_roles):
    """Verifies only users with certain roles can set redundancy-config or read redundancy-state.

    ``users_test_data`` unpacks into three dicts keyed by user name (write,
    read and fail users); element ``[1]`` of each value is used as the login
    password. The admin session first creates a site named 'test_site'.
    Write users must be able to create and delete their own site and read
    redundancy-state. Fail users are expected to raise both when creating a
    site and when deleting 'test_site'. Read users must be able to read
    redundancy-state and redundancy-config.

    ``redundancy_config_test_roles`` is requested as a fixture but is not
    referenced in the body.
    """
    write_users, read_users, fail_users = users_test_data
    admin_redundancy_pxy = mgmt_session.proxy(RwRedundancyYang)
    site_nm_pfx = 'ha_site_'
    admin_site_nm = 'test_site'

    def create_redundancy_site(user_redundancy_pxy, site_nm):
        """Create site ``site_nm`` through the user's proxy and verify as admin that it exists."""
        site_id = '127.0.0.1'
        site_obj = RwRedundancyYang.YangData_RwRedundancy_RedundancyConfig_Site.from_dict(
            {'site_name': site_nm, 'site_id': site_id})

        logger.debug('Creating redundancy site {}'.format(site_nm))
        user_redundancy_pxy.create_config('/rw-redundancy:redundancy-config/rw-redundancy:site', site_obj)
        assert [site.site_name
                for site in admin_redundancy_pxy.get_config('/redundancy-config/site', list_obj=True).site
                if site.site_name == site_nm]

    def delete_redundancy_site(user_redundancy_pxy, site_nm):
        """Delete site ``site_nm`` through the user's proxy and verify as admin that it is gone."""
        logger.debug('Deleting redundancy site {}'.format(site_nm))
        user_redundancy_pxy.delete_config(
            '/rw-redundancy:redundancy-config/rw-redundancy:site[rw-redundancy:site-name={}]'.format(
                quoted_key(site_nm)))
        assert not [site.site_name
                    for site in admin_redundancy_pxy.get_config('/redundancy-config/site', list_obj=True).site
                    if site.site_name == site_nm]

    # Create a redundancy site which fail user will try to delete/ read user will try to read
    create_redundancy_site(admin_redundancy_pxy, admin_site_nm)

    for user, role_passwd_tuple in dict(write_users, **dict(read_users, **fail_users)).items():
        logger.debug('Verifying user:(role,password) {}:{}'.format(user, role_passwd_tuple))
        user_session = rift.auto.mano.get_session(session_class, confd_host, user, role_passwd_tuple[1])
        try:
            user_redundancy_pxy = user_session.proxy(RwRedundancyYang)

            if user in write_users:
                site_nm = '{}_{}'.format(site_nm_pfx, user)
                logger.debug('User {} should be able to create a new redundancy site {}'.format(user, site_nm))
                create_redundancy_site(user_redundancy_pxy, site_nm)

                logger.debug('User {} should be able to delete a redundancy site {}'.format(user, site_nm))
                delete_redundancy_site(user_redundancy_pxy, site_nm)

                assert user_redundancy_pxy.get('/redundancy-state')

            if user in fail_users:
                site_nm = '{}_{}'.format(site_nm_pfx, user)
                with pytest.raises(Exception, message='User {} not authorised to create redundancy site'.format(user)):
                    logger.debug('User {} trying to create redundancy site {}. It should fail'.format(user, site_nm))
                    create_redundancy_site(user_redundancy_pxy, site_nm)

                with pytest.raises(Exception, message='User {} not authorised to delete redundancy site'.format(user)):
                    # Log the site that is actually targeted (the admin-created
                    # one), not the per-user name used for the create attempt.
                    logger.debug('User {} trying to delete redundancy site {}. It should fail'.format(
                        user, admin_site_nm))
                    delete_redundancy_site(user_redundancy_pxy, admin_site_nm)

            if user in read_users:
                logger.debug('User {} trying to read redundancy-config'.format(user))
                assert user_redundancy_pxy.get('/redundancy-state')
                assert user_redundancy_pxy.get('/redundancy-config')
        finally:
            # Close the per-user session even when a check fails.
            rift.auto.mano.close_session(user_session)
@pytest.mark.depends('test_rbac_roles_setup')
@pytest.mark.teardown('test_rbac_roles_setup')
@pytest.mark.incremental
class TestRbacTeardown(object):
    """Teardown for the RBAC role tests: removes the test project and the test users."""

    def test_delete_project(self, rw_project_proxy, logger, project_keyed_xpath, project_acessible):
        """Deletes projects used for the test.

        The project is deleted only when its project-state can be read,
        i.e. when the lookup returns a truthy value.
        """
        project_state_xpath = project_keyed_xpath.format(
            project_name=quoted_key(project_acessible)) + '/project-state'
        if rw_project_proxy.get_config(project_state_xpath, list_obj=True):
            logger.debug('Deleting project {}'.format(project_acessible))
            rift.auto.mano.delete_project(rw_project_proxy, project_acessible)

    def test_delete_users(self, users_test_data, logger, rw_user_proxy, rbac_platform_proxy, platform_config_keyed_xpath,
                          user_keyed_xpath, user_domain, rw_conman_proxy, project_acessible):
        """Deletes the users which are part of rbac test-data and verify their deletion.

        ``users_test_data`` unpacks into three dicts keyed by user name; each
        value is indexed as ``[0]`` for the user's roles. For users holding
        a role whose name contains 'platform', the platform config entry is
        deleted before the user itself.
        """
        write_users, read_users, fail_users = users_test_data
        all_test_users = dict(write_users, **dict(read_users, **fail_users))

        for user, role_passwd_tuple in all_test_users.items():
            logger.debug('Deleting user:(role,password) {}:{}'.format(user, role_passwd_tuple))
            if any('platform' in role for role in role_passwd_tuple[0]):
                rbac_platform_proxy.delete_config(
                    platform_config_keyed_xpath.format(user=quoted_key(user), domain=quoted_key(user_domain)))
            rw_user_proxy.delete_config(user_keyed_xpath.format(user=quoted_key(user), domain=quoted_key(user_domain)))

            # Verify if the user is deleted
            user_config = rw_user_proxy.get_config('/user-config')
            current_users_list = [entry.user_name for entry in user_config.user]

            assert user not in current_users_list

        # Verify none of the test users is left in the configuration
        user_config = rw_user_proxy.get_config('/user-config')
        current_users_list = [entry.user_name for entry in user_config.user]

        logger.debug('Current users list after deleting all test users: {}'.format(current_users_list))
        # Compare user names. Iterating users_test_data directly yields the
        # three dicts, which can never be members of a list of names.
        expected_empty_user_list = [name for name in all_test_users if name in current_users_list]
        assert not expected_empty_user_list
1220 assert not expected_empty_user_list