"_id": _id,
"username": indata.get("username"),
"password": indata.get("password"),
+ "old_password": indata.get("old_password"),
"add_project_role_mappings": mappings_to_add,
"remove_project_role_mappings": mappings_to_remove,
}
else:
self.tokens_cache.clear()
self.msg.write("admin", "revoke_token", {"_id": token} if token else None)
+
+ def check_password_expiry(self, outdata):
+ """
+ This method will check for password expiry of the user
+ :param outdata: user token information
+ """
+ user_content = None
+ detail = {}
+ present_time = time()
+ user = outdata["username"]
+ if self.config["authentication"].get("pwd_expiry_check"):
+ user_content = self.db.get_list("users", {"username": user})[0]
+ if not user_content.get("username") == "admin":
+ user_content["_admin"]["modified_time"] = present_time
+ if user_content.get("_admin").get("expire_time"):
+ expire_time = user_content["_admin"]["expire_time"]
+ else:
+ expire_time = present_time
+ uid = user_content["_id"]
+ self.db.set_one("users", {"_id": uid}, user_content)
+ if not present_time < expire_time:
+ return True
+ else:
+ pass
import logging
import re
-from osm_nbi.authconn import Authconn, AuthException # , AuthconnOperationException
+from osm_nbi.authconn import Authconn, AuthException, AuthconnConflictException # , AuthconnOperationException
from osm_common.dbbase import DbException
from osm_nbi.base_topic import BaseTopic
from osm_nbi.validation import is_valid_uuid
BaseTopic.format_on_new(user_info, make_public=False)
salt = uuid4().hex
user_info["_admin"]["salt"] = salt
+ present = time()
+ if not user_info["username"] == "admin":
+ if self.config.get("pwd_expiry_check"):
+ user_info["_admin"]["modified_time"] = present
+ user_info["_admin"]["expire_time"] = present
if "password" in user_info:
user_info["password"] = sha256(
user_info["password"].encode("utf-8") + salt.encode("utf-8")
:param user_info: user info modifications
"""
uid = user_info["_id"]
+ old_pwd = user_info.get("old_password")
user_data = self.db.get_one(
self.users_collection, {BaseTopic.id_field("users", uid): uid}
)
+ if old_pwd:
+ salt = user_data["_admin"]["salt"]
+ shadow_password = sha256(old_pwd.encode('utf-8') + salt.encode('utf-8')).hexdigest()
+ if shadow_password != user_data["password"]:
+ raise AuthconnConflictException(
+ "Incorrect password",
+ http_code=HTTPStatus.CONFLICT
+ )
BaseTopic.format_on_edit(user_data, user_info)
# User Name
usnm = user_info.get("username")
user_data["password"] = sha256(
pswd.encode("utf-8") + salt.encode("utf-8")
).hexdigest()
+ if not user_data["username"] == "admin":
+ if self.config.get("pwd_expiry_check"):
+ present = time()
+ if self.config.get("days"):
+ expire = present + 86400 * self.config.get("days")
+ user_data["_admin"]["modified_time"] = present
+ user_data["_admin"]["expire_time"] = expire
# Project-Role Mappings
# TODO: Check that user_info NEVER includes "project_role_mappings"
if "project_role_mappings" not in user_data:
# tacacs_port: 49 # Default value
# tacacs_timeout: 10 # Default value
+# Password expiry configuration
+# pwd_expiry_check: True # Uncomment to enable the password expiry check
+# days: 30 # Default value
+
[rbac]
# roles_to_operations: "roles_to_operations.yml" # initial role generation when database
self._set_location_header("admin", "v1", "tokens", outdata["_id"])
# for logging
self._format_login(token_info)
-
+ # password expiry check
+ if self.authenticator.check_password_expiry(outdata):
+ outdata = {"id": outdata["id"],
+ "message": "change_password",
+ "user_id": outdata["user_id"]
+ }
# cherrypy.response.cookie["Authorization"] = outdata["id"]
# cherrypy.response.cookie["Authorization"]['expires'] = 3600
elif method == "DELETE":
permissions:
default: true
admin: false
- users: false
projects: false
roles: false
+ # Users
+ users: false
+ users:id:patch: true
- name: "project_user"
permissions:
ns_instances: true
vnf_instances: true
slice_instances: true
- users: false
projects: false
roles: false
+ # Users
+ users: false
+ users:id:patch: true
# VIMs
vims: false
vims:get: true
norm(str(e.exception)),
"Wrong exception text",
)
+ with self.subTest(i=3):
+ self.auth.get_user_list.side_effect = [[user], []]
+ self.auth.get_user.return_value = user
+ old_password = self.test_name
+ new_pasw = "new-password"
+ self.topic.edit(
+ self.fake_session,
+ uid,
+ {
+ "old_password": old_password,
+ "password": new_pasw,
+ },
+ )
+ content = self.auth.update_user.call_args[0][0]
+ self.assertEqual(content["old_password"], old_password, "Wrong old password")
+ self.assertEqual(content["password"], new_pasw, "Wrong user password")
def test_delete_user(self):
with self.subTest(i=1):
"type": "object",
"properties": {
"password": passwd_schema,
+ "old_password": passwd_schema,
"username": string_schema, # To allow User Name modification
"projects": {"oneOf": [nameshort_list_schema, array_edition_schema]},
"project_role_mappings": project_role_mappings,