""" auth_capture_service 单元测试 覆盖场景: - 完整 cookie bundle(含 cf_clearance、session) - domain 后缀匹配(.saki.lat → api.saki.lat) - 仅 1 个 cookie 也生成 bundle - 无 cookies 时 bearer_token 排第一 - bundle 优先级高于单 session cookie - 现有 bearer / api_key 提取路径不受影响 """ from __future__ import annotations import pytest from app.services.auth_capture_service import ( _build_cookie_bundle, _cookie_matches_hostname, _curate_candidates, ) # ── _cookie_matches_hostname ────────────────────────────────────────────────── def test_exact_hostname_match(): assert _cookie_matches_hostname("saki.lat", "saki.lat") def test_dot_prefix_subdomain_match(): """`.saki.lat` 应该匹配 `api.saki.lat`。""" assert _cookie_matches_hostname(".saki.lat", "api.saki.lat") def test_dot_prefix_exact_match(): """`saki.lat` 仍应匹配 `saki.lat`(无前缀点)。""" assert _cookie_matches_hostname(".saki.lat", "saki.lat") def test_no_domain_cookie_matches_any_hostname(): """空 cookie domain(无限制)应对任意 hostname 返回 True。""" assert _cookie_matches_hostname("", "anything.example.com") def test_empty_hostname_rejects_all(): """hostname 为空时,所有有 domain 的 cookie 都应被保守拒绝。""" assert not _cookie_matches_hostname(".saki.lat", "") assert not _cookie_matches_hostname("saki.lat", "") def test_different_domain_no_match(): assert not _cookie_matches_hostname(".example.com", "saki.lat") def test_partial_suffix_no_match(): """evil-saki.lat 不应匹配 .saki.lat。""" assert not _cookie_matches_hostname(".saki.lat", "evil-saki.lat") # ── _build_cookie_bundle ────────────────────────────────────────────────────── def _make_cookies(*items): """构造 cookie 列表,items 是 (name, value, domain) 元组。""" return [ {"name": n, "value": v, "domain": d, "httpOnly": False, "secure": True} for n, v, d in items ] def test_bundle_includes_cf_clearance(): cookies = _make_cookies( ("cf_clearance", "abc123", ".saki.lat"), ("session", "sess456", ".saki.lat"), ("csrfToken", "csrf789", ".saki.lat"), ) cookie_str, names = _build_cookie_bundle(cookies, "https://api.saki.lat/login") assert "cf_clearance=abc123" in cookie_str assert "session=sess456" in cookie_str assert "csrfToken=csrf789" in cookie_str assert "cf_clearance" in names assert "session" in names def test_bundle_excludes_unrelated_domain(): cookies = _make_cookies( ("session", "mine", ".saki.lat"), ("other_session", "theirs", ".example.com"), ) cookie_str, names = _build_cookie_bundle(cookies, "https://api.saki.lat/login") assert "mine" in cookie_str assert "theirs" not in cookie_str assert "other_session" not in names def test_single_cookie_generates_bundle(): cookies = _make_cookies(("session", "onlyme", ".saki.lat")) cookie_str, names = _build_cookie_bundle(cookies, "https://saki.lat/login") assert cookie_str == "session=onlyme" assert names == ["session"] def test_empty_value_cookie_excluded(): cookies = _make_cookies( ("session", "valid", ".saki.lat"), ("empty_cookie", "", ".saki.lat"), ) cookie_str, names = _build_cookie_bundle(cookies, "https://saki.lat/") assert "empty_cookie" not in names assert "empty_cookie" not in cookie_str def test_no_cookies_returns_empty_bundle(): cookie_str, names = _build_cookie_bundle([], "https://saki.lat/") assert cookie_str == "" assert names == [] # ── _curate_candidates ──────────────────────────────────────────────────────── def _base_candidates(cookies, page_url="", auth_headers=None): return _curate_candidates( cookies=cookies, local_storage={}, session_storage={}, auth_headers=auth_headers or [], new_api_user="", page_url=page_url, ) def test_cookie_bundle_ranks_first(): """有 cookies 时,cookie_bundle 候选应排在第一。""" cookies = _make_cookies( ("cf_clearance", "cf_val", ".saki.lat"), ("session", "sess_val", ".saki.lat"), ) candidates = _base_candidates(cookies, page_url="https://api.saki.lat/login") assert len(candidates) > 0 assert candidates[0]["type"] == "cookie_bundle" def test_cookie_bundle_value_is_full_string(): """cookie_bundle 的 value 应是完整 cookie 字符串。""" cookies = _make_cookies( ("cf_clearance", "cf_val", ".saki.lat"), ("session", "sess_val", ".saki.lat"), ) candidates = _base_candidates(cookies, page_url="https://api.saki.lat/") bundle = next(c for c in candidates if c["type"] == "cookie_bundle") assert "cf_clearance=cf_val" in bundle["value"] assert "session=sess_val" in bundle["value"] def test_bundle_prioritized_over_single_session_cookie(): """cookie_bundle 候选应排在单 session cookie 候选之前。""" cookies = _make_cookies( ("session", "sess_val", ".saki.lat"), ("cf_clearance", "cf_val", ".saki.lat"), ) candidates = _base_candidates(cookies, page_url="https://api.saki.lat/") types = [c["type"] for c in candidates] bundle_idx = types.index("cookie_bundle") cookie_idx = types.index("cookie") if "cookie" in types else len(types) assert bundle_idx < cookie_idx def test_bearer_token_wins_when_no_cookies(): """无 cookies 时,bearer_token 应排在第一(如果有 auth_headers)。""" auth_headers = [ {"type": "authorization", "value": "eyJ.abc.def", "url": "https://api.saki.lat/api/v1/groups"} ] candidates = _base_candidates([], page_url="https://api.saki.lat/", auth_headers=auth_headers) assert len(candidates) > 0 assert candidates[0]["type"] == "bearer_token" def test_no_cookies_no_headers_returns_empty(): """无 cookies、无 storage、无 headers → 候选列表为空。""" candidates = _base_candidates([], page_url="https://api.saki.lat/") assert candidates == [] def test_bundle_extra_contains_cookie_count_and_names(): """cookie_bundle 候选的 extra 应包含 cookie_count 和 cookie_names。""" cookies = _make_cookies( ("cf_clearance", "cf_val", ".saki.lat"), ("session", "sess_val", ".saki.lat"), ) candidates = _base_candidates(cookies, page_url="https://api.saki.lat/") bundle = next(c for c in candidates if c["type"] == "cookie_bundle") assert bundle.get("cookie_count") == 2 assert set(bundle.get("cookie_names", [])) == {"cf_clearance", "session"} def test_single_session_cookie_still_present_as_fallback(): """session cookie 应同时出现在候选列表中(作为 fallback)。""" cookies = _make_cookies( ("session", "sess_val", ".saki.lat"), ("cf_clearance", "cf_val", ".saki.lat"), ) candidates = _base_candidates(cookies, page_url="https://api.saki.lat/") types = [c["type"] for c in candidates] assert "cookie_bundle" in types assert "cookie" in types def test_new_api_user_propagated_to_bundle(): """new_api_user 应写入 cookie_bundle 的 extra 字段。""" cookies = _make_cookies(("session", "s", ".saki.lat")) candidates = _curate_candidates( cookies=cookies, local_storage={"uid": "42"}, session_storage={}, auth_headers=[], new_api_user="42", page_url="https://saki.lat/", ) bundle = next(c for c in candidates if c["type"] == "cookie_bundle") assert bundle.get("new_api_user") == "42" def test_browser_import_payload_builds_cookie_bundle_with_new_api_user(): from app.services.browser_import_service import build_import_result result = build_import_result({ "page_url": "https://meow.example.com/panel", "cookies": [ {"name": "cf_clearance", "value": "cf", "domain": ".example.com", "httpOnly": True}, {"name": "session", "value": "sess", "domain": ".example.com", "httpOnly": True}, ], "local_storage": {"uid": "7"}, "session_storage": {}, "auth_headers": [], }) bundle = next(c for c in result["candidates"] if c["type"] == "cookie_bundle") assert "cf_clearance=cf" in bundle["value"] assert "session=sess" in bundle["value"] assert bundle["new_api_user"] == "7" def test_browser_import_payload_includes_auth_headers(): from app.services.browser_import_service import build_import_result result = build_import_result({ "page_url": "https://sub2api.example.com/dashboard", "cookies": [], "local_storage": {}, "session_storage": {}, "auth_headers": [ {"type": "authorization", "value": "Bearer abc.def.ghi", "url": "https://sub2api.example.com/api/v1/groups"} ], }) assert result["candidates"][0]["type"] == "bearer_token" assert result["candidates"][0]["value"] == "Bearer abc.def.ghi" def test_browser_import_session_secret_and_one_time_submit(): from app.services.browser_import_service import BrowserImportService, ImportSessionError service = BrowserImportService() session, secret = service.create("https://example.com/login", "admin@example.com") with pytest.raises(ImportSessionError): service.submit(session.id, "wrong", {"page_url": "https://example.com/"}) submitted = service.submit(session.id, secret, {"page_url": "https://example.com/"}) assert submitted.consumed is True assert submitted.payload == {"page_url": "https://example.com/"} with pytest.raises(ImportSessionError): service.submit(session.id, secret, {"page_url": "https://example.com/again"})