diff --git a/backend/app/routers/custom_pages.py b/backend/app/routers/custom_pages.py index 1c83791..1980452 100644 --- a/backend/app/routers/custom_pages.py +++ b/backend/app/routers/custom_pages.py @@ -1,6 +1,7 @@ """Custom pages CRUD router + authenticated iframe proxy.""" from __future__ import annotations +import logging import re from datetime import datetime, timezone from typing import Any, List, Literal, Optional @@ -21,6 +22,8 @@ from app.services.auth_capture_service import extract_all from app.services.browser_session_service import browser_sessions from app.utils.auth import decode_token, get_current_user, get_user_from_token_param +logger = logging.getLogger(__name__) + router = APIRouter(prefix="/api/custom-pages", tags=["custom-pages"]) # Headers that prevent iframe embedding — strip them from proxied responses @@ -215,17 +218,29 @@ import json as _json class RefreshAuthResponse(BaseModel): success: bool message: str + warning: Optional[str] = None def _pick_best_candidate(candidates: list[dict], preferred_auth_type: str) -> Optional[dict]: if not candidates: return None - type_map = {"cookie": "cookie", "bearer": "bearer_token", "api_key": "api_key"} - preferred = type_map.get(preferred_auth_type) - if preferred: + # cookie_bundle > cookie > bearer_token > api_key + # preferred_auth_type="cookie" 时优先匹配 bundle,其次单 cookie + if preferred_auth_type == "cookie": for c in candidates: - if c["type"] == preferred: + if c["type"] == "cookie_bundle": return c + for c in candidates: + if c["type"] == "cookie": + return c + elif preferred_auth_type in ("bearer", "api_key"): + type_map = {"bearer": "bearer_token", "api_key": "api_key"} + preferred = type_map.get(preferred_auth_type) + if preferred: + for c in candidates: + if c["type"] == preferred: + return c + # fallback:排序后取第一个 return candidates[0] @@ -260,12 +275,10 @@ async def refresh_auth(pid: int, db: Session = Depends(get_db), _=Depends(get_cu existing_config = _json.loads(upstream.auth_config_json or "{}") ctype = candidate["type"] - if ctype == "cookie": + if ctype in ("cookie_bundle", "cookie"): upstream.auth_type = "cookie" - if candidate.get("cookie_name") and candidate.get("cookie_value"): - existing_config["cookie_string"] = f"{candidate['cookie_name']}={candidate['cookie_value']}" - else: - existing_config["cookie_string"] = candidate.get("value", "") + # cookie_bundle.value 已是完整 cookie_string;cookie.value 是 "name=value" 格式 + existing_config["cookie_string"] = candidate.get("value", "") if candidate.get("new_api_user"): existing_config["new_api_user"] = candidate["new_api_user"] elif ctype == "bearer_token": @@ -281,7 +294,7 @@ async def refresh_auth(pid: int, db: Session = Depends(get_db), _=Depends(get_cu except UnicodeEncodeError: return RefreshAuthResponse( success=False, - message=f"提取到的 Token 含有非 HTTP 标头字符,请确认已在远程浏览器中正确登录并重试", + message="提取到的 Token 含有非 HTTP 标头字符,请确认已在远程浏览器中正确登录并重试", ) existing_config["token"] = token elif ctype == "api_key": @@ -291,9 +304,40 @@ async def refresh_auth(pid: int, db: Session = Depends(get_db), _=Depends(get_cu upstream.auth_config_json = _json.dumps(existing_config, ensure_ascii=False) upstream.updated_at = datetime.now(timezone.utc) - db.commit() - return RefreshAuthResponse(success=True, message=f"凭证已刷新 ({upstream.auth_type})") + # ── 宽松验证:写回后尝试调用 get_available_groups 验证凭证可用性 ── + # 失败时仍然 commit(凭证已写入),但在 message 里说明验证失败 + # 这样用户仍能看到新凭证已写入,便于 debug(cf_clearance 绑 IP 时验证必然失败) + warning_msg: Optional[str] = None + try: + from app.services.upstream_client import UpstreamClient + groups_endpoint = upstream.groups_endpoint or "/groups/available" + new_auth_config = _json.loads(upstream.auth_config_json) + with UpstreamClient( + base_url=upstream.base_url, + api_prefix=upstream.api_prefix or "", + auth_type=upstream.auth_type, + auth_config=new_auth_config, + timeout=float(upstream.timeout_seconds or 30), + ) as uc: + uc.get_available_groups(groups_endpoint) + logger.info("refresh_auth: upstream %s credential verification passed", upstream.id) + except Exception as exc: + warning_msg = ( + f"凭证已写入但 API 验证失败:{exc}。" + "若 SmartUp 与远程浏览器不在同一 IP,cf_clearance 可能无法复用,请手动测试连接。" + ) + logger.warning( + "refresh_auth: upstream %s credential verification failed (written anyway): %s", + upstream.id, exc, + ) + + db.commit() + auth_type_label = upstream.auth_type + cookie_count = candidate.get("cookie_count", "") + count_str = f"({cookie_count} 个 cookie)" if cookie_count else "" + success_msg = f"凭证已刷新 ({auth_type_label}{count_str})" + return RefreshAuthResponse(success=True, message=success_msg, warning=warning_msg) # ---- Frame Proxy (simple: strip X-Frame-Options / CSP, pass through content) ---- diff --git a/backend/app/services/auth_capture_service.py b/backend/app/services/auth_capture_service.py index 74dd227..12be93d 100644 --- a/backend/app/services/auth_capture_service.py +++ b/backend/app/services/auth_capture_service.py @@ -4,6 +4,7 @@ from __future__ import annotations import json import logging from typing import Any +from urllib.parse import urlparse logger = logging.getLogger(__name__) @@ -110,7 +111,18 @@ async def extract_all(session: Any) -> dict[str, Any]: session_storage = await extract_session_storage(page) auth_headers = await extract_request_headers(session) new_api_user = _find_new_api_user(local_storage, session_storage) or await extract_new_api_user_id(page) - candidates = _curate_candidates(cookies, local_storage, session_storage, auth_headers, new_api_user) + + # 获取当前浏览器页面的真实 URL(比 session.url 更准确) + page_url = "" + try: + page_url = page.url or "" + except Exception: + pass + + candidates = _curate_candidates( + cookies, local_storage, session_storage, auth_headers, new_api_user, + page_url=page_url, + ) return { "cookies": cookies, @@ -121,17 +133,83 @@ async def extract_all(session: Any) -> dict[str, Any]: } +def _cookie_matches_hostname(cookie_domain: str, hostname: str) -> bool: + """判断 cookie domain 是否适用于给定 hostname。 + + 支持带点前缀的 domain(如 `.saki.lat` 匹配 `api.saki.lat`)。 + """ + if not cookie_domain or not hostname: + return True # 无 domain 限制时视为全域 + # 去掉前缀点 + domain = cookie_domain.lstrip(".") + return hostname == domain or hostname.endswith("." + domain) + + +def _build_cookie_bundle( + cookies: list[dict[str, Any]], + page_url: str, +) -> tuple[str, list[str]]: + """按 page_url 的 hostname 过滤并组装完整 cookie 字符串。 + + 返回 (cookie_string, cookie_names_list)。 + cookie_string 格式:name1=value1; name2=value2; ... + 过滤掉空值 cookie。 + """ + hostname = "" + if page_url: + try: + hostname = urlparse(page_url).hostname or "" + except Exception: + pass + + parts: list[str] = [] + names: list[str] = [] + for c in cookies: + name = c.get("name", "") + value = c.get("value", "") + domain = c.get("domain", "") + if not name or not value: + continue + if hostname and not _cookie_matches_hostname(domain, hostname): + continue + parts.append(f"{name}={value}") + names.append(name) + + return "; ".join(parts), names + + def _curate_candidates( cookies: list[dict[str, Any]], local_storage: dict[str, str], session_storage: dict[str, str], auth_headers: list[dict[str, str]], new_api_user: str = "", + page_url: str = "", ) -> list[dict[str, Any]]: """Scan extracted data for likely credentials with confidence scoring.""" candidates: list[dict[str, Any]] = [] - # 1. CDP-captured network headers (highest confidence) + # 0. 完整 Cookie Bundle(最高优先级) + # 按页面 origin 收集所有相关 cookie,包含 cf_clearance 等 Cloudflare cookie + cookie_string, cookie_names = _build_cookie_bundle(cookies, page_url) + if cookie_string: + bundle_extra: dict[str, Any] = { + "cookie_count": len(cookie_names), + "cookie_names": cookie_names, + } + if new_api_user: + bundle_extra["new_api_user"] = new_api_user + _add( + candidates, "cookie_bundle", + f"bundle:{page_url[:60]}", + cookie_string, + f"[{len(cookie_names)} cookies: {', '.join(cookie_names[:5])}{'…' if len(cookie_names) > 5 else ''}]", + f"完整 Cookie 组({len(cookie_names)} 个)", + 98, + extra=bundle_extra, + ) + + # 1. CDP-captured network headers (high confidence) seen = set() for h in auth_headers: dedup_key = h["value"] @@ -166,7 +244,7 @@ def _curate_candidates( # Looks like a JWT (xx.yy.zz format) if val.count(".") >= 2 and 20 < len(val) < 5000: - if dedup_key := val not in seen: + if val not in seen: seen.add(val) _add(candidates, "bearer_token", f"{store_name}.{key}", val, _preview(val), f"{store_name}.{key} (JWT)", 80) @@ -179,7 +257,7 @@ def _curate_candidates( if not new_api_user: new_api_user = _find_new_api_user(local_storage, session_storage) - # 3. Session cookies + # 3. 单个 Session cookie(保留,供独立 fallback / bearer 降级使用) for c in cookies: cname = c["name"].lower() if any(k in cname for k in SESSION_COOKIE_NAMES): @@ -193,8 +271,9 @@ def _curate_candidates( f"Cookie {c['name']} ({c['domain']})", confidence, extra=extra) + # 排序:cookie_bundle 最高 → cookie → bearer/api_key → 其他 candidates.sort(key=lambda item: ( - 0 if item.get("type") == "cookie" and item.get("cookie_name") == "session" else + 0 if item.get("type") == "cookie_bundle" else 1 if item.get("type") == "cookie" else 2, -int(item.get("confidence") or 0), diff --git a/backend/test_auth_capture.py b/backend/test_auth_capture.py new file mode 100644 index 0000000..699c7f6 --- /dev/null +++ b/backend/test_auth_capture.py @@ -0,0 +1,212 @@ +""" +auth_capture_service 单元测试 + +覆盖场景: +- 完整 cookie bundle(含 cf_clearance、session) +- domain 后缀匹配(.saki.lat → api.saki.lat) +- 仅 1 个 cookie 也生成 bundle +- 无 cookies 时 bearer_token 排第一 +- bundle 优先级高于单 session cookie +- 现有 bearer / api_key 提取路径不受影响 +""" +from __future__ import annotations + +import pytest + +from app.services.auth_capture_service import ( + _build_cookie_bundle, + _cookie_matches_hostname, + _curate_candidates, +) + + +# ── _cookie_matches_hostname ────────────────────────────────────────────────── + +def test_exact_hostname_match(): + assert _cookie_matches_hostname("saki.lat", "saki.lat") + + +def test_dot_prefix_subdomain_match(): + """`.saki.lat` 应该匹配 `api.saki.lat`。""" + assert _cookie_matches_hostname(".saki.lat", "api.saki.lat") + + +def test_dot_prefix_exact_match(): + """`saki.lat` 仍应匹配 `saki.lat`(无前缀点)。""" + assert _cookie_matches_hostname(".saki.lat", "saki.lat") + + +def test_no_domain_matches_all(): + """空 domain 视为不限制。""" + assert _cookie_matches_hostname("", "anything.example.com") + + +def test_different_domain_no_match(): + assert not _cookie_matches_hostname(".example.com", "saki.lat") + + +def test_partial_suffix_no_match(): + """evil-saki.lat 不应匹配 .saki.lat。""" + assert not _cookie_matches_hostname(".saki.lat", "evil-saki.lat") + + +# ── _build_cookie_bundle ────────────────────────────────────────────────────── + +def _make_cookies(*items): + """构造 cookie 列表,items 是 (name, value, domain) 元组。""" + return [ + {"name": n, "value": v, "domain": d, "httpOnly": False, "secure": True} + for n, v, d in items + ] + + +def test_bundle_includes_cf_clearance(): + cookies = _make_cookies( + ("cf_clearance", "abc123", ".saki.lat"), + ("session", "sess456", ".saki.lat"), + ("csrfToken", "csrf789", ".saki.lat"), + ) + cookie_str, names = _build_cookie_bundle(cookies, "https://api.saki.lat/login") + assert "cf_clearance=abc123" in cookie_str + assert "session=sess456" in cookie_str + assert "csrfToken=csrf789" in cookie_str + assert "cf_clearance" in names + assert "session" in names + + +def test_bundle_excludes_unrelated_domain(): + cookies = _make_cookies( + ("session", "mine", ".saki.lat"), + ("other_session", "theirs", ".example.com"), + ) + cookie_str, names = _build_cookie_bundle(cookies, "https://api.saki.lat/login") + assert "mine" in cookie_str + assert "theirs" not in cookie_str + assert "other_session" not in names + + +def test_single_cookie_generates_bundle(): + cookies = _make_cookies(("session", "onlyme", ".saki.lat")) + cookie_str, names = _build_cookie_bundle(cookies, "https://saki.lat/login") + assert cookie_str == "session=onlyme" + assert names == ["session"] + + +def test_empty_value_cookie_excluded(): + cookies = _make_cookies( + ("session", "valid", ".saki.lat"), + ("empty_cookie", "", ".saki.lat"), + ) + cookie_str, names = _build_cookie_bundle(cookies, "https://saki.lat/") + assert "empty_cookie" not in names + assert "empty_cookie" not in cookie_str + + +def test_no_cookies_returns_empty_bundle(): + cookie_str, names = _build_cookie_bundle([], "https://saki.lat/") + assert cookie_str == "" + assert names == [] + + +# ── _curate_candidates ──────────────────────────────────────────────────────── + +def _base_candidates(cookies, page_url="", auth_headers=None): + return _curate_candidates( + cookies=cookies, + local_storage={}, + session_storage={}, + auth_headers=auth_headers or [], + new_api_user="", + page_url=page_url, + ) + + +def test_cookie_bundle_ranks_first(): + """有 cookies 时,cookie_bundle 候选应排在第一。""" + cookies = _make_cookies( + ("cf_clearance", "cf_val", ".saki.lat"), + ("session", "sess_val", ".saki.lat"), + ) + candidates = _base_candidates(cookies, page_url="https://api.saki.lat/login") + assert len(candidates) > 0 + assert candidates[0]["type"] == "cookie_bundle" + + +def test_cookie_bundle_value_is_full_string(): + """cookie_bundle 的 value 应是完整 cookie 字符串。""" + cookies = _make_cookies( + ("cf_clearance", "cf_val", ".saki.lat"), + ("session", "sess_val", ".saki.lat"), + ) + candidates = _base_candidates(cookies, page_url="https://api.saki.lat/") + bundle = next(c for c in candidates if c["type"] == "cookie_bundle") + assert "cf_clearance=cf_val" in bundle["value"] + assert "session=sess_val" in bundle["value"] + + +def test_bundle_prioritized_over_single_session_cookie(): + """cookie_bundle 候选应排在单 session cookie 候选之前。""" + cookies = _make_cookies( + ("session", "sess_val", ".saki.lat"), + ("cf_clearance", "cf_val", ".saki.lat"), + ) + candidates = _base_candidates(cookies, page_url="https://api.saki.lat/") + types = [c["type"] for c in candidates] + bundle_idx = types.index("cookie_bundle") + cookie_idx = types.index("cookie") if "cookie" in types else len(types) + assert bundle_idx < cookie_idx + + +def test_bearer_token_wins_when_no_cookies(): + """无 cookies 时,bearer_token 应排在第一(如果有 auth_headers)。""" + auth_headers = [ + {"type": "authorization", "value": "eyJ.abc.def", "url": "https://api.saki.lat/api/v1/groups"} + ] + candidates = _base_candidates([], page_url="https://api.saki.lat/", auth_headers=auth_headers) + assert len(candidates) > 0 + assert candidates[0]["type"] == "bearer_token" + + +def test_no_cookies_no_headers_returns_empty(): + """无 cookies、无 storage、无 headers → 候选列表为空。""" + candidates = _base_candidates([], page_url="https://api.saki.lat/") + assert candidates == [] + + +def test_bundle_extra_contains_cookie_count_and_names(): + """cookie_bundle 候选的 extra 应包含 cookie_count 和 cookie_names。""" + cookies = _make_cookies( + ("cf_clearance", "cf_val", ".saki.lat"), + ("session", "sess_val", ".saki.lat"), + ) + candidates = _base_candidates(cookies, page_url="https://api.saki.lat/") + bundle = next(c for c in candidates if c["type"] == "cookie_bundle") + assert bundle.get("cookie_count") == 2 + assert set(bundle.get("cookie_names", [])) == {"cf_clearance", "session"} + + +def test_single_session_cookie_still_present_as_fallback(): + """session cookie 应同时出现在候选列表中(作为 fallback)。""" + cookies = _make_cookies( + ("session", "sess_val", ".saki.lat"), + ("cf_clearance", "cf_val", ".saki.lat"), + ) + candidates = _base_candidates(cookies, page_url="https://api.saki.lat/") + types = [c["type"] for c in candidates] + assert "cookie_bundle" in types + assert "cookie" in types + + +def test_new_api_user_propagated_to_bundle(): + """new_api_user 应写入 cookie_bundle 的 extra 字段。""" + cookies = _make_cookies(("session", "s", ".saki.lat")) + candidates = _curate_candidates( + cookies=cookies, + local_storage={"uid": "42"}, + session_storage={}, + auth_headers=[], + new_api_user="42", + page_url="https://saki.lat/", + ) + bundle = next(c for c in candidates if c["type"] == "cookie_bundle") + assert bundle.get("new_api_user") == "42"