Files
liumangmang e519d1804b fix(priority-sync): narrow account priority update to competitive groups only
Root cause: sync_account_priorities_for_upstream() was doing a global
priority re-rank across ALL imported accounts on a website whenever any
upstream rate changed, triggering spurious account_priority_changed
notifications for accounts in different target groups with no competition.

Fix:
- Add imported_target_group_id / imported_target_group_name to
  UpstreamGeneratedKey (nullable; old data falls back to group_id)
- Write imported_target_group_id on account import in websites.py
- Rewrite sync_account_priorities_for_upstream():
  * bucket accounts by competition_group = imported_target_group_id or group_id
  * only process buckets with count > 1 (genuine competition)
  * each competitive bucket independently sorted by rate; priority starts at 1
  * single-account groups: completely skipped (no update_account, no notification)
  * no competitive groups at all: early return, no log, no notification
- Remove auto priority update in re-import idempotency path (was also
  incorrect; now fully delegated to sync_account_priorities_for_upstream)
- Fix Sub2ApiWebsiteClient local import in sync fn → use module-level name
  so monkeypatch works correctly in tests

Tests: rewrite test_priority_sync.py
- REMOVED: test_priority_sync_full_website_update (was asserting the buggy behavior)
- NEW: test_no_update_when_different_groups_single_account_each
- NEW: test_same_target_group_two_accounts_updated
- NEW: test_two_target_groups_independent_priority
- NEW: test_old_data_null_target_group_fallback
- NEW: test_single_account_in_mixed_website
- UPDATED: test_priority_sync_log_structure (now requires competitive group)
- KEPT: test_priority_sync_cross_upstream_group, test_import_auto_priority_by_rate

All 25 tests pass (8 priority_sync + 17 existing upstream tests).
2026-06-01 19:13:14 +08:00

194 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy import create_engine, event, inspect, text
from sqlalchemy.orm import sessionmaker, DeclarativeBase
from app.config import get_settings
settings = get_settings()

# ``check_same_thread`` is an option of the sqlite3 driver. The connect
# listener in this module already allows for a non-SQLite dialect, so only
# pass the option when the configured URL is a SQLite one; other drivers are
# not expected to accept it.
_connect_args = (
    {"check_same_thread": False}
    if str(settings.database_url).startswith("sqlite")
    else {}
)

engine = create_engine(
    settings.database_url,
    connect_args=_connect_args,
)
# ── SQLite performance PRAGMAs (WAL + cache + timeout) ──
@event.listens_for(engine, "connect", insert=True)
def _set_sqlite_pragma(dbapi_connection, connection_record):
    """Apply SQLite PRAGMAs to each newly opened DBAPI connection.

    Registered as a ``connect`` listener on the module-level engine. Does
    nothing when the engine's dialect is not SQLite.

    Args:
        dbapi_connection: The raw DBAPI connection that was just opened.
        connection_record: Pool record supplied by the event; unused here.
    """
    if engine.dialect.name != "sqlite":
        return

    pragmas = (
        "PRAGMA journal_mode=WAL;",
        "PRAGMA synchronous=NORMAL;",
        "PRAGMA busy_timeout=5000;",
        "PRAGMA foreign_keys=ON;",
        "PRAGMA cache_size=-20000;",  # 20 MB page cache
        "PRAGMA mmap_size=67108864;",  # 64 MB (friendly to small-memory containers)
    )

    cursor = dbapi_connection.cursor()
    try:
        for pragma in pragmas:
            cursor.execute(pragma)
    finally:
        # Close the cursor even if one of the PRAGMA statements raises.
        cursor.close()
# Session factory bound to the module-level engine; the sessions it creates
# have autocommit and autoflush switched off.
SessionLocal = sessionmaker(
    bind=engine,
    autoflush=False,
    autocommit=False,
)
class Base(DeclarativeBase):
    """Declarative base; its metadata is what ``init_db`` creates tables from."""
def get_db():
    """Yield a session from ``SessionLocal`` and close it afterwards.

    This is a generator: the session is handed to the consumer and is closed
    when the generator is resumed or finalised, including when the consumer
    raised an exception.
    """
    # Leaving the ``with`` block closes the session, exactly like an explicit
    # try/finally around ``close()``.
    with SessionLocal() as session:
        yield session
def init_db():
    """Create all tables, then run the index and column migrations."""
    # Import the model modules so SQLAlchemy registers them before create_all.
    from app.models import (  # noqa: F401
        admin_user,
        upstream,
        snapshot,
        webhook_config,
        notification_log,
        custom_page,
        website,
        revoked_token,
        upstream_key,
    )

    Base.metadata.create_all(bind=engine)

    # Follow-up steps for databases that already existed, in this fixed order.
    for step in (
        _ensure_indexes,
        _migrate_custom_pages,
        _migrate_upstreams,
        _migrate_upstream_generated_keys,
    ):
        step()
# ── 已有数据库幂等索引迁移 ─────────────────────────────────
_NEW_INDEXES = [
"CREATE INDEX IF NOT EXISTS ix_snapshot_upstream_captured ON upstream_rate_snapshots(upstream_id, captured_at DESC)",
"CREATE INDEX IF NOT EXISTS ix_notif_event_created ON notification_logs(event_type, created_at DESC)",
"CREATE INDEX IF NOT EXISTS ix_notif_status_created ON notification_logs(status, created_at DESC)",
"CREATE INDEX IF NOT EXISTS ix_sync_website_created ON website_sync_logs(website_id, created_at DESC)",
"CREATE INDEX IF NOT EXISTS ix_sync_binding_created ON website_sync_logs(binding_id, created_at DESC)",
"CREATE INDEX IF NOT EXISTS ix_upstream_enabled ON upstreams(enabled)",
"CREATE INDEX IF NOT EXISTS ix_key_upstream_name ON upstream_generated_keys(upstream_id, key_name)",
]
def _ensure_indexes() -> None:
    """Create the composite indexes listed in ``_NEW_INDEXES``.

    These are indexes that may not have been present when the database was
    first initialised. Every statement is ``CREATE INDEX IF NOT EXISTS``, so
    the function can be run repeatedly.

    Each statement runs in its own transaction so that a failing statement
    cannot leave a shared transaction unusable for the ones after it. A
    failure is logged with its traceback and otherwise ignored.
    """
    import logging

    logger = logging.getLogger(__name__)
    for stmt in _NEW_INDEXES:
        try:
            with engine.begin() as conn:
                conn.execute(text(stmt))
        except Exception:
            # Deliberately best-effort: a missing index must not stop start-up,
            # but keep the cause in the log so it can be diagnosed.
            logger.warning(
                "index creation failed (non-fatal): %s", stmt[:60], exc_info=True
            )
def _migrate_custom_pages():
    """Apply small SQLite-safe migrations for deployments without Alembic.

    Adds whichever of the listed columns are missing from ``custom_pages``.
    Returns immediately when the table does not exist.
    """
    inspector = inspect(engine)
    if "custom_pages" not in inspector.get_table_names():
        return

    present = {col["name"] for col in inspector.get_columns("custom_pages")}

    # (column, statements run when that column is missing), in execution order.
    # NOTE(review): the data backfills are treated as part of the one-time step
    # of the column they follow (access_mode, login_autofill_backfilled_at) —
    # confirm against the original nesting.
    steps = (
        ("access_mode", (
            "ALTER TABLE custom_pages ADD COLUMN access_mode VARCHAR(32) NOT NULL DEFAULT 'direct'",
            "UPDATE custom_pages SET access_mode = CASE WHEN use_proxy = 1 THEN 'proxy' ELSE 'direct' END",
        )),
        ("login_username", (
            "ALTER TABLE custom_pages ADD COLUMN login_username VARCHAR(255)",
        )),
        ("login_password", (
            "ALTER TABLE custom_pages ADD COLUMN login_password TEXT",
        )),
        ("login_username_selector", (
            "ALTER TABLE custom_pages ADD COLUMN login_username_selector VARCHAR(512)",
        )),
        ("login_password_selector", (
            "ALTER TABLE custom_pages ADD COLUMN login_password_selector VARCHAR(512)",
        )),
        ("login_submit_selector", (
            "ALTER TABLE custom_pages ADD COLUMN login_submit_selector VARCHAR(512)",
        )),
        ("login_autofill_enabled", (
            "ALTER TABLE custom_pages ADD COLUMN login_autofill_enabled BOOLEAN NOT NULL DEFAULT 0",
        )),
        ("login_autofill_backfilled_at", (
            "ALTER TABLE custom_pages ADD COLUMN login_autofill_backfilled_at DATETIME",
            # Enable autofill for rows that already hold both a non-blank
            # username and a non-blank password, stamping the backfill time.
            "UPDATE custom_pages "
            "SET login_autofill_enabled = 1, login_autofill_backfilled_at = CURRENT_TIMESTAMP "
            "WHERE login_autofill_enabled = 0 "
            "AND NULLIF(TRIM(login_username), '') IS NOT NULL "
            "AND NULLIF(TRIM(login_password), '') IS NOT NULL",
        )),
        ("linked_upstream_id", (
            "ALTER TABLE custom_pages ADD COLUMN linked_upstream_id INTEGER",
        )),
    )

    with engine.begin() as conn:
        for column, statements in steps:
            if column in present:
                continue
            for sql in statements:
                conn.execute(text(sql))
def _migrate_upstreams():
    """Apply SQLite-safe migrations to the upstreams table.

    Adds whichever balance-related columns are missing. Returns immediately
    when the ``upstreams`` table does not exist.
    """
    inspector = inspect(engine)
    if "upstreams" not in inspector.get_table_names():
        return

    present = {col["name"] for col in inspector.get_columns("upstreams")}

    # Column name -> statement that adds it; dict order is the execution order.
    add_column_sql = {
        "balance":
            "ALTER TABLE upstreams ADD COLUMN balance FLOAT",
        "balance_updated_at":
            "ALTER TABLE upstreams ADD COLUMN balance_updated_at DATETIME",
        "balance_endpoint":
            "ALTER TABLE upstreams ADD COLUMN balance_endpoint VARCHAR(256) NOT NULL DEFAULT ''",
        "balance_response_path":
            "ALTER TABLE upstreams ADD COLUMN balance_response_path VARCHAR(256) NOT NULL DEFAULT ''",
        "balance_divisor":
            "ALTER TABLE upstreams ADD COLUMN balance_divisor FLOAT NOT NULL DEFAULT 1.0",
        "balance_alert_threshold":
            "ALTER TABLE upstreams ADD COLUMN balance_alert_threshold FLOAT",
        "balance_alert_notified":
            "ALTER TABLE upstreams ADD COLUMN balance_alert_notified BOOLEAN NOT NULL DEFAULT 0",
    }

    with engine.begin() as conn:
        for column, sql in add_column_sql.items():
            if column not in present:
                conn.execute(text(sql))
def _migrate_upstream_generated_keys():
    """Apply SQLite-safe migrations to the generated upstream keys table.

    Runs three phases, each in its own transaction:

    1. Add whichever of the listed columns are missing.
    2. Backfill ``managed_prefix`` for legacy ``SmartUp-`` keys and delete
       duplicate managed rows, keeping the highest ``id`` per
       ``(upstream_id, group_id, managed_prefix)``.
    3. Create the unique indexes. A failure here is logged with its traceback
       and otherwise ignored.

    Returns immediately when the table does not exist.
    """
    import logging

    inspector = inspect(engine)
    if "upstream_generated_keys" not in inspector.get_table_names():
        return

    columns = {col["name"] for col in inspector.get_columns("upstream_generated_keys")}

    # (column, statements run when that column is missing), in execution order.
    # NOTE(review): the updated_at backfill is treated as part of the one-time
    # step that adds the column — confirm against the original nesting.
    column_steps = (
        ("imported_website_id", (
            "ALTER TABLE upstream_generated_keys ADD COLUMN imported_website_id INTEGER",
        )),
        ("imported_account_id", (
            "ALTER TABLE upstream_generated_keys ADD COLUMN imported_account_id VARCHAR(255)",
        )),
        ("imported_at", (
            "ALTER TABLE upstream_generated_keys ADD COLUMN imported_at DATETIME",
        )),
        ("updated_at", (
            "ALTER TABLE upstream_generated_keys ADD COLUMN updated_at DATETIME",
            "UPDATE upstream_generated_keys SET updated_at = created_at WHERE updated_at IS NULL",
        )),
        ("managed_prefix", (
            "ALTER TABLE upstream_generated_keys ADD COLUMN managed_prefix VARCHAR(64)",
        )),
        ("imported_target_group_id", (
            "ALTER TABLE upstream_generated_keys ADD COLUMN imported_target_group_id VARCHAR(255)",
        )),
        ("imported_target_group_name", (
            "ALTER TABLE upstream_generated_keys ADD COLUMN imported_target_group_name VARCHAR(255)",
        )),
    )
    with engine.begin() as conn:
        for column, statements in column_steps:
            if column in columns:
                continue
            for sql in statements:
                conn.execute(text(sql))

    # ——— Historical data migration: backfill managed_prefix + remove duplicates ———
    with engine.begin() as conn:
        # 1. Backfill: legacy rows whose key_name starts with "SmartUp-" get
        #    managed_prefix = 'SmartUp'.
        conn.execute(text(
            "UPDATE upstream_generated_keys SET managed_prefix = 'SmartUp' "
            "WHERE managed_prefix IS NULL AND key_name LIKE 'SmartUp-%'"
        ))
        # 2. Cleanup: keep only the newest row (highest id) for each
        #    (upstream_id, group_id, managed_prefix). Done in two steps:
        #    select the ids to drop, then delete them.
        to_delete = conn.execute(text("""
            SELECT id FROM upstream_generated_keys
            WHERE managed_prefix IS NOT NULL
              AND id NOT IN (
                SELECT MAX(id) FROM upstream_generated_keys
                WHERE managed_prefix IS NOT NULL
                GROUP BY upstream_id, group_id, managed_prefix
              )
        """)).fetchall()
        if to_delete:
            # A single executemany call instead of one execute per row.
            conn.execute(
                text("DELETE FROM upstream_generated_keys WHERE id = :id"),
                [{"id": row_id} for (row_id,) in to_delete],
            )

    # ——— Create the unique indexes ———
    try:
        with engine.begin() as conn:
            conn.execute(
                text("CREATE UNIQUE INDEX IF NOT EXISTS uq_upstream_group_key "
                     "ON upstream_generated_keys(upstream_id, group_id, key_name)")
            )
            conn.execute(
                text("CREATE UNIQUE INDEX IF NOT EXISTS uq_upstream_group_managed "
                     "ON upstream_generated_keys(upstream_id, group_id, managed_prefix) "
                     "WHERE managed_prefix IS NOT NULL")
            )
    except Exception:
        # Deliberately best-effort, but keep the cause in the log so a
        # failure can be diagnosed.
        logging.getLogger(__name__).warning(
            "could not create unique indexes on upstream_generated_keys (non-fatal)",
            exc_info=True,
        )