"""Remote browser session API.""" from __future__ import annotations import asyncio import hashlib import json import logging from typing import Any, Literal, Optional from fastapi import APIRouter, Depends, HTTPException, Query, WebSocket, WebSocketDisconnect from fastapi.responses import Response from pydantic import BaseModel, Field from sqlalchemy.orm import Session from app.database import get_db from app.models.custom_page import CustomPage from app.services.browser_session_service import ( BrowserDependencyError, BrowserSessionError, browser_sessions, ) from app.utils.auth import decode_token, get_current_user, get_user_from_token_param from app.database import SessionLocal from app.models.admin_user import AdminUser logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/browser-sessions", tags=["browser-sessions"]) class BrowserSessionCreate(BaseModel): custom_page_id: int width: int = Field(default=1280) height: int = Field(default=720) class BrowserTabResponse(BaseModel): id: str title: str url: str created_at: float class BrowserSessionResponse(BaseModel): id: str custom_page_id: int url: str title: str active_tab_id: Optional[str] = None tabs: Optional[list[BrowserTabResponse]] = None tab_revision: Optional[int] = 0 class BrowserSelectionResponse(BaseModel): text: str class BrowserEvent(BaseModel): type: Literal["click", "dblclick", "mousemove", "mousedown", "mouseup", "type", "key", "scroll", "reload", "back", "forward", "resize"] x: Optional[float] = None y: Optional[float] = None button: Optional[Literal["left", "right", "middle"]] = "left" text: Optional[str] = None key: Optional[str] = None delta_x: Optional[float] = 0 delta_y: Optional[float] = 0 width: Optional[int] = None height: Optional[int] = None def _error_from_browser(exc: Exception) -> HTTPException: if isinstance(exc, BrowserDependencyError): return HTTPException(503, str(exc)) if isinstance(exc, BrowserSessionError): return HTTPException(409, str(exc)) if isinstance(exc, KeyError): return HTTPException(404, "browser session not found") if isinstance(exc, ValueError): return HTTPException(400, str(exc)) return HTTPException(502, f"Browser error: {exc}") @router.post("", response_model=BrowserSessionResponse, status_code=201) async def create_session( body: BrowserSessionCreate, db: Session = Depends(get_db), _=Depends(get_current_user), ): page = db.query(CustomPage).filter(CustomPage.id == body.custom_page_id).first() if not page or not page.enabled: raise HTTPException(404, "page not found") if page.access_mode != "remote_browser": raise HTTPException(400, "custom page is not configured for remote browser mode") login_config = { "enabled": page.login_autofill_enabled, "username": page.login_username, "password": page.login_password, "username_selector": page.login_username_selector, "password_selector": page.login_password_selector, "submit_selector": page.login_submit_selector, } try: session = await browser_sessions.create(page.id, page.url, body.width, body.height, login_config) return await browser_sessions.state(session.id) except Exception as exc: raise _error_from_browser(exc) @router.get("/{session_id}", response_model=BrowserSessionResponse) async def get_session(session_id: str, _=Depends(get_current_user)): try: return await browser_sessions.state(session_id) except Exception as exc: raise _error_from_browser(exc) @router.get("/{session_id}/screenshot") async def session_screenshot(session_id: str, _=Depends(get_user_from_token_param)): try: image = await browser_sessions.screenshot(session_id) except Exception as exc: raise _error_from_browser(exc) return Response(content=image, media_type="image/jpeg", headers={"Cache-Control": "no-store"}) @router.post("/{session_id}/events", response_model=BrowserSessionResponse) async def send_event(session_id: str, body: BrowserEvent, _=Depends(get_current_user)): try: payload: dict[str, Any] = body.model_dump(exclude_none=True) event_type = payload.pop("type") return await browser_sessions.event(session_id, event_type, payload) except Exception as exc: raise _error_from_browser(exc) @router.post("/{session_id}/tabs/{tab_id}/activate", response_model=BrowserSessionResponse) async def activate_tab(session_id: str, tab_id: str, _=Depends(get_current_user)): try: return await browser_sessions.activate_tab(session_id, tab_id) except Exception as exc: raise _error_from_browser(exc) @router.delete("/{session_id}/tabs/{tab_id}", response_model=BrowserSessionResponse) async def close_tab(session_id: str, tab_id: str, _=Depends(get_current_user)): try: return await browser_sessions.close_tab(session_id, tab_id) except Exception as exc: raise _error_from_browser(exc) @router.get("/{session_id}/selection", response_model=BrowserSelectionResponse) async def get_selection(session_id: str, _=Depends(get_current_user)): try: return BrowserSelectionResponse(text=await browser_sessions.selected_text(session_id)) except Exception as exc: raise _error_from_browser(exc) class BrowserClipboardResponse(BaseModel): text: Optional[str] = None error: Optional[str] = None @router.get("/{session_id}/clipboard", response_model=BrowserClipboardResponse) async def session_clipboard(session_id: str, _=Depends(get_current_user)): """Read text from the remote browser's clipboard.""" from fastapi.responses import JSONResponse try: text, error = await browser_sessions.read_clipboard(session_id) body: dict[str, Any] = {} if text: body["text"] = text elif error == "denied": body["error"] = "远程浏览器未授予剪贴板读取权限" elif error == "read_failed": body["error"] = "读取远程剪贴板时发生内部错误" else: if error: logger.warning("clipboard read error for %s: %s", session_id[:12], error) body["error"] = "远程剪贴板为空" return JSONResponse(content=body, headers={"Cache-Control": "no-store"}) except Exception as exc: raise _error_from_browser(exc) class AutofillLoginResponse(BaseModel): success: bool message: str @router.post("/{session_id}/autofill-login", response_model=AutofillLoginResponse) async def autofill_login(session_id: str, _=Depends(get_current_user)): """Manually trigger login autofill for the remote browser session. Uses the linked custom page's saved credentials. Never returns passwords. """ try: session_state = await browser_sessions.state(session_id) except Exception as exc: raise _error_from_browser(exc) from app.database import SessionLocal as _Db from app.models.custom_page import CustomPage db = _Db() try: page = db.query(CustomPage).filter( CustomPage.id == session_state["custom_page_id"] ).first() if not page or not page.enabled: raise HTTPException(400, "linked custom page is not available") if page.access_mode != "remote_browser": raise HTTPException(400, "linked custom page is not in remote browser mode") if not page.login_autofill_enabled: return AutofillLoginResponse(success=False, message="该页面未启用自动填充登录") if not page.login_username or not page.login_password: return AutofillLoginResponse(success=False, message="该页面未保存账号密码") login_config = { "enabled": True, "username": page.login_username, "password": page.login_password, "username_selector": page.login_username_selector, "password_selector": page.login_password_selector, "submit_selector": page.login_submit_selector, } filled = await browser_sessions.autofill_login(session_id, login_config) if filled: return AutofillLoginResponse(success=True, message="已填入账号密码") return AutofillLoginResponse( success=False, message="未找到登录输入框,请先关闭弹窗或进入登录页后重试", ) finally: db.close() @router.delete("/{session_id}", status_code=204) async def close_session(session_id: str, _=Depends(get_current_user)): await browser_sessions.close(session_id) @router.delete("/profiles/{custom_page_id}", status_code=204) async def clear_profile(custom_page_id: int, _=Depends(get_current_user)): """Close active session for the page and delete its profile directory. On next open the browser starts fresh, losing login state. """ from app.models.custom_page import CustomPage from app.database import SessionLocal as _Db db = _Db() try: page = db.query(CustomPage).filter(CustomPage.id == custom_page_id).first() if not page or not page.enabled: raise HTTPException(404, "custom page not found") if page.access_mode != "remote_browser": raise HTTPException(400, "custom page is not in remote browser mode") try: await browser_sessions.clear_profile(custom_page_id, page.url) except RuntimeError as exc: raise HTTPException(500, str(exc)) finally: db.close() # ——— WebSocket stream ——— # Frame interval & diff detection (tuned for CPU efficiency) _WS_MIN_INTERVAL = 0.15 _WS_IDLE_INTERVAL = 1.00 _WS_ACTIVE_INTERVAL = 0.20 _WS_BACKOFF_INTERVAL = 2.00 _WS_DEEP_IDLE_INTERVAL = 5.00 _WS_ACTIVE_WINDOW = 1.25 async def _ws_authenticate(token: Optional[str]) -> bool: """Validate JWT token for WebSocket connections.""" if not token: return False email = decode_token(token) if not email: return False db = SessionLocal() try: user = db.query(AdminUser).filter(AdminUser.email == email).first() return user is not None finally: db.close() @router.websocket("/{session_id}/ws") async def session_ws( websocket: WebSocket, session_id: str, token: Optional[str] = Query(default=None), ): """WebSocket endpoint: pushes JPEG frames as binary, receives JSON event messages.""" # Authenticate before accepting if not await _ws_authenticate(token): await websocket.close(code=4401) return await websocket.accept() # Track when a user event arrived so we can temporarily speed up last_event_at: float = 0.0 last_frame_hash: str = "" unchanged_count = 0 # Task: receive events from client async def receive_loop(): nonlocal last_event_at, unchanged_count try: while True: raw = await websocket.receive_text() try: msg = json.loads(raw) except json.JSONDecodeError: continue msg_type = msg.get("type") if not msg_type: continue payload: dict[str, Any] = {k: v for k, v in msg.items() if k != "type"} try: await browser_sessions.event(session_id, msg_type, payload, include_state=False) last_event_at = asyncio.get_event_loop().time() unchanged_count = 0 except Exception as exc: logger.warning("ws event error: %s", exc) try: await websocket.send_json({"error": str(exc)}) except Exception: pass except (WebSocketDisconnect, asyncio.CancelledError): pass except Exception as exc: logger.debug("ws receive_loop ended: %s", exc) # Task: push screenshots async def push_loop(): nonlocal last_frame_hash, unchanged_count last_tab_revision = -1 try: while True: now = asyncio.get_event_loop().time() if (now - last_event_at) < _WS_ACTIVE_WINDOW: interval = _WS_ACTIVE_INTERVAL elif unchanged_count >= 9: interval = _WS_DEEP_IDLE_INTERVAL elif unchanged_count >= 3: interval = _WS_BACKOFF_INTERVAL else: interval = _WS_IDLE_INTERVAL try: # Check for tab state changes session_obj = browser_sessions.get_session(session_id) if session_obj.tab_revision != last_tab_revision: last_tab_revision = session_obj.tab_revision state = await browser_sessions.state(session_id) await websocket.send_json({"type": "state", "session": state}) frame = await asyncio.wait_for( browser_sessions.screenshot(session_id), timeout=5.0) except asyncio.TimeoutError: logger.warning("ws screenshot timeout for %s", session_id[:12]) await asyncio.sleep(interval) continue except KeyError: await websocket.send_json({"error": "session_not_found"}) break except Exception as exc: logger.warning("ws screenshot error: %s", exc) await asyncio.sleep(interval) continue frame_hash = hashlib.md5(frame).hexdigest() if frame_hash != last_frame_hash: last_frame_hash = frame_hash unchanged_count = 0 try: await websocket.send_bytes(frame) except Exception: break else: unchanged_count += 1 await asyncio.sleep(max(_WS_MIN_INTERVAL, interval)) except (WebSocketDisconnect, asyncio.CancelledError): pass except Exception as exc: logger.debug("ws push_loop ended: %s", exc) # Send initial metadata so client knows session info try: state = await browser_sessions.state(session_id) await websocket.send_json({"type": "init", "session": state}) except Exception as exc: await websocket.send_json({"error": f"session error: {exc}"}) await websocket.close() return recv_task = asyncio.create_task(receive_loop()) push_task = asyncio.create_task(push_loop()) # Run until one side closes done, pending = await asyncio.wait( [recv_task, push_task], return_when=asyncio.FIRST_COMPLETED, ) for t in pending: t.cancel() try: await t except asyncio.CancelledError: pass