Files
SmartUp/backend/app/routers/custom_pages.py
T
liumangmang 4f9acdc99c feat(auth-capture): full cookie bundle extraction + richer refresh-auth
Problem: Meow upstream uses Cloudflare, which sets cf_clearance + session
cookies that must all be sent together. The old code only captured a single
session-named cookie via a whitelist, discarding cf_clearance entirely, and
wrote back only 'name=value' instead of the full cookie string.

Changes:

auth_capture_service.py:
  - Add _cookie_matches_hostname(): hostname suffix matching supporting
    dot-prefixed domains (.saki.lat matches api.saki.lat)
  - Add _build_cookie_bundle(): collects ALL cookies matching the current
    page's hostname, returns complete 'name1=v1; name2=v2' string
  - _curate_candidates(): new 'cookie_bundle' candidate type (type=0 in sort,
    highest priority), carries cookie_count + cookie_names in extra fields
  - extract_all(): obtain real-time page URL from session.page.url and pass
    to _curate_candidates so cookie domain filtering is accurate
  - Sort order: cookie_bundle > cookie > bearer_token/api_key > credential
  - Fix bug in original JWT dedup check (was assigning instead of checking)

custom_pages.py:
  - Add logging import + logger
  - _pick_best_candidate(): cookie preferred_auth_type now tries cookie_bundle
    first, then single cookie; bearer/api_key use existing type_map logic
  - RefreshAuthResponse: add optional 'warning' field
  - refresh_auth(): handle ctype='cookie_bundle' same as 'cookie'; always
    write full candidate.value as cookie_string (works for both types)
  - Post-write validation: attempt get_available_groups with new credentials;
    on failure, still commit (lenient mode) but set warning message explaining
    cf_clearance IP-binding as the likely cause; success logs at INFO level

Tests (test_auth_capture.py, 19 cases):
  - _cookie_matches_hostname: exact, dot-prefix subdomain, empty domain,
    different domain, evil-subdomain partial match rejection
  - _build_cookie_bundle: cf_clearance included, cross-domain excluded,
    single cookie, empty value excluded, no cookies
  - _curate_candidates: bundle ranks first, value is full string, bundle
    beats single session cookie, bearer wins when no cookies, empty case,
    cookie_count/cookie_names in extra, session fallback preserved,
    new_api_user propagation to bundle

All 46 tests pass.
2026-06-02 09:32:23 +08:00

731 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Custom pages CRUD router + authenticated iframe proxy."""
from __future__ import annotations
import logging
import re
from datetime import datetime, timezone
from typing import Any, List, Literal, Optional
from urllib.parse import parse_qs, parse_qsl, urlencode, urljoin, urlparse
import httpx
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import Response
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.database import get_db
from app.models.admin_user import AdminUser
from app.models.custom_page import CustomPage
from app.models.upstream import Upstream
from app.services.upstream_client import _find_user_id
from app.services.auth_capture_service import extract_all
from app.services.browser_session_service import browser_sessions
from app.utils.auth import decode_token, get_current_user, get_user_from_token_param
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/custom-pages", tags=["custom-pages"])
# Headers that prevent iframe embedding — strip them from proxied responses
_STRIP_RESPONSE_HEADERS = {
"x-frame-options",
"content-security-policy",
"content-security-policy-report-only",
}
# Headers we should NOT forward to the upstream (hop-by-hop + host)
_STRIP_REQUEST_HEADERS = {"host", "connection", "transfer-encoding", "te",
"trailers", "upgrade", "proxy-authorization"}
# ---- Schemas ----
class CustomPageCreate(BaseModel):
name: str
url: str
icon: str = "Link"
sort_order: int = 0
enabled: bool = True
use_proxy: bool = False
access_mode: Literal["direct", "proxy", "remote_browser"] = "direct"
description: Optional[str] = None
login_username: Optional[str] = None
login_password: Optional[str] = None
login_username_selector: Optional[str] = None
login_password_selector: Optional[str] = None
login_submit_selector: Optional[str] = None
login_autofill_enabled: bool = False
linked_upstream_id: Optional[int] = None
class CustomPageUpdate(BaseModel):
name: Optional[str] = None
url: Optional[str] = None
icon: Optional[str] = None
sort_order: Optional[int] = None
enabled: Optional[bool] = None
use_proxy: Optional[bool] = None
access_mode: Optional[Literal["direct", "proxy", "remote_browser"]] = None
description: Optional[str] = None
login_username: Optional[str] = None
login_password: Optional[str] = None
login_username_selector: Optional[str] = None
login_password_selector: Optional[str] = None
login_submit_selector: Optional[str] = None
login_autofill_enabled: Optional[bool] = None
login_password_clear: Optional[bool] = None
linked_upstream_id: Optional[int] = None
class CustomPageResponse(BaseModel):
id: int
name: str
url: str
icon: str
sort_order: int
enabled: bool
use_proxy: bool
access_mode: str
description: Optional[str]
login_username: Optional[str]
login_username_selector: Optional[str]
login_password_selector: Optional[str]
login_submit_selector: Optional[str]
login_autofill_enabled: bool
login_password_configured: bool
linked_upstream_id: Optional[int]
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}
def _blank_to_none(value: Optional[str]) -> Optional[str]:
if value is None:
return None
stripped = value.strip()
return stripped or None
def _has_login_credentials(username: Optional[str], password: Optional[str]) -> bool:
return bool(_blank_to_none(username) and _blank_to_none(password))
def _page_response(page: CustomPage) -> CustomPageResponse:
return CustomPageResponse(
id=page.id,
name=page.name,
url=page.url,
icon=page.icon,
sort_order=page.sort_order,
enabled=page.enabled,
use_proxy=page.use_proxy,
access_mode=page.access_mode,
description=page.description,
login_username=page.login_username,
login_username_selector=page.login_username_selector,
login_password_selector=page.login_password_selector,
login_submit_selector=page.login_submit_selector,
login_autofill_enabled=page.login_autofill_enabled,
login_password_configured=bool(page.login_password),
linked_upstream_id=page.linked_upstream_id,
created_at=page.created_at,
updated_at=page.updated_at,
)
# ---- CRUD Endpoints ----
@router.get("", response_model=List[CustomPageResponse])
def list_pages(db: Session = Depends(get_db), _=Depends(get_current_user)):
pages = db.query(CustomPage).order_by(CustomPage.sort_order, CustomPage.id).all()
return [_page_response(page) for page in pages]
@router.post("", response_model=CustomPageResponse, status_code=201)
def create_page(body: CustomPageCreate, db: Session = Depends(get_db), _=Depends(get_current_user)):
data = body.model_dump()
data["use_proxy"] = data["access_mode"] == "proxy"
for key in (
"login_username",
"login_password",
"login_username_selector",
"login_password_selector",
"login_submit_selector",
):
data[key] = _blank_to_none(data.get(key))
if "login_autofill_enabled" not in body.model_fields_set and _has_login_credentials(data.get("login_username"), data.get("login_password")):
data["login_autofill_enabled"] = True
page = CustomPage(**data)
db.add(page)
db.commit()
db.refresh(page)
return _page_response(page)
@router.put("/{pid}", response_model=CustomPageResponse)
def update_page(pid: int, body: CustomPageUpdate, db: Session = Depends(get_db), _=Depends(get_current_user)):
page = db.query(CustomPage).filter(CustomPage.id == pid).first()
if not page:
raise HTTPException(404, "page not found")
data = body.model_dump(exclude_none=True)
fields_set = body.model_fields_set
if "access_mode" in data:
data["use_proxy"] = data["access_mode"] == "proxy"
elif "use_proxy" in data:
data["access_mode"] = "proxy" if data["use_proxy"] else "direct"
for key in (
"login_username",
"login_username_selector",
"login_password_selector",
"login_submit_selector",
):
if key in data:
data[key] = _blank_to_none(data[key])
new_password_saved = False
if "login_password" in data:
# Empty password on update means "keep the existing secret"; the API never echoes it back.
password = data.pop("login_password")
if password and password.strip():
data["login_password"] = password
new_password_saved = True
if data.pop("login_password_clear", False):
data["login_password"] = None
next_username = data.get("login_username", page.login_username)
next_password = data.get("login_password", page.login_password)
if "login_autofill_enabled" not in fields_set and new_password_saved and _has_login_credentials(next_username, next_password):
data["login_autofill_enabled"] = True
for k, v in data.items():
setattr(page, k, v)
page.updated_at = datetime.now(timezone.utc)
db.commit()
db.refresh(page)
return _page_response(page)
@router.delete("/{pid}", status_code=204)
def delete_page(pid: int, db: Session = Depends(get_db), _=Depends(get_current_user)):
page = db.query(CustomPage).filter(CustomPage.id == pid).first()
if not page:
raise HTTPException(404, "page not found")
db.delete(page)
db.commit()
# ---- One-click refresh auth ----
import json as _json
class RefreshAuthResponse(BaseModel):
success: bool
message: str
warning: Optional[str] = None
def _pick_best_candidate(candidates: list[dict], preferred_auth_type: str) -> Optional[dict]:
if not candidates:
return None
# cookie_bundle > cookie > bearer_token > api_key
# preferred_auth_type="cookie" 时优先匹配 bundle,其次单 cookie
if preferred_auth_type == "cookie":
for c in candidates:
if c["type"] == "cookie_bundle":
return c
for c in candidates:
if c["type"] == "cookie":
return c
elif preferred_auth_type in ("bearer", "api_key"):
type_map = {"bearer": "bearer_token", "api_key": "api_key"}
preferred = type_map.get(preferred_auth_type)
if preferred:
for c in candidates:
if c["type"] == preferred:
return c
# fallback:排序后取第一个
return candidates[0]
@router.post("/{pid}/refresh-auth", response_model=RefreshAuthResponse)
async def refresh_auth(pid: int, db: Session = Depends(get_db), _=Depends(get_current_user)):
page = db.query(CustomPage).filter(CustomPage.id == pid).first()
if not page:
raise HTTPException(404, "page not found")
if page.access_mode != "remote_browser":
raise HTTPException(400, "page is not in remote_browser mode")
if not page.linked_upstream_id:
raise HTTPException(400, "page has no linked upstream")
upstream = db.query(Upstream).filter(Upstream.id == page.linked_upstream_id).first()
if not upstream:
raise HTTPException(404, "linked upstream not found")
try:
session = browser_sessions.find_by_page_id(page.id)
except KeyError:
return RefreshAuthResponse(success=False, message="请先打开远程浏览器并登录")
try:
result = await extract_all(session)
except Exception as exc:
return RefreshAuthResponse(success=False, message=f"提取失败: {exc}")
candidates = result.get("candidates", [])
candidate = _pick_best_candidate(candidates, upstream.auth_type)
if not candidate:
return RefreshAuthResponse(success=False, message="未提取到有效凭证,请确认已在远程浏览器中登录")
existing_config = _json.loads(upstream.auth_config_json or "{}")
ctype = candidate["type"]
if ctype in ("cookie_bundle", "cookie"):
upstream.auth_type = "cookie"
# cookie_bundle.value 已是完整 cookie_stringcookie.value 是 "name=value" 格式
existing_config["cookie_string"] = candidate.get("value", "")
if candidate.get("new_api_user"):
existing_config["new_api_user"] = candidate["new_api_user"]
elif ctype == "bearer_token":
upstream.auth_type = "bearer"
raw = candidate.get("value", "")
# Clean up: strip whitespace, remove "Bearer " prefix if present
token = raw.strip()
if token.startswith("Bearer "):
token = token[7:].strip()
# Validate token can be used as HTTP header value
try:
token.encode("latin-1")
except UnicodeEncodeError:
return RefreshAuthResponse(
success=False,
message="提取到的 Token 含有非 HTTP 标头字符,请确认已在远程浏览器中正确登录并重试",
)
existing_config["token"] = token
elif ctype == "api_key":
upstream.auth_type = "api_key"
existing_config["key"] = candidate.get("value", "")
existing_config.setdefault("header", "X-API-Key")
upstream.auth_config_json = _json.dumps(existing_config, ensure_ascii=False)
upstream.updated_at = datetime.now(timezone.utc)
# ── 宽松验证:写回后尝试调用 get_available_groups 验证凭证可用性 ──
# 失败时仍然 commit(凭证已写入),但在 message 里说明验证失败
# 这样用户仍能看到新凭证已写入,便于 debugcf_clearance 绑 IP 时验证必然失败)
warning_msg: Optional[str] = None
try:
from app.services.upstream_client import UpstreamClient
groups_endpoint = upstream.groups_endpoint or "/groups/available"
new_auth_config = _json.loads(upstream.auth_config_json)
with UpstreamClient(
base_url=upstream.base_url,
api_prefix=upstream.api_prefix or "",
auth_type=upstream.auth_type,
auth_config=new_auth_config,
timeout=float(upstream.timeout_seconds or 30),
) as uc:
uc.get_available_groups(groups_endpoint)
logger.info("refresh_auth: upstream %s credential verification passed", upstream.id)
except Exception as exc:
warning_msg = (
f"凭证已写入但 API 验证失败:{exc}"
"若 SmartUp 与远程浏览器不在同一 IPcf_clearance 可能无法复用,请手动测试连接。"
)
logger.warning(
"refresh_auth: upstream %s credential verification failed (written anyway): %s",
upstream.id, exc,
)
db.commit()
auth_type_label = upstream.auth_type
cookie_count = candidate.get("cookie_count", "")
count_str = f"{cookie_count} 个 cookie" if cookie_count else ""
success_msg = f"凭证已刷新 ({auth_type_label}{count_str})"
return RefreshAuthResponse(success=True, message=success_msg, warning=warning_msg)
# ---- Frame Proxy (simple: strip X-Frame-Options / CSP, pass through content) ----
_STRIP_RESP = {
"x-frame-options",
"content-security-policy",
"content-security-policy-report-only",
}
_STRIP_REQ = {
"host", "connection", "transfer-encoding", "te",
"trailers", "upgrade", "proxy-authorization", "authorization",
}
_PROXY_STATE: dict[int, dict[str, Any]] = {}
def _origin(url: str) -> str:
parsed = urlparse(url)
if not parsed.scheme or not parsed.netloc:
return ""
return f"{parsed.scheme}://{parsed.netloc}"
def _same_origin(a: str, b: str) -> bool:
return _origin(a).rstrip("/") == _origin(b).rstrip("/")
# ── TTL 缓存:origin → upstream_id30 秒自动过期,无手动 invalidate)──
import time as _time
from functools import wraps as _wraps
def _ttl_cache(ttl_seconds: int = 30, maxsize: int = 64):
def decorator(fn):
cache: dict = {}
@_wraps(fn)
def wrapper(*args):
key = args
now = _time.monotonic()
if key in cache:
value, ts = cache[key]
if now - ts < ttl_seconds:
return value
value = fn(*args)
if len(cache) >= maxsize:
cache.pop(next(iter(cache)))
cache[key] = (value, now)
return value
return wrapper
return decorator
def _find_matching_upstream(db: Session, page: CustomPage) -> Optional[Upstream]:
page_origin = _origin(page.url)
if not page_origin:
return None
upstream_id = _origin_to_upstream_id(page_origin)
if upstream_id is None:
return None
return db.query(Upstream).filter(Upstream.id == upstream_id).first()
@_ttl_cache(ttl_seconds=30, maxsize=64)
def _origin_to_upstream_id(origin: str) -> Optional[int]:
"""按 origin 查匹配的上游 ID,结果缓存 30 秒。"""
from app.database import SessionLocal
from app.models.upstream import Upstream
with SessionLocal() as sess:
for upstream in sess.query(Upstream).order_by(Upstream.id).all():
if _origin(upstream.base_url) == origin:
return upstream.id
return None
def _headers_for_upstream(request: Request, state: Optional[dict[str, Any]] = None) -> dict[str, str]:
fwd: dict[str, str] = {}
for k, v in request.headers.items():
lk = k.lower()
if lk in _STRIP_REQ or lk.startswith("x-forwarded"):
continue
fwd[k] = v
fwd["user-agent"] = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)
fwd.setdefault("accept", "text/html,application/xhtml+xml,*/*;q=0.8")
if state and state.get("new_api_user"):
fwd["New-Api-User"] = str(state["new_api_user"])
return fwd
async def _ensure_new_api_state(page_id: int, upstream: Optional[Upstream]) -> Optional[dict[str, Any]]:
if not upstream or upstream.auth_type != "login_password":
return None
cached = _PROXY_STATE.get(page_id)
if cached and cached.get("cookies"):
return cached
import json
cfg = json.loads(upstream.auth_config_json or "{}")
email = cfg.get("email", "")
password = cfg.get("password", "")
if not email or not password:
return None
login_path = cfg.get("login_path", "/api/user/login")
username_field = cfg.get("username_field", "username")
login_url = urljoin(upstream.base_url.rstrip("/") + "/", login_path.lstrip("/"))
async with httpx.AsyncClient(follow_redirects=True, timeout=float(upstream.timeout_seconds)) as client:
resp = await client.post(
login_url,
json={username_field: email, "password": password},
headers={
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": "SmartUp/1.0",
},
)
resp.raise_for_status()
try:
payload = resp.json()
except ValueError:
payload = {}
cookies = dict(resp.cookies)
if not cookies:
return None
state = {
"cookies": cookies,
"new_api_user": cfg.get("new_api_user", "") or _find_user_id(payload),
}
_PROXY_STATE[page_id] = state
return state
def _with_token(url: str, token: Optional[str]) -> str:
if not token:
return url
sep = "&" if "?" in url else "?"
return f"{url}{sep}token={token}"
def _token_from_request(request: Request, token: Optional[str]) -> Optional[str]:
if token:
return token
ref = request.headers.get("referer", "")
if not ref:
return None
parsed = urlparse(ref)
values = parse_qs(parsed.query).get("token", [])
return values[0] if values else None
def _require_proxy_user(request: Request, token: Optional[str], db: Session) -> None:
raw = _token_from_request(request, token)
if not raw:
raise HTTPException(401, "Not authenticated")
email = decode_token(raw)
if not email:
raise HTTPException(401, "Invalid token")
user = db.query(AdminUser).filter(AdminUser.email == email).first()
if not user:
raise HTTPException(401, "User not found")
def _rewrite_html(content: bytes, page_id: int, target_url: str, token: Optional[str]) -> bytes:
try:
html = content.decode("utf-8")
except UnicodeDecodeError:
return content
proxy_root = f"/api/custom-pages/{page_id}/proxy"
target_origin = _origin(target_url)
def rewrite_url(value: str) -> str:
if value.startswith(("data:", "blob:", "mailto:", "tel:", "#", "javascript:")):
return value
if value.startswith(proxy_root):
return value
if value.startswith("//"):
absolute = f"{urlparse(target_url).scheme}:{value}"
if _same_origin(absolute, target_url):
return _with_token(f"{proxy_root}{urlparse(absolute).path or '/'}", token)
return value
if value.startswith(("http://", "https://")):
if _same_origin(value, target_url):
parsed = urlparse(value)
proxied = f"{proxy_root}{parsed.path or '/'}" + (f"?{parsed.query}" if parsed.query else "")
return _with_token(proxied, token)
return value
if value.startswith("/"):
return _with_token(f"{proxy_root}{value}", token)
absolute = urljoin(target_url, value)
if _origin(absolute) == target_origin:
parsed = urlparse(absolute)
proxied = f"{proxy_root}{parsed.path or '/'}" + (f"?{parsed.query}" if parsed.query else "")
return _with_token(proxied, token)
return value
html = re.sub(
r'(?P<attr>\b(?:src|href|action)=)(?P<quote>["\'])(?P<url>[^"\']+)(?P=quote)',
lambda m: f"{m.group('attr')}{m.group('quote')}{rewrite_url(m.group('url'))}{m.group('quote')}",
html,
flags=re.IGNORECASE,
)
inject = f"""
<script>
(function() {{
var root = {proxy_root!r};
var token = {token or ''!r};
function withToken(url) {{
if (!token || url.indexOf('token=') !== -1) return url;
return url + (url.indexOf('?') === -1 ? '?' : '&') + 'token=' + encodeURIComponent(token);
}}
function proxify(input) {{
if (typeof input !== 'string') return input;
if (input.indexOf(root) === 0) return withToken(input);
if (input.indexOf('/') === 0) return withToken(root + input);
try {{
var url = new URL(input, window.location.href);
if (url.origin === window.location.origin && url.pathname.indexOf(root) !== 0) {{
return withToken(root + url.pathname + url.search + url.hash);
}}
}} catch (e) {{}}
return input;
}}
var oldFetch = window.fetch;
if (oldFetch) {{
window.fetch = function(input, init) {{
if (typeof input === 'string') input = proxify(input);
else if (input && input.url) input = new Request(proxify(input.url), input);
return oldFetch.call(this, input, init);
}};
}}
var oldOpen = XMLHttpRequest.prototype.open;
XMLHttpRequest.prototype.open = function(method, url) {{
arguments[1] = proxify(url);
return oldOpen.apply(this, arguments);
}};
}})();
</script>
"""
if "</head>" in html:
html = html.replace("</head>", inject + "</head>", 1)
else:
html = inject + html
return html.encode("utf-8")
async def _proxy_to_page(
request: Request,
page: CustomPage,
target_url: str,
state: Optional[dict[str, Any]],
) -> httpx.Response:
body = await request.body() if request.method not in ("GET", "HEAD") else None
async with httpx.AsyncClient(follow_redirects=True, timeout=30) as client:
return await client.request(
method=request.method,
url=target_url,
headers=_headers_for_upstream(request, state),
cookies=(state or {}).get("cookies", {}),
content=body,
)
def _response_from_upstream(
resp: httpx.Response,
page_id: int,
target_url: str,
token: Optional[str],
) -> Response:
out: dict[str, str] = {}
for k, v in resp.headers.items():
kl = k.lower()
if kl in _STRIP_RESP:
continue
if kl in ("content-encoding", "transfer-encoding", "content-length", "set-cookie"):
continue
out[k] = v
content = resp.content
content_type = resp.headers.get("content-type", "")
if "text/html" in content_type:
content = _rewrite_html(content, page_id, target_url, token)
return Response(
content=content,
status_code=resp.status_code,
media_type=content_type,
headers=out,
)
@router.api_route("/{pid}/proxy", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"])
@router.api_route("/{pid}/proxy/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"])
async def page_proxy(
pid: int,
request: Request,
path: str = "",
token: Optional[str] = Query(default=None),
db: Session = Depends(get_db),
):
_require_proxy_user(request, token, db)
page = db.query(CustomPage).filter(CustomPage.id == pid).first()
if not page or not page.enabled:
raise HTTPException(404, "page not found")
if not page.url.startswith(("http://", "https://")):
raise HTTPException(400, "Only http/https URLs are allowed")
base = page.url.rstrip("/") + "/"
target_url = urljoin(base, path or "")
query = urlencode([(k, v) for k, v in parse_qsl(request.url.query, keep_blank_values=True) if k != "token"])
if query:
target_url += f"?{query}"
upstream = _find_matching_upstream(db, page)
state = await _ensure_new_api_state(pid, upstream)
try:
resp = await _proxy_to_page(request, page, target_url, state)
if resp.status_code == 401 and upstream:
_PROXY_STATE.pop(pid, None)
state = await _ensure_new_api_state(pid, upstream)
resp = await _proxy_to_page(request, page, target_url, state)
except httpx.RequestError as exc:
raise HTTPException(502, f"Proxy error: {exc}")
except httpx.HTTPStatusError as exc:
raise HTTPException(exc.response.status_code, exc.response.text)
return _response_from_upstream(resp, pid, target_url, _token_from_request(request, token))
@router.api_route("/frame-proxy", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"])
async def frame_proxy(
request: Request,
url: str = Query(..., description="Target URL to proxy"),
token: Optional[str] = Query(default=None),
_=Depends(get_user_from_token_param),
):
"""
Simple transparent proxy: strips X-Frame-Options and CSP headers so the
response can be embedded in an iframe.
NOTE: For full SPA (React/Vue) sites, install the 'Requestly' browser
extension and set a rule to remove X-Frame-Options on the target domain —
that works reliably without any server-side complexity.
"""
if not url.startswith(("http://", "https://")):
raise HTTPException(400, "Only http/https URLs are allowed")
# Forward browser headers (cookies, language, accept, etc.)
fwd: dict[str, str] = {}
for k, v in request.headers.items():
if k.lower() in _STRIP_REQ or k.lower().startswith("x-forwarded"):
continue
fwd[k] = v
fwd["user-agent"] = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)
fwd.setdefault("accept", "text/html,application/xhtml+xml,*/*;q=0.8")
body = await request.body() if request.method not in ("GET", "HEAD") else None
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=30) as client:
resp = await client.request(
method=request.method,
url=url,
headers=fwd,
content=body,
)
except httpx.RequestError as exc:
raise HTTPException(502, f"Proxy error: {exc}")
# Pass through content unchanged — just strip the iframe-blocking headers
out: dict[str, str] = {}
for k, v in resp.headers.items():
kl = k.lower()
if kl in _STRIP_RESP:
continue
if kl in ("content-encoding", "transfer-encoding", "content-length"):
continue
out[k] = v
return Response(
content=resp.content,
status_code=resp.status_code,
media_type=resp.headers.get("content-type"),
headers=out,
)