From: escaleira Date: Thu, 3 Apr 2025 17:53:24 +0000 (+0100) Subject: Bug 2404 fix: handling passwords more securely with bcrypt X-Git-Tag: v18.0.0~20 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=9e39382a25a0f5076d2e51e19d6919b445a49cc3;p=osm%2FNBI.git Bug 2404 fix: handling passwords more securely with bcrypt Change-Id: Ia7b781e6af846ab6969ba7fa618c6999bdc169a0 Signed-off-by: escaleira --- diff --git a/osm_nbi/authconn_internal.py b/osm_nbi/authconn_internal.py index cfd23af..ac157b8 100644 --- a/osm_nbi/authconn_internal.py +++ b/osm_nbi/authconn_internal.py @@ -41,11 +41,15 @@ from osm_nbi.authconn import ( from osm_common.dbbase import DbException from osm_nbi.base_topic import BaseTopic from osm_nbi.utils import cef_event, cef_event_builder +from osm_nbi.password_utils import ( + hash_password, + verify_password, + verify_password_sha256, +) from osm_nbi.validation import is_valid_uuid, email_schema from time import time, sleep from http import HTTPStatus from uuid import uuid4 -from hashlib import sha256 from copy import deepcopy from random import choice as random_choice import smtplib @@ -252,11 +256,18 @@ class AuthconnInternal(Authconn): ) if otp: return user_content - salt = user_content["_admin"]["salt"] - shadow_password = sha256( - password.encode("utf-8") + salt.encode("utf-8") - ).hexdigest() - if shadow_password != user_content["password"]: + correct_pwd = False + if user_content.get("hashing_function") == "bcrypt": + correct_pwd = verify_password( + password=password, hashed_password_hex=user_content["password"] + ) + else: + correct_pwd = verify_password_sha256( + password=password, + hashed_password_hex=user_content["password"], + salt=user_content["_admin"]["salt"], + ) + if not correct_pwd: count = 1 if user_content.get("_admin").get("retry_count") >= 0: count += user_content.get("_admin").get("retry_count") @@ -289,6 +300,24 @@ class AuthconnInternal(Authconn): ) else: user_content = None + elif correct_pwd and user_content.get("hashing_function") != "bcrypt": + # Update the database using a more secure hashing function to store the password + user_content["password"] = hash_password( + password=password, + rounds=self.config.get("password_rounds", 12), + ) + user_content["hashing_function"] = "bcrypt" + user_content["_admin"]["password_history_sha256"] = user_content[ + "_admin" + ]["password_history"] + user_content["_admin"]["password_history"] = [ + user_content["password"] + ] + del user_content["_admin"]["salt"] + + uid = user_content["_id"] + idf = BaseTopic.id_field("users", uid) + self.db.set_one(self.users_collection, {idf: uid}, user_content) return user_content def authenticate(self, credentials, token_info=None): @@ -521,8 +550,6 @@ class AuthconnInternal(Authconn): :return: returns the username and id of the user. """ BaseTopic.format_on_new(user_info, make_public=False) - salt = uuid4().hex - user_info["_admin"]["salt"] = salt user_info["_admin"]["user_status"] = "active" present = time() if not user_info["username"] == "admin": @@ -537,10 +564,12 @@ class AuthconnInternal(Authconn): user_info["_admin"]["retry_count"] = 0 user_info["_admin"]["last_token_time"] = present if "password" in user_info: - user_info["password"] = sha256( - user_info["password"].encode("utf-8") + salt.encode("utf-8") - ).hexdigest() - user_info["_admin"]["password_history"] = {salt: user_info["password"]} + user_info["password"] = hash_password( + password=user_info["password"], + rounds=self.config.get("password_rounds", 12), + ) + user_info["hashing_function"] = "bcrypt" + user_info["_admin"]["password_history"] = [user_info["password"]] # "projects" are not stored any more if "projects" in user_info: del user_info["projects"] @@ -564,11 +593,18 @@ class AuthconnInternal(Authconn): self.users_collection, {BaseTopic.id_field("users", uid): uid} ) if old_pwd: - salt = user_data["_admin"]["salt"] - shadow_password = sha256( - old_pwd.encode("utf-8") + salt.encode("utf-8") - ).hexdigest() - if shadow_password != user_data["password"]: + correct_pwd = False + if user_data.get("hashing_function") == "bcrypt": + correct_pwd = verify_password( + password=old_pwd, hashed_password_hex=user_data["password"] + ) + else: + correct_pwd = verify_password_sha256( + password=old_pwd, + hashed_password_hex=user_data["password"], + salt=user_data["salt"], + ) + if not correct_pwd: raise AuthconnConflictException( "Incorrect password", http_code=HTTPStatus.CONFLICT ) @@ -687,28 +723,38 @@ class AuthconnInternal(Authconn): }, ) self.logger.info("{}".format(self.cef_logger)) - salt = uuid4().hex if "_admin" not in user_data: user_data["_admin"] = {} if user_data.get("_admin").get("password_history"): old_pwds = user_data.get("_admin").get("password_history") else: - old_pwds = {} - for k, v in old_pwds.items(): - shadow_password = sha256( - pswd.encode("utf-8") + k.encode("utf-8") - ).hexdigest() - if v == shadow_password: + old_pwds = [] + for v in old_pwds: + if verify_password(password=pswd, hashed_password_hex=v): raise AuthconnConflictException( "Password is used before", http_code=HTTPStatus.CONFLICT ) - user_data["_admin"]["salt"] = salt - user_data["password"] = sha256( - pswd.encode("utf-8") + salt.encode("utf-8") - ).hexdigest() + + # Backwards compatibility for SHA256 hashed passwords + if user_data.get("_admin").get("password_history_sha256"): + old_pwds_sha256 = user_data.get("_admin").get("password_history_sha256") + else: + old_pwds_sha256 = {} + for k, v in old_pwds_sha256.items(): + if verify_password_sha256(password=pswd, hashed_password_hex=v, salt=k): + raise AuthconnConflictException( + "Password is used before", http_code=HTTPStatus.CONFLICT + ) + + # Finally, hash the password to be updated + user_data["password"] = hash_password( + password=pswd, rounds=self.config.get("password_rounds", 12) + ) + user_data["hashing_function"] = "bcrypt" + if len(old_pwds) >= 3: old_pwds.pop(list(old_pwds.keys())[0]) - old_pwds.update({salt: user_data["password"]}) + old_pwds.append([user_data["password"]]) user_data["_admin"]["password_history"] = old_pwds if not user_data["username"] == "admin": if self.config.get("user_management"): @@ -925,12 +971,9 @@ class AuthconnInternal(Authconn): if user_data.get("email_id"): if user_data["email_id"] == indata.get("email_id"): otp = self.generate_otp() - encode_otp = ( - sha256( - otp.encode("utf-8") - + user_data["_admin"]["salt"].encode("utf-8") - ) - ).hexdigest() + encode_otp = hash_password( + password=otp, rounds=self.config.get("password_rounds", 12) + ) otp_field = {encode_otp: time() + otp_expiry_time, "retries": 0} user_data["OTP"] = otp_field uid = user_data["_id"] @@ -976,8 +1019,6 @@ class AuthconnInternal(Authconn): idf = BaseTopic.id_field("users", uid) retry_count = self.config.get("retry_count", 3) if user_data: - salt = user_data["_admin"]["salt"] - actual_otp = sha256(otp.encode("utf-8") + salt.encode("utf-8")).hexdigest() if not user_data.get("OTP"): otp_field = {"retries": 1} user_data["OTP"] = otp_field @@ -985,7 +1026,10 @@ class AuthconnInternal(Authconn): return {"retries": user_data["OTP"]["retries"]} for key, value in user_data["OTP"].items(): curr_time = time() - if key == actual_otp and curr_time < value: + if ( + verify_password(password=otp, hashed_password_hex=key) + and curr_time < value + ): user_data["OTP"] = {} self.db.set_one(self.users_collection, {idf: uid}, user_data) return {"valid": "True", "password_change": "True"} diff --git a/osm_nbi/nbi.cfg b/osm_nbi/nbi.cfg index 00cb377..e0ac3e5 100644 --- a/osm_nbi/nbi.cfg +++ b/osm_nbi/nbi.cfg @@ -122,6 +122,9 @@ version: "0" deviceVendor: "OSM" deviceProduct: "OSM" +# Password hashing configurations +password_rounds: 12 + # SMTP Configuration # smtp_server: "" # smtp_port: diff --git a/osm_nbi/password_utils.py b/osm_nbi/password_utils.py new file mode 100644 index 0000000..9430c05 --- /dev/null +++ b/osm_nbi/password_utils.py @@ -0,0 +1,79 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from hashlib import sha256 +import bcrypt + + +def hash_password(password: str, rounds: int = 12) -> str: + """ + Hash a password with a given number of rounds and return as hex. + + Args: + - password (str): The password to hash. + - rounds (int): The number of rounds (log_rounds) for bcrypt. Default is 12. + + Returns: + - str: The hashed password as an hex string. + """ + # Generate a salt with the specified number of rounds + salt = bcrypt.gensalt(rounds=rounds) + + # Hash the password using the generated salt + hashed_password = bcrypt.hashpw(password.encode("utf-8"), salt) + + # Return the hashed password and salt as hex strings + return hashed_password.hex() + + +def verify_password(password: str, hashed_password_hex: str) -> bool: + """ + Verify a password against a hashed password provided as hex. + + Args: + - password (str): The password to verify. + - hashed_password_hex (str): The hashed password as a hex string. + + Returns: + - bool: True if the password matches the hashed password, False otherwise. + """ + # Convert the hashed password from hex to bytes + hashed_password = bytes.fromhex(hashed_password_hex) + + # Verify the password against the hashed password + return bcrypt.checkpw(password.encode("utf-8"), hashed_password) + + +def verify_password_sha256(password: str, hashed_password_hex: str, salt: str) -> bool: + """ + [Function for backwards compatibility using the SHA256] + Verify a password against a hashed password provided as hex. + + Args: + - password (str): The password to verify. + - hashed_password_hex (str): The hashed password as a hex string. + - salt (str): The salt used to hash the password as a hex string. + + Returns: + - bool: True if the password matches the hashed password, False otherwise. + """ + # Old verification for backwards compatibility + shadow_password = sha256( + password.encode("utf-8") + salt.encode("utf-8") + ).hexdigest() + + return shadow_password == hashed_password_hex diff --git a/requirements.in b/requirements.in index b580767..440cb9a 100644 --- a/requirements.in +++ b/requirements.in @@ -12,6 +12,7 @@ aiohttp async-timeout==4.0.3 +bcrypt cefevent CherryPy>=18.1.2 deepdiff diff --git a/requirements.txt b/requirements.txt index f0ea6c8..ad98174 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,6 +33,8 @@ autocommand==2.2.2 # via jaraco-text backports-tarfile==1.2.0 # via jaraco-context +bcrypt==4.3.0 + # via -r requirements.in cefevent==0.5.6 # via -r requirements.in certifi==2025.1.31