Files
2026-04-30 14:34:28 +08:00

1413 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import shutil
import time
from datetime import datetime, timezone
from pathlib import Path
from .defaults import merge_config
from .library_postprocess import (
DedupeRunner,
OrganizeRunner,
_build_prefixed_name,
_build_unique_destination
)
from .matcher import MatchProviderError, Matcher
from .preprocessor import (
FORCED_FLAC_EXTENSIONS,
PreprocessItemError,
Preprocessor,
build_preprocess_paths,
build_split_child_relative_path,
merge_tag_snapshots
)
from .scanner import ScanItem, Scanner
from .task_constants import (
MATCH_PROGRESS_BATCH_SIZE,
MATCH_PROGRESS_INTERVAL_SECONDS,
PREPROCESS_PROGRESS_BATCH_SIZE,
PREPROCESS_PROGRESS_INTERVAL_SECONDS,
STAGE_STATUS_RUNNING,
TASK_STATUS_COMPLETED,
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
current_timestamp,
create_dedupe_completed_stage_states,
create_dedupe_failed_stage_states,
create_dedupe_running_stage_states,
create_empty_task_stats,
create_match_completed_stage_states,
create_match_failed_stage_states,
create_match_running_stage_states,
create_organize_completed_stage_states,
create_organize_failed_stage_states,
create_organize_running_stage_states,
create_pending_stage_states,
create_preprocess_completed_stage_states,
create_preprocess_failed_stage_states,
create_preprocess_running_stage_states,
create_task_completed_stage_states,
create_scan_completed_stage_states,
create_scan_failed_stage_states
)
from .task_store import TaskStore
from .task_stream import TaskStreamManager
class TaskRunner:
    """Execute a music-import task through its five sequential stages.

    Stages run in order: scan -> preprocess -> match -> dedupe -> organize.
    Every state change is persisted through ``task_store`` and pushed to
    listeners through ``task_stream``; human-readable progress is also recorded
    via ``_append_log`` (persisted, then broadcast as ``log.appended``).
    """

    def __init__(
        self,
        task_store: TaskStore,
        scanner: Scanner,
        preprocessor: Preprocessor,
        task_stream: TaskStreamManager,
        matcher: Matcher | None = None,
        dedupe_runner: DedupeRunner | None = None,
        organize_runner: OrganizeRunner | None = None
    ):
        """Store collaborators.

        ``matcher``, ``dedupe_runner`` and ``organize_runner`` are optional;
        when omitted (or falsy) a default instance is built from the required
        collaborators.
        """
        self.task_store = task_store
        self.scanner = scanner
        self.preprocessor = preprocessor
        self.task_stream = task_stream
        self.matcher = matcher or Matcher()
        self.dedupe_runner = dedupe_runner or DedupeRunner(task_store, preprocessor, task_stream)
        self.organize_runner = organize_runner or OrganizeRunner(task_store, task_stream)

    def start_task(self, task_id: str, config_snapshot: dict):
        """Run all five stages for ``task_id`` and persist the final task status.

        ``config_snapshot`` is normalized with ``merge_config``; its ``input``
        (and, for quarantine, ``trash``) entries are read. Exceptions escaping
        any stage are not re-raised: the task is marked ``TASK_STATUS_FAILED``
        at the stage that was running and the error is broadcast as
        ``task.failed`` and logged.
        """
        normalized_config = merge_config(config_snapshot)
        current_stats = create_empty_task_stats()
        # Updated just before each stage so the failure handler can attribute errors.
        failure_stage = 'scan'
        running_stage_states = create_pending_stage_states()
        running_stage_states['scan'] = STAGE_STATUS_RUNNING
        self.task_store.update_task(
            task_id,
            status=TASK_STATUS_RUNNING,
            current_stage='scan',
            stage_states=running_stage_states,
            stats=current_stats
        )
        self.task_stream.broadcast_event(
            task_id,
            'task.started',
            'system',
            {'status': TASK_STATUS_RUNNING, 'current_stage': 'scan'}
        )
        self._append_log(
            task_id,
            stage='system',
            level='info',
            event_type='task.started',
            message='任务已启动',
            payload={'current_stage': 'scan'}
        )
        self.task_stream.broadcast_event(
            task_id,
            'stage.started',
            'scan',
            {'stage': 'scan'}
        )
        self._append_log(
            task_id,
            stage='scan',
            level='info',
            event_type='stage.started',
            message=f'开始扫描目录: {normalized_config["input"]}',
            payload={'path': normalized_config['input']}
        )
        try:
            # Quarantine still-open exception items known to the store before scanning.
            self._quarantine_exception_items(task_id, normalized_config, scope='history')
            scan_stats = self.scanner.scan(
                normalized_config['input'],
                on_item=lambda item: self._handle_scan_item(task_id, item),
                on_progress=lambda next_stats: self._handle_scan_progress(
                    task_id,
                    next_stats,
                    current_stats
                ),
                on_log=lambda level, message, payload: self._handle_scanner_log(
                    task_id,
                    level,
                    message,
                    payload
                )
            )
            current_stats['scan'] = scan_stats.copy()
            self._complete_stage(
                task_id,
                'scan',
                create_scan_completed_stage_states(),
                current_stats,
                '扫描阶段完成'
            )

            failure_stage = 'preprocess'
            self._run_preprocess_stage(task_id, current_stats)
            self._complete_stage(
                task_id,
                'preprocess',
                create_preprocess_completed_stage_states(),
                current_stats,
                '音频预处理阶段完成'
            )

            failure_stage = 'match'
            self._run_match_stage(task_id, current_stats, normalized_config)
            self._complete_stage(
                task_id,
                'match',
                create_match_completed_stage_states(),
                current_stats,
                '音乐匹配阶段完成'
            )

            failure_stage = 'dedupe'
            self._run_dedupe_stage(task_id, current_stats, normalized_config)
            self._complete_stage(
                task_id,
                'dedupe',
                create_dedupe_completed_stage_states(),
                current_stats,
                '重复检测阶段完成'
            )

            failure_stage = 'organize'
            self._run_organize_stage(task_id, current_stats, normalized_config)
            # Quarantine this task's own exception items once every stage has run.
            self._quarantine_exception_items(task_id, normalized_config, scope='current')
            self._complete_stage(
                task_id,
                'organize',
                create_organize_completed_stage_states(),
                current_stats,
                '整理入库阶段完成'
            )

            completed_at = current_timestamp()
            completed_stage_states = create_task_completed_stage_states()
            self.task_store.update_task(
                task_id,
                status=TASK_STATUS_COMPLETED,
                current_stage='complete',
                stage_states=completed_stage_states,
                stats=current_stats,
                completed_at=completed_at
            )
            self.task_stream.broadcast_event(
                task_id,
                'task.completed',
                'complete',
                {'status': TASK_STATUS_COMPLETED, 'stats': current_stats}
            )
            self._append_log(
                task_id,
                stage='complete',
                level='success',
                event_type='task.completed',
                message='任务已完成,五个阶段均已执行结束',
                payload={'stats': current_stats}
            )
        except Exception as error:
            self._fail_task(task_id, failure_stage, current_stats, error)

    def _complete_stage(
        self,
        task_id: str,
        stage: str,
        stage_states: dict,
        current_stats: dict,
        message: str
    ):
        """Persist a finished stage, broadcast ``stage.completed`` and log success.

        The task stays ``TASK_STATUS_RUNNING``; ``stage_states`` is the
        completed-state snapshot produced by the matching task_constants factory.
        """
        self.task_store.update_task(
            task_id,
            status=TASK_STATUS_RUNNING,
            current_stage=stage,
            stage_states=stage_states,
            stats=current_stats
        )
        self.task_stream.broadcast_event(
            task_id,
            'stage.completed',
            stage,
            {'stats': current_stats}
        )
        self._append_log(
            task_id,
            stage=stage,
            level='success',
            event_type='stage.completed',
            message=message,
            payload={'stats': current_stats}
        )

    def _fail_task(
        self,
        task_id: str,
        failure_stage: str,
        current_stats: dict,
        error: Exception
    ):
        """Persist, broadcast and log a task failure attributed to ``failure_stage``.

        Any stage name not in the table below is reported as an ``organize``
        failure.
        """
        failed_state_factories = {
            'scan': create_scan_failed_stage_states,
            'preprocess': create_preprocess_failed_stage_states,
            'match': create_match_failed_stage_states,
            'dedupe': create_dedupe_failed_stage_states,
            'organize': create_organize_failed_stage_states,
        }
        failed_stage = failure_stage if failure_stage in failed_state_factories else 'organize'
        failed_stage_states = failed_state_factories[failed_stage]()
        completed_at = current_timestamp()
        self.task_store.update_task(
            task_id,
            status=TASK_STATUS_FAILED,
            current_stage=failed_stage,
            stage_states=failed_stage_states,
            stats=current_stats,
            error_message=str(error),
            completed_at=completed_at
        )
        self.task_stream.broadcast_event(
            task_id,
            'task.failed',
            failed_stage,
            {'status': TASK_STATUS_FAILED, 'error_message': str(error), 'stats': current_stats}
        )
        self._append_log(
            task_id,
            stage=failed_stage,
            level='error',
            event_type='task.failed',
            message=f'任务失败: {error}',
            payload={'error_message': str(error), 'stats': current_stats}
        )

    def _start_stage(
        self,
        task_id: str,
        stage: str,
        stage_states: dict,
        current_stats: dict,
        message: str,
        input_items: int
    ):
        """Mark ``stage`` as running, broadcast ``stage.started`` and log its input size."""
        self.task_store.update_task(
            task_id,
            status=TASK_STATUS_RUNNING,
            current_stage=stage,
            stage_states=stage_states,
            stats=current_stats
        )
        self.task_stream.broadcast_event(
            task_id,
            'stage.started',
            stage,
            {'stage': stage}
        )
        self._append_log(
            task_id,
            stage=stage,
            level='info',
            event_type='stage.started',
            message=message,
            payload={'input_items': input_items}
        )

    def _run_preprocess_stage(self, task_id: str, current_stats: dict):
        """Preprocess the task's candidate items one at a time.

        The candidate list is re-queried after every item, presumably so that
        child items inserted by CUE splitting (``preprocess_status='pending'``)
        are picked up too. Per-item failures are recorded on the item and
        counted in ``failed_items``; they do not abort the stage. Progress is
        persisted every ``PREPROCESS_PROGRESS_BATCH_SIZE`` items or
        ``PREPROCESS_PROGRESS_INTERVAL_SECONDS``, and once more at the end.
        """
        preprocess_stats = current_stats['preprocess'].copy()
        initial_candidates = self.task_store.list_preprocess_candidate_items(task_id)
        preprocess_stats['input_items'] = len(initial_candidates)
        current_stats['preprocess'] = preprocess_stats.copy()
        self._start_stage(
            task_id,
            'preprocess',
            create_preprocess_running_stage_states(),
            current_stats,
            '开始执行音频预处理阶段',
            preprocess_stats['input_items']
        )
        if not initial_candidates:
            self._persist_preprocess_progress(task_id, current_stats, preprocess_stats)
            return
        # Dependency check and workspace creation only happen when there is work to do.
        dependencies = self.preprocessor.check_dependencies()
        workspace = self.preprocessor.create_workspace(task_id)
        workspace_paths = {name: str(path) for name, path in workspace.items()}
        self.task_stream.broadcast_event(
            task_id,
            'preprocess.dependencies_checked',
            'preprocess',
            {
                'dependencies': dependencies,
                'workspace': workspace_paths
            }
        )
        self._append_log(
            task_id,
            stage='preprocess',
            level='info',
            event_type='preprocess.dependencies_checked',
            message='预处理依赖检查通过',
            payload={
                'dependencies': dependencies,
                'workspace': workspace_paths
            }
        )
        processed_count = 0
        last_progress_at = time.monotonic()
        while True:
            candidates = self.task_store.list_preprocess_candidate_items(task_id)
            if not candidates:
                break
            item = self.task_store.update_task_item(
                candidates[0]['id'],
                preprocess_status='running',
                preprocess_reason=None,
                preprocess_message=None
            )
            try:
                self._process_preprocess_item(task_id, item, preprocess_stats)
            except PreprocessItemError as error:
                self._record_preprocess_failure(
                    task_id,
                    item,
                    preprocess_stats,
                    reason=error.reason,
                    message=error.message,
                    log_prefix='预处理失败'
                )
            except Exception as error:
                self._record_preprocess_failure(
                    task_id,
                    item,
                    preprocess_stats,
                    reason='unexpected_error',
                    message=str(error),
                    log_prefix='预处理异常'
                )
            processed_count += 1
            now = time.monotonic()
            # Throttle progress writes: every N items or every T seconds.
            if (
                processed_count % PREPROCESS_PROGRESS_BATCH_SIZE == 0
                or now - last_progress_at >= PREPROCESS_PROGRESS_INTERVAL_SECONDS
            ):
                self._persist_preprocess_progress(task_id, current_stats, preprocess_stats)
                last_progress_at = now
        self._persist_preprocess_progress(task_id, current_stats, preprocess_stats)

    def _record_preprocess_failure(
        self,
        task_id: str,
        item: dict,
        preprocess_stats: dict[str, int],
        *,
        reason: str,
        message: str,
        log_prefix: str
    ):
        """Count a failed preprocess item, persist the failure and report it.

        The log line is ``f'{log_prefix}: {relative_path}'``.
        """
        preprocess_stats['failed_items'] += 1
        failed_item = self.task_store.update_task_item(
            item['id'],
            preprocess_status='failed',
            preprocess_reason=reason,
            preprocess_message=message
        )
        self.task_stream.broadcast_event(
            task_id,
            'preprocess.item_failed',
            'preprocess',
            {'item': failed_item}
        )
        self._append_log(
            task_id,
            stage='preprocess',
            level='error',
            event_type='preprocess.item_failed',
            message=f'{log_prefix}: {item["relative_path"]}',
            payload={'item': failed_item}
        )

    def _run_match_stage(self, task_id: str, current_stats: dict, config_snapshot: dict):
        """Match each candidate item against metadata providers.

        Items are grouped by ``_album_group_key`` using the candidate list taken
        at stage start; each item is passed to the matcher together with its
        group. ``MatchProviderError`` and any other exception mark only that
        item as failed. Progress is throttled like the preprocess stage.
        """
        match_stats = current_stats['match'].copy()
        initial_candidates = self.task_store.list_match_candidate_items(task_id)
        album_groups = _build_album_groups(initial_candidates)
        match_stats['input_items'] = len(initial_candidates)
        current_stats['match'] = match_stats.copy()
        self._start_stage(
            task_id,
            'match',
            create_match_running_stage_states(),
            current_stats,
            '开始执行音乐匹配阶段',
            match_stats['input_items']
        )
        if not initial_candidates:
            self._persist_match_progress(task_id, current_stats, match_stats)
            return
        processed_count = 0
        last_progress_at = time.monotonic()
        while True:
            candidates = self.task_store.list_match_candidate_items(task_id)
            if not candidates:
                break
            # Reset every match field so stale results never survive a retry.
            item = self.task_store.update_task_item(
                candidates[0]['id'],
                match_status='running',
                match_reason=None,
                match_message=None,
                match_source=None,
                match_confidence=None,
                match_is_authoritative=0,
                matched_metadata_json=None,
                match_candidates_json=None,
                match_enrichment_json=None
            )
            item_group = album_groups.get(_album_group_key(item), [item])
            self.task_stream.broadcast_event(
                task_id,
                'match.lookup_started',
                'match',
                {'item': item}
            )
            self._append_log(
                task_id,
                stage='match',
                level='info',
                event_type='match.lookup_started',
                message=f'开始匹配: {item["relative_path"]}',
                payload={'item': item}
            )
            try:
                match_result = self.matcher.match_item(item, item_group, config_snapshot)
                item = self.task_store.update_task_item(
                    item['id'],
                    match_status=match_result['status'],
                    match_reason=match_result['reason'],
                    match_message=match_result['message'],
                    match_source=match_result['source'],
                    match_confidence=match_result['confidence'],
                    match_is_authoritative=1 if match_result['is_authoritative'] else 0,
                    matched_metadata_json=match_result['matched_metadata_json'],
                    match_candidates_json=match_result['match_candidates_json'],
                    match_enrichment_json=match_result['match_enrichment_json']
                )
                self._handle_provider_warnings(
                    task_id,
                    item,
                    match_result.get('provider_warnings') or [],
                    match_stats
                )
                self.task_stream.broadcast_event(
                    task_id,
                    'match.candidates_found',
                    'match',
                    {
                        'item': item,
                        'candidates': match_result['match_candidates_json']
                    }
                )
                self._append_log(
                    task_id,
                    stage='match',
                    level='info',
                    event_type='match.candidates_found',
                    message=(
                        f'匹配候选已生成: {item["relative_path"]} '
                        f'({len(match_result["match_candidates_json"])} 个)'
                    ),
                    payload={
                        'item': item,
                        'candidates': match_result['match_candidates_json']
                    }
                )
                self._handle_match_result(task_id, item, match_result, match_stats)
            except MatchProviderError as error:
                self._record_match_failure(
                    task_id,
                    item,
                    match_stats,
                    reason='provider_error',
                    message=str(error),
                    source=getattr(error, 'provider', None),
                    log_prefix='匹配失败'
                )
            except Exception as error:
                self._record_match_failure(
                    task_id,
                    item,
                    match_stats,
                    reason='unexpected_error',
                    message=str(error),
                    source=None,
                    log_prefix='匹配异常'
                )
            processed_count += 1
            now = time.monotonic()
            if (
                processed_count % MATCH_PROGRESS_BATCH_SIZE == 0
                or now - last_progress_at >= MATCH_PROGRESS_INTERVAL_SECONDS
            ):
                self._persist_match_progress(task_id, current_stats, match_stats)
                last_progress_at = now
        self._persist_match_progress(task_id, current_stats, match_stats)

    def _record_match_failure(
        self,
        task_id: str,
        item: dict,
        match_stats: dict[str, int],
        *,
        reason: str,
        message: str,
        source: str | None,
        log_prefix: str
    ):
        """Count a failed match item, clear its match fields and report it.

        The log line is ``f'{log_prefix}: {relative_path}'``.
        """
        match_stats['failed_items'] += 1
        failed_item = self.task_store.update_task_item(
            item['id'],
            match_status='failed',
            match_reason=reason,
            match_message=message,
            match_source=source,
            match_confidence=None,
            match_is_authoritative=0,
            matched_metadata_json=None,
            match_candidates_json=None,
            match_enrichment_json=None
        )
        self.task_stream.broadcast_event(
            task_id,
            'match.item_failed',
            'match',
            {'item': failed_item}
        )
        self._append_log(
            task_id,
            stage='match',
            level='error',
            event_type='match.item_failed',
            message=f'{log_prefix}: {item["relative_path"]}',
            payload={'item': failed_item}
        )

    def _run_dedupe_stage(self, task_id: str, current_stats: dict, config_snapshot: dict):
        """Record the dedupe input size, announce the stage, then delegate to ``dedupe_runner``."""
        dedupe_stats = current_stats['dedupe'].copy()
        initial_candidates = self.task_store.list_dedupe_candidate_items(task_id)
        dedupe_stats['input_items'] = len(initial_candidates)
        current_stats['dedupe'] = dedupe_stats.copy()
        self._start_stage(
            task_id,
            'dedupe',
            create_dedupe_running_stage_states(),
            current_stats,
            '开始执行重复检测阶段',
            dedupe_stats['input_items']
        )
        self.dedupe_runner.run(task_id, current_stats, config_snapshot)

    def _run_organize_stage(self, task_id: str, current_stats: dict, config_snapshot: dict):
        """Record the organize input size, announce the stage, then delegate to ``organize_runner``."""
        organize_stats = current_stats['organize'].copy()
        initial_candidates = self.task_store.list_organize_candidate_items(task_id)
        organize_stats['input_items'] = len(initial_candidates)
        current_stats['organize'] = organize_stats.copy()
        self._start_stage(
            task_id,
            'organize',
            create_organize_running_stage_states(),
            current_stats,
            '开始执行整理入库阶段',
            organize_stats['input_items']
        )
        self.organize_runner.run(task_id, current_stats, config_snapshot)

    def _quarantine_exception_items(self, task_id: str, config_snapshot: dict, *, scope: str):
        """Move files of unresolved exception items out of the input directory into trash.

        ``scope='current'`` inspects every item of ``task_id``; any other value
        inspects ``list_exception_source_items('open')`` (presumably across
        tasks). Outcomes are broadcast/logged under ``task_id``. ``OSError``
        raised while moving files is logged rather than propagated.
        """
        input_root = Path(config_snapshot['input']).expanduser().resolve(strict=False)
        trash_root = Path(config_snapshot['trash']).expanduser().resolve(strict=False)
        if scope == 'current':
            source_items = self.task_store.list_all_task_items(task_id)
        else:
            source_items = self.task_store.list_exception_source_items('open')
        for item in source_items:
            exception_type = _exception_type_for_item(item)
            if exception_type is None:
                continue
            # NOTE(review): an item whose exception_resolution_status is None is
            # skipped here, which makes the ``or 'open'`` fallback below
            # unreachable. Confirm the store initialises this column for items of
            # the current task; otherwise scope='current' quarantines nothing.
            if item.get('exception_resolution_status') not in {'open', 'planned'}:
                continue
            if item.get('trash_file_path'):
                self._log_quarantine_skip(
                    task_id,
                    item,
                    exception_type,
                    'already_trashed',
                    '异常文件已存在回收站路径,跳过隔离'
                )
                continue
            try:
                result = self._quarantine_exception_item(
                    item,
                    exception_type,
                    input_root,
                    trash_root
                )
            except OSError as error:
                self._append_log(
                    task_id,
                    stage='system',
                    level='error',
                    event_type='exception.quarantine_skipped',
                    message=f'异常文件隔离失败: {item["relative_path"]}',
                    payload={
                        'item_id': item['id'],
                        'source_task_id': item['task_id'],
                        'exception_type': exception_type,
                        'reason': 'move_failed',
                        'error_message': str(error)
                    }
                )
                continue
            if result is None:
                self._log_quarantine_skip(
                    task_id,
                    item,
                    exception_type,
                    'no_input_file',
                    '异常项没有残留在输入目录的可隔离文件'
                )
                continue
            updated_item = self.task_store.update_task_item(
                item['id'],
                is_active=0,
                current_file_path=result['current_file_path'],
                trash_file_path=result['trash_file_path'],
                exception_resolution_status=item.get('exception_resolution_status') or 'open'
            )
            self.task_stream.broadcast_event(
                task_id,
                'exception.item_quarantined',
                'system',
                {'item': updated_item, 'quarantine': result}
            )
            self._append_log(
                task_id,
                stage='system',
                level='warning',
                event_type='exception.item_quarantined',
                message=f'已隔离异常文件: {item["relative_path"]}',
                payload={
                    'item_id': item['id'],
                    'source_task_id': item['task_id'],
                    'exception_type': exception_type,
                    **result
                }
            )
            if result.get('errors'):
                self._append_log(
                    task_id,
                    stage='system',
                    level='error',
                    event_type='exception.quarantine_skipped',
                    message=f'部分异常文件隔离失败: {item["relative_path"]}',
                    payload={
                        'item_id': item['id'],
                        'source_task_id': item['task_id'],
                        'exception_type': exception_type,
                        'reason': 'partial_move_failed',
                        'errors': result['errors']
                    }
                )

    def _quarantine_exception_item(
        self,
        item: dict,
        exception_type: str,
        input_root: Path,
        trash_root: Path
    ) -> dict | None:
        """Move one item's files into ``trash_root/<exception_type>/<task_id>/``.

        The current file is always moved; the original file is moved only
        when it lies under ``input_root``. Nothing happens (``None``) unless
        at least one of the two existing files lies under ``input_root``.

        Returns:
            ``None`` when nothing was moved and no move error occurred;
            otherwise a dict with ``current_file_path``, ``trash_file_path``,
            ``moved_paths`` (role -> destination) and ``errors``.

        Raises:
            OSError: when every attempted move failed, or when creating the
                destination directory fails.
        """
        current_path = _existing_file_path(item.get('current_file_path'))
        original_path = _existing_file_path(item.get('original_path'))
        current_in_input = _is_relative_to(current_path, input_root) if current_path else False
        original_in_input = _is_relative_to(original_path, input_root) if original_path else False
        if not current_in_input and not original_in_input:
            return None
        destination_dir = trash_root / exception_type / item['task_id']
        moved_paths: dict[str, str] = {}
        errors: list[dict[str, str]] = []
        # Prevents moving the same file twice when current and original coincide.
        seen_sources: set[Path] = set()
        for role, source_path in (('current', current_path), ('original', original_path)):
            if source_path is None:
                continue
            resolved_source = source_path.resolve(strict=False)
            if resolved_source in seen_sources:
                continue
            if role == 'current' or _is_relative_to(source_path, input_root):
                destination = _build_unique_destination(
                    destination_dir,
                    _build_prefixed_name(item['id'], source_path.name)
                )
                destination.parent.mkdir(parents=True, exist_ok=True)
                try:
                    shutil.move(str(source_path), str(destination))
                    moved_paths[role] = str(destination.resolve(strict=False))
                except OSError as error:
                    errors.append(
                        {
                            'role': role,
                            'source_path': str(source_path),
                            'destination_path': str(destination),
                            'error_message': str(error)
                        }
                    )
                    continue
            seen_sources.add(resolved_source)
        current_file_path = moved_paths.get('current')
        if current_file_path is None and current_path is not None and current_path.exists():
            # The current file stayed in place (outside input or move failed): keep pointing at it.
            current_file_path = str(current_path.resolve(strict=False))
        current_file_path = current_file_path or moved_paths.get('original') or item.get('current_file_path')
        trash_file_path = moved_paths.get('current') or moved_paths.get('original')
        if trash_file_path is None:
            if errors:
                raise OSError(f'failed to move exception files: {errors}')
            return None
        return {
            'current_file_path': current_file_path,
            'trash_file_path': trash_file_path,
            'moved_paths': moved_paths,
            'errors': errors
        }

    def _log_quarantine_skip(
        self,
        task_id: str,
        item: dict,
        exception_type: str,
        reason: str,
        message: str
    ):
        """Log (info level) that an exception item was not quarantined and why."""
        self._append_log(
            task_id,
            stage='system',
            level='info',
            event_type='exception.quarantine_skipped',
            message=message,
            payload={
                'item_id': item['id'],
                'source_task_id': item['task_id'],
                'exception_type': exception_type,
                'reason': reason
            }
        )

    def _handle_provider_warnings(
        self,
        task_id: str,
        item: dict,
        provider_warnings: list[dict],
        match_stats: dict[str, int]
    ):
        """Count provider warnings and emit one ``match.provider_skipped`` event/log per warning."""
        if not provider_warnings:
            return
        match_stats['provider_warnings'] += len(provider_warnings)
        for provider_warning in provider_warnings:
            provider_name = provider_warning.get('provider') or 'unknown'
            warning_message = provider_warning.get('message') or f'{provider_name} 请求失败'
            payload = {
                'item': item,
                'provider_warning': provider_warning
            }
            self.task_stream.broadcast_event(
                task_id,
                'match.provider_skipped',
                'match',
                payload
            )
            self._append_log(
                task_id,
                stage='match',
                level='warning',
                event_type='match.provider_skipped',
                message=f'已跳过 {provider_name}: {warning_message}',
                payload=payload
            )

    def _handle_match_result(
        self,
        task_id: str,
        item: dict,
        match_result: dict,
        match_stats: dict[str, int]
    ):
        """Update match counters for ``match_result['status']`` and report the outcome.

        ``matched`` / ``matched_fallback`` / ``low_score`` have dedicated
        counters; any other status is counted as ``not_found``.
        """
        if match_result['status'] == 'matched':
            match_stats['matched_authoritative'] += 1
            event_type = 'match.item_matched'
            level = 'success'
            message = (
                f'权威匹配成功: {item["relative_path"]} '
                f'({match_result["source"]}, {match_result["confidence"]:.1f})'
            )
        elif match_result['status'] == 'matched_fallback':
            match_stats['matched_fallback'] += 1
            event_type = 'match.item_matched'
            level = 'warning'
            message = (
                f'Fallback 匹配成功: {item["relative_path"]} '
                f'({match_result["source"]}, {match_result["confidence"]:.1f})'
            )
        elif match_result['status'] == 'low_score':
            match_stats['low_score'] += 1
            event_type = 'match.item_low_score'
            level = 'warning'
            message = f'候选分数不足: {item["relative_path"]}'
        else:
            match_stats['not_found'] += 1
            event_type = 'match.item_not_found'
            level = 'warning'
            message = f'未找到匹配候选: {item["relative_path"]}'
        self.task_stream.broadcast_event(
            task_id,
            event_type,
            'match',
            {'item': item}
        )
        self._append_log(
            task_id,
            stage='match',
            level=level,
            event_type=event_type,
            message=message,
            payload={'item': item}
        )

    def _process_preprocess_item(
        self,
        task_id: str,
        item: dict,
        preprocess_stats: dict[str, int]
    ):
        """Preprocess one audio item and persist the result on the item.

        If ``find_matching_cue`` finds a CUE sheet, the item is split instead
        (``_split_cue_item``). Otherwise the file is converted to FLAC when its
        extension is in ``FORCED_FLAC_EXTENSIONS``, probed, and tags, embedded
        cover and acoustic fingerprint are collected. Tag/cover read errors and
        ``PreprocessItemError`` from fingerprinting downgrade the item to
        ``warning`` status; any other exception propagates to the caller.
        """
        current_file_path = item['current_file_path'] or item['original_path']
        item_paths = build_preprocess_paths(task_id, item['id'])
        if cue_path := self.preprocessor.find_matching_cue(current_file_path):
            self._split_cue_item(
                task_id,
                item,
                cue_path,
                current_file_path,
                item_paths,
                preprocess_stats
            )
            return
        artifacts = dict(item.get('preprocess_artifacts_json') or {})
        original_tags = dict(item.get('original_tags_json') or {})
        local_cover = item.get('local_cover')
        converted_path = current_file_path
        if Path(current_file_path).suffix.lower() in FORCED_FLAC_EXTENSIONS:
            converted_path = self.preprocessor.convert_to_flac(
                current_file_path,
                item_paths['converted']
            )
            artifacts['converted_path'] = converted_path
            preprocess_stats['converted_items'] += 1
            converted_file = Path(converted_path)
            converted_item = self.task_store.update_task_item(
                item['id'],
                current_file_path=converted_path,
                filename=converted_file.name,
                extension=converted_file.suffix.lower(),
                preprocess_artifacts_json=artifacts
            )
            self.task_stream.broadcast_event(
                task_id,
                'preprocess.item_converted',
                'preprocess',
                {'item': converted_item}
            )
            self._append_log(
                task_id,
                stage='preprocess',
                level='info',
                event_type='preprocess.item_converted',
                message=f'已转码为 FLAC: {item["relative_path"]}',
                payload={'item': converted_item}
            )
        audio_props = self.preprocessor.probe_audio(converted_path)
        warnings: list[str] = []
        warning_reasons: list[str] = []
        try:
            extracted_tags = self.preprocessor.read_tags(converted_path)
            original_tags = merge_tag_snapshots(extracted_tags, original_tags)
        except Exception as error:
            warning_reasons.append('metadata_failed')
            warnings.append(f'读取元数据失败: {error}')
        if not local_cover:
            try:
                embedded_cover = self.preprocessor.extract_embedded_cover(
                    converted_path,
                    item_paths['cover']
                )
                if embedded_cover:
                    artifacts['embedded_cover'] = embedded_cover
                    local_cover = embedded_cover
            except Exception as error:
                warning_reasons.append('metadata_failed')
                warnings.append(f'提取内嵌封面失败: {error}')
        fingerprint = None
        fingerprint_duration = None
        try:
            fingerprint_payload = self.preprocessor.calculate_fingerprint(converted_path)
            fingerprint = fingerprint_payload['fingerprint']
            fingerprint_duration = fingerprint_payload['duration_seconds']
            preprocess_stats['fingerprints_ok'] += 1
        except PreprocessItemError as error:
            warning_reasons.append(error.reason)
            warnings.append(error.message)
            preprocess_stats['fingerprints_failed'] += 1
        preprocess_stats['metadata_snapshots'] += 1
        preprocess_stats['output_items'] += 1
        update_fields = {
            'current_file_path': converted_path,
            'local_cover': local_cover,
            'audio_props_json': audio_props,
            'original_tags_json': original_tags,
            'preprocess_artifacts_json': artifacts or None,
            'acoustic_fingerprint': fingerprint,
            'fingerprint_duration_seconds': fingerprint_duration
        }
        if warnings:
            preprocess_stats['warning_items'] += 1
            update_fields.update(
                preprocess_status='warning',
                preprocess_reason=','.join(_unique_strings(warning_reasons)),
                # Separate individual warnings so the stored message stays readable.
                preprocess_message='; '.join(_unique_strings(warnings))
            )
            event_type = 'preprocess.item_warning'
            level = 'warning'
            message = f'预处理完成但存在警告: {item["relative_path"]}'
        else:
            update_fields.update(
                preprocess_status='completed',
                preprocess_reason=None,
                preprocess_message=None
            )
            event_type = 'preprocess.item_completed'
            level = 'success'
            message = f'预处理完成: {item["relative_path"]}'
        final_item = self.task_store.update_task_item(item['id'], **update_fields)
        self.task_stream.broadcast_event(
            task_id,
            event_type,
            'preprocess',
            {'item': final_item}
        )
        self._append_log(
            task_id,
            stage='preprocess',
            level=level,
            event_type=event_type,
            message=message,
            payload={'item': final_item}
        )

    def _split_cue_item(
        self,
        task_id: str,
        item: dict,
        cue_path: Path,
        current_file_path: str,
        item_paths: dict[str, Path],
        preprocess_stats: dict[str, int]
    ):
        """Split a CUE-backed audio file into per-track child items.

        Each output track becomes a new task item (``preprocess_status='pending'``,
        ``parent_item_id`` set) carrying CUE tags merged over the parent's tags.
        The parent is deactivated with status ``replaced_by_split``.
        """
        total_duration_seconds = None
        try:
            total_duration_seconds = self.preprocessor.probe_audio(current_file_path).get(
                'duration_seconds'
            )
        except PreprocessItemError:
            # Duration is optional for splitting; continue without it.
            total_duration_seconds = None
        cue_sheet = self.preprocessor.parse_cue(cue_path)
        split_outputs = self.preprocessor.split_cue_tracks(
            current_file_path,
            cue_sheet,
            item_paths['split'],
            total_duration_seconds
        )
        # NOTE(review): if split_cue_tracks can return no outputs, the parent is
        # still deactivated below with zero children and the audio drops out of
        # the pipeline — confirm split_cue_tracks raises in that case.
        child_items: list[dict] = []
        for split_output in split_outputs:
            file_size, modified_at = _read_file_state(split_output['path'])
            cue_tags = {
                'title': split_output['title'],
                'artist': split_output['artist'],
                'album': split_output['album'],
                'album_artist': split_output['album_artist'],
                'track_number': str(split_output['track_number'])
            }
            child_items.append(
                self.task_store.insert_task_item(
                    task_id,
                    original_path=item['original_path'],
                    relative_path=build_split_child_relative_path(
                        item['relative_path'],
                        split_output['filename']
                    ),
                    filename=split_output['filename'],
                    extension=Path(split_output['filename']).suffix.lower(),
                    size_bytes=file_size,
                    modified_at=modified_at,
                    local_cover=item['local_cover'],
                    local_lyric=item['local_lyric'],
                    scan_status='queued',
                    scan_reason=None,
                    scan_message=None,
                    parent_item_id=item['id'],
                    current_file_path=split_output['path'],
                    preprocess_status='pending',
                    original_tags_json=merge_tag_snapshots(
                        cue_tags,
                        item.get('original_tags_json') or {}
                    ),
                    preprocess_artifacts_json={'cue_path': str(cue_path)}
                )
            )
        parent_artifacts = dict(item.get('preprocess_artifacts_json') or {})
        parent_artifacts.update(
            {
                'cue_path': str(cue_path),
                'split_outputs': [child['current_file_path'] for child in child_items]
            }
        )
        parent_item = self.task_store.update_task_item(
            item['id'],
            is_active=0,
            preprocess_status='replaced_by_split',
            preprocess_reason='cue_split',
            preprocess_message=f'已按 CUE 切分为 {len(child_items)} 个子轨道',
            preprocess_artifacts_json=parent_artifacts
        )
        preprocess_stats['split_parents'] += 1
        preprocess_stats['generated_children'] += len(child_items)
        self.task_stream.broadcast_event(
            task_id,
            'preprocess.item_split',
            'preprocess',
            {'items': [parent_item, *child_items]}
        )
        self._append_log(
            task_id,
            stage='preprocess',
            level='info',
            event_type='preprocess.item_split',
            message=f'已根据 CUE 切轨: {item["relative_path"]}',
            payload={
                'item': parent_item,
                'children': child_items
            }
        )

    def _handle_scan_item(self, task_id: str, item: ScanItem):
        """Persist a scanned file and report whether it was queued, skipped or rejected.

        Queued files start with ``preprocess_status='pending'``; all others are
        stored as ``'skipped'``.
        """
        persisted_item = self.task_store.insert_task_item(
            task_id,
            **item.to_dict(),
            preprocess_status='pending' if item.scan_status == 'queued' else 'skipped'
        )
        if item.scan_status == 'queued':
            message = f'文件已加入扫描结果: {item.relative_path}'
            event_type = 'scan.file_queued'
            level = 'info'
        elif item.scan_status == 'skipped_locked':
            message = f'跳过最近仍在写入的文件: {item.relative_path}'
            event_type = 'scan.file_skipped'
            level = 'warning'
        else:
            message = f'文件无法处理: {item.relative_path}'
            event_type = 'scan.file_skipped'
            level = 'error'
        self.task_stream.broadcast_event(
            task_id,
            event_type,
            'scan',
            {'item': persisted_item}
        )
        self._append_log(
            task_id,
            stage='scan',
            level=level,
            event_type=event_type,
            message=message,
            payload={'item': persisted_item}
        )

    def _handle_scan_progress(
        self,
        task_id: str,
        scan_stats: dict[str, int],
        current_stats: dict
    ):
        """Store the scanner's latest counters in ``current_stats`` and publish them."""
        current_stats['scan'] = scan_stats.copy()
        self.task_store.update_task(
            task_id,
            status=TASK_STATUS_RUNNING,
            current_stage='scan',
            stats=current_stats
        )
        self.task_stream.broadcast_event(
            task_id,
            'scan.progress',
            'scan',
            {'stats': current_stats}
        )

    def _persist_stage_progress(
        self,
        task_id: str,
        stage: str,
        current_stats: dict,
        stage_stats: dict[str, int]
    ):
        """Copy ``stage_stats`` into ``current_stats[stage]``, persist, and broadcast ``<stage>.progress``."""
        current_stats[stage] = stage_stats.copy()
        self.task_store.update_task(
            task_id,
            status=TASK_STATUS_RUNNING,
            current_stage=stage,
            stats=current_stats
        )
        self.task_stream.broadcast_event(
            task_id,
            f'{stage}.progress',
            stage,
            {'stats': current_stats}
        )

    def _persist_preprocess_progress(
        self,
        task_id: str,
        current_stats: dict,
        preprocess_stats: dict[str, int]
    ):
        """Persist and broadcast preprocess-stage counters."""
        self._persist_stage_progress(task_id, 'preprocess', current_stats, preprocess_stats)

    def _persist_match_progress(
        self,
        task_id: str,
        current_stats: dict,
        match_stats: dict[str, int]
    ):
        """Persist and broadcast match-stage counters."""
        self._persist_stage_progress(task_id, 'match', current_stats, match_stats)

    def _handle_scanner_log(
        self,
        task_id: str,
        level: str,
        message: str,
        payload: dict | None
    ):
        """Forward a scanner log callback into the task log under the ``scan`` stage."""
        self._append_log(
            task_id,
            stage='scan',
            level=level,
            event_type='log.appended',
            message=message,
            payload=payload
        )

    def _append_log(
        self,
        task_id: str,
        *,
        stage: str,
        level: str,
        event_type: str,
        message: str,
        payload: dict | None = None
    ):
        """Persist a log entry and broadcast the stored record as ``log.appended``."""
        persisted_log = self.task_store.append_log(
            task_id,
            stage,
            level,
            event_type,
            message,
            payload
        )
        self.task_stream.broadcast_event(
            task_id,
            'log.appended',
            stage,
            {'log': persisted_log}
        )
def _read_file_state(file_path: str) -> tuple[int, str]:
    """Return ``(size_in_bytes, modified_at)`` for ``file_path``.

    ``modified_at`` is the file's mtime rendered by ``_format_file_timestamp``.
    """
    stat_result = Path(file_path).stat()
    size_bytes = stat_result.st_size
    modified_at = _format_file_timestamp(stat_result.st_mtime)
    return size_bytes, modified_at
def _format_file_timestamp(value: float) -> str:
return (
datetime.fromtimestamp(value, tz=timezone.utc)
.replace(microsecond=0)
.isoformat()
.replace('+00:00', 'Z')
)
def _unique_strings(values: list[str]) -> list[str]:
return list(dict.fromkeys(values))
def _existing_file_path(value: str | None) -> Path | None:
if not value:
return None
path = Path(value).expanduser()
return path if path.exists() and path.is_file() else None
def _is_relative_to(path: Path | None, root: Path) -> bool:
if path is None:
return False
try:
path.resolve(strict=False).relative_to(root)
return True
except ValueError:
return False
def _exception_type_for_item(item: dict) -> str | None:
if item.get('organize_status') in {'trashed', 'failed'}:
return 'organize_failed'
if item.get('dedupe_status') in {'duplicate_trashed', 'failed'}:
return 'duplicates'
if item.get('match_status') == 'low_score':
return 'low_score'
if item.get('match_status') in {'failed', 'not_found'}:
return 'match_failed'
if (
item.get('preprocess_status') == 'failed'
and item.get('preprocess_reason') == 'convert_failed'
):
return 'convert_failed'
preprocess_reason = item.get('preprocess_reason') or ''
if item.get('preprocess_status') == 'warning' and 'metadata_failed' in preprocess_reason:
return 'missing_tags'
return None
def _album_group_key(item: dict) -> str:
current_file_path = item.get('current_file_path') or item.get('original_path') or ''
if current_file_path:
return str(Path(current_file_path).parent)
relative_path = item.get('relative_path') or ''
return str(Path(relative_path).parent)
def _build_album_groups(items: list[dict]) -> dict[str, list[dict]]:
    """Bucket ``items`` by ``_album_group_key``, preserving input order within each bucket."""
    groups: dict[str, list[dict]] = {}
    for entry in items:
        key = _album_group_key(entry)
        if key not in groups:
            groups[key] = []
        groups[key].append(entry)
    return groups