"""Snapshot diff logic.""" from typing import Any, Optional from sqlalchemy.orm import Session from app.models.snapshot import UpstreamRateSnapshot def diff_snapshots( previous: Optional[dict[str, Any]], current: dict[str, Any], ) -> list[dict[str, Any]]: """Return list of rate changes between previous and current snapshots. Returns empty list if previous is None (first check).""" if not previous: return [] old_groups: dict[str, Any] = previous.get("groups") or {} new_groups: dict[str, Any] = current.get("groups") or {} changes: list[dict[str, Any]] = [] for gid, new_g in sorted(new_groups.items()): if not isinstance(new_g, dict): continue old_g = old_groups.get(gid) old_rate = old_g.get("rate") if isinstance(old_g, dict) else None new_rate = new_g.get("rate") if old_rate != new_rate: changes.append({ "group_id": gid, "group_name": new_g.get("group_name", ""), "platform": new_g.get("platform", ""), "old_rate": old_rate, "new_rate": new_rate, }) for gid, old_g in sorted(old_groups.items()): if gid not in new_groups and isinstance(old_g, dict): changes.append({ "group_id": gid, "group_name": old_g.get("group_name", ""), "platform": old_g.get("platform", ""), "old_rate": old_g.get("rate"), "new_rate": None, }) return changes def prune_snapshots(db: Session, upstream_id: int, keep: int) -> None: if keep <= 0: return stale_ids = [ row_id for (row_id,) in ( db.query(UpstreamRateSnapshot.id) .filter(UpstreamRateSnapshot.upstream_id == upstream_id) .order_by(UpstreamRateSnapshot.captured_at.desc(), UpstreamRateSnapshot.id.desc()) .offset(keep) .all() ) ] if stale_ids: db.query(UpstreamRateSnapshot).filter(UpstreamRateSnapshot.id.in_(stale_ids)).delete(synchronize_session=False)