400 lines
12 KiB
Python
400 lines
12 KiB
Python
import importlib
|
|
import json
|
|
import shutil
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from .task_constants import TASK_WORKSPACE_ROOT
|
|
|
|
|
|
# File suffixes (lower-case, including the leading dot) grouped under the
# "forced FLAC" label.
# NOTE(review): the name suggests that sources with these suffixes are always
# transcoded to FLAC; the constant is not referenced in this part of the
# file, so confirm the exact use against its callers.
FORCED_FLAC_EXTENSIONS = {'.ape', '.wav', '.wma'}
|
|
|
|
|
|
class PreprocessDependencyError(Exception):
    """Signals that a tool or package required for preprocessing is unavailable."""
|
|
|
|
|
|
class PreprocessItemError(Exception):
    """Failure while preprocessing a single item.

    Attributes:
        reason: Short machine-readable failure code (for example
            'split_failed' or 'probe_failed').
        message: Human-readable description; also used as the exception text.
    """

    def __init__(self, reason: str, message: str):
        # Store the structured fields first, then let Exception record the
        # message as its only positional argument.
        self.reason = reason
        self.message = message
        super().__init__(message)
|
|
|
|
|
|
@dataclass
|
|
class CueTrack:
|
|
number: int
|
|
title: str | None
|
|
performer: str | None
|
|
index_seconds: float
|
|
|
|
|
|
@dataclass
class CueSheet:
    """Parsed content of a CUE file: album-level metadata plus its tracks."""

    # Location of the CUE file this sheet was read from.
    cue_path: Path
    # Album-level TITLE value, or None when absent.
    album_title: str | None
    # Album-level PERFORMER value, or None when absent.
    album_performer: str | None
    # Track entries of the sheet.
    tracks: list[CueTrack]
|
|
|
|
|
|
class Preprocessor:
|
|
def check_dependencies(self) -> dict[str, str]:
    """Verify that the external tools and the mutagen package are available.

    Returns:
        Mapping of tool name ('ffmpeg', 'ffprobe', 'fpcalc') to the
        location reported by ``shutil.which``.

    Raises:
        PreprocessDependencyError: If mutagen cannot be imported, or if any
            of the three binaries cannot be located. The mutagen check is
            reported first when both problems exist.
    """
    binaries = {tool: shutil.which(tool) for tool in ('ffmpeg', 'ffprobe', 'fpcalc')}

    try:
        importlib.import_module('mutagen')
    except ModuleNotFoundError as error:
        raise PreprocessDependencyError('缺少 Python 依赖 mutagen') from error

    # Sorted so the error text is stable regardless of lookup order.
    missing_binaries = sorted(tool for tool, located in binaries.items() if not located)
    if missing_binaries:
        raise PreprocessDependencyError(
            f'缺少预处理依赖: {", ".join(missing_binaries)}'
        )

    return binaries
|
|
|
|
def create_workspace(self, task_id: str) -> dict[str, Path]:
    """Create the preprocess working directories for a task.

    The directories 'split', 'converted' and 'covers' are created (with any
    missing parents) under ``TASK_WORKSPACE_ROOT/<task_id>/preprocess``.
    Existing directories are left in place.

    Args:
        task_id: Identifier used as the task's directory name.

    Returns:
        Dict with the keys 'root', 'split', 'converted' and 'covers', each
        mapped to the corresponding directory path.
    """
    preprocess_root = Path(TASK_WORKSPACE_ROOT) / task_id / 'preprocess'
    workspace = {'root': preprocess_root}

    for name in ('split', 'converted', 'covers'):
        subdirectory = preprocess_root / name
        subdirectory.mkdir(parents=True, exist_ok=True)
        workspace[name] = subdirectory

    return workspace
|
|
|
|
def find_matching_cue(self, audio_path: str) -> Path | None:
|
|
source_path = Path(audio_path)
|
|
directory = source_path.parent
|
|
target_name = f'{source_path.stem}.cue'.lower()
|
|
|
|
if not directory.exists():
|
|
return None
|
|
|
|
for entry in directory.iterdir():
|
|
if entry.is_symlink() or not entry.is_file():
|
|
continue
|
|
if entry.name.lower() == target_name:
|
|
return entry.resolve(strict=False)
|
|
|
|
return None
|
|
|
|
def parse_cue(self, cue_path: Path) -> CueSheet:
    """Parse a CUE file into album metadata and per-track start offsets.

    Only TRACK, TITLE, PERFORMER and ``INDEX 01`` lines are interpreted;
    every other line is ignored. TITLE/PERFORMER lines seen before the first
    TRACK line are stored as album-level values, later ones on the track
    being read. Tracks that never receive an ``INDEX 01`` are dropped.

    Args:
        cue_path: Location of the CUE file. It is decoded as UTF-8 (a
            leading BOM is removed, undecodable bytes are replaced).

    Returns:
        A CueSheet whose tracks appear in file order.

    Raises:
        PreprocessItemError: With reason 'split_failed' when a TRACK line is
            malformed, an ``INDEX 01`` timestamp cannot be parsed, or no
            usable track was found.
    """
    album_title = None
    album_performer = None
    tracks: list[CueTrack] = []
    current_track: dict | None = None

    # 'utf-8-sig' drops a leading BOM. With plain 'utf-8' the BOM stays glued
    # to the first keyword (strip() does not remove it), so the first line of
    # a BOM-prefixed file would never be recognised.
    content = cue_path.read_text(encoding='utf-8-sig', errors='replace')

    for raw_line in content.splitlines():
        line = raw_line.strip()
        if not line:
            continue

        keyword, _, remainder = line.partition(' ')
        keyword = keyword.upper()
        remainder = remainder.strip()

        if keyword == 'TRACK':
            # A new TRACK line closes the previous track, which is kept only
            # if it received a start offset.
            if current_track and current_track.get('index_seconds') is not None:
                tracks.append(CueTrack(**current_track))
            parts = remainder.split()
            if len(parts) < 2 or not parts[0].isdigit():
                raise PreprocessItemError('split_failed', 'CUE TRACK 行格式无效')
            current_track = {
                'number': int(parts[0]),
                'title': None,
                'performer': None,
                'index_seconds': None
            }
        elif keyword == 'TITLE':
            value = _strip_cue_value(remainder)
            if current_track is None:
                album_title = value
            else:
                current_track['title'] = value
        elif keyword == 'PERFORMER':
            value = _strip_cue_value(remainder)
            if current_track is None:
                album_performer = value
            else:
                current_track['performer'] = value
        elif keyword == 'INDEX' and current_track is not None:
            parts = remainder.split()
            if len(parts) >= 2 and parts[0] == '01':
                try:
                    current_track['index_seconds'] = _cue_time_to_seconds(parts[1])
                except ValueError as error:
                    # Report a malformed timestamp as a split failure like
                    # the other format errors, not as a raw ValueError.
                    raise PreprocessItemError('split_failed', 'CUE INDEX 时间格式无效') from error

    # The last track has no following TRACK line to close it.
    if current_track and current_track.get('index_seconds') is not None:
        tracks.append(CueTrack(**current_track))

    if not tracks:
        raise PreprocessItemError('split_failed', 'CUE 中未找到可用 TRACK/INDEX 记录')

    return CueSheet(
        cue_path=cue_path,
        album_title=album_title,
        album_performer=album_performer,
        tracks=tracks
    )
|
|
|
|
def split_cue_tracks(
    self,
    source_path: str,
    cue_sheet: CueSheet,
    output_dir: Path,
    total_duration_seconds: float | None
) -> list[dict]:
    """Cut one FLAC file per CUE track out of a single source file.

    ffmpeg is invoked once per track, writing ``track_<NN>.flac`` into
    ``output_dir`` (created if missing). Each track starts at its own index
    offset. The value passed after ``-to`` is the distance to the next
    track's offset, or for the last track the distance to
    ``total_duration_seconds`` (at least 0.01). When the total duration is
    unknown the last track gets no ``-to`` option.

    NOTE(review): ``-ss`` is placed before ``-i`` while ``-to`` receives a
    length rather than an absolute position; this presumably relies on
    ffmpeg restarting output timestamps at the seek point. Confirm.

    Args:
        source_path: Audio file that contains all tracks.
        cue_sheet: Parsed CUE sheet describing the tracks.
        output_dir: Directory that receives the generated files.
        total_duration_seconds: Length of the source, or None if unknown.

    Returns:
        One dict per generated file with the keys 'path', 'filename',
        'track_number', 'title', 'artist', 'album' and 'album_artist'.

    Raises:
        PreprocessItemError: Propagated from ``_run_command`` with reason
            'split_failed' when ffmpeg fails.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    tracks = cue_sheet.tracks
    track_count = len(tracks)
    results: list[dict] = []

    for position, track in enumerate(tracks):
        target = output_dir / f'track_{track.number:02d}.flac'
        start = track.index_seconds
        successor = tracks[position + 1] if position + 1 < track_count else None

        if successor is not None:
            end_args = ['-to', str(successor.index_seconds - start)]
        elif total_duration_seconds is not None:
            end_args = ['-to', str(max(total_duration_seconds - start, 0.01))]
        else:
            end_args = []

        command = [
            'ffmpeg', '-y', '-hide_banner', '-loglevel', 'error',
            '-ss', str(start),
            '-i', source_path,
            *end_args,
            '-map', '0:a:0', '-vn', '-c:a', 'flac',
            str(target),
        ]
        self._run_command(command, 'split_failed', f'CUE 切轨失败: {target.name}')

        results.append({
            'path': str(target.resolve(strict=False)),
            'filename': target.name,
            'track_number': track.number,
            'title': track.title,
            # Track performer wins; the album performer is the fallback.
            'artist': track.performer or cue_sheet.album_performer,
            'album': cue_sheet.album_title,
            'album_artist': cue_sheet.album_performer,
        })

    return results
|
|
|
|
def convert_to_flac(self, source_path: str, output_path: Path) -> str:
    """Transcode an audio file to FLAC with ffmpeg.

    The parent directory of ``output_path`` is created if it is missing.

    Args:
        source_path: File to read.
        output_path: Destination of the FLAC file.

    Returns:
        The resolved destination path as a string.

    Raises:
        PreprocessItemError: Propagated from ``_run_command`` with reason
            'convert_failed' when ffmpeg fails.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)

    global_options = ['-y', '-hide_banner', '-loglevel', 'error']
    output_options = ['-map', '0:a:0', '-vn', '-c:a', 'flac']
    command = ['ffmpeg', *global_options, '-i', source_path, *output_options, str(output_path)]

    self._run_command(command, 'convert_failed', '音频转码失败')
    return str(output_path.resolve(strict=False))
|
|
|
|
def probe_audio(self, file_path: str) -> dict:
    """Collect technical details about an audio file by running ffprobe.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        Dict with the keys 'format', 'codec', 'bitrate', 'sample_rate',
        'bit_depth', 'channels' and 'duration_seconds'. Values come from the
        first stream whose codec_type is 'audio' and from the format
        section of the ffprobe JSON output. Bitrate prefers the stream
        value and duration prefers the format value; each uses the other
        source when its preferred value is missing or zero.

    Raises:
        PreprocessItemError: With reason 'probe_failed' when ffprobe fails,
            its output is not valid JSON, or no audio stream is reported.
    """
    result = self._run_command(
        ['ffprobe', '-v', 'error', '-print_format', 'json', '-show_format', '-show_streams', file_path],
        'probe_failed',
        '音频信息探测失败',
    )

    try:
        payload = json.loads(result.stdout)
    except json.JSONDecodeError as error:
        raise PreprocessItemError('probe_failed', 'ffprobe 输出无法解析') from error

    audio_stream = None
    for stream in payload.get('streams', []):
        if stream.get('codec_type') == 'audio':
            audio_stream = stream
            break
    if audio_stream is None:
        raise PreprocessItemError('probe_failed', '未找到可用音频流')

    format_info = payload.get('format', {})
    duration_seconds = _safe_float(format_info.get('duration')) or _safe_float(audio_stream.get('duration'))

    # Use the file suffix when ffprobe reports no format name.
    container = format_info.get('format_name') or Path(file_path).suffix.lstrip('.')
    codec = (audio_stream.get('codec_name') or '').upper()
    bitrate = _safe_int(audio_stream.get('bit_rate')) or _safe_int(format_info.get('bit_rate'))
    bit_depth = _safe_int(audio_stream.get('bits_per_raw_sample')) or _safe_int(audio_stream.get('bits_per_sample'))

    return {
        'format': container.upper(),
        'codec': codec or None,
        'bitrate': bitrate,
        'sample_rate': _safe_int(audio_stream.get('sample_rate')),
        'bit_depth': bit_depth,
        'channels': _safe_int(audio_stream.get('channels')),
        'duration_seconds': None if duration_seconds is None else round(duration_seconds, 3),
    }
|
|
|
|
def read_tags(self, file_path: str) -> dict:
    """Read common text tags from an audio file through mutagen.

    The file is opened with ``mutagen.File(..., easy=True)``.

    Args:
        file_path: Path of the audio file.

    Returns:
        An empty dict when mutagen returns no file object or the file has no
        tags. Otherwise a dict with the keys 'title', 'artist', 'album',
        'album_artist', 'track_number', 'disc_number', 'date' and 'genre',
        each holding the first value of the tag as a string, or None when
        the tag is absent or has no values.
    """
    mutagen = importlib.import_module('mutagen')
    tags_file = mutagen.File(file_path, easy=True)

    if tags_file is None or not getattr(tags_file, 'tags', None):
        return {}

    tags = tags_file.tags

    def first_value(key: str) -> str | None:
        value = tags.get(key)
        if value is None:
            return None
        if isinstance(value, list):
            # An empty list means "no value"; without this branch it would be
            # stringified to the literal text '[]'.
            return str(value[0]) if value else None
        return str(value)

    return {
        'title': first_value('title'),
        'artist': first_value('artist'),
        'album': first_value('album'),
        'album_artist': first_value('albumartist'),
        'track_number': first_value('tracknumber'),
        'disc_number': first_value('discnumber'),
        'date': first_value('date'),
        'genre': first_value('genre')
    }
|
|
|
|
def extract_embedded_cover(self, file_path: str, output_path: Path) -> str | None:
    """Write the first embedded cover image of an audio file to disk.

    Sources are tried in this order: the file object's ``pictures`` list,
    then the first tag whose key starts with 'APIC', then the first entry
    of the 'covr' tag. The tag-based sources are only consulted when
    ``pictures`` is missing or empty.

    Args:
        file_path: Path of the audio file.
        output_path: Destination of the image; its parent directory is
            created if missing. The bytes are written unchanged.

    Returns:
        The resolved destination path as a string, or None when mutagen
        returns no file object or no image data is found.
    """
    mutagen = importlib.import_module('mutagen')
    tags_file = mutagen.File(file_path)
    if tags_file is None:
        return None

    image_bytes = None
    pictures = getattr(tags_file, 'pictures', None)
    if pictures:
        image_bytes = pictures[0].data
    elif getattr(tags_file, 'tags', None):
        tags = tags_file.tags
        apic_key = next((key for key in tags.keys() if str(key).startswith('APIC')), None)
        if apic_key is not None:
            image_bytes = tags[apic_key].data
        elif 'covr' in tags and tags['covr']:
            image_bytes = bytes(tags['covr'][0])

    if not image_bytes:
        return None

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_bytes(image_bytes)
    return str(output_path.resolve(strict=False))
|
|
|
|
def calculate_fingerprint(self, file_path: str) -> dict:
    """Compute an acoustic fingerprint by running ``fpcalc -length 120``.

    The tool's output is read line by line as ``KEY=value`` pairs; only
    DURATION and FINGERPRINT are used. If a key appears more than once the
    last occurrence wins.

    Args:
        file_path: Path of the audio file.

    Returns:
        Dict with 'fingerprint' (non-empty string) and 'duration_seconds'
        (float, or None when absent or unparsable).

    Raises:
        PreprocessItemError: With reason 'fingerprint_failed' when fpcalc
            fails or prints no non-empty FINGERPRINT value.
    """
    result = self._run_command(
        ['fpcalc', '-length', '120', file_path],
        'fingerprint_failed',
        '声学指纹计算失败',
    )

    duration = None
    fingerprint = None
    for row in result.stdout.splitlines():
        key, separator, payload = row.partition('=')
        if not separator:
            continue
        if key == 'DURATION':
            duration = _safe_float(payload)
        elif key == 'FINGERPRINT':
            fingerprint = payload.strip()

    if not fingerprint:
        raise PreprocessItemError('fingerprint_failed', '未生成有效声学指纹')

    return {'fingerprint': fingerprint, 'duration_seconds': duration}
|
|
|
|
def _run_command(
|
|
self,
|
|
command: list[str],
|
|
reason: str,
|
|
message: str
|
|
) -> subprocess.CompletedProcess[str]:
|
|
try:
|
|
return subprocess.run(
|
|
command,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
except subprocess.CalledProcessError as error:
|
|
stderr = error.stderr.strip() if error.stderr else ''
|
|
detail = f'{message}: {stderr}' if stderr else message
|
|
raise PreprocessItemError(reason, detail) from error
|
|
|
|
|
|
def build_preprocess_paths(task_id: str, item_id: int) -> dict[str, Path]:
    """Compute the preprocess paths that belong to one item of a task.

    Nothing is created on disk; the function only builds Path objects below
    ``TASK_WORKSPACE_ROOT/<task_id>/preprocess``.

    Args:
        task_id: Identifier used as the task's directory name.
        item_id: Identifier of the item inside the task.

    Returns:
        Dict with 'root' (the preprocess directory), 'split'
        (``split/<item_id>``), 'converted' (``converted/<item_id>.flac``)
        and 'cover' (``covers/<item_id>.jpg``).
    """
    root = Path(TASK_WORKSPACE_ROOT).joinpath(task_id, 'preprocess')
    split_dir = root / 'split' / str(item_id)
    converted_file = root / 'converted' / f'{item_id}.flac'
    cover_file = root / 'covers' / f'{item_id}.jpg'
    return dict(root=root, split=split_dir, converted=converted_file, cover=cover_file)
|
|
|
|
|
|
def build_split_child_relative_path(parent_relative_path: str, filename: str) -> str:
    """Return the path of ``filename`` placed beside ``parent_relative_path``.

    The directory part of ``parent_relative_path`` is kept, its last
    component is replaced by ``filename``, and the result is rendered with
    forward slashes.
    """
    containing_dir = Path(parent_relative_path).parent
    child = containing_dir.joinpath(filename)
    return child.as_posix()
|
|
|
|
|
|
def merge_tag_snapshots(primary: dict | None, fallback: dict | None) -> dict:
|
|
merged: dict[str, str | None] = {}
|
|
for source in (primary or {}, fallback or {}):
|
|
for key, value in source.items():
|
|
if value is not None and merged.get(key) in (None, ''):
|
|
merged[key] = value
|
|
return merged
|
|
|
|
|
|
def _cue_time_to_seconds(value: str) -> float:
|
|
minute, second, frame = value.split(':')
|
|
return int(minute) * 60 + int(second) + (int(frame) / 75.0)
|
|
|
|
|
|
def _strip_cue_value(value: str) -> str:
|
|
if value.startswith('"') and value.endswith('"'):
|
|
return value[1:-1]
|
|
return value
|
|
|
|
|
|
def _safe_int(value) -> int | None:
|
|
try:
|
|
return int(value) if value not in (None, '') else None
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _safe_float(value) -> float | None:
|
|
try:
|
|
return float(value) if value not in (None, '') else None
|
|
except (TypeError, ValueError):
|
|
return None
|