update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / ra / pytest / ns / rbac / test_rbac_roles.py
1 #!/usr/bin/env python3
2 """
3 #
4 # Copyright 2017 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18 """
19 import collections
20 import gi
21 import pytest
22 import random
23 import uuid
24
25 import rift.auto.mano
26 import rift.auto.descriptor
27 gi.require_version('RwUserYang', '1.0')
28 gi.require_version('RwProjectYang', '1.0')
29 gi.require_version('RwConmanYang', '1.0')
30 gi.require_version('RwProjectNsdYang', '1.0')
31 gi.require_version('RwNsrYang', '1.0')
32 gi.require_version('RwVnfrYang', '1.0')
33 gi.require_version('RwlogMgmtYang', '1.0')
34 from gi.repository import (
35 RwUserYang,
36 RwProjectYang,
37 RwConmanYang,
38 RwProjectVnfdYang,
39 RwProjectNsdYang,
40 RwNsrYang,
41 RwVnfrYang,
42 RwVlrYang,
43 RwRbacPlatformYang,
44 RwlogMgmtYang,
45 RwRedundancyYang,
46 )
47 gi.require_version('RwKeyspec', '1.0')
48 from gi.repository.RwKeyspec import quoted_key
49
50 SESSION_CONNECT_TIMEOUT=5
51
52 @pytest.fixture(scope='session')
53 def user_test_roles():
54 """Returns tuples of roles which enable an user to delete/create a new user"""
55 write_roles = ('rw-rbac-platform:super-admin', 'rw-rbac-platform:platform-admin')
56 read_roles = tuple()
57 return write_roles, read_roles
58
59
60 @pytest.fixture(scope='session')
61 def project_test_roles():
62 """Returns tuples of roles which enable an user to create, read, delete a project"""
63 write_roles = ('rw-rbac-platform:super-admin', )
64 read_roles = ('rw-project:project-oper', 'rw-project:project-admin')
65 return write_roles, read_roles
66
67
68 @pytest.fixture(scope='session')
69 def onboarding_test_roles():
70 """Fixture that returns a tuple of roles which enable an user to onboard/modify/delete a VNF/NS package"""
71 write_roles = ('rw-rbac-platform:super-admin', 'rw-project-mano:catalog-admin', 'rw-project:project-admin')
72 read_roles = ('rw-project-mano:catalog-oper', 'rw-project-mano:lcm-admin')
73 return write_roles, read_roles
74
75
76 @pytest.fixture(scope='session')
77 def account_test_roles():
78 """Fixture that returns a tuple of roles which enable an user to CRUD a VIM, Sdn account"""
79 write_roles = ('rw-rbac-platform:super-admin', 'rw-project-mano:account-admin', 'rw-project:project-admin')
80 read_roles = ('rw-project-mano:account-oper', 'rw-project-mano:lcm-admin')
81 return write_roles, read_roles
82
83
84 @pytest.fixture(scope='session')
85 def ns_instantiate_test_roles():
86 """Fixture that returns a tuple of roles which enable an user to instantiate/terminate a NS
87 Read roles: who all can access vnfr-catalog, vnfr-console, ns-instance-opdata etc"""
88 write_roles = ('rw-rbac-platform:super-admin', 'rw-project-mano:lcm-admin', 'rw-project:project-admin')
89 read_roles = ('rw-project-mano:lcm-oper', )
90 return write_roles, read_roles
91
92
93 @pytest.fixture(scope='session')
94 def syslog_server_test_roles():
95 """Fixture that returns a tuple of roles which enable an user set the syslog server_address"""
96 write_roles = ('rw-rbac-platform:super-admin', 'rw-rbac-platform:platform-admin', 'rw-rbac-platform:platform-oper')
97 read_roles = tuple()
98 return write_roles, read_roles
99
100
101 @pytest.fixture(scope='session')
102 def redundancy_config_test_roles():
103 """Fixture that returns a tuple of roles which enable an user set the syslog server_address"""
104 write_roles = ('rw-rbac-platform:super-admin', 'rw-rbac-platform:platform-admin')
105 read_roles = ('rw-rbac-platform:platform-oper', )
106 return write_roles, read_roles
107
108
109 @pytest.fixture(scope='session')
110 def project_acessible():
111 """Fixture that returns name of the project to which all new users will be associated"""
112 return random.choice(['project1', 'default'])
113
114
115 # @pytest.fixture(scope='session')
116 # def project_not_accessible():
117 # """Retruns name of the project whose users are not supposed to access the resources under project 'project_acessible'"""
118 # return 'project2'
119
120
121 @pytest.fixture(scope='session')
122 def users_test_data(rw_user_proxy, rbac_platform_proxy, rw_project_proxy, all_roles, user_test_roles, project_test_roles,
123 onboarding_test_roles, account_test_roles, ns_instantiate_test_roles, user_domain, project_acessible, rw_conman_proxy,
124 syslog_server_test_roles, all_roles_combinations, rw_rbac_int_proxy, tbac, redundancy_config_test_roles):
125 """Creates new users required for a test and assign appropriate roles to them"""
126 if pytest.config.getoption("--user-creation-test"):
127 test_roles = user_test_roles
128 elif pytest.config.getoption("--project-creation-test"):
129 test_roles = project_test_roles
130 elif pytest.config.getoption("--onboarding-test"):
131 test_roles = onboarding_test_roles
132 elif pytest.config.getoption("--account-test"):
133 test_roles = account_test_roles
134 elif pytest.config.getoption("--nsr-test"):
135 test_roles = ns_instantiate_test_roles
136 elif pytest.config.getoption("--syslog-server-test"):
137 test_roles = syslog_server_test_roles
138 elif pytest.config.getoption("--redundancy-role-test"):
139 test_roles = redundancy_config_test_roles
140
141 # Create a project to which these users will be part of
142 if project_acessible != 'default':
143 rift.auto.mano.create_project(rw_conman_proxy, project_acessible)
144
145 def create_user_assign_role(user_name, password, role_set):
146 rift.auto.mano.create_user(rw_user_proxy, user_name, password, user_domain)
147 project_roles_list, platform_roles_list = [], []
148 for role in role_set:
149 if 'platform' in role:
150 platform_roles_list.append(role)
151 else:
152 project_roles_list.append(role)
153 if platform_roles_list:
154 rift.auto.mano.assign_platform_role_to_user(rbac_platform_proxy, platform_roles_list, user_name, user_domain, rw_rbac_int_proxy)
155 if project_roles_list:
156 rift.auto.mano.assign_project_role_to_user(rw_project_proxy, project_roles_list, user_name,
157 project_acessible, user_domain, rw_rbac_int_proxy)
158
159 write_roles, read_roles = test_roles
160 fail_roles = [role for role in all_roles if role not in write_roles]
161
162 if False: #If its desired to run checks for all combinations, tbd on what option this will be enabled
163 write_roles_tmp, read_roles_tmp, fail_roles_tmp = [], [], []
164 for role_combination in all_roles_combinations:
165 if bool(set(role_combination).intersection(write_roles)):
166 write_roles_tmp.append(role_combination)
167 continue
168 if bool(set(role_combination).intersection(read_roles)):
169 read_roles_tmp.append(role_combination)
170 continue
171 if bool(set(role_combination).isdisjoint(write_roles)):
172 fail_roles_tmp.append(role_combination)
173 write_roles, read_roles, fail_roles = write_roles_tmp, read_roles_tmp, fail_roles_tmp
174
175 # Create the users with roles mapped
176 write_users, read_users, fail_users = dict(), dict(), dict()
177 for idx, role_set in enumerate(write_roles, 1):
178 if type(role_set) is str:
179 role_set = [role_set]
180 user_name = 'write-{}'.format(idx)
181 if tbac:
182 password=user_name
183 else:
184 password = rift.auto.mano.generate_password()
185 create_user_assign_role(user_name, password, role_set)
186 write_users[user_name] = (role_set, password)
187
188 for idx, role_set in enumerate(read_roles, 1):
189 if type(role_set) is str:
190 role_set = [role_set]
191 user_name = 'read-{}'.format(idx)
192 if tbac:
193 password=user_name
194 else:
195 password = rift.auto.mano.generate_password()
196 create_user_assign_role(user_name, password, role_set)
197 read_users[user_name] = (role_set, password)
198
199 for idx, role_set in enumerate(fail_roles, 1):
200 if type(role_set) is str:
201 role_set = [role_set]
202 user_name = 'fail-{}'.format(idx)
203 if tbac:
204 password=user_name
205 else:
206 password = rift.auto.mano.generate_password()
207 create_user_assign_role(user_name, password, role_set)
208 fail_users[user_name] = (role_set, password)
209 return write_users, read_users, fail_users
210
211
212 @pytest.mark.setup('test_rbac_roles_setup')
213 @pytest.mark.incremental
214 class TestRbacVerification(object):
215 @pytest.mark.skipif(not pytest.config.getoption("--project-creation-test"), reason="need --project-creation-test option to run")
216 def test_project_create_delete_authorization(self, logger, users_test_data, session_class, confd_host, rw_conman_proxy,
217 project_keyed_xpath, project_acessible):
218 """Verifies only users with certain roles can create/delete a project"""
219
220 write_users, read_users, fail_users = users_test_data
221
222 # Check users in write_users dict able to create/delete a project
223 logger.debug('Verifying users which are authorised to create/delete a project')
224 for user in write_users:
225 logger.debug('Verifying user:(role,password) {}:{}'.format(user, write_users[user]))
226 user_session = rift.auto.mano.get_session(session_class, confd_host, user, write_users[user][1])
227 pxy = user_session.proxy(RwProjectYang)
228
229 project_name = 'project-{}'.format(user)
230 logger.debug('Trying to create project {}'.format(project_name))
231 rift.auto.mano.create_project(pxy, project_name)
232
233 logger.debug('Trying to delete project {}'.format(project_name))
234 rift.auto.mano.delete_project(pxy, project_name)
235
236 rift.auto.mano.close_session(user_session)
237
238 # Check users in read_users dict able to read a project
239 logger.debug('Verifying users which are authorised to read a project')
240 for user in read_users:
241 logger.debug('Verifying user:(role,password) {}:{}'.format(user, read_users[user]))
242 user_session = rift.auto.mano.get_session(session_class, confd_host, user, read_users[user][1])
243 pxy = user_session.proxy(RwProjectYang)
244
245 logger.debug('User {} trying to read project {}'.format(user, project_acessible))
246 project_ = pxy.get_config(project_keyed_xpath.format(project_name=quoted_key(project_acessible))+'/project-state', list_obj=True)
247 assert project_
248
249 rift.auto.mano.close_session(user_session)
250
251 # Check users in fail_users dict shouldn't be allowed to create a project or delete a project
252
253 # 'project-admin' user not able to create a project, but can delete a project, hence do the create/delete
254 # operation for this user at the end
255 fail_users_reordered = collections.OrderedDict()
256 for user, role_passwd_tuple in fail_users.items():
257 if any('project-admin' in role for role in role_passwd_tuple[0]):
258 project_admin_key, project_admin_val = user, role_passwd_tuple
259 continue
260 fail_users_reordered[user] = role_passwd_tuple
261 fail_users_reordered[project_admin_key] = project_admin_val
262
263 logger.debug('Verifying users which are not supposed to create/delete a project')
264 for user in fail_users_reordered:
265 logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users_reordered[user]))
266 user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users_reordered[user][1])
267 pxy = user_session.proxy(RwProjectYang)
268
269 project_name = 'project-{}'.format(user)
270
271 with pytest.raises(Exception, message='User {} not authorised to create project {}'.format(
272 user, project_name)) as excinfo:
273 logger.debug('User {} trying to create project {}'.format(user, project_name))
274 rift.auto.mano.create_project(pxy, project_name)
275
276 logger.debug('User {} trying to delete project {}'.format(user, project_acessible))
277 if any('project-admin' in role for role in fail_users_reordered[user][0]):
278 rift.auto.mano.delete_project(pxy, project_acessible)
279 continue
280 with pytest.raises(Exception, message='User {} not authorised to delete project {}'.format(
281 user, project_acessible)) as excinfo:
282 rift.auto.mano.delete_project(pxy, project_acessible)
283
284 rift.auto.mano.close_session(user_session)
285
286 def delete_user_from_project(
287 self, project_proxy, target_user, target_project, user_domain):
288 project_xpath = (
289 "/project[name={project}]/project-config/user" +
290 "[user-name={user}][user-domain={domain}]"
291 )
292 # Check if the user exists for the project
293 ret_val = project_proxy.get_config(
294 project_xpath.format(
295 project=quoted_key(target_project),
296 user=quoted_key(target_user),
297 domain=quoted_key(user_domain)))
298
299 assert ret_val
300 # Delete the target_user from the target_project
301 project_proxy.delete_config(
302 project_xpath.format(
303 project=quoted_key(target_project),
304 user=quoted_key(target_user),
305 domain=quoted_key(user_domain))
306 )
307 # Verify that he is deleted
308 ret_val = project_proxy.get_config(
309 project_xpath.format(
310 project=quoted_key(target_project),
311 user=quoted_key(target_user),
312 domain=quoted_key(user_domain))
313 )
314 assert ret_val is None
315
316 @pytest.mark.skipif(
317 not pytest.config.getoption("--project-creation-test"),
318 reason="need --project-creation-test option to run")
319 def test_project_admin_users_role_authorization(
320 self, logger, user_roles, rw_user_proxy, session_class,
321 user_domain, confd_host, rw_conman_proxy, project_keyed_xpath,
322 rw_project_proxy, rw_rbac_int_proxy, tbac):
323 """Verify project admin & oper role operations on a single project."""
324 logger.debug(
325 "Create a project & 8 users each with its own project/mano role")
326 rift.auto.mano.create_project(rw_conman_proxy, 'project-vzw')
327 project_user_data = {}
328 for idx, role in enumerate(user_roles, 1):
329 user_name = 'project_vzw_user-{}'.format(idx)
330 if not tbac:
331 password = rift.auto.mano.generate_password()
332 else:
333 password = user_name
334 rift.auto.mano.create_user(
335 rw_user_proxy, user_name, password, user_domain)
336 rift.auto.mano.assign_project_role_to_user(
337 rw_project_proxy, role, user_name, 'project-vzw',
338 user_domain, rw_rbac_int_proxy)
339 project_user_data[user_name] = {"role": role, "password": password}
340 if "project-admin" in role:
341 project_admin_user = user_name
342
343 logger.debug("Project admin deleting roles from users.")
344 project_admin_session = rift.auto.mano.get_session(
345 session_class, confd_host, project_admin_user,
346 project_user_data[project_admin_user]["password"])
347 project_admin_proxy = project_admin_session.proxy(RwProjectYang)
348 for user in project_user_data:
349 role = project_user_data[user]["role"]
350 if project_admin_user == user:
351 continue
352 rift.auto.mano.revoke_project_role_from_user(
353 project_admin_proxy, role, user, 'project-vzw', user_domain)
354 rift.auto.mano.close_session(project_admin_session)
355
356 logger.debug("Verify project admin can assign another role to users")
357 project_admin_session = rift.auto.mano.get_session(
358 session_class, confd_host, project_admin_user,
359 project_user_data[project_admin_user]["password"])
360 project_admin_proxy = project_admin_session.proxy(RwProjectYang)
361 for user in project_user_data:
362 role = 'rw-project:project-oper'
363 if project_admin_user == user:
364 continue
365 rift.auto.mano.assign_project_role_to_user(
366 project_admin_proxy, role, user, 'project-vzw',
367 user_domain, rw_rbac_int_proxy)
368 rift.auto.mano.close_session(project_admin_session)
369
370 # Verify the user able to read project
371 for user in project_user_data:
372 user_session = rift.auto.mano.get_session(
373 session_class, confd_host, user,
374 project_user_data[user]["password"])
375 user_project_pxy = user_session.proxy(RwProjectYang)
376 logger.debug("verifying user able to read project")
377 xpath = "/project[name={project}]/project-config"
378 ret_val = user_project_pxy.get_config(
379 xpath.format(project=quoted_key('project-vzw')))
380 assert ret_val
381 rift.auto.mano.close_session(user_session)
382
383 logger.debug("Verify if project admin can replace roles for users")
384 project_admin_session = rift.auto.mano.get_session(
385 session_class, confd_host, project_admin_user,
386 project_user_data[project_admin_user]["password"])
387 project_admin_proxy = project_admin_session.proxy(RwProjectYang)
388 for user in project_user_data:
389 if project_admin_user != user:
390 xpath = (
391 "/project[name={project}]/project-config/user" +
392 "[user-name={user}][user-domain={domain}]")
393 new_role = (
394 RwProjectYang.
395 YangData_RwProject_Project_ProjectConfig_User_Role.
396 from_dict({
397 'role': 'rw-project-mano:account-admin'})
398 )
399 project_admin_proxy.replace_config(
400 xpath.format(
401 project=quoted_key('project-vzw'),
402 user=quoted_key(user),
403 domain=quoted_key(user_domain)), new_role)
404 ret_val = project_admin_proxy.get_config(
405 xpath.format(
406 project=quoted_key('project-vzw'),
407 user=quoted_key(user),
408 domain=quoted_key(user_domain),
409 role=quoted_key('rw-project-mano:lcm-oper')))
410 assert ret_val
411 rift.auto.mano.close_session(project_admin_session)
412
413 logger.debug("Verify if users able to change its own user details")
414 for user in project_user_data:
415 if tbac:
416 break
417 password = project_user_data[user]["password"]
418 user_session = rift.auto.mano.get_session(
419 session_class, confd_host, user, password)
420 user_proxy = user_session.proxy(RwUserYang)
421 rift.auto.mano.update_password(
422 user_proxy, user, user, user_domain, rw_rbac_int_proxy)
423 project_user_data[user]["new_password"] = user
424 rift.auto.mano.close_session(user_session)
425
426 logger.debug(
427 "{} trying to connect ".format(user) +
428 "with its old password {}".format(password)
429 )
430
431 message = ('{} not supposed to '.format(user) +
432 'log-in with old passwd {}'.format(password))
433 with pytest.raises(Exception, message=message):
434 rift.auto.mano.get_session(
435 session_class, confd_host, user,
436 password, timeout=SESSION_CONNECT_TIMEOUT)
437
438 # Verify the user should be able to log-in with new password
439 logger.debug(
440 "User {} trying to log-in with its updated password {}".format(
441 user, project_user_data[user]["new_password"]))
442
443 usession_updated_passwd = rift.auto.mano.get_session(
444 session_class, confd_host, user,
445 project_user_data[user]["new_password"])
446
447 # project admin able to delete users from the project database
448 if tbac:
449 password = project_user_data[project_admin_user]["password"]
450 else:
451 password = project_user_data[project_admin_user]["new_password"]
452 project_admin_session = rift.auto.mano.get_session(
453 session_class, confd_host, project_admin_user, password)
454 project_admin_proxy = project_admin_session.proxy(RwProjectYang)
455
456 for user in project_user_data:
457 if user == project_admin_user:
458 continue
459 logger.debug('deleting user {} from project project-vzw'.format(user))
460 self.delete_user_from_project(
461 project_admin_proxy, user, 'project-vzw', user_domain)
462 rift.auto.mano.close_session(project_admin_session)
463
464 @pytest.mark.skipif(
465 not pytest.config.getoption("--project-creation-test"),
466 reason="need --project-creation-test option to run")
467 def test_multi_project_multi_users_role_authorization(
468 self, logger, user_roles, rw_user_proxy, session_class,
469 user_domain, confd_host, rw_conman_proxy, project_keyed_xpath,
470 rw_project_proxy, rw_rbac_int_proxy, tbac, rbac_user_passwd):
471 """Verify that users with roles doesn't have unauthorized access."""
472 """
473 Case 01. rbac_user2 has different roles in project1 and project2.
474 Case 02. rbac_user4 has project-admin in project3 and project4.
475 Case 03. rbac_user9 has project-oper in project5 and project6.
476 """
477
478 # The sample user data
479 role1 = 'rw-project:project-admin'
480 role2 = 'rw-project:project-oper'
481 project_user_data = {
482 "project1": {
483 "rbac_user1": role1,
484 "rbac_user2": role2,
485 },
486 "project2": {
487 "rbac_user2": role1,
488 "rbac_user3": role2,
489 },
490 "project3": {
491 "rbac_user4": role1,
492 "rbac_user5": role2,
493
494 },
495 "project4": {
496 "rbac_user4": role1,
497 "rbac_user6": role2,
498 },
499 "project5": {
500 "rbac_user7": role1,
501 "rbac_user9": role2,
502 },
503 "project6": {
504 "rbac_user8": role1,
505 "rbac_user9": role2,
506 }
507 }
508 # Create projects
509 for idx in range(1, 7):
510 rift.auto.mano.create_project(
511 rw_conman_proxy, 'project{}'.format(idx))
512 # Create users
513 for idx in range(1, 10):
514 rift.auto.mano.create_user(
515 rw_user_proxy, 'rbac_user{}'.format(idx),
516 rbac_user_passwd, user_domain)
517 # Assign roles to users according to the project_user_data
518 for idx in range(1, 7):
519 project = 'project{}'.format(idx)
520 for user_name, role in project_user_data[project].items():
521 rift.auto.mano.assign_project_role_to_user(
522 rw_project_proxy, role, user_name, project,
523 user_domain, rw_rbac_int_proxy)
524
525 def project_access(
526 user_name, target_project, session_class,
527 confd_host, logger):
528 """Verify if user has access to target project."""
529 password = rbac_user_passwd
530 if tbac:
531 password = user_name
532 user_session = rift.auto.mano.get_session(
533 session_class, confd_host, user_name, password)
534 logger.debug("{} trying to access {}".format(
535 user_name, target_project) +
536 "/project-state"
537 )
538 pxy = user_session.proxy(RwProjectYang)
539 # Verify is user has access to /project
540 project_xpath = '/project[name={}]/project-state'.format(
541 quoted_key(target_project)
542 )
543 response = pxy.get_config(project_xpath, list_obj=True)
544 assert response
545 # Verify is user has access to /project/project-config/user
546 project_user_xpath = (
547 "/project[name={project}]/project-config/" +
548 "user[user-name={user}][user-domain={domain}]"
549 )
550 target_user = list(project_user_data[target_project].keys())[0]
551 pxy = user_session.proxy(RwProjectYang)
552 response = pxy.get_config(
553 project_user_xpath.format(
554 project=quoted_key(target_project),
555 user=quoted_key(target_user),
556 domain=quoted_key(user_domain)
557 )
558 )
559 assert response
560 rift.auto.mano.close_session(user_session)
561
562 # Case 01. rbac_user2 has different roles in project1 and project2.
563
564 logger.debug('Veryfy rbac_user1 of project1 has no access to project2')
565 with pytest.raises(
566 Exception,
567 message="rbac_user1 accessed project2 which its not part of."):
568 project_access(
569 'rbac_user1', 'project2', session_class, confd_host, logger)
570
571 logger.debug('Verify rbac_user2 has access to project1 and project2')
572 project_access(
573 'rbac_user2', 'project1', session_class, confd_host, logger)
574 project_access(
575 'rbac_user2', 'project2', session_class, confd_host, logger)
576
577 # Case 02. rbac_user4 has project-admin in project3 and project4.
578
579 logger.debug('Verify rbac_user4 has access to project 3 & project4')
580 project_access(
581 'rbac_user4', 'project4', session_class, confd_host, logger)
582 project_access(
583 'rbac_user4', 'project3', session_class, confd_host, logger)
584
585 logger.debug('Two users in project3 exchanges roles & check access')
586 rift.auto.mano.revoke_project_role_from_user(
587 rw_project_proxy, role1, 'rbac_user4',
588 'project3', user_domain)
589 rift.auto.mano.revoke_project_role_from_user(
590 rw_project_proxy, role2, 'rbac_user5',
591 'project3', user_domain)
592 rift.auto.mano.assign_project_role_to_user(
593 rw_project_proxy, role2, 'rbac_user4',
594 'project3', user_domain, rw_rbac_int_proxy)
595 rift.auto.mano.assign_project_role_to_user(
596 rw_project_proxy, role1, 'rbac_user5',
597 'project3', user_domain, rw_rbac_int_proxy)
598
599 logger.debug('rbac_user5 trying its access on project3 and project4')
600 project_access(
601 'rbac_user5', 'project3', session_class,
602 confd_host, logger
603 )
604 with pytest.raises(
605 Exception,
606 message="rbac_user5 accessed project4 which its not part of."):
607 project_access(
608 'rbac_user5', 'project4', session_class,
609 confd_host, logger
610 )
611
612 # 'rbac_user5'(admin role) revoking the role from rbac-user4
613 password = rbac_user_passwd
614 if tbac:
615 password = 'rbac_user5'
616 rbac_user2_session = rift.auto.mano.get_session(
617 session_class, confd_host, 'rbac_user5', password)
618 rbac_user2_prjt_pxy = rbac_user2_session.proxy(RwProjectYang)
619 self.delete_user_from_project(
620 rbac_user2_prjt_pxy, 'rbac_user4', 'project3', user_domain)
621
622 # Create new user 'del-user'
623 rift.auto.mano.create_user(
624 rw_user_proxy, 'del-user', rbac_user_passwd, user_domain)
625 rift.auto.mano.assign_project_role_to_user(
626 rw_project_proxy, role2, 'del-user', 'project3',
627 user_domain, rw_rbac_int_proxy)
628 # Delete 'del-user' with 'rbac_user5'(admin role)
629 self.delete_user_from_project(
630 rbac_user2_prjt_pxy, 'del-user', 'project3', user_domain)
631
632 logger.debug(
633 'rbac_user4 try to access project3 which its not a part of anymore'
634 )
635 with pytest.raises(
636 Exception,
637 message="rbac_user4 accessed project3 which its not part of."):
638 project_access(
639 'rbac_user4', 'project3', session_class,
640 confd_host, logger)
641
642 logger.debug('rbac_user4 try to access project4 which its a part of.')
643 project_access(
644 'rbac_user4', 'project4', session_class,
645 confd_host, logger)
646
647 # Case 03. rbac_user9 has project-oper in project5 and project6.
648
649 logger.debug('rbac_user9 try to access project5 & project6')
650 project_access(
651 'rbac_user9', 'project5', session_class,
652 confd_host, logger)
653 project_access(
654 'rbac_user9', 'project6', session_class,
655 confd_host, logger)
656
657 logger.debug(
658 'rbac_user8 try to access to project5 which its not part of.'
659 )
660 with pytest.raises(
661 Exception,
662 message="rbac_user8 accessed project5 which its not part of."):
663 project_access(
664 'rbac_user8', 'project5', session_class,
665 confd_host, logger)
666
667 logger.debug(
668 'rbac_user7 try to access to project6 which its not part of.'
669 )
670 with pytest.raises(
671 Exception,
672 message="rbac_user7 accessed project6 which its not part of."):
673 project_access(
674 'rbac_user7', 'project6', session_class,
675 confd_host, logger)
676
677
678 @pytest.mark.skipif(not pytest.config.getoption("--user-creation-test"), reason="need --user-creation-test option to run")
679 def test_user_create_delete_authorization(self, logger, users_test_data, session_class, confd_host, rw_user_proxy,
680 rbac_user_passwd, user_domain, tbac, rw_rbac_int_proxy):
681 """Verifies only users with certain roles can create/delete users and set the password of an user"""
682 write_users, read_users, fail_users = users_test_data
683
684 # Create a dummy user with admin/admin
685 dummy_user_name = 'dummy-user'
686 rift.auto.mano.create_user(rw_user_proxy, dummy_user_name, rbac_user_passwd, user_domain)
687
688 # Check users in write_users dict able to create/delete an user and able to set password for others
689 logger.debug('Verifying users which are authorised to create/delete an user')
690 for user in write_users:
691 logger.debug('Verifying user:(role,password) {}:{}'.format(user, write_users[user]))
692 user_session = rift.auto.mano.get_session(session_class, confd_host, user, write_users[user][1])
693 pxy = user_session.proxy(RwUserYang)
694
695 user_name = 'new-user-{}'.format(user)
696 logger.debug('Trying to create user {}'.format(user_name))
697 rift.auto.mano.create_user(pxy, user_name, rbac_user_passwd, user_domain)
698
699 logger.debug('Trying to delete user {}'.format(user_name))
700 rift.auto.mano.delete_user(pxy, user_name, user_domain)
701
702 if not tbac: # password update is not allowed for external users in tbac
703 new_passwd = rift.auto.mano.generate_password()
704 # Check users in write_users dict able to set password for other user (dummy-user)
705 logger.debug('User {} trying to update password for user {}'.format(user, dummy_user_name))
706 rift.auto.mano.update_password(pxy, dummy_user_name, new_passwd, user_domain, rw_rbac_int_proxy)
707
708 # Verify dummy_user_name able to log-in with its new password
709 logger.debug('User {} trying to log-in with its updated password {}'.format(dummy_user_name, new_passwd))
710 dummy_user_session_updated_passwd = rift.auto.mano.get_session(session_class, confd_host, dummy_user_name,
711 new_passwd)
712
713 # Verify the user not able to log-in with old password
714 with pytest.raises(Exception, message='User {} not supposed to log-in with its old password {}'.format(
715 dummy_user_name, rbac_user_passwd)) as excinfo:
716 logger.debug('User {} trying to connect with its old password {}'.format(user, rbac_user_passwd))
717 rift.auto.mano.get_session(session_class, confd_host, dummy_user_name, rbac_user_passwd,
718 timeout=SESSION_CONNECT_TIMEOUT)
719
720 rift.auto.mano.close_session(dummy_user_session_updated_passwd)
721 rift.auto.mano.close_session(user_session)
722
723 # Check users in read_users dict able to read user list (path: /user-config)
724 logger.debug('Verifying users which are authorised to read user list')
725 for user in read_users:
726 logger.debug('Verifying user:(role,password) {}:{}'.format(user, read_users[user]))
727 user_session = rift.auto.mano.get_session(session_class, confd_host, user, read_users[user][1])
728 pxy = user_session.proxy(RwUserYang)
729 logger.debug('User {} trying to access /user-config xpath'.format(user))
730 user_config = pxy.get_config('/user-config')
731 assert [user.user_name for user in user_config.user]
732
733 rift.auto.mano.close_session(user_session)
734
735 # Check users in fail_users dict not able to create/delete an user
736 logger.debug('Verifying users which are not supposed to create/delete an user')
737 for user in fail_users:
738 logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users[user]))
739 user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users[user][1])
740 pxy = user_session.proxy(RwUserYang)
741
742 user_name = 'new-user-{}'.format(user)
743
744 with pytest.raises(Exception, message='User {} not authorised to create user {}'.format(
745 user, user_name)) as excinfo:
746 logger.debug('User {} trying to create an user {}'.format(user, user_name))
747 rift.auto.mano.create_user(pxy, user_name, rbac_user_passwd, user_domain)
748
749 with pytest.raises(Exception, message='User {} not authorised to delete user {}'.format(
750 user, dummy_user_name)) as excinfo:
751 logger.debug('User {} trying to delete user {}'.format(user, dummy_user_name))
752 rift.auto.mano.delete_user(pxy, dummy_user_name, user_domain)
753
754 rift.auto.mano.close_session(user_session)
755
756 if not tbac: # password update is not allowed for external users in tbac
757 # Check all users able to set their own password
758 logger.debug('Verifying an user able to set its own password')
759 for user, role_passwd_tuple in dict(write_users, **dict(read_users, **fail_users)).items():
760 logger.debug('Verifying user:(role,password) {}:{}'.format(user, role_passwd_tuple))
761 user_session = rift.auto.mano.get_session(session_class, confd_host, user, role_passwd_tuple[1])
762 pxy = user_session.proxy(RwUserYang)
763
764 new_passwd = rift.auto.mano.generate_password()
765 logger.debug('User {} trying to update its password to {}'.format(user, new_passwd))
766 rift.auto.mano.update_password(pxy, user, new_passwd, user_domain, rw_rbac_int_proxy)
767
768 # Verify the user should be able to log-in with new password
769 logger.debug('User {} trying to log-in with its updated password {}'.format(user, new_passwd))
770 user_session_updated_passwd = rift.auto.mano.get_session(session_class, confd_host, user, new_passwd)
771
772 # Verify the user not able to log-in with old password
773 with pytest.raises(Exception, message='User {} not supposed to log-in with its old password {}'.format(
774 user, role_passwd_tuple[1])) as excinfo:
775 logger.debug('User {} trying to connect with its old password {}'.format(user, role_passwd_tuple[1]))
776 rift.auto.mano.get_session(session_class, confd_host, user, rbac_user_passwd, timeout=SESSION_CONNECT_TIMEOUT)
777
778 rift.auto.mano.close_session(user_session)
779 rift.auto.mano.close_session(user_session_updated_passwd)
780
781
782 @pytest.mark.skipif(not pytest.config.getoption("--account-test"), reason="need --account-test option to run")
783 def test_account_create_delete_authorization(self, users_test_data, mgmt_session, logger, cloud_module, fmt_cloud_xpath,
784 fmt_prefixed_cloud_xpath, project_acessible, cloud_account, session_class, confd_host):
785 """Verifies only users with certain roles can create/read/delete cloud, sdn accounts"""
786 write_users, read_users, fail_users = users_test_data
787 xpath_no_pfx = fmt_cloud_xpath.format(project=quoted_key(project_acessible), account_name=quoted_key(cloud_account.name))
788 xpath = fmt_prefixed_cloud_xpath.format(project=quoted_key(project_acessible), account_name=quoted_key(cloud_account.name))
789
790 # Check users in write_users dict able to create/delete cloud accounts
791 logger.debug('Verifying users which are authorised to create/delete cloud accounts')
792 for user in write_users:
793 logger.debug('Verifying user:(role,password) {}:{}'.format(user, write_users[user]))
794 user_session = rift.auto.mano.get_session(session_class, confd_host, user, write_users[user][1])
795 cloud_pxy = user_session.proxy(cloud_module)
796
797 logger.debug('Trying to create a cloud account')
798 cloud_pxy.replace_config(xpath, cloud_account)
799 response = cloud_pxy.get(xpath_no_pfx)
800 assert response.name == cloud_account.name
801 assert response.account_type == cloud_account.account_type
802
803 logger.debug('Trying to delete the cloud account')
804 cloud_pxy.delete_config(xpath)
805 assert cloud_pxy.get(xpath_no_pfx) is None
806
807 rift.auto.mano.close_session(user_session)
808
809 # admin user creating a cloud account which read_users will be trying to read
810 logger.debug('admin user creating cloud account {}'.format(cloud_account.name))
811 admin_cloud_proxy = mgmt_session.proxy(cloud_module)
812 admin_cloud_proxy.replace_config(xpath, cloud_account)
813 assert admin_cloud_proxy.get(xpath_no_pfx).name == cloud_account.name
814
815 # Check users in read_users dict able to read cloud accounts
816 logger.debug('Verifying users which are authorised to read cloud accounts')
817 for user in read_users:
818 logger.debug('Verifying user:(role,password) {}:{}'.format(user, read_users[user]))
819 user_session = rift.auto.mano.get_session(session_class, confd_host, user, read_users[user][1])
820 cloud_pxy = user_session.proxy(cloud_module)
821
822 response = cloud_pxy.get(xpath_no_pfx)
823 assert response.name == cloud_account.name
824 assert response.account_type == cloud_account.account_type
825
826 rift.auto.mano.close_session(user_session)
827
828 # Check users in fail_users dict not able to delete/read cloud accounts
829 logger.debug('Verifying users which are not authorised to read/delete cloud accounts')
830 for user in fail_users:
831 logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users[user]))
832 user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users[user][1])
833 cloud_pxy = user_session.proxy(cloud_module)
834
835 with pytest.raises(Exception, message='User {} not authorised to delete cloud account {}'.format(
836 user, cloud_account.name)) as excinfo:
837 logger.debug('User {} trying to delete cloud account {}'.format(user, cloud_account.name))
838 cloud_pxy.delete_config(xpath)
839
840 # logger.debug('User {} trying to access cloud account {}'.format(user, cloud_account.name))
841 # assert cloud_pxy.get(xpath_no_pfx) is None
842 rift.auto.mano.close_session(user_session)
843
844 # admin user deleting the cloud account
845 logger.debug('admin user deleting cloud account {}'.format(cloud_account.name))
846 admin_cloud_proxy.delete_config(xpath)
847 assert admin_cloud_proxy.get(xpath_no_pfx) is None
848
849 # Check users in fail_users dict not able to create cloud accounts
850 logger.debug('Verifying users which are not authorised to create cloud accounts')
851 for user in fail_users:
852 logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users[user]))
853 user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users[user][1])
854 cloud_pxy = user_session.proxy(cloud_module)
855
856 with pytest.raises(Exception, message='User {} not authorised to create cloud account {}'.format(
857 user, cloud_account.name)) as excinfo:
858 logger.debug('User {} trying to create a cloud account {}'.format(user, cloud_account.name))
859 cloud_pxy.replace_config(xpath, cloud_account)
860
861 rift.auto.mano.close_session(user_session)
862
863 @staticmethod
864 def delete_descriptors(project, vnfd_proxy, nsd_proxy, vnfd_xpath, nsd_xpath, fmt_vnfd_id_xpath, fmt_nsd_id_xpath):
865 nsds = nsd_proxy.get('{}/nsd'.format(nsd_xpath), list_obj=True)
866 for nsd in nsds.nsd:
867 xpath = fmt_nsd_id_xpath.format(project=quoted_key(project), nsd_id=quoted_key(nsd.id))
868 nsd_proxy.delete_config(xpath)
869 nsds = nsd_proxy.get('{}/nsd'.format(nsd_xpath), list_obj=True)
870 assert nsds is None or len(nsds.nsd) == 0
871
872 vnfds = vnfd_proxy.get('{}/vnfd'.format(vnfd_xpath), list_obj=True)
873 for vnfd_record in vnfds.vnfd:
874 xpath = fmt_vnfd_id_xpath.format(project=quoted_key(project), vnfd_id=quoted_key(vnfd_record.id))
875 vnfd_proxy.delete_config(xpath)
876
877 vnfds = vnfd_proxy.get('{}/vnfd'.format(vnfd_xpath), list_obj=True)
878 assert vnfds is None or len(vnfds.vnfd) == 0
879
880 @pytest.mark.skipif(not pytest.config.getoption("--onboarding-test"), reason="need --onboarding-test option to run")
881 def test_onboarding_authorization(self, users_test_data, logger, descriptors, session_class, confd_host,
882 fmt_vnfd_catalog_xpath, fmt_nsd_catalog_xpath, fmt_nsd_id_xpath, fmt_vnfd_id_xpath, project_acessible, mgmt_session):
883 """Verifies only users with certain roles can onboard/update/delete a package"""
884
885 descriptor_vnfds, descriptor_nsd = descriptors[:-1], descriptors[-1]
886 write_users, read_users, fail_users = users_test_data
887 logger.debug('The descriptrs being used: {}'.format(descriptors))
888 nsd_xpath = fmt_nsd_catalog_xpath.format(project=quoted_key(project_acessible))
889 vnfd_xpath = fmt_vnfd_catalog_xpath.format(project=quoted_key(project_acessible))
890
891 def onboard(user_session, project):
892 for descriptor in descriptors:
893 rift.auto.descriptor.onboard(user_session, descriptor, project=project)
894
895 def verify_descriptors(vnfd_pxy, nsd_pxy, vnfd_count, nsd_count):
896 catalog = vnfd_pxy.get_config(vnfd_xpath)
897 actual_vnfds = catalog.vnfd
898 assert len(actual_vnfds) == vnfd_count, 'There should be {} vnfds'.format(vnfd_count)
899 catalog = nsd_pxy.get_config(nsd_xpath)
900 actual_nsds = catalog.nsd
901 assert len(actual_nsds) == nsd_count, 'There should be {} nsd'.format(nsd_count)
902
903 # Check users in write_users dict able to onboard/delete descriptors
904 logger.debug('Verifying users which are authorised to onboard/delete descriptors')
905 for user in write_users:
906 logger.debug('Verifying user:(role,password) {}:{}'.format(user, write_users[user]))
907 user_session = rift.auto.mano.get_session(session_class, confd_host, user, write_users[user][1])
908 vnfd_pxy = user_session.proxy(RwProjectVnfdYang)
909 nsd_pxy = user_session.proxy(RwProjectNsdYang)
910 logger.debug('Trying to onboard ping-pong descriptors')
911 onboard(user_session, project_acessible)
912 logger.debug('Verifying if the descriptors are uploaded')
913 verify_descriptors(vnfd_pxy, nsd_pxy, len(descriptor_vnfds), 1)
914
915 logger.debug('Trying to delete descriptors')
916 TestRbacVerification.delete_descriptors(project_acessible, vnfd_pxy, nsd_pxy, vnfd_xpath, nsd_xpath,
917 fmt_vnfd_id_xpath, fmt_nsd_id_xpath)
918
919 rift.auto.mano.close_session(user_session)
920
921 # onboard the descriptors using mgmt_session which read_users will try to read
922 logger.debug('admin user uploading the descriptors which read_users will try to read')
923 onboard(mgmt_session, project_acessible)
924 admin_vnfd_pxy = mgmt_session.proxy(RwProjectVnfdYang)
925 admin_nsd_pxy = mgmt_session.proxy(RwProjectNsdYang)
926 logger.debug('Verifying if the descriptors are uploaded')
927 verify_descriptors(admin_vnfd_pxy, admin_nsd_pxy, len(descriptor_vnfds), 1)
928
929 # Check users in read_users dict able to read already onboarded descriptors
930 logger.debug('Verifying users which are authorised to read descriptors')
931 for user in read_users:
932 logger.debug('Verifying user:(role,password) {}:{}'.format(user, read_users[user]))
933 user_session = rift.auto.mano.get_session(session_class, confd_host, user, read_users[user][1])
934 vnfd_pxy = user_session.proxy(RwProjectVnfdYang)
935 nsd_pxy = user_session.proxy(RwProjectNsdYang)
936
937 logger.debug('Trying to read ping-pong descriptors')
938 verify_descriptors(vnfd_pxy, nsd_pxy, len(descriptor_vnfds), 1)
939
940 rift.auto.mano.close_session(user_session)
941
942 # Check users in fail_users dict not able to onboard/delete descriptors
943 logger.debug('Verifying users which are not supposed to delete descriptors')
944 for user in fail_users:
945 logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users[user]))
946 user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users[user][1])
947 vnfd_pxy = user_session.proxy(RwProjectVnfdYang)
948 nsd_pxy = user_session.proxy(RwProjectNsdYang)
949
950 with pytest.raises(Exception, message='User {} not authorised to delete descriptors'.format(user)) as excinfo:
951 logger.debug('User {} trying to delete descriptors'.format(user))
952 TestRbacVerification.delete_descriptors(project_acessible, vnfd_pxy, nsd_pxy, vnfd_xpath, nsd_xpath,
953 fmt_vnfd_id_xpath, fmt_nsd_id_xpath)
954
955 rift.auto.mano.close_session(user_session)
956
957 logger.debug('Deleting the descriptors as fail_users trying to upload the descriptors')
958 TestRbacVerification.delete_descriptors(project_acessible, admin_vnfd_pxy, admin_nsd_pxy, vnfd_xpath, nsd_xpath,
959 fmt_vnfd_id_xpath, fmt_nsd_id_xpath)
960
961 logger.debug('Verifying users which are not supposed to create descriptors')
962 for user in fail_users:
963 logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users[user]))
964 user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users[user][1])
965 vnfd_pxy = user_session.proxy(RwProjectVnfdYang)
966 nsd_pxy = user_session.proxy(RwProjectNsdYang)
967
968 with pytest.raises(Exception, message='User {} not authorised to onboard descriptors'.format(user)) as excinfo:
969 logger.debug('User {} trying to onboard ping-pong descriptors'.format(user))
970 onboard(user_session)
971
972 rift.auto.mano.close_session(user_session)
973
974 @pytest.mark.skipif(not pytest.config.getoption("--nsr-test"),
975 reason="need --nsr-test option to run")
976 def test_nsr_authorization(self, users_test_data, logger, cloud_account,
977 cloud_module, descriptors, session_class,
978 confd_host, fmt_cloud_xpath,
979 fmt_prefixed_cloud_xpath, mgmt_session, fmt_nsd_id_xpath, fmt_vnfd_id_xpath,
980 project_acessible, fmt_nsd_catalog_xpath, fmt_vnfd_catalog_xpath):
981 """Verifies only users with certain roles can
982 create/read/delete nsr/vlr/vnfr
983 """
984
985 descriptor_vnfds, descriptor_nsd = descriptors[:-1], descriptors[-1]
986 write_users, read_users, fail_users = users_test_data
987
988 # Cloud account creation
989 logger.debug('Creating a cloud account which will be used for NS instantiation')
990 cloud_pxy = mgmt_session.proxy(cloud_module)
991 cloud_pxy.replace_config(fmt_prefixed_cloud_xpath.format(project=quoted_key(project_acessible),
992 account_name=quoted_key(cloud_account.name)),
993 cloud_account)
994 response = cloud_pxy.get(
995 fmt_cloud_xpath.format(project=quoted_key(project_acessible), account_name=quoted_key(cloud_account.name)))
996 assert response.name == cloud_account.name
997
998 cloud_pxy.wait_for(fmt_cloud_xpath.format(project=quoted_key(project_acessible), account_name=quoted_key(
999 cloud_account.name)) + '/connection-status/status', 'success', timeout=30, fail_on=['failure'])
1000
1001 # Upload the descriptors
1002 nsd_xpath = fmt_nsd_catalog_xpath.format(project=quoted_key(project_acessible))
1003 vnfd_xpath = fmt_vnfd_catalog_xpath.format(project=quoted_key(project_acessible))
1004 logger.debug('Uploading descriptors {} which will be used for NS instantiation'.format(descriptors))
1005 for descriptor in descriptors:
1006 rift.auto.descriptor.onboard(mgmt_session, descriptor, project=project_acessible)
1007 admin_nsd_pxy = mgmt_session.proxy(RwProjectNsdYang)
1008 nsd_catalog = admin_nsd_pxy.get_config(nsd_xpath)
1009 assert nsd_catalog
1010 nsd = nsd_catalog.nsd[0]
1011 nsr = rift.auto.descriptor.create_nsr(cloud_account.name, nsd.name, nsd)
1012
1013 # Check users in write_users dict able to instantiate/delete a NS
1014 logger.debug('Verifying users which are authorised to instantiate/delete a NS')
1015 for user in write_users:
1016 logger.debug('Verifying user:(role,password) {}:{}'.format(user, write_users[user]))
1017 user_session = rift.auto.mano.get_session(session_class, confd_host, user, write_users[user][1])
1018 rwnsr_pxy = user_session.proxy(RwNsrYang)
1019 rwvnfr_pxy = user_session.proxy(RwVnfrYang)
1020 rwvlr_pxy = user_session.proxy(RwVlrYang)
1021
1022 logger.info("Trying to instantiate the Network Service")
1023 rift.auto.descriptor.instantiate_nsr(nsr, rwnsr_pxy, logger,
1024 project=project_acessible)
1025
1026 logger.info("Trying to terminate the Network Service")
1027 rift.auto.descriptor.terminate_nsr(rwvnfr_pxy, rwnsr_pxy,
1028 rwvlr_pxy, logger,
1029 project_acessible)
1030
1031 # Instantiate a NS which the read_users, fail_users will try to
1032 # read/delete.
1033 admin_rwnsr_pxy = mgmt_session.proxy(RwNsrYang)
1034 admin_rwvnfr_pxy = mgmt_session.proxy(RwVnfrYang)
1035 admin_rwvlr_pxy = mgmt_session.proxy(RwVlrYang)
1036 logger.debug('admin user instantiating NS which the read_users, fail_users will try to read/delete.')
1037 rift.auto.descriptor.instantiate_nsr(nsr, admin_rwnsr_pxy, logger, project=project_acessible)
1038
1039 # Check users in read_users, write_users dict able to read vnfr-console, vnfr-catalog, ns-instance-opdata
1040 p_xpath = '/project[name={}]'.format(quoted_key(project_acessible))
1041 read_xpaths = ['/ns-instance-opdata', '/vnfr-catalog', '/vnfr-console']
1042 logger.debug('Verifying users which are authorised to read vnfr-catalog, ns-instance-opdata, vnfr-console etc')
1043 for user, role_passwd_tuple in dict(write_users, **read_users).items():
1044 logger.debug('Verifying user:(role,password) {}:{}'.format(user, role_passwd_tuple))
1045 user_session = rift.auto.mano.get_session(session_class, confd_host, user, role_passwd_tuple[1])
1046 rwnsr_pxy = user_session.proxy(RwNsrYang)
1047 rwvnfr_pxy = user_session.proxy(RwVnfrYang)
1048 for xpath in read_xpaths:
1049 logger.debug('Trying to read xpath: {}'.format(p_xpath+xpath))
1050 proxy_ = rwvnfr_pxy if 'vnfr' in xpath else rwnsr_pxy
1051 assert proxy_.get(p_xpath+xpath)
1052
1053 rift.auto.mano.close_session(user_session)
1054
1055 # Check users in fail_users dict not able to terminate a NS
1056 logger.debug('Verifying users which are NOT authorised to terminate a NS')
1057 for user in fail_users:
1058 logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users[user]))
1059 user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users[user][1])
1060 rwnsr_pxy = user_session.proxy(RwNsrYang)
1061 rwvnfr_pxy = user_session.proxy(RwVnfrYang)
1062
1063 with pytest.raises(Exception, message='User {} not authorised to terminate NS'.format(user)) as excinfo:
1064 logger.debug('User {} trying to delete NS'.format(user))
1065 rift.auto.descriptor.terminate_nsr(rwvnfr_pxy, rwnsr_pxy,
1066 logger, admin_rwvlr_pxy,
1067 project=project_acessible)
1068 rift.auto.mano.close_session(user_session)
1069
1070 # Terminate the NS instantiated by admin user
1071 logger.debug('admin user terminating the NS')
1072 rift.auto.descriptor.terminate_nsr(admin_rwvnfr_pxy,
1073 admin_rwnsr_pxy,
1074 admin_rwvlr_pxy, logger,
1075 project=project_acessible)
1076
1077 # Check users in fail_users dict not able to instantiate a NS
1078 nsr.id = str(uuid.uuid4())
1079 logger.debug('Verifying users which are NOT authorised to instantiate a NS')
1080 for user in fail_users:
1081 logger.debug('Verifying user:(role,password) {}:{}'.format(user, fail_users[user]))
1082 user_session = rift.auto.mano.get_session(session_class, confd_host, user, fail_users[user][1])
1083 rwnsr_pxy = user_session.proxy(RwNsrYang)
1084 rwvnfr_pxy = user_session.proxy(RwVnfrYang)
1085
1086 with pytest.raises(Exception, message='User {} not authorised to instantiate NS'.format(user)) as excinfo:
1087 logger.debug('User {} trying to instantiate NS'.format(user))
1088 rift.auto.descriptor.instantiate_nsr(nsr, rwnsr_pxy, logger, project=project_acessible)
1089 rift.auto.mano.close_session(user_session)
1090
1091 # delete cloud accounts and descriptors; else deleting project in teardown fails
1092 cloud_pxy.delete_config(fmt_prefixed_cloud_xpath.format(project=quoted_key(project_acessible),
1093 account_name=quoted_key(cloud_account.name)))
1094 admin_vnfd_pxy = mgmt_session.proxy(RwProjectVnfdYang)
1095 TestRbacVerification.delete_descriptors(project_acessible, admin_vnfd_pxy, admin_nsd_pxy, vnfd_xpath, nsd_xpath,
1096 fmt_vnfd_id_xpath, fmt_nsd_id_xpath)
1097
1098 @pytest.mark.skipif(not pytest.config.getoption("--syslog-server-test"), reason="need --syslog-server-test option to run")
1099 def test_set_syslog_server_authorization(self, mgmt_session, users_test_data, session_class, confd_host, logger):
1100 """Verifies only users with certain roles can set syslog server"""
1101 write_users, read_users, fail_users = users_test_data
1102 admin_log_mgmt_pxy = mgmt_session.proxy(RwlogMgmtYang)
1103
1104 def update_syslog_server_address(user_log_mgmt_pxy):
1105 ip = '127.0.0.{}'.format(random.randint(0,255))
1106 sink_obj = RwlogMgmtYang.Logging_Sink.from_dict({'server_address': ip})
1107
1108 syslog_name = admin_log_mgmt_pxy.get_config('/logging').sink[0].name
1109 logger.debug('updating the syslog {} server_address to {}'.format(syslog_name, ip))
1110 user_log_mgmt_pxy.merge_config('/logging/sink[name={sink_name}]'.format(sink_name=quoted_key(syslog_name)), sink_obj)
1111 assert [sink.server_address for sink in admin_log_mgmt_pxy.get_config('/logging').sink if sink.name == syslog_name][0] == ip
1112
1113 for user, role_passwd_tuple in dict(write_users, **dict(read_users, **fail_users)).items():
1114 logger.debug('Verifying user:(role,password) {}:{}'.format(user, role_passwd_tuple))
1115 user_session = rift.auto.mano.get_session(session_class, confd_host, user, role_passwd_tuple[1])
1116 user_log_mgmt_pxy = user_session.proxy(RwlogMgmtYang)
1117
1118 if user in write_users:
1119 logger.debug('User {} should be able to update the syslog server address'.format(user))
1120 update_syslog_server_address(user_log_mgmt_pxy)
1121
1122 if user in fail_users:
1123 with pytest.raises(Exception, message='User {} not authorised to set syslog server address'.format(user)) as excinfo:
1124 logger.debug('User {} trying to update the syslog server address. It should fail'.format(user))
1125 update_syslog_server_address(user_log_mgmt_pxy)
1126
1127 if user in read_users:
1128 logger.debug('User {} trying to read the syslog server address'.format(user))
1129 logging_obj = user_log_mgmt_pxy.get_config('/logging')
1130 assert logging_obj.sink[0]
1131 assert logging_obj.sink[0].server_address
1132
1133 @pytest.mark.skipif(not pytest.config.getoption("--redundancy-role-test"), reason="need --redundancy-role-test option to run")
1134 def test_redundancy_config_authorization(self, mgmt_session, users_test_data, session_class, confd_host, logger, redundancy_config_test_roles):
1135 """Verifies only users with certain roles can set redundancy-config or read redundancy-state"""
1136 write_users, read_users, fail_users = users_test_data
1137 admin_redundancy_pxy = mgmt_session.proxy(RwRedundancyYang)
1138 site_nm_pfx = 'ha_site_'
1139
1140 def create_redundancy_site(user_redundancy_pxy, site_nm):
1141 site_id = '127.0.0.1'
1142 site_obj = RwRedundancyYang.YangData_RwRedundancy_RedundancyConfig_Site.from_dict({'site_name': site_nm, 'site_id': site_id})
1143
1144 logger.debug('Creating redundancy site {}'.format(site_nm))
1145 user_redundancy_pxy.create_config('/rw-redundancy:redundancy-config/rw-redundancy:site', site_obj)
1146 assert [site.site_name for site in admin_redundancy_pxy.get_config('/redundancy-config/site', list_obj=True).site if site.site_name == site_nm]
1147
1148 def delete_redundancy_site(user_redundancy_pxy, site_nm):
1149 logger.debug('Deleting redundancy site {}'.format(site_nm))
1150 user_redundancy_pxy.delete_config('/rw-redundancy:redundancy-config/rw-redundancy:site[rw-redundancy:site-name={}]'.format(quoted_key(site_nm)))
1151 assert not [site.site_name for site in admin_redundancy_pxy.get_config('/redundancy-config/site', list_obj=True).site if site.site_name == site_nm]
1152
1153 # Create a redundancy site which fail user will try to delete/ read user will try to read
1154 create_redundancy_site(admin_redundancy_pxy, 'test_site')
1155
1156 for user, role_passwd_tuple in dict(write_users, **dict(read_users, **fail_users)).items():
1157 logger.debug('Verifying user:(role,password) {}:{}'.format(user, role_passwd_tuple))
1158 user_session = rift.auto.mano.get_session(session_class, confd_host, user, role_passwd_tuple[1])
1159 user_redundancy_pxy = user_session.proxy(RwRedundancyYang)
1160
1161 if user in write_users:
1162 site_nm = '{}_{}'.format(site_nm_pfx, user)
1163 logger.debug('User {} should be able to create a new redundancy site {}'.format(user, site_nm))
1164 create_redundancy_site(user_redundancy_pxy, site_nm)
1165
1166 logger.debug('User {} should be able to delete a redundancy site {}'.format(user, site_nm))
1167 delete_redundancy_site(user_redundancy_pxy, site_nm)
1168
1169 assert user_redundancy_pxy.get('/redundancy-state')
1170
1171 if user in fail_users:
1172 site_nm = '{}_{}'.format(site_nm_pfx, user)
1173 with pytest.raises(Exception, message='User {} not authorised to create redundancy site'.format(user)) as excinfo:
1174 logger.debug('User {} trying to create redundancy site {}. It should fail'.format(user, site_nm))
1175 create_redundancy_site(user_redundancy_pxy, site_nm)
1176
1177 with pytest.raises(Exception, message='User {} not authorised to delete redundancy site'.format(user)) as excinfo:
1178 logger.debug('User {} trying to delete redundancy site {}. It should fail'.format(user, site_nm))
1179 delete_redundancy_site(user_redundancy_pxy, 'test_site')
1180
1181 if user in read_users:
1182 logger.debug('User {} trying to read redundancy-config'.format(user))
1183 assert user_redundancy_pxy.get('/redundancy-state')
1184 assert user_redundancy_pxy.get('/redundancy-config')
1185
1186
1187 @pytest.mark.depends('test_rbac_roles_setup')
1188 @pytest.mark.teardown('test_rbac_roles_setup')
1189 @pytest.mark.incremental
1190 class TestRbacTeardown(object):
1191 def test_delete_project(self, rw_project_proxy, logger, project_keyed_xpath, project_acessible):
1192 """Deletes projects used for the test"""
1193 if rw_project_proxy.get_config(project_keyed_xpath.format(project_name=quoted_key(project_acessible))+'/project-state', list_obj=True):
1194 logger.debug('Deleting project {}'.format(project_acessible))
1195 rift.auto.mano.delete_project(rw_project_proxy, project_acessible)
1196
1197 def test_delete_users(self, users_test_data, logger, rw_user_proxy, rbac_platform_proxy, platform_config_keyed_xpath,
1198 user_keyed_xpath, user_domain, rw_conman_proxy, project_acessible):
1199 """Deletes the users which are part of rbac test-data and verify their deletion"""
1200 write_users, read_users, fail_users = users_test_data
1201
1202 for user, role_passwd_tuple in dict(write_users, **dict(read_users, **fail_users)).items():
1203 logger.debug('Deleting user:(role,password) {}:{}'.format(user, role_passwd_tuple))
1204 if any('platform' in role for role in role_passwd_tuple[0]):
1205 rbac_platform_proxy.delete_config(platform_config_keyed_xpath.format(user=quoted_key(user), domain=quoted_key(user_domain)))
1206 rw_user_proxy.delete_config(user_keyed_xpath.format(user=quoted_key(user), domain=quoted_key(user_domain)))
1207
1208 # Verify if the user is deleted
1209 user_config = rw_user_proxy.get_config('/user-config')
1210 current_users_list = [user.user_name for user in user_config.user]
1211
1212 assert user not in current_users_list
1213
1214 # Verify only two users should be present now: oper & admin
1215 user_config = rw_user_proxy.get_config('/user-config')
1216 current_users_list = [user.user_name for user in user_config.user]
1217
1218 logger.debug('Current users list after deleting all test users: {}'.format(current_users_list))
1219 expected_empty_user_list = [user for user in users_test_data if user in current_users_list]
1220 assert not expected_empty_user_list