"""Auth credential extraction from remote browser sessions."""

from __future__ import annotations

import json
import logging
from typing import Any
from urllib.parse import urlparse

logger = logging.getLogger(__name__)

# Keys likely to contain auth tokens in storage
TOKEN_KEYS = frozenset({
    "token", "access_token", "accessToken", "jwt", "auth_token", "authToken",
    "refresh_token", "refreshToken", "id_token", "session_token",
})

SECRET_KEYS = frozenset({
    "secret", "api_key", "apiKey", "apikey",
})

SESSION_COOKIE_NAMES = frozenset({
    "session", "token", "jwt", "sid", "auth", "connect.sid", "gin_session",
    "tdc_itoken", "sessionid", "access_token", "refresh_token",
})

# Storage keys are lowercased before matching, so the matchers must be
# lowercase too (the camelCase spellings above could never match as written).
_TOKEN_KEY_PARTS = frozenset(k.lower() for k in TOKEN_KEYS)
_SECRET_KEY_PARTS = frozenset(k.lower() for k in SECRET_KEYS)


async def extract_cookies(session: Any) -> list[dict[str, Any]]:
    """Extract all cookies from the browser context.

    Each cookie is reduced to the ``name``, ``value``, ``domain``,
    ``httpOnly`` and ``secure`` fields, with defaults for missing keys.
    """
    cookies = await session.context.cookies()
    return [
        {
            "name": c.get("name", ""),
            "value": c.get("value", ""),
            "domain": c.get("domain", ""),
            "httpOnly": c.get("httpOnly", False),
            "secure": c.get("secure", False),
        }
        for c in cookies
    ]


async def _extract_web_storage(page: Any, store_name: str) -> dict[str, str]:
    """Read ``window.<store_name>`` from the page as a dict.

    Best effort: any failure (evaluation error, invalid JSON, unexpected
    result shape) is logged at debug level and yields an empty dict.
    """
    try:
        raw = await page.evaluate(f"() => JSON.stringify(window.{store_name})")
        data = json.loads(raw) if isinstance(raw, str) else raw
    except Exception as exc:
        logger.debug("%s extraction failed: %s", store_name, exc)
        return {}
    # JSON such as "null" or a list would break callers that call .items().
    return data if isinstance(data, dict) else {}


async def extract_local_storage(page: Any) -> dict[str, str]:
    """Return the page's localStorage contents, or ``{}`` on failure."""
    return await _extract_web_storage(page, "localStorage")


async def extract_session_storage(page: Any) -> dict[str, str]:
    """Return the page's sessionStorage contents, or ``{}`` on failure."""
    return await _extract_web_storage(page, "sessionStorage")


async def extract_new_api_user_id(page: Any) -> str:
    """Resolve the New-API user id from inside the page.

    The in-page script tries ``localStorage['uid']``, then the ``id`` of the
    JSON stored under ``localStorage['user']``, then a same-origin request to
    ``/api/user/self``. Returns ``""`` when nothing is found or on any error.
    """
    try:
        # NOTE: the script has no semicolons and relies on line breaks
        # between statements; keep one statement per line.
        value = await page.evaluate("""
            async () => {
              const uid = localStorage.getItem('uid')
              if (uid) return uid
              const userRaw = localStorage.getItem('user')
              if (userRaw) {
                try {
                  const user = JSON.parse(userRaw)
                  if (user?.id) return String(user.id)
                } catch {}
              }
              const response = await fetch('/api/user/self', { credentials: 'include' })
              if (!response.ok) return ''
              const payload = await response.json()
              const data = payload?.data || payload
              return data?.id ? String(data.id) : ''
            }
        """)
        return str(value or "").strip()
    except Exception as exc:
        logger.debug("New-API user id extraction failed: %s", exc)
        return ""


async def extract_request_headers(session: Any) -> list[dict[str, str]]:
    """Return Authorization / API-Key headers captured continuously by CDP.

    The CDP Network listener is started when the ephemeral session is created
    (in BrowserSessionService.create_ephemeral), so headers from the login flow
    are captured in real-time without needing a fresh CDP attach.
    """
    captured = getattr(session, "captured_headers", None)
    if not captured:
        return []
    logger.debug("auth-capture: returning %d cached headers", len(captured))
    # Return a copy so callers cannot mutate the session's live capture list.
    return list(captured)


async def extract_all(session: Any) -> dict[str, Any]:
    """Extract all auth credentials from a browser session.

    Returns:
        cookies, storage, session_storage, auth_headers, candidates
    """
    page = session.page
    cookies = await extract_cookies(session)
    local_storage = await extract_local_storage(page)
    session_storage = await extract_session_storage(page)
    auth_headers = await extract_request_headers(session)
    # Prefer the already-extracted storage; only hit the page as a fallback.
    new_api_user = (
        _find_new_api_user(local_storage, session_storage)
        or await extract_new_api_user_id(page)
    )

    # Use the page's current URL (more accurate than session.url).
    page_url = ""
    try:
        page_url = page.url or ""
    except Exception as exc:
        # Best effort: without a URL the cookie bundle is simply skipped.
        logger.debug("auth-capture: could not read page url: %s", exc)

    candidates = _curate_candidates(
        cookies,
        local_storage,
        session_storage,
        auth_headers,
        new_api_user,
        page_url=page_url,
    )
    return {
        "cookies": cookies,
        "storage": local_storage,
        "session_storage": session_storage,
        "auth_headers": auth_headers,
        "candidates": candidates,
    }


def _cookie_matches_hostname(cookie_domain: str, hostname: str) -> bool:
    """Return whether a cookie domain applies to the given hostname.

    Supports dot-prefixed domains (e.g. ``.saki.lat`` matches
    ``api.saki.lat``). The comparison is case-insensitive.

    Note: when the hostname is empty, callers should skip cookie collection
    rather than call this function.
    """
    if not cookie_domain:
        return True  # a cookie without a domain restriction is valid here
    if not hostname:
        return False  # current host unknown: reject conservatively
    domain = cookie_domain.lstrip(".").lower()
    host = hostname.lower()
    return host == domain or host.endswith("." + domain)


def _build_cookie_bundle(
    cookies: list[dict[str, Any]],
    page_url: str,
) -> tuple[str, list[str]]:
    """Assemble a cookie string from the cookies that apply to ``page_url``.

    Returns ``(cookie_string, cookie_names)`` where ``cookie_string`` has the
    form ``name1=value1; name2=value2; ...``. Cookies with an empty name or
    value are dropped. If ``page_url`` is empty or its hostname cannot be
    parsed, an empty result is returned: cookies for all domains are never
    collected, to avoid storing credentials of unrelated sites.
    """
    if not page_url:
        logger.debug("_build_cookie_bundle: no page_url, skipping cookie collection")
        return "", []

    hostname = ""
    try:
        hostname = urlparse(page_url).hostname or ""
    except ValueError as exc:
        # urlparse raises ValueError for malformed URLs (e.g. bad IPv6 literal).
        logger.debug("_build_cookie_bundle: urlparse failed: %s", exc)
    if not hostname:
        logger.debug(
            "_build_cookie_bundle: cannot parse hostname from %s, skipping",
            page_url[:80],
        )
        return "", []

    parts: list[str] = []
    names: list[str] = []
    for c in cookies:
        name = c.get("name", "")
        value = c.get("value", "")
        if not name or not value:
            continue
        if not _cookie_matches_hostname(c.get("domain", ""), hostname):
            continue
        parts.append(f"{name}={value}")
        names.append(name)
    return "; ".join(parts), names


def _curate_candidates(
    cookies: list[dict[str, Any]],
    local_storage: dict[str, str],
    session_storage: dict[str, str],
    auth_headers: list[dict[str, str]],
    new_api_user: str = "",
    page_url: str = "",
) -> list[dict[str, Any]]:
    """Scan extracted data for likely credentials with confidence scoring.

    Candidates are produced from four sources: the full cookie bundle for the
    page's host, captured network headers, local/session storage items, and
    individual session-like cookies. The result is ordered cookie bundle
    first, then single cookies, then everything else, each group sorted by
    descending confidence.
    """
    candidates: list[dict[str, Any]] = []

    # Resolve the user id before building any candidate so that the cookie
    # bundle and the session cookie carry the same ``new_api_user``.
    if not new_api_user:
        new_api_user = _find_new_api_user(local_storage, session_storage)

    # 0. Full cookie bundle (highest priority): every cookie relevant to the
    #    page's host, including e.g. Cloudflare's cf_clearance.
    cookie_string, cookie_names = _build_cookie_bundle(cookies, page_url)
    if cookie_string:
        bundle_extra: dict[str, Any] = {
            "cookie_count": len(cookie_names),
            "cookie_names": cookie_names,
        }
        if new_api_user:
            bundle_extra["new_api_user"] = new_api_user
        _add(
            candidates,
            "cookie_bundle",
            f"bundle:{page_url[:60]}",
            cookie_string,
            f"[{len(cookie_names)} cookies: {', '.join(cookie_names[:5])}{'…' if len(cookie_names) > 5 else ''}]",
            f"完整 Cookie 组({len(cookie_names)} 个)",
            98,
            extra=bundle_extra,
        )

    # 1. CDP-captured network headers (high confidence)
    seen: set[str] = set()
    for h in auth_headers:
        header_value = h.get("value")
        # Skip malformed or duplicate captures instead of aborting curation.
        if not header_value or header_value in seen:
            continue
        seen.add(header_value)
        url = h.get("url", "")
        preview = _preview(header_value)
        if h.get("type", "authorization") == "api_key":
            _add(candidates, "api_key", f"network:{url[:60]}", header_value,
                 preview, f"X-API-Key — {url[:40]}", 95)
        else:
            _add(candidates, "bearer_token", f"network:{url[:60]}", header_value,
                 preview, f"Authorization — {url[:40]}", 95)

    # 2. localStorage/sessionStorage items
    stores = [("localStorage", local_storage), ("sessionStorage", session_storage)]
    for store_name, store in stores:
        for key, val in store.items():
            if not isinstance(val, str) or not val:
                continue
            key_lower = key.lower()
            source = f"{store_name}.{key}"

            # Explicit auth-named keys
            if any(k in key_lower for k in _TOKEN_KEY_PARTS):
                # A token-named key holding a dotted value is likely a JWT.
                score = 85 if "token" in key_lower and val.count(".") >= 2 else 75
                _add(candidates, "bearer_token", source, val, _preview(val),
                     source, score)
            elif any(k in key_lower for k in _SECRET_KEY_PARTS):
                _add(candidates, "credential", source, val, _preview(val),
                     source, 70)

            # Looks like a JWT (xx.yy.zz format)
            if val.count(".") >= 2 and 20 < len(val) < 5000:
                if val not in seen:
                    seen.add(val)
                    _add(candidates, "bearer_token", source, val, _preview(val),
                         f"{source} (JWT)", 80)

            # sk-xxx API key pattern
            if val.startswith("sk-") and len(val) > 10:
                _add(candidates, "bearer_token", source, val, _preview(val),
                     f"{source} (API Key)", 90)

    # 3. Individual session cookies (kept as a standalone fallback / bearer
    #    downgrade path). Not filtered by host, unlike the bundle.
    for c in cookies:
        name = c.get("name", "")
        value = c.get("value", "")
        if not name or not value:
            continue  # an empty cookie is not a usable credential
        cname = name.lower()
        if not any(k in cname for k in SESSION_COOKIE_NAMES):
            continue
        is_session = cname == "session"
        extra = {"cookie_name": name, "cookie_value": value}
        if is_session and new_api_user:
            extra["new_api_user"] = new_api_user
        _add(candidates, "cookie", f"cookie:{name}", f"{name}={value}",
             _preview(value), f"Cookie {name} ({c.get('domain', '')})",
             99 if is_session else 85, extra=extra)

    # Order: cookie_bundle first, then cookie, then bearer/api_key/others;
    # within a group, highest confidence first.
    candidates.sort(key=lambda item: (
        0 if item.get("type") == "cookie_bundle"
        else 1 if item.get("type") == "cookie"
        else 2,
        -int(item.get("confidence") or 0),
    ))
    return candidates


def _find_storage_value(*stores: dict[str, str], key: str) -> str:
    """Return the first non-blank string stored under ``key``, stripped."""
    for store in stores:
        value = store.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip()
    return ""


def _find_new_api_user(*stores: dict[str, str]) -> str:
    """Find the New-API user id in the given storage dicts.

    Checks the ``uid`` entry first, then the ``id`` / ``user_id`` / ``userId``
    field of the JSON object stored under ``user``. Returns ``""`` when no
    usable id is present.
    """
    uid = _find_storage_value(*stores, key="uid")
    if uid:
        return uid
    user_raw = _find_storage_value(*stores, key="user")
    if not user_raw:
        return ""
    try:
        user = json.loads(user_raw)
    except ValueError:
        return ""
    if isinstance(user, dict):
        for key in ("id", "user_id", "userId"):
            value = user.get(key)
            if value is None:
                continue
            text = str(value).strip()
            if text:  # a blank id must not hide a populated alternative
                return text
    return ""


def _add(
    candidates: list[dict[str, Any]],
    ctype: str,
    source: str,
    value: str,
    preview: str,
    label: str,
    confidence: int,
    extra: dict | None = None,
) -> None:
    """Append a candidate entry to ``candidates``.

    The credential value itself is never written to the log; only the type,
    source and confidence are. Keys in ``extra`` are merged into the entry.
    """
    logger.debug(
        "auth-capture candidate: type=%s source=%s confidence=%d",
        ctype, source, confidence,
    )
    entry: dict[str, Any] = {
        "type": ctype,
        "source": source,
        "value": value,
        "preview": preview,
        "label": label,
        "confidence": confidence,
    }
    if extra:
        entry.update(extra)
    candidates.append(entry)


def _preview(value: str) -> str:
    """Generate a masked preview of a credential.

    Values of 8 characters or fewer are fully masked; longer values show a
    short prefix and suffix around an ellipsis.
    """
    if not value or len(value) <= 8:
        return "***"
    if len(value) <= 16:
        return value[:4] + "…" + value[-4:]
    return value[:8] + "…" + value[-6:]