Files
SmartUp/backend/app/services/browser_session_service.py
T
liumangmang 6cc797f915 feat: remote browser login persistence + balance display + UI consistency
- Retain login state in remote browser profiles (don't delete on disconnect)
- Add GET /api/browser-sessions/{id}/clipboard for clipboard sync
- Add POST /api/browser-sessions/{id}/autofill-login for manual credential fill
- Add DELETE /api/browser-sessions/profiles/{custom_page_id} for login clear
- Add balance tracking with configurable divisor (balance_divisor)
- Health check on session reuse, idle TTL eviction, background cleanup
- Add first-frame watchdog (10s timeout) to prevent infinite loading
- Reconnect browser on active=true when session was closed
- UI: uniform text-only inline buttons (websites + upstreams pages)
- Fix page switch race with closingRemoteSessionPromise
2026-05-20 09:44:20 +08:00

618 lines
26 KiB
Python

"""Managed Playwright browser sessions for custom pages."""
from __future__ import annotations
import asyncio
import logging
import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse
from uuid import uuid4
from app.config import get_settings
logger = logging.getLogger(__name__)
class BrowserDependencyError(RuntimeError):
"""Raised when Playwright or its browser runtime is unavailable."""
class BrowserSessionError(RuntimeError):
"""Raised when an existing browser session can no longer be used."""
@dataclass
class BrowserSession:
id: str
custom_page_id: int
profile_key: str
context: Any
page: Any
lock: asyncio.Lock
cdp_session: Any = None
captured_headers: list[dict] = None # auth headers from CDP
class BrowserSessionService:
# Idle TTL: close sessions that haven't had activity for this long
IDLE_TTL_SECONDS = 1800 # 30 minutes
# Cap: max concurrent persistent sessions (excludes auth-capture)
MAX_SESSIONS = 10
def __init__(self) -> None:
self._playwright: Optional[Any] = None
self._sessions: dict[str, BrowserSession] = {}
self._profiles: dict[str, str] = {}
self._lock = asyncio.Lock()
self._last_event_at: dict[str, float] = {}
self._evict_task: Optional[asyncio.Task[None]] = None
async def create(
self,
custom_page_id: int,
url: str,
width: int = 1280,
height: int = 720,
login_config: Optional[dict[str, Any]] = None,
) -> BrowserSession:
if not url.startswith(("http://", "https://")):
raise ValueError("Only http/https URLs are allowed")
width = max(320, min(width, 2560))
height = max(240, min(height, 1600))
async with self._lock:
await self._ensure_playwright()
profile_key = self._profile_key(custom_page_id, url)
existing_id = self._profiles.get(profile_key)
existing = self._sessions.get(existing_id or "")
if existing and not existing.page.is_closed():
# Health check: verify session can actually serve content
healthy = True
try:
async with existing.lock:
url_before = existing.page.url
await existing.page.evaluate("1") # ping
await existing.page.screenshot(type="jpeg", quality=10, timeout=5000)
await existing.page.set_viewport_size({"width": width, "height": height})
if url_before == "about:blank":
await existing.page.goto(url, wait_until="domcontentloaded", timeout=45000)
await self._autofill_login(existing.page, login_config)
await self._reset_page_zoom(existing)
self._touch(existing.id)
except Exception:
logger.info("existing session %s unhealthy, recreating", existing.id[:12])
healthy = False
if healthy:
return existing
# Close unhealthy session (profile stays on disk)
await self.close(existing.id)
if existing_id:
self._profiles.pop(profile_key, None)
# Idle cleanup: close stale sessions before spawning new ones
await self._evict_idle_sessions()
context = await self._playwright.chromium.launch_persistent_context(
str(self._profile_dir(profile_key)),
headless=get_settings().browser_headless,
viewport={"width": width, "height": height},
args=["--no-sandbox", "--disable-dev-shm-usage"],
)
# Grant clipboard access for the page origin
try:
parsed = urlparse(url)
origin = f"{parsed.scheme}://{parsed.netloc}"
await context.grant_permissions(["clipboard-read", "clipboard-write"], origin=origin)
except Exception:
logger.debug("clipboard permission grant failed (non-fatal)")
page = context.pages[0] if context.pages else await context.new_page()
session = BrowserSession(
id=uuid4().hex,
custom_page_id=custom_page_id,
profile_key=profile_key,
context=context,
page=page,
lock=asyncio.Lock(),
)
self._sessions[session.id] = session
self._profiles[profile_key] = session.id
self._touch(session.id)
# Evict again after adding the new session so cap is enforced immediately
await self._evict_idle_sessions()
try:
await page.goto(url, wait_until="domcontentloaded", timeout=45000)
await self._autofill_login(page, login_config)
await self._reset_page_zoom(session)
except Exception:
await self.close(session.id)
raise
return session
def _touch(self, session_id: str) -> None:
"""Mark a session as recently active (reset idle timer)."""
self._last_event_at[session_id] = asyncio.get_event_loop().time()
async def screenshot(self, session_id: str) -> bytes:
session = self._get(session_id)
self._touch(session_id)
async with session.lock:
self._ensure_open(session)
return await session.page.screenshot(type="jpeg", quality=65, full_page=False)
async def event(
self,
session_id: str,
event_type: str,
payload: dict[str, Any],
*,
include_state: bool = True,
) -> dict[str, Any] | None:
session = self._get(session_id)
self._last_event_at[session_id] = asyncio.get_event_loop().time()
async with session.lock:
self._ensure_open(session)
page = session.page
if event_type == "click":
await page.mouse.click(float(payload["x"]), float(payload["y"]), button=payload.get("button", "left"))
elif event_type == "dblclick":
await page.mouse.dblclick(float(payload["x"]), float(payload["y"]), button=payload.get("button", "left"))
elif event_type == "mousemove":
await page.mouse.move(float(payload["x"]), float(payload["y"]))
elif event_type == "mousedown":
await page.mouse.move(float(payload["x"]), float(payload["y"]))
await page.mouse.down(button=payload.get("button", "left"))
elif event_type == "mouseup":
await page.mouse.move(float(payload["x"]), float(payload["y"]))
await page.mouse.up(button=payload.get("button", "left"))
elif event_type == "type":
text = str(payload.get("text", ""))
if text:
await page.keyboard.insert_text(text)
elif event_type == "key":
key = str(payload.get("key", ""))
if key:
await page.keyboard.press(key)
elif event_type == "scroll":
if payload.get("x") is not None and payload.get("y") is not None:
await page.mouse.move(float(payload["x"]), float(payload["y"]))
await page.mouse.wheel(float(payload.get("delta_x", 0)), float(payload.get("delta_y", 0)))
elif event_type == "reload":
await page.reload(wait_until="domcontentloaded", timeout=45000)
elif event_type == "back":
await page.go_back(wait_until="domcontentloaded", timeout=45000)
elif event_type == "forward":
await page.go_forward(wait_until="domcontentloaded", timeout=45000)
elif event_type == "resize":
width = max(320, min(int(payload.get("width", 1280)), 2560))
height = max(240, min(int(payload.get("height", 720)), 1600))
await page.set_viewport_size({"width": width, "height": height})
else:
raise ValueError("Unsupported browser event")
if not include_state:
return None
return await self._session_state(session)
async def selected_text(self, session_id: str) -> str:
session = self._get(session_id)
self._touch(session_id)
async with session.lock:
self._ensure_open(session)
value = await session.page.evaluate("() => window.getSelection()?.toString() || ''")
return str(value or "")
async def read_clipboard(self, session_id: str) -> tuple[Optional[str], Optional[str]]:
"""Read the remote browser's clipboard text.
Returns (text, error_reason).
text is None when the clipboard is empty or unreadable.
error_reason is None on success or "empty" — non-None indicates a genuine failure.
"""
session = self._get(session_id)
self._touch(session_id)
async with session.lock:
self._ensure_open(session)
try:
result = await session.page.evaluate("""
async () => {
try {
const text = await navigator.clipboard.readText();
return text || null;
} catch (e) {
if (e instanceof DOMException) {
if (e.name === 'NotAllowedError') return 'ERROR:denied';
if (e.name === 'NotFoundError') return null;
}
return 'ERROR:' + (e.message || String(e));
}
}
""")
if result is None:
return None, None # empty clipboard
if isinstance(result, str) and result.startswith("ERROR:"):
reason = result[6:]
logger.debug("clipboard read error for %s: %s", session_id[:12], reason)
return None, reason
return str(result), None
except Exception as exc:
logger.warning("clipboard read failed for %s: %s", session_id[:12], exc)
return None, "read_failed"
async def close(self, session_id: str) -> None:
self._last_event_at.pop(session_id, None)
session = self._discard_session(session_id)
if not session:
return
# Detach CDP session if active
if session.cdp_session:
try:
await session.cdp_session.detach()
except Exception:
pass
try:
await session.context.close()
except Exception:
pass
# Clean up ephemeral (auth-capture) profile directories
if session.profile_key and session.profile_key.startswith("auth-capture-"):
profile_dir = self._profile_dir(session.profile_key)
import shutil
try:
shutil.rmtree(profile_dir, ignore_errors=True)
except Exception:
pass
async def shutdown(self) -> None:
# Cancel the background eviction loop
if self._evict_task is not None and not self._evict_task.done():
self._evict_task.cancel()
try:
await self._evict_task
except asyncio.CancelledError:
pass
self._evict_task = None
sessions = list(self._sessions)
for session_id in sessions:
await self.close(session_id)
if self._playwright:
await self._playwright.stop()
self._playwright = None
async def state(self, session_id: str) -> dict[str, Any]:
session = self._get(session_id)
self._touch(session_id)
async with session.lock:
self._ensure_open(session)
return await self._session_state(session)
async def _session_state(self, session: BrowserSession) -> dict[str, Any]:
return {
"id": session.id,
"custom_page_id": session.custom_page_id,
"url": session.page.url,
"title": await session.page.title(),
}
async def _ensure_playwright(self) -> None:
if self._playwright:
return
try:
from playwright.async_api import async_playwright
except ImportError as exc:
raise BrowserDependencyError("Playwright is not installed. Run `pip install -r requirements.txt`.") from exc
try:
self._playwright = await async_playwright().start()
except Exception as exc:
raise BrowserDependencyError(f"Unable to start Playwright: {exc}") from exc
# Start background eviction loop
if self._evict_task is None or self._evict_task.done():
self._evict_task = asyncio.create_task(self._evict_loop())
async def _reset_page_zoom(self, session: BrowserSession) -> None:
try:
cdp = await session.context.new_cdp_session(session.page)
try:
await cdp.send("Emulation.setPageScaleFactor", {"pageScaleFactor": 1})
finally:
await cdp.detach()
except Exception:
pass
async def autofill_login(
self,
session_id: str,
login_config: Optional[dict[str, Any]],
) -> bool:
"""Public: manually trigger login autofill for an active session.
Only fills username/password fields — never auto-submits.
Returns True if fields were found and filled, False otherwise.
Never returns password data to the caller.
"""
session = self._get(session_id)
self._touch(session_id)
async with session.lock:
self._ensure_open(session)
return await self._autofill_login(session.page, login_config, max_wait_seconds=3.0, skip_submit=True)
async def _autofill_login(
self,
page: Any,
config: Optional[dict[str, Any]],
*,
max_wait_seconds: float = 2.0,
poll_interval_seconds: float = 0.25,
skip_submit: bool = False,
) -> bool:
if not config or not config.get("enabled"):
return False
username = str(config.get("username") or "")
password = str(config.get("password") or "")
if not username or not password:
return False
try:
username_selectors = [
config.get("username_selector"),
"input[type='email']",
"input[name*='user' i]",
"input[id*='user' i]",
"input[name*='email' i]",
"input[id*='email' i]",
"input[name*='login' i]",
"input[id*='login' i]",
"input[autocomplete='username']",
"input:not([type]), input[type='text']",
]
password_selectors = [
config.get("password_selector"),
"input[type='password']",
"input[autocomplete='current-password']",
]
username_locator, password_locator = await self._wait_for_login_locators(
page,
username_selectors,
password_selectors,
max_wait_seconds=max_wait_seconds,
poll_interval_seconds=poll_interval_seconds,
)
if not username_locator or not password_locator:
logger.info("Login autofill skipped: login fields not found")
return False
await username_locator.fill(username, timeout=3000)
await password_locator.fill(password, timeout=3000)
if not skip_submit:
submit_selector = str(config.get("submit_selector") or "").strip()
if submit_selector:
submit = await self._first_visible_locator(page, [submit_selector], timeout=500)
if submit:
await submit.click(timeout=3000)
return True
except Exception as exc:
logger.info("Login autofill skipped: %s", exc)
return False
async def _wait_for_login_locators(
self,
page: Any,
username_selectors: list[Optional[str]],
password_selectors: list[Optional[str]],
*,
max_wait_seconds: float,
poll_interval_seconds: float,
) -> tuple[Optional[Any], Optional[Any]]:
deadline = time.monotonic() + max_wait_seconds
while True:
username_locator = await self._first_visible_locator(page, username_selectors, timeout=150)
password_locator = await self._first_visible_locator(page, password_selectors, timeout=150)
if username_locator and password_locator:
return username_locator, password_locator
if time.monotonic() >= deadline:
return None, None
await asyncio.sleep(poll_interval_seconds)
async def _first_visible_locator(
self,
page: Any,
selectors: list[Optional[str]],
*,
timeout: float = 1500,
) -> Optional[Any]:
for selector in selectors:
selector = str(selector or "").strip()
if not selector:
continue
try:
locator = page.locator(selector).first
if await locator.count() and await locator.is_visible(timeout=timeout):
return locator
except Exception:
continue
return None
def get_session(self, session_id: str) -> BrowserSession:
"""Retrieve a session by id — raises KeyError if missing."""
session = self._sessions.get(session_id)
if not session:
raise KeyError("browser session not found")
return session
def find_by_page_id(self, custom_page_id: int) -> BrowserSession:
"""Find the active session for a custom page. Raises KeyError if none."""
for session in self._sessions.values():
if session.custom_page_id == custom_page_id and not session.page.is_closed():
return session
raise KeyError(f"no active browser session for page {custom_page_id}")
_get = get_session # alias for internal use
def _ensure_open(self, session: BrowserSession) -> None:
if session.page.is_closed():
self._discard_session(session.id)
raise BrowserSessionError("browser page is closed")
def _discard_session(self, session_id: str) -> BrowserSession | None:
session = self._sessions.pop(session_id, None)
if session and self._profiles.get(session.profile_key) == session_id:
self._profiles.pop(session.profile_key, None)
return session
async def _evict_loop(self) -> None:
"""Background loop that runs every 5 minutes to evict idle sessions."""
while True:
await asyncio.sleep(300) # 5 minutes
try:
await self._evict_idle_sessions()
except Exception:
logger.exception("idle eviction loop error")
async def _evict_idle_sessions(self) -> None:
"""Close oldest idle sessions when over cap, or any past TTL."""
now = asyncio.get_event_loop().time()
# First: drop sessions past idle TTL (excluding just-created ones)
to_remove: list[str] = []
for sid, session in self._sessions.items():
if session.profile_key and session.profile_key.startswith("auth-capture-"):
continue # ephemeral sessions are handled separately
last_active = self._last_event_at.get(sid, 0.0)
if last_active > 0 and (now - last_active) > self.IDLE_TTL_SECONDS:
to_remove.append(sid)
for sid in to_remove:
logger.info("evicting idle session %s (no activity for >%ds)", sid[:12], self.IDLE_TTL_SECONDS)
await self.close(sid)
# Second: if still over cap, evict oldest by last_event_at
persistent = [(sid, s) for sid, s in self._sessions.items()
if not (s.profile_key or "").startswith("auth-capture-")]
if len(persistent) > self.MAX_SESSIONS:
persistent.sort(key=lambda x: self._last_event_at.get(x[0], 0.0))
excess = len(persistent) - self.MAX_SESSIONS
for sid, _ in persistent[:excess]:
logger.info("evicting session %s (over cap of %d)", sid[:12], self.MAX_SESSIONS)
await self.close(sid)
async def clear_profile(self, custom_page_id: int, url: str) -> None:
"""Close session for the page if active, then delete profile directory.
Raises RuntimeError if the directory cannot be fully removed.
"""
import shutil
# Close active session and use its profile_key (precise match)
profile_key: Optional[str] = None
try:
session = self.find_by_page_id(custom_page_id)
profile_key = session.profile_key
await self.close(session.id)
except KeyError:
pass
# Fallback: compute from URL (may be wrong if URL changed since session was created)
if not profile_key:
profile_key = self._profile_key(custom_page_id, url)
profile_dir = self._profile_dir(profile_key)
if profile_dir.exists():
shutil.rmtree(profile_dir) # no ignore_errors — let failure surface
if profile_dir.exists():
raise RuntimeError(
f"Failed to fully remove browser profile directory: {profile_dir}"
)
logger.info("cleared browser profile for page %d: %s", custom_page_id, profile_dir)
def _profile_dir(self, profile_key: str) -> Path:
root = Path(get_settings().browser_profiles_dir)
root.mkdir(parents=True, exist_ok=True)
profile = root / profile_key
profile.mkdir(parents=True, exist_ok=True)
return profile
def _profile_key(self, custom_page_id: int, url: str) -> str:
parsed = urlparse(url)
origin = f"{parsed.scheme}-{parsed.netloc}".lower()
safe_origin = re.sub(r"[^a-z0-9_.-]+", "_", origin).strip("_") or "page"
return f"page-{custom_page_id}-{safe_origin[:80]}"
async def create_ephemeral(
self,
url: str,
width: int = 1280,
height: int = 720,
) -> BrowserSession:
"""Create a temporary browser session without a CustomPage record.
The session uses an isolated random-named profile so it never collides
with persistent custom-page profiles. Caller MUST close() when done.
"""
if not url.startswith(("http://", "https://")):
raise ValueError("Only http/https URLs are allowed")
width = max(320, min(width, 2560))
height = max(240, min(height, 1600))
async with self._lock:
await self._ensure_playwright()
session_id = uuid4().hex
profile_key = f"auth-capture-{session_id[:12]}"
context = await self._playwright.chromium.launch_persistent_context(
str(self._profile_dir(profile_key)),
headless=get_settings().browser_headless,
viewport={"width": width, "height": height},
args=["--no-sandbox", "--disable-dev-shm-usage"],
)
# Grant clipboard access for the page origin
try:
parsed = urlparse(url)
origin = f"{parsed.scheme}://{parsed.netloc}"
await context.grant_permissions(["clipboard-read", "clipboard-write"], origin=origin)
except Exception:
logger.debug("clipboard permission grant failed (non-fatal)")
page = context.pages[0] if context.pages else await context.new_page()
session = BrowserSession(
id=session_id,
custom_page_id=0,
profile_key=profile_key,
context=context,
page=page,
lock=asyncio.Lock(),
captured_headers=[],
)
self._sessions[session.id] = session
self._touch(session.id)
# Start CDP network capture BEFORE the initial page load,
# so we capture login redirects and auth headers from the start.
await self._start_cdp_capture(session)
try:
await page.goto(url, wait_until="domcontentloaded", timeout=45000)
except Exception:
await self.close(session.id)
raise
return session
async def _start_cdp_capture(self, session: BrowserSession) -> None:
"""Enable CDP Network domain and capture Authorization headers."""
try:
cdp = await session.context.new_cdp_session(session.page)
await cdp.send("Network.enable")
def on_request(params: dict) -> None:
headers = params.get("request", {}).get("headers", {})
auth = headers.get("authorization") or headers.get("Authorization")
api_key = headers.get("x-api-key") or headers.get("X-API-Key")
url = params.get("request", {}).get("url", "")
if auth:
session.captured_headers.append({
"type": "authorization",
"value": auth,
"url": url,
})
if api_key:
session.captured_headers.append({
"type": "api_key",
"value": api_key,
"url": url,
})
cdp.on("Network.requestWillBeSent", on_request)
session.cdp_session = cdp
except Exception as exc:
logger.debug("CDP capture not available: %s", exc)
browser_sessions = BrowserSessionService()