+ exmsg = "Error during token revocation using internal backend"
+ self.logger.exception(exmsg)
+ raise AuthException(exmsg, http_code=HTTPStatus.UNAUTHORIZED)
+
+ def validate_user(self, user, password):
+ """
+ Validate username and password via appropriate backend.
+ :param user: username of the user.
+ :param password: password to be validated.
+ """
+ user_rows = self.db.get_list(
+ self.users_collection, {BaseTopic.id_field("users", user): user}
+ )
+ now = time()
+ user_content = None
+ if user:
+ user_rows = self.db.get_list(
+ self.users_collection,
+ {BaseTopic.id_field(self.users_collection, user): user},
+ )
+ if user_rows:
+ user_content = user_rows[0]
+ # Updating user_status for every system_admin id role login
+ mapped_roles = user_content.get("project_role_mappings")
+ for role in mapped_roles:
+ role_id = role.get("role")
+ role_assigned = self.db.get_one(
+ self.roles_collection,
+ {BaseTopic.id_field(self.roles_collection, role_id): role_id},
+ )
+
+ if role_assigned.get("permissions")["admin"]:
+ if role_assigned.get("permissions")["default"]:
+ if self.config.get("user_management"):
+ filt = {}
+ users = self.db.get_list(self.users_collection, filt)
+ for user_info in users:
+ if not user_info.get("username") == "admin":
+ if not user_info.get("_admin").get(
+ "account_expire_time"
+ ):
+ expire = now + 86400 * self.config.get(
+ "account_expire_days"
+ )
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_info["_id"]},
+ {"_admin.account_expire_time": expire},
+ )
+ else:
+ if now > user_info.get("_admin").get(
+ "account_expire_time"
+ ):
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_info["_id"]},
+ {"_admin.user_status": "expired"},
+ )
+ break
+
+ # To add "admin" user_status key while upgrading osm setup with feature enabled
+ if user_content.get("username") == "admin":
+ if self.config.get("user_management"):
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_content["_id"]},
+ {"_admin.user_status": "always-active"},
+ )
+
+ if not user_content.get("username") == "admin":
+ if self.config.get("user_management"):
+ if not user_content.get("_admin").get("account_expire_time"):
+ account_expire_time = now + 86400 * self.config.get(
+ "account_expire_days"
+ )
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_content["_id"]},
+ {"_admin.account_expire_time": account_expire_time},
+ )
+ else:
+ account_expire_time = user_content.get("_admin").get(
+ "account_expire_time"
+ )
+
+ if now > account_expire_time:
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_content["_id"]},
+ {"_admin.user_status": "expired"},
+ )
+ raise AuthException(
+ "Account expired", http_code=HTTPStatus.UNAUTHORIZED
+ )
+
+ if user_content.get("_admin").get("user_status") == "locked":
+ raise AuthException(
+ "Failed to login as the account is locked due to MANY FAILED ATTEMPTS"
+ )
+ elif user_content.get("_admin").get("user_status") == "expired":
+ raise AuthException(
+ "Failed to login as the account is expired"
+ )