or indata.get("project_role_mappings")
or indata.get("projects")
or indata.get("add_projects")
- or indata.get("unlock")
- or indata.get("renew")
):
return _id
if indata.get("project_role_mappings") and (
"old_password": indata.get("old_password"),
"add_project_role_mappings": mappings_to_add,
"remove_project_role_mappings": mappings_to_remove,
- "system_admin_id": indata.get("system_admin_id"),
- "unlock": indata.get("unlock"),
- "renew": indata.get("renew"),
}
)
data_to_send = {"_id": _id, "changes": indata}
user_desc = {
"username": "admin",
"password": "admin",
- "_admin": {"created": now, "modified": now, "user_status": "always-active"},
+ "_admin": {"created": now, "modified": now},
}
if project_id:
pid = project_id
This method will check for password expiry of the user
:param outdata: user token information
"""
- user_list = None
+ user_content = None
present_time = time()
user = outdata["username"]
- if self.config["authentication"].get("user_management"):
- user_list = self.db.get_list("users", {"username": user})
- if user_list:
- user_content = user_list[0]
- if not user_content.get("username") == "admin":
- user_content["_admin"]["modified"] = present_time
- if user_content.get("_admin").get("password_expire_time"):
- password_expire_time = user_content["_admin"][
- "password_expire_time"
- ]
- else:
- password_expire_time = present_time
- uid = user_content["_id"]
- self.db.set_one("users", {"_id": uid}, user_content)
- if not present_time < password_expire_time:
- return True
+ if self.config["authentication"].get("pwd_expiry_check"):
+ user_content = self.db.get_list("users", {"username": user})[0]
+ if not user_content.get("username") == "admin":
+ user_content["_admin"]["modified_time"] = present_time
+ if user_content.get("_admin").get("expire_time"):
+ expire_time = user_content["_admin"]["expire_time"]
+ else:
+ expire_time = present_time
+ uid = user_content["_id"]
+ self.db.set_one("users", {"_id": uid}, user_content)
+ if not present_time < expire_time:
+ return True
else:
pass
user_rows = self.db.get_list(
self.users_collection, {BaseTopic.id_field("users", user): user}
)
- now = time()
user_content = None
- if user:
- user_rows = self.db.get_list(
- self.users_collection,
- {BaseTopic.id_field(self.users_collection, user): user},
- )
- if user_rows:
- user_content = user_rows[0]
- # Updating user_status for every system_admin id role login
- mapped_roles = user_content.get("project_role_mappings")
- for role in mapped_roles:
- role_id = role.get("role")
- role_assigned = self.db.get_one(
- self.roles_collection,
- {BaseTopic.id_field(self.roles_collection, role_id): role_id},
- )
-
- if role_assigned.get("permissions")["admin"]:
- if role_assigned.get("permissions")["default"]:
- if self.config.get("user_management"):
- filt = {}
- users = self.db.get_list(self.users_collection, filt)
- for user_info in users:
- if not user_info.get("username") == "admin":
- if not user_info.get("_admin").get(
- "account_expire_time"
- ):
- expire = now + 86400 * self.config.get(
- "account_expire_days"
- )
- self.db.set_one(
- self.users_collection,
- {"_id": user_info["_id"]},
- {"_admin.account_expire_time": expire},
- )
- else:
- if now > user_info.get("_admin").get(
- "account_expire_time"
- ):
- self.db.set_one(
- self.users_collection,
- {"_id": user_info["_id"]},
- {"_admin.user_status": "expired"},
- )
- break
-
- # To add "admin" user_status key while upgrading osm setup with feature enabled
- if user_content.get("username") == "admin":
- if self.config.get("user_management"):
- self.db.set_one(
- self.users_collection,
- {"_id": user_content["_id"]},
- {"_admin.user_status": "always-active"},
- )
-
- if not user_content.get("username") == "admin":
- if self.config.get("user_management"):
- if not user_content.get("_admin").get("account_expire_time"):
- account_expire_time = now + 86400 * self.config.get(
- "account_expire_days"
- )
- self.db.set_one(
- self.users_collection,
- {"_id": user_content["_id"]},
- {"_admin.account_expire_time": account_expire_time},
- )
- else:
- account_expire_time = user_content.get("_admin").get(
- "account_expire_time"
- )
-
- if now > account_expire_time:
- self.db.set_one(
- self.users_collection,
- {"_id": user_content["_id"]},
- {"_admin.user_status": "expired"},
- )
- raise AuthException(
- "Account expired", http_code=HTTPStatus.UNAUTHORIZED
- )
-
- if user_content.get("_admin").get("user_status") == "locked":
- raise AuthException(
- "Failed to login as the account is locked due to MANY FAILED ATTEMPTS"
- )
- elif user_content.get("_admin").get("user_status") == "expired":
- raise AuthException(
- "Failed to login as the account is expired"
- )
-
- salt = user_content["_admin"]["salt"]
- shadow_password = sha256(
- password.encode("utf-8") + salt.encode("utf-8")
- ).hexdigest()
- if shadow_password != user_content["password"]:
- count = 1
- if user_content.get("_admin").get("retry_count") >= 0:
- count += user_content.get("_admin").get("retry_count")
- self.db.set_one(
- self.users_collection,
- {"_id": user_content["_id"]},
- {"_admin.retry_count": count},
- )
- self.logger.debug(
- "Failed Authentications count: {}".format(count)
- )
-
- if user_content.get("username") == "admin":
- user_content = None
- else:
- if not self.config.get("user_management"):
- user_content = None
- else:
- if (
- user_content.get("_admin").get("retry_count")
- >= self.config["max_pwd_attempt"] - 1
- ):
- self.db.set_one(
- self.users_collection,
- {"_id": user_content["_id"]},
- {"_admin.user_status": "locked"},
- )
- raise AuthException(
- "Failed to login as the account is locked due to MANY FAILED ATTEMPTS"
- )
- else:
- user_content = None
+ if user_rows:
+ user_content = user_rows[0]
+ salt = user_content["_admin"]["salt"]
+ shadow_password = sha256(
+ password.encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
+ if shadow_password != user_content["password"]:
+ user_content = None
return user_content
def authenticate(self, credentials, token_info=None):
sleep(self.token_delay)
# user_content["_admin"]["last_token_time"] = now
# self.db.replace("users", user_content["_id"], user_content) # might cause race conditions
- user_data = {
- "_admin.last_token_time": now,
- "_admin.retry_count": 0,
- }
self.db.set_one(
self.users_collection,
{"_id": user_content["_id"]},
- user_data,
+ {"_admin.last_token_time": now},
)
token_id = "".join(
]
roles_list = [{"name": "project_admin", "id": rid}]
- login_count = user_content.get("_admin").get("retry_count")
- last_token_time = user_content.get("_admin").get("last_token_time")
-
- admin_show = False
- user_show = False
- if self.config.get("user_management"):
- for role in roles_list:
- role_id = role.get("id")
- permission = self.db.get_one(
- self.roles_collection,
- {BaseTopic.id_field(self.roles_collection, role_id): role_id},
- )
- if permission.get("permissions")["admin"]:
- if permission.get("permissions")["default"]:
- admin_show = True
- break
- else:
- user_show = True
new_token = {
"issued_at": now,
"expires": now + 3600,
"user_id": user_content["_id"],
"admin": token_admin,
"roles": roles_list,
- "login_count": login_count,
- "last_login": last_token_time,
- "admin_show": admin_show,
- "user_show": user_show,
}
self.db.create(self.tokens_collection, new_token)
BaseTopic.format_on_new(user_info, make_public=False)
salt = uuid4().hex
user_info["_admin"]["salt"] = salt
- user_info["_admin"]["user_status"] = "active"
present = time()
if not user_info["username"] == "admin":
- if self.config.get("user_management"):
- user_info["_admin"]["modified"] = present
- user_info["_admin"]["password_expire_time"] = present
- account_expire_time = present + 86400 * self.config.get(
- "account_expire_days"
- )
- user_info["_admin"]["account_expire_time"] = account_expire_time
-
- user_info["_admin"]["retry_count"] = 0
- user_info["_admin"]["last_token_time"] = present
+ if self.config.get("pwd_expiry_check"):
+ user_info["_admin"]["modified_time"] = present
+ user_info["_admin"]["expire_time"] = present
if "password" in user_info:
user_info["password"] = sha256(
user_info["password"].encode("utf-8") + salt.encode("utf-8")
).hexdigest()
- user_info["_admin"]["password_history"] = {salt: user_info["password"]}
# "projects" are not stored any more
if "projects" in user_info:
del user_info["projects"]
"""
uid = user_info["_id"]
old_pwd = user_info.get("old_password")
- unlock = user_info.get("unlock")
- renew = user_info.get("renew")
- permission_id = user_info.get("system_admin_id")
-
user_data = self.db.get_one(
self.users_collection, {BaseTopic.id_field("users", uid): uid}
)
raise AuthconnConflictException(
"Incorrect password", http_code=HTTPStatus.CONFLICT
)
- # Unlocking the user
- if unlock:
- system_user = None
- unlock_state = False
- if not permission_id:
- raise AuthconnConflictException(
- "system_admin_id is the required field to unlock the user",
- http_code=HTTPStatus.CONFLICT,
- )
- else:
- system_user = self.db.get_one(
- self.users_collection,
- {
- BaseTopic.id_field(
- self.users_collection, permission_id
- ): permission_id
- },
- )
- mapped_roles = system_user.get("project_role_mappings")
- for role in mapped_roles:
- role_id = role.get("role")
- role_assigned = self.db.get_one(
- self.roles_collection,
- {BaseTopic.id_field(self.roles_collection, role_id): role_id},
- )
- if role_assigned.get("permissions")["admin"]:
- if role_assigned.get("permissions")["default"]:
- user_data["_admin"]["retry_count"] = 0
- user_data["_admin"]["user_status"] = "active"
- unlock_state = True
- break
- if not unlock_state:
- raise AuthconnConflictException(
- "User '{}' does not have the privilege to unlock the user".format(
- permission_id
- ),
- http_code=HTTPStatus.CONFLICT,
- )
- # Renewing the user
- if renew:
- system_user = None
- renew_state = False
- if not permission_id:
- raise AuthconnConflictException(
- "system_admin_id is the required field to renew the user",
- http_code=HTTPStatus.CONFLICT,
- )
- else:
- system_user = self.db.get_one(
- self.users_collection,
- {
- BaseTopic.id_field(
- self.users_collection, permission_id
- ): permission_id
- },
- )
- mapped_roles = system_user.get("project_role_mappings")
- for role in mapped_roles:
- role_id = role.get("role")
- role_assigned = self.db.get_one(
- self.roles_collection,
- {BaseTopic.id_field(self.roles_collection, role_id): role_id},
- )
- if role_assigned.get("permissions")["admin"]:
- if role_assigned.get("permissions")["default"]:
- present = time()
- account_expire = (
- present + 86400 * self.config["account_expire_days"]
- )
- user_data["_admin"]["modified"] = present
- user_data["_admin"]["account_expire_time"] = account_expire
- user_data["_admin"]["user_status"] = "active"
- renew_state = True
- break
- if not renew_state:
- raise AuthconnConflictException(
- "User '{}' does not have the privilege to renew the user".format(
- permission_id
- ),
- http_code=HTTPStatus.CONFLICT,
- )
BaseTopic.format_on_edit(user_data, user_info)
# User Name
usnm = user_info.get("username")
salt = uuid4().hex
if "_admin" not in user_data:
user_data["_admin"] = {}
- if user_data.get("_admin").get("password_history"):
- old_pwds = user_data.get("_admin").get("password_history")
- else:
- old_pwds = {}
- for k, v in old_pwds.items():
- shadow_password = sha256(
- pswd.encode("utf-8") + k.encode("utf-8")
- ).hexdigest()
- if v == shadow_password:
- raise AuthconnConflictException(
- "Password is used before", http_code=HTTPStatus.CONFLICT
- )
user_data["_admin"]["salt"] = salt
user_data["password"] = sha256(
pswd.encode("utf-8") + salt.encode("utf-8")
).hexdigest()
- if len(old_pwds) >= 3:
- old_pwds.pop(list(old_pwds.keys())[0])
- old_pwds.update({salt: user_data["password"]})
- user_data["_admin"]["password_history"] = old_pwds
if not user_data["username"] == "admin":
- if self.config.get("user_management"):
+ if self.config.get("pwd_expiry_check"):
present = time()
- if self.config.get("pwd_expire_days"):
- expire = present + 86400 * self.config.get("pwd_expire_days")
- user_data["_admin"]["modified"] = present
- user_data["_admin"]["password_expire_time"] = expire
+ if self.config.get("days"):
+ expire = present + 86400 * self.config.get("days")
+ user_data["_admin"]["modified_time"] = present
+ user_data["_admin"]["expire_time"] = expire
# Project-Role Mappings
# TODO: Check that user_info NEVER includes "project_role_mappings"
if "project_role_mappings" not in user_data:
# tacacs_port: 49 # Default value
# tacacs_timeout: 10 # Default value
-# User Management configuration
-user_management: True
-pwd_expire_days: 30 # Password expiry Default value
-max_pwd_attempt: 5
-account_expire_days: 90 # Account expiry Default value
+# Password expiry configuration
+# pwd_expiry_check: True # Uncomment to enable the password expiry check
+# days: 30 # Default value
# CEF Configuration
version: "0"
engine_config = cherrypy.tree.apps["/osm"].config
for k, v in environ.items():
- if k == "OSMNBI_USER_MANAGEMENT":
- feature_state = eval(v.title())
- engine_config["authentication"]["user_management"] = feature_state
if not k.startswith("OSMNBI_"):
continue
k1, _, k2 = k[7:].lower().partition("_")
from http import HTTPStatus
from time import time
from osm_common import dbbase, fsbase, msgbase
-from osm_common.dbmemory import DbMemory
from osm_nbi import authconn, validation
from osm_nbi.admin_topics import (
ProjectTopicAuth,
)
from osm_nbi.engine import EngineException
from osm_nbi.authconn import AuthconnNotFoundException
-from osm_nbi.authconn_internal import AuthconnInternal
test_pid = str(uuid4())
@classmethod
def setUpClass(cls):
cls.test_name = "test-user-topic"
- cls.password = "Test@123"
def setUp(self):
- # self.db = Mock(dbbase.DbBase())
- self.db = DbMemory()
+ self.db = Mock(dbbase.DbBase())
self.fs = Mock(fsbase.FsBase())
self.msg = Mock(msgbase.MsgBase())
self.auth = Mock(authconn.Authconn(None, None, None))
self.fake_session,
{
"username": self.test_name,
- "password": self.password,
+ "password": self.test_name,
"project_role_mappings": prms_in,
},
)
self.assertEqual(uid2, uid1, "Wrong project identifier")
content = self.auth.create_user.call_args[0][0]
self.assertEqual(content["username"], self.test_name, "Wrong project name")
- self.assertEqual(content["password"], self.password, "Wrong password")
+ self.assertEqual(content["password"], self.test_name, "Wrong password")
self.assertEqual(
content["project_role_mappings"],
prms_out,
self.fake_session,
{
"username": self.test_name,
- "password": self.password,
+ "password": self.test_name,
"projects": ["some_project"],
},
)
self.assertEqual(uid2, uid1, "Wrong project identifier")
content = self.auth.create_user.call_args[0][0]
self.assertEqual(content["username"], self.test_name, "Wrong project name")
- self.assertEqual(content["password"], self.password, "Wrong password")
+ self.assertEqual(content["password"], self.test_name, "Wrong password")
self.assertEqual(
content["project_role_mappings"],
prms_out,
self.fake_session,
{
"username": "other-project-name",
- "password": "Other@pwd1",
+ "password": "other-password",
"project_role_mappings": [{}],
},
)
self.fake_session,
{
"username": "other-project-name",
- "password": "Other@pwd1",
+ "password": "other-password",
"projects": [],
},
)
{"_id": rid1, "name": "role-1"},
]
new_name = "new-user-name"
- new_pasw = "New@pwd1"
+ new_pasw = "new-password"
add_prms = [{"project": pid2, "role": rid2}]
rem_prms = [{"project": pid1, "role": rid1}]
self.topic.edit(
with self.subTest(i=3):
self.auth.get_user_list.side_effect = [[user], []]
self.auth.get_user.return_value = user
- old_password = self.password
- new_pasw = "New@pwd1"
+ old_password = self.test_name
+ new_pasw = "new-password"
self.topic.edit(
self.fake_session,
uid,
self.fake_session,
{
"username": uid,
- "password": self.password,
+ "password": self.test_name,
"projects": [test_pid],
},
)
self.fake_session,
{
"username": self.test_name,
- "password": self.password,
+ "password": self.test_name,
"projects": [test_pid],
},
)
self.fake_session,
{
"username": self.test_name,
- "password": self.password,
+ "password": self.test_name,
"projects": [str(uuid4())],
},
)
"Wrong exception text",
)
- def test_user_management(self):
- self.config = {
- "user_management": True,
- "pwd_expire_days": 30,
- "max_pwd_attempt": 5,
- "account_expire_days": 90,
- }
- self.permissions = {"admin": True, "default": True}
- now = time()
- rid = str(uuid4())
- role = {
- "_id": rid,
- "name": self.test_name,
- "permissions": self.permissions,
- "_admin": {"created": now, "modified": now},
- }
- self.db.create("roles", role)
- admin_user = {
- "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
- "username": "admin",
- "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb",
- "_admin": {
- "created": 1663058370.7721832,
- "modified": 1663681183.5651639,
- "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b",
- "last_token_time": 1666876472.2962265,
- "user_status": "always-active",
- "retry_count": 0,
- },
- "project_role_mappings": [
- {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid}
- ],
- }
- self.db.create("users", admin_user)
- with self.subTest(i=1):
- self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
- user_info = {"username": "user_mgmt_true", "password": "Test@123"}
- self.user_create.create_user(user_info)
- user = self.db.get_one("users", {"username": user_info["username"]})
- self.assertEqual(user["username"], user_info["username"], "Wrong user name")
- self.assertEqual(
- user["_admin"]["user_status"], "active", "User status is unknown"
- )
- self.assertIn("password_expire_time", user["_admin"], "Key is not there")
- self.assertIn("account_expire_time", user["_admin"], "Key is not there")
- with self.subTest(i=2):
- self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
- locked_user = {
- "username": "user_lock",
- "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
- "_admin": {
- "created": 1667207552.2191198,
- "modified": 1667207552.2191815,
- "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
- "user_status": "locked",
- "password_expire_time": 1667207552.2191815,
- "account_expire_time": 1674983552.2191815,
- "retry_count": 5,
- "last_token_time": 1667207552.2191815,
- },
- "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
- }
- self.db.create("users", locked_user)
- user_info = {
- "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
- "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
- "unlock": True,
- }
- self.assertEqual(
- locked_user["_admin"]["user_status"], "locked", "User status is unknown"
- )
- self.user_update.update_user(user_info)
- user = self.db.get_one("users", {"username": locked_user["username"]})
- self.assertEqual(
- user["username"], locked_user["username"], "Wrong user name"
- )
- self.assertEqual(
- user["_admin"]["user_status"], "active", "User status is unknown"
- )
- self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown")
- with self.subTest(i=3):
- self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
- expired_user = {
- "username": "user_expire",
- "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
- "_admin": {
- "created": 1665602087.601298,
- "modified": 1665636442.1245084,
- "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
- "user_status": "expired",
- "password_expire_time": 1668248628.2191815,
- "account_expire_time": 1666952628.2191815,
- "retry_count": 0,
- "last_token_time": 1666779828.2171815,
- },
- "_id": "3266430f-8222-407f-b08f-3a242504ab94",
- }
- self.db.create("users", expired_user)
- user_info = {
- "_id": "3266430f-8222-407f-b08f-3a242504ab94",
- "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
- "renew": True,
- }
- self.assertEqual(
- expired_user["_admin"]["user_status"],
- "expired",
- "User status is unknown",
- )
- self.user_update.update_user(user_info)
- user = self.db.get_one("users", {"username": expired_user["username"]})
- self.assertEqual(
- user["username"], expired_user["username"], "Wrong user name"
- )
- self.assertEqual(
- user["_admin"]["user_status"], "active", "User status is unknown"
- )
- self.assertGreater(
- user["_admin"]["account_expire_time"],
- expired_user["_admin"]["account_expire_time"],
- "User expire time is not get extended",
- )
- with self.subTest(i=4):
- self.config.update({"user_management": False})
- self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
- user_info = {"username": "user_mgmt_false", "password": "Test@123"}
- self.user_create.create_user(user_info)
- user = self.db.get_one("users", {"username": user_info["username"]})
- self.assertEqual(user["username"], user_info["username"], "Wrong user name")
- self.assertEqual(
- user["_admin"]["user_status"], "active", "User status is unknown"
- )
- self.assertNotIn("password_expire_time", user["_admin"], "Key is not there")
- self.assertNotIn("account_expire_time", user["_admin"], "Key is not there")
-
class Test_CommonVimWimSdn(TestCase):
@classmethod
"pattern": "^[^,;()\\.\\$'\"]+$",
}
passwd_schema = {"type": "string", "minLength": 1, "maxLength": 60}
-user_passwd_schema = {
- "type": "string",
- "pattern": "^.*(?=.{8,})((?=.*[!@#$%^&*()\\-_=+{};:,<.>]){1})(?=.*\\d)((?=.*[a-z]){1})((?=.*[A-Z]){1}).*$",
-}
name_schema = {
"type": "string",
"minLength": 1,
"properties": {
"username": string_schema,
"domain_name": shortname_schema,
- "password": user_passwd_schema,
+ "password": passwd_schema,
"projects": nameshort_list_schema,
"project_role_mappings": project_role_mappings,
},
"title": "User edit schema for administrators",
"type": "object",
"properties": {
- "password": user_passwd_schema,
+ "password": passwd_schema,
"old_password": passwd_schema,
"username": string_schema, # To allow User Name modification
"projects": {"oneOf": [nameshort_list_schema, array_edition_schema]},
"project_role_mappings": project_role_mappings,
"add_project_role_mappings": project_role_mappings,
"remove_project_role_mappings": project_role_mappings_optional,
- "system_admin_id": id_schema,
- "unlock": bool_schema,
- "renew": bool_schema,
},
"minProperties": 1,
"additionalProperties": False,