Revert "Revert "Feature 10941: User Management Enhancements""
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
index 90f4acd..6a44365 100755 (executable)
@@ -25,6 +25,7 @@ from uuid import uuid4
 from http import HTTPStatus
 from time import time
 from osm_common import dbbase, fsbase, msgbase
+from osm_common.dbmemory import DbMemory
 from osm_nbi import authconn, validation
 from osm_nbi.admin_topics import (
     ProjectTopicAuth,
@@ -35,6 +36,7 @@ from osm_nbi.admin_topics import (
 )
 from osm_nbi.engine import EngineException
 from osm_nbi.authconn import AuthconnNotFoundException
+from osm_nbi.authconn_internal import AuthconnInternal
 
 
 test_pid = str(uuid4())
@@ -777,9 +779,11 @@ class Test_UserTopicAuth(TestCase):
     @classmethod
     def setUpClass(cls):
         cls.test_name = "test-user-topic"
+        cls.password = "Test@123"
 
     def setUp(self):
-        self.db = Mock(dbbase.DbBase())
+        # self.db = Mock(dbbase.DbBase())
+        self.db = DbMemory()
         self.fs = Mock(fsbase.FsBase())
         self.msg = Mock(msgbase.MsgBase())
         self.auth = Mock(authconn.Authconn(None, None, None))
@@ -812,7 +816,7 @@ class Test_UserTopicAuth(TestCase):
                 self.fake_session,
                 {
                     "username": self.test_name,
-                    "password": self.test_name,
+                    "password": self.password,
                     "project_role_mappings": prms_in,
                 },
             )
@@ -820,7 +824,7 @@ class Test_UserTopicAuth(TestCase):
             self.assertEqual(uid2, uid1, "Wrong project identifier")
             content = self.auth.create_user.call_args[0][0]
             self.assertEqual(content["username"], self.test_name, "Wrong project name")
-            self.assertEqual(content["password"], self.test_name, "Wrong password")
+            self.assertEqual(content["password"], self.password, "Wrong password")
             self.assertEqual(
                 content["project_role_mappings"],
                 prms_out,
@@ -844,7 +848,7 @@ class Test_UserTopicAuth(TestCase):
                 self.fake_session,
                 {
                     "username": self.test_name,
-                    "password": self.test_name,
+                    "password": self.password,
                     "projects": ["some_project"],
                 },
             )
@@ -852,7 +856,7 @@ class Test_UserTopicAuth(TestCase):
             self.assertEqual(uid2, uid1, "Wrong project identifier")
             content = self.auth.create_user.call_args[0][0]
             self.assertEqual(content["username"], self.test_name, "Wrong project name")
-            self.assertEqual(content["password"], self.test_name, "Wrong password")
+            self.assertEqual(content["password"], self.password, "Wrong password")
             self.assertEqual(
                 content["project_role_mappings"],
                 prms_out,
@@ -874,7 +878,7 @@ class Test_UserTopicAuth(TestCase):
                     self.fake_session,
                     {
                         "username": "other-project-name",
-                        "password": "other-password",
+                        "password": "Other@pwd1",
                         "project_role_mappings": [{}],
                     },
                 )
@@ -899,7 +903,7 @@ class Test_UserTopicAuth(TestCase):
                     self.fake_session,
                     {
                         "username": "other-project-name",
-                        "password": "other-password",
+                        "password": "Other@pwd1",
                         "projects": [],
                     },
                 )
@@ -950,7 +954,7 @@ class Test_UserTopicAuth(TestCase):
                 {"_id": rid1, "name": "role-1"},
             ]
             new_name = "new-user-name"
-            new_pasw = "new-password"
+            new_pasw = "New@pwd1"
             add_prms = [{"project": pid2, "role": rid2}]
             rem_prms = [{"project": pid1, "role": rid1}]
             self.topic.edit(
@@ -1005,8 +1009,8 @@ class Test_UserTopicAuth(TestCase):
         with self.subTest(i=3):
             self.auth.get_user_list.side_effect = [[user], []]
             self.auth.get_user.return_value = user
-            old_password = self.test_name
-            new_pasw = "new-password"
+            old_password = self.password
+            new_pasw = "New@pwd1"
             self.topic.edit(
                 self.fake_session,
                 uid,
@@ -1053,7 +1057,7 @@ class Test_UserTopicAuth(TestCase):
                     self.fake_session,
                     {
                         "username": uid,
-                        "password": self.test_name,
+                        "password": self.password,
                         "projects": [test_pid],
                     },
                 )
@@ -1081,7 +1085,7 @@ class Test_UserTopicAuth(TestCase):
                     self.fake_session,
                     {
                         "username": self.test_name,
-                        "password": self.test_name,
+                        "password": self.password,
                         "projects": [test_pid],
                     },
                 )
@@ -1106,7 +1110,7 @@ class Test_UserTopicAuth(TestCase):
                     self.fake_session,
                     {
                         "username": self.test_name,
-                        "password": self.test_name,
+                        "password": self.password,
                         "projects": [str(uuid4())],
                     },
                 )
@@ -1226,6 +1230,143 @@ class Test_UserTopicAuth(TestCase):
                 "Wrong exception text",
             )
 
+    def test_user_management(self):
+        self.config = {
+            "user_management": True,
+            "pwd_expire_days": 30,
+            "max_pwd_attempt": 5,
+            "account_expire_days": 90,
+            "version": "dev",
+            "deviceVendor": "test",
+            "deviceProduct": "test",
+        }
+        self.permissions = {"admin": True, "default": True}
+        now = time()
+        rid = str(uuid4())
+        role = {
+            "_id": rid,
+            "name": self.test_name,
+            "permissions": self.permissions,
+            "_admin": {"created": now, "modified": now},
+        }
+        self.db.create("roles", role)
+        admin_user = {
+            "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
+            "username": "admin",
+            "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb",
+            "_admin": {
+                "created": 1663058370.7721832,
+                "modified": 1663681183.5651639,
+                "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b",
+                "last_token_time": 1666876472.2962265,
+                "user_status": "always-active",
+                "retry_count": 0,
+            },
+            "project_role_mappings": [
+                {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid}
+            ],
+        }
+        self.db.create("users", admin_user)
+        with self.subTest(i=1):
+            self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
+            user_info = {"username": "user_mgmt_true", "password": "Test@123"}
+            self.user_create.create_user(user_info)
+            user = self.db.get_one("users", {"username": user_info["username"]})
+            self.assertEqual(user["username"], user_info["username"], "Wrong user name")
+            self.assertEqual(
+                user["_admin"]["user_status"], "active", "User status is unknown"
+            )
+            self.assertIn("password_expire_time", user["_admin"], "Key is not there")
+            self.assertIn("account_expire_time", user["_admin"], "Key is not there")
+        with self.subTest(i=2):
+            self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
+            locked_user = {
+                "username": "user_lock",
+                "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
+                "_admin": {
+                    "created": 1667207552.2191198,
+                    "modified": 1667207552.2191815,
+                    "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
+                    "user_status": "locked",
+                    "password_expire_time": 1667207552.2191815,
+                    "account_expire_time": 1674983552.2191815,
+                    "retry_count": 5,
+                    "last_token_time": 1667207552.2191815,
+                },
+                "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
+            }
+            self.db.create("users", locked_user)
+            user_info = {
+                "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
+                "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
+                "unlock": True,
+            }
+            self.assertEqual(
+                locked_user["_admin"]["user_status"], "locked", "User status is unknown"
+            )
+            self.user_update.update_user(user_info)
+            user = self.db.get_one("users", {"username": locked_user["username"]})
+            self.assertEqual(
+                user["username"], locked_user["username"], "Wrong user name"
+            )
+            self.assertEqual(
+                user["_admin"]["user_status"], "active", "User status is unknown"
+            )
+            self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown")
+        with self.subTest(i=3):
+            self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
+            expired_user = {
+                "username": "user_expire",
+                "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
+                "_admin": {
+                    "created": 1665602087.601298,
+                    "modified": 1665636442.1245084,
+                    "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
+                    "user_status": "expired",
+                    "password_expire_time": 1668248628.2191815,
+                    "account_expire_time": 1666952628.2191815,
+                    "retry_count": 0,
+                    "last_token_time": 1666779828.2171815,
+                },
+                "_id": "3266430f-8222-407f-b08f-3a242504ab94",
+            }
+            self.db.create("users", expired_user)
+            user_info = {
+                "_id": "3266430f-8222-407f-b08f-3a242504ab94",
+                "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
+                "renew": True,
+            }
+            self.assertEqual(
+                expired_user["_admin"]["user_status"],
+                "expired",
+                "User status is unknown",
+            )
+            self.user_update.update_user(user_info)
+            user = self.db.get_one("users", {"username": expired_user["username"]})
+            self.assertEqual(
+                user["username"], expired_user["username"], "Wrong user name"
+            )
+            self.assertEqual(
+                user["_admin"]["user_status"], "active", "User status is unknown"
+            )
+            self.assertGreater(
+                user["_admin"]["account_expire_time"],
+                expired_user["_admin"]["account_expire_time"],
+                "User expire time is not get extended",
+            )
+        with self.subTest(i=4):
+            self.config.update({"user_management": False})
+            self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
+            user_info = {"username": "user_mgmt_false", "password": "Test@123"}
+            self.user_create.create_user(user_info)
+            user = self.db.get_one("users", {"username": user_info["username"]})
+            self.assertEqual(user["username"], user_info["username"], "Wrong user name")
+            self.assertEqual(
+                user["_admin"]["user_status"], "active", "User status is unknown"
+            )
+            self.assertNotIn("password_expire_time", user["_admin"], "Key is not there")
+            self.assertNotIn("account_expire_time", user["_admin"], "Key is not there")
+
 
 class Test_CommonVimWimSdn(TestCase):
     @classmethod