Bug 2404 fix: handling passwords more securely with bcrypt 54/15154/8
authorescaleira <escaleira@av.it.pt>
Thu, 3 Apr 2025 17:53:24 +0000 (18:53 +0100)
committerescaleira <escaleira@av.it.pt>
Tue, 29 Apr 2025 15:36:28 +0000 (17:36 +0200)
Change-Id: Ia7b781e6af846ab6969ba7fa618c6999bdc169a0
Signed-off-by: escaleira <escaleira@av.it.pt>
osm_nbi/authconn_internal.py
osm_nbi/nbi.cfg
osm_nbi/password_utils.py [new file with mode: 0644]
requirements.in
requirements.txt

index cfd23af..ac157b8 100644 (file)
@@ -41,11 +41,15 @@ from osm_nbi.authconn import (
 from osm_common.dbbase import DbException
 from osm_nbi.base_topic import BaseTopic
 from osm_nbi.utils import cef_event, cef_event_builder
+from osm_nbi.password_utils import (
+    hash_password,
+    verify_password,
+    verify_password_sha256,
+)
 from osm_nbi.validation import is_valid_uuid, email_schema
 from time import time, sleep
 from http import HTTPStatus
 from uuid import uuid4
-from hashlib import sha256
 from copy import deepcopy
 from random import choice as random_choice
 import smtplib
@@ -252,11 +256,18 @@ class AuthconnInternal(Authconn):
                             )
                 if otp:
                     return user_content
-                salt = user_content["_admin"]["salt"]
-                shadow_password = sha256(
-                    password.encode("utf-8") + salt.encode("utf-8")
-                ).hexdigest()
-                if shadow_password != user_content["password"]:
+                correct_pwd = False
+                if user_content.get("hashing_function") == "bcrypt":
+                    correct_pwd = verify_password(
+                        password=password, hashed_password_hex=user_content["password"]
+                    )
+                else:
+                    correct_pwd = verify_password_sha256(
+                        password=password,
+                        hashed_password_hex=user_content["password"],
+                        salt=user_content["_admin"]["salt"],
+                    )
+                if not correct_pwd:
                     count = 1
                     if user_content.get("_admin").get("retry_count") >= 0:
                         count += user_content.get("_admin").get("retry_count")
@@ -289,6 +300,24 @@ class AuthconnInternal(Authconn):
                                 )
                             else:
                                 user_content = None
+                elif correct_pwd and user_content.get("hashing_function") != "bcrypt":
+                    # Update the database using a more secure hashing function to store the password
+                    user_content["password"] = hash_password(
+                        password=password,
+                        rounds=self.config.get("password_rounds", 12),
+                    )
+                    user_content["hashing_function"] = "bcrypt"
+                    user_content["_admin"]["password_history_sha256"] = user_content[
+                        "_admin"
+                    ]["password_history"]
+                    user_content["_admin"]["password_history"] = [
+                        user_content["password"]
+                    ]
+                    del user_content["_admin"]["salt"]
+
+                    uid = user_content["_id"]
+                    idf = BaseTopic.id_field("users", uid)
+                    self.db.set_one(self.users_collection, {idf: uid}, user_content)
         return user_content
 
     def authenticate(self, credentials, token_info=None):
@@ -521,8 +550,6 @@ class AuthconnInternal(Authconn):
         :return: returns the username and id of the user.
         """
         BaseTopic.format_on_new(user_info, make_public=False)
-        salt = uuid4().hex
-        user_info["_admin"]["salt"] = salt
         user_info["_admin"]["user_status"] = "active"
         present = time()
         if not user_info["username"] == "admin":
@@ -537,10 +564,12 @@ class AuthconnInternal(Authconn):
         user_info["_admin"]["retry_count"] = 0
         user_info["_admin"]["last_token_time"] = present
         if "password" in user_info:
-            user_info["password"] = sha256(
-                user_info["password"].encode("utf-8") + salt.encode("utf-8")
-            ).hexdigest()
-            user_info["_admin"]["password_history"] = {salt: user_info["password"]}
+            user_info["password"] = hash_password(
+                password=user_info["password"],
+                rounds=self.config.get("password_rounds", 12),
+            )
+            user_info["hashing_function"] = "bcrypt"
+            user_info["_admin"]["password_history"] = [user_info["password"]]
         # "projects" are not stored any more
         if "projects" in user_info:
             del user_info["projects"]
@@ -564,11 +593,18 @@ class AuthconnInternal(Authconn):
             self.users_collection, {BaseTopic.id_field("users", uid): uid}
         )
         if old_pwd:
-            salt = user_data["_admin"]["salt"]
-            shadow_password = sha256(
-                old_pwd.encode("utf-8") + salt.encode("utf-8")
-            ).hexdigest()
-            if shadow_password != user_data["password"]:
+            correct_pwd = False
+            if user_data.get("hashing_function") == "bcrypt":
+                correct_pwd = verify_password(
+                    password=old_pwd, hashed_password_hex=user_data["password"]
+                )
+            else:
+                correct_pwd = verify_password_sha256(
+                    password=old_pwd,
+                    hashed_password_hex=user_data["password"],
+                    salt=user_data["salt"],
+                )
+            if not correct_pwd:
                 raise AuthconnConflictException(
                     "Incorrect password", http_code=HTTPStatus.CONFLICT
                 )
@@ -687,28 +723,38 @@ class AuthconnInternal(Authconn):
                 },
             )
             self.logger.info("{}".format(self.cef_logger))
-            salt = uuid4().hex
             if "_admin" not in user_data:
                 user_data["_admin"] = {}
             if user_data.get("_admin").get("password_history"):
                 old_pwds = user_data.get("_admin").get("password_history")
             else:
-                old_pwds = {}
-            for k, v in old_pwds.items():
-                shadow_password = sha256(
-                    pswd.encode("utf-8") + k.encode("utf-8")
-                ).hexdigest()
-                if v == shadow_password:
+                old_pwds = []
+            for v in old_pwds:
+                if verify_password(password=pswd, hashed_password_hex=v):
                     raise AuthconnConflictException(
                         "Password is used before", http_code=HTTPStatus.CONFLICT
                     )
-            user_data["_admin"]["salt"] = salt
-            user_data["password"] = sha256(
-                pswd.encode("utf-8") + salt.encode("utf-8")
-            ).hexdigest()
+
+            # Backwards compatibility for SHA256 hashed passwords
+            if user_data.get("_admin").get("password_history_sha256"):
+                old_pwds_sha256 = user_data.get("_admin").get("password_history_sha256")
+            else:
+                old_pwds_sha256 = {}
+            for k, v in old_pwds_sha256.items():
+                if verify_password_sha256(password=pswd, hashed_password_hex=v, salt=k):
+                    raise AuthconnConflictException(
+                        "Password is used before", http_code=HTTPStatus.CONFLICT
+                    )
+
+            # Finally, hash the password to be updated
+            user_data["password"] = hash_password(
+                password=pswd, rounds=self.config.get("password_rounds", 12)
+            )
+            user_data["hashing_function"] = "bcrypt"
+
             if len(old_pwds) >= 3:
                 old_pwds.pop(list(old_pwds.keys())[0])
-            old_pwds.update({salt: user_data["password"]})
+            old_pwds.append([user_data["password"]])
             user_data["_admin"]["password_history"] = old_pwds
             if not user_data["username"] == "admin":
                 if self.config.get("user_management"):
@@ -925,12 +971,9 @@ class AuthconnInternal(Authconn):
             if user_data.get("email_id"):
                 if user_data["email_id"] == indata.get("email_id"):
                     otp = self.generate_otp()
-                    encode_otp = (
-                        sha256(
-                            otp.encode("utf-8")
-                            + user_data["_admin"]["salt"].encode("utf-8")
-                        )
-                    ).hexdigest()
+                    encode_otp = hash_password(
+                        password=otp, rounds=self.config.get("password_rounds", 12)
+                    )
                     otp_field = {encode_otp: time() + otp_expiry_time, "retries": 0}
                     user_data["OTP"] = otp_field
                     uid = user_data["_id"]
@@ -976,8 +1019,6 @@ class AuthconnInternal(Authconn):
         idf = BaseTopic.id_field("users", uid)
         retry_count = self.config.get("retry_count", 3)
         if user_data:
-            salt = user_data["_admin"]["salt"]
-            actual_otp = sha256(otp.encode("utf-8") + salt.encode("utf-8")).hexdigest()
             if not user_data.get("OTP"):
                 otp_field = {"retries": 1}
                 user_data["OTP"] = otp_field
@@ -985,7 +1026,10 @@ class AuthconnInternal(Authconn):
                 return {"retries": user_data["OTP"]["retries"]}
             for key, value in user_data["OTP"].items():
                 curr_time = time()
-                if key == actual_otp and curr_time < value:
+                if (
+                    verify_password(password=otp, hashed_password_hex=key)
+                    and curr_time < value
+                ):
                     user_data["OTP"] = {}
                     self.db.set_one(self.users_collection, {idf: uid}, user_data)
                     return {"valid": "True", "password_change": "True"}
index 00cb377..e0ac3e5 100644 (file)
@@ -122,6 +122,9 @@ version: "0"
 deviceVendor: "OSM"
 deviceProduct: "OSM"
 
+# Password hashing configurations
+password_rounds: 12
+
 # SMTP Configuration
 # smtp_server: ""
 # smtp_port:
diff --git a/osm_nbi/password_utils.py b/osm_nbi/password_utils.py
new file mode 100644 (file)
index 0000000..9430c05
--- /dev/null
@@ -0,0 +1,79 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from hashlib import sha256
+import bcrypt
+
+
+def hash_password(password: str, rounds: int = 12) -> str:
+    """
+    Hash a password with a given number of rounds and return as hex.
+
+    Args:
+    - password (str): The password to hash.
+    - rounds (int): The number of rounds (log_rounds) for bcrypt. Default is 12.
+
+    Returns:
+    - str: The hashed password as an hex string.
+    """
+    # Generate a salt with the specified number of rounds
+    salt = bcrypt.gensalt(rounds=rounds)
+
+    # Hash the password using the generated salt
+    hashed_password = bcrypt.hashpw(password.encode("utf-8"), salt)
+
+    # Return the hashed password and salt as hex strings
+    return hashed_password.hex()
+
+
+def verify_password(password: str, hashed_password_hex: str) -> bool:
+    """
+    Verify a password against a hashed password provided as hex.
+
+    Args:
+    - password (str): The password to verify.
+    - hashed_password_hex (str): The hashed password as a hex string.
+
+    Returns:
+    - bool: True if the password matches the hashed password, False otherwise.
+    """
+    # Convert the hashed password from hex to bytes
+    hashed_password = bytes.fromhex(hashed_password_hex)
+
+    # Verify the password against the hashed password
+    return bcrypt.checkpw(password.encode("utf-8"), hashed_password)
+
+
+def verify_password_sha256(password: str, hashed_password_hex: str, salt: str) -> bool:
+    """
+    [Function for backwards compatibility using the SHA256]
+    Verify a password against a hashed password provided as hex.
+
+    Args:
+    - password (str): The password to verify.
+    - hashed_password_hex (str): The hashed password as a hex string.
+    - salt (str): The salt used to hash the password as a hex string.
+
+    Returns:
+    - bool: True if the password matches the hashed password, False otherwise.
+    """
+    # Old verification for backwards compatibility
+    shadow_password = sha256(
+        password.encode("utf-8") + salt.encode("utf-8")
+    ).hexdigest()
+
+    return shadow_password == hashed_password_hex
index b580767..440cb9a 100644 (file)
@@ -12,6 +12,7 @@
 
 aiohttp
 async-timeout==4.0.3
+bcrypt
 cefevent
 CherryPy>=18.1.2
 deepdiff
index f0ea6c8..ad98174 100644 (file)
@@ -33,6 +33,8 @@ autocommand==2.2.2
     # via jaraco-text
 backports-tarfile==1.2.0
     # via jaraco-context
+bcrypt==4.3.0
+    # via -r requirements.in
 cefevent==0.5.6
     # via -r requirements.in
 certifi==2025.1.31