Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/NBI.git] / osm_nbi / authconn_tacacs.py
index 27f38e9..1b82935 100644 (file)
 ##
 
 
 ##
 
 
-""" 
+"""
 AuthconnTacacs implements implements the connector for TACACS.
 Leverages AuthconnInternal for token lifecycle management and the RBAC model.
 
 When NBI bootstraps, it tries to create admin user with admin role associated to admin project.
 Hence, the TACACS server should contain admin user.
 AuthconnTacacs implements implements the connector for TACACS.
 Leverages AuthconnInternal for token lifecycle management and the RBAC model.
 
 When NBI bootstraps, it tries to create admin user with admin role associated to admin project.
 Hence, the TACACS server should contain admin user.
-""" 
+"""
 
 __author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
 __date__ = "$11-Nov-2020 11:04:00$"
 
 
 
 __author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
 __date__ = "$11-Nov-2020 11:04:00$"
 
 
-from osm_nbi.authconn import Authconn, AuthException  
+from osm_nbi.authconn import Authconn, AuthException
 from osm_nbi.authconn_internal import AuthconnInternal
 from osm_nbi.base_topic import BaseTopic
 
 from osm_nbi.authconn_internal import AuthconnInternal
 from osm_nbi.base_topic import BaseTopic
 
@@ -63,21 +63,31 @@ class AuthconnTacacs(AuthconnInternal):
         self.db = db
         self.tacacs_host = config["tacacs_host"]
         self.tacacs_secret = config["tacacs_secret"]
         self.db = db
         self.tacacs_host = config["tacacs_host"]
         self.tacacs_secret = config["tacacs_secret"]
-        self.tacacs_port = config["tacacs_port"] if config.get("tacacs_port") else self.tacacs_def_port
-        self.tacacs_timeout = config["tacacs_timeout"] if config.get("tacacs_timeout") else self.tacacs_def_timeout
-        self.tacacs_cli = TACACSClient(self.tacacs_host, self.tacacs_port, self.tacacs_secret,
-                                       self.tacacs_timeout)
+        self.tacacs_port = (
+            config["tacacs_port"] if config.get("tacacs_port") else self.tacacs_def_port
+        )
+        self.tacacs_timeout = (
+            config["tacacs_timeout"]
+            if config.get("tacacs_timeout")
+            else self.tacacs_def_timeout
+        )
+        self.tacacs_cli = TACACSClient(
+            self.tacacs_host, self.tacacs_port, self.tacacs_secret, self.tacacs_timeout
+        )
 
     def validate_user(self, user, password):
 
     def validate_user(self, user, password):
-        """
-        """
+        """"""
         now = time()
         try:
             tacacs_authen = self.tacacs_cli.authenticate(user, password)
         except Exception as e:
         now = time()
         try:
             tacacs_authen = self.tacacs_cli.authenticate(user, password)
         except Exception as e:
-            raise AuthException("TACACS server error: {}".format(e), http_code=HTTPStatus.UNAUTHORIZED)
+            raise AuthException(
+                "TACACS server error: {}".format(e), http_code=HTTPStatus.UNAUTHORIZED
+            )
         user_content = None
         user_content = None
-        user_rows = self.db.get_list(self.users_collection, {BaseTopic.id_field("users", user): user})
+        user_rows = self.db.get_list(
+            self.users_collection, {BaseTopic.id_field("users", user): user}
+        )
         if not tacacs_authen.valid:
             if user_rows:
                 # To remove TACACS stale user from system.
         if not tacacs_authen.valid:
             if user_rows:
                 # To remove TACACS stale user from system.
@@ -86,14 +96,12 @@ class AuthconnTacacs(AuthconnInternal):
         if user_rows:
             user_content = user_rows[0]
         else:
         if user_rows:
             user_content = user_rows[0]
         else:
-            new_user = {'username': user,
-                        'password': password,
-                        '_admin': {
-                            'created': now,
-                            'modified': now
-                        },
-                        'project_role_mappings': []
-                        }
+            new_user = {
+                "username": user,
+                "password": password,
+                "_admin": {"created": now, "modified": now},
+                "project_role_mappings": [],
+            }
             user_content = self.create_user(new_user)
         return user_content
 
             user_content = self.create_user(new_user)
         return user_content
 
@@ -106,14 +114,21 @@ class AuthconnTacacs(AuthconnInternal):
         """
         BaseTopic.format_on_new(user_info, make_public=False)
         try:
         """
         BaseTopic.format_on_new(user_info, make_public=False)
         try:
-            authen = self.tacacs_cli.authenticate(user_info["username"], user_info["password"])
+            authen = self.tacacs_cli.authenticate(
+                user_info["username"], user_info["password"]
+            )
             if authen.valid:
                 user_info.pop("password")
                 self.db.create(self.users_collection, user_info)
             else:
             if authen.valid:
                 user_info.pop("password")
                 self.db.create(self.users_collection, user_info)
             else:
-                raise AuthException("TACACS server error: Invalid credentials", http_code=HTTPStatus.FORBIDDEN)
+                raise AuthException(
+                    "TACACS server error: Invalid credentials",
+                    http_code=HTTPStatus.FORBIDDEN,
+                )
         except Exception as e:
         except Exception as e:
-            raise AuthException("TACACS server error: {}".format(e), http_code=HTTPStatus.BAD_REQUEST)
+            raise AuthException(
+                "TACACS server error: {}".format(e), http_code=HTTPStatus.BAD_REQUEST
+            )
         return {"username": user_info["username"], "_id": user_info["_id"]}
 
     def update_user(self, user_info):
         return {"username": user_info["username"], "_id": user_info["_id"]}
 
     def update_user(self, user_info):
@@ -124,8 +139,12 @@ class AuthconnTacacs(AuthconnInternal):
         :param user_info: Full user information in dict.
         :return: returns None for successful add/remove of project and role map.
         """
         :param user_info: Full user information in dict.
         :return: returns None for successful add/remove of project and role map.
         """
-        if(user_info.get("username")):
-            raise AuthException("Can not update username of this user", http_code=HTTPStatus.FORBIDDEN)
-        if(user_info.get("password")):
-            raise AuthException("Can not update password of this user", http_code=HTTPStatus.FORBIDDEN)
+        if user_info.get("username"):
+            raise AuthException(
+                "Can not update username of this user", http_code=HTTPStatus.FORBIDDEN
+            )
+        if user_info.get("password"):
+            raise AuthException(
+                "Can not update password of this user", http_code=HTTPStatus.FORBIDDEN
+            )
         super(AuthconnTacacs, self).update_user(user_info)
         super(AuthconnTacacs, self).update_user(user_info)