"""Auth credential extraction from remote browser sessions."""

from __future__ import annotations

import asyncio
import json
import logging
from typing import Any

logger = logging.getLogger(__name__)

# Keys likely to contain auth tokens in storage
TOKEN_KEYS = frozenset({
    "token", "access_token", "accessToken", "jwt", "auth_token",
    "authToken", "refresh_token", "refreshToken", "id_token", "session_token",
})
SECRET_KEYS = frozenset({
    "secret", "api_key", "apiKey", "apikey",
})
SESSION_COOKIE_NAMES = frozenset({
    "session", "token", "jwt", "sid", "auth", "connect.sid", "gin_session",
    "tdc_itoken", "sessionid", "access_token", "refresh_token",
})

# (lower-cased header name, captured "type" value, name used in log messages)
_CAPTURED_HEADERS = (
    ("authorization", "authorization", "Authorization"),
    ("x-api-key", "api_key", "X-API-Key"),
)


async def extract_cookies(session: Any) -> list[dict[str, Any]]:
    """Extract all cookies from the browser context.

    Each cookie returned by ``session.context.cookies()`` is reduced to its
    ``name``, ``value``, ``domain``, ``httpOnly`` and ``secure`` fields;
    missing fields default to ``""`` / ``False``.
    """
    cookies = await session.context.cookies()
    return [
        {
            "name": c.get("name", ""),
            "value": c.get("value", ""),
            "domain": c.get("domain", ""),
            "httpOnly": c.get("httpOnly", False),
            "secure": c.get("secure", False),
        }
        for c in cookies
    ]


async def _extract_storage(page: Any, store_name: str) -> dict[str, str]:
    """Read ``window.<store_name>`` from the page as a dict.

    Best effort: any failure while evaluating or decoding is logged at debug
    level and an empty dict is returned. A payload that does not decode to a
    dict also yields an empty dict, so callers can always iterate ``.items()``.
    """
    try:
        raw = await page.evaluate(f"() => JSON.stringify(window.{store_name})")
        data = json.loads(raw) if isinstance(raw, str) else raw
    except Exception as exc:
        logger.debug("%s extraction failed: %s", store_name, exc)
        return {}
    return data if isinstance(data, dict) else {}


async def extract_local_storage(page: Any) -> dict[str, str]:
    """Return the page's ``localStorage`` contents, or ``{}`` on failure."""
    return await _extract_storage(page, "localStorage")


async def extract_session_storage(page: Any) -> dict[str, str]:
    """Return the page's ``sessionStorage`` contents, or ``{}`` on failure."""
    return await _extract_storage(page, "sessionStorage")


async def extract_request_headers(
    page: Any,
    settle_seconds: float = 0.5,
) -> list[dict[str, str]]:
    """Capture Authorization headers from network requests via CDP.

    Uses Chrome DevTools Protocol to subscribe to Network.requestWillBeSent
    events and extract Authorization / X-API-Key headers from captured
    requests. Only catches requests made *after* CDP is enabled.

    Args:
        page: Page object exposing ``context.new_cdp_session(page)``.
        settle_seconds: How long to keep listening before detaching.

    Returns:
        One ``{"type", "value", "url"}`` dict per captured header, where
        ``type`` is ``"authorization"`` or ``"api_key"``. Empty when CDP is
        unavailable or nothing was observed; errors are logged, not raised.
    """
    captured: list[dict[str, str]] = []
    cdp = None

    def on_request(params: dict) -> None:
        request = params.get("request") or {}
        url = request.get("url", "")
        # Header names are case-insensitive, so normalise before lookup
        # instead of probing a fixed list of casings.
        headers = {
            str(name).lower(): value
            for name, value in (request.get("headers") or {}).items()
        }
        for header_name, kind, display in _CAPTURED_HEADERS:
            value = headers.get(header_name)
            if value:
                captured.append({"type": kind, "value": value, "url": url})
                logger.debug("auth-capture CDP: captured %s header", display)

    try:
        cdp = await page.context.new_cdp_session(page)
        # Attach the listener before enabling the domain so no event emitted
        # while Network.enable is being acknowledged can be missed.
        cdp.on("Network.requestWillBeSent", on_request)
        await cdp.send("Network.enable")
        # Give a moment for any in-flight requests to be captured
        await asyncio.sleep(settle_seconds)
    except Exception as exc:
        logger.debug("CDP network capture not available: %s", exc)
    finally:
        if cdp is not None:
            try:
                await cdp.detach()
            except Exception as exc:
                # Detaching is best effort; the capture result is still valid.
                logger.debug("CDP session detach failed: %s", exc)
    return captured


async def extract_all(
    session: Any,
    settle_seconds: float = 0.5,
) -> dict[str, Any]:
    """Extract all auth credentials from a browser session.

    Args:
        session: Object exposing ``page`` and ``context``.
        settle_seconds: Listening window passed to
            ``extract_request_headers``.

    Returns:
        cookies, storage, session_storage, auth_headers, candidates
    """
    page = session.page
    cookies = await extract_cookies(session)
    local_storage = await extract_local_storage(page)
    session_storage = await extract_session_storage(page)
    auth_headers = await extract_request_headers(page, settle_seconds)
    candidates = _curate_candidates(
        cookies, local_storage, session_storage, auth_headers,
    )
    return {
        "cookies": cookies,
        "storage": local_storage,
        "session_storage": session_storage,
        "auth_headers": auth_headers,
        "candidates": candidates,
    }


def _name_matches(name_lower: str, needles: frozenset) -> bool:
    """Return True if any needle occurs (case-insensitively) in *name_lower*."""
    return any(needle.lower() in name_lower for needle in needles)


def _curate_candidates(
    cookies: list[dict[str, Any]],
    local_storage: dict[str, str],
    session_storage: dict[str, str],
    auth_headers: list[dict[str, str]],
) -> list[dict[str, Any]]:
    """Scan extracted data for likely credentials with confidence scoring.

    Candidates are emitted in this order: captured network headers (95),
    storage entries matched by key name (85/75 for token keys, 70 for secret
    keys), storage values that look like a JWT (80), storage values with an
    ``sk-`` prefix (90), and cookies whose name matches
    ``SESSION_COOKIE_NAMES`` (75).

    A value already reported from a network header or from a key-name match
    is not reported a second time by the JWT-shape heuristic.
    """
    candidates: list[dict[str, Any]] = []
    # Credential values already reported; consulted by the JWT heuristic.
    seen: set[str] = set()

    # 1. CDP-captured Authorization headers (highest confidence)
    header_values: set[str] = set()
    for h in auth_headers:
        value = h.get("value", "")
        if not value or value in header_values:
            continue
        header_values.add(value)
        seen.add(value)
        # "Bearer <token>": also remember the bare token so the same token
        # found in storage is recognised as a duplicate.
        _, _, bare = value.partition(" ")
        bare = bare.strip()
        if bare:
            seen.add(bare)
        url = h.get("url", "")
        header_label = "X-API-Key" if h.get("type") == "api_key" else "Authorization"
        _add(candidates, "bearer_token", f"network:{url[:60]}", value,
             _preview(value), f"{header_label} — {url[:40]}", 95)

    # 2. localStorage/sessionStorage items
    stores = (("localStorage", local_storage), ("sessionStorage", session_storage))
    for store_name, store in stores:
        for key, val in store.items():
            if not isinstance(val, str) or not val:
                continue
            source = f"{store_name}.{key}"
            key_lower = str(key).lower()

            # Explicit auth-named keys
            if _name_matches(key_lower, TOKEN_KEYS):
                score = 85 if "token" in key_lower and val.count(".") >= 2 else 75
                _add(candidates, "bearer_token", source, val, _preview(val),
                     source, score)
                seen.add(val)
            elif _name_matches(key_lower, SECRET_KEYS):
                _add(candidates, "credential", source, val, _preview(val),
                     source, 70)
                seen.add(val)

            # Looks like a JWT (xx.yy.zz format)
            if val.count(".") >= 2 and 20 < len(val) < 5000 and val not in seen:
                seen.add(val)
                _add(candidates, "bearer_token", source, val, _preview(val),
                     f"{source} (JWT)", 80)

            # sk-xxx API key pattern
            if val.startswith("sk-") and len(val) > 10:
                _add(candidates, "bearer_token", source, val, _preview(val),
                     f"{source} (API Key)", 90)

    # 3. Session cookies
    for c in cookies:
        name = c.get("name", "")
        value = c.get("value", "")
        if _name_matches(name.lower(), SESSION_COOKIE_NAMES):
            _add(candidates, "cookie", f"cookie:{name}", f"{name}={value}",
                 _preview(value), f"🍪 {name} ({c.get('domain', '')})", 75,
                 extra={"cookie_name": name, "cookie_value": value})

    return candidates


def _add(
    candidates: list[dict[str, Any]],
    ctype: str,
    source: str,
    value: str,
    preview: str,
    label: str,
    confidence: int,
    extra: dict | None = None,
) -> None:
    """Add a candidate entry. Value is masked in logs.

    Appends a dict with ``type``, ``source``, ``value``, ``preview``,
    ``label`` and ``confidence`` to *candidates*, merged with *extra* when
    given. Only the type, source and confidence are written to the log.
    """
    logger.debug("auth-capture candidate: type=%s source=%s confidence=%d",
                 ctype, source, confidence)
    entry: dict[str, Any] = {
        "type": ctype,
        "source": source,
        "value": value,
        "preview": preview,
        "label": label,
        "confidence": confidence,
    }
    if extra:
        entry.update(extra)
    candidates.append(entry)


def _preview(value: str) -> str:
    """Generate a masked preview of a credential.

    Values of 8 characters or fewer (or empty) become ``"***"``; values up to
    16 characters show the first and last 4 characters; longer values show
    the first 8 and last 6, joined by an ellipsis.
    """
    if not value or len(value) <= 8:
        return "***"
    if len(value) <= 16:
        return value[:4] + "…" + value[-4:]
    return value[:8] + "…" + value[-6:]