"""
Unit tests for auth_capture_service.

Scenarios covered:
- Full cookie bundle (including cf_clearance and session)
- Domain suffix matching (.saki.lat → api.saki.lat)
- A bundle is generated even when there is only one cookie
- bearer_token ranks first when there are no cookies
- The bundle takes priority over a single session cookie
- Existing bearer / api_key extraction paths are unaffected
"""
|
||
from __future__ import annotations
|
||
|
||
import pytest
|
||
|
||
from app.services.auth_capture_service import (
|
||
_build_cookie_bundle,
|
||
_cookie_matches_hostname,
|
||
_curate_candidates,
|
||
)
|
||
|
||
|
||
# ── _cookie_matches_hostname ──────────────────────────────────────────────────
|
||
|
||
def test_exact_hostname_match():
    """A cookie domain identical to the hostname should match."""
    assert _cookie_matches_hostname("saki.lat", "saki.lat")
|
||
|
||
|
||
def test_dot_prefix_subdomain_match():
    """`.saki.lat` should match the subdomain `api.saki.lat`."""
    assert _cookie_matches_hostname(".saki.lat", "api.saki.lat")
|
||
|
||
|
||
def test_dot_prefix_exact_match():
    """`.saki.lat` should still match the bare host `saki.lat` (no leading dot)."""
    assert _cookie_matches_hostname(".saki.lat", "saki.lat")
|
||
|
||
|
||
def test_no_domain_cookie_matches_any_hostname():
    """An empty cookie domain (no restriction) should match any hostname."""
    assert _cookie_matches_hostname("", "anything.example.com")
|
||
|
||
|
||
def test_empty_hostname_rejects_all():
    """With an empty hostname, every cookie that has a domain is conservatively rejected."""
    # Both the dot-prefixed and the bare domain form must be refused.
    assert not _cookie_matches_hostname(".saki.lat", "")
    assert not _cookie_matches_hostname("saki.lat", "")
|
||
|
||
|
||
def test_different_domain_no_match():
    """A cookie domain from an unrelated site should not match the hostname."""
    assert not _cookie_matches_hostname(".example.com", "saki.lat")
|
||
|
||
|
||
def test_partial_suffix_no_match():
    """`evil-saki.lat` should not match `.saki.lat`."""
    # The hostname merely ends with the text "saki.lat"; it is not a subdomain,
    # so a plain string-suffix comparison would wrongly accept it.
    assert not _cookie_matches_hostname(".saki.lat", "evil-saki.lat")
|
||
|
||
|
||
# ── _build_cookie_bundle ──────────────────────────────────────────────────────
|
||
|
||
def _make_cookies(*items):
    """Build a list of cookie dicts for use in the tests.

    Args:
        *items: ``(name, value, domain)`` tuples, one per cookie.

    Returns:
        One dict per tuple, in the same order, with ``name``, ``value`` and
        ``domain`` taken from the tuple, ``httpOnly`` fixed to ``False`` and
        ``secure`` fixed to ``True``.
    """
    return [
        {"name": n, "value": v, "domain": d, "httpOnly": False, "secure": True}
        for n, v, d in items
    ]
|
||
|
||
|
||
def test_bundle_includes_cf_clearance():
    """All cookies on the matching domain, including cf_clearance, end up in the bundle."""
    cookies = _make_cookies(
        ("cf_clearance", "abc123", ".saki.lat"),
        ("session", "sess456", ".saki.lat"),
        ("csrfToken", "csrf789", ".saki.lat"),
    )
    cookie_str, names = _build_cookie_bundle(cookies, "https://api.saki.lat/login")
    assert "cf_clearance=abc123" in cookie_str
    assert "session=sess456" in cookie_str
    assert "csrfToken=csrf789" in cookie_str
    assert "cf_clearance" in names
    assert "session" in names
|
||
|
||
|
||
def test_bundle_excludes_unrelated_domain():
    """Cookies whose domain does not match the page host are left out of the bundle."""
    cookies = _make_cookies(
        ("session", "mine", ".saki.lat"),
        ("other_session", "theirs", ".example.com"),
    )
    cookie_str, names = _build_cookie_bundle(cookies, "https://api.saki.lat/login")
    assert "mine" in cookie_str
    assert "theirs" not in cookie_str
    assert "other_session" not in names
|
||
|
||
|
||
def test_single_cookie_generates_bundle():
    """A single matching cookie is enough to produce a bundle."""
    cookies = _make_cookies(("session", "onlyme", ".saki.lat"))
    cookie_str, names = _build_cookie_bundle(cookies, "https://saki.lat/login")
    assert cookie_str == "session=onlyme"
    assert names == ["session"]
|
||
|
||
|
||
def test_empty_value_cookie_excluded():
    """A cookie with an empty value is excluded from both the string and the name list."""
    cookies = _make_cookies(
        ("session", "valid", ".saki.lat"),
        ("empty_cookie", "", ".saki.lat"),
    )
    cookie_str, names = _build_cookie_bundle(cookies, "https://saki.lat/")
    assert "empty_cookie" not in names
    assert "empty_cookie" not in cookie_str
|
||
|
||
|
||
def test_no_cookies_returns_empty_bundle():
    """With no cookies, the bundle is an empty string and an empty name list."""
    cookie_str, names = _build_cookie_bundle([], "https://saki.lat/")
    assert cookie_str == ""
    assert names == []
|
||
|
||
|
||
# ── _curate_candidates ────────────────────────────────────────────────────────
|
||
|
||
def _base_candidates(cookies, page_url="", auth_headers=None):
    """Call ``_curate_candidates`` with empty storage and no ``new_api_user``.

    Args:
        cookies: Cookie dicts forwarded as ``cookies``.
        page_url: Forwarded as ``page_url``; defaults to an empty string.
        auth_headers: Forwarded as ``auth_headers``; ``None`` (or any falsy
            value) is replaced with an empty list.

    Returns:
        Whatever ``_curate_candidates`` returns for those inputs.
    """
    return _curate_candidates(
        cookies=cookies,
        local_storage={},
        session_storage={},
        auth_headers=auth_headers or [],
        new_api_user="",
        page_url=page_url,
    )
|
||
|
||
|
||
def test_cookie_bundle_ranks_first():
    """When cookies are present, the cookie_bundle candidate should rank first."""
    cookies = _make_cookies(
        ("cf_clearance", "cf_val", ".saki.lat"),
        ("session", "sess_val", ".saki.lat"),
    )
    candidates = _base_candidates(cookies, page_url="https://api.saki.lat/login")
    assert len(candidates) > 0
    assert candidates[0]["type"] == "cookie_bundle"
|
||
|
||
|
||
def test_cookie_bundle_value_is_full_string():
    """The cookie_bundle candidate's value should be the full cookie string."""
    cookies = _make_cookies(
        ("cf_clearance", "cf_val", ".saki.lat"),
        ("session", "sess_val", ".saki.lat"),
    )
    candidates = _base_candidates(cookies, page_url="https://api.saki.lat/")
    bundle = next(c for c in candidates if c["type"] == "cookie_bundle")
    assert "cf_clearance=cf_val" in bundle["value"]
    assert "session=sess_val" in bundle["value"]
|
||
|
||
|
||
def test_bundle_prioritized_over_single_session_cookie():
    """The cookie_bundle candidate should come before any single-cookie candidate."""
    cookies = _make_cookies(
        ("session", "sess_val", ".saki.lat"),
        ("cf_clearance", "cf_val", ".saki.lat"),
    )
    candidates = _base_candidates(cookies, page_url="https://api.saki.lat/")
    types = [c["type"] for c in candidates]
    bundle_idx = types.index("cookie_bundle")
    # If no plain "cookie" candidate exists, treat it as ranked after everything
    # so the ordering assertion still holds.
    cookie_idx = types.index("cookie") if "cookie" in types else len(types)
    assert bundle_idx < cookie_idx
|
||
|
||
|
||
def test_bearer_token_wins_when_no_cookies():
    """Without cookies, bearer_token should rank first when auth_headers are present."""
    auth_headers = [
        {
            "type": "authorization",
            "value": "eyJ.abc.def",
            "url": "https://api.saki.lat/api/v1/groups",
        }
    ]
    candidates = _base_candidates(
        [], page_url="https://api.saki.lat/", auth_headers=auth_headers
    )
    assert len(candidates) > 0
    assert candidates[0]["type"] == "bearer_token"
|
||
|
||
|
||
def test_no_cookies_no_headers_returns_empty():
    """No cookies, no storage and no headers should yield an empty candidate list."""
    candidates = _base_candidates([], page_url="https://api.saki.lat/")
    assert candidates == []
|
||
|
||
|
||
def test_bundle_extra_contains_cookie_count_and_names():
    """The cookie_bundle candidate should carry `cookie_count` and `cookie_names`.

    Both fields are read directly from the candidate dict.
    """
    cookies = _make_cookies(
        ("cf_clearance", "cf_val", ".saki.lat"),
        ("session", "sess_val", ".saki.lat"),
    )
    candidates = _base_candidates(cookies, page_url="https://api.saki.lat/")
    bundle = next(c for c in candidates if c["type"] == "cookie_bundle")
    assert bundle.get("cookie_count") == 2
    # Compare as a set: the order of the names is not part of this check.
    assert set(bundle.get("cookie_names", [])) == {"cf_clearance", "session"}
|
||
|
||
|
||
def test_single_session_cookie_still_present_as_fallback():
    """A plain cookie candidate should appear alongside the bundle (as a fallback)."""
    cookies = _make_cookies(
        ("session", "sess_val", ".saki.lat"),
        ("cf_clearance", "cf_val", ".saki.lat"),
    )
    candidates = _base_candidates(cookies, page_url="https://api.saki.lat/")
    types = [c["type"] for c in candidates]
    assert "cookie_bundle" in types
    assert "cookie" in types
|
||
|
||
|
||
def test_new_api_user_propagated_to_bundle():
    """`new_api_user` should be carried onto the cookie_bundle candidate."""
    cookies = _make_cookies(("session", "s", ".saki.lat"))
    candidates = _curate_candidates(
        cookies=cookies,
        local_storage={"uid": "42"},
        session_storage={},
        auth_headers=[],
        new_api_user="42",
        page_url="https://saki.lat/",
    )
    bundle = next(c for c in candidates if c["type"] == "cookie_bundle")
    assert bundle.get("new_api_user") == "42"
|
||
|
||
|
||
def test_browser_import_payload_builds_cookie_bundle_with_new_api_user():
    """An import payload with cookies and a `uid` in local storage yields a
    cookie_bundle whose `new_api_user` is that uid."""
    from app.services.browser_import_service import build_import_result

    result = build_import_result({
        "page_url": "https://meow.example.com/panel",
        "cookies": [
            {"name": "cf_clearance", "value": "cf", "domain": ".example.com", "httpOnly": True},
            {"name": "session", "value": "sess", "domain": ".example.com", "httpOnly": True},
        ],
        "local_storage": {"uid": "7"},
        "session_storage": {},
        "auth_headers": [],
    })

    bundle = next(c for c in result["candidates"] if c["type"] == "cookie_bundle")
    assert "cf_clearance=cf" in bundle["value"]
    assert "session=sess" in bundle["value"]
    assert bundle["new_api_user"] == "7"
|
||
|
||
|
||
def test_browser_import_payload_includes_auth_headers():
    """An import payload with only an authorization header yields a bearer_token
    candidate first, with the header value kept as-is."""
    from app.services.browser_import_service import build_import_result

    result = build_import_result({
        "page_url": "https://sub2api.example.com/dashboard",
        "cookies": [],
        "local_storage": {},
        "session_storage": {},
        "auth_headers": [
            {
                "type": "authorization",
                "value": "Bearer abc.def.ghi",
                "url": "https://sub2api.example.com/api/v1/groups",
            }
        ],
    })

    assert result["candidates"][0]["type"] == "bearer_token"
    assert result["candidates"][0]["value"] == "Bearer abc.def.ghi"
|
||
|
||
|
||
def test_browser_import_session_secret_and_one_time_submit():
    """Submitting requires the right secret and is accepted only once per session."""
    from app.services.browser_import_service import BrowserImportService, ImportSessionError

    service = BrowserImportService()
    session, secret = service.create("https://example.com/login", "admin@example.com")

    # A wrong secret must be rejected.
    with pytest.raises(ImportSessionError):
        service.submit(session.id, "wrong", {"page_url": "https://example.com/"})

    # The correct secret is accepted; the session is marked consumed and the
    # payload is stored.
    submitted = service.submit(session.id, secret, {"page_url": "https://example.com/"})
    assert submitted.consumed is True
    assert submitted.payload == {"page_url": "https://example.com/"}

    # A second submit on the same session must fail even with the right secret.
    with pytest.raises(ImportSessionError):
        service.submit(session.id, secret, {"page_url": "https://example.com/again"})
|