from datetime import datetime, timedelta, timezone from typing import Optional from uuid import uuid4 from jose import JWTError, jwt import bcrypt from fastapi import Depends, HTTPException, Query, Request, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from app.config import get_settings from app.database import get_db from app.models.admin_user import AdminUser from app.models.revoked_token import RevokedToken ALGORITHM = "HS256" BCRYPT_MAX_PASSWORD_BYTES = 72 bearer_scheme = HTTPBearer(auto_error=False) def validate_password_supported(password: str) -> None: if len(password.encode("utf-8")) > BCRYPT_MAX_PASSWORD_BYTES: raise ValueError("password must be at most 72 bytes when UTF-8 encoded") def hash_password(password: str) -> str: validate_password_supported(password) pw = password.encode("utf-8") return bcrypt.hashpw(pw, bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: validate_password_supported(plain) pw = plain.encode("utf-8") return bcrypt.checkpw(pw, hashed.encode("utf-8")) def create_access_token(email: str, expires_hours: Optional[int] = None) -> str: settings = get_settings() hours = expires_hours or settings.jwt_expire_hours expire = datetime.now(timezone.utc) + timedelta(hours=hours) data = {"sub": email, "exp": expire, "jti": uuid4().hex} return jwt.encode(data, settings.jwt_secret, algorithm=ALGORITHM) def decode_token_payload(token: str) -> Optional[dict]: settings = get_settings() try: return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM]) except JWTError: return None def decode_token(token: str) -> Optional[str]: payload = decode_token_payload(token) return payload.get("sub") if payload else None def _is_revoked(db: Session, jti: str | None) -> bool: if not jti: return True return db.query(RevokedToken).filter(RevokedToken.jti == jti).first() is not None def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme), db: Session = Depends(get_db), ) -> AdminUser: token = credentials.credentials if credentials else None if not token: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") payload = decode_token_payload(token) if not payload or _is_revoked(db, payload.get("jti")): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") user = db.query(AdminUser).filter(AdminUser.email == payload.get("sub")).first() if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") return user def get_user_from_token_param( token: Optional[str] = Query(default=None), credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme), db: Session = Depends(get_db), ) -> AdminUser: """Accept JWT from ?token= query param (for iframe src) OR Authorization header.""" raw = token or (credentials.credentials if credentials else None) if not raw: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") payload = decode_token_payload(raw) if not payload or _is_revoked(db, payload.get("jti")): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") user = db.query(AdminUser).filter(AdminUser.email == payload.get("sub")).first() if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") return user