Files
2026-04-30 14:34:28 +08:00

396 lines
13 KiB
Python

import shutil
from pathlib import Path
from . import library_index
from .library_postprocess import _build_unique_destination
from .preprocessor import PreprocessItemError
from .task_constants import (
TASK_STATUS_COMPLETED,
TASK_STATUS_FAILED,
current_timestamp,
create_empty_task_stats,
create_match_failed_stage_states,
create_task_completed_stage_states
)
from .task_store import TaskConflictError
# Fallback sort applied to track listings when the caller supplies an
# unknown sort field or sort direction.
DEFAULT_SORT_BY = 'organized_at'
DEFAULT_SORT_ORDER = 'desc'

# Track fields that a listing may be sorted by.
ALLOWED_SORT_FIELDS = {
    'organized_at', 'modified_at',
    'filename', 'title', 'artist', 'album', 'format',
}

# Accepted sort directions (compared after lower-casing the request value).
ALLOWED_SORT_ORDERS = {'asc', 'desc'}

# Message recorded on the task item and in the task log when a track is
# manually moved out of the library to be matched again.
MANUAL_REQUEUE_MESSAGE = '从音乐库移出,等待重新匹配'
class LibraryTrackNotFoundError(Exception):
    """Raised when a track id does not resolve to an existing library file.

    The offending track id is passed as the single exception argument.
    """
class LibraryService:
    """Queries over the organized music library plus manual requeue of tracks.

    The service scans the output directory through ``library_index``, enriches
    the scanned items with ingest provenance from ``task_store`` and can move
    a single track out of the library into the trash so it is matched again.
    """

    def __init__(self, task_store, preprocessor, read_tags=None):
        """Store the collaborators used by the service.

        Args:
            task_store: Persistence layer for tasks, task items and logs.
            preprocessor: Provides ``probe_audio``, ``read_tags`` and
                ``calculate_fingerprint``.
            read_tags: Optional tag reader used while scanning the library;
                defaults to ``library_index.default_read_library_tags``.
        """
        self.task_store = task_store
        self.preprocessor = preprocessor
        self.read_tags = read_tags or library_index.default_read_library_tags

    def get_summary(self, output_dir: str) -> dict:
        """Return headline counters for the library under ``output_dir``.

        Returns:
            A dict with ``total_tracks``, ``total_albums``, ``total_artists``,
            ``suspected_duplicates`` and ``scanned_at``.
        """
        # Timestamp is taken before the scan starts, as in the stored summary.
        scanned_at = current_timestamp()
        library_items = self._scan_items(output_dir)

        artists = set()
        albums = set()
        for item in library_items:
            metadata = item.get('matched_metadata_json') or {}
            artist = metadata.get('artist')
            album = metadata.get('album')
            # Empty / missing names are not counted as a distinct entry.
            if artist:
                artists.add(artist)
            if album:
                albums.add(album)

        return {
            'total_tracks': len(library_items),
            'total_albums': len(albums),
            'total_artists': len(artists),
            'suspected_duplicates': library_index.count_suspected_duplicates(library_items),
            'scanned_at': scanned_at
        }

    def get_tracks_page(
        self,
        output_dir: str,
        *,
        q: str | None = None,
        artist: str | None = None,
        album: str | None = None,
        format: str | None = None,
        has_provenance: bool | None = None,
        page: int = 1,
        page_size: int = 50,
        sort_by: str = DEFAULT_SORT_BY,
        sort_order: str = DEFAULT_SORT_ORDER
    ) -> dict:
        """Return one page of library tracks after filtering and sorting.

        Args:
            output_dir: Root directory of the organized library.
            q: Case-insensitive substring searched in filename, title, artist,
                album and library-relative path.
            artist: Exact (case-insensitive, trimmed) artist filter.
            album: Exact (case-insensitive, trimmed) album filter.
            format: Exact (case-insensitive, trimmed) audio format filter.
            has_provenance: ``True`` keeps only tracks with ingest provenance,
                ``False`` only tracks without, ``None`` disables the filter.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Items per page; negative values are treated as 0.
            sort_by: One of ``ALLOWED_SORT_FIELDS``; anything else falls back
                to ``DEFAULT_SORT_BY``.
            sort_order: ``'asc'`` or ``'desc'`` (case-insensitive); anything
                else falls back to ``DEFAULT_SORT_ORDER``.

        Returns:
            A dict with ``items``, ``page``, ``page_size`` (both as actually
            applied) and ``total`` (number of tracks after filtering).
        """
        normalized_sort_by = sort_by if sort_by in ALLOWED_SORT_FIELDS else DEFAULT_SORT_BY
        requested_order = sort_order.lower() if sort_order else ''
        normalized_sort_order = (
            requested_order if requested_order in ALLOWED_SORT_ORDERS else DEFAULT_SORT_ORDER
        )

        # A non-positive page or a negative size would yield negative slice
        # bounds, which Python interprets relative to the end of the list.
        page = max(page, 1)
        page_size = max(page_size, 0)

        tracks = self._build_track_payloads(output_dir)
        filtered_tracks = self._filter_tracks(
            tracks,
            q=q,
            artist=artist,
            album=album,
            format=format,
            has_provenance=has_provenance
        )

        # Sort only the tracks that have a value and append the rest, so that
        # tracks without a value stay last in both directions. Reversing the
        # full (flag, value) key would push them to the front when descending.
        keyed_tracks = [
            (self._sort_key(track, normalized_sort_by), track)
            for track in filtered_tracks
        ]
        with_value = [entry for entry in keyed_tracks if entry[0][0] == 0]
        without_value = [track for key, track in keyed_tracks if key[0] != 0]
        with_value.sort(
            key=lambda entry: entry[0][1],
            reverse=normalized_sort_order == 'desc'
        )
        sorted_tracks = [track for _, track in with_value] + without_value

        offset = (page - 1) * page_size
        return {
            'items': sorted_tracks[offset:offset + page_size],
            'page': page,
            'page_size': page_size,
            'total': len(sorted_tracks)
        }

    def move_track_to_exception(self, config_snapshot: dict, track_id: str) -> dict:
        """Move a library track into the trash and record it as unmatched.

        The file is moved to ``<trash>/match_failed/<task_id>/`` and a task
        with a single ``not_found`` item is stored so the track can be matched
        again from the exception center.

        Args:
            config_snapshot: Configuration dict; ``output`` and ``trash`` must
                hold non-blank directory paths.
            track_id: Identifier of the library track to move.

        Returns:
            A dict with ``exception_id``, ``library_relative_path``,
            ``trash_file_path`` and ``message``.

        Raises:
            ValueError: ``output``/``trash`` is not configured, or the track
                file resolves outside the output directory.
            TaskConflictError: An ingest or repair task is currently active.
            LibraryTrackNotFoundError: The track id is unknown or its file no
                longer exists.
        """
        output_dir = (config_snapshot.get('output') or '').strip()
        trash_dir = (config_snapshot.get('trash') or '').strip()
        if not output_dir:
            raise ValueError('请先配置输出音乐库目录')
        if not trash_dir:
            raise ValueError('请先配置回收站目录')

        self._ensure_no_active_tasks()

        output_root = Path(output_dir).expanduser().resolve(strict=False)
        trash_root = Path(trash_dir).expanduser().resolve(strict=False)

        library_item = self._find_item_by_track_id(output_dir, track_id)
        if library_item is None:
            raise LibraryTrackNotFoundError(track_id)

        source_path = Path(library_item['library_file_path']).expanduser().resolve(strict=False)
        # is_file() is already False for a path that does not exist.
        if not source_path.is_file():
            raise LibraryTrackNotFoundError(track_id)
        # Never move a file that resolves outside the configured library root.
        if not self._is_relative_to(source_path, output_root):
            raise ValueError('目标文件不在输出音乐库目录内')

        # Snapshot everything that needs the file at its library location
        # before the task is created and the file is moved.
        audio_props = library_item.get('audio_props_json') or {}
        original_tags = self._read_original_tags(str(source_path))
        fingerprint_payload = self._safe_calculate_fingerprint(str(source_path))
        stats = self._build_requeue_stats(bool(fingerprint_payload.get('fingerprint')))

        task = self.task_store.create_task_if_idle(
            config_snapshot,
            trigger_source='manual_library_requeue'
        )
        task_id = task['task_id']

        destination = None
        file_moved = False
        item = None
        try:
            destination = _build_unique_destination(
                trash_root / 'match_failed' / task_id,
                source_path.name
            )
            destination.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(source_path), str(destination))
            file_moved = True
            trash_file_path = str(destination.resolve(strict=False))
            item = self.task_store.insert_task_item(
                task_id,
                original_path=str(source_path),
                current_file_path=trash_file_path,
                relative_path=library_item['library_relative_path'],
                filename=destination.name,
                extension=destination.suffix.lower(),
                size_bytes=destination.stat().st_size,
                modified_at=library_item.get('modified_at'),
                local_cover=None,
                local_lyric=None,
                scan_status='queued',
                scan_reason=None,
                scan_message=None,
                preprocess_status='completed',
                preprocess_reason=None,
                preprocess_message='已从音乐库载入,等待重新匹配',
                audio_props_json=audio_props,
                original_tags_json=original_tags,
                acoustic_fingerprint=fingerprint_payload.get('fingerprint'),
                fingerprint_duration_seconds=(
                    fingerprint_payload.get('duration_seconds')
                    or library_item.get('fingerprint_duration_seconds')
                ),
                match_status='not_found',
                match_reason='manual_library_requeue',
                match_message=MANUAL_REQUEUE_MESSAGE,
                dedupe_status='pending',
                organize_status='pending',
                library_relative_path=library_item['library_relative_path'],
                library_file_path=str(source_path),
                trash_file_path=trash_file_path,
                organize_decision_json={
                    'source_path': str(source_path),
                    'trashed_path': trash_file_path,
                    'final_action': 'manual_library_requeue'
                }
            )
            self.task_store.update_task(
                task_id,
                status=TASK_STATUS_COMPLETED,
                current_stage='complete',
                stage_states=create_task_completed_stage_states(),
                stats=stats,
                completed_at=current_timestamp()
            )
            self.task_store.append_log(
                task_id,
                'match',
                'warning',
                'library.track_requeued',
                MANUAL_REQUEUE_MESSAGE,
                {
                    'track_id': track_id,
                    'library_relative_path': library_item['library_relative_path'],
                    'trash_file_path': trash_file_path,
                    'exception_id': item['id']
                }
            )
        except Exception as error:
            rollback_note = ''
            # If the file was moved but no task item records it, put it back so
            # the track is not stranded in the trash without any reference.
            if file_moved and item is None:
                try:
                    shutil.move(str(destination), str(source_path))
                except OSError as rollback_error:
                    # Best effort only: keep the original error as the one
                    # that propagates, but leave a trace of where the file is.
                    rollback_note = (
                        f' (rollback failed, file left at {destination}: {rollback_error})'
                    )
            self.task_store.update_task(
                task_id,
                status=TASK_STATUS_FAILED,
                current_stage='match',
                stage_states=create_match_failed_stage_states(),
                stats=stats,
                error_message=str(error) + rollback_note,
                completed_at=current_timestamp()
            )
            raise

        return {
            'exception_id': item['id'],
            'library_relative_path': library_item['library_relative_path'],
            'trash_file_path': trash_file_path,
            'message': '已移入异常中心,等待重新匹配'
        }

    def _build_requeue_stats(self, has_fingerprint: bool) -> dict:
        """Build task stats describing a single manually requeued track."""
        stats = create_empty_task_stats()
        stats['scan']['total_found'] = 1
        stats['scan']['queued'] = 1
        stats['preprocess']['input_items'] = 1
        stats['preprocess']['output_items'] = 1
        stats['preprocess']['metadata_snapshots'] = 1
        if has_fingerprint:
            stats['preprocess']['fingerprints_ok'] = 1
        else:
            stats['preprocess']['fingerprints_failed'] = 1
        stats['match']['input_items'] = 1
        stats['match']['not_found'] = 1
        return stats

    def _build_track_payloads(self, output_dir: str) -> list[dict]:
        """Scan the library and flatten every item into a track payload dict.

        Each payload carries the item's identity and paths, its matched
        metadata, audio properties and, when known, its ingest provenance.
        """
        provenance_by_path, provenance_by_relative_path = self._build_provenance_indexes()
        payloads: list[dict] = []
        for item in self._scan_items(output_dir):
            metadata = item.get('matched_metadata_json') or {}
            audio_props = item.get('audio_props_json') or {}
            # Prefer the absolute-path match; fall back to the relative path.
            provenance = provenance_by_path.get(item['library_file_path'])
            if provenance is None:
                provenance = provenance_by_relative_path.get(item['library_relative_path'])
            payloads.append(
                {
                    'track_id': item['track_id'],
                    'library_relative_path': item['library_relative_path'],
                    'library_file_path': item['library_file_path'],
                    'filename': item['filename'],
                    'title': metadata.get('title'),
                    'artist': metadata.get('artist'),
                    'album': metadata.get('album'),
                    'album_artist': metadata.get('album_artist'),
                    'track_number': metadata.get('track_number'),
                    'disc_number': metadata.get('disc_number'),
                    'year': metadata.get('year'),
                    'duration_seconds': metadata.get('duration_seconds'),
                    'format': audio_props.get('format'),
                    'codec': audio_props.get('codec'),
                    'bitrate': audio_props.get('bitrate'),
                    'sample_rate': audio_props.get('sample_rate'),
                    'bit_depth': audio_props.get('bit_depth'),
                    'channels': audio_props.get('channels'),
                    'size_bytes': item.get('size_bytes'),
                    'modified_at': item.get('modified_at'),
                    'ingest_provenance': provenance
                }
            )
        return payloads

    def _build_provenance_indexes(self) -> tuple[dict[str, dict], dict[str, dict]]:
        """Index stored provenance rows by absolute and by relative path.

        Returns:
            ``(by_path, by_relative_path)``. When several rows share a path,
            the first row returned by the task store wins.
        """
        by_path: dict[str, dict] = {}
        by_relative_path: dict[str, dict] = {}
        for row in self.task_store.list_library_provenance_items():
            payload = {
                'task_id': row['task_id'],
                'organized_at': row['organized_at'],
                'match_source': row['match_source'],
                'match_confidence': row['match_confidence'],
                'dedupe_status': row['dedupe_status']
            }
            library_file_path = row.get('library_file_path')
            library_relative_path = row.get('library_relative_path')
            # setdefault keeps the first payload seen for a given path.
            if library_file_path:
                by_path.setdefault(library_file_path, payload)
            if library_relative_path:
                by_relative_path.setdefault(library_relative_path, payload)
        return by_path, by_relative_path

    def _scan_items(self, output_dir: str) -> list[dict]:
        """Scan ``output_dir`` with the configured audio probe and tag reader."""
        return library_index.scan_library_items(
            output_dir,
            probe_audio=self.preprocessor.probe_audio,
            read_tags=self.read_tags
        )

    def _find_item_by_track_id(self, output_dir: str, track_id: str) -> dict | None:
        """Return the first scanned item whose ``track_id`` matches, else ``None``."""
        for item in self._scan_items(output_dir):
            if item['track_id'] == track_id:
                return item
        return None

    def _ensure_no_active_tasks(self):
        """Raise ``TaskConflictError`` if an ingest or repair task is active."""
        for task_kind in ('ingest', 'repair'):
            active_task = self.task_store.get_active_task(task_kind)
            if active_task is not None:
                raise TaskConflictError(active_task['task_id'])

    def _read_original_tags(self, file_path: str) -> dict:
        """Read the file's tags, falling back to library-derived metadata.

        Tag reading is best effort: any failure, or an empty result, makes the
        method return ``library_index.build_library_metadata`` for the path.
        """
        try:
            tags = self.preprocessor.read_tags(file_path) or {}
        except Exception:
            # Deliberate best effort: unreadable tags must not block a requeue.
            tags = {}
        if tags:
            return tags
        return library_index.build_library_metadata({}, {}, Path(file_path))

    def _safe_calculate_fingerprint(self, file_path: str) -> dict:
        """Return the fingerprint payload for ``file_path`` or ``{}`` on failure.

        Fingerprinting is optional for a requeue, so every error (including a
        preprocessor without ``calculate_fingerprint``) is turned into an
        empty payload.
        """
        try:
            return self.preprocessor.calculate_fingerprint(file_path) or {}
        except Exception:
            return {}

    def _is_relative_to(self, path: Path, parent: Path) -> bool:
        """Return whether ``path`` is ``parent`` itself or located beneath it."""
        return path.is_relative_to(parent)

    def _filter_tracks(
        self,
        tracks: list[dict],
        *,
        q: str | None,
        artist: str | None,
        album: str | None,
        format: str | None,
        has_provenance: bool | None
    ) -> list[dict]:
        """Return the tracks that satisfy every supplied filter.

        Blank or ``None`` text filters are ignored. ``artist``, ``album`` and
        ``format`` compare for equality after trimming and lower-casing;
        ``q`` is a substring search (see ``_matches_query``).
        """
        query = (q or '').strip().lower()
        exact_filters = (
            ('artist', (artist or '').strip().lower()),
            ('album', (album or '').strip().lower()),
            ('format', (format or '').strip().lower()),
        )

        filtered_tracks: list[dict] = []
        for track in tracks:
            if query and not self._matches_query(track, query):
                continue
            if any(
                expected and (track.get(field) or '').strip().lower() != expected
                for field, expected in exact_filters
            ):
                continue
            provenance_known = track.get('ingest_provenance') is not None
            if has_provenance is True and not provenance_known:
                continue
            if has_provenance is False and provenance_known:
                continue
            filtered_tracks.append(track)
        return filtered_tracks

    def _matches_query(self, track: dict, query: str) -> bool:
        """Return whether ``query`` occurs in any searchable field of ``track``.

        ``query`` is expected to be lower-cased already; field values are
        lower-cased here before the substring test.
        """
        searchable_fields = (
            track.get('filename'),
            track.get('title'),
            track.get('artist'),
            track.get('album'),
            track.get('library_relative_path')
        )
        return any(query in str(value).lower() for value in searchable_fields if value)

    def _sort_key(self, track: dict, sort_by: str) -> tuple[int, object]:
        """Return ``(missing_flag, value)`` used to order ``track``.

        ``missing_flag`` is 0 when the track has a usable value for
        ``sort_by`` and 1 otherwise. String values are lower-cased so the
        ordering is case-insensitive.
        """
        if sort_by == 'organized_at':
            # NOTE(review): falls back to modified_at when there is no
            # provenance; assumes both values are mutually comparable - confirm.
            value = (
                (track.get('ingest_provenance') or {}).get('organized_at')
                or track.get('modified_at')
                or ''
            )
            return (0 if value else 1, value)

        value = track.get(sort_by)
        if value in (None, ''):
            return (1, '')
        if isinstance(value, str):
            value = value.lower()
        # Keep falsy non-string values (e.g. 0) as they are so they still
        # compare against other values of the same type.
        return (0, value)