# Repository viewer metadata (not part of the module):
# Files
# 2026-04-30 14:34:28 +08:00
#
# 1210 lines
# 38 KiB
# Python

import json
import sqlite3
from pathlib import Path
from uuid import uuid4
from .task_constants import (
ACTIVE_TASK_STATUSES,
SCAN_PROGRESS_LOG_LIMIT,
STAGE_STATUS_FAILED,
TASK_STATUS_COMPLETED,
TASK_STATUS_FAILED,
TASK_STATUS_PENDING,
TASK_TYPE_INGEST,
current_timestamp,
create_empty_dedupe_stats,
create_empty_match_stats,
create_empty_organize_stats,
create_empty_preprocess_stats,
create_empty_repair_stats,
create_empty_scan_stats,
create_empty_task_stats,
create_pending_repair_stage_states,
create_pending_stage_states
)
class TaskConflictError(Exception):
    """Signals that a task could not be created because one is already active.

    Attributes:
        active_task_id: Identifier of the task that is currently active.
    """

    def __init__(self, active_task_id: str):
        # The message is fixed; the conflicting task travels as an attribute.
        Exception.__init__(self, 'Task already running')
        self.active_task_id = active_task_id
class TaskNotFoundError(Exception):
    """Signals that a lookup by identifier matched no stored row."""
class TaskStore:
def __init__(self, db_path: Path):
    """Bind the store to a database file and make sure its schema exists.

    Args:
        db_path: Location of the SQLite database file. Missing parent
            directories are created.
    """
    database_file = Path(db_path)
    self.db_path = database_file
    database_file.parent.mkdir(parents=True, exist_ok=True)
    self._initialize()
def _connect(self):
    """Open a new SQLite connection to ``self.db_path``.

    The connection yields ``sqlite3.Row`` rows and has foreign-key
    enforcement switched on.

    Returns:
        A ``sqlite3.Connection`` (subclass). When used as a context manager
        it commits or rolls back exactly like a plain connection and then
        closes itself.
    """

    class _ClosingConnection(sqlite3.Connection):
        # The stock context manager only ends the transaction; it never
        # closes the connection. Every caller in this store opens a fresh
        # connection per ``with`` block, so close it on the way out instead
        # of leaving that to garbage collection.
        def __exit__(self, exc_type, exc_value, traceback):
            try:
                return super().__exit__(exc_type, exc_value, traceback)
            finally:
                self.close()

    connection = sqlite3.connect(
        self.db_path,
        check_same_thread=False,
        factory=_ClosingConnection
    )
    connection.row_factory = sqlite3.Row
    connection.execute('PRAGMA foreign_keys = ON')
    return connection
def _initialize(self):
    """Create the base tables, then apply column and index migrations."""
    base_schema = '''
        CREATE TABLE IF NOT EXISTS task_runs (
            id TEXT PRIMARY KEY,
            task_type TEXT NOT NULL DEFAULT 'ingest',
            trigger_source TEXT NOT NULL,
            source_task_id TEXT,
            status TEXT NOT NULL,
            current_stage TEXT NOT NULL,
            stage_states_json TEXT NOT NULL,
            config_snapshot_json TEXT NOT NULL,
            stats_json TEXT NOT NULL,
            repair_plan_json TEXT,
            error_message TEXT,
            started_at TEXT NOT NULL,
            completed_at TEXT,
            updated_at TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_task_runs_status_updated
        ON task_runs (status, updated_at DESC);
        CREATE TABLE IF NOT EXISTS task_items (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id TEXT NOT NULL,
            original_path TEXT NOT NULL,
            relative_path TEXT NOT NULL,
            filename TEXT NOT NULL,
            extension TEXT NOT NULL,
            size_bytes INTEGER,
            modified_at TEXT,
            local_cover TEXT,
            local_lyric TEXT,
            scan_status TEXT NOT NULL,
            scan_reason TEXT,
            scan_message TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY (task_id) REFERENCES task_runs(id) ON DELETE CASCADE
        );
        CREATE TABLE IF NOT EXISTS task_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id TEXT NOT NULL,
            stage TEXT NOT NULL,
            level TEXT NOT NULL,
            event_type TEXT NOT NULL,
            message TEXT NOT NULL,
            payload_json TEXT,
            created_at TEXT NOT NULL,
            FOREIGN KEY (task_id) REFERENCES task_runs(id) ON DELETE CASCADE
        );
    '''
    with self._connect() as db:
        db.executescript(base_schema)
        # Later-added columns and the indexes over them are applied after
        # the base tables exist.
        self._ensure_task_items_columns(db)
        self._ensure_indexes(db)
        db.commit()
def _ensure_task_items_columns(self, connection: sqlite3.Connection):
    """Add any missing columns to ``task_runs`` and ``task_items``.

    Despite the name, both tables are migrated. Afterwards every
    ``task_items`` row whose ``current_file_path`` is NULL receives its
    ``original_path``.

    Args:
        connection: Open connection on which the ALTER/UPDATE statements run.
            Nothing is committed here.
    """

    def add_missing(table: str, wanted: dict[str, str]) -> None:
        # Compare the wanted columns with what the table already has and
        # ALTER in only the missing ones.
        present = {
            info['name']
            for info in connection.execute(f"PRAGMA table_info('{table}')").fetchall()
        }
        for column_name, column_sql in wanted.items():
            if column_name in present:
                continue
            connection.execute(
                f'ALTER TABLE {table} ADD COLUMN {column_name} {column_sql}'
            )

    add_missing('task_runs', {
        'task_type': "TEXT NOT NULL DEFAULT 'ingest'",
        'source_task_id': 'TEXT',
        'repair_plan_json': 'TEXT'
    })
    add_missing('task_items', {
        'parent_item_id': 'INTEGER',
        'is_active': 'INTEGER NOT NULL DEFAULT 1',
        'current_file_path': 'TEXT',
        'preprocess_status': "TEXT NOT NULL DEFAULT 'pending'",
        'preprocess_reason': 'TEXT',
        'preprocess_message': 'TEXT',
        'audio_props_json': 'TEXT',
        'original_tags_json': 'TEXT',
        'preprocess_artifacts_json': 'TEXT',
        'acoustic_fingerprint': 'TEXT',
        'fingerprint_duration_seconds': 'REAL',
        'match_status': "TEXT NOT NULL DEFAULT 'pending'",
        'match_reason': 'TEXT',
        'match_message': 'TEXT',
        'match_source': 'TEXT',
        'match_confidence': 'REAL',
        'match_is_authoritative': 'INTEGER NOT NULL DEFAULT 0',
        'matched_metadata_json': 'TEXT',
        'match_candidates_json': 'TEXT',
        'match_enrichment_json': 'TEXT',
        'dedupe_status': "TEXT NOT NULL DEFAULT 'pending'",
        'dedupe_reason': 'TEXT',
        'dedupe_message': 'TEXT',
        'dedupe_group_key': 'TEXT',
        'duplicate_of_path': 'TEXT',
        'duplicate_of_item_id': 'INTEGER',
        'dedupe_decision_json': 'TEXT',
        'organize_status': "TEXT NOT NULL DEFAULT 'pending'",
        'organize_reason': 'TEXT',
        'organize_message': 'TEXT',
        'library_relative_path': 'TEXT',
        'library_file_path': 'TEXT',
        'trash_file_path': 'TEXT',
        'organize_decision_json': 'TEXT',
        'exception_resolution_status': "TEXT NOT NULL DEFAULT 'open'",
        'exception_resolution_json': 'TEXT',
        'last_repair_task_id': 'TEXT'
    })
    # Backfill rows that have no current path recorded.
    connection.execute(
        '''
        UPDATE task_items
        SET current_file_path = original_path
        WHERE current_file_path IS NULL
        '''
    )
def _ensure_indexes(self, connection: sqlite3.Connection):
    """Create the ``task_items`` and ``task_logs`` indexes if they are absent.

    Args:
        connection: Open connection on which the script is executed.
    """
    index_script = '''
        CREATE INDEX IF NOT EXISTS idx_task_items_task_created
        ON task_items (task_id, created_at ASC);
        CREATE INDEX IF NOT EXISTS idx_task_items_task_status
        ON task_items (task_id, scan_status);
        CREATE INDEX IF NOT EXISTS idx_task_items_task_preprocess
        ON task_items (task_id, preprocess_status, is_active);
        CREATE INDEX IF NOT EXISTS idx_task_items_task_match
        ON task_items (task_id, match_status, is_active);
        CREATE INDEX IF NOT EXISTS idx_task_items_task_dedupe
        ON task_items (task_id, dedupe_status, is_active);
        CREATE INDEX IF NOT EXISTS idx_task_items_task_organize
        ON task_items (task_id, organize_status, is_active);
        CREATE INDEX IF NOT EXISTS idx_task_logs_task_id
        ON task_logs (task_id, id ASC);
    '''
    connection.executescript(index_script)
def fail_stale_active_tasks(self) -> list[str]:
    """Mark every task that is still in an active status as failed.

    For each such task the stage states that are ``'pending'`` or
    ``'running'`` become ``STAGE_STATUS_FAILED``, the task row is set to
    ``TASK_STATUS_FAILED`` with a completion timestamp, and an error entry
    is written to ``task_logs``. All changes happen inside one
    ``BEGIN IMMEDIATE`` transaction.

    Returns:
        Identifiers of the tasks that were marked failed, in query order.
    """
    now = current_timestamp()
    reason = 'Service restarted unexpectedly'
    failed_ids: list[str] = []
    with self._connect() as db:
        # Manual transaction control: take the write lock up front.
        db.isolation_level = None
        db.execute('BEGIN IMMEDIATE')
        active_rows = db.execute(
            '''
            SELECT id, stage_states_json
            FROM task_runs
            WHERE status IN (?, ?)
            ''',
            ACTIVE_TASK_STATUSES
        ).fetchall()
        for active in active_rows:
            task_id = active['id']
            patched_states = {}
            for stage_id, stage_status in json.loads(active['stage_states_json']).items():
                # Unfinished stages can no longer complete; finished ones keep
                # their recorded outcome.
                if stage_status in ('pending', 'running'):
                    stage_status = STAGE_STATUS_FAILED
                patched_states[stage_id] = stage_status
            db.execute(
                '''
                UPDATE task_runs
                SET status = ?, stage_states_json = ?, error_message = ?, completed_at = ?, updated_at = ?
                WHERE id = ?
                ''',
                (TASK_STATUS_FAILED, json.dumps(patched_states), reason, now, now, task_id)
            )
            db.execute(
                '''
                INSERT INTO task_logs (task_id, stage, level, event_type, message, payload_json, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ''',
                (task_id, 'system', 'error', 'task.failed', reason, None, now)
            )
            failed_ids.append(task_id)
        db.commit()
    return failed_ids
def create_task_if_idle(
    self,
    config_snapshot: dict,
    trigger_source: str = 'manual',
    *,
    task_type: str = TASK_TYPE_INGEST,
    source_task_id: str | None = None,
    repair_plan_json: dict | None = None
) -> dict:
    """Insert a new pending task unless one of the same type is active.

    The active-task check and the insert share one ``BEGIN IMMEDIATE``
    transaction.

    Args:
        config_snapshot: Configuration stored as JSON with the task.
        trigger_source: Stored in the ``trigger_source`` column.
        task_type: ``TASK_TYPE_INGEST`` starts at stage ``'scan'``; any other
            value uses the repair stage states/stats and starts at
            ``'prepare'``.
        source_task_id: Stored in the ``source_task_id`` column.
        repair_plan_json: Optional plan stored as JSON; NULL when omitted.

    Returns:
        The newly created task as returned by ``get_task``.

    Raises:
        TaskConflictError: A task of ``task_type`` is already in an active
            status; carries that task's id.
    """
    new_id = str(uuid4())
    now = current_timestamp()
    is_ingest = task_type == TASK_TYPE_INGEST
    if is_ingest:
        stage_states = create_pending_stage_states()
        stats = create_empty_task_stats()
    else:
        stage_states = create_pending_repair_stage_states()
        stats = create_empty_repair_stats()
    first_stage = 'scan' if is_ingest else 'prepare'

    with self._connect() as db:
        db.isolation_level = None
        db.execute('BEGIN IMMEDIATE')
        blocking = db.execute(
            '''
            SELECT id
            FROM task_runs
            WHERE status IN (?, ?)
            AND task_type = ?
            ORDER BY started_at DESC
            LIMIT 1
            ''',
            (*ACTIVE_TASK_STATUSES, task_type)
        ).fetchone()
        if blocking is not None:
            # Release the write lock before reporting the conflict.
            db.rollback()
            raise TaskConflictError(blocking['id'])

        plan_payload = None if repair_plan_json is None else json.dumps(repair_plan_json)
        db.execute(
            '''
            INSERT INTO task_runs (
                id, task_type, trigger_source, source_task_id, status, current_stage, stage_states_json,
                config_snapshot_json, stats_json, repair_plan_json, error_message, started_at, completed_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''',
            (
                new_id,
                task_type,
                trigger_source,
                source_task_id,
                TASK_STATUS_PENDING,
                first_stage,
                json.dumps(stage_states),
                json.dumps(config_snapshot),
                json.dumps(stats),
                plan_payload,
                None,  # error_message
                now,   # started_at
                None,  # completed_at
                now    # updated_at
            )
        )
        db.commit()
    return self.get_task(new_id)
def get_active_task(self, task_type: str = TASK_TYPE_INGEST) -> dict | None:
    """Return the most recently started active task of ``task_type``.

    Returns:
        The parsed task, or ``None`` when no task of that type is in an
        active status.
    """
    query = '''
        SELECT *
        FROM task_runs
        WHERE status IN (?, ?)
        AND task_type = ?
        ORDER BY started_at DESC
        LIMIT 1
    '''
    with self._connect() as db:
        active = db.execute(query, (*ACTIVE_TASK_STATUSES, task_type)).fetchone()
    if active is None:
        return None
    return self._parse_task(active)
def get_latest_task(self, task_type: str | None = TASK_TYPE_INGEST) -> dict | None:
    """Return the most recently started task, optionally limited to one type.

    Args:
        task_type: Restrict the lookup to this type; ``None`` considers every
            task regardless of type.

    Returns:
        The parsed task, or ``None`` when no matching task exists.
    """
    if task_type is None:
        query = '''
            SELECT *
            FROM task_runs
            ORDER BY started_at DESC
            LIMIT 1
        '''
        params: tuple = ()
    else:
        query = '''
            SELECT *
            FROM task_runs
            WHERE task_type = ?
            ORDER BY started_at DESC
            LIMIT 1
        '''
        params = (task_type,)
    with self._connect() as db:
        latest = db.execute(query, params).fetchone()
    if latest is None:
        return None
    return self._parse_task(latest)
def get_task(self, task_id: str) -> dict:
    """Load one task by id.

    Raises:
        TaskNotFoundError: No ``task_runs`` row has this id.
    """
    with self._connect() as db:
        found = db.execute(
            'SELECT * FROM task_runs WHERE id = ?',
            (task_id,)
        ).fetchone()
    if found is not None:
        return self._parse_task(found)
    raise TaskNotFoundError(task_id)
def update_task(
    self,
    task_id: str,
    *,
    status: str | None = None,
    current_stage: str | None = None,
    stage_states: dict[str, str] | None = None,
    stats: dict | None = None,
    error_message: str | None = None,
    completed_at: str | None = None
) -> dict:
    """Update selected columns of a task and return the refreshed task.

    Arguments left as ``None`` are not written, with one exception:
    ``error_message`` is written whenever either it or ``completed_at`` is
    supplied, so passing only ``completed_at`` stores NULL as the error
    message. ``updated_at`` is always refreshed.

    Raises:
        TaskNotFoundError: Raised by the final ``get_task`` when the id does
            not exist.
    """
    changes: list[tuple[str, object]] = []
    if status is not None:
        changes.append(('status', status))
    if current_stage is not None:
        changes.append(('current_stage', current_stage))
    if stage_states is not None:
        changes.append(('stage_states_json', json.dumps(stage_states)))
    if stats is not None:
        changes.append(('stats_json', json.dumps(stats)))
    if not (error_message is None and completed_at is None):
        changes.append(('error_message', error_message))
    if completed_at is not None:
        changes.append(('completed_at', completed_at))
    changes.append(('updated_at', current_timestamp()))

    set_clause = ', '.join(f'{column} = ?' for column, _ in changes)
    params = [value for _, value in changes]
    params.append(task_id)
    with self._connect() as db:
        db.execute(f'UPDATE task_runs SET {set_clause} WHERE id = ?', params)
        db.commit()
    return self.get_task(task_id)
def append_log(
    self,
    task_id: str,
    stage: str,
    level: str,
    event_type: str,
    message: str,
    payload: dict | None = None
) -> dict:
    """Insert one ``task_logs`` row and return it in parsed form.

    Args:
        payload: Optional extra data, stored as JSON; NULL when omitted.
    """
    created_at = current_timestamp()
    encoded_payload = None if payload is None else json.dumps(payload)
    with self._connect() as db:
        inserted = db.execute(
            '''
            INSERT INTO task_logs (task_id, stage, level, event_type, message, payload_json, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ''',
            (task_id, stage, level, event_type, message, encoded_payload, created_at)
        )
        db.commit()
        # Read the row back so the caller gets the generated id.
        stored = db.execute(
            'SELECT * FROM task_logs WHERE id = ?',
            (inserted.lastrowid,)
        ).fetchone()
    return self._parse_log(stored)
def insert_task_item(
    self,
    task_id: str,
    *,
    original_path: str,
    relative_path: str,
    filename: str,
    extension: str,
    size_bytes: int | None,
    modified_at: str | None,
    local_cover: str | None,
    local_lyric: str | None,
    scan_status: str,
    scan_reason: str | None,
    scan_message: str | None,
    parent_item_id: int | None = None,
    is_active: int = 1,
    current_file_path: str | None = None,
    preprocess_status: str = 'pending',
    preprocess_reason: str | None = None,
    preprocess_message: str | None = None,
    audio_props_json: dict | None = None,
    original_tags_json: dict | None = None,
    preprocess_artifacts_json: dict | None = None,
    acoustic_fingerprint: str | None = None,
    fingerprint_duration_seconds: float | None = None,
    match_status: str = 'pending',
    match_reason: str | None = None,
    match_message: str | None = None,
    match_source: str | None = None,
    match_confidence: float | None = None,
    match_is_authoritative: int = 0,
    matched_metadata_json: dict | None = None,
    match_candidates_json: list[dict] | None = None,
    match_enrichment_json: dict | None = None,
    dedupe_status: str = 'pending',
    dedupe_reason: str | None = None,
    dedupe_message: str | None = None,
    dedupe_group_key: str | None = None,
    duplicate_of_path: str | None = None,
    duplicate_of_item_id: int | None = None,
    dedupe_decision_json: dict | None = None,
    organize_status: str = 'pending',
    organize_reason: str | None = None,
    organize_message: str | None = None,
    library_relative_path: str | None = None,
    library_file_path: str | None = None,
    trash_file_path: str | None = None,
    organize_decision_json: dict | None = None,
    exception_resolution_status: str = 'open',
    exception_resolution_json: dict | None = None,
    last_repair_task_id: str | None = None
) -> dict:
    """Insert one ``task_items`` row and return it in parsed form.

    Every keyword maps to the column of the same name. ``*_json`` arguments
    are serialized with ``json.dumps`` unless they are ``None``. A falsy
    ``current_file_path`` falls back to ``original_path``. ``created_at``
    and ``updated_at`` both receive the current timestamp.
    """

    def encode(value):
        # NULL stays NULL; anything else is stored as JSON text.
        return None if value is None else json.dumps(value)

    now = current_timestamp()
    effective_path = current_file_path or original_path
    # The order below must mirror the column list in the INSERT statement.
    values = (
        task_id,
        parent_item_id,
        is_active,
        original_path,
        effective_path,
        relative_path,
        filename,
        extension,
        size_bytes,
        modified_at,
        local_cover,
        local_lyric,
        scan_status,
        scan_reason,
        scan_message,
        preprocess_status,
        preprocess_reason,
        preprocess_message,
        encode(audio_props_json),
        encode(original_tags_json),
        encode(preprocess_artifacts_json),
        acoustic_fingerprint,
        fingerprint_duration_seconds,
        match_status,
        match_reason,
        match_message,
        match_source,
        match_confidence,
        match_is_authoritative,
        encode(matched_metadata_json),
        encode(match_candidates_json),
        encode(match_enrichment_json),
        dedupe_status,
        dedupe_reason,
        dedupe_message,
        dedupe_group_key,
        duplicate_of_path,
        duplicate_of_item_id,
        encode(dedupe_decision_json),
        organize_status,
        organize_reason,
        organize_message,
        library_relative_path,
        library_file_path,
        trash_file_path,
        encode(organize_decision_json),
        exception_resolution_status,
        encode(exception_resolution_json),
        last_repair_task_id,
        now,
        now
    )
    with self._connect() as db:
        inserted = db.execute(
            '''
            INSERT INTO task_items (
                task_id, parent_item_id, is_active, original_path, current_file_path, relative_path,
                filename, extension, size_bytes, modified_at, local_cover, local_lyric, scan_status,
                scan_reason, scan_message, preprocess_status, preprocess_reason, preprocess_message,
                audio_props_json, original_tags_json, preprocess_artifacts_json, acoustic_fingerprint,
                fingerprint_duration_seconds, match_status, match_reason, match_message, match_source,
                match_confidence, match_is_authoritative, matched_metadata_json, match_candidates_json,
                match_enrichment_json, dedupe_status, dedupe_reason, dedupe_message,
                dedupe_group_key, duplicate_of_path, duplicate_of_item_id, dedupe_decision_json,
                organize_status, organize_reason, organize_message, library_relative_path,
                library_file_path, trash_file_path, organize_decision_json,
                exception_resolution_status, exception_resolution_json, last_repair_task_id,
                created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''',
            values
        )
        db.commit()
        stored = db.execute(
            'SELECT * FROM task_items WHERE id = ?',
            (inserted.lastrowid,)
        ).fetchone()
    return self._parse_item(stored)
def update_task_item(self, item_id: int, **fields) -> dict:
    """Update arbitrary columns of one ``task_items`` row.

    Args:
        item_id: Primary key of the row to update.
        **fields: Column name to new value. Values for the known ``*_json``
            columns are serialized with ``json.dumps`` unless ``None``.
            ``updated_at`` is always refreshed.

    Returns:
        The updated row in parsed form.

    Raises:
        ValueError: A field name is not a plain ASCII identifier.
        TaskNotFoundError: No row with ``item_id`` exists (raised after the
            UPDATE has run and matched nothing).
    """
    json_fields = {
        'audio_props_json',
        'original_tags_json',
        'preprocess_artifacts_json',
        'matched_metadata_json',
        'match_candidates_json',
        'match_enrichment_json',
        'dedupe_decision_json',
        'organize_decision_json',
        'exception_resolution_json'
    }
    # Column names are spliced into the SQL text (only values can be bound
    # as parameters), so refuse anything that is not a bare identifier
    # before touching the database.
    invalid_columns = [
        key for key in fields
        if not (key.isascii() and key.isidentifier())
    ]
    if invalid_columns:
        raise ValueError(f'Invalid task item column name(s): {invalid_columns!r}')

    assignments: list[str] = []
    values: list[object] = []
    for key, value in fields.items():
        assignments.append(f'{key} = ?')
        if key in json_fields and value is not None:
            values.append(json.dumps(value))
        else:
            values.append(value)
    assignments.append('updated_at = ?')
    values.append(current_timestamp())
    values.append(item_id)
    with self._connect() as connection:
        connection.execute(
            f'UPDATE task_items SET {", ".join(assignments)} WHERE id = ?',
            values
        )
        connection.commit()
        row = connection.execute(
            'SELECT * FROM task_items WHERE id = ?',
            (item_id,)
        ).fetchone()
    if row is None:
        raise TaskNotFoundError(item_id)
    return self._parse_item(row)
def list_preprocess_candidate_items(self, task_id: str) -> list[dict]:
    """List active, queued items of a task whose preprocess step is unfinished.

    Selected rows have ``scan_status = 'queued'`` and a ``preprocess_status``
    of ``'pending'`` or ``'running'``, oldest first.
    """
    query = '''
        SELECT *
        FROM task_items
        WHERE task_id = ?
        AND is_active = 1
        AND scan_status = 'queued'
        AND preprocess_status IN ('pending', 'running')
        ORDER BY created_at ASC, id ASC
    '''
    with self._connect() as db:
        candidates = db.execute(query, (task_id,)).fetchall()
    return list(map(self._parse_item, candidates))
def list_match_candidate_items(self, task_id: str) -> list[dict]:
    """List active, queued items of a task that are ready for matching.

    Selected rows have a ``preprocess_status`` of ``'completed'`` or
    ``'warning'`` and a ``match_status`` of ``'pending'`` or ``'running'``,
    oldest first.
    """
    query = '''
        SELECT *
        FROM task_items
        WHERE task_id = ?
        AND is_active = 1
        AND scan_status = 'queued'
        AND preprocess_status IN ('completed', 'warning')
        AND match_status IN ('pending', 'running')
        ORDER BY created_at ASC, id ASC
    '''
    with self._connect() as db:
        candidates = db.execute(query, (task_id,)).fetchall()
    return list(map(self._parse_item, candidates))
def list_dedupe_candidate_items(self, task_id: str) -> list[dict]:
    """List active matched items of a task whose dedupe step is unfinished.

    Selected rows have a ``match_status`` of ``'matched'`` or
    ``'matched_fallback'`` and a ``dedupe_status`` of ``'pending'`` or
    ``'running'``, oldest first.
    """
    query = '''
        SELECT *
        FROM task_items
        WHERE task_id = ?
        AND is_active = 1
        AND match_status IN ('matched', 'matched_fallback')
        AND dedupe_status IN ('pending', 'running')
        ORDER BY created_at ASC, id ASC
    '''
    with self._connect() as db:
        candidates = db.execute(query, (task_id,)).fetchall()
    return list(map(self._parse_item, candidates))
def list_organize_candidate_items(self, task_id: str) -> list[dict]:
    """List active items of a task that are ready to be organized.

    Selected rows have a ``match_status`` of ``'matched'`` or
    ``'matched_fallback'``, a ``dedupe_status`` of ``'unique'`` or
    ``'duplicate_replaced'`` and an ``organize_status`` of ``'pending'`` or
    ``'running'``, oldest first.
    """
    query = '''
        SELECT *
        FROM task_items
        WHERE task_id = ?
        AND is_active = 1
        AND match_status IN ('matched', 'matched_fallback')
        AND dedupe_status IN ('unique', 'duplicate_replaced')
        AND organize_status IN ('pending', 'running')
        ORDER BY created_at ASC, id ASC
    '''
    with self._connect() as db:
        candidates = db.execute(query, (task_id,)).fetchall()
    return list(map(self._parse_item, candidates))
def list_task_items(
self,
task_id: str,
scan_status: str | None,
page: int,
page_size: int,
*,
preprocess_status: str | None = None,
match_status: str | None = None,
dedupe_status: str | None = None,
organize_status: str | None = None,
active_only: bool = False
) -> dict:
offset = (page - 1) * page_size
where_clauses = ['task_id = ?']
params: list[object] = [task_id]
if scan_status:
where_clauses.append('scan_status = ?')
params.append(scan_status)
if preprocess_status:
where_clauses.append('preprocess_status = ?')
params.append(preprocess_status)
if match_status:
where_clauses.append('match_status = ?')
params.append(match_status)
if dedupe_status:
where_clauses.append('dedupe_status = ?')
params.append(dedupe_status)
if organize_status:
where_clauses.append('organize_status = ?')
params.append(organize_status)
if active_only:
where_clauses.append('is_active = 1')
where_clause = f"WHERE {' AND '.join(where_clauses)}"
with self._connect() as connection:
total = connection.execute(
f'SELECT COUNT(*) AS total FROM task_items {where_clause}',
params
).fetchone()['total']
rows = connection.execute(
f'''
SELECT *
FROM task_items
{where_clause}
ORDER BY created_at ASC, id ASC
LIMIT ? OFFSET ?
''',
[*params, page_size, offset]
).fetchall()
return {
'items': [self._parse_item(row) for row in rows],
'page': page,
'page_size': page_size,
'total': total
}
def list_all_task_items(self, task_id: str, *, active_only: bool = False) -> list[dict]:
    """Return every item of a task, oldest first, without pagination.

    Args:
        active_only: When true, only rows with ``is_active = 1`` are listed.
    """
    conditions = ['task_id = ?', 'is_active = 1'] if active_only else ['task_id = ?']
    bound: list[object] = [task_id]
    with self._connect() as db:
        fetched = db.execute(
            f'''
            SELECT *
            FROM task_items
            WHERE {' AND '.join(conditions)}
            ORDER BY created_at ASC, id ASC
            ''',
            bound
        ).fetchall()
    return list(map(self._parse_item, fetched))
def list_task_history(self, page: int, page_size: int) -> dict:
    """Return one page of finished ingest tasks with per-task item counts.

    Only ``'ingest'`` tasks whose status is ``TASK_STATUS_COMPLETED`` or
    ``TASK_STATUS_FAILED`` are listed, newest first. ``success_items``
    counts items whose ``organize_status`` is ``'organized'``.

    Args:
        page: 1-based page number; the offset is ``(page - 1) * page_size``.
        page_size: Maximum number of tasks on the page.

    Returns:
        ``{'items', 'page', 'page_size', 'total'}``.
    """
    finished = (TASK_STATUS_COMPLETED, TASK_STATUS_FAILED)
    skip = (page - 1) * page_size
    with self._connect() as db:
        count_row = db.execute(
            '''
            SELECT COUNT(*) AS total
            FROM task_runs
            WHERE status IN (?, ?)
            AND task_type = 'ingest'
            ''',
            finished
        ).fetchone()
        history_rows = db.execute(
            '''
            WITH task_item_stats AS (
                SELECT
                    task_id,
                    COUNT(*) AS total_items,
                    SUM(CASE WHEN organize_status = 'organized' THEN 1 ELSE 0 END) AS success_items
                FROM task_items
                GROUP BY task_id
            )
            SELECT
                task_runs.id AS task_id,
                task_runs.started_at,
                task_runs.status,
                COALESCE(task_item_stats.total_items, 0) AS total_items,
                COALESCE(task_item_stats.success_items, 0) AS success_items
            FROM task_runs
            LEFT JOIN task_item_stats ON task_item_stats.task_id = task_runs.id
            WHERE task_runs.status IN (?, ?)
            AND task_runs.task_type = 'ingest'
            ORDER BY task_runs.started_at DESC, task_runs.id DESC
            LIMIT ? OFFSET ?
            ''',
            (*finished, page_size, skip)
        ).fetchall()
    return {
        'items': list(map(self._parse_task_history_row, history_rows)),
        'page': page,
        'page_size': page_size,
        'total': count_row['total']
    }
def list_task_logs(self, task_id: str, page: int, page_size: int) -> dict:
    """Return one page of a task's log entries in insertion (id) order.

    Args:
        page: 1-based page number; the offset is ``(page - 1) * page_size``.
        page_size: Maximum number of entries on the page.

    Returns:
        ``{'logs', 'page', 'page_size', 'total'}``.
    """
    skip = (page - 1) * page_size
    with self._connect() as db:
        count_row = db.execute(
            'SELECT COUNT(*) AS total FROM task_logs WHERE task_id = ?',
            (task_id,)
        ).fetchone()
        page_rows = db.execute(
            '''
            SELECT *
            FROM task_logs
            WHERE task_id = ?
            ORDER BY id ASC
            LIMIT ? OFFSET ?
            ''',
            (task_id, page_size, skip)
        ).fetchall()
    return {
        'logs': list(map(self._parse_log, page_rows)),
        'page': page,
        'page_size': page_size,
        'total': count_row['total']
    }
def list_library_provenance_items(self) -> list[dict]:
    """List provenance details for every item that was organized.

    Rows with ``organize_status = 'organized'`` are returned across all
    tasks, most recently updated first; the row's ``updated_at`` is exposed
    as ``organized_at``.
    """
    exposed = (
        'task_id',
        'library_file_path',
        'library_relative_path',
        'organized_at',
        'match_source',
        'match_confidence',
        'dedupe_status',
    )
    with self._connect() as db:
        organized = db.execute(
            '''
            SELECT
                task_id,
                library_file_path,
                library_relative_path,
                updated_at AS organized_at,
                match_source,
                match_confidence,
                dedupe_status
            FROM task_items
            WHERE organize_status = 'organized'
            ORDER BY updated_at DESC, id DESC
            '''
        ).fetchall()
    return [{key: entry[key] for key in exposed} for entry in organized]
def list_exception_source_items(self, resolution_status: str = 'open') -> list[dict]:
    """List items that count as exceptions, across all tasks.

    An item qualifies when a stage recorded one of the problem states
    spelled out in the query below, or when it is open/planned and its
    resolution JSON mentions a ``candidate_selected`` or ``ready_to_ingest``
    workflow state. Results are ordered most recently updated first.

    Args:
        resolution_status: ``'open'`` (also includes ``'planned'`` rows),
            ``'resolved'``, ``'ignored'``, ``'planned'``, or ``'all'`` for no
            resolution filter.

    Raises:
        ValueError: ``resolution_status`` is not one of the values above.
    """
    supported = {'open', 'resolved', 'ignored', 'all', 'planned'}
    if resolution_status not in supported:
        raise ValueError(f'Unsupported resolution status: {resolution_status}')

    bound: list[object] = []
    if resolution_status == 'all':
        resolution_filter = ''
    elif resolution_status == 'open':
        # "Open" deliberately covers planned rows as well.
        resolution_filter = "AND task_items.exception_resolution_status IN ('open', 'planned')"
    else:
        resolution_filter = 'AND task_items.exception_resolution_status = ?'
        bound.append(resolution_status)

    selection = '''
        SELECT
            task_items.*,
            task_runs.started_at AS task_started_at
        FROM task_items
        JOIN task_runs ON task_runs.id = task_items.task_id
        WHERE (
            task_items.organize_status IN ('trashed', 'failed')
            OR task_items.dedupe_status IN ('duplicate_trashed', 'failed')
            OR task_items.match_status IN ('low_score', 'failed', 'not_found')
            OR (
                task_items.preprocess_status = 'failed'
                AND task_items.preprocess_reason = 'convert_failed'
            )
            OR (
                task_items.preprocess_status = 'warning'
                AND task_items.preprocess_reason LIKE '%metadata_failed%'
            )
            OR (
                task_items.exception_resolution_status IN ('open', 'planned')
                AND task_items.exception_resolution_json IS NOT NULL
                AND (
                    task_items.exception_resolution_json LIKE '%"workflow_state"%candidate_selected%'
                    OR task_items.exception_resolution_json LIKE '%"workflow_state"%ready_to_ingest%'
                )
            )
        )
        '''
    ordering = '''
        ORDER BY task_items.updated_at DESC, task_items.id DESC
        '''
    with self._connect() as db:
        matches = db.execute(selection + resolution_filter + ordering, bound).fetchall()
    return list(map(self._parse_item_with_task_context, matches))
def get_exception_source_item(self, item_id: int) -> dict | None:
with self._connect() as connection:
row = connection.execute(
'''
SELECT
task_items.*,
task_runs.started_at AS task_started_at
FROM task_items
JOIN task_runs ON task_runs.id = task_items.task_id
WHERE task_items.id = ?
''',
(item_id,)
).fetchone()
if row is None:
return None
return self._parse_item_with_task_context(row)
def get_exception_source_items_by_ids(self, item_ids: list[int]) -> list[dict]:
    """Load several items, each with its task's ``started_at``.

    An empty ``item_ids`` returns ``[]`` without querying. The query has no
    ORDER BY, so the result order is whatever SQLite yields.
    """
    if len(item_ids) == 0:
        return []
    # One bound parameter per requested id.
    markers = ', '.join(['?'] * len(item_ids))
    with self._connect() as db:
        fetched = db.execute(
            f'''
            SELECT
                task_items.*,
                task_runs.started_at AS task_started_at
            FROM task_items
            JOIN task_runs ON task_runs.id = task_items.task_id
            WHERE task_items.id IN ({markers})
            ''',
            item_ids
        ).fetchall()
    return list(map(self._parse_item_with_task_context, fetched))
def get_task_snapshot(self, task_id: str) -> dict:
    """Return a task together with its most recent log entries.

    At most ``SCAN_PROGRESS_LOG_LIMIT`` entries are included, in ascending
    id order.

    Returns:
        ``{'task', 'recent_logs', 'recent_logs_limit', 'has_more_logs',
        'latest_log_id'}``; ``latest_log_id`` is ``None`` when the task has
        no logs.

    Raises:
        TaskNotFoundError: Propagated from ``get_task``.
    """
    task = self.get_task(task_id)
    with self._connect() as db:
        log_count = db.execute(
            'SELECT COUNT(*) AS total FROM task_logs WHERE task_id = ?',
            (task_id,)
        ).fetchone()['total']
        newest_first = db.execute(
            '''
            SELECT *
            FROM task_logs
            WHERE task_id = ?
            ORDER BY id DESC
            LIMIT ?
            ''',
            (task_id, SCAN_PROGRESS_LOG_LIMIT)
        ).fetchall()
    # Fetched newest-first to apply the limit; presented oldest-first.
    recent_logs = [self._parse_log(entry) for entry in newest_first[::-1]]
    latest_log_id = recent_logs[-1]['id'] if recent_logs else None
    return {
        'task': task,
        'recent_logs': recent_logs,
        'recent_logs_limit': SCAN_PROGRESS_LOG_LIMIT,
        'has_more_logs': log_count > len(recent_logs),
        'latest_log_id': latest_log_id
    }
def _parse_task(self, row: sqlite3.Row | None) -> dict | None:
if row is None:
return None
raw_stats = json.loads(row['stats_json'])
if row['task_type'] != TASK_TYPE_INGEST:
stats = {
'prepare': {
**create_empty_repair_stats()['prepare'],
**(raw_stats.get('prepare') or {})
},
'execute': {
**create_empty_repair_stats()['execute'],
**(raw_stats.get('execute') or {})
}
}
elif 'scan' in raw_stats:
stats = {
'scan': {
**create_empty_scan_stats(),
**(raw_stats.get('scan') or {})
},
'preprocess': {
**create_empty_preprocess_stats(),
**(raw_stats.get('preprocess') or {})
},
'match': {
**create_empty_match_stats(),
**(raw_stats.get('match') or {})
},
'dedupe': {
**create_empty_dedupe_stats(),
**(raw_stats.get('dedupe') or {})
},
'organize': {
**create_empty_organize_stats(),
**(raw_stats.get('organize') or {})
}
}
else:
stats = {
'scan': {
**create_empty_scan_stats(),
**raw_stats
},
'preprocess': create_empty_preprocess_stats(),
'match': create_empty_match_stats(),
'dedupe': create_empty_dedupe_stats(),
'organize': create_empty_organize_stats()
}
return {
'task_id': row['id'],
'task_type': row['task_type'] or TASK_TYPE_INGEST,
'trigger_source': row['trigger_source'],
'source_task_id': row['source_task_id'],
'status': row['status'],
'current_stage': row['current_stage'],
'stage_states': json.loads(row['stage_states_json']),
'stats': stats,
'repair_plan_json': json.loads(row['repair_plan_json']) if row['repair_plan_json'] else None,
'error_message': row['error_message'],
'started_at': row['started_at'],
'completed_at': row['completed_at'],
'updated_at': row['updated_at']
}
def _parse_item(self, row: sqlite3.Row) -> dict:
    """Convert a ``task_items`` row into the item dictionary handed to callers.

    ``*_json`` columns are decoded (empty/NULL becomes ``None``), the two
    flag columns become booleans, and empty status columns fall back to
    ``'pending'`` (``'open'`` for ``exception_resolution_status``). An empty
    ``current_file_path`` falls back to ``original_path``.
    """

    def decoded(column: str):
        # Empty or NULL JSON text is reported as None.
        text = row[column]
        return json.loads(text) if text else None

    def status(column: str, fallback: str = 'pending'):
        return row[column] or fallback

    return {
        'id': row['id'],
        'task_id': row['task_id'],
        'parent_item_id': row['parent_item_id'],
        'is_active': bool(row['is_active']),
        'original_path': row['original_path'],
        'current_file_path': row['current_file_path'] or row['original_path'],
        'relative_path': row['relative_path'],
        'filename': row['filename'],
        'extension': row['extension'],
        'size_bytes': row['size_bytes'],
        'modified_at': row['modified_at'],
        'local_cover': row['local_cover'],
        'local_lyric': row['local_lyric'],
        'scan_status': row['scan_status'],
        'scan_reason': row['scan_reason'],
        'scan_message': row['scan_message'],
        'preprocess_status': status('preprocess_status'),
        'preprocess_reason': row['preprocess_reason'],
        'preprocess_message': row['preprocess_message'],
        'audio_props_json': decoded('audio_props_json'),
        'original_tags_json': decoded('original_tags_json'),
        'preprocess_artifacts_json': decoded('preprocess_artifacts_json'),
        'acoustic_fingerprint': row['acoustic_fingerprint'],
        'fingerprint_duration_seconds': row['fingerprint_duration_seconds'],
        'match_status': status('match_status'),
        'match_reason': row['match_reason'],
        'match_message': row['match_message'],
        'match_source': row['match_source'],
        'match_confidence': row['match_confidence'],
        'match_is_authoritative': bool(row['match_is_authoritative']),
        'matched_metadata_json': decoded('matched_metadata_json'),
        'match_candidates_json': decoded('match_candidates_json'),
        'match_enrichment_json': decoded('match_enrichment_json'),
        'dedupe_status': status('dedupe_status'),
        'dedupe_reason': row['dedupe_reason'],
        'dedupe_message': row['dedupe_message'],
        'dedupe_group_key': row['dedupe_group_key'],
        'duplicate_of_path': row['duplicate_of_path'],
        'duplicate_of_item_id': row['duplicate_of_item_id'],
        'dedupe_decision_json': decoded('dedupe_decision_json'),
        'organize_status': status('organize_status'),
        'organize_reason': row['organize_reason'],
        'organize_message': row['organize_message'],
        'library_relative_path': row['library_relative_path'],
        'library_file_path': row['library_file_path'],
        'trash_file_path': row['trash_file_path'],
        'organize_decision_json': decoded('organize_decision_json'),
        'exception_resolution_status': status('exception_resolution_status', 'open'),
        'exception_resolution_json': decoded('exception_resolution_json'),
        'last_repair_task_id': row['last_repair_task_id'],
        'created_at': row['created_at'],
        'updated_at': row['updated_at']
    }
def _parse_item_with_task_context(self, row: sqlite3.Row) -> dict:
    """Parse an item row and attach the row's ``task_started_at`` value."""
    parsed = self._parse_item(row)
    parsed.update(task_started_at=row['task_started_at'])
    return parsed
def _parse_log(self, row: sqlite3.Row) -> dict:
    """Convert a ``task_logs`` row into a log dictionary.

    ``payload_json`` is decoded into ``payload``; empty or NULL becomes
    ``None``.
    """
    raw_payload = row['payload_json']
    entry = {
        column: row[column]
        for column in ('id', 'task_id', 'stage', 'level', 'event_type', 'message')
    }
    entry['payload'] = json.loads(raw_payload) if raw_payload else None
    entry['created_at'] = row['created_at']
    return entry
def _parse_task_history_row(self, row: sqlite3.Row) -> dict:
    """Convert a history query row into a summary dictionary.

    ``exception_items`` is the difference between total and successful
    items. ``report_status`` is ``'success'`` only for a completed task with
    no exception items; every other case is ``'warning'``.
    """
    total = row['total_items'] or 0
    succeeded = row['success_items'] or 0
    exceptions = total - succeeded
    if row['status'] == TASK_STATUS_COMPLETED and exceptions == 0:
        report_status = 'success'
    else:
        report_status = 'warning'
    return {
        'task_id': row['task_id'],
        'started_at': row['started_at'],
        'status': row['status'],
        'total_items': total,
        'success_items': succeeded,
        'exception_items': exceptions,
        'report_status': report_status
    }