Files
2026-04-30 14:34:28 +08:00

395 lines
14 KiB
Python

import os
import tempfile
import unittest
from pathlib import Path
# Point the code under test at a throwaway database. This is set before the
# backend imports below; presumably backend.app reads the variable at import
# time (confirm against backend.app). TaskStoreTests.setUp reads it as well.
#
# mkdtemp() is the public, race-free way to obtain a unique location; the
# previous tempfile._get_candidate_names() helper is a private CPython detail
# that may change or disappear without notice. The database file itself is
# not created here, so the path starts out non-existent as before.
os.environ['MUSIC_WORKSHOP_DB_PATH'] = str(
    Path(tempfile.mkdtemp(prefix='music_workshop_task_api_')) / 'music_workshop_task_api.db'
)
from backend.app.task_store import TaskStore
# The API layer is optional for this test module: when one of its modules
# cannot be found, the API test class is skipped and the captured error is
# shown in the skip reason.
FASTAPI_IMPORT_ERROR = None
try:
    from backend.app.schemas import TaskHistoryListResponse
    import backend.app.main as main_module
except ModuleNotFoundError as import_error:
    # Reset both names together so a half-completed import never leaks through.
    TaskHistoryListResponse = None
    main_module = None
    FASTAPI_IMPORT_ERROR = import_error
class TaskStoreTests(unittest.TestCase):
    """Exercise ``TaskStore`` item listing and task-history queries.

    Every test runs against a fresh database file located at the path stored
    in the ``MUSIC_WORKSHOP_DB_PATH`` environment variable.
    """

    def setUp(self):
        """Start from a clean database file and schedule its removal."""
        self.db_path = Path(os.environ['MUSIC_WORKSHOP_DB_PATH'])
        # missing_ok avoids the check-then-delete race of exists() + unlink().
        self.db_path.unlink(missing_ok=True)
        # Without this the database written by the last test is left behind.
        self.addCleanup(self.db_path.unlink, missing_ok=True)
        self.task_store = TaskStore(self.db_path)
        # Counter used by _insert_history_item to generate distinct file names.
        self._item_index = 0

    def test_get_task_items_filters_new_status_fields_and_serializes_postprocess_fields(self):
        """Status filters select only the fully processed item and JSON fields come back decoded."""
        task = self._create_task()
        self.task_store.insert_task_item(
            task['task_id'],
            original_path='/tmp/source-1.flac',
            current_file_path='/tmp/source-1.flac',
            relative_path='Artist/Album/01.flac',
            filename='01.flac',
            extension='.flac',
            size_bytes=123,
            modified_at='2024-01-01T00:00:00Z',
            local_cover=None,
            local_lyric=None,
            scan_status='queued',
            scan_reason=None,
            scan_message=None,
            preprocess_status='completed',
            match_status='matched',
            match_reason='authoritative_auto_match',
            match_message='matched',
            match_source='musicbrainz',
            match_confidence=92.5,
            match_is_authoritative=1,
            matched_metadata_json={
                'title': 'Song Title',
                'artist': 'Song Artist',
                'artists': ['Song Artist'],
                'album': 'Album Name',
                'album_artist': 'Song Artist',
                'track_number': 1,
                'disc_number': 1,
                'release_date': '2024-01-01',
                'year': 2024,
                'duration_seconds': 201,
                'recording_id': 'recording-1',
                'release_id': 'release-1',
                'release_group_id': 'group-1',
                'source_ids': {'musicbrainz_recording_id': 'recording-1'},
            },
            match_candidates_json=[
                {
                    'provider': 'musicbrainz',
                    'score': 92.5,
                    'score_breakdown': {'title': 20},
                    'is_authoritative': True,
                    'recording_id': 'recording-1',
                    'release_id': 'release-1',
                    'release_group_id': 'group-1',
                    'source_ids': {'musicbrainz_recording_id': 'recording-1'},
                }
            ],
            match_enrichment_json={
                'cover': {'selected_source': None, 'candidates': []},
                'lyrics': {'selected_source': None, 'candidates': []},
                'genres': {'selected_source': None, 'candidates': []},
                'tags': {'selected_source': None, 'candidates': []},
            },
            dedupe_status='unique',
            dedupe_reason=None,
            dedupe_message='kept',
            dedupe_group_key='recording-1',
            dedupe_decision_json={
                'comparison_scope': 'library',
                'identity_basis': 'recording_id',
                'kept_side': 'batch',
            },
            organize_status='organized',
            organize_reason=None,
            organize_message='organized',
            library_relative_path='S/Song Artist/Album Name/01 - Song Title.flac',
            library_file_path='/tmp/output/S/Song Artist/Album Name/01 - Song Title.flac',
            trash_file_path=None,
            organize_decision_json={
                'source_path': '/tmp/source-1.flac',
                'final_relative_path': 'S/Song Artist/Album Name/01 - Song Title.flac',
                'final_action': 'organized',
            },
        )
        # Second item stops at the match stage, so the filters below must exclude it.
        self.task_store.insert_task_item(
            task['task_id'],
            original_path='/tmp/source-2.flac',
            current_file_path='/tmp/source-2.flac',
            relative_path='Artist/Album/02.flac',
            filename='02.flac',
            extension='.flac',
            size_bytes=123,
            modified_at='2024-01-01T00:00:00Z',
            local_cover=None,
            local_lyric=None,
            scan_status='queued',
            scan_reason=None,
            scan_message=None,
            preprocess_status='completed',
            match_status='low_score',
            match_reason='score_gap_too_small',
            match_message='low score',
        )

        response = self.task_store.list_task_items(
            task['task_id'],
            scan_status=None,
            preprocess_status=None,
            match_status='matched',
            dedupe_status='unique',
            organize_status='organized',
            page=1,
            page_size=10,
            active_only=False,
        )

        self.assertEqual(response['total'], 1)
        item = response['items'][0]
        self.assertEqual(item['match_status'], 'matched')
        self.assertEqual(item['match_source'], 'musicbrainz')
        self.assertEqual(item['matched_metadata_json']['release_id'], 'release-1')
        self.assertEqual(item['match_candidates_json'][0]['provider'], 'musicbrainz')
        self.assertEqual(item['dedupe_status'], 'unique')
        self.assertEqual(item['dedupe_decision_json']['identity_basis'], 'recording_id')
        self.assertEqual(item['organize_status'], 'organized')
        self.assertEqual(
            item['organize_decision_json']['final_relative_path'],
            'S/Song Artist/Album Name/01 - Song Title.flac',
        )

    def test_list_task_history_returns_empty_when_no_terminal_tasks(self):
        """An empty store yields an empty page that echoes the paging arguments."""
        response = self.task_store.list_task_history(page=1, page_size=8)
        self.assertEqual(response, {'items': [], 'page': 1, 'page_size': 8, 'total': 0})

    def test_list_task_history_only_returns_completed_and_failed_tasks(self):
        """A task that is still running does not appear in the history."""
        completed_task = self._create_terminal_task('completed', '2024-01-01T08:00:00Z')
        failed_task = self._create_terminal_task('failed', '2024-01-02T08:00:00Z')
        active_task = self._create_task()
        self.task_store.update_task(active_task['task_id'], status='running')

        response = self.task_store.list_task_history(page=1, page_size=10)

        self.assertEqual(response['total'], 2)
        self.assertEqual(
            [item['task_id'] for item in response['items']],
            [failed_task['task_id'], completed_task['task_id']],
        )

    def test_list_task_history_orders_by_started_at_desc(self):
        """History rows come back newest ``started_at`` first."""
        oldest = self._create_terminal_task('completed', '2024-01-01T08:00:00Z')
        middle = self._create_terminal_task('completed', '2024-01-01T12:00:00Z')
        newest = self._create_terminal_task('failed', '2024-01-02T09:30:00Z')

        response = self.task_store.list_task_history(page=1, page_size=10)

        self.assertEqual(
            [item['task_id'] for item in response['items']],
            [newest['task_id'], middle['task_id'], oldest['task_id']],
        )

    def test_list_task_history_aggregates_counts_and_report_status(self):
        """Per-task item counts and ``report_status`` are derived from item and task state."""
        all_success = self._create_terminal_task('completed', '2024-01-01T08:00:00Z')
        self._insert_history_item(all_success['task_id'])
        self._insert_history_item(all_success['task_id'])

        partial_success = self._create_terminal_task('completed', '2024-01-02T08:00:00Z')
        self._insert_history_item(partial_success['task_id'])
        self._insert_history_item(
            partial_success['task_id'],
            match_status='low_score',
            match_reason='score_gap_too_small',
            match_message='匹配分过低',
            dedupe_status='pending',
            organize_status='pending',
        )
        self._insert_history_item(
            partial_success['task_id'],
            preprocess_status='failed',
            preprocess_reason='convert_failed',
            preprocess_message='音频转码失败',
            match_status='pending',
            dedupe_status='pending',
            organize_status='pending',
        )

        failed_task = self._create_terminal_task('failed', '2024-01-03T08:00:00Z')
        self._insert_history_item(failed_task['task_id'])

        response = self.task_store.list_task_history(page=1, page_size=10)
        items_by_task_id = {item['task_id']: item for item in response['items']}

        self._assert_history_summary(
            items_by_task_id[all_success['task_id']],
            total=2, success=2, exception=0, report_status='success',
        )
        self._assert_history_summary(
            items_by_task_id[partial_success['task_id']],
            total=3, success=1, exception=2, report_status='warning',
        )
        # A failed task is reported as a warning even though its only item succeeded.
        self._assert_history_summary(
            items_by_task_id[failed_task['task_id']],
            total=1, success=1, exception=0, report_status='warning',
        )

    def test_list_task_history_paginates_results(self):
        """Paging splits the newest-first history and reports the overall total on each page."""
        first = self._create_terminal_task('completed', '2024-01-01T08:00:00Z')
        second = self._create_terminal_task('completed', '2024-01-02T08:00:00Z')
        third = self._create_terminal_task('failed', '2024-01-03T08:00:00Z')

        first_page = self.task_store.list_task_history(page=1, page_size=2)
        second_page = self.task_store.list_task_history(page=2, page_size=2)

        self.assertEqual(first_page['total'], 3)
        self.assertEqual(first_page['page'], 1)
        self.assertEqual(first_page['page_size'], 2)
        self.assertEqual(len(first_page['items']), 2)
        self.assertEqual(
            [item['task_id'] for item in first_page['items']],
            [third['task_id'], second['task_id']],
        )
        self.assertEqual(second_page['total'], 3)
        self.assertEqual(second_page['page'], 2)
        self.assertEqual(second_page['page_size'], 2)
        self.assertEqual(
            [item['task_id'] for item in second_page['items']],
            [first['task_id']],
        )

    def _create_task(self) -> dict:
        """Create a task with the fixed directory settings shared by all tests.

        A new dict is built on every call so the store never receives an
        object shared between tests.
        """
        return self.task_store.create_task_if_idle(
            {
                'input': '/tmp/input',
                'output': '/tmp/output',
                'trash': '/tmp/trash',
            }
        )

    def _assert_history_summary(self, item: dict, *, total: int, success: int,
                                exception: int, report_status: str) -> None:
        """Assert the aggregate fields of a single history row."""
        self.assertEqual(item['total_items'], total)
        self.assertEqual(item['success_items'], success)
        self.assertEqual(item['exception_items'], exception)
        self.assertEqual(item['report_status'], report_status)

    def _create_terminal_task(self, status: str, started_at: str) -> dict:
        """Create a task, move it to ``status`` and pin all its timestamps to ``started_at``.

        Returns the task as re-read from the store after the update.
        """
        task = self._create_task()
        self.task_store.update_task(task['task_id'], status=status, completed_at=started_at)
        # Overwrite the timestamps directly so ordering assertions are deterministic.
        with self.task_store._connect() as connection:
            connection.execute(
                '''
                UPDATE task_runs
                SET started_at = ?, updated_at = ?, completed_at = ?
                WHERE id = ?
                ''',
                (started_at, started_at, started_at, task['task_id'])
            )
            connection.commit()
        return self.task_store.get_task(task['task_id'])

    def _insert_history_item(self, task_id: str, **overrides) -> dict:
        """Insert an item that passed every stage, with ``overrides`` replacing individual fields.

        Each call uses the next value of ``self._item_index`` to build the
        item's paths and file name.
        """
        self._item_index += 1
        item_index = self._item_index
        fields = {
            'original_path': f'/tmp/source-{item_index}.flac',
            'relative_path': f'Artist/Album/{item_index:02d}.flac',
            'filename': f'{item_index:02d}.flac',
            'extension': '.flac',
            'size_bytes': 123,
            'modified_at': '2024-01-01T00:00:00Z',
            'local_cover': None,
            'local_lyric': None,
            'scan_status': 'queued',
            'scan_reason': None,
            'scan_message': None,
            'preprocess_status': 'completed',
            'preprocess_reason': None,
            'preprocess_message': None,
            'match_status': 'matched',
            'match_reason': None,
            'match_message': None,
            'dedupe_status': 'unique',
            'dedupe_reason': None,
            'dedupe_message': '未发现重复项',
            'organize_status': 'organized',
            'organize_reason': None,
            'organize_message': '已按标准路径入库',
        }
        fields.update(overrides)
        return self.task_store.insert_task_item(task_id, **fields)
@unittest.skipIf(main_module is None, f'api deps unavailable: {FASTAPI_IMPORT_ERROR}')
class TaskHistoryApiTests(unittest.TestCase):
    """Call ``main_module.get_tasks`` directly with the module's task store swapped for a fake."""

    def setUp(self):
        # Remember the store currently installed so tearDown can reinstate it.
        self.previous_task_store = main_module.task_store

    def tearDown(self):
        main_module.task_store = self.previous_task_store

    def test_get_tasks_returns_paginated_history_payload(self):
        """The handler forwards the paging arguments and returns a schema-valid page."""
        failed_row = {
            'task_id': 'task-2',
            'started_at': '2024-01-03T12:00:00Z',
            'status': 'failed',
            'total_items': 5,
            'success_items': 3,
            'exception_items': 2,
            'report_status': 'warning',
        }
        completed_row = {
            'task_id': 'task-1',
            'started_at': '2024-01-02T12:00:00Z',
            'status': 'completed',
            'total_items': 4,
            'success_items': 4,
            'exception_items': 0,
            'report_status': 'success',
        }
        canned_page = {
            'items': [failed_row, completed_row],
            'page': 2,
            'page_size': 2,
            'total': 7,
        }
        stub_store = _FakeTaskStore(canned_page)
        main_module.task_store = stub_store

        raw_response = main_module.get_tasks(page=2, page_size=2)
        parsed = TaskHistoryListResponse.model_validate(raw_response)

        self.assertEqual(parsed.page, 2)
        self.assertEqual(parsed.page_size, 2)
        self.assertEqual(parsed.total, 7)
        leading_item = parsed.items[0]
        self.assertEqual(leading_item.task_id, 'task-2')
        self.assertEqual(leading_item.report_status, 'warning')
        self.assertEqual(parsed.items[1].report_status, 'success')
        self.assertEqual(stub_store.calls, [{'page': 2, 'page_size': 2}])

    def test_get_tasks_returns_empty_payload(self):
        """An empty page from the store validates and is passed through as empty."""
        canned_page = {'items': [], 'page': 1, 'page_size': 8, 'total': 0}
        stub_store = _FakeTaskStore(canned_page)
        main_module.task_store = stub_store

        raw_response = main_module.get_tasks(page=1, page_size=8)
        parsed = TaskHistoryListResponse.model_validate(raw_response)

        self.assertEqual(parsed.items, [])
        self.assertEqual(parsed.total, 0)
        self.assertEqual(stub_store.calls, [{'page': 1, 'page_size': 8}])
class _FakeTaskStore:
    """Stand-in task store that replays one canned history page and logs every request."""

    def __init__(self, response: dict):
        # Paging arguments of each list_task_history call, in call order.
        self.calls: list[dict] = []
        # Returned unchanged (same object) from every list_task_history call.
        self.response = response

    def list_task_history(self, page: int, page_size: int) -> dict:
        """Record the paging arguments and return the canned response."""
        request = dict(page=page, page_size=page_size)
        self.calls.append(request)
        return self.response
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()