import os
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable

from .task_constants import (
    SCAN_PROGRESS_BATCH_SIZE,
    SCAN_PROGRESS_INTERVAL_SECONDS,
    create_empty_scan_stats
)

COVER_IMAGE_EXTENSIONS = ('.jpg', '.png', '.jpeg', '.webp')
ALLOWED_AUDIO_EXTENSIONS = ('.flac', '.mp3', '.m4a', '.wav', '.ape', '.aac', '.ogg')
IGNORED_FILENAMES = ('.ds_store', 'thumbs.db', 'desktop.ini')
IGNORED_EXTENSIONS = ('.txt', '.nfo')
_COVER_BASENAMES = ('cover', 'folder')
_LYRIC_EXTENSION = '.lrc'
# A file whose mtime is less than this many seconds old is reported as
# 'skipped_locked' instead of being queued.
_RECENT_MODIFICATION_WINDOW_SECONDS = 60


class ScannerError(Exception):
    """Raised when the directory handed to ``Scanner.scan`` cannot be scanned."""


@dataclass
class ScanItem:
    """Result of inspecting one candidate audio file."""

    # Resolved absolute path of the file, as a string.
    original_path: str
    # POSIX-style path of the file relative to the scan root.
    relative_path: str
    filename: str
    # Lower-cased suffix, including the leading dot.
    extension: str
    size_bytes: int | None
    # UTC timestamp produced by ``_format_timestamp``; None when unavailable.
    modified_at: str | None
    local_cover: str | None
    local_lyric: str | None
    # One of 'queued', 'skipped_locked' or 'invalid'.
    scan_status: str
    scan_reason: str | None
    scan_message: str | None

    def to_dict(self) -> dict:
        """Return the item as a plain dictionary."""
        return asdict(self)


class Scanner:
    """Walks a directory tree and classifies the audio files found in it."""

    def scan(
        self,
        input_dir: str,
        *,
        on_item: Callable[[ScanItem], None] | None = None,
        on_progress: Callable[[dict[str, int]], None] | None = None,
        on_log: Callable[[str, str, dict | None], None] | None = None
    ) -> dict[str, int]:
        """Scan ``input_dir`` recursively and report every audio candidate.

        Symlinks are never followed. Files whose name or extension is not an
        allowed audio type only increment the ``ignored_non_audio`` counter.

        Args:
            input_dir: Directory to scan; ``~`` is expanded and the path is
                resolved before use.
            on_item: Called once with the ``ScanItem`` built for each audio
                candidate.
            on_progress: Called with a copy of the running statistics every
                ``SCAN_PROGRESS_BATCH_SIZE`` candidates, whenever
                ``SCAN_PROGRESS_INTERVAL_SECONDS`` have elapsed since the last
                report, and once more when the scan finishes.
            on_log: Called with ``(level, message, context)`` when a directory
                cannot be listed; that directory is then skipped.

        Returns:
            The statistics mapping created by ``create_empty_scan_stats`` with
            the counters updated by this scan.

        Raises:
            ScannerError: If ``input_dir`` does not exist or is not a
                directory.
        """
        input_root = Path(input_dir).expanduser().resolve(strict=False)
        if not input_root.exists():
            raise ScannerError(f'扫描目录不存在: {input_root}')
        if not input_root.is_dir():
            raise ScannerError(f'扫描目录不是有效文件夹: {input_root}')

        stats = create_empty_scan_stats()
        processed_candidates = 0
        last_progress_at = time.monotonic()
        # LIFO stack: sub-directories are pushed in name order, so the
        # last-named sibling directory is the first one visited.
        directory_stack = [input_root]

        while directory_stack:
            current_directory = directory_stack.pop()
            try:
                # The context manager releases the directory handle even when
                # listing fails part-way through.
                with os.scandir(current_directory) as directory_iterator:
                    entries = sorted(
                        directory_iterator,
                        key=lambda entry: entry.name.lower()
                    )
            except OSError as error:
                if on_log is not None:
                    on_log(
                        'error',
                        f'无法读取目录: {current_directory}',
                        {
                            'path': str(current_directory),
                            'error': str(error)
                        }
                    )
                continue

            # Sidecar index shared by every audio file of this directory, so
            # the directory is listed once instead of once per audio file.
            # Recreated per directory to keep memory bounded.
            asset_index_cache: dict[Path, dict[str, Path]] = {}

            for entry in entries:
                entry_path = Path(entry.path)
                if entry.is_symlink():
                    continue
                if entry.is_dir(follow_symlinks=False):
                    directory_stack.append(entry_path)
                    continue
                if not entry.is_file(follow_symlinks=False):
                    continue

                filename_lower = entry.name.lower()
                extension = entry_path.suffix.lower()
                if filename_lower in IGNORED_FILENAMES or extension in IGNORED_EXTENSIONS:
                    stats['ignored_non_audio'] += 1
                    continue
                if extension not in ALLOWED_AUDIO_EXTENSIONS:
                    stats['ignored_non_audio'] += 1
                    continue

                stats['total_found'] += 1
                processed_candidates += 1
                item = self._build_item(
                    input_root,
                    entry_path,
                    entry,
                    extension,
                    asset_index_cache
                )
                if item.scan_status == 'queued':
                    stats['queued'] += 1
                elif item.scan_status == 'skipped_locked':
                    stats['skipped_locked'] += 1
                else:
                    stats['skipped_invalid'] += 1

                if on_item is not None:
                    on_item(item)

                now = time.monotonic()
                if (
                    processed_candidates % SCAN_PROGRESS_BATCH_SIZE == 0
                    or now - last_progress_at >= SCAN_PROGRESS_INTERVAL_SECONDS
                ):
                    if on_progress is not None:
                        on_progress(stats.copy())
                    last_progress_at = now

        # Final report so the consumer always sees the finished totals.
        if on_progress is not None:
            on_progress(stats.copy())
        return stats

    def _build_item(
        self,
        input_root: Path,
        entry_path: Path,
        entry: os.DirEntry,
        extension: str,
        asset_index_cache: dict[Path, dict[str, Path]] | None = None
    ) -> ScanItem:
        """Inspect one audio candidate and return its ``ScanItem``.

        Checks run in this order, and the first failure decides the result:
        stat, recent modification, read/write permission, trial read. Only a
        file passing all of them is 'queued' and probed for a local cover and
        lyric file.

        Args:
            input_root: Resolved scan root used to compute the relative path.
            entry_path: Path of the candidate file.
            entry: The ``os.DirEntry`` the candidate came from.
            extension: Lower-cased suffix of the candidate.
            asset_index_cache: Optional mapping from directory to its sidecar
                index. When given, each directory is indexed at most once and
                the result is stored in the mapping; when omitted, the
                directory is listed for this call.

        Returns:
            A ``ScanItem`` with status 'queued', 'skipped_locked' or
            'invalid'.
        """
        absolute_path = entry_path.resolve(strict=False)
        relative_path = absolute_path.relative_to(input_root).as_posix()
        modified_at = None
        size_bytes = None

        def make_item(
            status: str,
            reason: str | None = None,
            message: str | None = None,
            *,
            local_cover: str | None = None,
            local_lyric: str | None = None
        ) -> ScanItem:
            # Reads size_bytes / modified_at when called, so every result
            # carries whatever metadata had been collected by then.
            return ScanItem(
                original_path=str(absolute_path),
                relative_path=relative_path,
                filename=entry_path.name,
                extension=extension,
                size_bytes=size_bytes,
                modified_at=modified_at,
                local_cover=local_cover,
                local_lyric=local_lyric,
                scan_status=status,
                scan_reason=reason,
                scan_message=message
            )

        try:
            entry_stat = entry.stat(follow_symlinks=False)
        except FileNotFoundError:
            return make_item('invalid', 'path_disappeared', '文件在扫描过程中消失')
        except OSError:
            return make_item('invalid', 'stat_failed', '无法读取文件状态信息')

        size_bytes = entry_stat.st_size
        try:
            modified_at = _format_timestamp(entry_stat.st_mtime)
        except (OverflowError, OSError, ValueError):
            # An mtime outside the representable date range must not abort
            # the whole scan; the timestamp is simply left unset.
            modified_at = None

        if time.time() - entry_stat.st_mtime < _RECENT_MODIFICATION_WINDOW_SECONDS:
            return make_item(
                'skipped_locked',
                'recent_mtime',
                f'文件最近 {_RECENT_MODIFICATION_WINDOW_SECONDS} 秒内仍在变更,已跳过'
            )

        if not os.access(absolute_path, os.R_OK) or not os.access(absolute_path, os.W_OK):
            return make_item('invalid', 'permission_denied', '当前进程缺少读写权限')

        # Trial read of one byte: confirms the file can really be opened.
        try:
            with absolute_path.open('rb') as file_handle:
                file_handle.read(1)
        except FileNotFoundError:
            return make_item('invalid', 'path_disappeared', '文件在读取前已消失')
        except OSError:
            return make_item('invalid', 'unreadable', '文件无法读取')

        if asset_index_cache is None:
            assets = probe_local_assets(absolute_path)
        else:
            assets = probe_local_assets(
                absolute_path,
                files_by_lower_name=_cached_directory_index(
                    absolute_path.parent,
                    asset_index_cache
                )
            )
        return make_item(
            'queued',
            local_cover=assets['local_cover'],
            local_lyric=assets['local_lyric']
        )


def probe_local_assets(
    audio_path: str | Path,
    *,
    files_by_lower_name: dict[str, Path] | None = None
) -> dict[str, str | None]:
    """Find the cover image and lyric file stored next to an audio file.

    Cover candidates are tried in this order: each name in
    ``_COVER_BASENAMES`` with each extension in ``COVER_IMAGE_EXTENSIONS``,
    then the audio file's own stem with each of those extensions. The lyric
    candidate is the audio file's stem plus ``_LYRIC_EXTENSION``. Matching is
    case-insensitive.

    Args:
        audio_path: Path of the audio file.
        files_by_lower_name: Optional pre-built index of the audio file's
            directory, as returned by ``_index_regular_files``. When omitted,
            the directory is listed by this call.

    Returns:
        A dict with keys 'local_cover' and 'local_lyric'; each value is the
        matched path as a string, or None when nothing matched.
    """
    audio_file = Path(audio_path)
    if files_by_lower_name is None:
        audio_dir = audio_file.parent
        if not audio_dir.exists() or not audio_dir.is_dir():
            return {'local_cover': None, 'local_lyric': None}
        files_by_lower_name = _index_regular_files(audio_dir)

    normalized_stem = audio_file.stem.lower()
    cover_candidates = [
        f'{basename}{extension}'
        for basename in _COVER_BASENAMES
        for extension in COVER_IMAGE_EXTENSIONS
    ]
    cover_candidates.extend(
        f'{normalized_stem}{extension}'
        for extension in COVER_IMAGE_EXTENSIONS
    )

    cover_path = _first_existing_path(files_by_lower_name, cover_candidates)
    lyric_path = _first_existing_path(
        files_by_lower_name,
        [f'{normalized_stem}{_LYRIC_EXTENSION}']
    )
    return {
        'local_cover': str(cover_path) if cover_path else None,
        'local_lyric': str(lyric_path) if lyric_path else None
    }


def _cached_directory_index(
    directory: Path,
    cache: dict[Path, dict[str, Path]]
) -> dict[str, Path]:
    """Return the sidecar index of ``directory``, building it at most once.

    A path that is not a directory yields an empty index.
    """
    index = cache.get(directory)
    if index is None:
        index = _index_regular_files(directory) if directory.is_dir() else {}
        cache[directory] = index
    return index


def _index_regular_files(directory: Path) -> dict[str, Path]:
    """Map lower-cased file names in ``directory`` to their resolved paths.

    Symlinks and anything that is not a file are left out. Entries are
    visited sorted by lower-cased name, and the first entry seen for a given
    lower-cased name is the one kept.
    """
    indexed_files: dict[str, Path] = {}
    for entry in sorted(directory.iterdir(), key=lambda path: path.name.lower()):
        if entry.is_symlink() or not entry.is_file():
            continue
        indexed_files.setdefault(entry.name.lower(), entry.resolve(strict=False))
    return indexed_files


def _first_existing_path(
    files_by_lower_name: dict[str, Path],
    candidates: list[str]
) -> Path | None:
    """Return the indexed path of the first candidate name that exists.

    Candidates are lower-cased before lookup; None is returned when none of
    them is in the index.
    """
    for candidate in candidates:
        matched_path = files_by_lower_name.get(candidate.lower())
        if matched_path is not None:
            return matched_path
    return None


def _format_timestamp(timestamp: float) -> str:
    """Format a POSIX timestamp as a UTC ISO-8601 string ending in 'Z'.

    Sub-second precision is dropped.
    """
    return (
        datetime.fromtimestamp(timestamp, tz=timezone.utc)
        .replace(microsecond=0)
        .isoformat()
        .replace('+00:00', 'Z')
    )