from osm_common.dbbase import DbException
from osm_nbi.base_topic import BaseTopic
from osm_nbi.utils import cef_event, cef_event_builder
-from osm_nbi.validation import is_valid_uuid
+from osm_nbi.validation import is_valid_uuid, email_schema
from time import time, sleep
from http import HTTPStatus
from uuid import uuid4
from hashlib import sha256
from copy import deepcopy
from random import choice as random_choice
+import smtplib
+from email.message import EmailMessage
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
class AuthconnInternal(Authconn):
self.logger.exception(exmsg)
raise AuthException(exmsg, http_code=HTTPStatus.UNAUTHORIZED)
- def validate_user(self, user, password):
+ def validate_user(self, user, password, otp=None):
"""
Validate username and password via appropriate backend.
:param user: username of the user.
raise AuthException(
"Failed to login as the account is expired"
)
-
+ if otp:
+ return user_content
salt = user_content["_admin"]["salt"]
shadow_password = sha256(
password.encode("utf-8") + salt.encode("utf-8")
user = credentials.get("username")
password = credentials.get("password")
project = credentials.get("project_id")
+ otp_validation = credentials.get("otp")
# Try using username/password
- if user:
+ if otp_validation:
+ user_content = self.validate_user(user, password=None, otp=otp_validation)
+ elif user:
user_content = self.validate_user(user, password)
if not user_content:
cef_event(
BaseTopic.format_on_edit(user_data, user_info)
# User Name
usnm = user_info.get("username")
+ email_id = user_info.get("email_id")
if usnm:
user_data["username"] = usnm
+ if email_id:
+ user_data["email_id"] = email_id
# If password is given and is not already encripted
pswd = user_info.get("password")
if pswd and (
{BaseTopic.id_field("projects", project_id): project_id},
project_info,
)
+
+ def generate_otp(self):
+ otp = "".join(random_choice("0123456789") for i in range(0, 4))
+ return otp
+
+ def send_email(self, indata):
+ user = indata.get("username")
+ user_rows = self.db.get_list(self.users_collection, {"username": user})
+ sender_password = None
+ otp_expiry_time = self.config.get("otp_expiry_time", 300)
+ if not re.match(email_schema["pattern"], indata.get("email_id")):
+ raise AuthException(
+ "Invalid email-id",
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
+ if self.config.get("sender_email"):
+ sender_email = self.config["sender_email"]
+ else:
+ raise AuthException(
+ "sender_email not found",
+ http_code=HTTPStatus.NOT_FOUND,
+ )
+ if self.config.get("smtp_server"):
+ smtp_server = self.config["smtp_server"]
+ else:
+ raise AuthException(
+ "smtp server not found",
+ http_code=HTTPStatus.NOT_FOUND,
+ )
+ if self.config.get("smtp_port"):
+ smtp_port = self.config["smtp_port"]
+ else:
+ raise AuthException(
+ "smtp port not found",
+ http_code=HTTPStatus.NOT_FOUND,
+ )
+ sender_password = self.config.get("sender_password") or None
+ if user_rows:
+ user_data = user_rows[0]
+ user_status = user_data["_admin"]["user_status"]
+ if not user_data.get("project_role_mappings", None):
+ raise AuthException(
+ "can't find a default project for this user",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
+ if user_status != "active" and user_status != "always-active":
+ raise AuthException(
+ f"User account is {user_status}.Please contact the system administrator.",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
+ if user_data.get("email_id"):
+ if user_data["email_id"] == indata.get("email_id"):
+ otp = self.generate_otp()
+ encode_otp = (
+ sha256(
+ otp.encode("utf-8")
+ + user_data["_admin"]["salt"].encode("utf-8")
+ )
+ ).hexdigest()
+ otp_field = {encode_otp: time() + otp_expiry_time, "retries": 0}
+ user_data["OTP"] = otp_field
+ uid = user_data["_id"]
+ idf = BaseTopic.id_field("users", uid)
+ reciever_email = user_data["email_id"]
+ email_template_path = self.config.get("email_template")
+ with open(email_template_path, "r") as et:
+ email_template = et.read()
+ msg = EmailMessage()
+ msg = MIMEMultipart("alternative")
+ html_content = email_template.format(
+ username=user_data["username"],
+ otp=otp,
+ validity=otp_expiry_time // 60,
+ )
+ html = MIMEText(html_content, "html")
+ msg["Subject"] = "OSM password reset request"
+ msg.attach(html)
+ with smtplib.SMTP(smtp_server, smtp_port) as smtp:
+ smtp.starttls()
+ if sender_password:
+ smtp.login(sender_email, sender_password)
+ smtp.sendmail(sender_email, reciever_email, msg.as_string())
+ self.db.set_one(self.users_collection, {idf: uid}, user_data)
+ return {"email": "sent"}
+ else:
+ raise AuthException(
+ "No email id is registered for this user.Please contact the system administrator.",
+ http_code=HTTPStatus.NOT_FOUND,
+ )
+ else:
+ raise AuthException(
+ "user not found",
+ http_code=HTTPStatus.NOT_FOUND,
+ )
+
+ def validate_otp(self, indata):
+ otp = indata.get("otp")
+ user = indata.get("username")
+ user_rows = self.db.get_list(self.users_collection, {"username": user})
+ user_data = user_rows[0]
+ uid = user_data["_id"]
+ idf = BaseTopic.id_field("users", uid)
+ retry_count = self.config.get("retry_count", 3)
+ if user_data:
+ salt = user_data["_admin"]["salt"]
+ actual_otp = sha256(otp.encode("utf-8") + salt.encode("utf-8")).hexdigest()
+ if not user_data.get("OTP"):
+ otp_field = {"retries": 1}
+ user_data["OTP"] = otp_field
+ self.db.set_one(self.users_collection, {idf: uid}, user_data)
+ return {"retries": user_data["OTP"]["retries"]}
+ for key, value in user_data["OTP"].items():
+ curr_time = time()
+ if key == actual_otp and curr_time < value:
+ user_data["OTP"] = {}
+ self.db.set_one(self.users_collection, {idf: uid}, user_data)
+ return {"valid": "True", "password_change": "True"}
+ else:
+ user_data["OTP"]["retries"] += 1
+ self.db.set_one(self.users_collection, {idf: uid}, user_data)
+ if user_data["OTP"].get("retries") >= retry_count:
+ raise AuthException(
+ "Invalid OTP. Maximum retries exceeded",
+ http_code=HTTPStatus.TOO_MANY_REQUESTS,
+ )
+ return {"retry_count": user_data["OTP"]["retries"]}
outdata = token_info = self.authenticator.new_token(
token_info, indata, cherrypy.request.remote
)
+ if outdata.get("email") or outdata.get("otp") == "invalid":
+ return self._format_out(outdata, token_info)
cherrypy.session["Authorization"] = outdata["_id"] # pylint: disable=E1101
self._set_location_header("admin", "v1", "tokens", outdata["_id"])
# for logging
self._format_login(token_info)
+ if outdata.get("otp") == "valid":
+ outdata = {
+ "id": outdata["id"],
+ "message": "valid_otp",
+ "user_id": outdata["user_id"],
+ }
# password expiry check
- if self.authenticator.check_password_expiry(outdata):
+ elif self.authenticator.check_password_expiry(outdata):
outdata = {
"id": outdata["id"],
"message": "change_password",
elif k == "OSMNBI_ACCOUNT_EXPIRE_DAYS":
account_expire_days = int(v)
engine_config["authentication"]["account_expire_days"] = account_expire_days
+ elif k == "OSMNBI_SMTP_SERVER":
+ engine_config["authentication"]["smtp_server"] = v
+ engine_config["authentication"]["all"] = environ
+ elif k == "OSMNBI_SMTP_PORT":
+ port = int(v)
+ engine_config["authentication"]["smtp_port"] = port
+ elif k == "OSMNBI_SENDER_EMAIL":
+ engine_config["authentication"]["sender_email"] = v
+ elif k == "OSMNBI_EMAIL_PASSWORD":
+ engine_config["authentication"]["sender_password"] = v
+ elif k == "OSMNBI_OTP_RETRY_COUNT":
+ otp_retry_count = int(v)
+ engine_config["authentication"]["retry_count"] = otp_retry_count
+ elif k == "OSMNBI_OTP_EXPIRY_TIME":
+ otp_expiry_time = int(v)
+ engine_config["authentication"]["otp_expiry_time"] = otp_expiry_time
if not k.startswith("OSMNBI_"):
continue
k1, _, k2 = k[7:].lower().partition("_")