Files
MusicWorkshop/backend/tests/test_task_runner_preprocess.py
2026-04-30 14:34:28 +08:00

541 lines
19 KiB
Python

import math
import os
import struct
import tempfile
import unittest
import wave
from pathlib import Path
from unittest.mock import patch
from backend.app.matcher import MatchProviderError
from backend.app.preprocessor import PreprocessDependencyError, Preprocessor
from backend.app.scanner import Scanner
from backend.app.task_runner import TaskRunner
from backend.app.task_store import TaskStore
from backend.app.task_stream import TaskStreamManager
class TaskRunnerPreprocessTests(unittest.TestCase):
    """Pipeline tests that drive ``TaskRunner`` against a throwaway workspace.

    Each test gets a fresh temporary root holding ``input``, ``output`` and
    ``trash`` directories plus a ``TaskStore`` backed by ``music_workshop.db``
    inside that root. The root is deleted again once the test has finished.
    """

    def setUp(self):
        """Create the per-test workspace, task store and config snapshot."""
        # Function-scope import: the module header does not import shutil and
        # this keeps the class self-contained.
        import shutil

        self.root = Path(tempfile.mkdtemp())
        # mkdtemp() never removes what it creates. ignore_errors keeps the
        # cleanup from failing the test if a file cannot be deleted (for
        # example because a handle on it is still open).
        self.addCleanup(shutil.rmtree, self.root, ignore_errors=True)

        self.input_dir = self.root / 'input'
        self.output_dir = self.root / 'output'
        self.trash_dir = self.root / 'trash'
        for directory in (self.input_dir, self.output_dir, self.trash_dir):
            directory.mkdir()

        self.task_store = TaskStore(self.root / 'music_workshop.db')
        self.config_snapshot = {
            'input': str(self.input_dir),
            'output': str(self.output_dir),
            'trash': str(self.trash_dir)
        }

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _make_runner(self, matcher, preprocessor=None):
        """Build a ``TaskRunner`` on the shared store with the given matcher.

        A fresh ``Preprocessor`` is created unless the test supplies its own
        (needed when the test wants to patch that specific instance).
        """
        return TaskRunner(
            self.task_store,
            Scanner(),
            Preprocessor() if preprocessor is None else preprocessor,
            TaskStreamManager(),
            matcher=matcher
        )

    @staticmethod
    def _backdate(path, seconds=120):
        """Move ``path``'s access/modification time ``seconds`` into the past.

        NOTE(review): presumably the scanner only queues files whose mtime is
        old enough to be considered stable - confirm against ``Scanner``.
        """
        stable_timestamp = path.stat().st_mtime - seconds
        os.utime(path, (stable_timestamp, stable_timestamp))

    def _add_wave_source(self, *relative_parts):
        """Write an 8 second WAV under the input directory and backdate it."""
        source_file = self.input_dir.joinpath(*relative_parts)
        source_file.parent.mkdir(parents=True)
        _write_wave_file(source_file, duration_seconds=8)
        self._backdate(source_file)
        return source_file

    def _insert_item(self, task_id, **fields):
        """Insert a task item, filling in the columns every test sets the same.

        ``fields`` supplies (or overrides) the remaining keyword arguments of
        ``TaskStore.insert_task_item``.
        """
        values = {
            'modified_at': None,
            'local_cover': None,
            'local_lyric': None,
            'scan_status': 'queued',
            'scan_reason': None,
            'scan_message': None,
            'preprocess_status': 'completed'
        }
        values.update(fields)
        return self.task_store.insert_task_item(task_id, **values)

    def _list_items(self, task_id):
        """Return the first page (up to 20) of items stored for ``task_id``."""
        return self.task_store.list_task_items(task_id, None, 1, 20)['items']

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_task_runner_completes_full_pipeline(self):
        """A single WAV runs through every stage and lands in the output dir."""
        self._add_wave_source('Artist', 'Album', '01.wav')
        task_runner = self._make_runner(StaticMatcher())

        task = self.task_store.create_task_if_idle(self.config_snapshot)
        task_runner.start_task(task['task_id'], self.config_snapshot)

        persisted_task = self.task_store.get_task(task['task_id'])
        persisted_items = self._list_items(task['task_id'])

        self.assertEqual(persisted_task['status'], 'completed')
        self.assertEqual(persisted_task['current_stage'], 'complete')
        for stage in ('preprocess', 'match', 'dedupe', 'organize'):
            self.assertEqual(persisted_task['stage_states'][stage], 'completed')

        stats = persisted_task['stats']
        self.assertEqual(stats['scan']['queued'], 1)
        self.assertEqual(stats['preprocess']['input_items'], 1)
        self.assertEqual(stats['preprocess']['converted_items'], 1)
        self.assertEqual(stats['preprocess']['output_items'], 1)
        self.assertEqual(stats['match']['matched_authoritative'], 1)
        self.assertEqual(stats['dedupe']['kept_items'], 1)
        self.assertEqual(stats['organize']['moved_items'], 1)

        self.assertEqual(len(persisted_items), 1)
        item = persisted_items[0]
        self.assertEqual(item['preprocess_status'], 'completed')
        self.assertEqual(item['match_status'], 'matched')
        self.assertEqual(item['dedupe_status'], 'unique')
        self.assertEqual(item['organize_status'], 'organized')
        self.assertTrue(item['current_file_path'].endswith('.flac'))
        self.assertTrue(Path(item['current_file_path']).exists())
        self.assertIn(str(self.output_dir), item['current_file_path'])
        self.assertEqual(item['audio_props_json']['codec'], 'FLAC')
        self.assertTrue(item['acoustic_fingerprint'])
        self.assertEqual(item['matched_metadata_json']['release_id'], 'release-1')
        self.assertEqual(
            item['library_relative_path'],
            'M/Matched Artist/Matched Album/01 - Matched Song.flac'
        )

    def test_task_runner_fails_fast_when_preprocess_dependencies_are_missing(self):
        """A failing dependency check fails the task in the preprocess stage."""
        source_file = self.input_dir / 'single.flac'
        source_file.write_bytes(b'not-real-audio')
        self._backdate(source_file)

        # Keep a handle on the instance so exactly this object can be patched.
        preprocessor = Preprocessor()
        task_runner = self._make_runner(StaticMatcher(), preprocessor=preprocessor)

        task = self.task_store.create_task_if_idle(self.config_snapshot)
        with patch.object(
            preprocessor,
            'check_dependencies',
            side_effect=PreprocessDependencyError('missing preprocess dependencies')
        ):
            task_runner.start_task(task['task_id'], self.config_snapshot)

        persisted_task = self.task_store.get_task(task['task_id'])
        self.assertEqual(persisted_task['status'], 'failed')
        self.assertEqual(persisted_task['current_stage'], 'preprocess')
        self.assertEqual(persisted_task['stage_states']['preprocess'], 'failed')
        self.assertEqual(
            persisted_task['error_message'],
            'missing preprocess dependencies'
        )

    def test_task_runner_marks_match_item_failed_when_provider_errors(self):
        """A matcher error fails the item and moves it under trash/match_failed."""
        source_file = self._add_wave_source('Artist', 'Album', '01.wav')
        task_runner = self._make_runner(ErrorMatcher())

        task = self.task_store.create_task_if_idle(self.config_snapshot)
        task_runner.start_task(task['task_id'], self.config_snapshot)

        persisted_task = self.task_store.get_task(task['task_id'])
        persisted_items = self._list_items(task['task_id'])

        self.assertEqual(persisted_task['status'], 'completed')
        self.assertEqual(persisted_task['stats']['match']['failed_items'], 1)

        failed_item = persisted_items[0]
        self.assertEqual(failed_item['match_status'], 'failed')
        self.assertEqual(failed_item['match_reason'], 'provider_error')
        self.assertFalse(source_file.exists())
        self.assertFalse(failed_item['is_active'])
        self.assertTrue(Path(failed_item['current_file_path']).exists())
        self.assertTrue(Path(failed_item['trash_file_path']).exists())
        # Normalise separators so the check does not depend on the platform.
        self.assertIn(
            '/trash/match_failed/',
            Path(failed_item['current_file_path']).as_posix()
        )

    def test_task_runner_quarantines_historical_exception_before_scan(self):
        """A file that failed matching in an earlier task is trashed, not rescanned."""
        source_file = self.input_dir / 'failed.flac'
        source_file.write_bytes(b'audio')
        self._backdate(source_file)

        previous_task = self.task_store.create_task_if_idle(self.config_snapshot)
        previous_item = self._insert_item(
            previous_task['task_id'],
            original_path=str(source_file),
            current_file_path=str(source_file),
            relative_path='failed.flac',
            filename='failed.flac',
            extension='.flac',
            size_bytes=source_file.stat().st_size,
            match_status='failed',
            match_reason='provider_error',
            match_message='provider exploded'
        )
        self.task_store.update_task(
            previous_task['task_id'],
            status='completed',
            current_stage='complete'
        )

        task = self.task_store.create_task_if_idle(self.config_snapshot)
        task_runner = self._make_runner(StaticMatcher())
        task_runner.start_task(task['task_id'], self.config_snapshot)

        persisted_task = self.task_store.get_task(task['task_id'])
        quarantined_item = self.task_store.get_exception_source_item(previous_item['id'])

        self.assertEqual(persisted_task['status'], 'completed')
        self.assertEqual(persisted_task['stats']['scan']['total_found'], 0)
        self.assertEqual(persisted_task['stats']['scan']['queued'], 0)
        self.assertFalse(source_file.exists())
        self.assertFalse(quarantined_item['is_active'])
        self.assertTrue(Path(quarantined_item['trash_file_path']).exists())
        # Normalise separators so the check does not depend on the platform.
        self.assertIn(
            '/trash/match_failed/',
            Path(quarantined_item['trash_file_path']).as_posix()
        )

    def test_task_runner_skips_exception_with_existing_trash_path(self):
        """An item that already has a trash path is left exactly where it is."""
        source_file = self.input_dir / 'duplicate.flac'
        source_file.write_bytes(b'audio')
        existing_trash = self.trash_dir / 'duplicates' / 'old-task' / '1_duplicate.flac'
        existing_trash.parent.mkdir(parents=True)
        existing_trash.write_bytes(b'audio')

        task = self.task_store.create_task_if_idle(self.config_snapshot)
        item = self._insert_item(
            task['task_id'],
            original_path=str(source_file),
            current_file_path=str(existing_trash),
            relative_path='duplicate.flac',
            filename='duplicate.flac',
            extension='.flac',
            size_bytes=source_file.stat().st_size,
            match_status='matched',
            dedupe_status='duplicate_trashed',
            trash_file_path=str(existing_trash)
        )

        task_runner = self._make_runner(StaticMatcher())
        task_runner._quarantine_exception_items(
            task['task_id'],
            self.config_snapshot,
            scope='current'
        )

        persisted_item = self.task_store.get_exception_source_item(item['id'])
        self.assertTrue(source_file.exists())
        self.assertTrue(existing_trash.exists())
        self.assertEqual(persisted_item['trash_file_path'], str(existing_trash))

    def test_task_runner_quarantines_converted_exception_and_original_source(self):
        """Both the converted file and its original source end up in the trash."""
        workspace_dir = self.root / 'workspace'
        workspace_dir.mkdir()
        source_file = self.input_dir / 'source.wav'
        converted_file = workspace_dir / 'source.flac'
        source_file.write_bytes(b'wav')
        converted_file.write_bytes(b'flac')

        task = self.task_store.create_task_if_idle(self.config_snapshot)
        item = self._insert_item(
            task['task_id'],
            original_path=str(source_file),
            current_file_path=str(converted_file),
            relative_path='source.wav',
            filename='source.flac',
            extension='.flac',
            size_bytes=converted_file.stat().st_size,
            match_status='low_score',
            match_reason='score_below_threshold',
            match_message='候选分数不足'
        )

        task_runner = self._make_runner(StaticMatcher())
        task_runner._quarantine_exception_items(
            task['task_id'],
            self.config_snapshot,
            scope='current'
        )

        quarantined_item = self.task_store.get_exception_source_item(item['id'])
        moved_paths = sorted((self.trash_dir / 'low_score' / task['task_id']).glob('*'))

        self.assertFalse(source_file.exists())
        self.assertFalse(converted_file.exists())
        self.assertEqual(len(moved_paths), 2)
        self.assertFalse(quarantined_item['is_active'])
        self.assertTrue(Path(quarantined_item['current_file_path']).exists())
        self.assertTrue(Path(quarantined_item['trash_file_path']).exists())
        self.assertEqual(
            Path(quarantined_item['current_file_path']).name,
            f'{item["id"]}_source.flac'
        )

    def test_task_runner_logs_provider_warnings_and_continues_matching(self):
        """Provider warnings are counted and logged without failing the match."""
        self._add_wave_source('Artist', 'Album', '01.wav')
        task_runner = self._make_runner(WarningMatcher())

        task = self.task_store.create_task_if_idle(self.config_snapshot)
        task_runner.start_task(task['task_id'], self.config_snapshot)

        persisted_task = self.task_store.get_task(task['task_id'])
        persisted_items = self._list_items(task['task_id'])
        persisted_logs = self.task_store.list_task_logs(task['task_id'], 1, 50)['logs']

        match_stats = persisted_task['stats']['match']
        self.assertEqual(persisted_task['status'], 'completed')
        self.assertEqual(match_stats['matched_fallback'], 1)
        self.assertEqual(match_stats['provider_warnings'], 2)
        self.assertEqual(match_stats['failed_items'], 0)
        self.assertEqual(persisted_items[0]['match_status'], 'matched_fallback')

        # One 'match.provider_skipped' log entry is expected per warning.
        skipped_events = [
            log['event_type']
            for log in persisted_logs
            if log['event_type'] == 'match.provider_skipped'
        ]
        self.assertEqual(
            skipped_events,
            ['match.provider_skipped', 'match.provider_skipped']
        )
class StaticMatcher:
    """Matcher stub that always reports one authoritative MusicBrainz match."""

    def match_item(self, item, album_group, config):
        """Return a canned authoritative match result.

        All three arguments are accepted for interface compatibility and are
        ignored. A brand-new result structure is built on every call.
        """
        provider_name = 'musicbrainz'
        match_score = 93.5
        release_identifiers = {
            'recording_id': 'recording-1',
            'release_id': 'release-1',
            'release_group_id': 'release-group-1'
        }

        metadata = {
            'title': 'Matched Song',
            'artist': 'Matched Artist',
            'artists': ['Matched Artist'],
            'album': 'Matched Album',
            'album_artist': 'Matched Artist',
            'track_number': 1,
            'disc_number': 1,
            'release_date': '2024-01-01',
            'year': 2024,
            'duration_seconds': 8.0,
            **release_identifiers,
            'source_ids': {'musicbrainz_recording_id': 'recording-1'}
        }

        candidate = {
            'provider': provider_name,
            'score': match_score,
            'score_breakdown': {'title': 20},
            'is_authoritative': True,
            **release_identifiers,
            # Deliberately a separate dict from the one in ``metadata``.
            'source_ids': {'musicbrainz_recording_id': 'recording-1'}
        }

        # No enrichment data was selected for any of the four categories.
        enrichment = {
            category: {'selected_source': None, 'candidates': []}
            for category in ('cover', 'lyrics', 'genres', 'tags')
        }

        return {
            'status': 'matched',
            'reason': 'authoritative_auto_match',
            'message': '静态测试匹配成功',
            'source': provider_name,
            'confidence': match_score,
            'is_authoritative': True,
            'matched_metadata_json': metadata,
            'match_candidates_json': [candidate],
            'match_enrichment_json': enrichment,
            'provider_warnings': []
        }
class ErrorMatcher:
    """Matcher stub whose lookups always fail with a provider error."""

    def match_item(self, item, album_group, config):
        """Raise ``MatchProviderError`` unconditionally; arguments are ignored."""
        failure = MatchProviderError('musicbrainz', 'provider exploded')
        raise failure
class WarningMatcher:
    """Matcher stub returning a fallback QQ match plus two provider warnings."""

    def match_item(self, item, album_group, config):
        """Return a canned non-authoritative match result with warnings.

        All three arguments are accepted for interface compatibility and are
        ignored. A brand-new result structure is built on every call.
        """
        provider_name = 'qq'
        match_score = 88.0
        # The fallback result carries no release identifiers.
        release_identifiers = {
            'recording_id': None,
            'release_id': None,
            'release_group_id': None
        }

        metadata = {
            'title': 'Matched Song',
            'artist': 'Matched Artist',
            'artists': ['Matched Artist'],
            'album': 'Matched Album',
            'album_artist': 'Matched Artist',
            'track_number': 1,
            'disc_number': 1,
            'release_date': '2024-01-01',
            'year': 2024,
            'duration_seconds': 8.0,
            **release_identifiers,
            'source_ids': {'qq_song_mid': 'song-1'}
        }

        candidate = {
            'provider': provider_name,
            'score': match_score,
            'score_breakdown': {'title': 20},
            'is_authoritative': False,
            **release_identifiers,
            # Deliberately a separate dict from the one in ``metadata``.
            'source_ids': {'qq_song_mid': 'song-1'}
        }

        # No enrichment data was selected for any of the four categories.
        enrichment = {
            category: {'selected_source': None, 'candidates': []}
            for category in ('cover', 'lyrics', 'genres', 'tags')
        }

        skipped_providers = (
            (
                'acoustid',
                'acoustid 请求失败 (HTTP 400) {"error":{"code":4,"message":"invalid API key"}}'
            ),
            (
                'spotify',
                'spotify 请求失败 (HTTP 403) Active premium subscription required for the owner of the app.'
            )
        )
        warnings = [
            {'provider': name, 'message': text}
            for name, text in skipped_providers
        ]

        return {
            'status': 'matched_fallback',
            'reason': 'fallback_auto_match',
            'message': 'Fallback 候选自动匹配成功,得分 88.0',
            'source': provider_name,
            'confidence': match_score,
            'is_authoritative': False,
            'matched_metadata_json': metadata,
            'match_candidates_json': [candidate],
            'match_enrichment_json': enrichment,
            'provider_warnings': warnings
        }
def _write_wave_file(path: Path, *, duration_seconds: int):
    """Write a mono, 16-bit, 44.1 kHz WAV file containing a 440 Hz sine tone.

    Args:
        path: Destination file; it is opened for binary writing.
        duration_seconds: Length of the tone in seconds (keyword-only).
    """
    rate = 44100
    tone_hz = 440.0
    peak = 16000
    frame_count = rate * duration_seconds

    with wave.open(str(path), 'wb') as wav_out:
        wav_out.setnchannels(1)
        wav_out.setsampwidth(2)
        wav_out.setframerate(rate)
        # Each sample is packed as a little-endian signed 16-bit integer.
        pcm = b''.join(
            struct.pack('<h', int(peak * math.sin((2.0 * math.pi * tone_hz * n) / rate)))
            for n in range(frame_count)
        )
        wav_out.writeframes(pcm)
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()