Files
SmartUp/backend/app/services/snapshot_service.py
T
SmartUp Developer ad16618406 fix: address multiple code audit findings
- CORS: replace wildcard with explicit origin list from CORS_ORIGINS env
- Auth: enforce strong defaults, JWT blacklist (RevokedToken model), login rate limiting
- Auth: validate password length before bcrypt (72-byte limit)
- Scheduler: single-threaded worker to mitigate SQLite write contention
- Scheduler: graceful shutdown (wait=True)
- Snapshots: add prune_snapshots() with configurable retention count
- Storage: isolate localStorage keys via VITE_APP_KEY prefix
- Config: add cors_origins, login_rate_limit, snapshot_retention_count settings
2026-05-17 10:52:18 +08:00

61 lines
2.1 KiB
Python

"""Snapshot diff logic."""
from typing import Any, Optional
from sqlalchemy.orm import Session
from app.models.snapshot import UpstreamRateSnapshot
def diff_snapshots(
    previous: Optional[dict[str, Any]],
    current: dict[str, Any],
) -> list[dict[str, Any]]:
    """Compare two snapshots and list every group whose rate differs.

    Both snapshots are read through their ``"groups"`` entry, which maps a
    group id to that group's dict. A missing or falsy ``"groups"`` value is
    treated as an empty mapping.

    The returned list holds, in this order:

    1. groups found in ``current`` whose ``"rate"`` is not equal to the
       previous one, in sorted group-id order. When the group has no dict
       entry in ``previous`` its old rate is taken to be ``None``;
    2. groups found in ``previous`` but missing from ``current``, in sorted
       group-id order, reported with ``new_rate`` set to ``None``.

    Group entries that are not dicts are ignored. A ``None`` or empty
    ``previous`` (the first check) produces an empty list.
    """
    if not previous:
        return []

    before: dict[str, Any] = previous.get("groups") or {}
    after: dict[str, Any] = current.get("groups") or {}

    def _change(group_id, details, rate_before, rate_after):
        # Name and platform come from whichever side still carries the group.
        return {
            "group_id": group_id,
            "group_name": details.get("group_name", ""),
            "platform": details.get("platform", ""),
            "old_rate": rate_before,
            "new_rate": rate_after,
        }

    report: list[dict[str, Any]] = []

    # Pass 1: groups that exist now and whose rate moved (or that are new).
    for group_id, present in sorted(after.items()):
        if isinstance(present, dict):
            earlier = before.get(group_id)
            rate_before = earlier.get("rate") if isinstance(earlier, dict) else None
            rate_after = present.get("rate")
            if rate_before != rate_after:
                report.append(_change(group_id, present, rate_before, rate_after))

    # Pass 2: groups that disappeared since the previous snapshot.
    report.extend(
        _change(group_id, gone, gone.get("rate"), None)
        for group_id, gone in sorted(before.items())
        if group_id not in after and isinstance(gone, dict)
    )
    return report
def prune_snapshots(db: Session, upstream_id: int, keep: int) -> None:
    """Delete all but the newest ``keep`` snapshots of one upstream.

    Snapshots belonging to ``upstream_id`` are ranked newest-first by
    ``captured_at`` (ties broken by the higher ``id``); every row after the
    first ``keep`` is deleted. Nothing is deleted when ``keep`` is zero or
    negative, so a non-positive retention count disables pruning.

    The deletes are issued on ``db`` without a commit; no commit happens in
    this function.

    Args:
        db: SQLAlchemy session used for the lookup and the deletes.
        upstream_id: Upstream whose snapshots are pruned.
        keep: Number of most recent snapshots to retain.
    """
    if keep <= 0:
        return

    # Databases cap the number of bound parameters per statement (999 on
    # SQLite builds before 3.32). A single IN (...) holding every stale id
    # can exceed that on the first prune of a large backlog, so the ids are
    # deleted in fixed-size batches instead.
    batch_size = 500

    stale_ids = [
        row_id
        for (row_id,) in (
            db.query(UpstreamRateSnapshot.id)
            .filter(UpstreamRateSnapshot.upstream_id == upstream_id)
            .order_by(
                UpstreamRateSnapshot.captured_at.desc(),
                UpstreamRateSnapshot.id.desc(),
            )
            .offset(keep)
            .all()
        )
    ]

    for start in range(0, len(stale_ids), batch_size):
        batch = stale_ids[start:start + batch_size]
        # synchronize_session=False: bulk delete without reconciling objects
        # already loaded in the session, same as the single-statement form.
        (
            db.query(UpstreamRateSnapshot)
            .filter(UpstreamRateSnapshot.id.in_(batch))
            .delete(synchronize_session=False)
        )