- if user:
- user_rows = self.db.get_list(
- self.users_collection,
- {BaseTopic.id_field(self.users_collection, user): user},
- )
- if user_rows:
- user_content = user_rows[0]
- # Updating user_status for every system_admin id role login
- mapped_roles = user_content.get("project_role_mappings")
- for role in mapped_roles:
- role_id = role.get("role")
- role_assigned = self.db.get_one(
- self.roles_collection,
- {BaseTopic.id_field(self.roles_collection, role_id): role_id},
- )
-
- if role_assigned.get("permissions")["admin"]:
- if role_assigned.get("permissions")["default"]:
- if self.config.get("user_management"):
- filt = {}
- users = self.db.get_list(self.users_collection, filt)
- for user_info in users:
- if not user_info.get("username") == "admin":
- if not user_info.get("_admin").get(
- "account_expire_time"
- ):
- expire = now + 86400 * self.config.get(
- "account_expire_days"
- )
- self.db.set_one(
- self.users_collection,
- {"_id": user_info["_id"]},
- {"_admin.account_expire_time": expire},
- )
- else:
- if now > user_info.get("_admin").get(
- "account_expire_time"
- ):
- self.db.set_one(
- self.users_collection,
- {"_id": user_info["_id"]},
- {"_admin.user_status": "expired"},
- )
- break
-
- # To add "admin" user_status key while upgrading osm setup with feature enabled
- if user_content.get("username") == "admin":
- if self.config.get("user_management"):
- self.db.set_one(
- self.users_collection,
- {"_id": user_content["_id"]},
- {"_admin.user_status": "always-active"},
- )
-
- if not user_content.get("username") == "admin":
- if self.config.get("user_management"):
- if not user_content.get("_admin").get("account_expire_time"):
- account_expire_time = now + 86400 * self.config.get(
- "account_expire_days"
- )
- self.db.set_one(
- self.users_collection,
- {"_id": user_content["_id"]},
- {"_admin.account_expire_time": account_expire_time},
- )
- else:
- account_expire_time = user_content.get("_admin").get(
- "account_expire_time"
- )
-
- if now > account_expire_time:
- self.db.set_one(
- self.users_collection,
- {"_id": user_content["_id"]},
- {"_admin.user_status": "expired"},
- )
- raise AuthException(
- "Account expired", http_code=HTTPStatus.UNAUTHORIZED
- )
-
- if user_content.get("_admin").get("user_status") == "locked":
- raise AuthException(
- "Failed to login as the account is locked due to MANY FAILED ATTEMPTS"
- )
- elif user_content.get("_admin").get("user_status") == "expired":
- raise AuthException(
- "Failed to login as the account is expired"
- )
-
- salt = user_content["_admin"]["salt"]
- shadow_password = sha256(
- password.encode("utf-8") + salt.encode("utf-8")
- ).hexdigest()
- if shadow_password != user_content["password"]:
- count = 1
- if user_content.get("_admin").get("retry_count") >= 0:
- count += user_content.get("_admin").get("retry_count")
- self.db.set_one(
- self.users_collection,
- {"_id": user_content["_id"]},
- {"_admin.retry_count": count},
- )
- self.logger.debug(
- "Failed Authentications count: {}".format(count)
- )
-
- if user_content.get("username") == "admin":
- user_content = None
- else:
- if not self.config.get("user_management"):
- user_content = None
- else:
- if (
- user_content.get("_admin").get("retry_count")
- >= self.config["max_pwd_attempt"] - 1
- ):
- self.db.set_one(
- self.users_collection,
- {"_id": user_content["_id"]},
- {"_admin.user_status": "locked"},
- )
- raise AuthException(
- "Failed to login as the account is locked due to MANY FAILED ATTEMPTS"
- )
- else:
- user_content = None