or indata.get("project_role_mappings")
or indata.get("projects")
or indata.get("add_projects")
+ or indata.get("unlock")
+ or indata.get("renew")
):
return _id
if indata.get("project_role_mappings") and (
"old_password": indata.get("old_password"),
"add_project_role_mappings": mappings_to_add,
"remove_project_role_mappings": mappings_to_remove,
+ "system_admin_id": indata.get("system_admin_id"),
+ "unlock": indata.get("unlock"),
+ "renew": indata.get("renew"),
}
)
data_to_send = {"_id": _id, "changes": indata}
user_desc = {
"username": "admin",
"password": "admin",
- "_admin": {"created": now, "modified": now},
+ "_admin": {"created": now, "modified": now, "user_status": "always-active"},
}
if project_id:
pid = project_id
This method will check for password expiry of the user
:param outdata: user token information
"""
- user_content = None
+ user_list = None
present_time = time()
user = outdata["username"]
- if self.config["authentication"].get("pwd_expiry_check"):
- user_content = self.db.get_list("users", {"username": user})[0]
- if not user_content.get("username") == "admin":
- user_content["_admin"]["modified_time"] = present_time
- if user_content.get("_admin").get("expire_time"):
- expire_time = user_content["_admin"]["expire_time"]
- else:
- expire_time = present_time
- uid = user_content["_id"]
- self.db.set_one("users", {"_id": uid}, user_content)
- if not present_time < expire_time:
- return True
+ if self.config["authentication"].get("user_management"):
+ user_list = self.db.get_list("users", {"username": user})
+ if user_list:
+ user_content = user_list[0]
+ if not user_content.get("username") == "admin":
+ user_content["_admin"]["modified"] = present_time
+ if user_content.get("_admin").get("password_expire_time"):
+ password_expire_time = user_content["_admin"][
+ "password_expire_time"
+ ]
+ else:
+ password_expire_time = present_time
+ uid = user_content["_id"]
+ self.db.set_one("users", {"_id": uid}, user_content)
+ if not present_time < password_expire_time:
+ return True
else:
pass
user_rows = self.db.get_list(
self.users_collection, {BaseTopic.id_field("users", user): user}
)
+ now = time()
user_content = None
- if user_rows:
- user_content = user_rows[0]
- salt = user_content["_admin"]["salt"]
- shadow_password = sha256(
- password.encode("utf-8") + salt.encode("utf-8")
- ).hexdigest()
- if shadow_password != user_content["password"]:
- user_content = None
+ if user:
+ user_rows = self.db.get_list(
+ self.users_collection,
+ {BaseTopic.id_field(self.users_collection, user): user},
+ )
+ if user_rows:
+ user_content = user_rows[0]
+ # Updating user_status for every system_admin id role login
+ mapped_roles = user_content.get("project_role_mappings")
+ for role in mapped_roles:
+ role_id = role.get("role")
+ role_assigned = self.db.get_one(
+ self.roles_collection,
+ {BaseTopic.id_field(self.roles_collection, role_id): role_id},
+ )
+
+ if role_assigned.get("permissions")["admin"]:
+ if role_assigned.get("permissions")["default"]:
+ if self.config.get("user_management"):
+ filt = {}
+ users = self.db.get_list(self.users_collection, filt)
+ for user_info in users:
+ if not user_info.get("username") == "admin":
+ if not user_info.get("_admin").get(
+ "account_expire_time"
+ ):
+ expire = now + 86400 * self.config.get(
+ "account_expire_days"
+ )
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_info["_id"]},
+ {"_admin.account_expire_time": expire},
+ )
+ else:
+ if now > user_info.get("_admin").get(
+ "account_expire_time"
+ ):
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_info["_id"]},
+ {"_admin.user_status": "expired"},
+ )
+ break
+
+ # To add "admin" user_status key while upgrading osm setup with feature enabled
+ if user_content.get("username") == "admin":
+ if self.config.get("user_management"):
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_content["_id"]},
+ {"_admin.user_status": "always-active"},
+ )
+
+ if not user_content.get("username") == "admin":
+ if self.config.get("user_management"):
+ if not user_content.get("_admin").get("account_expire_time"):
+ account_expire_time = now + 86400 * self.config.get(
+ "account_expire_days"
+ )
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_content["_id"]},
+ {"_admin.account_expire_time": account_expire_time},
+ )
+ else:
+ account_expire_time = user_content.get("_admin").get(
+ "account_expire_time"
+ )
+
+ if now > account_expire_time:
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_content["_id"]},
+ {"_admin.user_status": "expired"},
+ )
+ raise AuthException(
+ "Account expired", http_code=HTTPStatus.UNAUTHORIZED
+ )
+
+ if user_content.get("_admin").get("user_status") == "locked":
+ raise AuthException(
+ "Failed to login as the account is locked due to MANY FAILED ATTEMPTS"
+ )
+ elif user_content.get("_admin").get("user_status") == "expired":
+ raise AuthException(
+ "Failed to login as the account is expired"
+ )
+
+ salt = user_content["_admin"]["salt"]
+ shadow_password = sha256(
+ password.encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
+ if shadow_password != user_content["password"]:
+ count = 1
+ if user_content.get("_admin").get("retry_count") >= 0:
+ count += user_content.get("_admin").get("retry_count")
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_content["_id"]},
+ {"_admin.retry_count": count},
+ )
+ self.logger.debug(
+ "Failed Authentications count: {}".format(count)
+ )
+
+ if user_content.get("username") == "admin":
+ user_content = None
+ else:
+ if not self.config.get("user_management"):
+ user_content = None
+ else:
+ if (
+ user_content.get("_admin").get("retry_count")
+ >= self.config["max_pwd_attempt"] - 1
+ ):
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_content["_id"]},
+ {"_admin.user_status": "locked"},
+ )
+ raise AuthException(
+ "Failed to login as the account is locked due to MANY FAILED ATTEMPTS"
+ )
+ else:
+ user_content = None
return user_content
def authenticate(self, credentials, token_info=None):
sleep(self.token_delay)
# user_content["_admin"]["last_token_time"] = now
# self.db.replace("users", user_content["_id"], user_content) # might cause race conditions
+ user_data = {
+ "_admin.last_token_time": now,
+ "_admin.retry_count": 0,
+ }
self.db.set_one(
self.users_collection,
{"_id": user_content["_id"]},
- {"_admin.last_token_time": now},
+ user_data,
)
token_id = "".join(
]
roles_list = [{"name": "project_admin", "id": rid}]
+ login_count = user_content.get("_admin").get("retry_count")
+ last_token_time = user_content.get("_admin").get("last_token_time")
+
+ admin_show = False
+ user_show = False
+ if self.config.get("user_management"):
+ for role in roles_list:
+ role_id = role.get("id")
+ permission = self.db.get_one(
+ self.roles_collection,
+ {BaseTopic.id_field(self.roles_collection, role_id): role_id},
+ )
+ if permission.get("permissions")["admin"]:
+ if permission.get("permissions")["default"]:
+ admin_show = True
+ break
+ else:
+ user_show = True
new_token = {
"issued_at": now,
"expires": now + 3600,
"user_id": user_content["_id"],
"admin": token_admin,
"roles": roles_list,
+ "login_count": login_count,
+ "last_login": last_token_time,
+ "admin_show": admin_show,
+ "user_show": user_show,
}
self.db.create(self.tokens_collection, new_token)
BaseTopic.format_on_new(user_info, make_public=False)
salt = uuid4().hex
user_info["_admin"]["salt"] = salt
+ user_info["_admin"]["user_status"] = "active"
present = time()
if not user_info["username"] == "admin":
- if self.config.get("pwd_expiry_check"):
- user_info["_admin"]["modified_time"] = present
- user_info["_admin"]["expire_time"] = present
+ if self.config.get("user_management"):
+ user_info["_admin"]["modified"] = present
+ user_info["_admin"]["password_expire_time"] = present
+ account_expire_time = present + 86400 * self.config.get(
+ "account_expire_days"
+ )
+ user_info["_admin"]["account_expire_time"] = account_expire_time
+
+ user_info["_admin"]["retry_count"] = 0
+ user_info["_admin"]["last_token_time"] = present
if "password" in user_info:
user_info["password"] = sha256(
user_info["password"].encode("utf-8") + salt.encode("utf-8")
).hexdigest()
+ user_info["_admin"]["password_history"] = {salt: user_info["password"]}
# "projects" are not stored any more
if "projects" in user_info:
del user_info["projects"]
"""
uid = user_info["_id"]
old_pwd = user_info.get("old_password")
+ unlock = user_info.get("unlock")
+ renew = user_info.get("renew")
+ permission_id = user_info.get("system_admin_id")
+
user_data = self.db.get_one(
self.users_collection, {BaseTopic.id_field("users", uid): uid}
)
raise AuthconnConflictException(
"Incorrect password", http_code=HTTPStatus.CONFLICT
)
+ # Unlocking the user
+ if unlock:
+ system_user = None
+ unlock_state = False
+ if not permission_id:
+ raise AuthconnConflictException(
+ "system_admin_id is the required field to unlock the user",
+ http_code=HTTPStatus.CONFLICT,
+ )
+ else:
+ system_user = self.db.get_one(
+ self.users_collection,
+ {
+ BaseTopic.id_field(
+ self.users_collection, permission_id
+ ): permission_id
+ },
+ )
+ mapped_roles = system_user.get("project_role_mappings")
+ for role in mapped_roles:
+ role_id = role.get("role")
+ role_assigned = self.db.get_one(
+ self.roles_collection,
+ {BaseTopic.id_field(self.roles_collection, role_id): role_id},
+ )
+ if role_assigned.get("permissions")["admin"]:
+ if role_assigned.get("permissions")["default"]:
+ user_data["_admin"]["retry_count"] = 0
+ user_data["_admin"]["user_status"] = "active"
+ unlock_state = True
+ break
+ if not unlock_state:
+ raise AuthconnConflictException(
+ "User '{}' does not have the privilege to unlock the user".format(
+ permission_id
+ ),
+ http_code=HTTPStatus.CONFLICT,
+ )
+ # Renewing the user
+ if renew:
+ system_user = None
+ renew_state = False
+ if not permission_id:
+ raise AuthconnConflictException(
+ "system_admin_id is the required field to renew the user",
+ http_code=HTTPStatus.CONFLICT,
+ )
+ else:
+ system_user = self.db.get_one(
+ self.users_collection,
+ {
+ BaseTopic.id_field(
+ self.users_collection, permission_id
+ ): permission_id
+ },
+ )
+ mapped_roles = system_user.get("project_role_mappings")
+ for role in mapped_roles:
+ role_id = role.get("role")
+ role_assigned = self.db.get_one(
+ self.roles_collection,
+ {BaseTopic.id_field(self.roles_collection, role_id): role_id},
+ )
+ if role_assigned.get("permissions")["admin"]:
+ if role_assigned.get("permissions")["default"]:
+ present = time()
+ account_expire = (
+ present + 86400 * self.config["account_expire_days"]
+ )
+ user_data["_admin"]["modified"] = present
+ user_data["_admin"]["account_expire_time"] = account_expire
+ user_data["_admin"]["user_status"] = "active"
+ renew_state = True
+ break
+ if not renew_state:
+ raise AuthconnConflictException(
+ "User '{}' does not have the privilege to renew the user".format(
+ permission_id
+ ),
+ http_code=HTTPStatus.CONFLICT,
+ )
BaseTopic.format_on_edit(user_data, user_info)
# User Name
usnm = user_info.get("username")
salt = uuid4().hex
if "_admin" not in user_data:
user_data["_admin"] = {}
+ if user_data.get("_admin").get("password_history"):
+ old_pwds = user_data.get("_admin").get("password_history")
+ else:
+ old_pwds = {}
+ for k, v in old_pwds.items():
+ shadow_password = sha256(
+ pswd.encode("utf-8") + k.encode("utf-8")
+ ).hexdigest()
+ if v == shadow_password:
+ raise AuthconnConflictException(
+ "Password is used before", http_code=HTTPStatus.CONFLICT
+ )
user_data["_admin"]["salt"] = salt
user_data["password"] = sha256(
pswd.encode("utf-8") + salt.encode("utf-8")
).hexdigest()
+ if len(old_pwds) >= 3:
+ old_pwds.pop(list(old_pwds.keys())[0])
+ old_pwds.update({salt: user_data["password"]})
+ user_data["_admin"]["password_history"] = old_pwds
if not user_data["username"] == "admin":
- if self.config.get("pwd_expiry_check"):
+ if self.config.get("user_management"):
present = time()
- if self.config.get("days"):
- expire = present + 86400 * self.config.get("days")
- user_data["_admin"]["modified_time"] = present
- user_data["_admin"]["expire_time"] = expire
+ if self.config.get("pwd_expire_days"):
+ expire = present + 86400 * self.config.get("pwd_expire_days")
+ user_data["_admin"]["modified"] = present
+ user_data["_admin"]["password_expire_time"] = expire
# Project-Role Mappings
# TODO: Check that user_info NEVER includes "project_role_mappings"
if "project_role_mappings" not in user_data:
# tacacs_port: 49 # Default value
# tacacs_timeout: 10 # Default value
-# Password expiry configuration
-# pwd_expiry_check: True # Uncomment to enable the password expiry check
-# days: 30 # Default value
+# User Management configuration
+user_management: True
+pwd_expire_days: 30 # Password expiry Default value
+max_pwd_attempt: 5
+account_expire_days: 90 # Account expiry Default value
# CEF Configuration
version: "0"
engine_config = cherrypy.tree.apps["/osm"].config
for k, v in environ.items():
+ if k == "OSMNBI_USER_MANAGEMENT":
+ feature_state = eval(v.title())
+ engine_config["authentication"]["user_management"] = feature_state
if not k.startswith("OSMNBI_"):
continue
k1, _, k2 = k[7:].lower().partition("_")
from http import HTTPStatus
from time import time
from osm_common import dbbase, fsbase, msgbase
+from osm_common.dbmemory import DbMemory
from osm_nbi import authconn, validation
from osm_nbi.admin_topics import (
ProjectTopicAuth,
)
from osm_nbi.engine import EngineException
from osm_nbi.authconn import AuthconnNotFoundException
+from osm_nbi.authconn_internal import AuthconnInternal
test_pid = str(uuid4())
@classmethod
def setUpClass(cls):
cls.test_name = "test-user-topic"
+ cls.password = "Test@123"
def setUp(self):
- self.db = Mock(dbbase.DbBase())
+ # self.db = Mock(dbbase.DbBase())
+ self.db = DbMemory()
self.fs = Mock(fsbase.FsBase())
self.msg = Mock(msgbase.MsgBase())
self.auth = Mock(authconn.Authconn(None, None, None))
self.fake_session,
{
"username": self.test_name,
- "password": self.test_name,
+ "password": self.password,
"project_role_mappings": prms_in,
},
)
self.assertEqual(uid2, uid1, "Wrong project identifier")
content = self.auth.create_user.call_args[0][0]
self.assertEqual(content["username"], self.test_name, "Wrong project name")
- self.assertEqual(content["password"], self.test_name, "Wrong password")
+ self.assertEqual(content["password"], self.password, "Wrong password")
self.assertEqual(
content["project_role_mappings"],
prms_out,
self.fake_session,
{
"username": self.test_name,
- "password": self.test_name,
+ "password": self.password,
"projects": ["some_project"],
},
)
self.assertEqual(uid2, uid1, "Wrong project identifier")
content = self.auth.create_user.call_args[0][0]
self.assertEqual(content["username"], self.test_name, "Wrong project name")
- self.assertEqual(content["password"], self.test_name, "Wrong password")
+ self.assertEqual(content["password"], self.password, "Wrong password")
self.assertEqual(
content["project_role_mappings"],
prms_out,
self.fake_session,
{
"username": "other-project-name",
- "password": "other-password",
+ "password": "Other@pwd1",
"project_role_mappings": [{}],
},
)
self.fake_session,
{
"username": "other-project-name",
- "password": "other-password",
+ "password": "Other@pwd1",
"projects": [],
},
)
{"_id": rid1, "name": "role-1"},
]
new_name = "new-user-name"
- new_pasw = "new-password"
+ new_pasw = "New@pwd1"
add_prms = [{"project": pid2, "role": rid2}]
rem_prms = [{"project": pid1, "role": rid1}]
self.topic.edit(
with self.subTest(i=3):
self.auth.get_user_list.side_effect = [[user], []]
self.auth.get_user.return_value = user
- old_password = self.test_name
- new_pasw = "new-password"
+ old_password = self.password
+ new_pasw = "New@pwd1"
self.topic.edit(
self.fake_session,
uid,
self.fake_session,
{
"username": uid,
- "password": self.test_name,
+ "password": self.password,
"projects": [test_pid],
},
)
self.fake_session,
{
"username": self.test_name,
- "password": self.test_name,
+ "password": self.password,
"projects": [test_pid],
},
)
self.fake_session,
{
"username": self.test_name,
- "password": self.test_name,
+ "password": self.password,
"projects": [str(uuid4())],
},
)
"Wrong exception text",
)
+ def test_user_management(self):
+ self.config = {
+ "user_management": True,
+ "pwd_expire_days": 30,
+ "max_pwd_attempt": 5,
+ "account_expire_days": 90,
+ }
+ self.permissions = {"admin": True, "default": True}
+ now = time()
+ rid = str(uuid4())
+ role = {
+ "_id": rid,
+ "name": self.test_name,
+ "permissions": self.permissions,
+ "_admin": {"created": now, "modified": now},
+ }
+ self.db.create("roles", role)
+ admin_user = {
+ "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
+ "username": "admin",
+ "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb",
+ "_admin": {
+ "created": 1663058370.7721832,
+ "modified": 1663681183.5651639,
+ "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b",
+ "last_token_time": 1666876472.2962265,
+ "user_status": "always-active",
+ "retry_count": 0,
+ },
+ "project_role_mappings": [
+ {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid}
+ ],
+ }
+ self.db.create("users", admin_user)
+ with self.subTest(i=1):
+ self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
+ user_info = {"username": "user_mgmt_true", "password": "Test@123"}
+ self.user_create.create_user(user_info)
+ user = self.db.get_one("users", {"username": user_info["username"]})
+ self.assertEqual(user["username"], user_info["username"], "Wrong user name")
+ self.assertEqual(
+ user["_admin"]["user_status"], "active", "User status is unknown"
+ )
+ self.assertIn("password_expire_time", user["_admin"], "Key is not there")
+ self.assertIn("account_expire_time", user["_admin"], "Key is not there")
+ with self.subTest(i=2):
+ self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
+ locked_user = {
+ "username": "user_lock",
+ "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
+ "_admin": {
+ "created": 1667207552.2191198,
+ "modified": 1667207552.2191815,
+ "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
+ "user_status": "locked",
+ "password_expire_time": 1667207552.2191815,
+ "account_expire_time": 1674983552.2191815,
+ "retry_count": 5,
+ "last_token_time": 1667207552.2191815,
+ },
+ "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
+ }
+ self.db.create("users", locked_user)
+ user_info = {
+ "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
+ "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
+ "unlock": True,
+ }
+ self.assertEqual(
+ locked_user["_admin"]["user_status"], "locked", "User status is unknown"
+ )
+ self.user_update.update_user(user_info)
+ user = self.db.get_one("users", {"username": locked_user["username"]})
+ self.assertEqual(
+ user["username"], locked_user["username"], "Wrong user name"
+ )
+ self.assertEqual(
+ user["_admin"]["user_status"], "active", "User status is unknown"
+ )
+ self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown")
+ with self.subTest(i=3):
+ self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
+ expired_user = {
+ "username": "user_expire",
+ "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
+ "_admin": {
+ "created": 1665602087.601298,
+ "modified": 1665636442.1245084,
+ "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
+ "user_status": "expired",
+ "password_expire_time": 1668248628.2191815,
+ "account_expire_time": 1666952628.2191815,
+ "retry_count": 0,
+ "last_token_time": 1666779828.2171815,
+ },
+ "_id": "3266430f-8222-407f-b08f-3a242504ab94",
+ }
+ self.db.create("users", expired_user)
+ user_info = {
+ "_id": "3266430f-8222-407f-b08f-3a242504ab94",
+ "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
+ "renew": True,
+ }
+ self.assertEqual(
+ expired_user["_admin"]["user_status"],
+ "expired",
+ "User status is unknown",
+ )
+ self.user_update.update_user(user_info)
+ user = self.db.get_one("users", {"username": expired_user["username"]})
+ self.assertEqual(
+ user["username"], expired_user["username"], "Wrong user name"
+ )
+ self.assertEqual(
+ user["_admin"]["user_status"], "active", "User status is unknown"
+ )
+ self.assertGreater(
+ user["_admin"]["account_expire_time"],
+ expired_user["_admin"]["account_expire_time"],
+ "User expire time is not get extended",
+ )
+ with self.subTest(i=4):
+ self.config.update({"user_management": False})
+ self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
+ user_info = {"username": "user_mgmt_false", "password": "Test@123"}
+ self.user_create.create_user(user_info)
+ user = self.db.get_one("users", {"username": user_info["username"]})
+ self.assertEqual(user["username"], user_info["username"], "Wrong user name")
+ self.assertEqual(
+ user["_admin"]["user_status"], "active", "User status is unknown"
+ )
+ self.assertNotIn("password_expire_time", user["_admin"], "Key is not there")
+ self.assertNotIn("account_expire_time", user["_admin"], "Key is not there")
+
class Test_CommonVimWimSdn(TestCase):
@classmethod
"pattern": "^[^,;()\\.\\$'\"]+$",
}
passwd_schema = {"type": "string", "minLength": 1, "maxLength": 60}
+user_passwd_schema = {
+ "type": "string",
+ "pattern": "^.*(?=.{8,})((?=.*[!@#$%^&*()\\-_=+{};:,<.>]){1})(?=.*\\d)((?=.*[a-z]){1})((?=.*[A-Z]){1}).*$",
+}
name_schema = {
"type": "string",
"minLength": 1,
"properties": {
"username": string_schema,
"domain_name": shortname_schema,
- "password": passwd_schema,
+ "password": user_passwd_schema,
"projects": nameshort_list_schema,
"project_role_mappings": project_role_mappings,
},
"title": "User edit schema for administrators",
"type": "object",
"properties": {
- "password": passwd_schema,
+ "password": user_passwd_schema,
"old_password": passwd_schema,
"username": string_schema, # To allow User Name modification
"projects": {"oneOf": [nameshort_list_schema, array_edition_schema]},
"project_role_mappings": project_role_mappings,
"add_project_role_mappings": project_role_mappings,
"remove_project_role_mappings": project_role_mappings_optional,
+ "system_admin_id": id_schema,
+ "unlock": bool_schema,
+ "renew": bool_schema,
},
"minProperties": 1,
"additionalProperties": False,