feat(upstreams): add batch test-all / check-now-all endpoints

- POST /api/upstreams/test-all: batch connection test for all enabled
  upstreams (no snapshot, no webhook); updates last_status, balance
- POST /api/upstreams/check-now-all: full batch sync (snapshot, diff,
  webhook, key sync, priority sync); mirrors single check-now behavior
- Both routes are registered before /{uid} to avoid path capture
- Skips disabled upstreams (status=skipped); single failure does not
  abort subsequent upstreams (serial execution)
- Returns UpstreamBatchActionResponse with per-item detail and summary

Refactor: extract _test_upstream_core(db, u) and _check_now_core(db, u)
- All four routes (single + batch × 2) now share the same core helpers
- Eliminates duplicate logic and future divergence risk

Frontend:
- Add UpstreamBatchActionResponse / Item / Summary TS types
- Add upstreamsApi.testAll() and upstreamsApi.checkNowAll()
- Add '一键测试' and '一键同步' buttons in Upstreams.vue toolbar
  (order: 一键测试 → 一键同步 → 刷新 → 新增上游)
- Buttons disabled when list is empty or another batch op is running
- On completion: refresh list + ElMessageBox with per-item failure detail
This commit is contained in:
liumangmang
2026-06-01 16:46:42 +08:00
parent a949969c4d
commit 871557e4ae
4 changed files with 358 additions and 107 deletions
+242 -106
View File
@@ -21,7 +21,8 @@ from app.schemas.upstream import (
GenerateKeysByGroupsRequest,
GenerateKeysByGroupsResponse,
GeneratedUpstreamKeyResponse,
UpstreamCreate, UpstreamUpdate, UpstreamResponse, SnapshotResponse, TestResult
UpstreamCreate, UpstreamUpdate, UpstreamResponse, SnapshotResponse, TestResult,
UpstreamBatchActionItem, UpstreamBatchActionSummary, UpstreamBatchActionResponse,
)
from app.services.upstream_client import UpstreamClient, UpstreamError, build_snapshot, mask_secret, _extract_key_value
from app.services.snapshot_service import diff_snapshots
@@ -513,6 +514,227 @@ def generate_keys_by_groups(
)
# ─── Shared core helpers ──────────────────────────────────────────────────────
def _test_upstream_core(db: Session, u: Upstream) -> UpstreamBatchActionItem:
"""连接测试核心逻辑,含余额拉取。与单行 test_upstream 行为一致。"""
auth_config = json.loads(u.auth_config_json or "{}")
with UpstreamClient(
base_url=u.base_url,
api_prefix=u.api_prefix,
auth_type=u.auth_type,
auth_config=auth_config,
timeout=float(u.timeout_seconds),
) as client:
client.login()
groups = client.get_available_groups(u.groups_endpoint)
# 余额(与单行 test_upstream 保持一致)
if u.balance_endpoint and u.balance_response_path:
try:
raw_balance = client.get_balance(u.balance_endpoint, u.balance_response_path)
if raw_balance is not None:
u.balance = raw_balance / (u.balance_divisor or 1.0)
u.balance_updated_at = datetime.now(timezone.utc) if raw_balance is not None else None
except Exception as exc:
logger.warning("test-all: upstream %s balance failed: %s", u.name, exc)
u.last_status = "healthy"
u.last_error = None
u.last_checked_at = datetime.now(timezone.utc)
u.consecutive_failures = 0
db.commit()
return UpstreamBatchActionItem(
upstream_id=u.id,
upstream_name=u.name,
status="success",
message=f"连接成功,获取到 {len(groups)} 个分组",
)
def _check_now_core(db: Session, u: Upstream) -> tuple[str, bool]:
"""完整同步核心逻辑:写快照、对比倍率、发 Webhook、同步 Key/优先级。
Returns:
(message, was_changed) — message 供调用方组装返回体。
"""
uid = u.id
auth_config = json.loads(u.auth_config_json or "{}")
with UpstreamClient(
base_url=u.base_url,
api_prefix=u.api_prefix,
auth_type=u.auth_type,
auth_config=auth_config,
timeout=float(u.timeout_seconds),
) as client:
client.login()
groups = client.get_available_groups(u.groups_endpoint)
raw_rates = client.get_group_rates(u.rate_endpoint)
snapshot = build_snapshot(uid, u.base_url, u.api_prefix, groups, raw_rates)
# 余额(可选)
if u.balance_endpoint and u.balance_response_path:
try:
raw_balance = client.get_balance(u.balance_endpoint, u.balance_response_path)
if raw_balance is not None:
u.balance = raw_balance / (u.balance_divisor or 1.0)
u.balance_updated_at = datetime.now(timezone.utc) if raw_balance is not None else None
except Exception as exc:
logger.warning("check-now: upstream %s balance failed: %s", u.name, exc)
# 写快照 & diff
prev_row = (
db.query(UpstreamRateSnapshot)
.filter(UpstreamRateSnapshot.upstream_id == uid)
.order_by(UpstreamRateSnapshot.captured_at.desc())
.first()
)
previous = json.loads(prev_row.snapshot_json) if prev_row else None
changes = diff_snapshots(previous, snapshot)
new_row = UpstreamRateSnapshot(
upstream_id=uid,
snapshot_json=json.dumps(snapshot, ensure_ascii=False),
captured_at=datetime.now(timezone.utc),
)
db.add(new_row)
was_unhealthy = u.last_status == "unhealthy"
u.last_status = "healthy"
u.last_checked_at = datetime.now(timezone.utc)
u.last_error = None
u.consecutive_failures = 0
db.commit()
if was_unhealthy:
webhook_service.send_status_event(db, uid, u.name, u.base_url, "upstream_recovered")
# 先同步 Key 状态(标记 orphaned),再执行优先级同步
from app.services.scheduler import _sync_upstream_keys as _synck
_synck(uid, snapshot, new_row.captured_at)
if changes:
webhook_service.send_rate_changed(db, uid, u.name, u.base_url, changes)
website_sync.sync_affected_bindings(db, uid, changes)
website_sync.sync_account_priorities_for_upstream(db, uid)
msg = f"检测成功,{len(groups)} 个分组"
if changes:
msg += f",发现 {len(changes)} 处倍率变化"
elif previous is None:
msg += ",初始化快照完成"
else:
msg += ",无变化"
return msg, bool(changes)
@router.post("/test-all", response_model=UpstreamBatchActionResponse)
def test_all_upstreams(db: Session = Depends(get_db), _=Depends(get_current_user)):
"""批量测试所有启用上游的连接(不写快照,不触发 Webhook)。"""
upstreams = db.query(Upstream).order_by(Upstream.id).all()
items: list[UpstreamBatchActionItem] = []
success_count = failed_count = skipped_count = 0
for u in upstreams:
if not u.enabled:
items.append(UpstreamBatchActionItem(
upstream_id=u.id,
upstream_name=u.name,
status="skipped",
message="上游已停用,跳过",
))
skipped_count += 1
continue
try:
item = _test_upstream_core(db, u)
items.append(item)
success_count += 1
except Exception as exc:
u.last_status = "unhealthy"
u.last_error = str(exc)
u.last_checked_at = datetime.now(timezone.utc)
u.consecutive_failures = (u.consecutive_failures or 0) + 1
db.commit()
items.append(UpstreamBatchActionItem(
upstream_id=u.id,
upstream_name=u.name,
status="failed",
message="连接失败",
detail=str(exc),
))
failed_count += 1
total = len(upstreams)
overall_ok = failed_count == 0
msg = f"完成:{success_count} 成功 / {failed_count} 失败 / {skipped_count} 跳过"
return UpstreamBatchActionResponse(
success=overall_ok,
message=msg,
summary=UpstreamBatchActionSummary(
total=total,
success=success_count,
failed=failed_count,
skipped=skipped_count,
),
items=items,
)
@router.post("/check-now-all", response_model=UpstreamBatchActionResponse)
def check_now_all_upstreams(db: Session = Depends(get_db), _=Depends(get_current_user)):
"""批量全量同步所有启用上游:拉取倍率 → 写快照 → 对比变化 → Webhook → 同步 Key。"""
upstreams = db.query(Upstream).order_by(Upstream.id).all()
items: list[UpstreamBatchActionItem] = []
success_count = failed_count = skipped_count = 0
for u in upstreams:
if not u.enabled:
items.append(UpstreamBatchActionItem(
upstream_id=u.id,
upstream_name=u.name,
status="skipped",
message="上游已停用,跳过",
))
skipped_count += 1
continue
try:
detail_msg, _ = _check_now_core(db, u)
items.append(UpstreamBatchActionItem(
upstream_id=u.id,
upstream_name=u.name,
status="success",
message=detail_msg,
))
success_count += 1
except Exception as exc:
u.consecutive_failures = (u.consecutive_failures or 0) + 1
u.last_error = str(exc)
u.last_checked_at = datetime.now(timezone.utc)
db.commit()
items.append(UpstreamBatchActionItem(
upstream_id=u.id,
upstream_name=u.name,
status="failed",
message="同步失败",
detail=str(exc),
))
failed_count += 1
total = len(upstreams)
overall_ok = failed_count == 0
msg = f"完成:{success_count} 成功 / {failed_count} 失败 / {skipped_count} 跳过"
return UpstreamBatchActionResponse(
success=overall_ok,
message=msg,
summary=UpstreamBatchActionSummary(
total=total,
success=success_count,
failed=failed_count,
skipped=skipped_count,
),
items=items,
)
# ─── CRUD ─────────────────────────────────────────────────────────────────────
@router.post("", response_model=UpstreamResponse, status_code=201)
def create_upstream(
body: UpstreamCreate,
@@ -597,40 +819,16 @@ def test_upstream(uid: int, db: Session = Depends(get_db), _=Depends(get_current
u = db.query(Upstream).filter(Upstream.id == uid).first()
if not u:
raise HTTPException(404, "upstream not found")
auth_config = json.loads(u.auth_config_json or "{}")
with UpstreamClient(
base_url=u.base_url,
api_prefix=u.api_prefix,
auth_type=u.auth_type,
auth_config=auth_config,
timeout=float(u.timeout_seconds),
) as client:
try:
client.login()
groups = client.get_available_groups(u.groups_endpoint)
# Also try balance if configured
if u.balance_endpoint and u.balance_response_path:
try:
raw_balance = client.get_balance(u.balance_endpoint, u.balance_response_path)
if raw_balance is not None:
divisor = u.balance_divisor or 1.0
u.balance = raw_balance / divisor
u.balance_updated_at = datetime.now(timezone.utc) if raw_balance is not None else None
except Exception as exc:
logger.warning("upstream %s balance fetch failed during test: %s", u.name, exc)
u.last_status = "healthy"
u.last_error = None
u.last_checked_at = datetime.now(timezone.utc)
u.consecutive_failures = 0
db.commit()
return TestResult(success=True, message=f"连接成功,获取到 {len(groups)} 个分组")
except Exception as exc:
u.last_status = "unhealthy"
u.last_error = str(exc)
u.last_checked_at = datetime.now(timezone.utc)
u.consecutive_failures = (u.consecutive_failures or 0) + 1
db.commit()
return TestResult(success=False, message="连接失败", detail=str(exc))
try:
item = _test_upstream_core(db, u)
return TestResult(success=True, message=item.message)
except Exception as exc:
u.last_status = "unhealthy"
u.last_error = str(exc)
u.last_checked_at = datetime.now(timezone.utc)
u.consecutive_failures = (u.consecutive_failures or 0) + 1
db.commit()
return TestResult(success=False, message="连接失败", detail=str(exc))
@router.post("/{uid}/check-now", response_model=TestResult)
@@ -638,77 +836,15 @@ def check_now(uid: int, db: Session = Depends(get_db), _=Depends(get_current_use
u = db.query(Upstream).filter(Upstream.id == uid).first()
if not u:
raise HTTPException(404, "upstream not found")
auth_config = json.loads(u.auth_config_json or "{}")
with UpstreamClient(
base_url=u.base_url,
api_prefix=u.api_prefix,
auth_type=u.auth_type,
auth_config=auth_config,
timeout=float(u.timeout_seconds),
) as client:
try:
client.login()
groups = client.get_available_groups(u.groups_endpoint)
raw_rates = client.get_group_rates(u.rate_endpoint)
snapshot = build_snapshot(u.id, u.base_url, u.api_prefix, groups, raw_rates)
# Also try balance if configured
if u.balance_endpoint and u.balance_response_path:
try:
raw_balance = client.get_balance(u.balance_endpoint, u.balance_response_path)
if raw_balance is not None:
divisor = u.balance_divisor or 1.0
u.balance = raw_balance / divisor
u.balance_updated_at = datetime.now(timezone.utc) if raw_balance is not None else None
except Exception as exc:
logger.warning("upstream %s balance fetch failed during check-now: %s", u.name, exc)
except Exception as exc:
u.consecutive_failures = (u.consecutive_failures or 0) + 1
u.last_error = str(exc)
u.last_checked_at = datetime.now(timezone.utc)
db.commit()
return TestResult(success=False, message="检测失败", detail=str(exc))
prev_row = (
db.query(UpstreamRateSnapshot)
.filter(UpstreamRateSnapshot.upstream_id == uid)
.order_by(UpstreamRateSnapshot.captured_at.desc())
.first()
)
previous = json.loads(prev_row.snapshot_json) if prev_row else None
changes = diff_snapshots(previous, snapshot)
new_row = UpstreamRateSnapshot(
upstream_id=uid,
snapshot_json=json.dumps(snapshot, ensure_ascii=False),
captured_at=datetime.now(timezone.utc),
)
db.add(new_row)
was_unhealthy = u.last_status == "unhealthy"
u.last_status = "healthy"
u.last_checked_at = datetime.now(timezone.utc)
u.last_error = None
u.consecutive_failures = 0
db.commit()
if was_unhealthy:
webhook_service.send_status_event(db, u.id, u.name, u.base_url, "upstream_recovered")
# 先同步 Key 状态(标记 orphaned),再执行优先级同步(避免未标记的 key 参与计算)
from app.services.scheduler import _sync_upstream_keys as _synck
_synck(uid, snapshot, new_row.captured_at)
if changes:
webhook_service.send_rate_changed(db, u.id, u.name, u.base_url, changes)
website_sync.sync_affected_bindings(db, u.id, changes)
website_sync.sync_account_priorities_for_upstream(db, u.id)
msg = f"检测成功,{len(groups)} 个分组"
if changes:
msg += f",发现 {len(changes)} 处倍率变化"
elif previous is None:
msg += ",初始化快照完成"
else:
msg += ",无变化"
return TestResult(success=True, message=msg)
try:
msg, _ = _check_now_core(db, u)
return TestResult(success=True, message=msg)
except Exception as exc:
u.consecutive_failures = (u.consecutive_failures or 0) + 1
u.last_error = str(exc)
u.last_checked_at = datetime.now(timezone.utc)
db.commit()
return TestResult(success=False, message="检测失败", detail=str(exc))
@router.get("/{uid}/snapshots/latest", response_model=SnapshotResponse)