Revert "Revert "Feature 10941: User Management Enhancements"" 36/13536/2
authorgarciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 15 Jun 2023 09:47:15 +0000 (11:47 +0200)
committergarciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 15 Jun 2023 11:07:45 +0000 (13:07 +0200)
This reverts commit 86ec75c0970e672638d4f0d27bc6f471fe256bfb.

Change-Id: I00db6e24782f9cb42f49eeab21b2a35467809f5c
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
osm_nbi/admin_topics.py
osm_nbi/auth.py
osm_nbi/authconn_internal.py
osm_nbi/nbi.cfg
osm_nbi/nbi.py
osm_nbi/tests/test_admin_topics.py
osm_nbi/validation.py

index 8960961..de0ad1f 100644 (file)
@@ -1000,6 +1000,8 @@ class UserTopicAuth(UserTopic):
                 or indata.get("project_role_mappings")
                 or indata.get("projects")
                 or indata.get("add_projects")
+                or indata.get("unlock")
+                or indata.get("renew")
             ):
                 return _id
             if indata.get("project_role_mappings") and (
@@ -1122,6 +1124,9 @@ class UserTopicAuth(UserTopic):
                     "old_password": indata.get("old_password"),
                     "add_project_role_mappings": mappings_to_add,
                     "remove_project_role_mappings": mappings_to_remove,
+                    "system_admin_id": indata.get("system_admin_id"),
+                    "unlock": indata.get("unlock"),
+                    "renew": indata.get("renew"),
                 }
             )
             data_to_send = {"_id": _id, "changes": indata}
index ec33b1c..9c8c8d3 100644 (file)
@@ -252,7 +252,7 @@ class Authenticator:
         user_desc = {
             "username": "admin",
             "password": "admin",
-            "_admin": {"created": now, "modified": now},
+            "_admin": {"created": now, "modified": now, "user_status": "always-active"},
         }
         if project_id:
             pid = project_id
@@ -791,20 +791,24 @@ class Authenticator:
         This method will check for password expiry of the user
         :param outdata: user token information
         """
-        user_content = None
+        user_list = None
         present_time = time()
         user = outdata["username"]
-        if self.config["authentication"].get("pwd_expiry_check"):
-            user_content = self.db.get_list("users", {"username": user})[0]
-            if not user_content.get("username") == "admin":
-                user_content["_admin"]["modified_time"] = present_time
-                if user_content.get("_admin").get("expire_time"):
-                    expire_time = user_content["_admin"]["expire_time"]
-                else:
-                    expire_time = present_time
-                uid = user_content["_id"]
-                self.db.set_one("users", {"_id": uid}, user_content)
-                if not present_time < expire_time:
-                    return True
+        if self.config["authentication"].get("user_management"):
+            user_list = self.db.get_list("users", {"username": user})
+            if user_list:
+                user_content = user_list[0]
+                if not user_content.get("username") == "admin":
+                    user_content["_admin"]["modified"] = present_time
+                    if user_content.get("_admin").get("password_expire_time"):
+                        password_expire_time = user_content["_admin"][
+                            "password_expire_time"
+                        ]
+                    else:
+                        password_expire_time = present_time
+                    uid = user_content["_id"]
+                    self.db.set_one("users", {"_id": uid}, user_content)
+                    if not present_time < password_expire_time:
+                        return True
         else:
             pass
index 3f495d8..0f414b1 100644 (file)
@@ -156,15 +156,134 @@ class AuthconnInternal(Authconn):
         user_rows = self.db.get_list(
             self.users_collection, {BaseTopic.id_field("users", user): user}
         )
+        now = time()
         user_content = None
-        if user_rows:
-            user_content = user_rows[0]
-            salt = user_content["_admin"]["salt"]
-            shadow_password = sha256(
-                password.encode("utf-8") + salt.encode("utf-8")
-            ).hexdigest()
-            if shadow_password != user_content["password"]:
-                user_content = None
+        if user:
+            user_rows = self.db.get_list(
+                self.users_collection,
+                {BaseTopic.id_field(self.users_collection, user): user},
+            )
+            if user_rows:
+                user_content = user_rows[0]
+                # Updating user_status for every system_admin id role login
+                mapped_roles = user_content.get("project_role_mappings")
+                for role in mapped_roles:
+                    role_id = role.get("role")
+                    role_assigned = self.db.get_one(
+                        self.roles_collection,
+                        {BaseTopic.id_field(self.roles_collection, role_id): role_id},
+                    )
+
+                    if role_assigned.get("permissions")["admin"]:
+                        if role_assigned.get("permissions")["default"]:
+                            if self.config.get("user_management"):
+                                filt = {}
+                                users = self.db.get_list(self.users_collection, filt)
+                                for user_info in users:
+                                    if not user_info.get("username") == "admin":
+                                        if not user_info.get("_admin").get(
+                                            "account_expire_time"
+                                        ):
+                                            expire = now + 86400 * self.config.get(
+                                                "account_expire_days"
+                                            )
+                                            self.db.set_one(
+                                                self.users_collection,
+                                                {"_id": user_info["_id"]},
+                                                {"_admin.account_expire_time": expire},
+                                            )
+                                        else:
+                                            if now > user_info.get("_admin").get(
+                                                "account_expire_time"
+                                            ):
+                                                self.db.set_one(
+                                                    self.users_collection,
+                                                    {"_id": user_info["_id"]},
+                                                    {"_admin.user_status": "expired"},
+                                                )
+                                break
+
+                # To add "admin" user_status key while upgrading osm setup with feature enabled
+                if user_content.get("username") == "admin":
+                    if self.config.get("user_management"):
+                        self.db.set_one(
+                            self.users_collection,
+                            {"_id": user_content["_id"]},
+                            {"_admin.user_status": "always-active"},
+                        )
+
+                if not user_content.get("username") == "admin":
+                    if self.config.get("user_management"):
+                        if not user_content.get("_admin").get("account_expire_time"):
+                            account_expire_time = now + 86400 * self.config.get(
+                                "account_expire_days"
+                            )
+                            self.db.set_one(
+                                self.users_collection,
+                                {"_id": user_content["_id"]},
+                                {"_admin.account_expire_time": account_expire_time},
+                            )
+                        else:
+                            account_expire_time = user_content.get("_admin").get(
+                                "account_expire_time"
+                            )
+
+                        if now > account_expire_time:
+                            self.db.set_one(
+                                self.users_collection,
+                                {"_id": user_content["_id"]},
+                                {"_admin.user_status": "expired"},
+                            )
+                            raise AuthException(
+                                "Account expired", http_code=HTTPStatus.UNAUTHORIZED
+                            )
+
+                        if user_content.get("_admin").get("user_status") == "locked":
+                            raise AuthException(
+                                "Failed to login as the account is locked due to MANY FAILED ATTEMPTS"
+                            )
+                        elif user_content.get("_admin").get("user_status") == "expired":
+                            raise AuthException(
+                                "Failed to login as the account is expired"
+                            )
+
+                salt = user_content["_admin"]["salt"]
+                shadow_password = sha256(
+                    password.encode("utf-8") + salt.encode("utf-8")
+                ).hexdigest()
+                if shadow_password != user_content["password"]:
+                    count = 1
+                    if user_content.get("_admin").get("retry_count") >= 0:
+                        count += user_content.get("_admin").get("retry_count")
+                        self.db.set_one(
+                            self.users_collection,
+                            {"_id": user_content["_id"]},
+                            {"_admin.retry_count": count},
+                        )
+                        self.logger.debug(
+                            "Failed Authentications count: {}".format(count)
+                        )
+
+                    if user_content.get("username") == "admin":
+                        user_content = None
+                    else:
+                        if not self.config.get("user_management"):
+                            user_content = None
+                        else:
+                            if (
+                                user_content.get("_admin").get("retry_count")
+                                >= self.config["max_pwd_attempt"] - 1
+                            ):
+                                self.db.set_one(
+                                    self.users_collection,
+                                    {"_id": user_content["_id"]},
+                                    {"_admin.user_status": "locked"},
+                                )
+                                raise AuthException(
+                                    "Failed to login as the account is locked due to MANY FAILED ATTEMPTS"
+                                )
+                            else:
+                                user_content = None
         return user_content
 
     def authenticate(self, credentials, token_info=None):
@@ -236,10 +355,14 @@ class AuthconnInternal(Authconn):
             sleep(self.token_delay)
         # user_content["_admin"]["last_token_time"] = now
         # self.db.replace("users", user_content["_id"], user_content)   # might cause race conditions
+        user_data = {
+            "_admin.last_token_time": now,
+            "_admin.retry_count": 0,
+        }
         self.db.set_one(
             self.users_collection,
             {"_id": user_content["_id"]},
-            {"_admin.last_token_time": now},
+            user_data,
         )
 
         token_id = "".join(
@@ -299,6 +422,24 @@ class AuthconnInternal(Authconn):
             ]
             roles_list = [{"name": "project_admin", "id": rid}]
 
+        login_count = user_content.get("_admin").get("retry_count")
+        last_token_time = user_content.get("_admin").get("last_token_time")
+
+        admin_show = False
+        user_show = False
+        if self.config.get("user_management"):
+            for role in roles_list:
+                role_id = role.get("id")
+                permission = self.db.get_one(
+                    self.roles_collection,
+                    {BaseTopic.id_field(self.roles_collection, role_id): role_id},
+                )
+                if permission.get("permissions")["admin"]:
+                    if permission.get("permissions")["default"]:
+                        admin_show = True
+                        break
+            else:
+                user_show = True
         new_token = {
             "issued_at": now,
             "expires": now + 3600,
@@ -310,6 +451,10 @@ class AuthconnInternal(Authconn):
             "user_id": user_content["_id"],
             "admin": token_admin,
             "roles": roles_list,
+            "login_count": login_count,
+            "last_login": last_token_time,
+            "admin_show": admin_show,
+            "user_show": user_show,
         }
 
         self.db.create(self.tokens_collection, new_token)
@@ -370,15 +515,24 @@ class AuthconnInternal(Authconn):
         BaseTopic.format_on_new(user_info, make_public=False)
         salt = uuid4().hex
         user_info["_admin"]["salt"] = salt
+        user_info["_admin"]["user_status"] = "active"
         present = time()
         if not user_info["username"] == "admin":
-            if self.config.get("pwd_expiry_check"):
-                user_info["_admin"]["modified_time"] = present
-                user_info["_admin"]["expire_time"] = present
+            if self.config.get("user_management"):
+                user_info["_admin"]["modified"] = present
+                user_info["_admin"]["password_expire_time"] = present
+                account_expire_time = present + 86400 * self.config.get(
+                    "account_expire_days"
+                )
+                user_info["_admin"]["account_expire_time"] = account_expire_time
+
+        user_info["_admin"]["retry_count"] = 0
+        user_info["_admin"]["last_token_time"] = present
         if "password" in user_info:
             user_info["password"] = sha256(
                 user_info["password"].encode("utf-8") + salt.encode("utf-8")
             ).hexdigest()
+            user_info["_admin"]["password_history"] = {salt: user_info["password"]}
         # "projects" are not stored any more
         if "projects" in user_info:
             del user_info["projects"]
@@ -393,6 +547,10 @@ class AuthconnInternal(Authconn):
         """
         uid = user_info["_id"]
         old_pwd = user_info.get("old_password")
+        unlock = user_info.get("unlock")
+        renew = user_info.get("renew")
+        permission_id = user_info.get("system_admin_id")
+
         user_data = self.db.get_one(
             self.users_collection, {BaseTopic.id_field("users", uid): uid}
         )
@@ -405,6 +563,87 @@ class AuthconnInternal(Authconn):
                 raise AuthconnConflictException(
                     "Incorrect password", http_code=HTTPStatus.CONFLICT
                 )
+        # Unlocking the user
+        if unlock:
+            system_user = None
+            unlock_state = False
+            if not permission_id:
+                raise AuthconnConflictException(
+                    "system_admin_id is the required field to unlock the user",
+                    http_code=HTTPStatus.CONFLICT,
+                )
+            else:
+                system_user = self.db.get_one(
+                    self.users_collection,
+                    {
+                        BaseTopic.id_field(
+                            self.users_collection, permission_id
+                        ): permission_id
+                    },
+                )
+                mapped_roles = system_user.get("project_role_mappings")
+                for role in mapped_roles:
+                    role_id = role.get("role")
+                    role_assigned = self.db.get_one(
+                        self.roles_collection,
+                        {BaseTopic.id_field(self.roles_collection, role_id): role_id},
+                    )
+                    if role_assigned.get("permissions")["admin"]:
+                        if role_assigned.get("permissions")["default"]:
+                            user_data["_admin"]["retry_count"] = 0
+                            user_data["_admin"]["user_status"] = "active"
+                            unlock_state = True
+                            break
+                if not unlock_state:
+                    raise AuthconnConflictException(
+                        "User '{}' does not have the privilege to unlock the user".format(
+                            permission_id
+                        ),
+                        http_code=HTTPStatus.CONFLICT,
+                    )
+        # Renewing the user
+        if renew:
+            system_user = None
+            renew_state = False
+            if not permission_id:
+                raise AuthconnConflictException(
+                    "system_admin_id is the required field to renew the user",
+                    http_code=HTTPStatus.CONFLICT,
+                )
+            else:
+                system_user = self.db.get_one(
+                    self.users_collection,
+                    {
+                        BaseTopic.id_field(
+                            self.users_collection, permission_id
+                        ): permission_id
+                    },
+                )
+                mapped_roles = system_user.get("project_role_mappings")
+                for role in mapped_roles:
+                    role_id = role.get("role")
+                    role_assigned = self.db.get_one(
+                        self.roles_collection,
+                        {BaseTopic.id_field(self.roles_collection, role_id): role_id},
+                    )
+                    if role_assigned.get("permissions")["admin"]:
+                        if role_assigned.get("permissions")["default"]:
+                            present = time()
+                            account_expire = (
+                                present + 86400 * self.config["account_expire_days"]
+                            )
+                            user_data["_admin"]["modified"] = present
+                            user_data["_admin"]["account_expire_time"] = account_expire
+                            user_data["_admin"]["user_status"] = "active"
+                            renew_state = True
+                            break
+                if not renew_state:
+                    raise AuthconnConflictException(
+                        "User '{}' does not have the privilege to renew the user".format(
+                            permission_id
+                        ),
+                        http_code=HTTPStatus.CONFLICT,
+                    )
         BaseTopic.format_on_edit(user_data, user_info)
         # User Name
         usnm = user_info.get("username")
@@ -428,17 +667,33 @@ class AuthconnInternal(Authconn):
             salt = uuid4().hex
             if "_admin" not in user_data:
                 user_data["_admin"] = {}
+            if user_data.get("_admin").get("password_history"):
+                old_pwds = user_data.get("_admin").get("password_history")
+            else:
+                old_pwds = {}
+            for k, v in old_pwds.items():
+                shadow_password = sha256(
+                    pswd.encode("utf-8") + k.encode("utf-8")
+                ).hexdigest()
+                if v == shadow_password:
+                    raise AuthconnConflictException(
+                        "Password is used before", http_code=HTTPStatus.CONFLICT
+                    )
             user_data["_admin"]["salt"] = salt
             user_data["password"] = sha256(
                 pswd.encode("utf-8") + salt.encode("utf-8")
             ).hexdigest()
+            if len(old_pwds) >= 3:
+                old_pwds.pop(list(old_pwds.keys())[0])
+            old_pwds.update({salt: user_data["password"]})
+            user_data["_admin"]["password_history"] = old_pwds
             if not user_data["username"] == "admin":
-                if self.config.get("pwd_expiry_check"):
+                if self.config.get("user_management"):
                     present = time()
-                    if self.config.get("days"):
-                        expire = present + 86400 * self.config.get("days")
-                        user_data["_admin"]["modified_time"] = present
-                        user_data["_admin"]["expire_time"] = expire
+                    if self.config.get("pwd_expire_days"):
+                        expire = present + 86400 * self.config.get("pwd_expire_days")
+                        user_data["_admin"]["modified"] = present
+                        user_data["_admin"]["password_expire_time"] = expire
         # Project-Role Mappings
         # TODO: Check that user_info NEVER includes "project_role_mappings"
         if "project_role_mappings" not in user_data:
index 49f4985..7035cae 100644 (file)
@@ -116,9 +116,11 @@ backend: "keystone"         # internal or keystone or tacacs
 # tacacs_port: 49    # Default value
 # tacacs_timeout: 10 # Default value
 
-# Password expiry configuration
-# pwd_expiry_check: True      # Uncomment to enable the password expiry check
-# days: 30                    # Default value
+# User Management configuration
+user_management: True
+pwd_expire_days: 30         # Password expiry Default value
+max_pwd_attempt: 5
+account_expire_days: 90     # Account expiry Default value
 
 # CEF Configuration
 version: "0"
index d62e8a2..46fd8cc 100644 (file)
@@ -1934,6 +1934,9 @@ def _start_service():
 
     engine_config = cherrypy.tree.apps["/osm"].config
     for k, v in environ.items():
+        if k == "OSMNBI_USER_MANAGEMENT":
+            feature_state = eval(v.title())
+            engine_config["authentication"]["user_management"] = feature_state
         if not k.startswith("OSMNBI_"):
             continue
         k1, _, k2 = k[7:].lower().partition("_")
index 90f4acd..6a44365 100755 (executable)
@@ -25,6 +25,7 @@ from uuid import uuid4
 from http import HTTPStatus
 from time import time
 from osm_common import dbbase, fsbase, msgbase
+from osm_common.dbmemory import DbMemory
 from osm_nbi import authconn, validation
 from osm_nbi.admin_topics import (
     ProjectTopicAuth,
@@ -35,6 +36,7 @@ from osm_nbi.admin_topics import (
 )
 from osm_nbi.engine import EngineException
 from osm_nbi.authconn import AuthconnNotFoundException
+from osm_nbi.authconn_internal import AuthconnInternal
 
 
 test_pid = str(uuid4())
@@ -777,9 +779,11 @@ class Test_UserTopicAuth(TestCase):
     @classmethod
     def setUpClass(cls):
         cls.test_name = "test-user-topic"
+        cls.password = "Test@123"
 
     def setUp(self):
-        self.db = Mock(dbbase.DbBase())
+        # self.db = Mock(dbbase.DbBase())
+        self.db = DbMemory()
         self.fs = Mock(fsbase.FsBase())
         self.msg = Mock(msgbase.MsgBase())
         self.auth = Mock(authconn.Authconn(None, None, None))
@@ -812,7 +816,7 @@ class Test_UserTopicAuth(TestCase):
                 self.fake_session,
                 {
                     "username": self.test_name,
-                    "password": self.test_name,
+                    "password": self.password,
                     "project_role_mappings": prms_in,
                 },
             )
@@ -820,7 +824,7 @@ class Test_UserTopicAuth(TestCase):
             self.assertEqual(uid2, uid1, "Wrong project identifier")
             content = self.auth.create_user.call_args[0][0]
             self.assertEqual(content["username"], self.test_name, "Wrong project name")
-            self.assertEqual(content["password"], self.test_name, "Wrong password")
+            self.assertEqual(content["password"], self.password, "Wrong password")
             self.assertEqual(
                 content["project_role_mappings"],
                 prms_out,
@@ -844,7 +848,7 @@ class Test_UserTopicAuth(TestCase):
                 self.fake_session,
                 {
                     "username": self.test_name,
-                    "password": self.test_name,
+                    "password": self.password,
                     "projects": ["some_project"],
                 },
             )
@@ -852,7 +856,7 @@ class Test_UserTopicAuth(TestCase):
             self.assertEqual(uid2, uid1, "Wrong project identifier")
             content = self.auth.create_user.call_args[0][0]
             self.assertEqual(content["username"], self.test_name, "Wrong project name")
-            self.assertEqual(content["password"], self.test_name, "Wrong password")
+            self.assertEqual(content["password"], self.password, "Wrong password")
             self.assertEqual(
                 content["project_role_mappings"],
                 prms_out,
@@ -874,7 +878,7 @@ class Test_UserTopicAuth(TestCase):
                     self.fake_session,
                     {
                         "username": "other-project-name",
-                        "password": "other-password",
+                        "password": "Other@pwd1",
                         "project_role_mappings": [{}],
                     },
                 )
@@ -899,7 +903,7 @@ class Test_UserTopicAuth(TestCase):
                     self.fake_session,
                     {
                         "username": "other-project-name",
-                        "password": "other-password",
+                        "password": "Other@pwd1",
                         "projects": [],
                     },
                 )
@@ -950,7 +954,7 @@ class Test_UserTopicAuth(TestCase):
                 {"_id": rid1, "name": "role-1"},
             ]
             new_name = "new-user-name"
-            new_pasw = "new-password"
+            new_pasw = "New@pwd1"
             add_prms = [{"project": pid2, "role": rid2}]
             rem_prms = [{"project": pid1, "role": rid1}]
             self.topic.edit(
@@ -1005,8 +1009,8 @@ class Test_UserTopicAuth(TestCase):
         with self.subTest(i=3):
             self.auth.get_user_list.side_effect = [[user], []]
             self.auth.get_user.return_value = user
-            old_password = self.test_name
-            new_pasw = "new-password"
+            old_password = self.password
+            new_pasw = "New@pwd1"
             self.topic.edit(
                 self.fake_session,
                 uid,
@@ -1053,7 +1057,7 @@ class Test_UserTopicAuth(TestCase):
                     self.fake_session,
                     {
                         "username": uid,
-                        "password": self.test_name,
+                        "password": self.password,
                         "projects": [test_pid],
                     },
                 )
@@ -1081,7 +1085,7 @@ class Test_UserTopicAuth(TestCase):
                     self.fake_session,
                     {
                         "username": self.test_name,
-                        "password": self.test_name,
+                        "password": self.password,
                         "projects": [test_pid],
                     },
                 )
@@ -1106,7 +1110,7 @@ class Test_UserTopicAuth(TestCase):
                     self.fake_session,
                     {
                         "username": self.test_name,
-                        "password": self.test_name,
+                        "password": self.password,
                         "projects": [str(uuid4())],
                     },
                 )
@@ -1226,6 +1230,143 @@ class Test_UserTopicAuth(TestCase):
                 "Wrong exception text",
             )
 
+    def test_user_management(self):
+        self.config = {
+            "user_management": True,
+            "pwd_expire_days": 30,
+            "max_pwd_attempt": 5,
+            "account_expire_days": 90,
+            "version": "dev",
+            "deviceVendor": "test",
+            "deviceProduct": "test",
+        }
+        self.permissions = {"admin": True, "default": True}
+        now = time()
+        rid = str(uuid4())
+        role = {
+            "_id": rid,
+            "name": self.test_name,
+            "permissions": self.permissions,
+            "_admin": {"created": now, "modified": now},
+        }
+        self.db.create("roles", role)
+        admin_user = {
+            "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
+            "username": "admin",
+            "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb",
+            "_admin": {
+                "created": 1663058370.7721832,
+                "modified": 1663681183.5651639,
+                "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b",
+                "last_token_time": 1666876472.2962265,
+                "user_status": "always-active",
+                "retry_count": 0,
+            },
+            "project_role_mappings": [
+                {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid}
+            ],
+        }
+        self.db.create("users", admin_user)
+        with self.subTest(i=1):
+            self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
+            user_info = {"username": "user_mgmt_true", "password": "Test@123"}
+            self.user_create.create_user(user_info)
+            user = self.db.get_one("users", {"username": user_info["username"]})
+            self.assertEqual(user["username"], user_info["username"], "Wrong user name")
+            self.assertEqual(
+                user["_admin"]["user_status"], "active", "User status is unknown"
+            )
+            self.assertIn("password_expire_time", user["_admin"], "Key is not there")
+            self.assertIn("account_expire_time", user["_admin"], "Key is not there")
+        with self.subTest(i=2):
+            self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
+            locked_user = {
+                "username": "user_lock",
+                "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
+                "_admin": {
+                    "created": 1667207552.2191198,
+                    "modified": 1667207552.2191815,
+                    "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
+                    "user_status": "locked",
+                    "password_expire_time": 1667207552.2191815,
+                    "account_expire_time": 1674983552.2191815,
+                    "retry_count": 5,
+                    "last_token_time": 1667207552.2191815,
+                },
+                "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
+            }
+            self.db.create("users", locked_user)
+            user_info = {
+                "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
+                "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
+                "unlock": True,
+            }
+            self.assertEqual(
+                locked_user["_admin"]["user_status"], "locked", "User status is unknown"
+            )
+            self.user_update.update_user(user_info)
+            user = self.db.get_one("users", {"username": locked_user["username"]})
+            self.assertEqual(
+                user["username"], locked_user["username"], "Wrong user name"
+            )
+            self.assertEqual(
+                user["_admin"]["user_status"], "active", "User status is unknown"
+            )
+            self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown")
+        with self.subTest(i=3):
+            self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
+            expired_user = {
+                "username": "user_expire",
+                "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
+                "_admin": {
+                    "created": 1665602087.601298,
+                    "modified": 1665636442.1245084,
+                    "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
+                    "user_status": "expired",
+                    "password_expire_time": 1668248628.2191815,
+                    "account_expire_time": 1666952628.2191815,
+                    "retry_count": 0,
+                    "last_token_time": 1666779828.2171815,
+                },
+                "_id": "3266430f-8222-407f-b08f-3a242504ab94",
+            }
+            self.db.create("users", expired_user)
+            user_info = {
+                "_id": "3266430f-8222-407f-b08f-3a242504ab94",
+                "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
+                "renew": True,
+            }
+            self.assertEqual(
+                expired_user["_admin"]["user_status"],
+                "expired",
+                "User status is unknown",
+            )
+            self.user_update.update_user(user_info)
+            user = self.db.get_one("users", {"username": expired_user["username"]})
+            self.assertEqual(
+                user["username"], expired_user["username"], "Wrong user name"
+            )
+            self.assertEqual(
+                user["_admin"]["user_status"], "active", "User status is unknown"
+            )
+            self.assertGreater(
+                user["_admin"]["account_expire_time"],
+                expired_user["_admin"]["account_expire_time"],
+                "User expire time is not get extended",
+            )
+        with self.subTest(i=4):
+            self.config.update({"user_management": False})
+            self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
+            user_info = {"username": "user_mgmt_false", "password": "Test@123"}
+            self.user_create.create_user(user_info)
+            user = self.db.get_one("users", {"username": user_info["username"]})
+            self.assertEqual(user["username"], user_info["username"], "Wrong user name")
+            self.assertEqual(
+                user["_admin"]["user_status"], "active", "User status is unknown"
+            )
+            self.assertNotIn("password_expire_time", user["_admin"], "Key is not there")
+            self.assertNotIn("account_expire_time", user["_admin"], "Key is not there")
+
 
 class Test_CommonVimWimSdn(TestCase):
     @classmethod
index e5c2a03..4dafed5 100644 (file)
@@ -35,6 +35,10 @@ shortname_schema = {
     "pattern": "^[^,;()\\.\\$'\"]+$",
 }
 passwd_schema = {"type": "string", "minLength": 1, "maxLength": 60}
+user_passwd_schema = {
+    "type": "string",
+    "pattern": "^.*(?=.{8,})((?=.*[!@#$%^&*()\\-_=+{};:,<.>]){1})(?=.*\\d)((?=.*[a-z]){1})((?=.*[A-Z]){1}).*$",
+}
 name_schema = {
     "type": "string",
     "minLength": 1,
@@ -1082,7 +1086,7 @@ user_new_schema = {
     "properties": {
         "username": string_schema,
         "domain_name": shortname_schema,
-        "password": passwd_schema,
+        "password": user_passwd_schema,
         "projects": nameshort_list_schema,
         "project_role_mappings": project_role_mappings,
     },
@@ -1094,13 +1098,16 @@ user_edit_schema = {
     "title": "User edit schema for administrators",
     "type": "object",
     "properties": {
-        "password": passwd_schema,
+        "password": user_passwd_schema,
         "old_password": passwd_schema,
         "username": string_schema,  # To allow User Name modification
         "projects": {"oneOf": [nameshort_list_schema, array_edition_schema]},
         "project_role_mappings": project_role_mappings,
         "add_project_role_mappings": project_role_mappings,
         "remove_project_role_mappings": project_role_mappings_optional,
+        "system_admin_id": id_schema,
+        "unlock": bool_schema,
+        "renew": bool_schema,
     },
     "minProperties": 1,
     "additionalProperties": False,