Coverage for osm_nbi/authconn_tacacs.py: 37%

62 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-27 02:46 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright 2020 TATA ELXSI 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16# 

17# For those usages not covered by the Apache License, Version 2.0 please 

18# contact: saikiran.k@tataelxsi.co.in 

19## 

20 

21 

22""" 

23AuthconnTacacs implements the connector for TACACS. 

24Leverages AuthconnInternal for token lifecycle management and the RBAC model. 

25 

26When NBI bootstraps, it tries to create admin user with admin role associated to admin project. 

27Hence, the TACACS server should contain admin user. 

28""" 

29 

30__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>" 

31__date__ = "$11-Nov-2020 11:04:00$" 

32 

33 

34from osm_nbi.authconn import Authconn, AuthException 

35from osm_nbi.authconn_internal import AuthconnInternal 

36from osm_nbi.base_topic import BaseTopic 

37 

38import logging 

39from time import time 

40from http import HTTPStatus 

41 

42# TACACS+ Library 

43from tacacs_plus.client import TACACSClient 

44 

45 

class AuthconnTacacs(AuthconnInternal):
    """
    Authentication connector that checks credentials against a TACACS+ server.

    Users that authenticate successfully are stored (without their password)
    in the TACACS-specific database collections declared below. Everything not
    overridden here is inherited from AuthconnInternal.
    """

    token_time_window = 2
    token_delay = 1

    # Defaults applied when the configuration does not provide a value.
    tacacs_def_port = 49
    tacacs_def_timeout = 10

    # Database collections used by this connector.
    users_collection = "users_tacacs"
    roles_collection = "roles_tacacs"
    projects_collection = "projects_tacacs"
    tokens_collection = "tokens_tacacs"

    def __init__(self, config, db, role_permissions):
        """
        Constructor to initialize db and TACACS server attributes to members.

        :param config: mapping with the mandatory keys "tacacs_host" and
            "tacacs_secret", and the optional keys "tacacs_port" and
            "tacacs_timeout".
        :param db: database handle used for the user collection.
        :param role_permissions: forwarded unchanged to Authconn.__init__.
        :raises KeyError: if "tacacs_host" or "tacacs_secret" is missing.
        """
        # Authconn.__init__ is invoked directly, so AuthconnInternal.__init__
        # is not executed for this connector.
        Authconn.__init__(self, config, db, role_permissions)
        self.logger = logging.getLogger("nbi.authenticator.tacacs")
        self.db = db
        self.tacacs_host = config["tacacs_host"]
        self.tacacs_secret = config["tacacs_secret"]
        # A missing or falsy value falls back to the class-level default.
        self.tacacs_port = config.get("tacacs_port") or self.tacacs_def_port
        self.tacacs_timeout = config.get("tacacs_timeout") or self.tacacs_def_timeout
        self.tacacs_cli = TACACSClient(
            self.tacacs_host, self.tacacs_port, self.tacacs_secret, self.tacacs_timeout
        )

    def validate_user(self, user, password):
        """
        Authenticate a user against the TACACS server and sync the local record.

        :param user: user identifier supplied by the caller.
        :param password: password to check against the TACACS server.
        :return: the stored user record when the user already exists locally,
            the result of create_user() (username and _id) when the user is
            new, or None when the TACACS server rejects the credentials.
        :raises AuthException: with UNAUTHORIZED if the TACACS request fails.
        """
        now = time()
        try:
            tacacs_authen = self.tacacs_cli.authenticate(user, password)
        except Exception as e:
            raise AuthException(
                "TACACS server error: {}".format(e), http_code=HTTPStatus.UNAUTHORIZED
            ) from e

        id_field = BaseTopic.id_field("users", user)
        user_rows = self.db.get_list(self.users_collection, {id_field: user})

        if not tacacs_authen.valid:
            if user_rows:
                # To remove TACACS stale user from system.
                # NOTE(review): this passes the row's value for id_field, which
                # may be the username rather than the _id depending on what
                # BaseTopic.id_field returns - confirm delete_user accepts it.
                self.delete_user(user_rows[0][id_field])
            return None

        if user_rows:
            return user_rows[0]

        # First successful login of this user: create the local record.
        new_user = {
            "username": user,
            "password": password,
            "_admin": {"created": now, "modified": now},
            "project_role_mappings": [],
        }
        return self.create_user(new_user)

    def create_user(self, user_info):
        """
        Validates user credentials in TACACS and add user.

        The "password" entry is removed from user_info before it is stored.

        :param user_info: Full user information in dict.
        :return: returns username and id if credentials are valid. Otherwise, raise exception
        :raises AuthException: with FORBIDDEN when the TACACS server rejects
            the credentials, or BAD_REQUEST for any other failure.
        """
        BaseTopic.format_on_new(user_info, make_public=False)
        try:
            authen = self.tacacs_cli.authenticate(
                user_info["username"], user_info["password"]
            )
            if not authen.valid:
                raise AuthException(
                    "TACACS server error: Invalid credentials",
                    http_code=HTTPStatus.FORBIDDEN,
                )
            user_info.pop("password")
            self.db.create(self.users_collection, user_info)
        except AuthException:
            # Keep the original status code and message; without this clause
            # the generic handler below would rewrap it as BAD_REQUEST.
            raise
        except Exception as e:
            raise AuthException(
                "TACACS server error: {}".format(e), http_code=HTTPStatus.BAD_REQUEST
            ) from e
        return {"username": user_info["username"], "_id": user_info["_id"]}

    def update_user(self, user_info):
        """
        Updates user information, in particular for add/remove of project and role mappings.
        Does not allow change of username or password.

        :param user_info: Full user information in dict.
        :return: returns None for successful add/remove of project and role map.
        :raises AuthException: with FORBIDDEN if user_info carries a username
            or a password.
        """
        if user_info.get("username"):
            raise AuthException(
                "Can not update username of this user", http_code=HTTPStatus.FORBIDDEN
            )
        if user_info.get("password"):
            raise AuthException(
                "Can not update password of this user", http_code=HTTPStatus.FORBIDDEN
            )
        super().update_user(user_info)