From bea4344bb35638b7807c86adbbdee61488eba384 Mon Sep 17 00:00:00 2001 From: liumangmang Date: Mon, 1 Jun 2026 11:29:37 +0800 Subject: [PATCH] fix: reconcile upstream keys on list/generate/import to prevent stale key imports - Extract reconcile_upstream_keys() to website_sync.py (shared scheduler + on-demand) - Add reconcile_upstream_keys_full() for on-demand reconciliation at three entry points: list_generated_keys, generate_keys_by_groups, import_upstream_keys_as_accounts - Safe on failure: active_group_ids=None / remote_key_ids=None skip cleanup - Support custom managed_prefix via _fetch_remote_managed_key_ids() helper - Exclude orphaned keys from frontend importable list - Remove hardcoded search='SmartUp' from scheduler path --- backend/app/routers/upstreams.py | 11 +++ backend/app/routers/websites.py | 28 +++++- backend/app/services/scheduler.py | 54 ++---------- backend/app/services/website_sync.py | 127 +++++++++++++++++++++++++++ frontend/src/views/Websites.vue | 1 + 5 files changed, 175 insertions(+), 46 deletions(-) diff --git a/backend/app/routers/upstreams.py b/backend/app/routers/upstreams.py index df261c4..c589f05 100644 --- a/backend/app/routers/upstreams.py +++ b/backend/app/routers/upstreams.py @@ -127,6 +127,11 @@ def list_upstreams(db: Session = Depends(get_db), _=Depends(get_current_user)): def list_generated_keys(uid: int, db: Session = Depends(get_db), _=Depends(get_current_user)): if not db.query(Upstream.id).filter(Upstream.id == uid).first(): raise HTTPException(404, "upstream not found") + # 返回前强制对账,清理远端已删除的 Key + try: + website_sync.reconcile_upstream_keys_full(db, uid) + except Exception as exc: + logger.warning("list_generated_keys reconcile failed for %s: %s", uid, exc) rows = ( db.query(UpstreamGeneratedKey) .filter(UpstreamGeneratedKey.upstream_id == uid) @@ -293,6 +298,12 @@ def generate_keys_by_groups( if u.api_prefix.strip("/") != "api/v1": raise HTTPException(400, "首版仅支持 Sub2API 上游(API Prefix 应为 /api/v1)") + # 生成前先对账,清理远端已删除的旧 Key + try: + website_sync.reconcile_upstream_keys_full(db, uid) + except Exception as exc: + logger.warning("generate_keys_by_groups reconcile failed for %s: %s", uid, exc) + auth_config = json.loads(u.auth_config_json or "{}") selected = set(body.group_ids) prefix = body.name_prefix diff --git a/backend/app/routers/websites.py b/backend/app/routers/websites.py index be199f2..f186fe6 100644 --- a/backend/app/routers/websites.py +++ b/backend/app/routers/websites.py @@ -34,7 +34,7 @@ from app.schemas.website import ( ) from app.services.website_client import Sub2ApiWebsiteClient -from app.services.website_sync import binding_sources, sync_binding, build_rate_priority_map +from app.services.website_sync import binding_sources, sync_binding, build_rate_priority_map, reconcile_upstream_keys_full from app.utils.auth import get_current_user router = APIRouter(tags=["websites"]) @@ -483,6 +483,20 @@ def import_upstream_keys_as_accounts( .order_by(UpstreamGeneratedKey.id) .all() ) + # 导入前尝试对账(失败不阻塞,仅打日志—避免远端不可达时误删本地 Key) + upstream_ids = {row.upstream_id for row in rows} + for uid in upstream_ids: + try: + reconcile_upstream_keys_full(db, uid) + except Exception as exc: + logger.warning("import reconcile failed for upstream %s: %s", uid, exc) + # 重新查询(对账成功时已清理失效 Key) + rows = ( + db.query(UpstreamGeneratedKey) + .filter(UpstreamGeneratedKey.id.in_(body.upstream_key_ids)) + .order_by(UpstreamGeneratedKey.id) + .all() + ) found_ids = {row.id for row in rows} missing_ids = [kid for kid in body.upstream_key_ids if kid not in found_ids] items: list[ImportAccountItem] = [ @@ -518,6 +532,18 @@ def import_upstream_keys_as_accounts( with _client(website) as c: for row in rows: + # 跳过远端已不存在或导入失败的 Key(对账后标记为 orphaned / import_failed) + if row.status in ("orphaned", "failed", "import_failed"): + items.append(ImportAccountItem( + upstream_key_id=row.id, + source_group_id=row.group_id, + source_group_name=row.group_name, + target_group_id=body.target_group_map.get(row.group_id), + platform=body.default_platform, + status="failed", + message="远端 Key 已不存在,请重新生成", + )) + continue # 先确定平台(失败项也需要记录) if body.platform_mode == "auto": platform = _detect_platform( diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index c5c0406..2c1ef82 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -11,9 +11,8 @@ from sqlalchemy.orm import Session from app.database import SessionLocal from app.models.upstream import Upstream -from app.models.upstream_key import UpstreamGeneratedKey from app.models.snapshot import UpstreamRateSnapshot -from app.services.upstream_client import UpstreamClient, UpstreamError, build_snapshot +from app.services.upstream_client import UpstreamClient, build_snapshot from app.services.snapshot_service import diff_snapshots, prune_snapshots from app.services import webhook_service from app.services import website_sync @@ -217,26 +216,19 @@ def _notify_balance_low( def _sync_upstream_keys(upstream_id: int, snapshot: dict[str, Any], captured_at: datetime) -> None: - """上游检测成功后同步 SmartUp Key 状态(远端删除/分组删除)。""" + """上游检测成功后同步 SmartUp Key 状态(远端删除/分组删除)。 + + 委托给 website_sync.reconcile_upstream_keys 实现核心逻辑。 + """ db = SessionLocal() try: active_group_ids = set(snapshot.get("groups", {}).keys()) - key_rows = ( - db.query(UpstreamGeneratedKey) - .filter( - UpstreamGeneratedKey.upstream_id == upstream_id, - UpstreamGeneratedKey.key_name.like("SmartUp-%"), - ) - .all() - ) - auth_config = json.loads( - db.query(Upstream).filter(Upstream.id == upstream_id).first().auth_config_json or "{}" - ) # 用 UpstreamClient 查询远端活跃 Key ID 集合 - remote_key_ids: set[str] | None = None # None=查询失败,set()=查询成功但为空 + remote_key_ids: set[str] | None = None try: upstream = db.query(Upstream).filter(Upstream.id == upstream_id).first() if upstream: + auth_config = json.loads(upstream.auth_config_json or "{}") with UpstreamClient( base_url=upstream.base_url, api_prefix=upstream.api_prefix, @@ -245,39 +237,11 @@ def _sync_upstream_keys(upstream_id: int, snapshot: dict[str, Any], captured_at: timeout=float(upstream.timeout_seconds), ) as client: client.login() - remote_keys = client.list_api_keys(search="SmartUp", status="active") - remote_key_ids = { - str(k["id"]) for k in remote_keys if k.get("id") - } + remote_key_ids = website_sync._fetch_remote_managed_key_ids(db, client, upstream_id) except Exception as exc: logger.warning("sync upstream keys list failed for %s: %s", upstream_id, exc) - for row in key_rows: - # 1. 分组已不在当前快照中 → 删除本地记录 - if row.group_id not in active_group_ids: - if row.imported_website_id and row.imported_account_id: - row.status = "orphaned" - row.error = "来源分组已不存在" - row.updated_at = captured_at - logger.info("marked key %s orphaned (group %s no longer in snapshot)", row.id, row.group_id) - else: - db.delete(row) - logger.info("removed key %s (group %s no longer in snapshot)", row.id, row.group_id) - continue - # 2. 远端查询成功但 key_id 不在列表中 → 删除本地记录 - if row.key_id and remote_key_ids is not None and row.key_id not in remote_key_ids: - if row.imported_website_id and row.imported_account_id: - row.status = "orphaned" - row.error = "远端 Key 已不存在" - row.updated_at = captured_at - logger.info("marked key %s orphaned (key_id %s gone from remote)", row.id, row.key_id) - else: - db.delete(row) - logger.info("removed key %s (key_id %s gone from remote)", row.id, row.key_id) - continue - # 3. 更新同步时间戳(仅当查询成功且 Key 仍在远端时) - if remote_key_ids is not None and row.key_id in remote_key_ids: - row.updated_at = captured_at + website_sync.reconcile_upstream_keys(db, upstream_id, active_group_ids, remote_key_ids, captured_at) db.commit() except Exception: logger.exception("key sync failed for upstream %s", upstream_id) diff --git a/backend/app/services/website_sync.py b/backend/app/services/website_sync.py index 9edd33f..7b76094 100644 --- a/backend/app/services/website_sync.py +++ b/backend/app/services/website_sync.py @@ -12,6 +12,7 @@ from app.models.upstream import Upstream from app.models.upstream_key import UpstreamGeneratedKey from app.models.website import Website, WebsiteGroupBinding, WebsiteSyncLog from app.services.website_client import Sub2ApiWebsiteClient, WebsiteError, calculate_target_rate, decimal_string +from app.services.upstream_client import UpstreamClient from app.services import webhook_service logger = logging.getLogger(__name__) @@ -417,6 +418,132 @@ def sync_account_priorities_for_upstream(db: Session, upstream_id: int) -> list[ return all_results +def _fetch_remote_managed_key_ids(db: Session, client, upstream_id: int) -> set[str]: + """查询本地 distinct managed_prefix,分别拉远端活跃 Key ID 集合。 + + 返回全部找到的远端 Key ID(合并多个 prefix 的结果)。 + """ + prefixes = [ + row[0] for row in + db.query(UpstreamGeneratedKey.managed_prefix) + .filter( + UpstreamGeneratedKey.upstream_id == upstream_id, + UpstreamGeneratedKey.managed_prefix.isnot(None), + ) + .distinct() + .all() + ] + if not prefixes: + prefixes = ["SmartUp"] + all_ids: set[str] = set() + for prefix in prefixes: + remote_keys = client.list_api_keys(search=prefix, status="active") + all_ids.update(str(k["id"]) for k in remote_keys if k.get("id")) + return all_ids + + +def reconcile_upstream_keys( + db: Session, + upstream_id: int, + active_group_ids: set[str] | None, + remote_key_ids: set[str] | None, + captured_at: datetime, +) -> None: + """对账上游 Key 的本地缓存与远端状态。 + + active_group_ids=None → 跳过分组级清理(避免登录失败时误删)。 + remote_key_ids=None → 跳过远端 key_id 级清理(查询失败时安全)。 + 两者同时为 None 则完全跳过对账。 + """ + if active_group_ids is None and remote_key_ids is None: + return + key_rows = ( + db.query(UpstreamGeneratedKey) + .filter( + UpstreamGeneratedKey.upstream_id == upstream_id, + ) + .all() + ) + for row in key_rows: + if active_group_ids is not None and row.group_id not in active_group_ids: + if row.imported_website_id and row.imported_account_id: + row.status = "orphaned" + row.error = "来源分组已不存在" + row.updated_at = captured_at + logger.info("marked key %s orphaned (group %s no longer in snapshot)", row.id, row.group_id) + else: + db.delete(row) + logger.info("removed key %s (group %s no longer in snapshot)", row.id, row.group_id) + continue + if row.key_id and remote_key_ids is not None and row.key_id not in remote_key_ids: + if row.imported_website_id and row.imported_account_id: + row.status = "orphaned" + row.error = "远端 Key 已不存在" + row.updated_at = captured_at + logger.info("marked key %s orphaned (key_id %s gone from remote)", row.id, row.key_id) + else: + db.delete(row) + logger.info("removed key %s (key_id %s gone from remote)", row.id, row.key_id) + continue + if remote_key_ids is not None and row.key_id in remote_key_ids: + row.updated_at = captured_at + + +def reconcile_upstream_keys_full(db: Session, upstream_id: int) -> bool: + """完整的 Key 对账:登录上游、拉取分组和远端 Key 列表、调用 reconcile_upstream_keys。 + + 安全规则: + - 分组拉取成功 → 才允许分组级清理。 + - 远端 Key 列表拉取成功 → 才允许 key_id 级清理。 + - 登录/分组拉取失败 → 不做任何删除/标记。 + + 支持自定义 managed_prefix:查询本地 distinct prefix,分别查远端。 + """ + from datetime import datetime, timezone + + upstream = db.query(Upstream).filter(Upstream.id == upstream_id).first() + if not upstream: + return False + + auth_config = json.loads(upstream.auth_config_json or "{}") + groups_fetched = False + keys_fetched = False + active_group_ids: set[str] | None = None + remote_key_ids: set[str] | None = None + now = datetime.now(timezone.utc) + + try: + with UpstreamClient( + base_url=upstream.base_url, + api_prefix=upstream.api_prefix, + auth_type=upstream.auth_type, + auth_config=auth_config, + timeout=float(upstream.timeout_seconds), + ) as client: + client.login() + # 获取当前分组 ID 集合 + groups = client.get_available_groups(upstream.groups_endpoint) + active_group_ids = {g.get("group_id") or g.get("id") or "" for g in groups if isinstance(g, dict)} + active_group_ids.discard("") + groups_fetched = True + # 获取远端 Key 列表(支持自定义 managed_prefix) + remote_key_ids = _fetch_remote_managed_key_ids(db, client, upstream_id) + keys_fetched = True + except Exception as exc: + logger.warning("reconcile_upstream_keys_full: upstream %s failed: %s", upstream_id, exc) + + # 只传递成功获取的数据;失败的传 None 跳过对应检查 + reconcile_upstream_keys( + db, + upstream_id, + active_group_ids if groups_fetched else None, + remote_key_ids if keys_fetched else None, + now, + ) + db.commit() + return keys_fetched + + def sync_affected_bindings(db: Session, upstream_id: int, changes: list[dict[str, Any]]) -> None: for binding in get_affected_bindings(db, changes, upstream_id): try: diff --git a/frontend/src/views/Websites.vue b/frontend/src/views/Websites.vue index 88f474d..51aab07 100644 --- a/frontend/src/views/Websites.vue +++ b/frontend/src/views/Websites.vue @@ -592,6 +592,7 @@ function isImportableGeneratedKey(item: GeneratedUpstreamKey) { return item.id !== null && item.has_key_value && item.status !== 'failed' + && item.status !== 'orphaned' && !(item.imported_website_id === importAccountsForm.value.website_id && item.imported_account_id) }