"""Auth credential extraction from remote browser sessions.""" from __future__ import annotations import json import logging from typing import Any logger = logging.getLogger(__name__) async def extract_cookies(session: Any) -> list[dict[str, Any]]: """Extract cookies from the browser context.""" cookies = await session.context.cookies() return [ { "name": c.get("name", ""), "value": c.get("value", ""), "domain": c.get("domain", ""), "httpOnly": c.get("httpOnly", False), "secure": c.get("secure", False), } for c in cookies ] async def extract_local_storage(page: Any) -> dict[str, str]: """Extract all localStorage items from the page origin.""" try: raw = await page.evaluate("() => JSON.stringify(window.localStorage)") if isinstance(raw, str): return json.loads(raw) return raw or {} except Exception as exc: logger.debug("localStorage extraction failed: %s", exc) return {} async def extract_session_storage(page: Any) -> dict[str, str]: """Extract all sessionStorage items from the page origin.""" try: raw = await page.evaluate("() => JSON.stringify(window.sessionStorage)") if isinstance(raw, str): return json.loads(raw) return raw or {} except Exception as exc: logger.debug("sessionStorage extraction failed: %s", exc) return {} async def extract_all(session: Any) -> dict[str, Any]: """Extract all possible auth credentials from a browser session. Returns: - cookies: list of cookie dicts - storage: dict of localStorage key-values - session_storage: dict of sessionStorage key-values - candidates: curated list of likely auth tokens/credentials """ page = session.page cookies = await extract_cookies(session) local_storage = await extract_local_storage(page) session_storage = await extract_session_storage(page) candidates = _curate_candidates(cookies, local_storage, session_storage) return { "cookies": cookies, "storage": local_storage, "session_storage": session_storage, "candidates": candidates, } def _curate_candidates( cookies: list[dict[str, Any]], local_storage: dict[str, str], session_storage: dict[str, str], ) -> list[dict[str, Any]]: """Scan extracted data for likely bearer tokens and session cookies.""" candidates: list[dict[str, Any]] = [] # 1. localStorage / sessionStorage items that look like tokens for store_name, store in [("localStorage", local_storage), ("sessionStorage", session_storage)]: for key, val in store.items(): if not isinstance(val, str) or not val: continue key_lower = key.lower() # Explicit auth keys if any(k in key_lower for k in ("token", "jwt", "auth", "access", "secret", "api_key")): _add_candidate(candidates, "bearer_token", f"{store_name}.{key}", val, f"{store_name}.{key}") # JWT-shaped strings (not in an auth-named key) elif val.count(".") >= 2 and 20 < len(val) < 5000: _add_candidate(candidates, "bearer_token", f"{store_name}.{key}", val, f"{store_name}.{key} (JWT)") # 2. Cookies that look like session/token cookies cookie_keywords = ("session", "token", "jwt", "sid", "auth", "connect.sid", "gin_session", "tdc_itoken") for c in cookies: cname = c["name"].lower() if any(k in cname for k in cookie_keywords): _add_candidate(candidates, "cookie", f"cookie:{c['name']}", f"{c['name']}={c['value']}", f"🍪 {c['name']} ({c['domain']})", extra={"cookie_name": c["name"], "cookie_value": c["value"]}) # 3. Any localStorage key whose value looks like a sk-xxx key for store_name, store in [("localStorage", local_storage), ("sessionStorage", session_storage)]: for key, val in store.items(): if isinstance(val, str) and val.startswith("sk-") and len(val) > 10: _add_candidate(candidates, "bearer_token", f"{store_name}.{key}", val, f"{store_name}.{key} (sk-key)") # Deduplicate by value seen = set() deduped = [] for c in candidates: if c["value"] not in seen: seen.add(c["value"]) deduped.append(c) return deduped def _add_candidate( candidates: list[dict[str, Any]], ctype: str, source: str, value: str, label: str, extra: dict | None = None, ) -> None: """Add a candidate, masking sensitive values in logs.""" logger.debug("auth-capture candidate: type=%s source=%s label=%s", ctype, source, label) entry: dict[str, Any] = { "type": ctype, "source": source, "value": value, "label": label, } if extra: entry.update(extra) candidates.append(entry)