Files
MusicWorkshop/backend/app/scanner.py
T
2026-04-30 14:34:28 +08:00

323 lines
8.8 KiB
Python

import os
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable
from .task_constants import (
SCAN_PROGRESS_BATCH_SIZE,
SCAN_PROGRESS_INTERVAL_SECONDS,
create_empty_scan_stats
)
# Image suffixes accepted for a local cover file. The order matters: it is the
# order in which candidate cover names are generated and tried.
COVER_IMAGE_EXTENSIONS = (
    '.jpg',
    '.png',
    '.jpeg',
    '.webp',
)

# Lower-case suffixes of the files the scanner treats as audio candidates.
ALLOWED_AUDIO_EXTENSIONS = (
    '.flac',
    '.mp3',
    '.m4a',
    '.wav',
    '.ape',
    '.aac',
    '.ogg',
)

# Lower-case file names that are always counted as ignored non-audio files.
IGNORED_FILENAMES = (
    '.ds_store',
    'thumbs.db',
    'desktop.ini',
)

# Lower-case suffixes that are always counted as ignored non-audio files.
IGNORED_EXTENSIONS = (
    '.txt',
    '.nfo',
)

# Generic cover base names that are tried before a cover named after the
# audio file itself.
_COVER_BASENAMES = (
    'cover',
    'folder',
)

# Suffix of a lyric file stored next to an audio file.
_LYRIC_EXTENSION = '.lrc'
class ScannerError(Exception):
    """Raised by the scanner when the requested scan root cannot be used."""
@dataclass
class ScanItem:
original_path: str
relative_path: str
filename: str
extension: str
size_bytes: int | None
modified_at: str | None
local_cover: str | None
local_lyric: str | None
scan_status: str
scan_reason: str | None
scan_message: str | None
def to_dict(self) -> dict:
return asdict(self)
class Scanner:
    """Walk a directory tree and classify every audio file found in it.

    Symbolic links are skipped entirely: they are neither followed nor
    reported. Files whose lower-cased suffix is not listed in
    ``ALLOWED_AUDIO_EXTENSIONS`` are only counted, never reported.
    """

    # A file modified less than this many seconds ago is treated as still
    # being written and is reported as ``skipped_locked``.
    RECENT_MTIME_WINDOW_SECONDS = 60

    def scan(
        self,
        input_dir: str,
        *,
        on_item: Callable[[ScanItem], None] | None = None,
        on_progress: Callable[[dict[str, int]], None] | None = None,
        on_log: Callable[[str, str, dict | None], None] | None = None
    ) -> dict[str, int]:
        """Scan ``input_dir`` recursively and return the collected counters.

        Args:
            input_dir: Root folder to scan; ``~`` is expanded and the path is
                resolved before use.
            on_item: Called once for every audio candidate with its
                ``ScanItem``.
            on_progress: Called with a copy of the counters whenever
                ``SCAN_PROGRESS_BATCH_SIZE`` further candidates have been
                processed or ``SCAN_PROGRESS_INTERVAL_SECONDS`` have elapsed
                since the previous report, and once more when the scan ends.
            on_log: Called as ``on_log('error', message, details)`` when a
                directory cannot be listed; that directory is then skipped.

        Returns:
            The counters created by ``create_empty_scan_stats()`` with
            ``total_found``, ``queued``, ``skipped_locked``,
            ``skipped_invalid`` and ``ignored_non_audio`` updated.

        Raises:
            ScannerError: If the root does not exist or is not a directory.
        """
        input_root = Path(input_dir).expanduser().resolve(strict=False)
        if not input_root.exists():
            raise ScannerError(f'扫描目录不存在: {input_root}')
        if not input_root.is_dir():
            raise ScannerError(f'扫描目录不是有效文件夹: {input_root}')

        stats = create_empty_scan_stats()
        processed_candidates = 0
        last_progress_at = time.monotonic()

        # Explicit stack instead of recursion: the directory pushed last is
        # the one visited next.
        directory_stack = [input_root]
        while directory_stack:
            current_directory = directory_stack.pop()
            try:
                # The context manager releases the directory handle even if
                # listing or sorting fails part-way through.
                with os.scandir(current_directory) as directory_iterator:
                    entries = sorted(
                        directory_iterator,
                        key=lambda entry: entry.name.lower()
                    )
            except OSError as error:
                if on_log is not None:
                    on_log(
                        'error',
                        f'无法读取目录: {current_directory}',
                        {
                            'path': str(current_directory),
                            'error': str(error)
                        }
                    )
                continue

            for entry in entries:
                entry_path = Path(entry.path)
                if entry.is_symlink():
                    continue
                if entry.is_dir(follow_symlinks=False):
                    directory_stack.append(entry_path)
                    continue
                if not entry.is_file(follow_symlinks=False):
                    continue

                filename_lower = entry.name.lower()
                extension = entry_path.suffix.lower()
                if (
                    filename_lower in IGNORED_FILENAMES
                    or extension in IGNORED_EXTENSIONS
                    or extension not in ALLOWED_AUDIO_EXTENSIONS
                ):
                    stats['ignored_non_audio'] += 1
                    continue

                stats['total_found'] += 1
                processed_candidates += 1
                item = self._build_item(input_root, entry_path, entry, extension)
                if item.scan_status == 'queued':
                    stats['queued'] += 1
                elif item.scan_status == 'skipped_locked':
                    stats['skipped_locked'] += 1
                else:
                    # Every other status is counted as an invalid file.
                    stats['skipped_invalid'] += 1
                if on_item is not None:
                    on_item(item)

                now = time.monotonic()
                if (
                    processed_candidates % SCAN_PROGRESS_BATCH_SIZE == 0
                    or now - last_progress_at >= SCAN_PROGRESS_INTERVAL_SECONDS
                ):
                    if on_progress is not None:
                        # Hand out a copy so the callback cannot alter the
                        # live counters.
                        on_progress(stats.copy())
                    last_progress_at = now

        # Final report so the listener always sees the finished totals.
        if on_progress is not None:
            on_progress(stats.copy())
        return stats

    def _build_item(
        self,
        input_root: Path,
        entry_path: Path,
        entry: os.DirEntry,
        extension: str
    ) -> ScanItem:
        """Inspect one audio candidate and describe it as a ``ScanItem``.

        The checks run in this order and the first failing one decides the
        result: stat the entry, reject files modified within
        ``RECENT_MTIME_WINDOW_SECONDS``, require read and write permission,
        read one byte, then look for a local cover and lyric file.

        Args:
            input_root: Resolved scan root used to derive the relative path.
            entry_path: Path of the candidate as produced by the directory
                listing.
            entry: The ``os.DirEntry`` of the candidate.
            extension: Lower-cased suffix of the candidate.

        Returns:
            A ``ScanItem`` whose ``scan_status`` is ``'queued'``,
            ``'skipped_locked'`` or ``'invalid'``.
        """
        absolute_path = entry_path.resolve(strict=False)
        relative_path = absolute_path.relative_to(input_root).as_posix()
        modified_at = None
        size_bytes = None

        def make_item(
            status: str,
            reason: str | None = None,
            message: str | None = None,
            *,
            cover: str | None = None,
            lyric: str | None = None
        ) -> ScanItem:
            # Single place that assembles the result; it reads size_bytes and
            # modified_at at call time, so it reflects whatever the stat step
            # managed to collect.
            return ScanItem(
                original_path=str(absolute_path),
                relative_path=relative_path,
                filename=entry_path.name,
                extension=extension,
                size_bytes=size_bytes,
                modified_at=modified_at,
                local_cover=cover,
                local_lyric=lyric,
                scan_status=status,
                scan_reason=reason,
                scan_message=message
            )

        try:
            entry_stat = entry.stat(follow_symlinks=False)
            size_bytes = entry_stat.st_size
            modified_at = _format_timestamp(entry_stat.st_mtime)
        except FileNotFoundError:
            return make_item('invalid', 'path_disappeared', '文件在扫描过程中消失')
        except OSError:
            return make_item('invalid', 'stat_failed', '无法读取文件状态信息')

        recent_window = self.RECENT_MTIME_WINDOW_SECONDS
        if time.time() - entry_stat.st_mtime < recent_window:
            return make_item(
                'skipped_locked',
                'recent_mtime',
                f'文件最近 {recent_window} 秒内仍在变更,已跳过'
            )

        # Both read and write access are required for a file to be queued.
        if not os.access(absolute_path, os.R_OK) or not os.access(absolute_path, os.W_OK):
            return make_item('invalid', 'permission_denied', '当前进程缺少读写权限')

        try:
            # Reading a single byte proves the file can actually be opened.
            with absolute_path.open('rb') as file_handle:
                file_handle.read(1)
        except FileNotFoundError:
            return make_item('invalid', 'path_disappeared', '文件在读取前已消失')
        except OSError:
            return make_item('invalid', 'unreadable', '文件无法读取')

        assets = probe_local_assets(absolute_path)
        return make_item(
            'queued',
            cover=assets['local_cover'],
            lyric=assets['local_lyric']
        )
def probe_local_assets(audio_path: str | Path) -> dict[str, str | None]:
    """Look for a cover image and a lyric file next to ``audio_path``.

    Only the folder that contains the audio file is searched, and names are
    matched case-insensitively. Cover candidates are tried in this order:
    every ``_COVER_BASENAMES`` entry combined with every
    ``COVER_IMAGE_EXTENSIONS`` entry, then the audio file's own stem combined
    with every ``COVER_IMAGE_EXTENSIONS`` entry. The lyric candidate is the
    audio file's stem plus ``_LYRIC_EXTENSION``.

    Args:
        audio_path: Path of the audio file whose folder is searched.

    Returns:
        A dictionary with the keys ``'local_cover'`` and ``'local_lyric'``;
        each value is the matching file's path as a string, or ``None`` when
        nothing matched or the folder could not be listed.
    """
    audio_file = Path(audio_path)
    audio_dir = audio_file.parent
    # is_dir() is already False for a missing path, so no separate exists().
    if not audio_dir.is_dir():
        return {'local_cover': None, 'local_lyric': None}

    try:
        files_by_lower_name = _index_regular_files(audio_dir)
    except OSError:
        # The folder vanished or became unreadable after the check above.
        # Companion files are optional, so report "none found" rather than
        # letting the error escape to the caller.
        return {'local_cover': None, 'local_lyric': None}

    normalized_stem = audio_file.stem.lower()
    cover_candidates = [
        f'{basename}{extension}'
        for basename in _COVER_BASENAMES
        for extension in COVER_IMAGE_EXTENSIONS
    ]
    cover_candidates.extend(
        f'{normalized_stem}{extension}' for extension in COVER_IMAGE_EXTENSIONS
    )

    cover_path = _first_existing_path(files_by_lower_name, cover_candidates)
    lyric_path = _first_existing_path(
        files_by_lower_name,
        [f'{normalized_stem}{_LYRIC_EXTENSION}']
    )
    return {
        'local_cover': str(cover_path) if cover_path else None,
        'local_lyric': str(lyric_path) if lyric_path else None
    }
def _index_regular_files(directory: Path) -> dict[str, Path]:
    """Map lower-cased file names in ``directory`` to their resolved paths.

    Symbolic links and anything that is not a regular file are left out.
    Children are visited in case-insensitive name order, and when two names
    differ only by case the one visited first is kept.
    """
    children = list(directory.iterdir())
    children.sort(key=lambda child: child.name.lower())

    by_lower_name: dict[str, Path] = {}
    for child in children:
        if child.is_symlink():
            continue
        if not child.is_file():
            continue
        lowered_name = child.name.lower()
        resolved_child = child.resolve(strict=False)
        if lowered_name not in by_lower_name:
            by_lower_name[lowered_name] = resolved_child
    return by_lower_name
def _first_existing_path(
files_by_lower_name: dict[str, Path],
candidates: list[str]
) -> Path | None:
for candidate in candidates:
matched_path = files_by_lower_name.get(candidate.lower())
if matched_path is not None:
return matched_path
return None
def _format_timestamp(timestamp: float) -> str:
    """Render a POSIX timestamp as UTC ISO-8601 text with a ``Z`` suffix.

    Fractions of a second are dropped (truncated, not rounded).
    """
    utc_moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    # timespec='seconds' cuts the value off at whole seconds.
    iso_text = utc_moment.isoformat(timespec='seconds')
    return iso_text.replace('+00:00', 'Z')