421 lines
14 KiB
Python
421 lines
14 KiB
Python
import asyncio
|
|
from pathlib import Path
|
|
from app.config import get_settings
|
|
from app.routers.auth_capture import _sanitize_candidate
|
|
from app.services.browser_session_service import BrowserSessionService
|
|
|
|
|
|
class FakeLocator:
    """Async test double for a browser locator that records how it is used.

    ``visible`` may be a single value or a list. A list acts as a script of
    successive ``is_visible`` answers: entries are consumed front to back and
    the final entry keeps repeating once it is the only one left.
    """

    def __init__(self, *, visible=True, count=1):
        # Copy list input so the caller's list is never consumed by polling.
        if isinstance(visible, list):
            self._visible = list(visible)
        else:
            self._visible = [visible]
        self._count = count
        self.timeouts = []  # every timeout handed to is_visible, in call order
        self.filled = []    # (value, timeout) pairs handed to fill
        self.clicked = 0    # number of click calls so far

    @property
    def first(self):
        """Return this same locator so ``locator.first`` chains keep working."""
        return self

    async def count(self):
        """Return the match count supplied at construction."""
        return self._count

    async def is_visible(self, timeout=0):
        """Log ``timeout`` and return the next scripted visibility answer."""
        self.timeouts.append(timeout)
        answers = self._visible
        if len(answers) > 1:
            # More than one answer left: consume the oldest.
            return answers.pop(0)
        # Zero answers means hidden; a single answer is sticky.
        return answers[0] if answers else False

    async def fill(self, value, timeout=0):
        """Remember the value and timeout that would have been typed."""
        entry = (value, timeout)
        self.filled.append(entry)

    async def click(self, timeout=0):
        """Count the click; ``timeout`` is accepted but not recorded."""
        self.clicked = self.clicked + 1
|
|
|
|
|
|
class FakePage:
    """Page double that serves canned locators and logs each selector queried."""

    url = "https://example.test/login"

    def __init__(self, locators):
        self.queries = []  # selectors passed to locator(), in call order
        self.locators = locators

    def locator(self, selector):
        """Record ``selector`` and return its locator.

        Selectors missing from the mapping get a fresh hidden locator with a
        match count of zero.
        """
        self.queries.append(selector)
        fallback = FakeLocator(visible=False, count=0)
        return self.locators.get(selector, fallback)
|
|
|
|
|
|
def run(coro):
    """Drive ``coro`` to completion with ``asyncio.run`` and return its result."""
    outcome = asyncio.run(coro)
    return outcome
|
|
|
|
|
|
def test_autofill_retries_until_delayed_fields_are_visible():
    """Autofill still fills and submits when the username field appears late.

    The username locator answers hidden on its first visibility check and
    visible afterwards. The test expects both fields to be filled through the
    configured selectors and the submit control to be clicked exactly once.
    """
    service = BrowserSessionService()

    # Username is hidden on the first probe only.
    user_field = FakeLocator(visible=[False, True])
    pass_field = FakeLocator(visible=True)
    submit_button = FakeLocator(visible=True)
    page = FakePage(
        {
            "#user": user_field,
            "#pass": pass_field,
            "#submit": submit_button,
        }
    )
    login_config = {
        "enabled": True,
        "username": "alice",
        "password": "secret",
        "username_selector": "#user",
        "password_selector": "#pass",
        "submit_selector": "#submit",
    }

    run(
        service._autofill_login(
            page,
            login_config,
            max_wait_seconds=1,
            poll_interval_seconds=0,
        )
    )

    # The configured selectors are used; the generic password selector is not.
    queried = page.queries
    assert queried[0] == "#user"
    assert "#pass" in queried
    assert "input[type='password']" not in queried

    # Each field is filled exactly once and the form is submitted once.
    assert user_field.filled == [("alice", 3000)]
    assert pass_field.filled == [("secret", 3000)]
    assert submit_button.clicked == 1
|
|
|
|
|
|
def test_autofill_returns_without_selectors_when_disabled_or_missing_credentials():
    """Autofill must not query the page when disabled or lacking a password."""
    service = BrowserSessionService()

    skipped_configs = (
        # Autofill explicitly switched off.
        {"enabled": False, "username": "alice", "password": "secret"},
        # Autofill enabled but the password is empty.
        {"enabled": True, "username": "alice", "password": ""},
    )

    for config in skipped_configs:
        page = FakePage({"#user": FakeLocator()})
        run(
            service._autofill_login(
                page,
                config,
                max_wait_seconds=1,
                poll_interval_seconds=0,
            )
        )
        # No selector lookup may have happened at all.
        assert page.queries == []
|
|
|
|
|
|
def test_sanitize_candidate_strips_secret_fields_but_keeps_metadata():
    """``value`` and ``cookie_value`` are dropped; all other keys pass through."""
    raw_candidate = {
        "type": "cookie",
        "source": "cookie:session",
        "value": "Bearer secret-token",
        "preview": "Bearer s…token",
        "label": "session cookie",
        "confidence": 90,
        "cookie_name": "session",
        "cookie_value": "secret-cookie",
        "domain": "example.test",
    }
    # Same as the input minus the two secret-bearing keys.
    expected = {
        "type": "cookie",
        "source": "cookie:session",
        "preview": "Bearer s…token",
        "label": "session cookie",
        "confidence": 90,
        "cookie_name": "session",
        "domain": "example.test",
    }

    assert _sanitize_candidate(raw_candidate) == expected
|
|
|
|
|
|
def test_cookies_path_mapping():
    """The cookies file is ``session-cookies.json`` inside the profile directory."""
    import shutil
    import tempfile

    service = BrowserSessionService()

    # Point browser_profiles_dir at a throwaway directory for this test.
    scratch_dir = tempfile.mkdtemp()
    saved_profiles_dir = get_settings().browser_profiles_dir
    get_settings().browser_profiles_dir = scratch_dir
    try:
        key = "test-profile-123"
        cookies_file = Path(service._cookies_path(key))
        assert cookies_file.name == "session-cookies.json"
        assert cookies_file.parent == Path(service._profile_dir(key))
    finally:
        # Restore the setting and discard the scratch directory.
        get_settings().browser_profiles_dir = saved_profiles_dir
        shutil.rmtree(scratch_dir, ignore_errors=True)
|
|
|
|
|
|
def test_screenshot_throttled_save():
    """``screenshot`` persists storage state, but only at a throttled rate.

    One session takes three screenshots: the first saves, an immediate repeat
    does not, and one taken after the save timestamp is moved back 11 seconds
    saves again. A second session whose profile key is ``auth-capture-xyz``
    must not trigger any save.
    """
    import shutil
    import tempfile

    service = BrowserSessionService()

    # Prepare a temp directory and point settings.browser_profiles_dir at it.
    scratch_dir = tempfile.mkdtemp()
    saved_profiles_dir = get_settings().browser_profiles_dir
    get_settings().browser_profiles_dir = scratch_dir

    try:
        from unittest.mock import AsyncMock, MagicMock
        from app.services.browser_session_service import BrowserSession

        # Mocked context and page.
        context = AsyncMock()
        page = MagicMock()
        page.is_closed = MagicMock(return_value=False)
        page.screenshot = AsyncMock(return_value=b"screenshot-bytes")

        persistent = BrowserSession(
            id="session123",
            custom_page_id=1,
            profile_key="page-1-test",
            context=context,
            page=page,
            lock=asyncio.Lock(),
            last_saved_state_at=0.0,
        )
        service._sessions[persistent.id] = persistent

        # First screenshot: triggers a state save.
        first_shot = run(service.screenshot(persistent.id))
        assert first_shot == b"screenshot-bytes"
        assert context.storage_state.call_count == 1

        # Remember the timestamp recorded by that first save.
        saved_at = persistent.last_saved_state_at
        assert saved_at > 0

        # Immediate second screenshot: the 10 s throttle should skip the save,
        # leaving call_count unchanged.
        second_shot = run(service.screenshot(persistent.id))
        assert second_shot == b"screenshot-bytes"
        assert context.storage_state.call_count == 1

        # Simulate a screenshot 11 seconds later (throttle window elapsed).
        persistent.last_saved_state_at = saved_at - 11.0
        third_shot = run(service.screenshot(persistent.id))
        assert third_shot == b"screenshot-bytes"
        assert context.storage_state.call_count == 2

        # A temporary auth-capture session must never trigger a state save.
        ephemeral = BrowserSession(
            id="session-eph",
            custom_page_id=0,
            profile_key="auth-capture-xyz",
            context=context,
            page=page,
            lock=asyncio.Lock(),
            last_saved_state_at=0.0,
        )
        service._sessions[ephemeral.id] = ephemeral
        run(service.screenshot(ephemeral.id))
        # call_count is still 2: nothing was added.
        assert context.storage_state.call_count == 2

    finally:
        get_settings().browser_profiles_dir = saved_profiles_dir
        shutil.rmtree(scratch_dir, ignore_errors=True)
|
|
|
|
|
|
def test_close_saves_state_and_cleans_up():
    """``close`` saves state for a normal session and cleans up an ephemeral one.

    A normal session is closed immediately after a recorded save and must
    still call ``storage_state``. A session keyed ``auth-capture-abc`` must
    close without calling ``storage_state``, and its cookies file must be gone
    afterwards.
    """
    import shutil
    import tempfile

    service = BrowserSessionService()
    scratch_dir = tempfile.mkdtemp()
    saved_profiles_dir = get_settings().browser_profiles_dir
    get_settings().browser_profiles_dir = scratch_dir

    try:
        from unittest.mock import AsyncMock, MagicMock
        from app.services.browser_session_service import BrowserSession
        import time

        context = AsyncMock()
        page = MagicMock()
        page.is_closed = MagicMock(return_value=False)

        persistent = BrowserSession(
            id="session_close",
            custom_page_id=2,
            profile_key="page-2-test",
            context=context,
            page=page,
            lock=asyncio.Lock(),
            # "Just saved": still inside the throttle window.
            last_saved_state_at=time.monotonic(),
        )
        service._sessions[persistent.id] = persistent

        # Even inside the throttle window, close must force a save.
        run(service.close(persistent.id))
        assert context.storage_state.call_count == 1
        assert context.close.call_count == 1

        # An ephemeral session must not save state on close, and its profile
        # directory should be cleaned up so the cookies JSON no longer exists.
        ephemeral_context = AsyncMock()
        ephemeral_page = MagicMock()
        ephemeral_page.is_closed = MagicMock(return_value=False)
        ephemeral = BrowserSession(
            id="session_eph_close",
            custom_page_id=0,
            profile_key="auth-capture-abc",
            context=ephemeral_context,
            page=ephemeral_page,
            lock=asyncio.Lock(),
            last_saved_state_at=0.0,
        )
        service._sessions[ephemeral.id] = ephemeral

        # Manually create a fake session-cookies.json first.
        cookies_file = service._cookies_path(ephemeral.profile_key)
        cookies_file.write_text("{}")
        assert cookies_file.exists()

        run(service.close(ephemeral.id))
        # Ephemeral close does not save, so call_count stays at 0.
        assert ephemeral_context.storage_state.call_count == 0
        assert ephemeral_context.close.call_count == 1
        # The profile directory has been removed, so the file is gone too.
        assert not cookies_file.exists()

    finally:
        get_settings().browser_profiles_dir = saved_profiles_dir
        shutil.rmtree(scratch_dir, ignore_errors=True)
|
|
|
|
|
|
def test_restore_session_state_decoding_and_inject():
    """Restoring saved state filters cookies and re-injects localStorage.

    A state file with one unexpired, one expired and one session cookie plus a
    single localStorage entry is written to the cookies path. After restore,
    only the unexpired and session cookies are passed to ``add_cookies``, the
    session cookie carries no ``expires`` key, and one init script containing
    the localStorage entry is registered.
    """
    import json
    import shutil
    import tempfile
    import time

    service = BrowserSessionService()
    scratch_dir = tempfile.mkdtemp()
    saved_profiles_dir = get_settings().browser_profiles_dir
    get_settings().browser_profiles_dir = scratch_dir

    try:
        from unittest.mock import AsyncMock

        context = AsyncMock()
        profile_key = "test-restore-profile"

        # Prepare a fake cookies file holding cookies and origins/localStorage.
        cookies_path = service._cookies_path(profile_key)

        now = time.time()
        stored_cookies = [
            {
                "name": "valid_persistent",
                "value": "123",
                "domain": "example.test",
                "path": "/",
                "expires": now + 3600,  # not yet expired
            },
            {
                "name": "expired_cookie",
                "value": "456",
                "domain": "example.test",
                "path": "/",
                "expires": now - 3600,  # already expired
            },
            {
                "name": "session_cookie",
                "value": "789",
                "domain": "example.test",
                "path": "/",
                # Session cookie: should be kept, with "expires" stripped.
                "expires": -1,
            },
        ]
        stored_origins = [
            {
                "origin": "https://example.test",
                "localStorage": [{"name": "theme", "value": "dark"}],
            },
        ]
        fake_state = {"cookies": stored_cookies, "origins": stored_origins}
        with open(cookies_path, "w", encoding='utf-8') as handle:
            json.dump(fake_state, handle)

        # Run the restore method.
        run(service._restore_session_state(context, profile_key))

        # add_cookies must have been called exactly once.
        assert context.add_cookies.call_count == 1

        # Inspect the filtered cookies that were injected.
        injected = context.add_cookies.call_args[0][0]
        assert len(injected) == 2

        injected_by_name = {cookie["name"]: cookie for cookie in injected}
        assert "valid_persistent" in injected_by_name
        assert "session_cookie" in injected_by_name
        assert "expired_cookie" not in injected_by_name

        # The session cookie must have had its "expires" field removed.
        assert "expires" not in injected_by_name["session_cookie"]

        # add_init_script (used to restore localStorage) must have been called.
        assert context.add_init_script.call_count == 1
        script = context.add_init_script.call_args[0][0]
        assert "window.localStorage.setItem" in script
        assert "theme" in script
        assert "dark" in script

    finally:
        get_settings().browser_profiles_dir = saved_profiles_dir
        shutil.rmtree(scratch_dir, ignore_errors=True)
|
|
|
|
|
|
def test_websocket_event_saves_state():
    """A click event with ``include_state=False`` still persists session state.

    The session starts with ``last_saved_state_at=0.0``. After one ``click``
    event, ``storage_state`` must have been called once and the timestamp
    must have moved above zero.
    """
    import shutil
    import tempfile

    service = BrowserSessionService()
    scratch_dir = tempfile.mkdtemp()
    saved_profiles_dir = get_settings().browser_profiles_dir
    get_settings().browser_profiles_dir = scratch_dir

    try:
        from unittest.mock import AsyncMock, MagicMock
        from app.services.browser_session_service import BrowserSession

        context = AsyncMock()
        page = MagicMock()
        page.is_closed = MagicMock(return_value=False)
        page.mouse = MagicMock()
        page.mouse.click = AsyncMock()

        ws_session = BrowserSession(
            id="session_ws",
            custom_page_id=3,
            profile_key="page-3-ws-test",
            context=context,
            page=page,
            lock=asyncio.Lock(),
            last_saved_state_at=0.0,
        )
        service._sessions[ws_session.id] = ws_session

        # Even with include_state=False, state should be saved once the
        # 5-second throttle has expired.
        click_event = service.event(
            session_id=ws_session.id,
            event_type="click",
            payload={"x": 10.0, "y": 20.0},
            include_state=False,
        )
        run(click_event)

        # storage_state was called, which shows the save was triggered.
        assert context.storage_state.call_count == 1
        assert ws_session.last_saved_state_at > 0

    finally:
        get_settings().browser_profiles_dir = saved_profiles_dir
        shutil.rmtree(scratch_dir, ignore_errors=True)
|
|
|
|
|