from osm_common.dbbase import DbException
from osm_nbi.base_topic import BaseTopic
from osm_nbi.utils import cef_event, cef_event_builder
+from osm_nbi.password_utils import (
+ hash_password,
+ verify_password,
+ verify_password_sha256,
+)
from osm_nbi.validation import is_valid_uuid, email_schema
from time import time, sleep
from http import HTTPStatus
from uuid import uuid4
-from hashlib import sha256
from copy import deepcopy
from random import choice as random_choice
import smtplib
)
if otp:
return user_content
- salt = user_content["_admin"]["salt"]
- shadow_password = sha256(
- password.encode("utf-8") + salt.encode("utf-8")
- ).hexdigest()
- if shadow_password != user_content["password"]:
+ correct_pwd = False
+ if user_content.get("hashing_function") == "bcrypt":
+ correct_pwd = verify_password(
+ password=password, hashed_password_hex=user_content["password"]
+ )
+ else:
+ correct_pwd = verify_password_sha256(
+ password=password,
+ hashed_password_hex=user_content["password"],
+ salt=user_content["_admin"]["salt"],
+ )
+ if not correct_pwd:
count = 1
if user_content.get("_admin").get("retry_count") >= 0:
count += user_content.get("_admin").get("retry_count")
)
else:
user_content = None
+ elif correct_pwd and user_content.get("hashing_function") != "bcrypt":
+ # Update the database using a more secure hashing function to store the password
+ user_content["password"] = hash_password(
+ password=password,
+ rounds=self.config.get("password_rounds", 12),
+ )
+ user_content["hashing_function"] = "bcrypt"
+ user_content["_admin"]["password_history_sha256"] = user_content[
+ "_admin"
+ ]["password_history"]
+ user_content["_admin"]["password_history"] = [
+ user_content["password"]
+ ]
+ del user_content["_admin"]["salt"]
+
+ uid = user_content["_id"]
+ idf = BaseTopic.id_field("users", uid)
+ self.db.set_one(self.users_collection, {idf: uid}, user_content)
return user_content
def authenticate(self, credentials, token_info=None):
:return: returns the username and id of the user.
"""
BaseTopic.format_on_new(user_info, make_public=False)
- salt = uuid4().hex
- user_info["_admin"]["salt"] = salt
user_info["_admin"]["user_status"] = "active"
present = time()
if not user_info["username"] == "admin":
user_info["_admin"]["retry_count"] = 0
user_info["_admin"]["last_token_time"] = present
if "password" in user_info:
- user_info["password"] = sha256(
- user_info["password"].encode("utf-8") + salt.encode("utf-8")
- ).hexdigest()
- user_info["_admin"]["password_history"] = {salt: user_info["password"]}
+ user_info["password"] = hash_password(
+ password=user_info["password"],
+ rounds=self.config.get("password_rounds", 12),
+ )
+ user_info["hashing_function"] = "bcrypt"
+ user_info["_admin"]["password_history"] = [user_info["password"]]
# "projects" are not stored any more
if "projects" in user_info:
del user_info["projects"]
self.users_collection, {BaseTopic.id_field("users", uid): uid}
)
if old_pwd:
- salt = user_data["_admin"]["salt"]
- shadow_password = sha256(
- old_pwd.encode("utf-8") + salt.encode("utf-8")
- ).hexdigest()
- if shadow_password != user_data["password"]:
+ correct_pwd = False
+ if user_data.get("hashing_function") == "bcrypt":
+ correct_pwd = verify_password(
+ password=old_pwd, hashed_password_hex=user_data["password"]
+ )
+ else:
+ correct_pwd = verify_password_sha256(
+ password=old_pwd,
+ hashed_password_hex=user_data["password"],
+ salt=user_data["salt"],
+ )
+ if not correct_pwd:
raise AuthconnConflictException(
"Incorrect password", http_code=HTTPStatus.CONFLICT
)
},
)
self.logger.info("{}".format(self.cef_logger))
- salt = uuid4().hex
if "_admin" not in user_data:
user_data["_admin"] = {}
if user_data.get("_admin").get("password_history"):
old_pwds = user_data.get("_admin").get("password_history")
else:
- old_pwds = {}
- for k, v in old_pwds.items():
- shadow_password = sha256(
- pswd.encode("utf-8") + k.encode("utf-8")
- ).hexdigest()
- if v == shadow_password:
+ old_pwds = []
+ for v in old_pwds:
+ if verify_password(password=pswd, hashed_password_hex=v):
raise AuthconnConflictException(
"Password is used before", http_code=HTTPStatus.CONFLICT
)
- user_data["_admin"]["salt"] = salt
- user_data["password"] = sha256(
- pswd.encode("utf-8") + salt.encode("utf-8")
- ).hexdigest()
+
+ # Backwards compatibility for SHA256 hashed passwords
+ if user_data.get("_admin").get("password_history_sha256"):
+ old_pwds_sha256 = user_data.get("_admin").get("password_history_sha256")
+ else:
+ old_pwds_sha256 = {}
+ for k, v in old_pwds_sha256.items():
+ if verify_password_sha256(password=pswd, hashed_password_hex=v, salt=k):
+ raise AuthconnConflictException(
+ "Password is used before", http_code=HTTPStatus.CONFLICT
+ )
+
+ # Finally, hash the password to be updated
+ user_data["password"] = hash_password(
+ password=pswd, rounds=self.config.get("password_rounds", 12)
+ )
+ user_data["hashing_function"] = "bcrypt"
+
if len(old_pwds) >= 3:
old_pwds.pop(list(old_pwds.keys())[0])
- old_pwds.update({salt: user_data["password"]})
+ old_pwds.append([user_data["password"]])
user_data["_admin"]["password_history"] = old_pwds
if not user_data["username"] == "admin":
if self.config.get("user_management"):
if user_data.get("email_id"):
if user_data["email_id"] == indata.get("email_id"):
otp = self.generate_otp()
- encode_otp = (
- sha256(
- otp.encode("utf-8")
- + user_data["_admin"]["salt"].encode("utf-8")
- )
- ).hexdigest()
+ encode_otp = hash_password(
+ password=otp, rounds=self.config.get("password_rounds", 12)
+ )
otp_field = {encode_otp: time() + otp_expiry_time, "retries": 0}
user_data["OTP"] = otp_field
uid = user_data["_id"]
idf = BaseTopic.id_field("users", uid)
retry_count = self.config.get("retry_count", 3)
if user_data:
- salt = user_data["_admin"]["salt"]
- actual_otp = sha256(otp.encode("utf-8") + salt.encode("utf-8")).hexdigest()
if not user_data.get("OTP"):
otp_field = {"retries": 1}
user_data["OTP"] = otp_field
return {"retries": user_data["OTP"]["retries"]}
for key, value in user_data["OTP"].items():
curr_time = time()
- if key == actual_otp and curr_time < value:
+ if (
+ verify_password(password=otp, hashed_password_hex=key)
+ and curr_time < value
+ ):
user_data["OTP"] = {}
self.db.set_one(self.users_collection, {idf: uid}, user_data)
return {"valid": "True", "password_change": "True"}
--- /dev/null
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from hashlib import sha256
+import bcrypt
+
+
+def hash_password(password: str, rounds: int = 12) -> str:
+ """
+ Hash a password with a given number of rounds and return as hex.
+
+ Args:
+ - password (str): The password to hash.
+ - rounds (int): The number of rounds (log_rounds) for bcrypt. Default is 12.
+
+ Returns:
+ - str: The hashed password as an hex string.
+ """
+ # Generate a salt with the specified number of rounds
+ salt = bcrypt.gensalt(rounds=rounds)
+
+ # Hash the password using the generated salt
+ hashed_password = bcrypt.hashpw(password.encode("utf-8"), salt)
+
+ # Return the hashed password and salt as hex strings
+ return hashed_password.hex()
+
+
+def verify_password(password: str, hashed_password_hex: str) -> bool:
+ """
+ Verify a password against a hashed password provided as hex.
+
+ Args:
+ - password (str): The password to verify.
+ - hashed_password_hex (str): The hashed password as a hex string.
+
+ Returns:
+ - bool: True if the password matches the hashed password, False otherwise.
+ """
+ # Convert the hashed password from hex to bytes
+ hashed_password = bytes.fromhex(hashed_password_hex)
+
+ # Verify the password against the hashed password
+ return bcrypt.checkpw(password.encode("utf-8"), hashed_password)
+
+
+def verify_password_sha256(password: str, hashed_password_hex: str, salt: str) -> bool:
+ """
+ [Function for backwards compatibility using the SHA256]
+ Verify a password against a hashed password provided as hex.
+
+ Args:
+ - password (str): The password to verify.
+ - hashed_password_hex (str): The hashed password as a hex string.
+ - salt (str): The salt used to hash the password as a hex string.
+
+ Returns:
+ - bool: True if the password matches the hashed password, False otherwise.
+ """
+ # Old verification for backwards compatibility
+ shadow_password = sha256(
+ password.encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
+
+ return shadow_password == hashed_password_hex