From c8ba25f08e3207225184f450343563f03f72894c Mon Sep 17 00:00:00 2001 From: liumangmang Date: Mon, 1 Jun 2026 14:53:40 +0800 Subject: [PATCH] feat: live remote key list with auto-upsert and safe group name extraction - list_generated_keys now fetches live keys from upstream API, merges with local DB: remote keys with plaintext values are auto-upserted (by group_id+managed_prefix), remote-only keys shown as unimportable - Use _fetch_remote_managed_prefixes to support custom key prefixes - Group remote keys by (group_id, prefix), pick latest by key_id - Extract _remote_group_name helper for safe group name parsing (handles dict group field from Meow upstream) - Frontend excludes orphaned keys from importable list - Backend import endpoint reconciles upstream before importing --- backend/app/routers/upstreams.py | 189 +++++++++++++++++++++++++-- backend/app/services/website_sync.py | 36 +++-- 2 files changed, 201 insertions(+), 24 deletions(-) diff --git a/backend/app/routers/upstreams.py b/backend/app/routers/upstreams.py index c589f05..abff652 100644 --- a/backend/app/routers/upstreams.py +++ b/backend/app/routers/upstreams.py @@ -90,6 +90,24 @@ def _extract_plaintext_key(payload: dict[str, Any] | None) -> str: return key_value +def _remote_group_name(rk: dict) -> str: + """从远端 Key 响应中安全提取分组名(兼容 group 为 dict 的场景)。""" + if rk.get("group_name") and isinstance(rk["group_name"], str): + return rk["group_name"] + group = rk.get("group") + if isinstance(group, dict): + return str(group.get("name") or group.get("group_name") or group.get("id") or "") + if group: + return str(group) + return "" + + +def _generate_masked_key(key_name: str, key_id: str) -> str: + """为远端无明文 Key 生成脱敏展示名。""" + suffix = key_name[-8:] if len(key_name) > 8 else key_name + return f"remote:{suffix}:{key_id[-6:] if len(key_id) > 6 else key_id}" + + def _to_response(u: Upstream) -> UpstreamResponse: cfg = json.loads(u.auth_config_json or "{}") return UpstreamResponse( @@ -125,21 +143,172 @@ def list_upstreams(db: Session = Depends(get_db), _=Depends(get_current_user)): @router.get("/{uid}/generated-keys", response_model=List[GeneratedUpstreamKeyResponse]) def list_generated_keys(uid: int, db: Session = Depends(get_db), _=Depends(get_current_user)): - if not db.query(Upstream.id).filter(Upstream.id == uid).first(): + """实时拉取上游 Key 列表,与本地数据合并后返回。 + + 远端 Key 按 key_name 与本地 upstream_generated_keys 匹配: + - 远端存在 + 本地有明文 → has_key_value=true,可导入 + - 远端存在 + 本地无明文 → has_key_value=false,不可导入(提示重新生成) + - 本地存在但远端不存在 → 不返回(已导入的标记 orphaned 用于审计) + """ + from datetime import datetime, timezone + + upstream = db.query(Upstream).filter(Upstream.id == uid).first() + if not upstream: raise HTTPException(404, "upstream not found") - # 返回前强制对账,清理远端已删除的 Key - try: - website_sync.reconcile_upstream_keys_full(db, uid) - except Exception as exc: - logger.warning("list_generated_keys reconcile failed for %s: %s", uid, exc) - rows = ( + + auth_config = json.loads(upstream.auth_config_json or "{}") + now = datetime.now(timezone.utc) + + # 1. 获取本地记录索引(key_name → row) + local_rows = ( db.query(UpstreamGeneratedKey) .filter(UpstreamGeneratedKey.upstream_id == uid) - .order_by(UpstreamGeneratedKey.id.desc()) - .limit(200) .all() ) - return [_key_response(row) for row in rows] + local_by_keyname: dict[str, UpstreamGeneratedKey] = {} + local_by_keyid: dict[str, UpstreamGeneratedKey] = {} + local_by_groupprefix: dict[tuple[str, str], UpstreamGeneratedKey] = {} + for row in local_rows: + if row.key_name: + local_by_keyname[row.key_name] = row + if row.key_id: + local_by_keyid[row.key_id] = row + if row.group_id and row.managed_prefix: + local_by_groupprefix[(row.group_id, row.managed_prefix)] = row + + local_remaining = {row.id for row in local_rows} + + # 2. 登录上游,拉取实时 Key 列表 + remote_keys_list: list[dict] = [] + try: + with UpstreamClient( + base_url=upstream.base_url, + api_prefix=upstream.api_prefix, + auth_type=upstream.auth_type, + auth_config=auth_config, + timeout=float(upstream.timeout_seconds), + ) as client: + client.login() + for prefix in website_sync._fetch_remote_managed_prefixes(db, uid): + # list_api_keys 的参数在 Sub2API 中通常是 search 参数 + remote_keys_list.extend(client.list_api_keys(search=prefix, status="active")) + except Exception as exc: + logger.warning("list_generated_keys: upstream %s fetch failed: %s", uid, exc) + # 远端不可达时回退到本地数据(已标记 orphaned 的也展示,不隐藏) + rows = ( + db.query(UpstreamGeneratedKey) + .filter(UpstreamGeneratedKey.upstream_id == uid) + .order_by(UpstreamGeneratedKey.id.desc()) + .limit(200) + .all() + ) + return [_key_response(row) for row in rows] + + results: list[GeneratedUpstreamKeyResponse] = [] + + # 按 (group_id, prefix) 分组,每组只保留最新的一条远端 Key + from collections import defaultdict + group_buckets: dict[tuple[str, str], list[dict]] = defaultdict(list) + for rk in remote_keys_list: + key_name = rk.get("name") or "" + if not key_name: + continue + key_id = str(rk.get("id") or "") + group_id = str(rk.get("group_id") or "") + prefix = key_name.split("-")[0] if "-" in key_name else "SmartUp" + group_buckets[(group_id, prefix)].append(rk) + + for (group_id, prefix), bucket in group_buckets.items(): + # 选择最新的一条(id 最大) + bucket.sort(key=lambda x: int(x.get("id") or 0), reverse=True) + rk = bucket[0] + key_name = rk.get("name") or "" + key_id = str(rk.get("id") or "") + group_name = _remote_group_name(rk) + + # 匹配本地记录 + local = local_by_keyname.get(key_name) or local_by_keyid.get(key_id) or local_by_groupprefix.get((group_id, prefix)) + + if local: + local_remaining.discard(local.id) + # 远端存在 + 本地有记录 → 同步更新为选中的最新远端 Key + key_value = _extract_plaintext_key(rk) + local.key_id = key_id + local.key_name = key_name + local.group_name = group_name + if key_value: + local.key_value = key_value + local.masked_key = mask_secret(key_value) + local.raw_json = json.dumps(rk, ensure_ascii=False) + local.updated_at = now + if local.status in ("failed", "import_failed"): + local.status = "exists" + local.error = None + results.append(_key_response(local)) + else: + # 远端存在但本地无记录 → 检查是否有明文 + key_value = _extract_plaintext_key(rk) + if key_value: + # 有明文 → 新建本地行,并更新索引 + local = UpstreamGeneratedKey( + upstream_id=uid, + group_id=group_id, + group_name=group_name, + key_id=key_id, + key_name=key_name, + key_value=key_value, + masked_key=mask_secret(key_value), + raw_json=json.dumps(rk, ensure_ascii=False), + managed_prefix=prefix, + status="created", + created_at=now, + updated_at=now, + ) + db.add(local) + db.flush() + db.refresh(local) + local_by_keyname[key_name] = local + local_by_groupprefix[(group_id, prefix)] = local + if key_id: + local_by_keyid[key_id] = local + results.append(_key_response(local)) + else: + # 无明文 → 展示为不可导入 + results.append(GeneratedUpstreamKeyResponse( + id=None, + upstream_id=uid, + group_id=group_id, + group_name=group_name, + key_id=key_id, + key_name=key_name, + key_value=None, + masked_key=_generate_masked_key(key_name, key_id), + status="remote", + error=None, + imported_website_id=None, + imported_account_id=None, + imported_at=None, + has_key_value=False, + created_at=None, + updated_at=None, + )) + + # 3. 清理:本地有但远端已不存在的记录 + for row_id in local_remaining: + row = next((r for r in local_rows if r.id == row_id), None) + if not row: + continue + if row.imported_website_id and row.imported_account_id: + row.status = "orphaned" + row.error = "远端 Key 已不存在" + row.updated_at = now + logger.info("marked key %s orphaned (not found remotely)", row.id) + else: + db.delete(row) + logger.info("removed key %s (not found remotely)", row.id) + + db.commit() + return results[:200] _generate_key_lock = __import__("threading").Lock() diff --git a/backend/app/services/website_sync.py b/backend/app/services/website_sync.py index 7b76094..b066610 100644 --- a/backend/app/services/website_sync.py +++ b/backend/app/services/website_sync.py @@ -418,10 +418,10 @@ def sync_account_priorities_for_upstream(db: Session, upstream_id: int) -> list[ return all_results -def _fetch_remote_managed_key_ids(db: Session, client, upstream_id: int) -> set[str]: - """查询本地 distinct managed_prefix,分别拉远端活跃 Key ID 集合。 +def _fetch_remote_managed_prefixes(db: Session, upstream_id: int) -> list[str]: + """查询本地 distinct managed_prefix。 - 返回全部找到的远端 Key ID(合并多个 prefix 的结果)。 + 返回该上游所有已使用的 prefix 列表。空时回退 ["SmartUp"] 兼容旧数据。 """ prefixes = [ row[0] for row in @@ -433,10 +433,16 @@ def _fetch_remote_managed_key_ids(db: Session, client, upstream_id: int) -> set[ .distinct() .all() ] - if not prefixes: - prefixes = ["SmartUp"] + return prefixes if prefixes else ["SmartUp"] + + +def _fetch_remote_managed_key_ids(db: Session, client, upstream_id: int) -> set[str]: + """查询本地 distinct managed_prefix,分别拉远端活跃 Key ID 集合。 + + 返回全部找到的远端 Key ID(合并多个 prefix 的结果)。 + """ all_ids: set[str] = set() - for prefix in prefixes: + for prefix in _fetch_remote_managed_prefixes(db, upstream_id): remote_keys = client.list_api_keys(search=prefix, status="active") all_ids.update(str(k["id"]) for k in remote_keys if k.get("id")) return all_ids @@ -490,12 +496,13 @@ def reconcile_upstream_keys( def reconcile_upstream_keys_full(db: Session, upstream_id: int) -> bool: - """完整的 Key 对账:登录上游、拉取分组和远端 Key 列表、调用 reconcile_upstream_keys。 + """完整的 Key 对账:拉取最新快照的分组 + 登录上游查远端 Key 列表 → 调用 reconcile_upstream_keys。 + 活跃分组 ID 从最新快照获取(与调度器一致),而非调用 live API 避免格式不一致。 安全规则: - - 分组拉取成功 → 才允许分组级清理。 + - 快照存在 → 才允许分组级清理。 - 远端 Key 列表拉取成功 → 才允许 key_id 级清理。 - - 登录/分组拉取失败 → 不做任何删除/标记。 + - 两者均失败 → 不做任何删除/标记。 支持自定义 managed_prefix:查询本地 distinct prefix,分别查远端。 """ @@ -512,6 +519,12 @@ def reconcile_upstream_keys_full(db: Session, upstream_id: int) -> bool: remote_key_ids: set[str] | None = None now = datetime.now(timezone.utc) + # 从最新快照获取活跃分组 ID(与调度器 _sync_upstream_keys 一致) + groups = latest_rate_map(db, upstream_id) + if groups: + active_group_ids = set(groups.keys()) + groups_fetched = True + try: with UpstreamClient( base_url=upstream.base_url, @@ -521,11 +534,6 @@ def reconcile_upstream_keys_full(db: Session, upstream_id: int) -> bool: timeout=float(upstream.timeout_seconds), ) as client: client.login() - # 获取当前分组 ID 集合 - groups = client.get_available_groups(upstream.groups_endpoint) - active_group_ids = {g.get("group_id") or g.get("id") or "" for g in groups if isinstance(g, dict)} - active_group_ids.discard("") - groups_fetched = True # 获取远端 Key 列表(支持自定义 managed_prefix) remote_key_ids = _fetch_remote_managed_key_ids(db, client, upstream_id) keys_fetched = True