# MusicWorkshop/backend/app/library_postprocess.py
# Snapshot: 2026-04-30 14:34:28 +08:00 (1063 lines, 35 KiB, Python)

import os
import re
import shutil
import time
import unicodedata
from pathlib import Path
from . import library_index
from .scanner import ALLOWED_AUDIO_EXTENSIONS
from .task_constants import (
DEDUPE_PROGRESS_BATCH_SIZE,
DEDUPE_PROGRESS_INTERVAL_SECONDS,
ORGANIZE_PROGRESS_BATCH_SIZE,
ORGANIZE_PROGRESS_INTERVAL_SECONDS,
TASK_STATUS_RUNNING
)
# Aliases of the values defined in ``library_index`` so callers of this module
# can keep referring to them here.
LOSSLESS_EXTENSIONS = library_index.LOSSLESS_EXTENSIONS
PRESERVED_VERSION_TOKENS = library_index.PRESERVED_VERSION_TOKENS
# A batch file only replaces its library duplicate when its quality total is at
# least this many points higher than the library file's total.
REPLACE_SCORE_THRESHOLD = 15.0
# Maximum number of characters kept in a single sanitized path component.
MAX_PATH_COMPONENT_LENGTH = 96
class DedupeItemError(Exception):
    """Failure that affects one item during the dedupe stage.

    Attributes:
        reason: Short machine-readable code (for example ``'source_missing'``).
        message: Human-readable description; also used as the exception text.
    """

    def __init__(self, reason: str, message: str):
        self.reason, self.message = reason, message
        super().__init__(message)
class OrganizeItemError(Exception):
    """Failure that affects one item during the organize stage.

    Attributes:
        reason: Short machine-readable code (for example ``'move_failed'``).
        message: Human-readable description; also used as the exception text.
    """

    def __init__(self, reason: str, message: str):
        self.reason, self.message = reason, message
        super().__init__(message)
class DedupeRunner:
def __init__(self, task_store, preprocessor, task_stream):
self.task_store = task_store
self.preprocessor = preprocessor
self.task_stream = task_stream
def run(self, task_id: str, current_stats: dict, config_snapshot: dict):
dedupe_stats = current_stats['dedupe'].copy()
candidates = self.task_store.list_dedupe_candidate_items(task_id)
dedupe_stats['input_items'] = len(candidates)
current_stats['dedupe'] = dedupe_stats.copy()
self._persist_progress(task_id, current_stats, dedupe_stats)
if not candidates:
return
library_index = self._index_library_files(config_snapshot['output'])
dedupe_stats['library_candidates'] = library_index['count']
current_stats['dedupe'] = dedupe_stats.copy()
self.task_stream.broadcast_event(
task_id,
'dedupe.library_indexed',
'dedupe',
{'count': library_index['count']}
)
self._append_log(
task_id,
level='info',
event_type='dedupe.library_indexed',
message=f'已索引输出库音频: {library_index["count"]}',
payload={'count': library_index['count']}
)
groups = _group_batch_candidates(candidates)
winners: list[dict] = []
processed_count = 0
last_progress_at = time.monotonic()
for group in groups:
running_items = [
self.task_store.update_task_item(
item['id'],
dedupe_status='running',
dedupe_reason=None,
dedupe_message=None,
dedupe_group_key=None,
duplicate_of_path=None,
duplicate_of_item_id=None,
dedupe_decision_json=None
)
for item in group
]
winner, batch_duplicates, identity_basis, group_key = _select_batch_winner(running_items)
for duplicate_item in batch_duplicates:
try:
self.task_stream.broadcast_event(
task_id,
'dedupe.lookup_started',
'dedupe',
{'item': duplicate_item}
)
self._append_log(
task_id,
level='info',
event_type='dedupe.lookup_started',
message=f'开始比对重复项: {duplicate_item["relative_path"]}',
payload={'item': duplicate_item}
)
dedupe_stats['batch_duplicates'] += 1
trashed_path = self._move_file_to_trash(
config_snapshot['trash'],
'duplicates',
task_id,
duplicate_item['id'],
duplicate_item['current_file_path']
)
final_item = self.task_store.update_task_item(
duplicate_item['id'],
is_active=0,
current_file_path=trashed_path,
trash_file_path=trashed_path,
dedupe_status='duplicate_trashed',
dedupe_reason='batch_duplicate',
dedupe_message='当前批次中存在更优文件,已移入回收站',
dedupe_group_key=group_key,
duplicate_of_path=winner['current_file_path'],
duplicate_of_item_id=winner['id'],
dedupe_decision_json={
'comparison_scope': 'batch',
'identity_basis': identity_basis,
'quality_breakdown': {
'kept': _build_quality_breakdown(winner),
'trashed': _build_quality_breakdown(duplicate_item)
},
'kept_side': 'batch',
'trashed_path': trashed_path,
'replaced_existing_path': None,
'compared_candidates': [
_serialize_compared_candidate('kept', winner),
_serialize_compared_candidate('trashed', duplicate_item)
]
}
)
self.task_stream.broadcast_event(
task_id,
'dedupe.item_duplicate',
'dedupe',
{'item': final_item}
)
self._append_log(
task_id,
level='warning',
event_type='dedupe.item_duplicate',
message=f'批次重复已淘汰: {duplicate_item["relative_path"]}',
payload={'item': final_item}
)
except DedupeItemError as error:
dedupe_stats['failed_items'] += 1
final_item = self.task_store.update_task_item(
duplicate_item['id'],
dedupe_status='failed',
dedupe_reason=error.reason,
dedupe_message=error.message,
dedupe_group_key=group_key,
duplicate_of_path=winner['current_file_path'],
duplicate_of_item_id=winner['id']
)
self.task_stream.broadcast_event(
task_id,
'dedupe.item_failed',
'dedupe',
{'item': final_item}
)
self._append_log(
task_id,
level='error',
event_type='dedupe.item_failed',
message=f'重复检测失败: {duplicate_item["relative_path"]}',
payload={'item': final_item}
)
processed_count += 1
last_progress_at = self._maybe_persist_progress(
task_id,
current_stats,
dedupe_stats,
processed_count,
last_progress_at
)
winners.append(
self.task_store.update_task_item(
winner['id'],
dedupe_status='running',
dedupe_group_key=group_key
)
)
replace_enabled = bool(
(config_snapshot.get('advancedStrategy') or {}).get('replaceLowQualityDuplicates')
)
for winner in winners:
if not winner['is_active']:
continue
self.task_stream.broadcast_event(
task_id,
'dedupe.lookup_started',
'dedupe',
{'item': winner}
)
self._append_log(
task_id,
level='info',
event_type='dedupe.lookup_started',
message=f'开始比对重复项: {winner["relative_path"]}',
payload={'item': winner}
)
identity_basis, identity_key = _choose_primary_identity(winner)
if not identity_basis or not identity_key:
unique_item = self.task_store.update_task_item(
winner['id'],
dedupe_status='unique',
dedupe_reason=None,
dedupe_message='未发现重复项',
dedupe_group_key=winner.get('dedupe_group_key') or f'item:{winner["id"]}',
dedupe_decision_json={
'comparison_scope': 'none',
'identity_basis': None,
'quality_breakdown': {'kept': _build_quality_breakdown(winner)},
'kept_side': 'batch',
'trashed_path': None,
'replaced_existing_path': None,
'compared_candidates': [_serialize_compared_candidate('kept', winner)]
}
)
dedupe_stats['kept_items'] += 1
self.task_stream.broadcast_event(
task_id,
'dedupe.item_unique',
'dedupe',
{'item': unique_item}
)
self._append_log(
task_id,
level='success',
event_type='dedupe.item_unique',
message=f'未发现重复项,保留文件: {winner["relative_path"]}',
payload={'item': unique_item}
)
processed_count += 1
last_progress_at = self._maybe_persist_progress(
task_id,
current_stats,
dedupe_stats,
processed_count,
last_progress_at
)
continue
library_candidates = library_index['by_basis'].get(identity_basis, {}).get(identity_key, [])
if not library_candidates:
unique_item = self.task_store.update_task_item(
winner['id'],
dedupe_status='unique',
dedupe_reason=None,
dedupe_message='未发现库内重复项',
dedupe_group_key=winner.get('dedupe_group_key') or identity_key,
dedupe_decision_json={
'comparison_scope': 'library',
'identity_basis': identity_basis,
'quality_breakdown': {'kept': _build_quality_breakdown(winner)},
'kept_side': 'batch',
'trashed_path': None,
'replaced_existing_path': None,
'compared_candidates': [_serialize_compared_candidate('kept', winner)]
}
)
dedupe_stats['kept_items'] += 1
self.task_stream.broadcast_event(
task_id,
'dedupe.item_unique',
'dedupe',
{'item': unique_item}
)
self._append_log(
task_id,
level='success',
event_type='dedupe.item_unique',
message=f'未发现库内重复项,保留文件: {winner["relative_path"]}',
payload={'item': unique_item}
)
processed_count += 1
last_progress_at = self._maybe_persist_progress(
task_id,
current_stats,
dedupe_stats,
processed_count,
last_progress_at
)
continue
library_item = max(library_candidates, key=lambda candidate: _build_quality_breakdown(candidate)['total'])
dedupe_stats['library_duplicates'] += 1
winner_quality = _build_quality_breakdown(winner)
library_quality = _build_quality_breakdown(library_item)
try:
if replace_enabled and winner_quality['total'] >= library_quality['total'] + REPLACE_SCORE_THRESHOLD:
replaced_path = self._move_file_to_trash(
config_snapshot['trash'],
'duplicates',
task_id,
winner['id'],
library_item['file_path']
)
final_item = self.task_store.update_task_item(
winner['id'],
dedupe_status='duplicate_replaced',
dedupe_reason='replaced_library_duplicate',
dedupe_message='当前文件质量明显更高,已替换库内旧文件',
dedupe_group_key=winner.get('dedupe_group_key') or identity_key,
duplicate_of_path=library_item['file_path'],
duplicate_of_item_id=None,
dedupe_decision_json={
'comparison_scope': 'library',
'identity_basis': identity_basis,
'quality_breakdown': {
'kept': winner_quality,
'replaced': library_quality
},
'kept_side': 'batch',
'trashed_path': replaced_path,
'replaced_existing_path': library_item['file_path'],
'compared_candidates': [
_serialize_compared_candidate('kept', winner),
_serialize_compared_candidate('replaced', library_item)
]
}
)
dedupe_stats['replaced_library_items'] += 1
dedupe_stats['kept_items'] += 1
self.task_stream.broadcast_event(
task_id,
'dedupe.item_replaced',
'dedupe',
{'item': final_item}
)
self._append_log(
task_id,
level='success',
event_type='dedupe.item_replaced',
message=f'已替换库内旧文件: {winner["relative_path"]}',
payload={'item': final_item}
)
else:
trashed_path = self._move_file_to_trash(
config_snapshot['trash'],
'duplicates',
task_id,
winner['id'],
winner['current_file_path']
)
final_item = self.task_store.update_task_item(
winner['id'],
is_active=0,
current_file_path=trashed_path,
trash_file_path=trashed_path,
dedupe_status='duplicate_trashed',
dedupe_reason='library_duplicate',
dedupe_message='输出库中已存在重复文件,保留库内文件',
dedupe_group_key=winner.get('dedupe_group_key') or identity_key,
duplicate_of_path=library_item['file_path'],
duplicate_of_item_id=None,
dedupe_decision_json={
'comparison_scope': 'library',
'identity_basis': identity_basis,
'quality_breakdown': {
'kept': library_quality,
'trashed': winner_quality
},
'kept_side': 'library',
'trashed_path': trashed_path,
'replaced_existing_path': None,
'compared_candidates': [
_serialize_compared_candidate('kept', library_item),
_serialize_compared_candidate('trashed', winner)
]
}
)
self.task_stream.broadcast_event(
task_id,
'dedupe.item_duplicate',
'dedupe',
{'item': final_item}
)
self._append_log(
task_id,
level='warning',
event_type='dedupe.item_duplicate',
message=f'输出库已存在重复文件,已淘汰: {winner["relative_path"]}',
payload={'item': final_item}
)
except DedupeItemError as error:
dedupe_stats['failed_items'] += 1
final_item = self.task_store.update_task_item(
winner['id'],
dedupe_status='failed',
dedupe_reason=error.reason,
dedupe_message=error.message,
dedupe_group_key=winner.get('dedupe_group_key') or identity_key,
duplicate_of_path=library_item['file_path']
)
self.task_stream.broadcast_event(
task_id,
'dedupe.item_failed',
'dedupe',
{'item': final_item}
)
self._append_log(
task_id,
level='error',
event_type='dedupe.item_failed',
message=f'重复检测失败: {winner["relative_path"]}',
payload={'item': final_item}
)
processed_count += 1
last_progress_at = self._maybe_persist_progress(
task_id,
current_stats,
dedupe_stats,
processed_count,
last_progress_at
)
self._persist_progress(task_id, current_stats, dedupe_stats)
def _index_library_files(self, output_dir: str) -> dict:
return library_index.build_library_index(
output_dir,
probe_audio=self._safe_probe_audio,
read_tags=self._safe_read_library_tags
)
def _safe_probe_audio(self, file_path: str) -> dict:
try:
return self.preprocessor.probe_audio(file_path)
except Exception:
return {}
def _safe_read_library_tags(self, file_path: str) -> dict:
return library_index.safe_read_tags(library_index.default_read_library_tags, file_path)
def _move_file_to_trash(
self,
trash_root: str,
reason: str,
task_id: str,
item_id: int | None,
source_path: str
) -> str:
source = Path(source_path)
if not source.exists():
raise DedupeItemError('source_missing', f'源文件不存在: {source}')
destination = _build_unique_destination(
Path(trash_root) / reason / task_id,
_build_prefixed_name(item_id, source.name)
)
destination.parent.mkdir(parents=True, exist_ok=True)
try:
self._move_file(source, destination)
except OSError as error:
raise DedupeItemError('trash_move_failed', f'移动到回收站失败: {error}') from error
return str(destination.resolve(strict=False))
def _move_file(self, source: Path, destination: Path):
shutil.move(str(source), str(destination))
def _persist_progress(self, task_id: str, current_stats: dict, dedupe_stats: dict[str, int]):
current_stats['dedupe'] = dedupe_stats.copy()
self.task_store.update_task(
task_id,
status=TASK_STATUS_RUNNING,
current_stage='dedupe',
stats=current_stats
)
self.task_stream.broadcast_event(
task_id,
'dedupe.progress',
'dedupe',
{'stats': current_stats}
)
def _append_log(
self,
task_id: str,
*,
level: str,
event_type: str,
message: str,
payload: dict | None = None
):
persisted_log = self.task_store.append_log(
task_id,
'dedupe',
level,
event_type,
message,
payload
)
self.task_stream.broadcast_event(
task_id,
'log.appended',
'dedupe',
{'log': persisted_log}
)
def _maybe_persist_progress(
self,
task_id: str,
current_stats: dict,
dedupe_stats: dict[str, int],
processed_count: int,
last_progress_at: float
) -> float:
now = time.monotonic()
if (
processed_count % DEDUPE_PROGRESS_BATCH_SIZE == 0
or now - last_progress_at >= DEDUPE_PROGRESS_INTERVAL_SECONDS
):
self._persist_progress(task_id, current_stats, dedupe_stats)
return now
return last_progress_at
class OrganizeRunner:
def __init__(self, task_store, task_stream):
self.task_store = task_store
self.task_stream = task_stream
def run(self, task_id: str, current_stats: dict, config_snapshot: dict):
organize_stats = current_stats['organize'].copy()
candidates = self.task_store.list_organize_candidate_items(task_id)
organize_stats['input_items'] = len(candidates)
current_stats['organize'] = organize_stats.copy()
self._persist_progress(task_id, current_stats, organize_stats)
if not candidates:
return
output_root = Path(config_snapshot['output']).expanduser().resolve(strict=False)
trash_root = Path(config_snapshot['trash']).expanduser().resolve(strict=False)
processed_count = 0
last_progress_at = time.monotonic()
for original_item in candidates:
item = self.task_store.update_task_item(
original_item['id'],
organize_status='running',
organize_reason=None,
organize_message=None,
library_relative_path=None,
library_file_path=None,
organize_decision_json=None
)
try:
plan = _build_organize_plan(output_root, item)
self.task_stream.broadcast_event(
task_id,
'organize.path_planned',
'organize',
{
'item': item,
'planned_relative_path': plan['planned_relative_path']
}
)
self._append_log(
task_id,
level='info',
event_type='organize.path_planned',
message=f'已规划入库路径: {item["relative_path"]}',
payload={
'item': item,
'planned_relative_path': plan['planned_relative_path']
}
)
final_path, collision_count = self._resolve_destination(
output_root / plan['planned_relative_path'],
Path(item['current_file_path'])
)
final_path.parent.mkdir(parents=True, exist_ok=True)
source_path = Path(item['current_file_path'])
if not source_path.exists():
raise OrganizeItemError('source_missing', f'源文件不存在: {source_path}')
self._move_file(source_path, final_path)
final_relative_path = final_path.relative_to(output_root).as_posix()
renamed = Path(item['current_file_path']).name != final_path.name
moved = Path(item['current_file_path']).resolve(strict=False) != final_path.resolve(strict=False)
if moved:
organize_stats['moved_items'] += 1
if renamed:
organize_stats['renamed_items'] += 1
if collision_count > 1:
organize_stats['collision_resolved'] += 1
final_item = self.task_store.update_task_item(
item['id'],
current_file_path=str(final_path.resolve(strict=False)),
filename=final_path.name,
organize_status='organized',
organize_reason=None,
organize_message='已按标准路径入库',
library_relative_path=final_relative_path,
library_file_path=str(final_path.resolve(strict=False)),
organize_decision_json={
'source_path': item['current_file_path'],
'planned_relative_path': plan['planned_relative_path'],
'final_relative_path': final_relative_path,
'collision_strategy': 'suffix' if collision_count > 1 else 'none',
'trashed_on_failure': None,
'final_action': 'organized'
}
)
self.task_stream.broadcast_event(
task_id,
'organize.item_organized',
'organize',
{'item': final_item}
)
self._append_log(
task_id,
level='success',
event_type='organize.item_organized',
message=f'文件已入库: {final_relative_path}',
payload={'item': final_item}
)
except OrganizeItemError as error:
organize_stats['failed_items'] += 1
final_item = self._handle_failure(task_id, item, error, output_root, trash_root)
if final_item['organize_status'] == 'trashed':
organize_stats['trashed_items'] += 1
self.task_stream.broadcast_event(
task_id,
'organize.item_trashed',
'organize',
{'item': final_item}
)
self._append_log(
task_id,
level='warning',
event_type='organize.item_trashed',
message=f'入库失败后已移入回收站: {item["relative_path"]}',
payload={'item': final_item}
)
else:
self.task_stream.broadcast_event(
task_id,
'organize.item_failed',
'organize',
{'item': final_item}
)
self._append_log(
task_id,
level='error',
event_type='organize.item_failed',
message=f'整理入库失败: {item["relative_path"]}',
payload={'item': final_item}
)
except OSError as error:
organize_stats['failed_items'] += 1
final_item = self._handle_failure(
task_id,
item,
OrganizeItemError('move_failed', f'整理入库失败: {error}'),
output_root,
trash_root
)
if final_item['organize_status'] == 'trashed':
organize_stats['trashed_items'] += 1
self.task_stream.broadcast_event(
task_id,
'organize.item_trashed',
'organize',
{'item': final_item}
)
self._append_log(
task_id,
level='warning',
event_type='organize.item_trashed',
message=f'入库失败后已移入回收站: {item["relative_path"]}',
payload={'item': final_item}
)
else:
self.task_stream.broadcast_event(
task_id,
'organize.item_failed',
'organize',
{'item': final_item}
)
self._append_log(
task_id,
level='error',
event_type='organize.item_failed',
message=f'整理入库失败: {item["relative_path"]}',
payload={'item': final_item}
)
processed_count += 1
last_progress_at = self._maybe_persist_progress(
task_id,
current_stats,
organize_stats,
processed_count,
last_progress_at
)
self._persist_progress(task_id, current_stats, organize_stats)
def _handle_failure(
self,
task_id: str,
item: dict,
error: OrganizeItemError,
output_root: Path,
trash_root: Path
) -> dict:
source_path = Path(item['current_file_path'])
trashed_path = None
final_status = 'failed'
message = error.message
if source_path.exists():
destination = _build_unique_destination(
trash_root / 'organize_failed' / task_id,
_build_prefixed_name(item['id'], source_path.name)
)
destination.parent.mkdir(parents=True, exist_ok=True)
try:
self._move_file(source_path, destination)
trashed_path = str(destination.resolve(strict=False))
final_status = 'trashed'
except OSError as trash_error:
message = f'{error.message}; 移入回收站失败: {trash_error}'
return self.task_store.update_task_item(
item['id'],
current_file_path=trashed_path or item['current_file_path'],
trash_file_path=trashed_path,
organize_status=final_status,
organize_reason=error.reason,
organize_message=message,
organize_decision_json={
'source_path': item['current_file_path'],
'planned_relative_path': None,
'final_relative_path': None,
'collision_strategy': 'none',
'trashed_on_failure': trashed_path,
'final_action': final_status
}
)
def _resolve_destination(self, desired_path: Path, source_path: Path) -> tuple[Path, int]:
candidate = desired_path
collision_index = 1
while candidate.exists():
if candidate.resolve(strict=False) == source_path.resolve(strict=False):
return candidate, collision_index
collision_index += 1
candidate = candidate.with_name(
f'{desired_path.stem} ({collision_index}){desired_path.suffix}'
)
return candidate, collision_index
def _move_file(self, source: Path, destination: Path):
shutil.move(str(source), str(destination))
def _persist_progress(self, task_id: str, current_stats: dict, organize_stats: dict[str, int]):
current_stats['organize'] = organize_stats.copy()
self.task_store.update_task(
task_id,
status=TASK_STATUS_RUNNING,
current_stage='organize',
stats=current_stats
)
self.task_stream.broadcast_event(
task_id,
'organize.progress',
'organize',
{'stats': current_stats}
)
def _append_log(
self,
task_id: str,
*,
level: str,
event_type: str,
message: str,
payload: dict | None = None
):
persisted_log = self.task_store.append_log(
task_id,
'organize',
level,
event_type,
message,
payload
)
self.task_stream.broadcast_event(
task_id,
'log.appended',
'organize',
{'log': persisted_log}
)
def _maybe_persist_progress(
self,
task_id: str,
current_stats: dict,
organize_stats: dict[str, int],
processed_count: int,
last_progress_at: float
) -> float:
now = time.monotonic()
if (
processed_count % ORGANIZE_PROGRESS_BATCH_SIZE == 0
or now - last_progress_at >= ORGANIZE_PROGRESS_INTERVAL_SECONDS
):
self._persist_progress(task_id, current_stats, organize_stats)
return now
return last_progress_at
def _group_batch_candidates(items: list[dict]) -> list[list[dict]]:
    """Partition ``items`` into groups that share at least one identity key.

    An item joins the first existing group that already owns one of its
    identity keys; otherwise it starts a new group. In both cases the item's
    keys that are not yet owned by any group are registered for its group, so a
    later item can also match through a key contributed by a group member
    other than the first one. Existing groups are never merged.

    Items without any identity key always form their own single-item group.

    Returns:
        The groups in order of creation; items keep their input order inside
        each group.
    """
    groups: list[list[dict]] = []
    group_by_identity: dict[tuple[str, str], list[dict]] = {}
    for item in items:
        identity_keys = _identity_keys_for_item(item, include_fingerprint=True)

        target_group = None
        for identity_basis, identity_key in identity_keys:
            target_group = group_by_identity.get((identity_basis, identity_key))
            if target_group is not None:
                break

        if target_group is None:
            target_group = [item]
            groups.append(target_group)
            if not identity_keys:
                # Synthetic key that no other item can share.
                group_by_identity[(f'item:{item["id"]}', str(item['id']))] = target_group
        else:
            target_group.append(item)

        # Register the keys for joiners as well as founders; previously only
        # the founder's keys were indexed, which made grouping depend on order.
        for identity_basis, identity_key in identity_keys:
            group_by_identity.setdefault((identity_basis, identity_key), target_group)
    return groups
def _select_batch_winner(group: list[dict]) -> tuple[dict, list[dict], str | None, str]:
    """Pick the item to keep from a non-empty duplicate group.

    Returns:
        ``(winner, losers, identity_basis, group_key)``. ``losers`` are all
        group members whose ``id`` differs from the winner's. ``group_key`` is
        the winner's primary identity key, or ``item:<id>`` when it has none.
    """
    best = group[0]
    for challenger in group[1:]:
        # A negative comparison result means the challenger ranks first.
        if _compare_batch_candidates(challenger, best) < 0:
            best = challenger

    basis, key = _choose_primary_identity(best)
    losers = [member for member in group if member['id'] != best['id']]
    group_key = key or f'item:{best["id"]}'
    return best, losers, basis, group_key
def _compare_batch_candidates(left: dict, right: dict) -> int:
    """Order two batch items; ``-1`` means ``left`` is preferred, ``1`` ``right``.

    Tie-breakers, in order: higher quality total, then (authoritative match,
    match confidence), then earlier ``created_at``, then lower ``id``.
    Returns ``0`` only when all of them are equal.
    """

    def _verdict(left_first: bool) -> int:
        return -1 if left_first else 1

    total_left = _build_quality_breakdown(left)['total']
    total_right = _build_quality_breakdown(right)['total']
    if total_left != total_right:
        return _verdict(total_left > total_right)

    authority_left = (
        1 if left.get('match_is_authoritative') else 0,
        left.get('match_confidence') or 0,
    )
    authority_right = (
        1 if right.get('match_is_authoritative') else 0,
        right.get('match_confidence') or 0,
    )
    if authority_left != authority_right:
        return _verdict(authority_left > authority_right)

    created_left = left.get('created_at') or ''
    created_right = right.get('created_at') or ''
    if created_left != created_right:
        return _verdict(created_left < created_right)

    if left['id'] != right['id']:
        return _verdict(left['id'] < right['id'])
    return 0
def _build_library_metadata(tags: dict, audio_props: dict, file_path: Path) -> dict:
    """Thin wrapper that forwards to ``library_index.build_library_metadata``."""
    return library_index.build_library_metadata(tags, audio_props, file_path)
def _identity_keys_for_item(item: dict, *, include_fingerprint: bool) -> list[tuple[str, str]]:
    """Thin wrapper that forwards to ``library_index.identity_keys_for_item``."""
    return library_index.identity_keys_for_item(item, include_fingerprint=include_fingerprint)
def _choose_primary_identity(item: dict) -> tuple[str | None, str | None]:
    """Thin wrapper that forwards to ``library_index.choose_primary_identity``."""
    return library_index.choose_primary_identity(item)
def _build_quality_breakdown(item: dict) -> dict:
    """Score the audio quality of a task item or library entry.

    Component maxima: bit depth 30 (reached at 24 bit), sample rate 20
    (96 kHz), bitrate 20 (320000), lossless 15, channels 5 (2 channels),
    size/duration consistency 5 and match quality 5 (3 for an authoritative
    match plus up to 2 for confidence out of 100).

    The extension used for lossless detection comes from ``item['extension']``
    or, when absent, from the suffix of ``current_file_path`` and then
    ``file_path``, so library entries that only carry ``file_path`` are
    classified as well.

    Returns:
        A dict with one rounded score per component plus their sum under
        ``'total'``.
    """
    audio_props = item.get('audio_props_json') or {}
    raw_duration = _first_non_empty(
        (item.get('matched_metadata_json') or {}).get('duration_seconds'),
        item.get('fingerprint_duration_seconds'),
        audio_props.get('duration_seconds')
    )
    # Convert once; a falsy raw value never reaches the ratio check below.
    duration_seconds = _safe_float(raw_duration) if raw_duration else None
    bit_depth = _safe_float(audio_props.get('bit_depth'))
    sample_rate = _safe_float(audio_props.get('sample_rate'))
    bitrate = _safe_float(audio_props.get('bitrate'))
    channels = _safe_float(audio_props.get('channels'))
    size_bytes = _safe_float(item.get('size_bytes'))

    source_path = item.get('current_file_path') or item.get('file_path') or ''
    extension = (item.get('extension') or Path(source_path).suffix).lower()
    is_lossless = (
        extension in LOSSLESS_EXTENSIONS
        or str(audio_props.get('codec') or '').upper() == 'FLAC'
    )

    breakdown = {
        'bit_depth': round(min((bit_depth or 0) / 24.0, 1.0) * 30.0, 2),
        'sample_rate': round(min((sample_rate or 0) / 96000.0, 1.0) * 20.0, 2),
        'bitrate': round(min((bitrate or 0) / 320000.0, 1.0) * 20.0, 2),
        'lossless': 15.0 if is_lossless else 0.0,
        'channels': round(min((channels or 0) / 2.0, 1.0) * 5.0, 2),
        'size_duration_consistency': 0.0,
        'match_quality': round(
            (3.0 if item.get('match_is_authoritative') else 0.0)
            + min((_safe_float(item.get('match_confidence')) or 0.0) / 100.0, 1.0) * 2.0,
            2
        )
    }
    # More than 1000 bytes per second of audio counts as a plausible file size.
    if size_bytes and duration_seconds and size_bytes / duration_seconds > 1000:
        breakdown['size_duration_consistency'] = 5.0
    breakdown['total'] = round(sum(breakdown.values()), 2)
    return breakdown
def _serialize_compared_candidate(side: str, item: dict) -> dict:
    """Summarize one side of a duplicate comparison for the decision record.

    ``path`` prefers ``file_path`` and falls back to ``current_file_path``;
    ``quality_score`` is the item's quality total.
    """
    location = item.get('file_path') or item.get('current_file_path')
    score = _build_quality_breakdown(item)['total']
    return dict(
        side=side,
        item_id=item.get('id'),
        path=location,
        relative_path=item.get('relative_path'),
        quality_score=score,
    )
def _build_organize_plan(output_root: Path, item: dict) -> dict:
    """Plan the library-relative path of ``item`` from its matched metadata.

    Layouts:
        * Album: ``<bucket>/<album artist>/<album>[/Disc N]/<NN> - <title><ext>``
          (``Disc N`` only for disc numbers above 1, track number defaults to 1).
        * No album, or album named ``single``/``singles``:
          ``<bucket>/<album artist>/Singles/<year> - <title>/01 - <title><ext>``.

    Returns:
        A dict with ``'output_root'`` and the POSIX-style
        ``'planned_relative_path'``.

    Raises:
        OrganizeItemError: ``invalid_target_path`` when no usable artist
            directory or title can be derived.
    """
    meta = item.get('matched_metadata_json') or {}

    artist_dir = _sanitize_path_component(
        _first_non_empty(meta.get('album_artist'), meta.get('artist'), 'Unknown Artist')
    )
    if not artist_dir:
        raise OrganizeItemError('invalid_target_path', '无法生成有效的 Album Artist 目录')

    # The file stem stands in for a missing title tag.
    stem_fallback = Path(item.get('current_file_path') or item['filename']).stem
    title = _sanitize_path_component(_first_non_empty(meta.get('title'), stem_fallback))
    if not title:
        raise OrganizeItemError('invalid_target_path', '无法生成有效的标题文件名')

    year = _extract_year(_first_non_empty(meta.get('release_date'), meta.get('year')))
    track_number = _parse_track_number(meta.get('track_number')) or 1
    disc_number = _parse_track_number(meta.get('disc_number'))
    extension = (item.get('extension') or Path(item.get('current_file_path') or '').suffix).lower()
    bucket = _bucket_letter(artist_dir)
    album = _sanitize_path_component(meta.get('album'))
    album_filename = f'{track_number:02d} - {title}{extension}'

    treat_as_single = not album or album.lower() in {'single', 'singles'}
    if treat_as_single:
        year_label = str(year) if year else 'Unknown Year'
        segments = [
            bucket,
            artist_dir,
            'Singles',
            _sanitize_path_component(f'{year_label} - {title}'),
            f'01 - {title}{extension}',
        ]
    else:
        segments = [bucket, artist_dir, album]
        if disc_number and disc_number > 1:
            segments.append(f'Disc {disc_number}')
        segments.append(album_filename)

    return {
        'output_root': output_root,
        'planned_relative_path': Path(*segments).as_posix()
    }
def _build_prefixed_name(item_id: int | None, filename: str) -> str:
    """Build a sanitized trash file name, prefixed with ``<item_id>_`` when given.

    Sanitizing truncates over-long names, which used to cut the extension off.
    When that happens the stem is shortened further and the (sanitized)
    extension is appended again, keeping the name within
    ``MAX_PATH_COMPONENT_LENGTH`` characters.
    """
    original_name = Path(filename).name
    safe_name = _sanitize_path_component(original_name, fallback='file')
    safe_suffix = _sanitize_path_component(Path(original_name).suffix)
    # Only short, real-looking extensions are restored; 16 characters is a
    # generous cap that excludes "suffixes" made of sentence fragments.
    extension_lost = (
        safe_suffix.startswith('.')
        and 1 < len(safe_suffix) <= 16
        and not safe_name.endswith(safe_suffix)
    )
    if extension_lost:
        stem_budget = MAX_PATH_COMPONENT_LENGTH - len(safe_suffix)
        safe_name = safe_name[:stem_budget].rstrip('. ') + safe_suffix
    return f'{item_id}_{safe_name}' if item_id is not None else safe_name
def _build_unique_destination(directory: Path, filename: str) -> Path:
    """Return a path in ``directory`` that does not exist yet.

    ``filename`` is used as is when free; otherwise `` (2)``, `` (3)``, ... is
    inserted before the suffix until an unused name is found.
    """
    base = directory / filename
    target = base
    attempt = 1
    while target.exists():
        attempt += 1
        target = base.with_name(f'{base.stem} ({attempt}){base.suffix}')
    return target
def _bucket_letter(value: str) -> str:
    """Return the single-character bucket directory for ``value``.

    The bucket is the upper-cased first character of the NFKC-normalized,
    stripped value when that is one ASCII letter or digit, and ``'#'``
    otherwise (empty input, punctuation, non-ASCII initials).
    """
    normalized = unicodedata.normalize('NFKC', value).strip()
    if not normalized:
        return '#'
    first = normalized[0].upper()
    # Upper-casing can expand one character into several ('ß' -> 'SS'); such
    # an initial must not produce a multi-character bucket.
    if len(first) == 1 and first.isascii() and first.isalnum():
        return first
    return '#'
def _sanitize_path_component(value: str | None, fallback: str | None = None) -> str:
raw = unicodedata.normalize('NFKC', str(value or fallback or '')).strip()
cleaned = re.sub(r'[\\/:*?"<>|\x00-\x1f]+', ' ', raw)
cleaned = re.sub(r'\s+', ' ', cleaned).strip().rstrip('. ')
if not cleaned:
cleaned = fallback or ''
if len(cleaned) > MAX_PATH_COMPONENT_LENGTH:
cleaned = cleaned[:MAX_PATH_COMPONENT_LENGTH].rstrip('. ')
return cleaned
def _normalize_identity_text(value: str | None) -> str:
    """Thin wrapper that forwards to ``library_index.normalize_identity_text``."""
    return library_index.normalize_identity_text(value)
def _extract_preserved_version_tokens(value: str | None) -> set[str]:
    """Thin wrapper that forwards to ``library_index.extract_preserved_version_tokens``."""
    return library_index.extract_preserved_version_tokens(value)
def _normalize_tag_key(value: str) -> str:
    """Thin wrapper that forwards to ``library_index.normalize_tag_key``."""
    return library_index.normalize_tag_key(value)
def _coerce_tag_value(value) -> str | None:
    """Thin wrapper that forwards to ``library_index.coerce_tag_value``."""
    return library_index.coerce_tag_value(value)
def _parse_track_number(value) -> int | None:
    """Thin wrapper that forwards to ``library_index.parse_track_number``."""
    return library_index.parse_track_number(value)
def _extract_year(value) -> int | None:
    """Thin wrapper that forwards to ``library_index.extract_year``."""
    return library_index.extract_year(value)
def _duration_bucket(value) -> int | None:
    """Thin wrapper that forwards to ``library_index.duration_bucket``."""
    return library_index.duration_bucket(value)
def _safe_float(value) -> float | None:
    """Thin wrapper that forwards to ``library_index.safe_float``."""
    return library_index.safe_float(value)
def _first_non_empty(*values):
    """Thin wrapper that forwards all arguments to ``library_index.first_non_empty``."""
    return library_index.first_non_empty(*values)
def _split_artists(value: str | None) -> list[str]:
    """Thin wrapper that forwards to ``library_index.split_artists``."""
    return library_index.split_artists(value)