"""Managed Playwright browser sessions for custom pages.""" from __future__ import annotations import asyncio import logging import re import time from dataclasses import dataclass from pathlib import Path from typing import Any, Optional from urllib.parse import urlparse from uuid import uuid4 from app.config import get_settings logger = logging.getLogger(__name__) class BrowserDependencyError(RuntimeError): """Raised when Playwright or its browser runtime is unavailable.""" class BrowserSessionError(RuntimeError): """Raised when an existing browser session can no longer be used.""" @dataclass class BrowserSession: id: str custom_page_id: int profile_key: str context: Any page: Any lock: asyncio.Lock class BrowserSessionService: def __init__(self) -> None: self._playwright: Optional[Any] = None self._sessions: dict[str, BrowserSession] = {} self._profiles: dict[str, str] = {} self._lock = asyncio.Lock() async def create( self, custom_page_id: int, url: str, width: int = 1280, height: int = 720, login_config: Optional[dict[str, Any]] = None, ) -> BrowserSession: if not url.startswith(("http://", "https://")): raise ValueError("Only http/https URLs are allowed") width = max(320, min(width, 2560)) height = max(240, min(height, 1600)) async with self._lock: await self._ensure_playwright() profile_key = self._profile_key(custom_page_id, url) existing_id = self._profiles.get(profile_key) existing = self._sessions.get(existing_id or "") if existing and not existing.page.is_closed(): async with existing.lock: await existing.page.set_viewport_size({"width": width, "height": height}) if existing.page.url == "about:blank": await existing.page.goto(url, wait_until="domcontentloaded", timeout=45000) await self._autofill_login(existing.page, login_config) await self._reset_page_zoom(existing) return existing if existing_id: self._profiles.pop(profile_key, None) context = await self._playwright.chromium.launch_persistent_context( str(self._profile_dir(profile_key)), headless=get_settings().browser_headless, viewport={"width": width, "height": height}, args=["--no-sandbox", "--disable-dev-shm-usage"], ) page = context.pages[0] if context.pages else await context.new_page() session = BrowserSession( id=uuid4().hex, custom_page_id=custom_page_id, profile_key=profile_key, context=context, page=page, lock=asyncio.Lock(), ) self._sessions[session.id] = session self._profiles[profile_key] = session.id try: await page.goto(url, wait_until="domcontentloaded", timeout=45000) await self._autofill_login(page, login_config) await self._reset_page_zoom(session) except Exception: await self.close(session.id) raise return session async def screenshot(self, session_id: str) -> bytes: session = self._get(session_id) async with session.lock: self._ensure_open(session) return await session.page.screenshot(type="jpeg", quality=65, full_page=False) async def event( self, session_id: str, event_type: str, payload: dict[str, Any], *, include_state: bool = True, ) -> dict[str, Any] | None: session = self._get(session_id) async with session.lock: self._ensure_open(session) page = session.page if event_type == "click": await page.mouse.click(float(payload["x"]), float(payload["y"]), button=payload.get("button", "left")) elif event_type == "dblclick": await page.mouse.dblclick(float(payload["x"]), float(payload["y"]), button=payload.get("button", "left")) elif event_type == "mousemove": await page.mouse.move(float(payload["x"]), float(payload["y"])) elif event_type == "mousedown": await page.mouse.move(float(payload["x"]), float(payload["y"])) await page.mouse.down(button=payload.get("button", "left")) elif event_type == "mouseup": await page.mouse.move(float(payload["x"]), float(payload["y"])) await page.mouse.up(button=payload.get("button", "left")) elif event_type == "type": text = str(payload.get("text", "")) if text: await page.keyboard.type(text) elif event_type == "key": key = str(payload.get("key", "")) if key: await page.keyboard.press(key) elif event_type == "scroll": if payload.get("x") is not None and payload.get("y") is not None: await page.mouse.move(float(payload["x"]), float(payload["y"])) await page.mouse.wheel(float(payload.get("delta_x", 0)), float(payload.get("delta_y", 0))) elif event_type == "reload": await page.reload(wait_until="domcontentloaded", timeout=45000) elif event_type == "back": await page.go_back(wait_until="domcontentloaded", timeout=45000) elif event_type == "forward": await page.go_forward(wait_until="domcontentloaded", timeout=45000) elif event_type == "resize": width = max(320, min(int(payload.get("width", 1280)), 2560)) height = max(240, min(int(payload.get("height", 720)), 1600)) await page.set_viewport_size({"width": width, "height": height}) else: raise ValueError("Unsupported browser event") if not include_state: return None return await self._session_state(session) async def selected_text(self, session_id: str) -> str: session = self._get(session_id) async with session.lock: self._ensure_open(session) value = await session.page.evaluate("() => window.getSelection()?.toString() || ''") return str(value or "") async def close(self, session_id: str) -> None: session = self._discard_session(session_id) if not session: return try: await session.context.close() except Exception: pass async def shutdown(self) -> None: sessions = list(self._sessions) for session_id in sessions: await self.close(session_id) if self._playwright: await self._playwright.stop() self._playwright = None async def state(self, session_id: str) -> dict[str, Any]: session = self._get(session_id) async with session.lock: self._ensure_open(session) return await self._session_state(session) async def _session_state(self, session: BrowserSession) -> dict[str, Any]: return { "id": session.id, "custom_page_id": session.custom_page_id, "url": session.page.url, "title": await session.page.title(), } async def _ensure_playwright(self) -> None: if self._playwright: return try: from playwright.async_api import async_playwright except ImportError as exc: raise BrowserDependencyError("Playwright is not installed. Run `pip install -r requirements.txt`.") from exc try: self._playwright = await async_playwright().start() except Exception as exc: raise BrowserDependencyError(f"Unable to start Playwright: {exc}") from exc async def _reset_page_zoom(self, session: BrowserSession) -> None: try: cdp = await session.context.new_cdp_session(session.page) try: await cdp.send("Emulation.setPageScaleFactor", {"pageScaleFactor": 1}) finally: await cdp.detach() except Exception: pass async def _autofill_login( self, page: Any, config: Optional[dict[str, Any]], *, max_wait_seconds: float = 8.0, poll_interval_seconds: float = 0.25, ) -> None: if not config or not config.get("enabled"): return username = str(config.get("username") or "") password = str(config.get("password") or "") if not username or not password: return try: username_selectors = [ config.get("username_selector"), "input[type='email']", "input[name*='user' i]", "input[id*='user' i]", "input[name*='email' i]", "input[id*='email' i]", "input[name*='login' i]", "input[id*='login' i]", "input[autocomplete='username']", "input:not([type]), input[type='text']", ] password_selectors = [ config.get("password_selector"), "input[type='password']", "input[autocomplete='current-password']", ] username_locator, password_locator = await self._wait_for_login_locators( page, username_selectors, password_selectors, max_wait_seconds=max_wait_seconds, poll_interval_seconds=poll_interval_seconds, ) if not username_locator or not password_locator: logger.info("Login autofill skipped for %s: login fields not found", page.url) return await username_locator.fill(username, timeout=3000) await password_locator.fill(password, timeout=3000) submit_selector = str(config.get("submit_selector") or "").strip() if submit_selector: submit = await self._first_visible_locator(page, [submit_selector], timeout=500) if submit: await submit.click(timeout=3000) except Exception as exc: logger.info("Login autofill skipped for %s: %s", page.url, exc) async def _wait_for_login_locators( self, page: Any, username_selectors: list[Optional[str]], password_selectors: list[Optional[str]], *, max_wait_seconds: float, poll_interval_seconds: float, ) -> tuple[Optional[Any], Optional[Any]]: deadline = time.monotonic() + max_wait_seconds while True: username_locator = await self._first_visible_locator(page, username_selectors, timeout=150) password_locator = await self._first_visible_locator(page, password_selectors, timeout=150) if username_locator and password_locator: return username_locator, password_locator if time.monotonic() >= deadline: return None, None await asyncio.sleep(poll_interval_seconds) async def _first_visible_locator( self, page: Any, selectors: list[Optional[str]], *, timeout: float = 1500, ) -> Optional[Any]: for selector in selectors: selector = str(selector or "").strip() if not selector: continue try: locator = page.locator(selector).first if await locator.count() and await locator.is_visible(timeout=timeout): return locator except Exception: continue return None def _get(self, session_id: str) -> BrowserSession: session = self._sessions.get(session_id) if not session: raise KeyError("browser session not found") return session def _ensure_open(self, session: BrowserSession) -> None: if session.page.is_closed(): self._discard_session(session.id) raise BrowserSessionError("browser page is closed") def _discard_session(self, session_id: str) -> BrowserSession | None: session = self._sessions.pop(session_id, None) if session and self._profiles.get(session.profile_key) == session_id: self._profiles.pop(session.profile_key, None) return session def _profile_dir(self, profile_key: str) -> Path: root = Path(get_settings().browser_profiles_dir) root.mkdir(parents=True, exist_ok=True) profile = root / profile_key profile.mkdir(parents=True, exist_ok=True) return profile def _profile_key(self, custom_page_id: int, url: str) -> str: parsed = urlparse(url) origin = f"{parsed.scheme}-{parsed.netloc}".lower() safe_origin = re.sub(r"[^a-z0-9_.-]+", "_", origin).strip("_") or "page" return f"page-{custom_page_id}-{safe_origin[:80]}" browser_sessions = BrowserSessionService()