"""Auth credential extraction from remote browser sessions.""" from __future__ import annotations import json import logging from typing import Any logger = logging.getLogger(__name__) # Keys likely to contain auth tokens in storage TOKEN_KEYS = frozenset({ "token", "access_token", "accessToken", "jwt", "auth_token", "authToken", "refresh_token", "refreshToken", "id_token", "session_token", }) SECRET_KEYS = frozenset({ "secret", "api_key", "apiKey", "apikey", }) SESSION_COOKIE_NAMES = frozenset({ "session", "token", "jwt", "sid", "auth", "connect.sid", "gin_session", "tdc_itoken", "sessionid", "access_token", "refresh_token", }) async def extract_cookies(session: Any) -> list[dict[str, Any]]: """Extract all cookies from the browser context.""" cookies = await session.context.cookies() return [ { "name": c.get("name", ""), "value": c.get("value", ""), "domain": c.get("domain", ""), "httpOnly": c.get("httpOnly", False), "secure": c.get("secure", False), } for c in cookies ] async def extract_local_storage(page: Any) -> dict[str, str]: try: raw = await page.evaluate("() => JSON.stringify(window.localStorage)") if isinstance(raw, str): return json.loads(raw) return raw or {} except Exception as exc: logger.debug("localStorage extraction failed: %s", exc) return {} async def extract_session_storage(page: Any) -> dict[str, str]: try: raw = await page.evaluate("() => JSON.stringify(window.sessionStorage)") if isinstance(raw, str): return json.loads(raw) return raw or {} except Exception as exc: logger.debug("sessionStorage extraction failed: %s", exc) return {} async def extract_request_headers(session: Any) -> list[dict[str, str]]: """Return Authorization / API-Key headers captured continuously by CDP. The CDP Network listener is started when the ephemeral session is created (in BrowserSessionService.create_ephemeral), so headers from the login flow are captured in real-time without needing a fresh CDP attach. """ if hasattr(session, "captured_headers") and session.captured_headers: logger.debug("auth-capture: returning %d cached headers", len(session.captured_headers)) return list(session.captured_headers) return [] async def extract_all(session: Any) -> dict[str, Any]: """Extract all auth credentials from a browser session. Returns: cookies, storage, session_storage, auth_headers, candidates """ page = session.page cookies = await extract_cookies(session) local_storage = await extract_local_storage(page) session_storage = await extract_session_storage(page) auth_headers = await extract_request_headers(session) candidates = _curate_candidates(cookies, local_storage, session_storage, auth_headers) return { "cookies": cookies, "storage": local_storage, "session_storage": session_storage, "auth_headers": auth_headers, "candidates": candidates, } def _curate_candidates( cookies: list[dict[str, Any]], local_storage: dict[str, str], session_storage: dict[str, str], auth_headers: list[dict[str, str]], ) -> list[dict[str, Any]]: """Scan extracted data for likely credentials with confidence scoring.""" candidates: list[dict[str, Any]] = [] # 1. CDP-captured network headers (highest confidence) seen = set() for h in auth_headers: dedup_key = h["value"] if dedup_key in seen: continue seen.add(dedup_key) htype = h.get("type", "authorization") preview = _preview(h["value"]) if htype == "api_key": _add(candidates, "api_key", f"network:{h['url'][:60]}", h["value"], preview, f"X-API-Key — {h['url'][:40]}", 95) else: _add(candidates, "bearer_token", f"network:{h['url'][:60]}", h["value"], preview, f"Authorization — {h['url'][:40]}", 95) # 2. localStorage/sessionStorage items for store_name, store in [("localStorage", local_storage), ("sessionStorage", session_storage)]: for key, val in store.items(): if not isinstance(val, str) or not val: continue key_lower = key.lower() # Explicit auth-named keys if any(k in key_lower for k in TOKEN_KEYS): preview = _preview(val) score = 85 if "token" in key_lower and val.count(".") >= 2 else 75 _add(candidates, "bearer_token", f"{store_name}.{key}", val, preview, f"{store_name}.{key}", score) elif any(k in key_lower for k in SECRET_KEYS): _add(candidates, "credential", f"{store_name}.{key}", val, _preview(val), f"{store_name}.{key}", 70) # Looks like a JWT (xx.yy.zz format) if val.count(".") >= 2 and 20 < len(val) < 5000: if dedup_key := val not in seen: seen.add(val) _add(candidates, "bearer_token", f"{store_name}.{key}", val, _preview(val), f"{store_name}.{key} (JWT)", 80) # sk-xxx API key pattern if val.startswith("sk-") and len(val) > 10: _add(candidates, "bearer_token", f"{store_name}.{key}", val, _preview(val), f"{store_name}.{key} (API Key)", 90) # 3. Session cookies for c in cookies: cname = c["name"].lower() if any(k in cname for k in SESSION_COOKIE_NAMES): preview = _preview(c["value"]) cookie_val = f"{c['name']}={c['value']}" _add(candidates, "cookie", f"cookie:{c['name']}", cookie_val, preview, f"🍪 {c['name']} ({c['domain']})", 75, extra={"cookie_name": c["name"], "cookie_value": c["value"]}) return candidates def _add( candidates: list[dict[str, Any]], ctype: str, source: str, value: str, preview: str, label: str, confidence: int, extra: dict | None = None, ) -> None: """Add a candidate entry. Value is masked in logs.""" logger.debug("auth-capture candidate: type=%s source=%s confidence=%d", ctype, source, confidence) entry: dict[str, Any] = { "type": ctype, "source": source, "value": value, "preview": preview, "label": label, "confidence": confidence, } if extra: entry.update(extra) candidates.append(entry) def _preview(value: str) -> str: """Generate a masked preview of a credential.""" if not value or len(value) <= 8: return "***" if len(value) <= 16: return value[:4] + "…" + value[-4:] return value[:8] + "…" + value[-6:]