1210 lines
38 KiB
Python
1210 lines
38 KiB
Python
import json
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from uuid import uuid4
|
|
|
|
from .task_constants import (
|
|
ACTIVE_TASK_STATUSES,
|
|
SCAN_PROGRESS_LOG_LIMIT,
|
|
STAGE_STATUS_FAILED,
|
|
TASK_STATUS_COMPLETED,
|
|
TASK_STATUS_FAILED,
|
|
TASK_STATUS_PENDING,
|
|
TASK_TYPE_INGEST,
|
|
current_timestamp,
|
|
create_empty_dedupe_stats,
|
|
create_empty_match_stats,
|
|
create_empty_organize_stats,
|
|
create_empty_preprocess_stats,
|
|
create_empty_repair_stats,
|
|
create_empty_scan_stats,
|
|
create_empty_task_stats,
|
|
create_pending_repair_stage_states,
|
|
create_pending_stage_states
|
|
)
|
|
|
|
|
|
class TaskConflictError(Exception):
    """Signal that a task could not be created because one is already active.

    Attributes:
        active_task_id: Identifier of the task that is currently active.
    """

    def __init__(self, active_task_id: str):
        self.active_task_id = active_task_id
        super().__init__('Task already running')
|
|
|
|
|
|
class TaskNotFoundError(Exception):
    """Signal that a lookup by identifier matched no stored row."""
|
|
|
|
|
|
class TaskStore:
|
|
def __init__(self, db_path: Path):
    """Remember where the database lives and make sure it is ready for use.

    Args:
        db_path: Location of the SQLite database file. Its parent directory
            is created when missing.
    """
    self.db_path = Path(db_path)
    parent_dir = self.db_path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    # Create or upgrade the schema right away.
    self._initialize()
|
|
|
|
def _connect(self):
|
|
connection = sqlite3.connect(self.db_path, check_same_thread=False)
|
|
connection.row_factory = sqlite3.Row
|
|
connection.execute('PRAGMA foreign_keys = ON')
|
|
return connection
|
|
|
|
def _initialize(self):
|
|
with self._connect() as connection:
|
|
connection.executescript(
|
|
'''
|
|
CREATE TABLE IF NOT EXISTS task_runs (
|
|
id TEXT PRIMARY KEY,
|
|
task_type TEXT NOT NULL DEFAULT 'ingest',
|
|
trigger_source TEXT NOT NULL,
|
|
source_task_id TEXT,
|
|
status TEXT NOT NULL,
|
|
current_stage TEXT NOT NULL,
|
|
stage_states_json TEXT NOT NULL,
|
|
config_snapshot_json TEXT NOT NULL,
|
|
stats_json TEXT NOT NULL,
|
|
repair_plan_json TEXT,
|
|
error_message TEXT,
|
|
started_at TEXT NOT NULL,
|
|
completed_at TEXT,
|
|
updated_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_task_runs_status_updated
|
|
ON task_runs (status, updated_at DESC);
|
|
|
|
CREATE TABLE IF NOT EXISTS task_items (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
task_id TEXT NOT NULL,
|
|
original_path TEXT NOT NULL,
|
|
relative_path TEXT NOT NULL,
|
|
filename TEXT NOT NULL,
|
|
extension TEXT NOT NULL,
|
|
size_bytes INTEGER,
|
|
modified_at TEXT,
|
|
local_cover TEXT,
|
|
local_lyric TEXT,
|
|
scan_status TEXT NOT NULL,
|
|
scan_reason TEXT,
|
|
scan_message TEXT,
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL,
|
|
FOREIGN KEY (task_id) REFERENCES task_runs(id) ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS task_logs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
task_id TEXT NOT NULL,
|
|
stage TEXT NOT NULL,
|
|
level TEXT NOT NULL,
|
|
event_type TEXT NOT NULL,
|
|
message TEXT NOT NULL,
|
|
payload_json TEXT,
|
|
created_at TEXT NOT NULL,
|
|
FOREIGN KEY (task_id) REFERENCES task_runs(id) ON DELETE CASCADE
|
|
);
|
|
'''
|
|
)
|
|
self._ensure_task_items_columns(connection)
|
|
self._ensure_indexes(connection)
|
|
connection.commit()
|
|
|
|
def _ensure_task_items_columns(self, connection: sqlite3.Connection):
|
|
existing_columns = {
|
|
row['name']
|
|
for row in connection.execute("PRAGMA table_info('task_items')").fetchall()
|
|
}
|
|
|
|
column_specs = {
|
|
'parent_item_id': 'INTEGER',
|
|
'is_active': 'INTEGER NOT NULL DEFAULT 1',
|
|
'current_file_path': 'TEXT',
|
|
'preprocess_status': "TEXT NOT NULL DEFAULT 'pending'",
|
|
'preprocess_reason': 'TEXT',
|
|
'preprocess_message': 'TEXT',
|
|
'audio_props_json': 'TEXT',
|
|
'original_tags_json': 'TEXT',
|
|
'preprocess_artifacts_json': 'TEXT',
|
|
'acoustic_fingerprint': 'TEXT',
|
|
'fingerprint_duration_seconds': 'REAL',
|
|
'match_status': "TEXT NOT NULL DEFAULT 'pending'",
|
|
'match_reason': 'TEXT',
|
|
'match_message': 'TEXT',
|
|
'match_source': 'TEXT',
|
|
'match_confidence': 'REAL',
|
|
'match_is_authoritative': 'INTEGER NOT NULL DEFAULT 0',
|
|
'matched_metadata_json': 'TEXT',
|
|
'match_candidates_json': 'TEXT',
|
|
'match_enrichment_json': 'TEXT',
|
|
'dedupe_status': "TEXT NOT NULL DEFAULT 'pending'",
|
|
'dedupe_reason': 'TEXT',
|
|
'dedupe_message': 'TEXT',
|
|
'dedupe_group_key': 'TEXT',
|
|
'duplicate_of_path': 'TEXT',
|
|
'duplicate_of_item_id': 'INTEGER',
|
|
'dedupe_decision_json': 'TEXT',
|
|
'organize_status': "TEXT NOT NULL DEFAULT 'pending'",
|
|
'organize_reason': 'TEXT',
|
|
'organize_message': 'TEXT',
|
|
'library_relative_path': 'TEXT',
|
|
'library_file_path': 'TEXT',
|
|
'trash_file_path': 'TEXT',
|
|
'organize_decision_json': 'TEXT',
|
|
'exception_resolution_status': "TEXT NOT NULL DEFAULT 'open'",
|
|
'exception_resolution_json': 'TEXT',
|
|
'last_repair_task_id': 'TEXT'
|
|
}
|
|
|
|
task_run_columns = {
|
|
row['name']
|
|
for row in connection.execute("PRAGMA table_info('task_runs')").fetchall()
|
|
}
|
|
|
|
task_run_specs = {
|
|
'task_type': "TEXT NOT NULL DEFAULT 'ingest'",
|
|
'source_task_id': 'TEXT',
|
|
'repair_plan_json': 'TEXT'
|
|
}
|
|
|
|
for column_name, column_sql in task_run_specs.items():
|
|
if column_name not in task_run_columns:
|
|
connection.execute(
|
|
f'ALTER TABLE task_runs ADD COLUMN {column_name} {column_sql}'
|
|
)
|
|
|
|
for column_name, column_sql in column_specs.items():
|
|
if column_name not in existing_columns:
|
|
connection.execute(
|
|
f'ALTER TABLE task_items ADD COLUMN {column_name} {column_sql}'
|
|
)
|
|
|
|
connection.execute(
|
|
'''
|
|
UPDATE task_items
|
|
SET current_file_path = original_path
|
|
WHERE current_file_path IS NULL
|
|
'''
|
|
)
|
|
|
|
def _ensure_indexes(self, connection: sqlite3.Connection):
|
|
connection.executescript(
|
|
'''
|
|
CREATE INDEX IF NOT EXISTS idx_task_items_task_created
|
|
ON task_items (task_id, created_at ASC);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_task_items_task_status
|
|
ON task_items (task_id, scan_status);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_task_items_task_preprocess
|
|
ON task_items (task_id, preprocess_status, is_active);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_task_items_task_match
|
|
ON task_items (task_id, match_status, is_active);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_task_items_task_dedupe
|
|
ON task_items (task_id, dedupe_status, is_active);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_task_items_task_organize
|
|
ON task_items (task_id, organize_status, is_active);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_task_logs_task_id
|
|
ON task_logs (task_id, id ASC);
|
|
'''
|
|
)
|
|
|
|
def fail_stale_active_tasks(self) -> list[str]:
    """Fail every task that is still recorded with an active status.

    For each such task the stages still marked ``pending`` or ``running`` are
    switched to the failed stage status, the task row is marked failed with the
    message ``Service restarted unexpectedly``, and a matching ``task.failed``
    log entry is written. All changes happen in one explicit transaction.

    Returns:
        Ids of the tasks that were marked as failed, in query order.
    """
    failed_ids: list[str] = []
    now = current_timestamp()
    reason = 'Service restarted unexpectedly'

    with self._connect() as db:
        # Transaction control is done by hand: BEGIN IMMEDIATE ... commit().
        db.isolation_level = None
        db.execute('BEGIN IMMEDIATE')
        active_rows = db.execute(
            '''
            SELECT id, stage_states_json
            FROM task_runs
            WHERE status IN (?, ?)
            ''',
            ACTIVE_TASK_STATUSES
        ).fetchall()

        for active in active_rows:
            task_id = active['id']
            repaired: dict = {}
            for stage_id, stage_status in json.loads(active['stage_states_json']).items():
                unfinished = stage_status in ('pending', 'running')
                repaired[stage_id] = STAGE_STATUS_FAILED if unfinished else stage_status

            db.execute(
                '''
                UPDATE task_runs
                SET status = ?, stage_states_json = ?, error_message = ?, completed_at = ?, updated_at = ?
                WHERE id = ?
                ''',
                (TASK_STATUS_FAILED, json.dumps(repaired), reason, now, now, task_id)
            )
            db.execute(
                '''
                INSERT INTO task_logs (task_id, stage, level, event_type, message, payload_json, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ''',
                (task_id, 'system', 'error', 'task.failed', reason, None, now)
            )
            failed_ids.append(task_id)

        db.commit()

    return failed_ids
|
|
|
|
def create_task_if_idle(
    self,
    config_snapshot: dict,
    trigger_source: str = 'manual',
    *,
    task_type: str = TASK_TYPE_INGEST,
    source_task_id: str | None = None,
    repair_plan_json: dict | None = None
) -> dict:
    """Create a pending task unless one of the same type is already active.

    The check and the insert run inside one ``BEGIN IMMEDIATE`` transaction.
    Only active tasks with the same ``task_type`` count as a conflict.

    Args:
        config_snapshot: Configuration stored with the task as JSON.
        trigger_source: Label stored in the ``trigger_source`` column.
        task_type: Ingest tasks start at stage ``scan``; any other type starts
            at stage ``prepare`` with the repair stage states and stats.
        source_task_id: Optional id stored in the ``source_task_id`` column.
        repair_plan_json: Optional plan stored as JSON; NULL when omitted.

    Returns:
        The freshly created task as returned by ``get_task``.

    Raises:
        TaskConflictError: If an active task of the same type exists; carries
            that task's id.
    """
    new_id = str(uuid4())
    now = current_timestamp()

    is_ingest = task_type == TASK_TYPE_INGEST
    if is_ingest:
        stage_states = create_pending_stage_states()
        stats = create_empty_task_stats()
    else:
        stage_states = create_pending_repair_stage_states()
        stats = create_empty_repair_stats()
    first_stage = 'scan' if is_ingest else 'prepare'

    plan_text = None if repair_plan_json is None else json.dumps(repair_plan_json)

    with self._connect() as db:
        db.isolation_level = None
        db.execute('BEGIN IMMEDIATE')
        blocking = db.execute(
            '''
            SELECT id
            FROM task_runs
            WHERE status IN (?, ?)
            AND task_type = ?
            ORDER BY started_at DESC
            LIMIT 1
            ''',
            (*ACTIVE_TASK_STATUSES, task_type)
        ).fetchone()

        if blocking is not None:
            # Release the transaction before reporting the conflict.
            db.rollback()
            raise TaskConflictError(blocking['id'])

        db.execute(
            '''
            INSERT INTO task_runs (
                id, task_type, trigger_source, source_task_id, status, current_stage, stage_states_json,
                config_snapshot_json, stats_json, repair_plan_json, error_message, started_at, completed_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''',
            (
                new_id,
                task_type,
                trigger_source,
                source_task_id,
                TASK_STATUS_PENDING,
                first_stage,
                json.dumps(stage_states),
                json.dumps(config_snapshot),
                json.dumps(stats),
                plan_text,
                None,  # error_message
                now,   # started_at
                None,  # completed_at
                now    # updated_at
            )
        )
        db.commit()

    return self.get_task(new_id)
|
|
|
|
def get_active_task(self, task_type: str = TASK_TYPE_INGEST) -> dict | None:
    """Return the most recently started active task of ``task_type``.

    Args:
        task_type: Task type to look for.

    Returns:
        The parsed task, or ``None`` when no task of that type is active.
    """
    query = '''
        SELECT *
        FROM task_runs
        WHERE status IN (?, ?)
        AND task_type = ?
        ORDER BY started_at DESC
        LIMIT 1
    '''
    with self._connect() as db:
        found = db.execute(query, (*ACTIVE_TASK_STATUSES, task_type)).fetchone()

    if found is None:
        return None
    return self._parse_task(found)
|
|
|
|
def get_latest_task(self, task_type: str | None = TASK_TYPE_INGEST) -> dict | None:
    """Return the task with the latest ``started_at`` value.

    Args:
        task_type: Restrict the search to this type; ``None`` considers every
            task regardless of type.

    Returns:
        The parsed task, or ``None`` when no matching task exists.
    """
    if task_type is None:
        query = '''
            SELECT *
            FROM task_runs
            ORDER BY started_at DESC
            LIMIT 1
        '''
        params: tuple = ()
    else:
        query = '''
            SELECT *
            FROM task_runs
            WHERE task_type = ?
            ORDER BY started_at DESC
            LIMIT 1
        '''
        params = (task_type,)

    with self._connect() as db:
        found = db.execute(query, params).fetchone()

    if found is None:
        return None
    return self._parse_task(found)
|
|
|
|
def get_task(self, task_id: str) -> dict:
    """Load a single task by id.

    Args:
        task_id: Primary key of the ``task_runs`` row.

    Returns:
        The parsed task.

    Raises:
        TaskNotFoundError: If no row has that id.
    """
    with self._connect() as db:
        cursor = db.execute('SELECT * FROM task_runs WHERE id = ?', (task_id,))
        found = cursor.fetchone()

    if found is not None:
        return self._parse_task(found)
    raise TaskNotFoundError(task_id)
|
|
|
|
def update_task(
    self,
    task_id: str,
    *,
    status: str | None = None,
    current_stage: str | None = None,
    stage_states: dict[str, str] | None = None,
    stats: dict | None = None,
    error_message: str | None = None,
    completed_at: str | None = None
) -> dict:
    """Write the supplied fields to a task row and return the refreshed task.

    Arguments left as ``None`` are not touched, with one exception:
    ``error_message`` is written whenever ``error_message`` or ``completed_at``
    is given, so passing only ``completed_at`` stores NULL as the error message.
    ``updated_at`` is always refreshed.

    Args:
        task_id: Id of the task to update.
        status: New task status.
        current_stage: New current stage.
        stage_states: Stage-state mapping, stored as JSON.
        stats: Statistics mapping, stored as JSON.
        error_message: New error message.
        completed_at: Completion timestamp.

    Returns:
        The task as returned by ``get_task`` after the update.
    """
    changes: list[tuple[str, object]] = []

    if status is not None:
        changes.append(('status', status))
    if current_stage is not None:
        changes.append(('current_stage', current_stage))
    if stage_states is not None:
        changes.append(('stage_states_json', json.dumps(stage_states)))
    if stats is not None:
        changes.append(('stats_json', json.dumps(stats)))
    if not (error_message is None and completed_at is None):
        changes.append(('error_message', error_message))
    if completed_at is not None:
        changes.append(('completed_at', completed_at))
    changes.append(('updated_at', current_timestamp()))

    set_clause = ', '.join(f'{column} = ?' for column, _ in changes)
    params: list[object] = [value for _, value in changes]
    params.append(task_id)

    with self._connect() as db:
        db.execute(f'UPDATE task_runs SET {set_clause} WHERE id = ?', params)
        db.commit()

    return self.get_task(task_id)
|
|
|
|
def append_log(
    self,
    task_id: str,
    stage: str,
    level: str,
    event_type: str,
    message: str,
    payload: dict | None = None
) -> dict:
    """Store one log entry for a task and return it as parsed by ``_parse_log``.

    Args:
        task_id: Task the entry belongs to.
        stage: Stage label for the entry.
        level: Severity label.
        event_type: Event identifier.
        message: Human-readable text.
        payload: Optional extra data, stored as JSON (NULL when omitted).

    Returns:
        The stored entry, re-read from the database.
    """
    created_at = current_timestamp()
    payload_text = None if payload is None else json.dumps(payload)
    entry = (task_id, stage, level, event_type, message, payload_text, created_at)

    with self._connect() as db:
        inserted = db.execute(
            '''
            INSERT INTO task_logs (task_id, stage, level, event_type, message, payload_json, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ''',
            entry
        )
        db.commit()
        stored = db.execute(
            'SELECT * FROM task_logs WHERE id = ?',
            (inserted.lastrowid,)
        ).fetchone()

    return self._parse_log(stored)
|
|
|
|
def insert_task_item(
    self,
    task_id: str,
    *,
    original_path: str,
    relative_path: str,
    filename: str,
    extension: str,
    size_bytes: int | None,
    modified_at: str | None,
    local_cover: str | None,
    local_lyric: str | None,
    scan_status: str,
    scan_reason: str | None,
    scan_message: str | None,
    parent_item_id: int | None = None,
    is_active: int = 1,
    current_file_path: str | None = None,
    preprocess_status: str = 'pending',
    preprocess_reason: str | None = None,
    preprocess_message: str | None = None,
    audio_props_json: dict | None = None,
    original_tags_json: dict | None = None,
    preprocess_artifacts_json: dict | None = None,
    acoustic_fingerprint: str | None = None,
    fingerprint_duration_seconds: float | None = None,
    match_status: str = 'pending',
    match_reason: str | None = None,
    match_message: str | None = None,
    match_source: str | None = None,
    match_confidence: float | None = None,
    match_is_authoritative: int = 0,
    matched_metadata_json: dict | None = None,
    match_candidates_json: list[dict] | None = None,
    match_enrichment_json: dict | None = None,
    dedupe_status: str = 'pending',
    dedupe_reason: str | None = None,
    dedupe_message: str | None = None,
    dedupe_group_key: str | None = None,
    duplicate_of_path: str | None = None,
    duplicate_of_item_id: int | None = None,
    dedupe_decision_json: dict | None = None,
    organize_status: str = 'pending',
    organize_reason: str | None = None,
    organize_message: str | None = None,
    library_relative_path: str | None = None,
    library_file_path: str | None = None,
    trash_file_path: str | None = None,
    organize_decision_json: dict | None = None,
    exception_resolution_status: str = 'open',
    exception_resolution_json: dict | None = None,
    last_repair_task_id: str | None = None
) -> dict:
    """Insert one ``task_items`` row and return it as parsed by ``_parse_item``.

    Every keyword maps to the column of the same name. Arguments whose name
    ends in ``_json`` are serialized with ``json.dumps`` unless they are
    ``None``. A falsy ``current_file_path`` falls back to ``original_path``.
    ``created_at`` and ``updated_at`` both receive the current timestamp.

    Returns:
        The stored item, re-read from the database.
    """
    def as_json(value):
        # NULL stays NULL; everything else is stored as JSON text.
        return None if value is None else json.dumps(value)

    now = current_timestamp()
    effective_path = current_file_path or original_path

    # Order must match the column list in the INSERT statement below.
    record = (
        task_id,
        parent_item_id,
        is_active,
        original_path,
        effective_path,
        relative_path,
        filename,
        extension,
        size_bytes,
        modified_at,
        local_cover,
        local_lyric,
        scan_status,
        scan_reason,
        scan_message,
        preprocess_status,
        preprocess_reason,
        preprocess_message,
        as_json(audio_props_json),
        as_json(original_tags_json),
        as_json(preprocess_artifacts_json),
        acoustic_fingerprint,
        fingerprint_duration_seconds,
        match_status,
        match_reason,
        match_message,
        match_source,
        match_confidence,
        match_is_authoritative,
        as_json(matched_metadata_json),
        as_json(match_candidates_json),
        as_json(match_enrichment_json),
        dedupe_status,
        dedupe_reason,
        dedupe_message,
        dedupe_group_key,
        duplicate_of_path,
        duplicate_of_item_id,
        as_json(dedupe_decision_json),
        organize_status,
        organize_reason,
        organize_message,
        library_relative_path,
        library_file_path,
        trash_file_path,
        as_json(organize_decision_json),
        exception_resolution_status,
        as_json(exception_resolution_json),
        last_repair_task_id,
        now,
        now,
    )

    with self._connect() as db:
        inserted = db.execute(
            '''
            INSERT INTO task_items (
                task_id, parent_item_id, is_active, original_path, current_file_path, relative_path,
                filename, extension, size_bytes, modified_at, local_cover, local_lyric, scan_status,
                scan_reason, scan_message, preprocess_status, preprocess_reason, preprocess_message,
                audio_props_json, original_tags_json, preprocess_artifacts_json, acoustic_fingerprint,
                fingerprint_duration_seconds, match_status, match_reason, match_message, match_source,
                match_confidence, match_is_authoritative, matched_metadata_json, match_candidates_json,
                match_enrichment_json, dedupe_status, dedupe_reason, dedupe_message,
                dedupe_group_key, duplicate_of_path, duplicate_of_item_id, dedupe_decision_json,
                organize_status, organize_reason, organize_message, library_relative_path,
                library_file_path, trash_file_path, organize_decision_json,
                exception_resolution_status, exception_resolution_json, last_repair_task_id,
                created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''',
            record
        )
        db.commit()
        stored = db.execute(
            'SELECT * FROM task_items WHERE id = ?',
            (inserted.lastrowid,)
        ).fetchone()

    return self._parse_item(stored)
|
|
|
|
def update_task_item(self, item_id: int, **fields) -> dict:
    """Update the given columns of one ``task_items`` row and return the item.

    Each keyword names a column. Values for the JSON columns listed below are
    serialized with ``json.dumps`` unless they are ``None``. ``updated_at`` is
    always refreshed.

    Args:
        item_id: Primary key of the row to update.
        **fields: Column name to new value.

    Returns:
        The row after the update, as parsed by ``_parse_item``.

    Raises:
        ValueError: If a keyword is not a bare identifier and therefore cannot
            be a safe column name.
        TaskNotFoundError: If no row with ``item_id`` exists.
    """
    # Column names end up in the SQL text itself (only values can be bound as
    # parameters), so reject anything that is not a bare identifier before any
    # database work happens.
    for column in fields:
        if not column.isidentifier():
            raise ValueError(f'Invalid task item column name: {column!r}')

    json_fields = {
        'audio_props_json',
        'original_tags_json',
        'preprocess_artifacts_json',
        'matched_metadata_json',
        'match_candidates_json',
        'match_enrichment_json',
        'dedupe_decision_json',
        'organize_decision_json',
        'exception_resolution_json'
    }
    assignments: list[str] = []
    values: list[object] = []

    for key, value in fields.items():
        assignments.append(f'{key} = ?')
        if key in json_fields and value is not None:
            values.append(json.dumps(value))
        else:
            values.append(value)

    assignments.append('updated_at = ?')
    values.append(current_timestamp())
    values.append(item_id)

    with self._connect() as connection:
        connection.execute(
            f'UPDATE task_items SET {", ".join(assignments)} WHERE id = ?',
            values
        )
        connection.commit()
        row = connection.execute(
            'SELECT * FROM task_items WHERE id = ?',
            (item_id,)
        ).fetchone()

    if row is None:
        raise TaskNotFoundError(item_id)

    return self._parse_item(row)
|
|
|
|
def list_preprocess_candidate_items(self, task_id: str) -> list[dict]:
    """List the items of a task that still need preprocessing.

    Selects active items with ``scan_status = 'queued'`` whose
    ``preprocess_status`` is ``pending`` or ``running``, oldest first.

    Args:
        task_id: Task whose items are listed.

    Returns:
        Parsed items ordered by ``created_at`` and then ``id``.
    """
    query = '''
        SELECT *
        FROM task_items
        WHERE task_id = ?
        AND is_active = 1
        AND scan_status = 'queued'
        AND preprocess_status IN ('pending', 'running')
        ORDER BY created_at ASC, id ASC
    '''
    with self._connect() as db:
        matches = db.execute(query, (task_id,)).fetchall()

    parsed = []
    for match in matches:
        parsed.append(self._parse_item(match))
    return parsed
|
|
|
|
def list_match_candidate_items(self, task_id: str) -> list[dict]:
    """List the items of a task that are ready for matching.

    Selects active, queued items whose ``preprocess_status`` is ``completed``
    or ``warning`` and whose ``match_status`` is ``pending`` or ``running``.

    Args:
        task_id: Task whose items are listed.

    Returns:
        Parsed items ordered by ``created_at`` and then ``id``.
    """
    query = '''
        SELECT *
        FROM task_items
        WHERE task_id = ?
        AND is_active = 1
        AND scan_status = 'queued'
        AND preprocess_status IN ('completed', 'warning')
        AND match_status IN ('pending', 'running')
        ORDER BY created_at ASC, id ASC
    '''
    with self._connect() as db:
        matches = db.execute(query, (task_id,)).fetchall()

    parsed = []
    for match in matches:
        parsed.append(self._parse_item(match))
    return parsed
|
|
|
|
def list_dedupe_candidate_items(self, task_id: str) -> list[dict]:
    """List the items of a task that are ready for deduplication.

    Selects active items whose ``match_status`` is ``matched`` or
    ``matched_fallback`` and whose ``dedupe_status`` is ``pending`` or
    ``running``.

    Args:
        task_id: Task whose items are listed.

    Returns:
        Parsed items ordered by ``created_at`` and then ``id``.
    """
    query = '''
        SELECT *
        FROM task_items
        WHERE task_id = ?
        AND is_active = 1
        AND match_status IN ('matched', 'matched_fallback')
        AND dedupe_status IN ('pending', 'running')
        ORDER BY created_at ASC, id ASC
    '''
    with self._connect() as db:
        matches = db.execute(query, (task_id,)).fetchall()

    parsed = []
    for match in matches:
        parsed.append(self._parse_item(match))
    return parsed
|
|
|
|
def list_organize_candidate_items(self, task_id: str) -> list[dict]:
    """List the items of a task that are ready to be organized.

    Selects active items whose ``match_status`` is ``matched`` or
    ``matched_fallback``, whose ``dedupe_status`` is ``unique`` or
    ``duplicate_replaced``, and whose ``organize_status`` is ``pending`` or
    ``running``.

    Args:
        task_id: Task whose items are listed.

    Returns:
        Parsed items ordered by ``created_at`` and then ``id``.
    """
    query = '''
        SELECT *
        FROM task_items
        WHERE task_id = ?
        AND is_active = 1
        AND match_status IN ('matched', 'matched_fallback')
        AND dedupe_status IN ('unique', 'duplicate_replaced')
        AND organize_status IN ('pending', 'running')
        ORDER BY created_at ASC, id ASC
    '''
    with self._connect() as db:
        matches = db.execute(query, (task_id,)).fetchall()

    parsed = []
    for match in matches:
        parsed.append(self._parse_item(match))
    return parsed
|
|
|
|
def list_task_items(
|
|
self,
|
|
task_id: str,
|
|
scan_status: str | None,
|
|
page: int,
|
|
page_size: int,
|
|
*,
|
|
preprocess_status: str | None = None,
|
|
match_status: str | None = None,
|
|
dedupe_status: str | None = None,
|
|
organize_status: str | None = None,
|
|
active_only: bool = False
|
|
) -> dict:
|
|
offset = (page - 1) * page_size
|
|
where_clauses = ['task_id = ?']
|
|
params: list[object] = [task_id]
|
|
|
|
if scan_status:
|
|
where_clauses.append('scan_status = ?')
|
|
params.append(scan_status)
|
|
if preprocess_status:
|
|
where_clauses.append('preprocess_status = ?')
|
|
params.append(preprocess_status)
|
|
if match_status:
|
|
where_clauses.append('match_status = ?')
|
|
params.append(match_status)
|
|
if dedupe_status:
|
|
where_clauses.append('dedupe_status = ?')
|
|
params.append(dedupe_status)
|
|
if organize_status:
|
|
where_clauses.append('organize_status = ?')
|
|
params.append(organize_status)
|
|
if active_only:
|
|
where_clauses.append('is_active = 1')
|
|
|
|
where_clause = f"WHERE {' AND '.join(where_clauses)}"
|
|
|
|
with self._connect() as connection:
|
|
total = connection.execute(
|
|
f'SELECT COUNT(*) AS total FROM task_items {where_clause}',
|
|
params
|
|
).fetchone()['total']
|
|
rows = connection.execute(
|
|
f'''
|
|
SELECT *
|
|
FROM task_items
|
|
{where_clause}
|
|
ORDER BY created_at ASC, id ASC
|
|
LIMIT ? OFFSET ?
|
|
''',
|
|
[*params, page_size, offset]
|
|
).fetchall()
|
|
|
|
return {
|
|
'items': [self._parse_item(row) for row in rows],
|
|
'page': page,
|
|
'page_size': page_size,
|
|
'total': total
|
|
}
|
|
|
|
def list_all_task_items(self, task_id: str, *, active_only: bool = False) -> list[dict]:
    """Return every item of a task without pagination.

    Args:
        task_id: Task whose items are listed.
        active_only: When true, keep only rows with ``is_active = 1``.

    Returns:
        Parsed items ordered by ``created_at`` and then ``id``.
    """
    conditions = ['task_id = ?']
    if active_only:
        conditions.append('is_active = 1')
    bound: list[object] = [task_id]

    with self._connect() as db:
        matches = db.execute(
            f'''
            SELECT *
            FROM task_items
            WHERE {' AND '.join(conditions)}
            ORDER BY created_at ASC, id ASC
            ''',
            bound
        ).fetchall()

    return list(map(self._parse_item, matches))
|
|
|
|
def list_task_history(self, page: int, page_size: int) -> dict:
    """Return one page of finished ingest tasks with per-task item counts.

    Only ``ingest`` tasks whose status is completed or failed are included,
    newest ``started_at`` first.

    Args:
        page: 1-based page number.
        page_size: Maximum number of tasks per page.

    Returns:
        A dict with ``items`` (rows parsed by ``_parse_task_history_row``),
        ``page``, ``page_size`` and ``total`` (all finished ingest tasks).
    """
    finished = (TASK_STATUS_COMPLETED, TASK_STATUS_FAILED)
    skip = (page - 1) * page_size

    with self._connect() as db:
        count_row = db.execute(
            '''
            SELECT COUNT(*) AS total
            FROM task_runs
            WHERE status IN (?, ?)
            AND task_type = 'ingest'
            ''',
            finished
        ).fetchone()
        total = count_row['total']
        # The CTE aggregates item counts once per task before joining.
        history_rows = db.execute(
            '''
            WITH task_item_stats AS (
                SELECT
                    task_id,
                    COUNT(*) AS total_items,
                    SUM(CASE WHEN organize_status = 'organized' THEN 1 ELSE 0 END) AS success_items
                FROM task_items
                GROUP BY task_id
            )
            SELECT
                task_runs.id AS task_id,
                task_runs.started_at,
                task_runs.status,
                COALESCE(task_item_stats.total_items, 0) AS total_items,
                COALESCE(task_item_stats.success_items, 0) AS success_items
            FROM task_runs
            LEFT JOIN task_item_stats ON task_item_stats.task_id = task_runs.id
            WHERE task_runs.status IN (?, ?)
            AND task_runs.task_type = 'ingest'
            ORDER BY task_runs.started_at DESC, task_runs.id DESC
            LIMIT ? OFFSET ?
            ''',
            (*finished, page_size, skip)
        ).fetchall()

    entries = [self._parse_task_history_row(history_row) for history_row in history_rows]
    return {'items': entries, 'page': page, 'page_size': page_size, 'total': total}
|
|
|
|
def list_task_logs(self, task_id: str, page: int, page_size: int) -> dict:
    """Return one page of a task's log entries in insertion order.

    Args:
        task_id: Task whose logs are listed.
        page: 1-based page number.
        page_size: Maximum number of entries per page.

    Returns:
        A dict with ``logs`` (entries parsed by ``_parse_log``, ascending
        ``id``), ``page``, ``page_size`` and ``total`` (all entries of the task).
    """
    skip = (page - 1) * page_size

    with self._connect() as db:
        count_row = db.execute(
            'SELECT COUNT(*) AS total FROM task_logs WHERE task_id = ?',
            (task_id,)
        ).fetchone()
        total = count_row['total']
        log_rows = db.execute(
            '''
            SELECT *
            FROM task_logs
            WHERE task_id = ?
            ORDER BY id ASC
            LIMIT ? OFFSET ?
            ''',
            (task_id, page_size, skip)
        ).fetchall()

    entries = [self._parse_log(log_row) for log_row in log_rows]
    return {'logs': entries, 'page': page, 'page_size': page_size, 'total': total}
|
|
|
|
def list_library_provenance_items(self) -> list[dict]:
    """List provenance details for every item with ``organize_status = 'organized'``.

    Returns:
        One dict per item, most recently updated first, with the keys
        ``task_id``, ``library_file_path``, ``library_relative_path``,
        ``organized_at`` (the row's ``updated_at``), ``match_source``,
        ``match_confidence`` and ``dedupe_status``.
    """
    exported_keys = (
        'task_id',
        'library_file_path',
        'library_relative_path',
        'organized_at',
        'match_source',
        'match_confidence',
        'dedupe_status',
    )
    query = '''
        SELECT
            task_id,
            library_file_path,
            library_relative_path,
            updated_at AS organized_at,
            match_source,
            match_confidence,
            dedupe_status
        FROM task_items
        WHERE organize_status = 'organized'
        ORDER BY updated_at DESC, id DESC
    '''
    with self._connect() as db:
        organized = db.execute(query).fetchall()

    provenance = []
    for entry in organized:
        provenance.append({key: entry[key] for key in exported_keys})
    return provenance
|
|
|
|
def list_exception_source_items(self, resolution_status: str = 'open') -> list[dict]:
    """List items that ended in an exceptional state, with their task start time.

    An item qualifies when any stage shows one of the statuses in the query
    below, or when its resolution JSON mentions a ``candidate_selected`` or
    ``ready_to_ingest`` workflow state while still open or planned.

    Args:
        resolution_status: ``'open'`` returns items whose resolution status is
            ``open`` or ``planned``; ``'all'`` applies no resolution filter;
            ``'resolved'``, ``'ignored'`` and ``'planned'`` match exactly.

    Returns:
        Items parsed by ``_parse_item_with_task_context``, most recently
        updated first.

    Raises:
        ValueError: If ``resolution_status`` is not one of the supported values.
    """
    supported = {'open', 'resolved', 'ignored', 'all', 'planned'}
    if resolution_status not in supported:
        raise ValueError(f'Unsupported resolution status: {resolution_status}')

    bound: list[object] = []
    if resolution_status == 'open':
        # "open" deliberately includes items that already have a plan.
        resolution_filter = "AND task_items.exception_resolution_status IN ('open', 'planned')"
    elif resolution_status == 'all':
        resolution_filter = ''
    else:
        resolution_filter = 'AND task_items.exception_resolution_status = ?'
        bound.append(resolution_status)

    select_part = '''
        SELECT
            task_items.*,
            task_runs.started_at AS task_started_at
        FROM task_items
        JOIN task_runs ON task_runs.id = task_items.task_id
        WHERE (
            task_items.organize_status IN ('trashed', 'failed')
            OR task_items.dedupe_status IN ('duplicate_trashed', 'failed')
            OR task_items.match_status IN ('low_score', 'failed', 'not_found')
            OR (
                task_items.preprocess_status = 'failed'
                AND task_items.preprocess_reason = 'convert_failed'
            )
            OR (
                task_items.preprocess_status = 'warning'
                AND task_items.preprocess_reason LIKE '%metadata_failed%'
            )
            OR (
                task_items.exception_resolution_status IN ('open', 'planned')
                AND task_items.exception_resolution_json IS NOT NULL
                AND (
                    task_items.exception_resolution_json LIKE '%"workflow_state"%candidate_selected%'
                    OR task_items.exception_resolution_json LIKE '%"workflow_state"%ready_to_ingest%'
                )
            )
        )
        '''
    order_part = '''
        ORDER BY task_items.updated_at DESC, task_items.id DESC
        '''

    with self._connect() as db:
        matches = db.execute(select_part + resolution_filter + order_part, bound).fetchall()

    return [self._parse_item_with_task_context(match) for match in matches]
|
|
|
|
def get_exception_source_item(self, item_id: int) -> dict | None:
|
|
with self._connect() as connection:
|
|
row = connection.execute(
|
|
'''
|
|
SELECT
|
|
task_items.*,
|
|
task_runs.started_at AS task_started_at
|
|
FROM task_items
|
|
JOIN task_runs ON task_runs.id = task_items.task_id
|
|
WHERE task_items.id = ?
|
|
''',
|
|
(item_id,)
|
|
).fetchone()
|
|
|
|
if row is None:
|
|
return None
|
|
|
|
return self._parse_item_with_task_context(row)
|
|
|
|
def get_exception_source_items_by_ids(self, item_ids: list[int]) -> list[dict]:
    """Load several items, each with the start time of its task.

    Args:
        item_ids: Primary keys of the wanted ``task_items`` rows.

    Returns:
        Items parsed by ``_parse_item_with_task_context``. The query has no
        ORDER BY, so the order is whatever SQLite returns. An empty
        ``item_ids`` yields an empty list without touching the database.
    """
    if len(item_ids) == 0:
        return []

    # One bound parameter per requested id.
    placeholders = ', '.join(['?'] * len(item_ids))
    with self._connect() as db:
        matches = db.execute(
            f'''
            SELECT
                task_items.*,
                task_runs.started_at AS task_started_at
            FROM task_items
            JOIN task_runs ON task_runs.id = task_items.task_id
            WHERE task_items.id IN ({placeholders})
            ''',
            item_ids
        ).fetchall()

    return [self._parse_item_with_task_context(match) for match in matches]
|
|
|
|
def get_task_snapshot(self, task_id: str) -> dict:
    """Return a task together with its most recent log entries.

    Args:
        task_id: Id of the task to snapshot.

    Returns:
        A dict with ``task``, ``recent_logs`` (the newest entries, at most
        ``SCAN_PROGRESS_LOG_LIMIT``, oldest first), ``recent_logs_limit``,
        ``has_more_logs`` (whether older entries exist beyond the returned
        ones) and ``latest_log_id`` (``None`` when the task has no logs).

    Raises:
        TaskNotFoundError: Propagated from ``get_task`` when the task is unknown.
    """
    task = self.get_task(task_id)

    with self._connect() as db:
        count_row = db.execute(
            'SELECT COUNT(*) AS total FROM task_logs WHERE task_id = ?',
            (task_id,)
        ).fetchone()
        newest_first = db.execute(
            '''
            SELECT *
            FROM task_logs
            WHERE task_id = ?
            ORDER BY id DESC
            LIMIT ?
            ''',
            (task_id, SCAN_PROGRESS_LOG_LIMIT)
        ).fetchall()

    # The query returns newest first; present the entries chronologically.
    recent_logs = [self._parse_log(log_row) for log_row in newest_first[::-1]]
    latest_log_id = recent_logs[-1]['id'] if recent_logs else None

    return {
        'task': task,
        'recent_logs': recent_logs,
        'recent_logs_limit': SCAN_PROGRESS_LOG_LIMIT,
        'has_more_logs': count_row['total'] > len(recent_logs),
        'latest_log_id': latest_log_id
    }
|
|
|
|
def _parse_task(self, row: sqlite3.Row | None) -> dict | None:
    """Convert a ``task_runs`` row into the task dict handed to callers.

    JSON columns are decoded and ``stats`` is completed with default values so
    every expected section is present:

    * non-ingest tasks get the ``prepare`` and ``execute`` sections;
    * ingest tasks get ``scan``, ``preprocess``, ``match``, ``dedupe`` and
      ``organize``; when the stored stats have no ``scan`` key they are
      treated as the scan section itself (presumably an older flat layout).

    Args:
        row: Row to convert, or ``None``.

    Returns:
        The task dict, or ``None`` when ``row`` is ``None``.
    """
    if row is None:
        return None

    # Resolve the type once so the stats layout always agrees with the
    # ``task_type`` that is reported back (an empty type counts as ingest).
    task_type = row['task_type'] or TASK_TYPE_INGEST
    raw_stats = json.loads(row['stats_json'])

    if task_type != TASK_TYPE_INGEST:
        repair_defaults = create_empty_repair_stats()
        stats = {
            phase: {**repair_defaults[phase], **(raw_stats.get(phase) or {})}
            for phase in ('prepare', 'execute')
        }
    else:
        stage_defaults = (
            ('scan', create_empty_scan_stats),
            ('preprocess', create_empty_preprocess_stats),
            ('match', create_empty_match_stats),
            ('dedupe', create_empty_dedupe_stats),
            ('organize', create_empty_organize_stats),
        )
        if 'scan' in raw_stats:
            stats = {
                stage: {**make_empty(), **(raw_stats.get(stage) or {})}
                for stage, make_empty in stage_defaults
            }
        else:
            # No per-stage sections stored: the raw values are the scan stats.
            stats = {stage: make_empty() for stage, make_empty in stage_defaults}
            stats['scan'].update(raw_stats)

    return {
        'task_id': row['id'],
        'task_type': task_type,
        'trigger_source': row['trigger_source'],
        'source_task_id': row['source_task_id'],
        'status': row['status'],
        'current_stage': row['current_stage'],
        'stage_states': json.loads(row['stage_states_json']),
        'stats': stats,
        'repair_plan_json': json.loads(row['repair_plan_json']) if row['repair_plan_json'] else None,
        'error_message': row['error_message'],
        'started_at': row['started_at'],
        'completed_at': row['completed_at'],
        'updated_at': row['updated_at']
    }
|
|
|
|
def _parse_item(self, row: sqlite3.Row) -> dict:
|
|
return {
|
|
'id': row['id'],
|
|
'task_id': row['task_id'],
|
|
'parent_item_id': row['parent_item_id'],
|
|
'is_active': bool(row['is_active']),
|
|
'original_path': row['original_path'],
|
|
'current_file_path': row['current_file_path'] or row['original_path'],
|
|
'relative_path': row['relative_path'],
|
|
'filename': row['filename'],
|
|
'extension': row['extension'],
|
|
'size_bytes': row['size_bytes'],
|
|
'modified_at': row['modified_at'],
|
|
'local_cover': row['local_cover'],
|
|
'local_lyric': row['local_lyric'],
|
|
'scan_status': row['scan_status'],
|
|
'scan_reason': row['scan_reason'],
|
|
'scan_message': row['scan_message'],
|
|
'preprocess_status': row['preprocess_status'] or 'pending',
|
|
'preprocess_reason': row['preprocess_reason'],
|
|
'preprocess_message': row['preprocess_message'],
|
|
'audio_props_json': json.loads(row['audio_props_json']) if row['audio_props_json'] else None,
|
|
'original_tags_json': json.loads(row['original_tags_json']) if row['original_tags_json'] else None,
|
|
'preprocess_artifacts_json': (
|
|
json.loads(row['preprocess_artifacts_json'])
|
|
if row['preprocess_artifacts_json']
|
|
else None
|
|
),
|
|
'acoustic_fingerprint': row['acoustic_fingerprint'],
|
|
'fingerprint_duration_seconds': row['fingerprint_duration_seconds'],
|
|
'match_status': row['match_status'] or 'pending',
|
|
'match_reason': row['match_reason'],
|
|
'match_message': row['match_message'],
|
|
'match_source': row['match_source'],
|
|
'match_confidence': row['match_confidence'],
|
|
'match_is_authoritative': bool(row['match_is_authoritative']),
|
|
'matched_metadata_json': (
|
|
json.loads(row['matched_metadata_json'])
|
|
if row['matched_metadata_json']
|
|
else None
|
|
),
|
|
'match_candidates_json': (
|
|
json.loads(row['match_candidates_json'])
|
|
if row['match_candidates_json']
|
|
else None
|
|
),
|
|
'match_enrichment_json': (
|
|
json.loads(row['match_enrichment_json'])
|
|
if row['match_enrichment_json']
|
|
else None
|
|
),
|
|
'dedupe_status': row['dedupe_status'] or 'pending',
|
|
'dedupe_reason': row['dedupe_reason'],
|
|
'dedupe_message': row['dedupe_message'],
|
|
'dedupe_group_key': row['dedupe_group_key'],
|
|
'duplicate_of_path': row['duplicate_of_path'],
|
|
'duplicate_of_item_id': row['duplicate_of_item_id'],
|
|
'dedupe_decision_json': (
|
|
json.loads(row['dedupe_decision_json'])
|
|
if row['dedupe_decision_json']
|
|
else None
|
|
),
|
|
'organize_status': row['organize_status'] or 'pending',
|
|
'organize_reason': row['organize_reason'],
|
|
'organize_message': row['organize_message'],
|
|
'library_relative_path': row['library_relative_path'],
|
|
'library_file_path': row['library_file_path'],
|
|
'trash_file_path': row['trash_file_path'],
|
|
'organize_decision_json': (
|
|
json.loads(row['organize_decision_json'])
|
|
if row['organize_decision_json']
|
|
else None
|
|
),
|
|
'exception_resolution_status': row['exception_resolution_status'] or 'open',
|
|
'exception_resolution_json': (
|
|
json.loads(row['exception_resolution_json'])
|
|
if row['exception_resolution_json']
|
|
else None
|
|
),
|
|
'last_repair_task_id': row['last_repair_task_id'],
|
|
'created_at': row['created_at'],
|
|
'updated_at': row['updated_at']
|
|
}
|
|
|
|
def _parse_item_with_task_context(self, row: sqlite3.Row) -> dict:
|
|
item = self._parse_item(row)
|
|
item['task_started_at'] = row['task_started_at']
|
|
return item
|
|
|
|
def _parse_log(self, row: sqlite3.Row) -> dict:
|
|
return {
|
|
'id': row['id'],
|
|
'task_id': row['task_id'],
|
|
'stage': row['stage'],
|
|
'level': row['level'],
|
|
'event_type': row['event_type'],
|
|
'message': row['message'],
|
|
'payload': json.loads(row['payload_json']) if row['payload_json'] else None,
|
|
'created_at': row['created_at']
|
|
}
|
|
|
|
def _parse_task_history_row(self, row: sqlite3.Row) -> dict:
    """Convert a task-history query row into a summary dict.

    Args:
        row: Row with ``task_id``, ``started_at``, ``status``, ``total_items``
            and ``success_items``.

    Returns:
        The summary. ``exception_items`` is the total minus the successful
        items; ``report_status`` is ``'success'`` only for a completed task
        with no exception items and ``'warning'`` otherwise.
    """
    total = row['total_items'] or 0
    succeeded = row['success_items'] or 0
    exceptions = total - succeeded

    clean_run = row['status'] == TASK_STATUS_COMPLETED and exceptions == 0
    if clean_run:
        report_status = 'success'
    else:
        report_status = 'warning'

    return {
        'task_id': row['task_id'],
        'started_at': row['started_at'],
        'status': row['status'],
        'total_items': total,
        'success_items': succeeded,
        'exception_items': exceptions,
        'report_status': report_status
    }
|