Files
SmartUp/backend/app/services/browser_import_service.py
T
2026-06-02 13:51:29 +08:00

160 lines
5.2 KiB
Python

"""One-time credential import sessions for real-browser auth capture."""
from __future__ import annotations
import hashlib
import secrets
import time
from dataclasses import dataclass, field
from typing import Any
from app.services.auth_capture_service import _curate_candidates, _find_new_api_user
IMPORT_SESSION_TTL_SECONDS = 600
class ImportSessionError(RuntimeError):
pass
@dataclass
class BrowserImportSession:
id: str
secret_hash: str
target_url: str
created_by: str
expires_at: float
payload: dict[str, Any] | None = None
consumed: bool = False
created_at: float = field(default_factory=time.time)
def _hash_secret(secret: str) -> str:
return hashlib.sha256(secret.encode("utf-8")).hexdigest()
def _normalize_storage(value: Any) -> dict[str, str]:
if not isinstance(value, dict):
return {}
result: dict[str, str] = {}
for key, item in value.items():
if item is None:
continue
result[str(key)] = item if isinstance(item, str) else str(item)
return result
def _normalize_cookies(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
result: list[dict[str, Any]] = []
for item in value:
if not isinstance(item, dict):
continue
name = str(item.get("name") or "").strip()
cookie_value = str(item.get("value") or "")
if not name or not cookie_value:
continue
result.append({
"name": name,
"value": cookie_value,
"domain": str(item.get("domain") or ""),
"path": str(item.get("path") or "/"),
"httpOnly": bool(item.get("httpOnly", False)),
"secure": bool(item.get("secure", False)),
})
return result
def _normalize_headers(value: Any) -> list[dict[str, str]]:
if not isinstance(value, list):
return []
result: list[dict[str, str]] = []
for item in value:
if not isinstance(item, dict):
continue
header_value = str(item.get("value") or "").strip()
if not header_value:
continue
result.append({
"type": str(item.get("type") or "authorization"),
"value": header_value,
"url": str(item.get("url") or ""),
})
return result
def build_import_result(payload: dict[str, Any]) -> dict[str, Any]:
"""Convert extension-submitted payload into auth-capture result shape."""
page_url = str(payload.get("page_url") or payload.get("url") or "")
cookies = _normalize_cookies(payload.get("cookies"))
storage = _normalize_storage(payload.get("local_storage") or payload.get("storage"))
session_storage = _normalize_storage(payload.get("session_storage"))
auth_headers = _normalize_headers(payload.get("auth_headers"))
new_api_user = _find_new_api_user(storage, session_storage)
candidates = _curate_candidates(
cookies=cookies,
local_storage=storage,
session_storage=session_storage,
auth_headers=auth_headers,
new_api_user=new_api_user,
page_url=page_url,
)
return {
"cookies": cookies,
"storage": storage,
"session_storage": session_storage,
"auth_headers": auth_headers,
"candidates": candidates,
}
class BrowserImportService:
def __init__(self) -> None:
self._sessions: dict[str, BrowserImportSession] = {}
def create(self, target_url: str, created_by: str) -> tuple[BrowserImportSession, str]:
self.cleanup()
session_id = secrets.token_urlsafe(18)
secret = secrets.token_urlsafe(24)
session = BrowserImportSession(
id=session_id,
secret_hash=_hash_secret(secret),
target_url=target_url,
created_by=created_by,
expires_at=time.time() + IMPORT_SESSION_TTL_SECONDS,
)
self._sessions[session_id] = session
return session, secret
def get(self, session_id: str, created_by: str | None = None) -> BrowserImportSession:
self.cleanup()
session = self._sessions.get(session_id)
if not session:
raise ImportSessionError("import session not found")
if created_by is not None and session.created_by != created_by:
raise ImportSessionError("import session not found")
if session.expires_at <= time.time():
self._sessions.pop(session_id, None)
raise ImportSessionError("import session expired")
return session
def submit(self, session_id: str, secret: str, payload: dict[str, Any]) -> BrowserImportSession:
session = self.get(session_id)
if session.consumed:
raise ImportSessionError("import session already consumed")
if not secrets.compare_digest(session.secret_hash, _hash_secret(secret)):
raise ImportSessionError("invalid import secret")
session.payload = payload
session.consumed = True
return session
def cleanup(self) -> None:
now = time.time()
expired = [sid for sid, session in self._sessions.items() if session.expires_at <= now]
for sid in expired:
self._sessions.pop(sid, None)
browser_imports = BrowserImportService()