"""One-time credential import sessions for real-browser auth capture.""" from __future__ import annotations import hashlib import secrets import time from dataclasses import dataclass, field from typing import Any from app.services.auth_capture_service import _curate_candidates, _find_new_api_user IMPORT_SESSION_TTL_SECONDS = 600 class ImportSessionError(RuntimeError): pass @dataclass class BrowserImportSession: id: str secret_hash: str target_url: str created_by: str expires_at: float payload: dict[str, Any] | None = None consumed: bool = False created_at: float = field(default_factory=time.time) def _hash_secret(secret: str) -> str: return hashlib.sha256(secret.encode("utf-8")).hexdigest() def _normalize_storage(value: Any) -> dict[str, str]: if not isinstance(value, dict): return {} result: dict[str, str] = {} for key, item in value.items(): if item is None: continue result[str(key)] = item if isinstance(item, str) else str(item) return result def _normalize_cookies(value: Any) -> list[dict[str, Any]]: if not isinstance(value, list): return [] result: list[dict[str, Any]] = [] for item in value: if not isinstance(item, dict): continue name = str(item.get("name") or "").strip() cookie_value = str(item.get("value") or "") if not name or not cookie_value: continue result.append({ "name": name, "value": cookie_value, "domain": str(item.get("domain") or ""), "path": str(item.get("path") or "/"), "httpOnly": bool(item.get("httpOnly", False)), "secure": bool(item.get("secure", False)), }) return result def _normalize_headers(value: Any) -> list[dict[str, str]]: if not isinstance(value, list): return [] result: list[dict[str, str]] = [] for item in value: if not isinstance(item, dict): continue header_value = str(item.get("value") or "").strip() if not header_value: continue result.append({ "type": str(item.get("type") or "authorization"), "value": header_value, "url": str(item.get("url") or ""), }) return result def build_import_result(payload: dict[str, Any]) -> dict[str, Any]: """Convert extension-submitted payload into auth-capture result shape.""" page_url = str(payload.get("page_url") or payload.get("url") or "") cookies = _normalize_cookies(payload.get("cookies")) storage = _normalize_storage(payload.get("local_storage") or payload.get("storage")) session_storage = _normalize_storage(payload.get("session_storage")) auth_headers = _normalize_headers(payload.get("auth_headers")) new_api_user = _find_new_api_user(storage, session_storage) candidates = _curate_candidates( cookies=cookies, local_storage=storage, session_storage=session_storage, auth_headers=auth_headers, new_api_user=new_api_user, page_url=page_url, ) return { "cookies": cookies, "storage": storage, "session_storage": session_storage, "auth_headers": auth_headers, "candidates": candidates, } class BrowserImportService: def __init__(self) -> None: self._sessions: dict[str, BrowserImportSession] = {} def create(self, target_url: str, created_by: str) -> tuple[BrowserImportSession, str]: self.cleanup() session_id = secrets.token_urlsafe(18) secret = secrets.token_urlsafe(24) session = BrowserImportSession( id=session_id, secret_hash=_hash_secret(secret), target_url=target_url, created_by=created_by, expires_at=time.time() + IMPORT_SESSION_TTL_SECONDS, ) self._sessions[session_id] = session return session, secret def get(self, session_id: str, created_by: str | None = None) -> BrowserImportSession: self.cleanup() session = self._sessions.get(session_id) if not session: raise ImportSessionError("import session not found") if created_by is not None and session.created_by != created_by: raise ImportSessionError("import session not found") if session.expires_at <= time.time(): self._sessions.pop(session_id, None) raise ImportSessionError("import session expired") return session def submit(self, session_id: str, secret: str, payload: dict[str, Any]) -> BrowserImportSession: session = self.get(session_id) if session.consumed: raise ImportSessionError("import session already consumed") if not secrets.compare_digest(session.secret_hash, _hash_secret(secret)): raise ImportSessionError("invalid import secret") session.payload = payload session.consumed = True return session def cleanup(self) -> None: now = time.time() expired = [sid for sid, session in self._sessions.items() if session.expires_at <= now] for sid in expired: self._sessions.pop(sid, None) browser_imports = BrowserImportService()