Files
SmartUp/backend/test_upstream_key_account_import.py
liumangmang 6044b00685 feat: 上游 Key 唯一化、分组导入跳过、账号导入平台识别&远端校验&base_url 注入
- 上游 Key 命名改为 {prefix}-{upstream.id}-{safe_group_name}-{group_id}
- 唯一约束 (upstream_id, group_id, managed_prefix) 加 managed_prefix 列
- 上游检测成功时同步 Key 状态,远端已删/分组已删自动清理
- 重复分组导入跳过,目标网站已存在同名分组返回 exists
- 账号导入平台自动识别(auto/manual 模式)
- 全选可导入 Key 按钮 + 目标分组自动匹配
- 导入幂等:已导入过的 Key 校验远端账号,不存在则重建
- 新增同步接口 POST /sync-imported-upstream-keys
- account_exists() 通过拉取账号列表判断,避免 404 误判
- credentials.base_url 注入来源上游地址,避免 401
- 前端导入弹窗自动同步+刷新按钮+并发/优先级设置
- 新增 12 个测试覆盖同步、幂等、远端删除、校验失败路径
2026-05-21 01:16:39 +08:00

384 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import sys
from pathlib import Path
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
sys.path.insert(0, str(Path(__file__).resolve().parent))
from app.database import Base
from app.models.upstream import Upstream
from app.models.upstream_key import UpstreamGeneratedKey
from app.models.website import Website
from app.routers import websites as websites_router
from app.schemas.website import ImportAccountsRequest
@pytest.fixture()
def db_session():
    """Yield a SQLAlchemy session bound to a fresh in-memory SQLite database.

    The engine uses ``StaticPool`` so that the single in-memory connection on
    which ``create_all`` built the schema is the same one the session uses.
    After the test the session is closed, the schema is dropped and the
    engine is disposed.
    """
    engine = create_engine(
        "sqlite://",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    Base.metadata.create_all(bind=engine)
    TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()
        try:
            Base.metadata.drop_all(bind=engine)
        finally:
            # Release the pooled connection explicitly instead of leaving it
            # open until the engine happens to be garbage collected.
            engine.dispose()
def seed_account_import_rows(db_session):
    """Insert the rows the account-import tests operate on.

    Creates one ``Website`` (a ``sub2api`` site with API-key auth), one
    ``Upstream`` and one ``UpstreamGeneratedKey`` for group ``"vip"`` of that
    upstream, committing and refreshing each of them.

    Returns:
        A ``(website, generated_key)`` tuple.
    """
    admin_auth = json.dumps({"key": "admin-key", "header": "x-api-key"})
    target_site = Website(
        name="My Sub2API",
        site_type="sub2api",
        base_url="http://sub2api.local",
        api_prefix="/api/v1/admin",
        auth_type="api_key",
        auth_config_json=admin_auth,
        groups_endpoint="/groups",
        group_update_endpoint="/groups/{id}",
    )
    source_upstream = Upstream(
        name="Packy",
        base_url="http://packy.local",
        api_prefix="/api/v1",
        auth_type="login_password",
        auth_config_json="{}",
    )

    # Persist the parents first: the key row below needs the upstream's id.
    parents = [target_site, source_upstream]
    db_session.add_all(parents)
    db_session.commit()
    for row in parents:
        db_session.refresh(row)

    key_row = UpstreamGeneratedKey(
        upstream_id=source_upstream.id,
        group_id="vip",
        group_name="VIP",
        key_id="up-key-id",
        key_name="SmartUp-VIP",
        key_value="sk-upstream-generated",
        masked_key="sk-u...ated",
        raw_json="{}",
        status="created",
    )
    db_session.add(key_row)
    db_session.commit()
    db_session.refresh(key_row)

    return target_site, key_row
def test_import_upstream_key_creates_sub2api_account_management_apikey(monkeypatch, db_session):
    """Importing a not-yet-imported key creates one ``apikey`` account.

    The fake client records every ``create_account`` call so the test can
    inspect the request body, and the test then checks that the local key row
    was marked as imported with the account id returned by the client.
    """
    website, generated = seed_account_import_rows(db_session)
    account_bodies = []

    class FakeWebsiteClient:
        """Stand-in for ``Sub2ApiWebsiteClient`` that records created accounts."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def create_account(self, body, endpoint="/accounts"):
            account_bodies.append((endpoint, body))
            return {"id": 101, "name": body["name"]}

        def account_exists(self, account_id):
            return True

        @staticmethod
        def extract_id(data):
            return str(data.get("id"))

    monkeypatch.setattr(websites_router, "Sub2ApiWebsiteClient", FakeWebsiteClient)

    response = websites_router.import_upstream_keys_as_accounts(
        website.id,
        ImportAccountsRequest(
            upstream_key_ids=[generated.id],
            target_group_map={"vip": "7"},
            account_name_prefix="SmartUp",
            default_platform="anthropic",
        ),
        db_session,
        object(),  # placeholder for the endpoint's fourth positional argument
    )

    assert "新建 1" in response.message
    assert len(account_bodies) == 1

    endpoint, body = account_bodies[0]
    assert endpoint == "/accounts"
    assert body["type"] == "apikey"
    assert body["platform"] == "anthropic"
    assert body["credentials"]["api_key"] == "sk-upstream-generated"
    # base_url must be the seeded upstream's address.
    assert body["credentials"]["base_url"] == "http://packy.local"
    # The mapped target group "7" is sent as an integer id.
    assert body["group_ids"] == [7]
    # Values used when the request does not set concurrency/priority.
    assert body["concurrency"] == 10
    assert body["priority"] == 1

    db_session.refresh(generated)
    assert generated.status == "imported"
    assert generated.imported_website_id == website.id
    assert generated.imported_account_id == "101"
def test_import_upstream_key_idempotent_skips_already_imported(monkeypatch, db_session):
    """Re-importing an already imported key must not create a second account."""
    website, generated = seed_account_import_rows(db_session)

    # Mark the key as already imported into this website.
    generated.imported_website_id = website.id
    generated.imported_account_id = "101"
    db_session.commit()

    create_calls = []

    class StubClient:
        """Fake website client whose remote account still exists."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def create_account(self, body, endpoint="/accounts"):
            create_calls.append(body)
            return {"id": 999, "name": body["name"]}

        def account_exists(self, account_id):
            # Simulate that the remote account is still present.
            return True

        @staticmethod
        def extract_id(data):
            return str(data.get("id"))

    monkeypatch.setattr(websites_router, "Sub2ApiWebsiteClient", StubClient)

    request = ImportAccountsRequest(
        upstream_key_ids=[generated.id],
        target_group_map={"vip": "7"},
        account_name_prefix="SmartUp",
        default_platform="anthropic",
    )
    response = websites_router.import_upstream_keys_as_accounts(
        website.id, request, db_session, object()
    )

    # create_account must not have been invoked.
    assert create_calls == [], "create_account should NOT be called for already imported key"

    # The single result item reports the key as already existing.
    assert len(response.items) == 1
    item = response.items[0]
    assert item.status == "exists"
    assert item.account_id == "101"
    assert item.message == "已导入过,已跳过"

    # success should be true (nothing failed).
    assert response.success
def test_sync_clears_stale_import_mark(monkeypatch, db_session):
    """Sync endpoint: clear the local import mark when the remote account is gone."""
    # Not imported at module level, so it is imported here.
    from app.schemas.website import SyncImportStatusRequest

    website, generated = seed_account_import_rows(db_session)
    generated.imported_website_id = website.id
    generated.imported_account_id = "101"
    db_session.commit()

    class FakeClient:
        """Fake website client that reports every account as missing."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

        def __enter__(self):
            return self

        def __exit__(self, *a):
            return False

        def account_exists(self, account_id):
            return False  # the remote side reports "does not exist"

        def _get_account_ids(self, endpoint="/accounts"):
            return set()

        @staticmethod
        def extract_id(data):
            return str(data.get("id"))

    monkeypatch.setattr(websites_router, "Sub2ApiWebsiteClient", FakeClient)

    response = websites_router.sync_imported_upstream_keys(
        website.id,
        SyncImportStatusRequest(upstream_id=generated.upstream_id),
        db_session,
        object(),  # placeholder for the endpoint's fourth positional argument
    )

    assert len(response.items) == 1
    assert response.items[0].status == "stale_cleared"
    assert response.items[0].account_id == "101"

    db_session.refresh(generated)
    assert generated.imported_account_id is None
def test_sync_preserves_mark_on_check_failed(monkeypatch, db_session):
    """Sync endpoint: keep the local import mark when the remote check fails."""
    # Not imported at module level, so it is imported here.
    from app.schemas.website import SyncImportStatusRequest

    website, generated = seed_account_import_rows(db_session)
    generated.imported_website_id = website.id
    generated.imported_account_id = "101"
    db_session.commit()

    class FakeClient:
        """Fake website client whose existence check is inconclusive."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

        def __enter__(self):
            return self

        def __exit__(self, *a):
            return False

        def account_exists(self, account_id):
            return None  # the check itself failed

        def _get_account_ids(self, endpoint="/accounts"):
            return set()

        @staticmethod
        def extract_id(data):
            return str(data.get("id"))

    monkeypatch.setattr(websites_router, "Sub2ApiWebsiteClient", FakeClient)

    response = websites_router.sync_imported_upstream_keys(
        website.id,
        SyncImportStatusRequest(upstream_id=generated.upstream_id),
        db_session,
        object(),  # placeholder for the endpoint's fourth positional argument
    )

    assert len(response.items) == 1
    assert response.items[0].status == "check_failed"

    db_session.refresh(generated)
    assert generated.imported_account_id == "101"  # mark was not cleared
def test_import_rebuilds_when_remote_deleted(monkeypatch, db_session):
    """Import endpoint: recreate the account when the remote one was deleted.

    The key starts out marked as imported with account id ``"101"``; because
    the fake client reports that account as missing, the import is expected to
    create a new account and store its id (``"202"``) on the key row.
    """
    website, generated = seed_account_import_rows(db_session)
    generated.imported_website_id = website.id
    generated.imported_account_id = "101"
    db_session.commit()

    class FakeClient:
        """Fake website client: old account is gone, creation returns id 202."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

        def __enter__(self):
            return self

        def __exit__(self, *a):
            return False

        def account_exists(self, account_id):
            return False

        def create_account(self, body, endpoint="/accounts"):
            return {"id": 202, "name": body["name"]}

        @staticmethod
        def extract_id(data):
            return str(data.get("id"))

    monkeypatch.setattr(websites_router, "Sub2ApiWebsiteClient", FakeClient)

    response = websites_router.import_upstream_keys_as_accounts(
        website.id,
        ImportAccountsRequest(
            upstream_key_ids=[generated.id],
            target_group_map={"vip": "7"},
            account_name_prefix="SmartUp",
            default_platform="openai",
        ),
        db_session,
        object(),  # placeholder for the endpoint's fourth positional argument
    )

    assert len(response.items) == 1
    assert response.items[0].status == "created", f"expected created, got {response.items[0].status}"

    db_session.refresh(generated)
    assert generated.imported_account_id == "202"
def test_import_skips_on_check_failed(monkeypatch, db_session):
    """Import endpoint: when the remote check fails, neither create nor clear.

    The fake client's ``create_account`` raises, so the test fails if the
    endpoint tries to create an account despite the inconclusive check.
    """
    website, generated = seed_account_import_rows(db_session)
    generated.imported_website_id = website.id
    generated.imported_account_id = "101"
    db_session.commit()

    class FakeClient:
        """Fake website client whose existence check is inconclusive."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

        def __enter__(self):
            return self

        def __exit__(self, *a):
            return False

        def account_exists(self, account_id):
            return None

        def create_account(self, body, endpoint="/accounts"):
            raise RuntimeError("should not be called")

        @staticmethod
        def extract_id(data):
            return str(data.get("id"))

    monkeypatch.setattr(websites_router, "Sub2ApiWebsiteClient", FakeClient)

    response = websites_router.import_upstream_keys_as_accounts(
        website.id,
        ImportAccountsRequest(
            upstream_key_ids=[generated.id],
            target_group_map={"vip": "7"},
            account_name_prefix="SmartUp",
            default_platform="openai",
        ),
        db_session,
        object(),  # placeholder for the endpoint's fourth positional argument
    )

    assert len(response.items) == 1
    assert response.items[0].status == "check_failed"

    db_session.refresh(generated)
    assert generated.imported_account_id == "101"  # mark was not cleared
    assert not response.success
def test_import_upstream_key_with_custom_concurrency_and_priority(monkeypatch, db_session):
    """Explicit ``concurrency``/``priority`` in the request reach the account body."""
    website, generated = seed_account_import_rows(db_session)
    recorded_calls = []

    class RecordingClient:
        """Fake website client that remembers each ``create_account`` call."""

        def __init__(self, **kwargs):
            self.kwargs = kwargs

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def create_account(self, body, endpoint="/accounts"):
            recorded_calls.append((endpoint, body))
            return {"id": 202, "name": body["name"]}

        def account_exists(self, account_id):
            return True

        @staticmethod
        def extract_id(data):
            return str(data.get("id"))

    monkeypatch.setattr(websites_router, "Sub2ApiWebsiteClient", RecordingClient)

    request = ImportAccountsRequest(
        upstream_key_ids=[generated.id],
        target_group_map={"vip": "7"},
        account_name_prefix="SmartUp",
        default_platform="openai",
        concurrency=20,
        priority=5,
    )
    websites_router.import_upstream_keys_as_accounts(
        website.id, request, db_session, object()
    )

    assert len(recorded_calls) == 1
    _endpoint, payload = recorded_calls[0]
    assert payload["concurrency"] == 20
    assert payload["priority"] == 5
    # base_url is still the seeded upstream's address.
    assert payload["credentials"]["base_url"] == "http://packy.local"