feat: live remote key list with auto-upsert and safe group name extraction

- list_generated_keys now fetches live keys from upstream API, merges with
  local DB: remote keys with plaintext values are auto-upserted (by
  group_id+managed_prefix), remote-only keys shown as unimportable
- Use _fetch_remote_managed_prefixes to support custom key prefixes
- Group remote keys by (group_id, prefix), pick latest by key_id
- Extract _remote_group_name helper for safe group name parsing
  (handles dict group field from Meow upstream)
- Frontend excludes orphaned keys from importable list
- Backend import endpoint reconciles upstream before importing
This commit is contained in:
liumangmang
2026-06-01 14:53:40 +08:00
parent bea4344bb3
commit c8ba25f08e
2 changed files with 201 additions and 24 deletions
+179 -10
View File
@@ -90,6 +90,24 @@ def _extract_plaintext_key(payload: dict[str, Any] | None) -> str:
return key_value
def _remote_group_name(rk: dict) -> str:
"""从远端 Key 响应中安全提取分组名(兼容 group 为 dict 的场景)。"""
if rk.get("group_name") and isinstance(rk["group_name"], str):
return rk["group_name"]
group = rk.get("group")
if isinstance(group, dict):
return str(group.get("name") or group.get("group_name") or group.get("id") or "")
if group:
return str(group)
return ""
def _generate_masked_key(key_name: str, key_id: str) -> str:
"""为远端无明文 Key 生成脱敏展示名。"""
suffix = key_name[-8:] if len(key_name) > 8 else key_name
return f"remote:{suffix}:{key_id[-6:] if len(key_id) > 6 else key_id}"
def _to_response(u: Upstream) -> UpstreamResponse:
cfg = json.loads(u.auth_config_json or "{}")
return UpstreamResponse(
@@ -125,21 +143,172 @@ def list_upstreams(db: Session = Depends(get_db), _=Depends(get_current_user)):
@router.get("/{uid}/generated-keys", response_model=List[GeneratedUpstreamKeyResponse])
def list_generated_keys(uid: int, db: Session = Depends(get_db), _=Depends(get_current_user)):
if not db.query(Upstream.id).filter(Upstream.id == uid).first():
"""实时拉取上游 Key 列表,与本地数据合并后返回。
远端 Key 按 key_name 与本地 upstream_generated_keys 匹配:
- 远端存在 + 本地有明文 → has_key_value=true,可导入
- 远端存在 + 本地无明文 → has_key_value=false,不可导入(提示重新生成)
- 本地存在但远端不存在 → 不返回(已导入的标记 orphaned 用于审计)
"""
from datetime import datetime, timezone
upstream = db.query(Upstream).filter(Upstream.id == uid).first()
if not upstream:
raise HTTPException(404, "upstream not found")
# 返回前强制对账,清理远端已删除的 Key
try:
website_sync.reconcile_upstream_keys_full(db, uid)
except Exception as exc:
logger.warning("list_generated_keys reconcile failed for %s: %s", uid, exc)
rows = (
auth_config = json.loads(upstream.auth_config_json or "{}")
now = datetime.now(timezone.utc)
# 1. 获取本地记录索引(key_name → row
local_rows = (
db.query(UpstreamGeneratedKey)
.filter(UpstreamGeneratedKey.upstream_id == uid)
.order_by(UpstreamGeneratedKey.id.desc())
.limit(200)
.all()
)
return [_key_response(row) for row in rows]
local_by_keyname: dict[str, UpstreamGeneratedKey] = {}
local_by_keyid: dict[str, UpstreamGeneratedKey] = {}
local_by_groupprefix: dict[tuple[str, str], UpstreamGeneratedKey] = {}
for row in local_rows:
if row.key_name:
local_by_keyname[row.key_name] = row
if row.key_id:
local_by_keyid[row.key_id] = row
if row.group_id and row.managed_prefix:
local_by_groupprefix[(row.group_id, row.managed_prefix)] = row
local_remaining = {row.id for row in local_rows}
# 2. 登录上游,拉取实时 Key 列表
remote_keys_list: list[dict] = []
try:
with UpstreamClient(
base_url=upstream.base_url,
api_prefix=upstream.api_prefix,
auth_type=upstream.auth_type,
auth_config=auth_config,
timeout=float(upstream.timeout_seconds),
) as client:
client.login()
for prefix in website_sync._fetch_remote_managed_prefixes(db, uid):
# list_api_keys 的参数在 Sub2API 中通常是 search 参数
remote_keys_list.extend(client.list_api_keys(search=prefix, status="active"))
except Exception as exc:
logger.warning("list_generated_keys: upstream %s fetch failed: %s", uid, exc)
# 远端不可达时回退到本地数据(已标记 orphaned 的也展示,不隐藏)
rows = (
db.query(UpstreamGeneratedKey)
.filter(UpstreamGeneratedKey.upstream_id == uid)
.order_by(UpstreamGeneratedKey.id.desc())
.limit(200)
.all()
)
return [_key_response(row) for row in rows]
results: list[GeneratedUpstreamKeyResponse] = []
# 按 (group_id, prefix) 分组,每组只保留最新的一条远端 Key
from collections import defaultdict
group_buckets: dict[tuple[str, str], list[dict]] = defaultdict(list)
for rk in remote_keys_list:
key_name = rk.get("name") or ""
if not key_name:
continue
key_id = str(rk.get("id") or "")
group_id = str(rk.get("group_id") or "")
prefix = key_name.split("-")[0] if "-" in key_name else "SmartUp"
group_buckets[(group_id, prefix)].append(rk)
for (group_id, prefix), bucket in group_buckets.items():
# 选择最新的一条(id 最大)
bucket.sort(key=lambda x: int(x.get("id") or 0), reverse=True)
rk = bucket[0]
key_name = rk.get("name") or ""
key_id = str(rk.get("id") or "")
group_name = _remote_group_name(rk)
# 匹配本地记录
local = local_by_keyname.get(key_name) or local_by_keyid.get(key_id) or local_by_groupprefix.get((group_id, prefix))
if local:
local_remaining.discard(local.id)
# 远端存在 + 本地有记录 → 同步更新为选中的最新远端 Key
key_value = _extract_plaintext_key(rk)
local.key_id = key_id
local.key_name = key_name
local.group_name = group_name
if key_value:
local.key_value = key_value
local.masked_key = mask_secret(key_value)
local.raw_json = json.dumps(rk, ensure_ascii=False)
local.updated_at = now
if local.status in ("failed", "import_failed"):
local.status = "exists"
local.error = None
results.append(_key_response(local))
else:
# 远端存在但本地无记录 → 检查是否有明文
key_value = _extract_plaintext_key(rk)
if key_value:
# 有明文 → 新建本地行,并更新索引
local = UpstreamGeneratedKey(
upstream_id=uid,
group_id=group_id,
group_name=group_name,
key_id=key_id,
key_name=key_name,
key_value=key_value,
masked_key=mask_secret(key_value),
raw_json=json.dumps(rk, ensure_ascii=False),
managed_prefix=prefix,
status="created",
created_at=now,
updated_at=now,
)
db.add(local)
db.flush()
db.refresh(local)
local_by_keyname[key_name] = local
local_by_groupprefix[(group_id, prefix)] = local
if key_id:
local_by_keyid[key_id] = local
results.append(_key_response(local))
else:
# 无明文 → 展示为不可导入
results.append(GeneratedUpstreamKeyResponse(
id=None,
upstream_id=uid,
group_id=group_id,
group_name=group_name,
key_id=key_id,
key_name=key_name,
key_value=None,
masked_key=_generate_masked_key(key_name, key_id),
status="remote",
error=None,
imported_website_id=None,
imported_account_id=None,
imported_at=None,
has_key_value=False,
created_at=None,
updated_at=None,
))
# 3. 清理:本地有但远端已不存在的记录
for row_id in local_remaining:
row = next((r for r in local_rows if r.id == row_id), None)
if not row:
continue
if row.imported_website_id and row.imported_account_id:
row.status = "orphaned"
row.error = "远端 Key 已不存在"
row.updated_at = now
logger.info("marked key %s orphaned (not found remotely)", row.id)
else:
db.delete(row)
logger.info("removed key %s (not found remotely)", row.id)
db.commit()
return results[:200]
_generate_key_lock = __import__("threading").Lock()
+22 -14
View File
@@ -418,10 +418,10 @@ def sync_account_priorities_for_upstream(db: Session, upstream_id: int) -> list[
return all_results
def _fetch_remote_managed_key_ids(db: Session, client, upstream_id: int) -> set[str]:
"""查询本地 distinct managed_prefix,分别拉远端活跃 Key ID 集合
def _fetch_remote_managed_prefixes(db: Session, upstream_id: int) -> list[str]:
"""查询本地 distinct managed_prefix。
返回全部找到的远端 Key ID(合并多个 prefix 的结果)
返回该上游所有已使用的 prefix 列表。空时回退 ["SmartUp"] 兼容旧数据
"""
prefixes = [
row[0] for row in
@@ -433,10 +433,16 @@ def _fetch_remote_managed_key_ids(db: Session, client, upstream_id: int) -> set[
.distinct()
.all()
]
if not prefixes:
prefixes = ["SmartUp"]
return prefixes if prefixes else ["SmartUp"]
def _fetch_remote_managed_key_ids(db: Session, client, upstream_id: int) -> set[str]:
"""查询本地 distinct managed_prefix,分别拉远端活跃 Key ID 集合。
返回全部找到的远端 Key ID(合并多个 prefix 的结果)。
"""
all_ids: set[str] = set()
for prefix in prefixes:
for prefix in _fetch_remote_managed_prefixes(db, upstream_id):
remote_keys = client.list_api_keys(search=prefix, status="active")
all_ids.update(str(k["id"]) for k in remote_keys if k.get("id"))
return all_ids
@@ -490,12 +496,13 @@ def reconcile_upstream_keys(
def reconcile_upstream_keys_full(db: Session, upstream_id: int) -> bool:
"""完整的 Key 对账:登录上游、拉取分组和远端 Key 列表调用 reconcile_upstream_keys。
"""完整的 Key 对账:拉取最新快照的分组 + 登录上游查远端 Key 列表调用 reconcile_upstream_keys。
活跃分组 ID 从最新快照获取(与调度器一致),而非调用 live API 避免格式不一致。
安全规则:
- 分组拉取成功 → 才允许分组级清理。
- 快照存在 → 才允许分组级清理。
- 远端 Key 列表拉取成功 → 才允许 key_id 级清理。
- 登录/分组拉取失败 → 不做任何删除/标记。
- 两者均失败 → 不做任何删除/标记。
支持自定义 managed_prefix:查询本地 distinct prefix,分别查远端。
"""
@@ -512,6 +519,12 @@ def reconcile_upstream_keys_full(db: Session, upstream_id: int) -> bool:
remote_key_ids: set[str] | None = None
now = datetime.now(timezone.utc)
# 从最新快照获取活跃分组 ID(与调度器 _sync_upstream_keys 一致)
groups = latest_rate_map(db, upstream_id)
if groups:
active_group_ids = set(groups.keys())
groups_fetched = True
try:
with UpstreamClient(
base_url=upstream.base_url,
@@ -521,11 +534,6 @@ def reconcile_upstream_keys_full(db: Session, upstream_id: int) -> bool:
timeout=float(upstream.timeout_seconds),
) as client:
client.login()
# 获取当前分组 ID 集合
groups = client.get_available_groups(upstream.groups_endpoint)
active_group_ids = {g.get("group_id") or g.get("id") or "" for g in groups if isinstance(g, dict)}
active_group_ids.discard("")
groups_fetched = True
# 获取远端 Key 列表(支持自定义 managed_prefix
remote_key_ids = _fetch_remote_managed_key_ids(db, client, upstream_id)
keys_fetched = True