import asyncio from pathlib import Path from app.config import get_settings from app.routers.auth_capture import _sanitize_candidate from app.services.browser_session_service import BrowserSessionService class FakeLocator: def __init__(self, *, visible=True, count=1): self._visible = list(visible) if isinstance(visible, list) else [visible] self._count = count self.filled = [] self.clicked = 0 self.timeouts = [] @property def first(self): return self async def count(self): return self._count async def is_visible(self, timeout=0): self.timeouts.append(timeout) if not self._visible: return False if len(self._visible) == 1: return self._visible[0] return self._visible.pop(0) async def fill(self, value, timeout=0): self.filled.append((value, timeout)) async def click(self, timeout=0): self.clicked += 1 class FakePage: url = "https://example.test/login" def __init__(self, locators): self.locators = locators self.queries = [] def locator(self, selector): self.queries.append(selector) return self.locators.get(selector, FakeLocator(visible=False, count=0)) def run(coro): return asyncio.run(coro) def test_autofill_retries_until_delayed_fields_are_visible(): service = BrowserSessionService() username = FakeLocator(visible=[False, True]) password = FakeLocator(visible=True) submit = FakeLocator(visible=True) page = FakePage({ "#user": username, "#pass": password, "#submit": submit, }) run(service._autofill_login( page, { "enabled": True, "username": "alice", "password": "secret", "username_selector": "#user", "password_selector": "#pass", "submit_selector": "#submit", }, max_wait_seconds=1, poll_interval_seconds=0, )) assert page.queries[0] == "#user" assert "#pass" in page.queries assert "input[type='password']" not in page.queries assert username.filled == [("alice", 3000)] assert password.filled == [("secret", 3000)] assert submit.clicked == 1 def test_autofill_returns_without_selectors_when_disabled_or_missing_credentials(): service = BrowserSessionService() disabled_page = FakePage({"#user": FakeLocator()}) run(service._autofill_login( disabled_page, {"enabled": False, "username": "alice", "password": "secret"}, max_wait_seconds=1, poll_interval_seconds=0, )) assert disabled_page.queries == [] missing_password_page = FakePage({"#user": FakeLocator()}) run(service._autofill_login( missing_password_page, {"enabled": True, "username": "alice", "password": ""}, max_wait_seconds=1, poll_interval_seconds=0, )) assert missing_password_page.queries == [] def test_sanitize_candidate_strips_secret_fields_but_keeps_metadata(): sanitized = _sanitize_candidate({ "type": "cookie", "source": "cookie:session", "value": "Bearer secret-token", "preview": "Bearer s…token", "label": "session cookie", "confidence": 90, "cookie_name": "session", "cookie_value": "secret-cookie", "domain": "example.test", }) assert sanitized == { "type": "cookie", "source": "cookie:session", "preview": "Bearer s…token", "label": "session cookie", "confidence": 90, "cookie_name": "session", "domain": "example.test", } def test_cookies_path_mapping(): import tempfile import shutil service = BrowserSessionService() temp_dir = tempfile.mkdtemp() original_dir = get_settings().browser_profiles_dir get_settings().browser_profiles_dir = temp_dir try: profile_key = "test-profile-123" expected_path = Path(service._cookies_path(profile_key)) assert expected_path.name == "session-cookies.json" assert expected_path.parent == Path(service._profile_dir(profile_key)) finally: get_settings().browser_profiles_dir = original_dir shutil.rmtree(temp_dir, ignore_errors=True) def test_screenshot_throttled_save(): import tempfile import shutil service = BrowserSessionService() # 准备临时目录并 mock settings.browser_profiles_dir temp_dir = tempfile.mkdtemp() original_dir = get_settings().browser_profiles_dir get_settings().browser_profiles_dir = temp_dir try: from unittest.mock import AsyncMock, MagicMock from app.services.browser_session_service import BrowserSession # Mock Context & Page fake_context = AsyncMock() fake_page = MagicMock() fake_page.is_closed = MagicMock(return_value=False) fake_page.screenshot = AsyncMock(return_value=b"screenshot-bytes") session = BrowserSession( id="session123", custom_page_id=1, profile_key="page-1-test", context=fake_context, page=fake_page, lock=asyncio.Lock(), last_saved_state_at=0.0 ) service._sessions[session.id] = session # 第一次调用 screenshot: 触发存储 res1 = run(service.screenshot(session.id)) assert res1 == b"screenshot-bytes" assert fake_context.storage_state.call_count == 1 # 记录第一次保存后的时间戳 first_save_time = session.last_saved_state_at assert first_save_time > 0 # 第二次立即调用 screenshot: 应该因为限流 10s 被跳过,不增加 call_count res2 = run(service.screenshot(session.id)) assert res2 == b"screenshot-bytes" assert fake_context.storage_state.call_count == 1 # 模拟 11 秒后(防抖时间已过)再度截图 session.last_saved_state_at = first_save_time - 11.0 res3 = run(service.screenshot(session.id)) assert res3 == b"screenshot-bytes" assert fake_context.storage_state.call_count == 2 # 测试临时 auth-capture 会话不触发任何 state 存储 ephemeral_session = BrowserSession( id="session-eph", custom_page_id=0, profile_key="auth-capture-xyz", context=fake_context, page=fake_page, lock=asyncio.Lock(), last_saved_state_at=0.0 ) service._sessions[ephemeral_session.id] = ephemeral_session run(service.screenshot(ephemeral_session.id)) # 它的 call_count 依然是 2,没有增加 assert fake_context.storage_state.call_count == 2 finally: get_settings().browser_profiles_dir = original_dir shutil.rmtree(temp_dir, ignore_errors=True) def test_close_saves_state_and_cleans_up(): import tempfile import shutil service = BrowserSessionService() temp_dir = tempfile.mkdtemp() original_dir = get_settings().browser_profiles_dir get_settings().browser_profiles_dir = temp_dir try: from unittest.mock import AsyncMock, MagicMock from app.services.browser_session_service import BrowserSession fake_context = AsyncMock() fake_page = MagicMock() fake_page.is_closed = MagicMock(return_value=False) import time session = BrowserSession( id="session_close", custom_page_id=2, profile_key="page-2-test", context=fake_context, page=fake_page, lock=asyncio.Lock(), last_saved_state_at=time.monotonic() # 此时在限流内 ) service._sessions[session.id] = session # 即使在限流时间内,close 也必须强制保存 run(service.close(session.id)) assert fake_context.storage_state.call_count == 1 assert fake_context.close.call_count == 1 # 测试 ephemeral 会话在 close 时不应该保存 state,并且其 profile_dir 应当被清理,导致 cookies json 不复存在 eph_context = AsyncMock() eph_page = MagicMock() eph_page.is_closed = MagicMock(return_value=False) eph_session = BrowserSession( id="session_eph_close", custom_page_id=0, profile_key="auth-capture-abc", context=eph_context, page=eph_page, lock=asyncio.Lock(), last_saved_state_at=0.0 ) service._sessions[eph_session.id] = eph_session # 先手动创建一个假 session-cookies.json eph_cookies_path = service._cookies_path(eph_session.profile_key) eph_cookies_path.write_text("{}") assert eph_cookies_path.exists() run(service.close(eph_session.id)) # ephemeral close 时不保存,所以 call_count 依然是 0 assert eph_context.storage_state.call_count == 0 assert eph_context.close.call_count == 1 # 但对应的 profile 目录已被删除,文件自然不复存在 assert not eph_cookies_path.exists() finally: get_settings().browser_profiles_dir = original_dir shutil.rmtree(temp_dir, ignore_errors=True) def test_restore_session_state_decoding_and_inject(): import json import tempfile import shutil import time service = BrowserSessionService() temp_dir = tempfile.mkdtemp() original_dir = get_settings().browser_profiles_dir get_settings().browser_profiles_dir = temp_dir try: from unittest.mock import AsyncMock fake_context = AsyncMock() profile_key = "test-restore-profile" # 准备假 cookies.json,包含 cookies 和 origins/localStorage cookies_path = service._cookies_path(profile_key) now = time.time() fake_state = { "cookies": [ { "name": "valid_persistent", "value": "123", "domain": "example.test", "path": "/", "expires": now + 3600 # 未过期 }, { "name": "expired_cookie", "value": "456", "domain": "example.test", "path": "/", "expires": now - 3600 # 已过期 }, { "name": "session_cookie", "value": "789", "domain": "example.test", "path": "/", "expires": -1 # session cookie,应被保留并剔除 expires 字段 } ], "origins": [ { "origin": "https://example.test", "localStorage": [ { "name": "theme", "value": "dark" } ] } ] } with open(cookies_path, "w", encoding='utf-8') as f: json.dump(fake_state, f) # 运行还原方法 run(service._restore_session_state(fake_context, profile_key)) # 检查是否成功调用 add_cookies assert fake_context.add_cookies.call_count == 1 # 检查过滤后的 cookies 内容 injected_cookies = fake_context.add_cookies.call_args[0][0] assert len(injected_cookies) == 2 names = [c["name"] for c in injected_cookies] assert "valid_persistent" in names assert "session_cookie" in names assert "expired_cookie" not in names # 校验 session_cookie 是否成功移除了 expires session_c = next(c for c in injected_cookies if c["name"] == "session_cookie") assert "expires" not in session_c # 检查是否成功调用了 add_init_script (用于还原 LocalStorage) assert fake_context.add_init_script.call_count == 1 init_script = fake_context.add_init_script.call_args[0][0] assert "window.localStorage.setItem" in init_script assert "theme" in init_script assert "dark" in init_script finally: get_settings().browser_profiles_dir = original_dir shutil.rmtree(temp_dir, ignore_errors=True) def test_websocket_event_saves_state(): import tempfile import shutil service = BrowserSessionService() temp_dir = tempfile.mkdtemp() original_dir = get_settings().browser_profiles_dir get_settings().browser_profiles_dir = temp_dir try: from unittest.mock import AsyncMock, MagicMock from app.services.browser_session_service import BrowserSession fake_context = AsyncMock() fake_page = MagicMock() fake_page.is_closed = MagicMock(return_value=False) fake_page.mouse = MagicMock() fake_page.mouse.click = AsyncMock() session = BrowserSession( id="session_ws", custom_page_id=3, profile_key="page-3-ws-test", context=fake_context, page=fake_page, lock=asyncio.Lock(), last_saved_state_at=0.0 ) service._sessions[session.id] = session # 即使 include_state=False,也应当在 5 秒节流到期后保存状态 run(service.event( session_id=session.id, event_type="click", payload={"x": 10.0, "y": 20.0}, include_state=False )) # storage_state 应该被调用,说明保存成功触发了 assert fake_context.storage_state.call_count == 1 assert session.last_saved_state_at > 0 finally: get_settings().browser_profiles_dir = original_dir shutil.rmtree(temp_dir, ignore_errors=True)