Files
SmartUp/backend/test_auth_capture.py
T
2026-06-02 13:51:29 +08:00

273 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
auth_capture_service 单元测试
覆盖场景:
- 完整 cookie bundle(含 cf_clearance、session
- domain 后缀匹配(.saki.lat → api.saki.lat
- 仅 1 个 cookie 也生成 bundle
- 无 cookies 时 bearer_token 排第一
- bundle 优先级高于单 session cookie
- 现有 bearer / api_key 提取路径不受影响
"""
from __future__ import annotations
import pytest
from app.services.auth_capture_service import (
_build_cookie_bundle,
_cookie_matches_hostname,
_curate_candidates,
)
# ── _cookie_matches_hostname ──────────────────────────────────────────────────
def test_exact_hostname_match():
assert _cookie_matches_hostname("saki.lat", "saki.lat")
def test_dot_prefix_subdomain_match():
"""`.saki.lat` 应该匹配 `api.saki.lat`。"""
assert _cookie_matches_hostname(".saki.lat", "api.saki.lat")
def test_dot_prefix_exact_match():
"""`saki.lat` 仍应匹配 `saki.lat`(无前缀点)。"""
assert _cookie_matches_hostname(".saki.lat", "saki.lat")
def test_no_domain_cookie_matches_any_hostname():
"""空 cookie domain(无限制)应对任意 hostname 返回 True。"""
assert _cookie_matches_hostname("", "anything.example.com")
def test_empty_hostname_rejects_all():
"""hostname 为空时,所有有 domain 的 cookie 都应被保守拒绝。"""
assert not _cookie_matches_hostname(".saki.lat", "")
assert not _cookie_matches_hostname("saki.lat", "")
def test_different_domain_no_match():
assert not _cookie_matches_hostname(".example.com", "saki.lat")
def test_partial_suffix_no_match():
"""evil-saki.lat 不应匹配 .saki.lat。"""
assert not _cookie_matches_hostname(".saki.lat", "evil-saki.lat")
# ── _build_cookie_bundle ──────────────────────────────────────────────────────
def _make_cookies(*items):
"""构造 cookie 列表,items 是 (name, value, domain) 元组。"""
return [
{"name": n, "value": v, "domain": d, "httpOnly": False, "secure": True}
for n, v, d in items
]
def test_bundle_includes_cf_clearance():
cookies = _make_cookies(
("cf_clearance", "abc123", ".saki.lat"),
("session", "sess456", ".saki.lat"),
("csrfToken", "csrf789", ".saki.lat"),
)
cookie_str, names = _build_cookie_bundle(cookies, "https://api.saki.lat/login")
assert "cf_clearance=abc123" in cookie_str
assert "session=sess456" in cookie_str
assert "csrfToken=csrf789" in cookie_str
assert "cf_clearance" in names
assert "session" in names
def test_bundle_excludes_unrelated_domain():
cookies = _make_cookies(
("session", "mine", ".saki.lat"),
("other_session", "theirs", ".example.com"),
)
cookie_str, names = _build_cookie_bundle(cookies, "https://api.saki.lat/login")
assert "mine" in cookie_str
assert "theirs" not in cookie_str
assert "other_session" not in names
def test_single_cookie_generates_bundle():
cookies = _make_cookies(("session", "onlyme", ".saki.lat"))
cookie_str, names = _build_cookie_bundle(cookies, "https://saki.lat/login")
assert cookie_str == "session=onlyme"
assert names == ["session"]
def test_empty_value_cookie_excluded():
cookies = _make_cookies(
("session", "valid", ".saki.lat"),
("empty_cookie", "", ".saki.lat"),
)
cookie_str, names = _build_cookie_bundle(cookies, "https://saki.lat/")
assert "empty_cookie" not in names
assert "empty_cookie" not in cookie_str
def test_no_cookies_returns_empty_bundle():
cookie_str, names = _build_cookie_bundle([], "https://saki.lat/")
assert cookie_str == ""
assert names == []
# ── _curate_candidates ────────────────────────────────────────────────────────
def _base_candidates(cookies, page_url="", auth_headers=None):
return _curate_candidates(
cookies=cookies,
local_storage={},
session_storage={},
auth_headers=auth_headers or [],
new_api_user="",
page_url=page_url,
)
def test_cookie_bundle_ranks_first():
"""有 cookies 时,cookie_bundle 候选应排在第一。"""
cookies = _make_cookies(
("cf_clearance", "cf_val", ".saki.lat"),
("session", "sess_val", ".saki.lat"),
)
candidates = _base_candidates(cookies, page_url="https://api.saki.lat/login")
assert len(candidates) > 0
assert candidates[0]["type"] == "cookie_bundle"
def test_cookie_bundle_value_is_full_string():
"""cookie_bundle 的 value 应是完整 cookie 字符串。"""
cookies = _make_cookies(
("cf_clearance", "cf_val", ".saki.lat"),
("session", "sess_val", ".saki.lat"),
)
candidates = _base_candidates(cookies, page_url="https://api.saki.lat/")
bundle = next(c for c in candidates if c["type"] == "cookie_bundle")
assert "cf_clearance=cf_val" in bundle["value"]
assert "session=sess_val" in bundle["value"]
def test_bundle_prioritized_over_single_session_cookie():
"""cookie_bundle 候选应排在单 session cookie 候选之前。"""
cookies = _make_cookies(
("session", "sess_val", ".saki.lat"),
("cf_clearance", "cf_val", ".saki.lat"),
)
candidates = _base_candidates(cookies, page_url="https://api.saki.lat/")
types = [c["type"] for c in candidates]
bundle_idx = types.index("cookie_bundle")
cookie_idx = types.index("cookie") if "cookie" in types else len(types)
assert bundle_idx < cookie_idx
def test_bearer_token_wins_when_no_cookies():
"""无 cookies 时,bearer_token 应排在第一(如果有 auth_headers)。"""
auth_headers = [
{"type": "authorization", "value": "eyJ.abc.def", "url": "https://api.saki.lat/api/v1/groups"}
]
candidates = _base_candidates([], page_url="https://api.saki.lat/", auth_headers=auth_headers)
assert len(candidates) > 0
assert candidates[0]["type"] == "bearer_token"
def test_no_cookies_no_headers_returns_empty():
"""无 cookies、无 storage、无 headers → 候选列表为空。"""
candidates = _base_candidates([], page_url="https://api.saki.lat/")
assert candidates == []
def test_bundle_extra_contains_cookie_count_and_names():
"""cookie_bundle 候选的 extra 应包含 cookie_count 和 cookie_names。"""
cookies = _make_cookies(
("cf_clearance", "cf_val", ".saki.lat"),
("session", "sess_val", ".saki.lat"),
)
candidates = _base_candidates(cookies, page_url="https://api.saki.lat/")
bundle = next(c for c in candidates if c["type"] == "cookie_bundle")
assert bundle.get("cookie_count") == 2
assert set(bundle.get("cookie_names", [])) == {"cf_clearance", "session"}
def test_single_session_cookie_still_present_as_fallback():
"""session cookie 应同时出现在候选列表中(作为 fallback)。"""
cookies = _make_cookies(
("session", "sess_val", ".saki.lat"),
("cf_clearance", "cf_val", ".saki.lat"),
)
candidates = _base_candidates(cookies, page_url="https://api.saki.lat/")
types = [c["type"] for c in candidates]
assert "cookie_bundle" in types
assert "cookie" in types
def test_new_api_user_propagated_to_bundle():
"""new_api_user 应写入 cookie_bundle 的 extra 字段。"""
cookies = _make_cookies(("session", "s", ".saki.lat"))
candidates = _curate_candidates(
cookies=cookies,
local_storage={"uid": "42"},
session_storage={},
auth_headers=[],
new_api_user="42",
page_url="https://saki.lat/",
)
bundle = next(c for c in candidates if c["type"] == "cookie_bundle")
assert bundle.get("new_api_user") == "42"
def test_browser_import_payload_builds_cookie_bundle_with_new_api_user():
from app.services.browser_import_service import build_import_result
result = build_import_result({
"page_url": "https://meow.example.com/panel",
"cookies": [
{"name": "cf_clearance", "value": "cf", "domain": ".example.com", "httpOnly": True},
{"name": "session", "value": "sess", "domain": ".example.com", "httpOnly": True},
],
"local_storage": {"uid": "7"},
"session_storage": {},
"auth_headers": [],
})
bundle = next(c for c in result["candidates"] if c["type"] == "cookie_bundle")
assert "cf_clearance=cf" in bundle["value"]
assert "session=sess" in bundle["value"]
assert bundle["new_api_user"] == "7"
def test_browser_import_payload_includes_auth_headers():
from app.services.browser_import_service import build_import_result
result = build_import_result({
"page_url": "https://sub2api.example.com/dashboard",
"cookies": [],
"local_storage": {},
"session_storage": {},
"auth_headers": [
{"type": "authorization", "value": "Bearer abc.def.ghi", "url": "https://sub2api.example.com/api/v1/groups"}
],
})
assert result["candidates"][0]["type"] == "bearer_token"
assert result["candidates"][0]["value"] == "Bearer abc.def.ghi"
def test_browser_import_session_secret_and_one_time_submit():
from app.services.browser_import_service import BrowserImportService, ImportSessionError
service = BrowserImportService()
session, secret = service.create("https://example.com/login", "admin@example.com")
with pytest.raises(ImportSessionError):
service.submit(session.id, "wrong", {"page_url": "https://example.com/"})
submitted = service.submit(session.id, secret, {"page_url": "https://example.com/"})
assert submitted.consumed is True
assert submitted.payload == {"page_url": "https://example.com/"}
with pytest.raises(ImportSessionError):
service.submit(session.id, secret, {"page_url": "https://example.com/again"})