fix: reconcile upstream keys on list/generate/import to prevent stale key imports

- Extract reconcile_upstream_keys() to website_sync.py (shared scheduler + on-demand)
- Add reconcile_upstream_keys_full() for on-demand reconciliation at three entry points:
  list_generated_keys, generate_keys_by_groups, import_upstream_keys_as_accounts
- Safe on failure: active_group_ids=None / remote_key_ids=None skip cleanup
- Support custom managed_prefix via _fetch_remote_managed_key_ids() helper
- Exclude orphaned keys from frontend importable list
- Remove hardcoded search='SmartUp' from scheduler path
This commit is contained in:
liumangmang
2026-06-01 11:29:37 +08:00
parent 3408795289
commit bea4344bb3
5 changed files with 175 additions and 46 deletions
+11
View File
@@ -127,6 +127,11 @@ def list_upstreams(db: Session = Depends(get_db), _=Depends(get_current_user)):
def list_generated_keys(uid: int, db: Session = Depends(get_db), _=Depends(get_current_user)):
if not db.query(Upstream.id).filter(Upstream.id == uid).first():
raise HTTPException(404, "upstream not found")
# 返回前强制对账,清理远端已删除的 Key
try:
website_sync.reconcile_upstream_keys_full(db, uid)
except Exception as exc:
logger.warning("list_generated_keys reconcile failed for %s: %s", uid, exc)
rows = (
db.query(UpstreamGeneratedKey)
.filter(UpstreamGeneratedKey.upstream_id == uid)
@@ -293,6 +298,12 @@ def generate_keys_by_groups(
if u.api_prefix.strip("/") != "api/v1":
raise HTTPException(400, "首版仅支持 Sub2API 上游(API Prefix 应为 /api/v1")
# 生成前先对账,清理远端已删除的旧 Key
try:
website_sync.reconcile_upstream_keys_full(db, uid)
except Exception as exc:
logger.warning("generate_keys_by_groups reconcile failed for %s: %s", uid, exc)
auth_config = json.loads(u.auth_config_json or "{}")
selected = set(body.group_ids)
prefix = body.name_prefix
+27 -1
View File
@@ -34,7 +34,7 @@ from app.schemas.website import (
)
from app.services.website_client import Sub2ApiWebsiteClient
from app.services.website_sync import binding_sources, sync_binding, build_rate_priority_map
from app.services.website_sync import binding_sources, sync_binding, build_rate_priority_map, reconcile_upstream_keys_full
from app.utils.auth import get_current_user
router = APIRouter(tags=["websites"])
@@ -483,6 +483,20 @@ def import_upstream_keys_as_accounts(
.order_by(UpstreamGeneratedKey.id)
.all()
)
# 导入前尝试对账(失败不阻塞,仅打日志—避免远端不可达时误删本地 Key)
upstream_ids = {row.upstream_id for row in rows}
for uid in upstream_ids:
try:
reconcile_upstream_keys_full(db, uid)
except Exception as exc:
logger.warning("import reconcile failed for upstream %s: %s", uid, exc)
# 重新查询(对账成功时已清理失效 Key)
rows = (
db.query(UpstreamGeneratedKey)
.filter(UpstreamGeneratedKey.id.in_(body.upstream_key_ids))
.order_by(UpstreamGeneratedKey.id)
.all()
)
found_ids = {row.id for row in rows}
missing_ids = [kid for kid in body.upstream_key_ids if kid not in found_ids]
items: list[ImportAccountItem] = [
@@ -518,6 +532,18 @@ def import_upstream_keys_as_accounts(
with _client(website) as c:
for row in rows:
# 跳过远端已不存在或导入失败的 Key(对账后标记为 orphaned / import_failed
if row.status in ("orphaned", "failed", "import_failed"):
items.append(ImportAccountItem(
upstream_key_id=row.id,
source_group_id=row.group_id,
source_group_name=row.group_name,
target_group_id=body.target_group_map.get(row.group_id),
platform=body.default_platform,
status="failed",
message="远端 Key 已不存在,请重新生成",
))
continue
# 先确定平台(失败项也需要记录)
if body.platform_mode == "auto":
platform = _detect_platform(
+9 -45
View File
@@ -11,9 +11,8 @@ from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.models.upstream import Upstream
from app.models.upstream_key import UpstreamGeneratedKey
from app.models.snapshot import UpstreamRateSnapshot
from app.services.upstream_client import UpstreamClient, UpstreamError, build_snapshot
from app.services.upstream_client import UpstreamClient, build_snapshot
from app.services.snapshot_service import diff_snapshots, prune_snapshots
from app.services import webhook_service
from app.services import website_sync
@@ -217,26 +216,19 @@ def _notify_balance_low(
def _sync_upstream_keys(upstream_id: int, snapshot: dict[str, Any], captured_at: datetime) -> None:
"""上游检测成功后同步 SmartUp Key 状态(远端删除/分组删除)。"""
"""上游检测成功后同步 SmartUp Key 状态(远端删除/分组删除)。
委托给 website_sync.reconcile_upstream_keys 实现核心逻辑。
"""
db = SessionLocal()
try:
active_group_ids = set(snapshot.get("groups", {}).keys())
key_rows = (
db.query(UpstreamGeneratedKey)
.filter(
UpstreamGeneratedKey.upstream_id == upstream_id,
UpstreamGeneratedKey.key_name.like("SmartUp-%"),
)
.all()
)
auth_config = json.loads(
db.query(Upstream).filter(Upstream.id == upstream_id).first().auth_config_json or "{}"
)
# 用 UpstreamClient 查询远端活跃 Key ID 集合
remote_key_ids: set[str] | None = None # None=查询失败,set()=查询成功但为空
remote_key_ids: set[str] | None = None
try:
upstream = db.query(Upstream).filter(Upstream.id == upstream_id).first()
if upstream:
auth_config = json.loads(upstream.auth_config_json or "{}")
with UpstreamClient(
base_url=upstream.base_url,
api_prefix=upstream.api_prefix,
@@ -245,39 +237,11 @@ def _sync_upstream_keys(upstream_id: int, snapshot: dict[str, Any], captured_at:
timeout=float(upstream.timeout_seconds),
) as client:
client.login()
remote_keys = client.list_api_keys(search="SmartUp", status="active")
remote_key_ids = {
str(k["id"]) for k in remote_keys if k.get("id")
}
remote_key_ids = website_sync._fetch_remote_managed_key_ids(db, client, upstream_id)
except Exception as exc:
logger.warning("sync upstream keys list failed for %s: %s", upstream_id, exc)
for row in key_rows:
# 1. 分组已不在当前快照中 → 删除本地记录
if row.group_id not in active_group_ids:
if row.imported_website_id and row.imported_account_id:
row.status = "orphaned"
row.error = "来源分组已不存在"
row.updated_at = captured_at
logger.info("marked key %s orphaned (group %s no longer in snapshot)", row.id, row.group_id)
else:
db.delete(row)
logger.info("removed key %s (group %s no longer in snapshot)", row.id, row.group_id)
continue
# 2. 远端查询成功但 key_id 不在列表中 → 删除本地记录
if row.key_id and remote_key_ids is not None and row.key_id not in remote_key_ids:
if row.imported_website_id and row.imported_account_id:
row.status = "orphaned"
row.error = "远端 Key 已不存在"
row.updated_at = captured_at
logger.info("marked key %s orphaned (key_id %s gone from remote)", row.id, row.key_id)
else:
db.delete(row)
logger.info("removed key %s (key_id %s gone from remote)", row.id, row.key_id)
continue
# 3. 更新同步时间戳(仅当查询成功且 Key 仍在远端时)
if remote_key_ids is not None and row.key_id in remote_key_ids:
row.updated_at = captured_at
website_sync.reconcile_upstream_keys(db, upstream_id, active_group_ids, remote_key_ids, captured_at)
db.commit()
except Exception:
logger.exception("key sync failed for upstream %s", upstream_id)
+127
View File
@@ -12,6 +12,7 @@ from app.models.upstream import Upstream
from app.models.upstream_key import UpstreamGeneratedKey
from app.models.website import Website, WebsiteGroupBinding, WebsiteSyncLog
from app.services.website_client import Sub2ApiWebsiteClient, WebsiteError, calculate_target_rate, decimal_string
from app.services.upstream_client import UpstreamClient
from app.services import webhook_service
logger = logging.getLogger(__name__)
@@ -417,6 +418,132 @@ def sync_account_priorities_for_upstream(db: Session, upstream_id: int) -> list[
return all_results
def _fetch_remote_managed_key_ids(db: Session, client, upstream_id: int) -> set[str]:
"""查询本地 distinct managed_prefix,分别拉远端活跃 Key ID 集合。
返回全部找到的远端 Key ID(合并多个 prefix 的结果)。
"""
prefixes = [
row[0] for row in
db.query(UpstreamGeneratedKey.managed_prefix)
.filter(
UpstreamGeneratedKey.upstream_id == upstream_id,
UpstreamGeneratedKey.managed_prefix.isnot(None),
)
.distinct()
.all()
]
if not prefixes:
prefixes = ["SmartUp"]
all_ids: set[str] = set()
for prefix in prefixes:
remote_keys = client.list_api_keys(search=prefix, status="active")
all_ids.update(str(k["id"]) for k in remote_keys if k.get("id"))
return all_ids
def reconcile_upstream_keys(
db: Session,
upstream_id: int,
active_group_ids: set[str] | None,
remote_key_ids: set[str] | None,
captured_at: datetime,
) -> None:
"""对账上游 Key 的本地缓存与远端状态。
active_group_ids=None → 跳过分组级清理(避免登录失败时误删)。
remote_key_ids=None → 跳过远端 key_id 级清理(查询失败时安全)。
两者同时为 None 则完全跳过对账。
"""
if active_group_ids is None and remote_key_ids is None:
return
key_rows = (
db.query(UpstreamGeneratedKey)
.filter(
UpstreamGeneratedKey.upstream_id == upstream_id,
)
.all()
)
for row in key_rows:
if active_group_ids is not None and row.group_id not in active_group_ids:
if row.imported_website_id and row.imported_account_id:
row.status = "orphaned"
row.error = "来源分组已不存在"
row.updated_at = captured_at
logger.info("marked key %s orphaned (group %s no longer in snapshot)", row.id, row.group_id)
else:
db.delete(row)
logger.info("removed key %s (group %s no longer in snapshot)", row.id, row.group_id)
continue
if row.key_id and remote_key_ids is not None and row.key_id not in remote_key_ids:
if row.imported_website_id and row.imported_account_id:
row.status = "orphaned"
row.error = "远端 Key 已不存在"
row.updated_at = captured_at
logger.info("marked key %s orphaned (key_id %s gone from remote)", row.id, row.key_id)
else:
db.delete(row)
logger.info("removed key %s (key_id %s gone from remote)", row.id, row.key_id)
continue
if remote_key_ids is not None and row.key_id in remote_key_ids:
row.updated_at = captured_at
def reconcile_upstream_keys_full(db: Session, upstream_id: int) -> bool:
"""完整的 Key 对账:登录上游、拉取分组和远端 Key 列表、调用 reconcile_upstream_keys。
安全规则:
- 分组拉取成功 → 才允许分组级清理。
- 远端 Key 列表拉取成功 → 才允许 key_id 级清理。
- 登录/分组拉取失败 → 不做任何删除/标记。
支持自定义 managed_prefix:查询本地 distinct prefix,分别查远端。
"""
from datetime import datetime, timezone
upstream = db.query(Upstream).filter(Upstream.id == upstream_id).first()
if not upstream:
return False
auth_config = json.loads(upstream.auth_config_json or "{}")
groups_fetched = False
keys_fetched = False
active_group_ids: set[str] | None = None
remote_key_ids: set[str] | None = None
now = datetime.now(timezone.utc)
try:
with UpstreamClient(
base_url=upstream.base_url,
api_prefix=upstream.api_prefix,
auth_type=upstream.auth_type,
auth_config=auth_config,
timeout=float(upstream.timeout_seconds),
) as client:
client.login()
# 获取当前分组 ID 集合
groups = client.get_available_groups(upstream.groups_endpoint)
active_group_ids = {g.get("group_id") or g.get("id") or "" for g in groups if isinstance(g, dict)}
active_group_ids.discard("")
groups_fetched = True
# 获取远端 Key 列表(支持自定义 managed_prefix
remote_key_ids = _fetch_remote_managed_key_ids(db, client, upstream_id)
keys_fetched = True
except Exception as exc:
logger.warning("reconcile_upstream_keys_full: upstream %s failed: %s", upstream_id, exc)
# 只传递成功获取的数据;失败的传 None 跳过对应检查
reconcile_upstream_keys(
db,
upstream_id,
active_group_ids if groups_fetched else None,
remote_key_ids if keys_fetched else None,
now,
)
db.commit()
return keys_fetched
def sync_affected_bindings(db: Session, upstream_id: int, changes: list[dict[str, Any]]) -> None:
for binding in get_affected_bindings(db, changes, upstream_id):
try:
+1
View File
@@ -592,6 +592,7 @@ function isImportableGeneratedKey(item: GeneratedUpstreamKey) {
return item.id !== null
&& item.has_key_value
&& item.status !== 'failed'
&& item.status !== 'orphaned'
&& !(item.imported_website_id === importAccountsForm.value.website_id && item.imported_account_id)
}