From acdb5d4d3d2452b5a7f3be87190875d3a06e6999 Mon Sep 17 00:00:00 2001 From: "selvi.j" Date: Tue, 18 Apr 2023 04:24:13 +0000 Subject: [PATCH] Feature 10941: User Management Enhancements Added the implementation code for the user management enhancements feature Change-Id: I728816c454c1c9b509606fb30df374f1b443e893 Signed-off-by: selvi.j --- osm_nbi/admin_topics.py | 5 + osm_nbi/auth.py | 32 ++-- osm_nbi/authconn_internal.py | 289 +++++++++++++++++++++++++++-- osm_nbi/nbi.cfg | 8 +- osm_nbi/nbi.py | 3 + osm_nbi/tests/test_admin_topics.py | 164 ++++++++++++++-- osm_nbi/validation.py | 11 +- 7 files changed, 463 insertions(+), 49 deletions(-) diff --git a/osm_nbi/admin_topics.py b/osm_nbi/admin_topics.py index 8960961..de0ad1f 100644 --- a/osm_nbi/admin_topics.py +++ b/osm_nbi/admin_topics.py @@ -1000,6 +1000,8 @@ class UserTopicAuth(UserTopic): or indata.get("project_role_mappings") or indata.get("projects") or indata.get("add_projects") + or indata.get("unlock") + or indata.get("renew") ): return _id if indata.get("project_role_mappings") and ( @@ -1122,6 +1124,9 @@ class UserTopicAuth(UserTopic): "old_password": indata.get("old_password"), "add_project_role_mappings": mappings_to_add, "remove_project_role_mappings": mappings_to_remove, + "system_admin_id": indata.get("system_admin_id"), + "unlock": indata.get("unlock"), + "renew": indata.get("renew"), } ) data_to_send = {"_id": _id, "changes": indata} diff --git a/osm_nbi/auth.py b/osm_nbi/auth.py index ec33b1c..9c8c8d3 100644 --- a/osm_nbi/auth.py +++ b/osm_nbi/auth.py @@ -252,7 +252,7 @@ class Authenticator: user_desc = { "username": "admin", "password": "admin", - "_admin": {"created": now, "modified": now}, + "_admin": {"created": now, "modified": now, "user_status": "always-active"}, } if project_id: pid = project_id @@ -791,20 +791,24 @@ class Authenticator: This method will check for password expiry of the user :param outdata: user token information """ - user_content = None + user_list = None present_time = time() user = outdata["username"] - if self.config["authentication"].get("pwd_expiry_check"): - user_content = self.db.get_list("users", {"username": user})[0] - if not user_content.get("username") == "admin": - user_content["_admin"]["modified_time"] = present_time - if user_content.get("_admin").get("expire_time"): - expire_time = user_content["_admin"]["expire_time"] - else: - expire_time = present_time - uid = user_content["_id"] - self.db.set_one("users", {"_id": uid}, user_content) - if not present_time < expire_time: - return True + if self.config["authentication"].get("user_management"): + user_list = self.db.get_list("users", {"username": user}) + if user_list: + user_content = user_list[0] + if not user_content.get("username") == "admin": + user_content["_admin"]["modified"] = present_time + if user_content.get("_admin").get("password_expire_time"): + password_expire_time = user_content["_admin"][ + "password_expire_time" + ] + else: + password_expire_time = present_time + uid = user_content["_id"] + self.db.set_one("users", {"_id": uid}, user_content) + if not present_time < password_expire_time: + return True else: pass diff --git a/osm_nbi/authconn_internal.py b/osm_nbi/authconn_internal.py index 3f495d8..0f414b1 100644 --- a/osm_nbi/authconn_internal.py +++ b/osm_nbi/authconn_internal.py @@ -156,15 +156,134 @@ class AuthconnInternal(Authconn): user_rows = self.db.get_list( self.users_collection, {BaseTopic.id_field("users", user): user} ) + now = time() user_content = None - if user_rows: - user_content = user_rows[0] - salt = user_content["_admin"]["salt"] - shadow_password = sha256( - password.encode("utf-8") + salt.encode("utf-8") - ).hexdigest() - if shadow_password != user_content["password"]: - user_content = None + if user: + user_rows = self.db.get_list( + self.users_collection, + {BaseTopic.id_field(self.users_collection, user): user}, + ) + if user_rows: + user_content = user_rows[0] + # Updating user_status for every system_admin id role login + mapped_roles = user_content.get("project_role_mappings") + for role in mapped_roles: + role_id = role.get("role") + role_assigned = self.db.get_one( + self.roles_collection, + {BaseTopic.id_field(self.roles_collection, role_id): role_id}, + ) + + if role_assigned.get("permissions")["admin"]: + if role_assigned.get("permissions")["default"]: + if self.config.get("user_management"): + filt = {} + users = self.db.get_list(self.users_collection, filt) + for user_info in users: + if not user_info.get("username") == "admin": + if not user_info.get("_admin").get( + "account_expire_time" + ): + expire = now + 86400 * self.config.get( + "account_expire_days" + ) + self.db.set_one( + self.users_collection, + {"_id": user_info["_id"]}, + {"_admin.account_expire_time": expire}, + ) + else: + if now > user_info.get("_admin").get( + "account_expire_time" + ): + self.db.set_one( + self.users_collection, + {"_id": user_info["_id"]}, + {"_admin.user_status": "expired"}, + ) + break + + # To add "admin" user_status key while upgrading osm setup with feature enabled + if user_content.get("username") == "admin": + if self.config.get("user_management"): + self.db.set_one( + self.users_collection, + {"_id": user_content["_id"]}, + {"_admin.user_status": "always-active"}, + ) + + if not user_content.get("username") == "admin": + if self.config.get("user_management"): + if not user_content.get("_admin").get("account_expire_time"): + account_expire_time = now + 86400 * self.config.get( + "account_expire_days" + ) + self.db.set_one( + self.users_collection, + {"_id": user_content["_id"]}, + {"_admin.account_expire_time": account_expire_time}, + ) + else: + account_expire_time = user_content.get("_admin").get( + "account_expire_time" + ) + + if now > account_expire_time: + self.db.set_one( + self.users_collection, + {"_id": user_content["_id"]}, + {"_admin.user_status": "expired"}, + ) + raise AuthException( + "Account expired", http_code=HTTPStatus.UNAUTHORIZED + ) + + if user_content.get("_admin").get("user_status") == "locked": + raise AuthException( + "Failed to login as the account is locked due to MANY FAILED ATTEMPTS" + ) + elif user_content.get("_admin").get("user_status") == "expired": + raise AuthException( + "Failed to login as the account is expired" + ) + + salt = user_content["_admin"]["salt"] + shadow_password = sha256( + password.encode("utf-8") + salt.encode("utf-8") + ).hexdigest() + if shadow_password != user_content["password"]: + count = 1 + if user_content.get("_admin").get("retry_count") >= 0: + count += user_content.get("_admin").get("retry_count") + self.db.set_one( + self.users_collection, + {"_id": user_content["_id"]}, + {"_admin.retry_count": count}, + ) + self.logger.debug( + "Failed Authentications count: {}".format(count) + ) + + if user_content.get("username") == "admin": + user_content = None + else: + if not self.config.get("user_management"): + user_content = None + else: + if ( + user_content.get("_admin").get("retry_count") + >= self.config["max_pwd_attempt"] - 1 + ): + self.db.set_one( + self.users_collection, + {"_id": user_content["_id"]}, + {"_admin.user_status": "locked"}, + ) + raise AuthException( + "Failed to login as the account is locked due to MANY FAILED ATTEMPTS" + ) + else: + user_content = None return user_content def authenticate(self, credentials, token_info=None): @@ -236,10 +355,14 @@ class AuthconnInternal(Authconn): sleep(self.token_delay) # user_content["_admin"]["last_token_time"] = now # self.db.replace("users", user_content["_id"], user_content) # might cause race conditions + user_data = { + "_admin.last_token_time": now, + "_admin.retry_count": 0, + } self.db.set_one( self.users_collection, {"_id": user_content["_id"]}, - {"_admin.last_token_time": now}, + user_data, ) token_id = "".join( @@ -299,6 +422,24 @@ class AuthconnInternal(Authconn): ] roles_list = [{"name": "project_admin", "id": rid}] + login_count = user_content.get("_admin").get("retry_count") + last_token_time = user_content.get("_admin").get("last_token_time") + + admin_show = False + user_show = False + if self.config.get("user_management"): + for role in roles_list: + role_id = role.get("id") + permission = self.db.get_one( + self.roles_collection, + {BaseTopic.id_field(self.roles_collection, role_id): role_id}, + ) + if permission.get("permissions")["admin"]: + if permission.get("permissions")["default"]: + admin_show = True + break + else: + user_show = True new_token = { "issued_at": now, "expires": now + 3600, @@ -310,6 +451,10 @@ class AuthconnInternal(Authconn): "user_id": user_content["_id"], "admin": token_admin, "roles": roles_list, + "login_count": login_count, + "last_login": last_token_time, + "admin_show": admin_show, + "user_show": user_show, } self.db.create(self.tokens_collection, new_token) @@ -370,15 +515,24 @@ class AuthconnInternal(Authconn): BaseTopic.format_on_new(user_info, make_public=False) salt = uuid4().hex user_info["_admin"]["salt"] = salt + user_info["_admin"]["user_status"] = "active" present = time() if not user_info["username"] == "admin": - if self.config.get("pwd_expiry_check"): - user_info["_admin"]["modified_time"] = present - user_info["_admin"]["expire_time"] = present + if self.config.get("user_management"): + user_info["_admin"]["modified"] = present + user_info["_admin"]["password_expire_time"] = present + account_expire_time = present + 86400 * self.config.get( + "account_expire_days" + ) + user_info["_admin"]["account_expire_time"] = account_expire_time + + user_info["_admin"]["retry_count"] = 0 + user_info["_admin"]["last_token_time"] = present if "password" in user_info: user_info["password"] = sha256( user_info["password"].encode("utf-8") + salt.encode("utf-8") ).hexdigest() + user_info["_admin"]["password_history"] = {salt: user_info["password"]} # "projects" are not stored any more if "projects" in user_info: del user_info["projects"] @@ -393,6 +547,10 @@ class AuthconnInternal(Authconn): """ uid = user_info["_id"] old_pwd = user_info.get("old_password") + unlock = user_info.get("unlock") + renew = user_info.get("renew") + permission_id = user_info.get("system_admin_id") + user_data = self.db.get_one( self.users_collection, {BaseTopic.id_field("users", uid): uid} ) @@ -405,6 +563,87 @@ class AuthconnInternal(Authconn): raise AuthconnConflictException( "Incorrect password", http_code=HTTPStatus.CONFLICT ) + # Unlocking the user + if unlock: + system_user = None + unlock_state = False + if not permission_id: + raise AuthconnConflictException( + "system_admin_id is the required field to unlock the user", + http_code=HTTPStatus.CONFLICT, + ) + else: + system_user = self.db.get_one( + self.users_collection, + { + BaseTopic.id_field( + self.users_collection, permission_id + ): permission_id + }, + ) + mapped_roles = system_user.get("project_role_mappings") + for role in mapped_roles: + role_id = role.get("role") + role_assigned = self.db.get_one( + self.roles_collection, + {BaseTopic.id_field(self.roles_collection, role_id): role_id}, + ) + if role_assigned.get("permissions")["admin"]: + if role_assigned.get("permissions")["default"]: + user_data["_admin"]["retry_count"] = 0 + user_data["_admin"]["user_status"] = "active" + unlock_state = True + break + if not unlock_state: + raise AuthconnConflictException( + "User '{}' does not have the privilege to unlock the user".format( + permission_id + ), + http_code=HTTPStatus.CONFLICT, + ) + # Renewing the user + if renew: + system_user = None + renew_state = False + if not permission_id: + raise AuthconnConflictException( + "system_admin_id is the required field to renew the user", + http_code=HTTPStatus.CONFLICT, + ) + else: + system_user = self.db.get_one( + self.users_collection, + { + BaseTopic.id_field( + self.users_collection, permission_id + ): permission_id + }, + ) + mapped_roles = system_user.get("project_role_mappings") + for role in mapped_roles: + role_id = role.get("role") + role_assigned = self.db.get_one( + self.roles_collection, + {BaseTopic.id_field(self.roles_collection, role_id): role_id}, + ) + if role_assigned.get("permissions")["admin"]: + if role_assigned.get("permissions")["default"]: + present = time() + account_expire = ( + present + 86400 * self.config["account_expire_days"] + ) + user_data["_admin"]["modified"] = present + user_data["_admin"]["account_expire_time"] = account_expire + user_data["_admin"]["user_status"] = "active" + renew_state = True + break + if not renew_state: + raise AuthconnConflictException( + "User '{}' does not have the privilege to renew the user".format( + permission_id + ), + http_code=HTTPStatus.CONFLICT, + ) BaseTopic.format_on_edit(user_data, user_info) # User Name usnm = user_info.get("username") @@ -428,17 +667,33 @@ class AuthconnInternal(Authconn): salt = uuid4().hex if "_admin" not in user_data: user_data["_admin"] = {} + if user_data.get("_admin").get("password_history"): + old_pwds = user_data.get("_admin").get("password_history") + else: + old_pwds = {} + for k, v in old_pwds.items(): + shadow_password = sha256( + pswd.encode("utf-8") + k.encode("utf-8") + ).hexdigest() + if v == shadow_password: + raise AuthconnConflictException( + "Password is used before", http_code=HTTPStatus.CONFLICT + ) user_data["_admin"]["salt"] = salt user_data["password"] = sha256( pswd.encode("utf-8") + salt.encode("utf-8") ).hexdigest() + if len(old_pwds) >= 3: + old_pwds.pop(list(old_pwds.keys())[0]) + old_pwds.update({salt: user_data["password"]}) + user_data["_admin"]["password_history"] = old_pwds if not user_data["username"] == "admin": - if self.config.get("pwd_expiry_check"): + if self.config.get("user_management"): present = time() - if self.config.get("days"): - expire = present + 86400 * self.config.get("days") - user_data["_admin"]["modified_time"] = present - user_data["_admin"]["expire_time"] = expire + if self.config.get("pwd_expire_days"): + expire = present + 86400 * self.config.get("pwd_expire_days") + user_data["_admin"]["modified"] = present + user_data["_admin"]["password_expire_time"] = expire # Project-Role Mappings # TODO: Check that user_info NEVER includes "project_role_mappings" if "project_role_mappings" not in user_data: diff --git a/osm_nbi/nbi.cfg b/osm_nbi/nbi.cfg index 49f4985..7035cae 100644 --- a/osm_nbi/nbi.cfg +++ b/osm_nbi/nbi.cfg @@ -116,9 +116,11 @@ backend: "keystone" # internal or keystone or tacacs # tacacs_port: 49 # Default value # tacacs_timeout: 10 # Default value -# Password expiry configuration -# pwd_expiry_check: True # Uncomment to enable the password expiry check -# days: 30 # Default value +# User Management configuration +user_management: True +pwd_expire_days: 30 # Password expiry Default value +max_pwd_attempt: 5 +account_expire_days: 90 # Account expiry Default value # CEF Configuration version: "0" diff --git a/osm_nbi/nbi.py b/osm_nbi/nbi.py index d62e8a2..46fd8cc 100644 --- a/osm_nbi/nbi.py +++ b/osm_nbi/nbi.py @@ -1934,6 +1934,9 @@ def _start_service(): engine_config = cherrypy.tree.apps["/osm"].config for k, v in environ.items(): + if k == "OSMNBI_USER_MANAGEMENT": + feature_state = eval(v.title()) + engine_config["authentication"]["user_management"] = feature_state if not k.startswith("OSMNBI_"): continue k1, _, k2 = k[7:].lower().partition("_") diff --git a/osm_nbi/tests/test_admin_topics.py b/osm_nbi/tests/test_admin_topics.py index 90f4acd..c53e268 100755 --- a/osm_nbi/tests/test_admin_topics.py +++ b/osm_nbi/tests/test_admin_topics.py @@ -25,6 +25,7 @@ from uuid import uuid4 from http import HTTPStatus from time import time from osm_common import dbbase, fsbase, msgbase +from osm_common.dbmemory import DbMemory from osm_nbi import authconn, validation from osm_nbi.admin_topics import ( ProjectTopicAuth, @@ -35,6 +36,7 @@ from osm_nbi.admin_topics import ( ) from osm_nbi.engine import EngineException from osm_nbi.authconn import AuthconnNotFoundException +from osm_nbi.authconn_internal import AuthconnInternal test_pid = str(uuid4()) @@ -777,9 +779,11 @@ class Test_UserTopicAuth(TestCase): @classmethod def setUpClass(cls): cls.test_name = "test-user-topic" + cls.password = "Test@123" def setUp(self): - self.db = Mock(dbbase.DbBase()) + # self.db = Mock(dbbase.DbBase()) + self.db = DbMemory() self.fs = Mock(fsbase.FsBase()) self.msg = Mock(msgbase.MsgBase()) self.auth = Mock(authconn.Authconn(None, None, None)) @@ -812,7 +816,7 @@ class Test_UserTopicAuth(TestCase): self.fake_session, { "username": self.test_name, - "password": self.test_name, + "password": self.password, "project_role_mappings": prms_in, }, ) @@ -820,7 +824,7 @@ class Test_UserTopicAuth(TestCase): self.assertEqual(uid2, uid1, "Wrong project identifier") content = self.auth.create_user.call_args[0][0] self.assertEqual(content["username"], self.test_name, "Wrong project name") - self.assertEqual(content["password"], self.test_name, "Wrong password") + self.assertEqual(content["password"], self.password, "Wrong password") self.assertEqual( content["project_role_mappings"], prms_out, @@ -844,7 +848,7 @@ class Test_UserTopicAuth(TestCase): self.fake_session, { "username": self.test_name, - "password": self.test_name, + "password": self.password, "projects": ["some_project"], }, ) @@ -852,7 +856,7 @@ class Test_UserTopicAuth(TestCase): self.assertEqual(uid2, uid1, "Wrong project identifier") content = self.auth.create_user.call_args[0][0] self.assertEqual(content["username"], self.test_name, "Wrong project name") - self.assertEqual(content["password"], self.test_name, "Wrong password") + self.assertEqual(content["password"], self.password, "Wrong password") self.assertEqual( content["project_role_mappings"], prms_out, @@ -874,7 +878,7 @@ class Test_UserTopicAuth(TestCase): self.fake_session, { "username": "other-project-name", - "password": "other-password", + "password": "Other@pwd1", "project_role_mappings": [{}], }, ) @@ -899,7 +903,7 @@ class Test_UserTopicAuth(TestCase): self.fake_session, { "username": "other-project-name", - "password": "other-password", + "password": "Other@pwd1", "projects": [], }, ) @@ -950,7 +954,7 @@ class Test_UserTopicAuth(TestCase): {"_id": rid1, "name": "role-1"}, ] new_name = "new-user-name" - new_pasw = "new-password" + new_pasw = "New@pwd1" add_prms = [{"project": pid2, "role": rid2}] rem_prms = [{"project": pid1, "role": rid1}] self.topic.edit( @@ -1005,8 +1009,8 @@ class Test_UserTopicAuth(TestCase): with self.subTest(i=3): self.auth.get_user_list.side_effect = [[user], []] self.auth.get_user.return_value = user - old_password = self.test_name - new_pasw = "new-password" + old_password = self.password + new_pasw = "New@pwd1" self.topic.edit( self.fake_session, uid, @@ -1053,7 +1057,7 @@ class Test_UserTopicAuth(TestCase): self.fake_session, { "username": uid, - "password": self.test_name, + "password": self.password, "projects": [test_pid], }, ) @@ -1081,7 +1085,7 @@ class Test_UserTopicAuth(TestCase): self.fake_session, { "username": self.test_name, - "password": self.test_name, + "password": self.password, "projects": [test_pid], }, ) @@ -1106,7 +1110,7 @@ class Test_UserTopicAuth(TestCase): self.fake_session, { "username": self.test_name, - "password": self.test_name, + "password": self.password, "projects": [str(uuid4())], }, ) @@ -1226,6 +1230,140 @@ class Test_UserTopicAuth(TestCase): "Wrong exception text", ) + def test_user_management(self): + self.config = { + "user_management": True, + "pwd_expire_days": 30, + "max_pwd_attempt": 5, + "account_expire_days": 90, + } + self.permissions = {"admin": True, "default": True} + now = time() + rid = str(uuid4()) + role = { + "_id": rid, + "name": self.test_name, + "permissions": self.permissions, + "_admin": {"created": now, "modified": now}, + } + self.db.create("roles", role) + admin_user = { + "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500", + "username": "admin", + "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb", + "_admin": { + "created": 1663058370.7721832, + "modified": 1663681183.5651639, + "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b", + "last_token_time": 1666876472.2962265, + "user_status": "always-active", + "retry_count": 0, + }, + "project_role_mappings": [ + {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid} + ], + } + self.db.create("users", admin_user) + with self.subTest(i=1): + self.user_create = AuthconnInternal(self.config, self.db, self.permissions) + user_info = {"username": "user_mgmt_true", "password": "Test@123"} + self.user_create.create_user(user_info) + user = self.db.get_one("users", {"username": user_info["username"]}) + self.assertEqual(user["username"], user_info["username"], "Wrong user name") + self.assertEqual( + user["_admin"]["user_status"], "active", "User status is unknown" + ) + self.assertIn("password_expire_time", user["_admin"], "Key is not there") + self.assertIn("account_expire_time", user["_admin"], "Key is not there") + with self.subTest(i=2): + self.user_update = AuthconnInternal(self.config, self.db, self.permissions) + locked_user = { + "username": "user_lock", + "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d", + "_admin": { + "created": 1667207552.2191198, + "modified": 1667207552.2191815, + "salt": "560a5d51b1d64bb4b9cae0ccff3f1102", + "user_status": "locked", + "password_expire_time": 1667207552.2191815, + "account_expire_time": 1674983552.2191815, + "retry_count": 5, + "last_token_time": 1667207552.2191815, + }, + "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6", + } + self.db.create("users", locked_user) + user_info = { + "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6", + "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500", + "unlock": True, + } + self.assertEqual( + locked_user["_admin"]["user_status"], "locked", "User status is unknown" + ) + self.user_update.update_user(user_info) + user = self.db.get_one("users", {"username": locked_user["username"]}) + self.assertEqual( + user["username"], locked_user["username"], "Wrong user name" + ) + self.assertEqual( + user["_admin"]["user_status"], "active", "User status is unknown" + ) + self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown") + with self.subTest(i=3): + self.user_update = AuthconnInternal(self.config, self.db, self.permissions) + expired_user = { + "username": "user_expire", + "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d", + "_admin": { + "created": 1665602087.601298, + "modified": 1665636442.1245084, + "salt": "560a5d51b1d64bb4b9cae0ccff3f1102", + "user_status": "expired", + "password_expire_time": 1668248628.2191815, + "account_expire_time": 1666952628.2191815, + "retry_count": 0, + "last_token_time": 1666779828.2171815, + }, + "_id": "3266430f-8222-407f-b08f-3a242504ab94", + } + self.db.create("users", expired_user) + user_info = { + "_id": "3266430f-8222-407f-b08f-3a242504ab94", + "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500", + "renew": True, + } + self.assertEqual( + expired_user["_admin"]["user_status"], + "expired", + "User status is unknown", + ) + self.user_update.update_user(user_info) + user = self.db.get_one("users", {"username": expired_user["username"]}) + self.assertEqual( + user["username"], expired_user["username"], "Wrong user name" + ) + self.assertEqual( + user["_admin"]["user_status"], "active", "User status is unknown" + ) + self.assertGreater( + user["_admin"]["account_expire_time"], + expired_user["_admin"]["account_expire_time"], + "User expire time is not get extended", + ) + with self.subTest(i=4): + self.config.update({"user_management": False}) + self.user_create = AuthconnInternal(self.config, self.db, self.permissions) + user_info = {"username": "user_mgmt_false", "password": "Test@123"} + self.user_create.create_user(user_info) + user = self.db.get_one("users", {"username": user_info["username"]}) + self.assertEqual(user["username"], user_info["username"], "Wrong user name") + self.assertEqual( + user["_admin"]["user_status"], "active", "User status is unknown" + ) + self.assertNotIn("password_expire_time", user["_admin"], "Key is not there") + self.assertNotIn("account_expire_time", user["_admin"], "Key is not there") + class Test_CommonVimWimSdn(TestCase): @classmethod diff --git a/osm_nbi/validation.py b/osm_nbi/validation.py index e5c2a03..4dafed5 100644 --- a/osm_nbi/validation.py +++ b/osm_nbi/validation.py @@ -35,6 +35,10 @@ shortname_schema = { "pattern": "^[^,;()\\.\\$'\"]+$", } passwd_schema = {"type": "string", "minLength": 1, "maxLength": 60} +user_passwd_schema = { + "type": "string", + "pattern": "^.*(?=.{8,})((?=.*[!@#$%^&*()\\-_=+{};:,<.>]){1})(?=.*\\d)((?=.*[a-z]){1})((?=.*[A-Z]){1}).*$", +} name_schema = { "type": "string", "minLength": 1, @@ -1082,7 +1086,7 @@ user_new_schema = { "properties": { "username": string_schema, "domain_name": shortname_schema, - "password": passwd_schema, + "password": user_passwd_schema, "projects": nameshort_list_schema, "project_role_mappings": project_role_mappings, }, @@ -1094,13 +1098,16 @@ user_edit_schema = { "title": "User edit schema for administrators", "type": "object", "properties": { - "password": passwd_schema, + "password": user_passwd_schema, "old_password": passwd_schema, "username": string_schema, # To allow User Name modification "projects": {"oneOf": [nameshort_list_schema, array_edition_schema]}, "project_role_mappings": project_role_mappings, "add_project_role_mappings": project_role_mappings, "remove_project_role_mappings": project_role_mappings_optional, + "system_admin_id": id_schema, + "unlock": bool_schema, + "renew": bool_schema, }, "minProperties": 1, "additionalProperties": False, -- 2.17.1