From 871557e4aeb5929588ec1b359c82895278b71991 Mon Sep 17 00:00:00 2001 From: liumangmang Date: Mon, 1 Jun 2026 16:46:42 +0800 Subject: [PATCH] feat(upstreams): add batch test-all / check-now-all endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - POST /api/upstreams/test-all: batch connection test for all enabled upstreams (no snapshot, no webhook); updates last_status, balance - POST /api/upstreams/check-now-all: full batch sync (snapshot, diff, webhook, key sync, priority sync); mirrors single check-now behavior - Both routes are registered before /{uid} to avoid path capture - Skips disabled upstreams (status=skipped); single failure does not abort subsequent upstreams (serial execution) - Returns UpstreamBatchActionResponse with per-item detail and summary Refactor: extract _test_upstream_core(db, u) and _check_now_core(db, u) - All four routes (single + batch × 2) now share the same core helpers - Eliminates duplicate logic and future divergence risk Frontend: - Add UpstreamBatchActionResponse / Item / Summary TS types - Add upstreamsApi.testAll() and upstreamsApi.checkNowAll() - Add '一键测试' and '一键同步' buttons in Upstreams.vue toolbar (order: 一键测试 → 一键同步 → 刷新 → 新增上游) - Buttons disabled when list is empty or another batch op is running - On completion: refresh list + ElMessageBox with per-item failure detail --- backend/app/routers/upstreams.py | 348 +++++++++++++++++++++---------- backend/app/schemas/upstream.py | 22 ++ frontend/src/api/index.ts | 24 +++ frontend/src/views/Upstreams.vue | 71 ++++++- 4 files changed, 358 insertions(+), 107 deletions(-) diff --git a/backend/app/routers/upstreams.py b/backend/app/routers/upstreams.py index abff652..39eafa3 100644 --- a/backend/app/routers/upstreams.py +++ b/backend/app/routers/upstreams.py @@ -21,7 +21,8 @@ from app.schemas.upstream import ( GenerateKeysByGroupsRequest, GenerateKeysByGroupsResponse, GeneratedUpstreamKeyResponse, - UpstreamCreate, UpstreamUpdate, UpstreamResponse, SnapshotResponse, TestResult + UpstreamCreate, UpstreamUpdate, UpstreamResponse, SnapshotResponse, TestResult, + UpstreamBatchActionItem, UpstreamBatchActionSummary, UpstreamBatchActionResponse, ) from app.services.upstream_client import UpstreamClient, UpstreamError, build_snapshot, mask_secret, _extract_key_value from app.services.snapshot_service import diff_snapshots @@ -513,6 +514,227 @@ def generate_keys_by_groups( ) +# ─── Shared core helpers ────────────────────────────────────────────────────── + +def _test_upstream_core(db: Session, u: Upstream) -> UpstreamBatchActionItem: + """连接测试核心逻辑,含余额拉取。与单行 test_upstream 行为一致。""" + auth_config = json.loads(u.auth_config_json or "{}") + with UpstreamClient( + base_url=u.base_url, + api_prefix=u.api_prefix, + auth_type=u.auth_type, + auth_config=auth_config, + timeout=float(u.timeout_seconds), + ) as client: + client.login() + groups = client.get_available_groups(u.groups_endpoint) + # 余额(与单行 test_upstream 保持一致) + if u.balance_endpoint and u.balance_response_path: + try: + raw_balance = client.get_balance(u.balance_endpoint, u.balance_response_path) + if raw_balance is not None: + u.balance = raw_balance / (u.balance_divisor or 1.0) + u.balance_updated_at = datetime.now(timezone.utc) if raw_balance is not None else None + except Exception as exc: + logger.warning("test-all: upstream %s balance failed: %s", u.name, exc) + u.last_status = "healthy" + u.last_error = None + u.last_checked_at = datetime.now(timezone.utc) + u.consecutive_failures = 0 + db.commit() + return UpstreamBatchActionItem( + upstream_id=u.id, + upstream_name=u.name, + status="success", + message=f"连接成功,获取到 {len(groups)} 个分组", + ) + + +def _check_now_core(db: Session, u: Upstream) -> tuple[str, bool]: + """完整同步核心逻辑:写快照、对比倍率、发 Webhook、同步 Key/优先级。 + + Returns: + (message, was_changed) — message 供调用方组装返回体。 + """ + uid = u.id + auth_config = json.loads(u.auth_config_json or "{}") + with UpstreamClient( + base_url=u.base_url, + api_prefix=u.api_prefix, + auth_type=u.auth_type, + auth_config=auth_config, + timeout=float(u.timeout_seconds), + ) as client: + client.login() + groups = client.get_available_groups(u.groups_endpoint) + raw_rates = client.get_group_rates(u.rate_endpoint) + snapshot = build_snapshot(uid, u.base_url, u.api_prefix, groups, raw_rates) + # 余额(可选) + if u.balance_endpoint and u.balance_response_path: + try: + raw_balance = client.get_balance(u.balance_endpoint, u.balance_response_path) + if raw_balance is not None: + u.balance = raw_balance / (u.balance_divisor or 1.0) + u.balance_updated_at = datetime.now(timezone.utc) if raw_balance is not None else None + except Exception as exc: + logger.warning("check-now: upstream %s balance failed: %s", u.name, exc) + + # 写快照 & diff + prev_row = ( + db.query(UpstreamRateSnapshot) + .filter(UpstreamRateSnapshot.upstream_id == uid) + .order_by(UpstreamRateSnapshot.captured_at.desc()) + .first() + ) + previous = json.loads(prev_row.snapshot_json) if prev_row else None + changes = diff_snapshots(previous, snapshot) + + new_row = UpstreamRateSnapshot( + upstream_id=uid, + snapshot_json=json.dumps(snapshot, ensure_ascii=False), + captured_at=datetime.now(timezone.utc), + ) + db.add(new_row) + was_unhealthy = u.last_status == "unhealthy" + u.last_status = "healthy" + u.last_checked_at = datetime.now(timezone.utc) + u.last_error = None + u.consecutive_failures = 0 + db.commit() + + if was_unhealthy: + webhook_service.send_status_event(db, uid, u.name, u.base_url, "upstream_recovered") + + # 先同步 Key 状态(标记 orphaned),再执行优先级同步 + from app.services.scheduler import _sync_upstream_keys as _synck + _synck(uid, snapshot, new_row.captured_at) + + if changes: + webhook_service.send_rate_changed(db, uid, u.name, u.base_url, changes) + website_sync.sync_affected_bindings(db, uid, changes) + website_sync.sync_account_priorities_for_upstream(db, uid) + + msg = f"检测成功,{len(groups)} 个分组" + if changes: + msg += f",发现 {len(changes)} 处倍率变化" + elif previous is None: + msg += ",初始化快照完成" + else: + msg += ",无变化" + return msg, bool(changes) + + +@router.post("/test-all", response_model=UpstreamBatchActionResponse) +def test_all_upstreams(db: Session = Depends(get_db), _=Depends(get_current_user)): + """批量测试所有启用上游的连接(不写快照,不触发 Webhook)。""" + upstreams = db.query(Upstream).order_by(Upstream.id).all() + items: list[UpstreamBatchActionItem] = [] + success_count = failed_count = skipped_count = 0 + + for u in upstreams: + if not u.enabled: + items.append(UpstreamBatchActionItem( + upstream_id=u.id, + upstream_name=u.name, + status="skipped", + message="上游已停用,跳过", + )) + skipped_count += 1 + continue + try: + item = _test_upstream_core(db, u) + items.append(item) + success_count += 1 + except Exception as exc: + u.last_status = "unhealthy" + u.last_error = str(exc) + u.last_checked_at = datetime.now(timezone.utc) + u.consecutive_failures = (u.consecutive_failures or 0) + 1 + db.commit() + items.append(UpstreamBatchActionItem( + upstream_id=u.id, + upstream_name=u.name, + status="failed", + message="连接失败", + detail=str(exc), + )) + failed_count += 1 + + total = len(upstreams) + overall_ok = failed_count == 0 + msg = f"完成:{success_count} 成功 / {failed_count} 失败 / {skipped_count} 跳过" + return UpstreamBatchActionResponse( + success=overall_ok, + message=msg, + summary=UpstreamBatchActionSummary( + total=total, + success=success_count, + failed=failed_count, + skipped=skipped_count, + ), + items=items, + ) + + +@router.post("/check-now-all", response_model=UpstreamBatchActionResponse) +def check_now_all_upstreams(db: Session = Depends(get_db), _=Depends(get_current_user)): + """批量全量同步所有启用上游:拉取倍率 → 写快照 → 对比变化 → Webhook → 同步 Key。""" + upstreams = db.query(Upstream).order_by(Upstream.id).all() + items: list[UpstreamBatchActionItem] = [] + success_count = failed_count = skipped_count = 0 + + for u in upstreams: + if not u.enabled: + items.append(UpstreamBatchActionItem( + upstream_id=u.id, + upstream_name=u.name, + status="skipped", + message="上游已停用,跳过", + )) + skipped_count += 1 + continue + try: + detail_msg, _ = _check_now_core(db, u) + items.append(UpstreamBatchActionItem( + upstream_id=u.id, + upstream_name=u.name, + status="success", + message=detail_msg, + )) + success_count += 1 + + except Exception as exc: + u.consecutive_failures = (u.consecutive_failures or 0) + 1 + u.last_error = str(exc) + u.last_checked_at = datetime.now(timezone.utc) + db.commit() + items.append(UpstreamBatchActionItem( + upstream_id=u.id, + upstream_name=u.name, + status="failed", + message="同步失败", + detail=str(exc), + )) + failed_count += 1 + + total = len(upstreams) + overall_ok = failed_count == 0 + msg = f"完成:{success_count} 成功 / {failed_count} 失败 / {skipped_count} 跳过" + return UpstreamBatchActionResponse( + success=overall_ok, + message=msg, + summary=UpstreamBatchActionSummary( + total=total, + success=success_count, + failed=failed_count, + skipped=skipped_count, + ), + items=items, + ) + + +# ─── CRUD ───────────────────────────────────────────────────────────────────── + @router.post("", response_model=UpstreamResponse, status_code=201) def create_upstream( body: UpstreamCreate, @@ -597,40 +819,16 @@ def test_upstream(uid: int, db: Session = Depends(get_db), _=Depends(get_current u = db.query(Upstream).filter(Upstream.id == uid).first() if not u: raise HTTPException(404, "upstream not found") - auth_config = json.loads(u.auth_config_json or "{}") - with UpstreamClient( - base_url=u.base_url, - api_prefix=u.api_prefix, - auth_type=u.auth_type, - auth_config=auth_config, - timeout=float(u.timeout_seconds), - ) as client: - try: - client.login() - groups = client.get_available_groups(u.groups_endpoint) - # Also try balance if configured - if u.balance_endpoint and u.balance_response_path: - try: - raw_balance = client.get_balance(u.balance_endpoint, u.balance_response_path) - if raw_balance is not None: - divisor = u.balance_divisor or 1.0 - u.balance = raw_balance / divisor - u.balance_updated_at = datetime.now(timezone.utc) if raw_balance is not None else None - except Exception as exc: - logger.warning("upstream %s balance fetch failed during test: %s", u.name, exc) - u.last_status = "healthy" - u.last_error = None - u.last_checked_at = datetime.now(timezone.utc) - u.consecutive_failures = 0 - db.commit() - return TestResult(success=True, message=f"连接成功,获取到 {len(groups)} 个分组") - except Exception as exc: - u.last_status = "unhealthy" - u.last_error = str(exc) - u.last_checked_at = datetime.now(timezone.utc) - u.consecutive_failures = (u.consecutive_failures or 0) + 1 - db.commit() - return TestResult(success=False, message="连接失败", detail=str(exc)) + try: + item = _test_upstream_core(db, u) + return TestResult(success=True, message=item.message) + except Exception as exc: + u.last_status = "unhealthy" + u.last_error = str(exc) + u.last_checked_at = datetime.now(timezone.utc) + u.consecutive_failures = (u.consecutive_failures or 0) + 1 + db.commit() + return TestResult(success=False, message="连接失败", detail=str(exc)) @router.post("/{uid}/check-now", response_model=TestResult) @@ -638,77 +836,15 @@ def check_now(uid: int, db: Session = Depends(get_db), _=Depends(get_current_use u = db.query(Upstream).filter(Upstream.id == uid).first() if not u: raise HTTPException(404, "upstream not found") - auth_config = json.loads(u.auth_config_json or "{}") - with UpstreamClient( - base_url=u.base_url, - api_prefix=u.api_prefix, - auth_type=u.auth_type, - auth_config=auth_config, - timeout=float(u.timeout_seconds), - ) as client: - try: - client.login() - groups = client.get_available_groups(u.groups_endpoint) - raw_rates = client.get_group_rates(u.rate_endpoint) - snapshot = build_snapshot(u.id, u.base_url, u.api_prefix, groups, raw_rates) - # Also try balance if configured - if u.balance_endpoint and u.balance_response_path: - try: - raw_balance = client.get_balance(u.balance_endpoint, u.balance_response_path) - if raw_balance is not None: - divisor = u.balance_divisor or 1.0 - u.balance = raw_balance / divisor - u.balance_updated_at = datetime.now(timezone.utc) if raw_balance is not None else None - except Exception as exc: - logger.warning("upstream %s balance fetch failed during check-now: %s", u.name, exc) - except Exception as exc: - u.consecutive_failures = (u.consecutive_failures or 0) + 1 - u.last_error = str(exc) - u.last_checked_at = datetime.now(timezone.utc) - db.commit() - return TestResult(success=False, message="检测失败", detail=str(exc)) - - prev_row = ( - db.query(UpstreamRateSnapshot) - .filter(UpstreamRateSnapshot.upstream_id == uid) - .order_by(UpstreamRateSnapshot.captured_at.desc()) - .first() - ) - previous = json.loads(prev_row.snapshot_json) if prev_row else None - changes = diff_snapshots(previous, snapshot) - - new_row = UpstreamRateSnapshot( - upstream_id=uid, - snapshot_json=json.dumps(snapshot, ensure_ascii=False), - captured_at=datetime.now(timezone.utc), - ) - db.add(new_row) - was_unhealthy = u.last_status == "unhealthy" - u.last_status = "healthy" - u.last_checked_at = datetime.now(timezone.utc) - u.last_error = None - u.consecutive_failures = 0 - db.commit() - - if was_unhealthy: - webhook_service.send_status_event(db, u.id, u.name, u.base_url, "upstream_recovered") - # 先同步 Key 状态(标记 orphaned),再执行优先级同步(避免未标记的 key 参与计算) - from app.services.scheduler import _sync_upstream_keys as _synck - _synck(uid, snapshot, new_row.captured_at) - - if changes: - webhook_service.send_rate_changed(db, u.id, u.name, u.base_url, changes) - website_sync.sync_affected_bindings(db, u.id, changes) - website_sync.sync_account_priorities_for_upstream(db, u.id) - - msg = f"检测成功,{len(groups)} 个分组" - if changes: - msg += f",发现 {len(changes)} 处倍率变化" - elif previous is None: - msg += ",初始化快照完成" - else: - msg += ",无变化" - return TestResult(success=True, message=msg) + try: + msg, _ = _check_now_core(db, u) + return TestResult(success=True, message=msg) + except Exception as exc: + u.consecutive_failures = (u.consecutive_failures or 0) + 1 + u.last_error = str(exc) + u.last_checked_at = datetime.now(timezone.utc) + db.commit() + return TestResult(success=False, message="检测失败", detail=str(exc)) @router.get("/{uid}/snapshots/latest", response_model=SnapshotResponse) diff --git a/backend/app/schemas/upstream.py b/backend/app/schemas/upstream.py index b25978c..8d09efb 100644 --- a/backend/app/schemas/upstream.py +++ b/backend/app/schemas/upstream.py @@ -128,3 +128,25 @@ class GenerateKeysByGroupsResponse(BaseModel): success: bool message: str items: list[GeneratedUpstreamKeyResponse] + + +class UpstreamBatchActionItem(BaseModel): + upstream_id: int + upstream_name: str + status: str # success | failed | skipped + message: str + detail: Optional[str] = None + + +class UpstreamBatchActionSummary(BaseModel): + total: int + success: int + failed: int + skipped: int + + +class UpstreamBatchActionResponse(BaseModel): + success: bool + message: str + summary: UpstreamBatchActionSummary + items: list[UpstreamBatchActionItem] diff --git a/frontend/src/api/index.ts b/frontend/src/api/index.ts index 8217a85..2e46ae4 100644 --- a/frontend/src/api/index.ts +++ b/frontend/src/api/index.ts @@ -139,6 +139,28 @@ export interface GenerateKeysByGroupsForm { endpoint: string } +export interface UpstreamBatchActionItem { + upstream_id: number + upstream_name: string + status: 'success' | 'failed' | 'skipped' + message: string + detail?: string | null +} + +export interface UpstreamBatchActionSummary { + total: number + success: number + failed: number + skipped: number +} + +export interface UpstreamBatchActionResponse { + success: boolean + message: string + summary: UpstreamBatchActionSummary + items: UpstreamBatchActionItem[] +} + export const upstreamsApi = { list: () => api.get('/api/upstreams'), create: (data: UpstreamForm) => api.post('/api/upstreams', data), @@ -152,6 +174,8 @@ export const upstreamsApi = { latestSnapshot: (id: number) => api.get(`/api/upstreams/${id}/snapshots/latest`), listSnapshots: (id: number, limit = 20, offset = 0) => api.get(`/api/upstreams/${id}/snapshots`, { params: { limit, offset } }), + testAll: () => api.post('/api/upstreams/test-all'), + checkNowAll: () => api.post('/api/upstreams/check-now-all'), } // ——— Websites ——— diff --git a/frontend/src/views/Upstreams.vue b/frontend/src/views/Upstreams.vue index 6ac0127..6869f39 100644 --- a/frontend/src/views/Upstreams.vue +++ b/frontend/src/views/Upstreams.vue @@ -7,6 +7,25 @@

检测与变更控制台

+ 一键测试 + 一键同步 刷新 新增上游
@@ -397,11 +416,13 @@ import { ElMessage, ElMessageBox } from 'element-plus' import type { FormInstance } from 'element-plus' import dayjs from 'dayjs' import { Refresh, Plus, Edit, List, Delete, Warning, Clock, ArrowRight, Pointer, Key } from '@element-plus/icons-vue' -import { upstreamsApi, type GeneratedUpstreamKey, type UpstreamData } from '@/api' +import { upstreamsApi, type GeneratedUpstreamKey, type UpstreamData, type UpstreamBatchActionResponse } from '@/api' import AuthCaptureDialog from '@/components/AuthCaptureDialog.vue' const list = ref<(UpstreamData & { _testing?: boolean; _checking?: boolean })[]>([]) const tableLoading = ref(false) +const testingAll = ref(false) +const checkingAll = ref(false) const drawerVisible = ref(false) const saving = ref(false) @@ -812,6 +833,54 @@ async function confirmDelete(row: UpstreamData) { } } +function _showBatchResult(res: UpstreamBatchActionResponse, actionLabel: string) { + const { summary, items, message } = res + const failed = items.filter(i => i.status === 'failed') + + if (failed.length > 0) { + const failedLines = failed + .map(i => `• ${i.upstream_name}:${i.detail || i.message}`) + .join('\n') + ElMessageBox.alert( + `${message}\n\n失败明细:\n${failedLines}`, + `${actionLabel}完成`, + { + type: 'warning', + confirmButtonText: '知道了', + customStyle: { whiteSpace: 'pre-wrap', maxHeight: '60vh', overflowY: 'auto' }, + } + ) + } else { + ElMessage[res.success ? 'success' : 'warning'](message) + } +} + +async function testAll() { + testingAll.value = true + try { + const res = await upstreamsApi.testAll() + await loadList() + _showBatchResult(res.data, '一键测试') + } catch (e: any) { + ElMessage.error(e.response?.data?.detail || '批量测试失败') + } finally { + testingAll.value = false + } +} + +async function checkNowAll() { + checkingAll.value = true + try { + const res = await upstreamsApi.checkNowAll() + await loadList() + _showBatchResult(res.data, '一键同步') + } catch (e: any) { + ElMessage.error(e.response?.data?.detail || '批量同步失败') + } finally { + checkingAll.value = false + } +} + onMounted(loadList)