from pathlib import Path

from backend.app.exception_service import ExceptionService
from backend.app.library_postprocess import (
    _build_prefixed_name,
    _build_quality_breakdown,
    _build_unique_destination,
    _serialize_compared_candidate,
)
from backend.app.metadata_normalization import MetadataNormalizationService, can_ingest_metadata
from backend.app.task_constants import current_timestamp


class RepairExecutionError(Exception):
    """Failure raised while executing a repair action.

    Carries a short ``reason`` code next to the human-readable ``message``
    so callers can branch on the code instead of parsing the text.
    """

    def __init__(self, reason: str, message: str):
        super().__init__(message)
        self.reason = reason
        self.message = message


class RepairExecutionService:
    """Apply a single repair action to the task item behind an exception.

    The service only orchestrates: file moves, matching, preprocessing and
    dedupe decisions are delegated to the collaborators passed to
    ``__init__``; the outcome is written back through ``task_store``.
    """

    def __init__(
        self,
        task_store,
        exception_service: ExceptionService,
        metadata_normalizer: MetadataNormalizationService,
        organize_service,
        match_service,
        preprocess_service,
        dedupe_service,
    ):
        self.task_store = task_store
        self.exception_service = exception_service
        self.metadata_normalizer = metadata_normalizer
        self.organize_service = organize_service
        self.match_service = match_service
        self.preprocess_service = preprocess_service
        self.dedupe_service = dedupe_service

    def apply_action_to_item(
        self, repair_task_id: str, exception_id: int, action: str, params: dict, config_snapshot: dict, stats: dict
    ) -> dict:
        """Execute ``action`` for one exception and record the resolution.

        Args:
            repair_task_id: Id of the repair task driving this execution;
                stored on the item as ``last_repair_task_id``.
            exception_id: Id used to look up both the source item and the
                exception snapshot.
            action: Name of the repair action (one of the branches below).
            params: Action-specific parameters. Keys read here:
                ``metadata_patch``, ``providers``, ``candidate_index``,
                ``target_relative_path``.
            config_snapshot: Configuration mapping; the ``'trash'`` and
                ``'output'`` entries are read by the file-moving actions.
            stats: Mutable counters; ``stats['execute']`` is incremented
                in place (``ignored_items``, ``updated_metadata_items``,
                ``moved_items``).

        Returns:
            The item as returned by the collaborator that handled the
            action (or the untouched source item for ``ignore_exception``).

        Raises:
            RepairExecutionError: The item is missing, metadata is
                incomplete for ``save_and_organize``, or the file to delete
                does not exist.
            ValueError: ``action`` is not a supported action name.
        """
        item = self.task_store.get_exception_source_item(exception_id)
        if item is None:
            raise RepairExecutionError('item_missing', f'异常项不存在: {exception_id}')

        # Captured before any mutation so the resolved snapshot can be
        # derived from the pre-action state.
        before_snapshot = self.exception_service.get_item(exception_id)
        final_item = item
        execution_result = {'action': action, 'status': 'completed'}

        if action == 'ignore_exception':
            stats['execute']['ignored_items'] += 1
            resolution_status = 'ignored'
            workflow_state = 'ignored'
        elif action == 'edit_metadata':
            final_item, changed = self._apply_metadata_patch(item, params.get('metadata_patch') or {})
            if changed:
                stats['execute']['updated_metadata_items'] += 1
            # Editing alone never resolves the exception; it can only make
            # the item ready to ingest.
            resolution_status = 'open'
            workflow_state = self._metadata_workflow_state(final_item, params.get('metadata_patch') or {})
        elif action == 'retry_match':
            final_item = self.match_service.retry_match(
                item, config_snapshot, providers=params.get('providers') or None
            )
            resolution_status = 'open'
            workflow_state = self._metadata_workflow_state(final_item)
        elif action == 'select_match_candidate':
            final_item = self.match_service.select_candidate(item, int(params.get('candidate_index', -1)))
            resolution_status = 'open'
            workflow_state = self._candidate_workflow_state(final_item)
        elif action == 'retry_preprocess':
            final_item = self.preprocess_service.retry_preprocess(item)
            resolution_status = 'resolved' if final_item.get('preprocess_status') != 'failed' else 'open'
            workflow_state = 'ingested' if resolution_status == 'resolved' else 'open'
        elif action == 'move_to_review_trash':
            trashed_path = self.organize_service.move_to_review_trash(
                trash_root=config_snapshot['trash'],
                task_id=repair_task_id,
                item_id=item['id'],
                source_path=item['current_file_path'],
                reason='manual_review',
            )
            final_item = self.task_store.update_task_item(
                item['id'],
                is_active=0,
                current_file_path=trashed_path,
                trash_file_path=trashed_path,
                organize_status='trashed',
                organize_reason='manual_review',
                organize_message='已移入 review trash',
            )
            stats['execute']['moved_items'] += 1
            resolution_status = 'resolved'
            workflow_state = 'ingested'
        elif action == 'keep_existing':
            final_item = self.dedupe_service.keep_existing(
                item, task_id=repair_task_id, trash_root=config_snapshot['trash']
            )
            stats['execute']['moved_items'] += 1
            resolution_status = 'resolved'
            workflow_state = 'ingested'
        elif action == 'replace_existing':
            final_item, execution_result = self.dedupe_service.replace_existing(
                item, task_id=repair_task_id, output_root=config_snapshot['output'], trash_root=config_snapshot['trash']
            )
            stats['execute']['moved_items'] += 1
            resolution_status = 'resolved'
            workflow_state = 'ingested'
        elif action == 'keep_both_with_rename':
            final_item, execution_result = self.dedupe_service.keep_both_with_rename(
                item, output_root=config_snapshot['output']
            )
            stats['execute']['moved_items'] += 1
            resolution_status = 'resolved'
            workflow_state = 'ingested'
        elif action == 'retry_organize':
            final_item, execution_result = self.organize_service.organize_item(
                item, output_root=config_snapshot['output'], override_relative_path=params.get('target_relative_path')
            )
            stats['execute']['moved_items'] += 1
            resolution_status = 'resolved'
            workflow_state = 'ingested'
        elif action == 'save_and_organize':
            patched_item, changed = self._apply_metadata_patch(item, params.get('metadata_patch') or {})
            if changed:
                stats['execute']['updated_metadata_items'] += 1
            # The patch is persisted first; organizing is refused if the
            # normalized metadata still cannot be ingested.
            if not can_ingest_metadata(self.metadata_normalizer.normalize_item(patched_item)):
                raise RepairExecutionError('metadata_incomplete', '加入音乐库前必须补齐 title、artist、album_artist')
            final_item, execution_result = self.organize_service.organize_item(
                patched_item, output_root=config_snapshot['output']
            )
            stats['execute']['moved_items'] += 1
            resolution_status = 'resolved'
            workflow_state = 'ingested'
        elif action == 'delete_file':
            file_path = Path(item['current_file_path'])
            if not file_path.exists():
                raise RepairExecutionError('source_missing', f'源文件不存在: {file_path}')
            try:
                file_path.unlink()
            except FileNotFoundError as err:
                # The file can vanish between the existence check and the
                # unlink; report it the same way as the check above rather
                # than leaking a raw OSError.
                raise RepairExecutionError('source_missing', f'源文件不存在: {file_path}') from err
            final_item = self.task_store.update_task_item(
                item['id'],
                is_active=0,
                organize_status='deleted',
                organize_reason='manual_delete',
                organize_message='文件已被永久删除',
            )
            resolution_status = 'resolved'
            workflow_state = 'deleted'
        else:
            raise ValueError(f'Unsupported action: {action}')

        after_snapshot = self._build_after_snapshot(before_snapshot, final_item, resolution_status)
        self._update_resolution(
            final_item, resolution_status, workflow_state, after_snapshot, execution_result, repair_task_id, params
        )
        return final_item

    def _apply_metadata_patch(self, item: dict, metadata_patch: dict) -> tuple[dict, bool]:
        """Merge a metadata patch into the item, its file tags and the store.

        ``None`` values in the patch are ignored. When nothing remains, the
        item is returned unchanged with ``False``. Otherwise the normalized
        metadata is merged into the item's original tags (writable keys
        only), written to the audio file if it exists on disk, and saved
        through ``task_store``.

        Returns:
            ``(item, changed)`` where ``changed`` tells whether a patch was
            actually applied.
        """
        patch = {key: value for key, value in (metadata_patch or {}).items() if value is not None}
        if not patch:
            return item, False

        normalized_metadata = self.metadata_normalizer.normalize_item(item, patch)

        # Fetch the key set once instead of rebuilding it for every
        # metadata key inside the comprehension filter.
        writable_keys = self._writable_keys()
        merged_tags = dict(item.get('original_tags_json') or {})
        merged_tags.update(
            {key: value for key, value in normalized_metadata.items() if key in writable_keys}
        )

        file_path = Path(item['current_file_path'])
        if file_path.exists():
            self._write_tags(file_path, merged_tags)

        updated_item = self.task_store.update_task_item(
            item['id'], original_tags_json=merged_tags, matched_metadata_json=normalized_metadata
        )
        return updated_item, True

    def _write_tags(self, file_path: Path, tags: dict):
        """Write the supported tags to ``file_path`` via mutagen's easy interface.

        Keys absent from ``tags`` and values equal to ``None`` or ``''`` are
        skipped; every written value is stored as a one-element list of its
        string form.

        Raises:
            RepairExecutionError: mutagen cannot open the file as a tagged
                audio file, or rejects one of the tag keys.
        """
        # Imported lazily so the module can be loaded without mutagen.
        import mutagen

        tags_file = mutagen.File(str(file_path), easy=True)
        if tags_file is None:
            raise RepairExecutionError('metadata_write_failed', f'无法写入标签: {file_path}')

        # Internal metadata key -> mutagen "easy" tag name.
        key_mapping = {
            'title': 'title',
            'artist': 'artist',
            'album': 'album',
            'album_artist': 'albumartist',
            'track_number': 'tracknumber',
            'disc_number': 'discnumber',
            'year': 'date',
            'lyrics': 'lyrics',
        }
        for source_key, target_key in key_mapping.items():
            if source_key not in tags:
                continue
            value = tags[source_key]
            if value in (None, ''):
                continue
            try:
                tags_file[target_key] = [str(value)]
            except KeyError as err:
                # NOTE(review): some easy tag containers appear to reject
                # keys they have no mapping for (e.g. 'lyrics' on ID3) with
                # a KeyError subclass - confirm against the mutagen docs.
                # Surface it as the same structured failure used above.
                raise RepairExecutionError('metadata_write_failed', f'无法写入标签: {file_path}') from err
        tags_file.save()

    def _writable_keys(self):
        """Return the metadata keys that may be copied into file tags."""
        return {'title', 'artist', 'album', 'album_artist', 'track_number', 'disc_number', 'year', 'lyrics'}

    def _build_after_snapshot(self, before_snapshot: dict, final_item: dict, resolution_status: str) -> dict:
        """Build the post-action snapshot stored with the resolution.

        While the exception stays ``'open'`` the snapshot is re-read from
        ``exception_service``; otherwise it is the pre-action snapshot with
        the path and metadata fields overwritten from ``final_item``.
        """
        if resolution_status == 'open':
            return self.exception_service.get_item(before_snapshot.get('exception_id'))
        return {
            **before_snapshot,
            'current_file_path': final_item.get('current_file_path'),
            'trash_file_path': final_item.get('trash_file_path'),
            'library_relative_path': final_item.get('library_relative_path'),
            'library_file_path': final_item.get('library_file_path'),
            'matched_metadata_json': final_item.get('matched_metadata_json'),
            'original_tags_json': final_item.get('original_tags_json'),
        }

    def _update_resolution(
        self,
        final_item: dict,
        resolution_status: str,
        workflow_state: str,
        after_snapshot: dict,
        execution_result: dict,
        repair_task_id: str,
        params: dict,
    ):
        """Persist the resolution status and details on the task item.

        Existing ``exception_resolution_json`` entries are kept and the
        fields produced by this execution are layered on top.
        """
        resolution = dict(final_item.get('exception_resolution_json') or {})
        resolution.update(
            {
                'resolved_at': current_timestamp(),
                'workflow_state': workflow_state,
                'metadata_draft': self._build_metadata_draft(
                    final_item, params.get('metadata_patch') or {}, workflow_state
                ),
                'after_snapshot': after_snapshot,
                'execution_result': execution_result,
            }
        )
        self.task_store.update_task_item(
            final_item['id'],
            exception_resolution_status=resolution_status,
            exception_resolution_json=resolution,
            last_repair_task_id=repair_task_id,
        )

    def _metadata_workflow_state(self, item: dict, metadata_patch: dict | None = None) -> str:
        """Return ``'ready_to_ingest'`` if the item's metadata is ingestable, else ``'open'``.

        Non-``None`` patch values are overlaid on the item's matched
        metadata before the check.
        """
        metadata = item.get('matched_metadata_json') or {}
        if metadata_patch:
            metadata = {**metadata, **{key: value for key, value in metadata_patch.items() if value is not None}}
        return 'ready_to_ingest' if can_ingest_metadata(metadata) else 'open'

    def _candidate_workflow_state(self, item: dict) -> str:
        """Return ``'ready_to_ingest'`` if the matched metadata is ingestable, else ``'candidate_selected'``."""
        return 'ready_to_ingest' if can_ingest_metadata(item.get('matched_metadata_json') or {}) else 'candidate_selected'

    def _build_metadata_draft(self, item: dict, metadata_patch: dict | None, workflow_state: str) -> dict | None:
        """Return the metadata draft to store with the resolution, if any.

        A draft is produced only when the workflow state is
        ``'candidate_selected'`` or ``'ready_to_ingest'``, or when a
        non-empty patch was supplied; otherwise ``None`` is returned.
        """
        if workflow_state not in {'candidate_selected', 'ready_to_ingest'} and not metadata_patch:
            return None
        metadata = dict(item.get('matched_metadata_json') or {})
        metadata.update({key: value for key, value in (metadata_patch or {}).items() if value is not None})
        return metadata