"""Auth capture API — remote browser for manual login + credential extraction.""" from __future__ import annotations import logging from typing import Any, Optional from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, Field from sqlalchemy.orm import Session from app.database import get_db from app.services.auth_capture_service import extract_all from app.services.browser_session_service import ( BrowserDependencyError, BrowserSessionError, browser_sessions, ) from app.utils.auth import get_current_user, get_user_from_token_param logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/auth-capture", tags=["auth-capture"]) SENSITIVE_CANDIDATE_FIELDS = frozenset({"value", "cookie_value"}) class CaptureSessionCreate(BaseModel): url: str = Field(..., description="Target login page URL to open in browser") width: int = Field(default=1280, ge=320, le=2560) height: int = Field(default=720, ge=240, le=1600) class CaptureSessionResponse(BaseModel): session_id: str ws_url: str class CaptureExtractResponse(BaseModel): cookies: list[dict] = [] storage: dict[str, str] = {} session_storage: dict[str, str] = {} auth_headers: list[dict] = [] candidates: list[dict] = [] def _sanitize_candidate(candidate: dict[str, Any]) -> dict[str, Any]: return { key: value for key, value in candidate.items() if key not in SENSITIVE_CANDIDATE_FIELDS } def _browser_error(exc: Exception) -> HTTPException: if isinstance(exc, BrowserDependencyError): return HTTPException(503, str(exc)) if isinstance(exc, BrowserSessionError): return HTTPException(409, str(exc)) if isinstance(exc, KeyError): return HTTPException(404, "session not found") if isinstance(exc, ValueError): return HTTPException(400, str(exc)) logger.exception("auth-capture error") return HTTPException(500, "internal error") def _ws_url(session_id: str, token: str) -> str: """Build WebSocket URL for the remote browser viewer.""" return f"/api/browser-sessions/{session_id}/ws?token={token}" @router.post("/sessions", response_model=CaptureSessionResponse, status_code=201) async def create_capture_session( body: CaptureSessionCreate, _=Depends(get_current_user), ): """Create a temporary browser session pointing at the given URL. Returns a session_id and ws_url for the frontend to view/interact. The user should manually log in, then call GET /extract. """ try: session = await browser_sessions.create_ephemeral( url=body.url, width=body.width, height=body.height, ) except Exception as exc: raise _browser_error(exc) # Build a short-lived token for WS auth (reuse current user's token logic) # The frontend already has the user's Bearer token, pass it via query param return CaptureSessionResponse( session_id=session.id, ws_url=f"/api/browser-sessions/{session.id}/ws", ) @router.get("/sessions/{session_id}/extract", response_model=CaptureExtractResponse) async def extract_credentials( session_id: str, include_raw: bool = Query(default=False, description="Include full cookies/storage/headers in response"), _=Depends(get_current_user), ): """Extract auth credentials from the browser session. By default only returns curated candidates (typed, scored, with masked preview). Pass include_raw=true to also get full cookies, localStorage, and headers. """ try: session = browser_sessions.get_session(session_id) except KeyError: raise HTTPException(404, "session not found") try: result = await extract_all(session) except Exception as exc: raise _browser_error(exc) if not include_raw: # Strip raw data — only keep curated candidates with masked previews candidates = [_sanitize_candidate(candidate) for candidate in result.get("candidates", [])] return CaptureExtractResponse(candidates=candidates) return CaptureExtractResponse(**result) @router.delete("/sessions/{session_id}", status_code=204) async def close_capture_session( session_id: str, _=Depends(get_current_user), ): """Close and release the auth-capture browser session.""" try: await browser_sessions.close(session_id) except Exception as exc: raise _browser_error(exc)