Files
SmartUp/backend/app/routers/auth.py
T
SmartUp Developer ad16618406 fix: address multiple code audit findings
- CORS: replace wildcard with explicit origin list from CORS_ORIGINS env
- Auth: enforce strong defaults, JWT blacklist (RevokedToken model), login rate limiting
- Auth: validate password length before bcrypt (72-byte limit)
- Scheduler: single-threaded worker to mitigate SQLite write contention
- Scheduler: graceful shutdown (wait=True)
- Snapshots: add prune_snapshots() with configurable retention count
- Storage: isolate localStorage keys via VITE_APP_KEY prefix
- Config: add cors_origins, login_rate_limit, snapshot_retention_count settings
2026-05-17 10:52:18 +08:00

85 lines
3.4 KiB
Python

from datetime import datetime, timezone
from threading import Lock
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.config import get_settings
from app.database import get_db
from app.models.admin_user import AdminUser
from app.models.revoked_token import RevokedToken
from app.schemas.auth import LoginRequest, TokenResponse, UserInfo
from app.utils.auth import bearer_scheme, create_access_token, decode_token_payload, get_current_user, verify_password
# Router for the authentication endpoints; every route below is served under /api/auth.
router = APIRouter(prefix="/api/auth", tags=["auth"])
# Failed-login timestamps (POSIX seconds, UTC clock) grouped by (ip, lowercased email).
# NOTE(review): this table lives in process memory, so it is presumably not shared
# between worker processes and is lost on restart — confirm against the deployment.
_login_attempts: dict[tuple[str, str], list[float]] = {}
# Serialises every read and write of _login_attempts.
_login_attempts_lock = Lock()
def _login_key(request: Request, email: str) -> tuple[str, str]:
forwarded = request.headers.get("x-forwarded-for", "").split(",", 1)[0].strip()
ip = forwarded or (request.client.host if request.client else "unknown")
return ip, email.lower()
def _check_login_limit(key: tuple[str, str]) -> None:
    """Reject the request when ``key`` has too many recent failed logins.

    Failure timestamps older than ``login_rate_limit_window_seconds`` are
    discarded for every tracked key, and keys left without any recent failure
    are removed, so the in-memory table only holds entries from the current
    window instead of growing with every key that ever failed.

    Args:
        key: ``(ip, email)`` pair identifying the rate-limit bucket.

    Raises:
        HTTPException: 429 when the number of failures recorded for ``key``
            inside the window has reached ``login_rate_limit_attempts``.
    """
    settings = get_settings()
    now = datetime.now(timezone.utc).timestamp()
    cutoff = now - settings.login_rate_limit_window_seconds

    with _login_attempts_lock:
        # Sweep all buckets, not just the requested one: a bucket whose key is
        # never checked again would otherwise stay in memory forever.
        # Iterate over a snapshot of the keys because entries are deleted.
        for tracked_key in list(_login_attempts):
            recent = [stamp for stamp in _login_attempts[tracked_key] if stamp >= cutoff]
            if recent:
                _login_attempts[tracked_key] = recent
            else:
                del _login_attempts[tracked_key]

        recent_failures = len(_login_attempts.get(key, ()))
        if recent_failures >= settings.login_rate_limit_attempts:
            # Message: "Too many login attempts, please try again later".
            raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="登录尝试过多,请稍后再试")
def _record_login_failure(key: tuple[str, str]) -> None:
    """Append the current UTC timestamp to the failure history of ``key``.

    Args:
        key: ``(ip, email)`` pair identifying the rate-limit bucket; a new
            history list is created when the key has none yet.
    """
    with _login_attempts_lock:
        history = _login_attempts.get(key)
        if history is None:
            history = []
            _login_attempts[key] = history
        failed_at = datetime.now(timezone.utc).timestamp()
        history.append(failed_at)
def _clear_login_failures(key: tuple[str, str]) -> None:
    """Forget every recorded failure for ``key``; a missing key is a no-op.

    Args:
        key: ``(ip, email)`` pair identifying the rate-limit bucket.
    """
    with _login_attempts_lock:
        if key in _login_attempts:
            del _login_attempts[key]
@router.post("/login", response_model=TokenResponse)
def login(req: LoginRequest, request: Request, db: Session = Depends(get_db)):
    """Authenticate an admin user and issue an access token.

    The attempt is first checked against the per-(ip, email) rate limit. A
    failed attempt is recorded against that bucket; a successful one clears it.

    Args:
        req: Submitted credentials (``email`` and ``password`` are read).
        request: Incoming request, used only to derive the rate-limit key.
        db: Database session.

    Returns:
        ``TokenResponse`` carrying a token created for the user's e-mail.

    Raises:
        HTTPException: 401 when the e-mail is unknown or the password does not
            verify; whatever ``_check_login_limit`` raises when the bucket is
            over its limit.
    """
    bucket = _login_key(request, req.email)
    _check_login_limit(bucket)

    account = db.query(AdminUser).filter(AdminUser.email == req.email).first()

    authenticated = False
    if account:
        try:
            authenticated = bool(verify_password(req.password, account.password_hash))
        except ValueError:
            # presumably raised for passwords the hasher cannot accept (e.g.
            # over bcrypt's 72-byte limit) — confirm in app.utils.auth.
            # Treated as a plain wrong password.
            authenticated = False

    if authenticated:
        _clear_login_failures(bucket)
        access_token = create_access_token(account.email)
        return TokenResponse(access_token=access_token)

    _record_login_failure(bucket)
    # Message: "Wrong e-mail or password" — same reply for unknown user and bad password.
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="邮箱或密码错误")
@router.get("/me", response_model=UserInfo)
def me(current_user: AdminUser = Depends(get_current_user)):
    """Return the e-mail of the user resolved by the ``get_current_user`` dependency.

    Args:
        current_user: User object supplied by ``get_current_user``.

    Returns:
        ``UserInfo`` containing that user's e-mail.
    """
    profile = UserInfo(email=current_user.email)
    return profile
@router.post("/logout")
def logout(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
    db: Session = Depends(get_db),
    _current_user: AdminUser = Depends(get_current_user),
):
    """Revoke the presented bearer token and purge expired revocations.

    When the decoded token carries both ``jti`` and ``exp`` and the ``jti`` is
    not already stored, a ``RevokedToken`` row is added with the token's expiry.
    Rows whose ``expires_at`` lies in the past are deleted on every call.

    Args:
        credentials: Bearer credentials; the raw token is decoded here.
        db: Database session.
        _current_user: Unused in the body; declared so the
            ``get_current_user`` dependency runs (presumably rejecting
            unauthenticated callers — confirm in app.utils.auth).

    Returns:
        ``{"message": "logged out"}``.
    """
    claims = decode_token_payload(credentials.credentials) or {}
    token_id = claims.get("jti")
    expiry = claims.get("exp")

    if token_id and expiry:
        already_revoked = db.query(RevokedToken).filter(RevokedToken.jti == token_id).first()
        if not already_revoked:
            expires_at = datetime.fromtimestamp(expiry, tz=timezone.utc)
            revocation = RevokedToken(jti=token_id, expires_at=expires_at)
            db.add(revocation)

    # Housekeeping: revocations of tokens that have expired are no longer needed.
    current_time = datetime.now(timezone.utc)
    expired_rows = db.query(RevokedToken).filter(RevokedToken.expires_at < current_time)
    expired_rows.delete(synchronize_session=False)

    db.commit()
    return {"message": "logged out"}