"""Scan an on-disk audio library and derive identity keys for duplicate detection."""

import hashlib
import importlib
import logging
import math
import re
import unicodedata
from datetime import datetime, timezone
from pathlib import Path

from .scanner import ALLOWED_AUDIO_EXTENSIONS

logger = logging.getLogger(__name__)

LOSSLESS_EXTENSIONS = {'.flac', '.wav', '.ape', '.aiff', '.alac'}

# Title tokens that distinguish one rendition of a song from another; they are
# appended to the text/duration identity key so that e.g. a live take is not
# grouped with the studio recording.
PRESERVED_VERSION_TOKENS = {
    'live',
    'remix',
    'demo',
    'karaoke',
    'instrumental',
    'cover',
}


def build_library_index(output_dir: str, *, probe_audio, read_tags) -> dict:
    """Scan ``output_dir`` and index the found items by identity key.

    Args:
        output_dir: Root directory of the library.
        probe_audio: Callable taking an absolute file path and returning a
            dict of audio properties (``duration_seconds`` is the only key
            read here).
        read_tags: Callable taking an absolute file path and returning a dict
            of tags.

    Returns:
        A dict with ``count`` (number of items), ``items`` (the scanned item
        dicts) and ``by_basis`` (``{basis: {identity_key: [items]}}``).
        Fingerprint-based keys are never part of this index.
    """
    items = scan_library_items(
        output_dir,
        probe_audio=probe_audio,
        read_tags=read_tags,
    )
    by_basis: dict[str, dict[str, list[dict]]] = {
        'recording_id': {},
        'release_track': {},
        'text_duration': {},
    }
    for item in items:
        for basis, identity_key in identity_keys_for_item(item, include_fingerprint=False):
            by_basis.setdefault(basis, {}).setdefault(identity_key, []).append(item)
    return {
        'count': len(items),
        'items': items,
        'by_basis': by_basis,
    }


def scan_library_items(output_dir: str, *, probe_audio, read_tags) -> list[dict]:
    """Walk ``output_dir`` recursively and describe every allowed audio file.

    Files are visited in sorted path order. Probe and tag failures are
    tolerated (the item is still emitted with empty properties/tags); a file
    that cannot be stat-ed is skipped.

    Returns:
        One dict per audio file. Returns an empty list when ``output_dir`` is
        not an existing directory.
    """
    output_root = Path(output_dir).expanduser().resolve(strict=False)
    items: list[dict] = []
    # is_dir() is already False for a missing path.
    if not output_root.is_dir():
        return items

    for file_path in sorted(output_root.rglob('*')):
        extension = file_path.suffix.lower()
        if extension not in ALLOWED_AUDIO_EXTENSIONS or not file_path.is_file():
            continue
        try:
            stat = file_path.stat()
        except OSError:
            # The file vanished or became unreadable between listing and
            # stat; skip it rather than abort the whole scan.
            logger.debug('Skipping unreadable library file %s', file_path, exc_info=True)
            continue

        modified_at = format_timestamp(stat.st_mtime)
        relative_path = file_path.relative_to(output_root).as_posix()
        absolute_path = str(file_path.resolve(strict=False))
        audio_props = safe_probe_audio(probe_audio, absolute_path)
        tags = safe_read_tags(read_tags, absolute_path)
        items.append(
            {
                'track_id': build_track_id(relative_path, stat.st_size, modified_at),
                'id': None,
                'current_file_path': absolute_path,
                'file_path': absolute_path,
                'library_file_path': absolute_path,
                'relative_path': relative_path,
                'library_relative_path': relative_path,
                'filename': file_path.name,
                'extension': extension,
                'size_bytes': stat.st_size,
                'modified_at': modified_at,
                'audio_props_json': audio_props,
                'match_confidence': None,
                'match_is_authoritative': True,
                'matched_metadata_json': build_library_metadata(tags, audio_props, file_path),
                'acoustic_fingerprint': None,
                'fingerprint_duration_seconds': audio_props.get('duration_seconds'),
                'created_at': None,
            }
        )
    return items


def count_suspected_duplicates(items: list[dict]) -> int:
    """Return how many items are surplus members of an identity group.

    Each group contributes ``len(group) - 1``; items without any identity key
    are not counted.
    """
    return sum(len(group) - 1 for group in group_items_by_identity(items))


def group_items_by_identity(items: list[dict]) -> list[list[dict]]:
    """Group items that share at least one (non-fingerprint) identity key.

    An item joins the first already-indexed group matched by any of its keys,
    tried in the order ``identity_keys_for_item`` returns them. Items with no
    identity keys are left out. Existing groups are not merged when a single
    item matches more than one of them.
    """
    groups: list[list[dict]] = []
    indexed_groups: dict[tuple[str, str], list[dict]] = {}
    for item in items:
        identity_keys = identity_keys_for_item(item, include_fingerprint=False)
        if not identity_keys:
            continue

        target_group = None
        for identity_key in identity_keys:
            target_group = indexed_groups.get(identity_key)
            if target_group is not None:
                break

        if target_group is None:
            target_group = [item]
            groups.append(target_group)
        else:
            target_group.append(item)

        # setdefault keeps any earlier owner of a key, so a key never moves
        # from one group to another.
        for identity_key in identity_keys:
            indexed_groups.setdefault(identity_key, target_group)
    return groups


def build_track_id(relative_path: str, size_bytes: int | None, modified_at: str | None) -> str:
    """Return the SHA-1 hex digest of ``relative_path|size|modified_at``.

    A missing size is hashed as ``0`` and a missing timestamp as an empty
    string.
    """
    payload = f'{relative_path}|{size_bytes or 0}|{modified_at or ""}'
    return hashlib.sha1(payload.encode('utf-8')).hexdigest()


def safe_probe_audio(probe_audio, file_path: str) -> dict:
    """Call ``probe_audio(file_path)``; return ``{}`` on any failure or falsy result."""
    try:
        return probe_audio(file_path) or {}
    except Exception:
        # Deliberately best-effort: one unreadable file must not stop a scan.
        logger.debug('Audio probe failed for %s', file_path, exc_info=True)
        return {}


def default_read_library_tags(file_path: str) -> dict:
    """Read tags with ``mutagen`` and return them keyed by normalized tag name.

    ``mutagen`` is imported lazily so that it is only required when this
    reader is actually used. Returns ``{}`` when the file is not recognized
    or carries no tags.
    """
    mutagen = importlib.import_module('mutagen')
    tags_file = mutagen.File(file_path, easy=False)
    if tags_file is None or not getattr(tags_file, 'tags', None):
        return {}
    return {
        normalize_tag_key(key): coerce_tag_value(value)
        for key, value in tags_file.tags.items()
    }


def safe_read_tags(read_tags, file_path: str) -> dict:
    """Call ``read_tags(file_path)``; return ``{}`` on any failure or falsy result."""
    try:
        return read_tags(file_path) or {}
    except Exception:
        # Deliberately best-effort: one unreadable file must not stop a scan.
        logger.debug('Tag read failed for %s', file_path, exc_info=True)
        return {}


def build_library_metadata(tags: dict, audio_props: dict, file_path: Path) -> dict:
    """Build the metadata dict for one file from its tags and audio properties.

    The title falls back to the file stem. ``source_ids`` contains only the
    MusicBrainz ids that are actually present.
    """
    release_date = first_non_empty(tags.get('date'), tags.get('year'))
    recording_id = first_non_empty(
        tags.get('musicbrainzrecordingid'),
        tags.get('musicbrainztrackid'),
    )
    release_id = first_non_empty(
        tags.get('musicbrainzalbumid'),
        tags.get('musicbrainzreleaseid'),
    )
    release_group_id = tags.get('musicbrainzreleasegroupid')
    source_ids = {
        'musicbrainz_recording_id': recording_id,
        'musicbrainz_release_id': release_id,
        'musicbrainz_release_group_id': release_group_id,
    }
    return {
        'title': first_non_empty(tags.get('title'), file_path.stem),
        'artist': first_non_empty(tags.get('artist'), tags.get('albumartist')),
        'artists': split_artists(tags.get('artist')),
        'album': tags.get('album'),
        'album_artist': first_non_empty(tags.get('albumartist'), tags.get('artist')),
        'track_number': parse_track_number(tags.get('tracknumber')),
        'disc_number': parse_track_number(tags.get('discnumber')),
        'release_date': release_date,
        'year': extract_year(release_date),
        'duration_seconds': audio_props.get('duration_seconds'),
        'recording_id': recording_id,
        'release_id': release_id,
        'release_group_id': release_group_id,
        'source_ids': {key: value for key, value in source_ids.items() if value},
    }


def _first_duration_bucket(*candidates) -> int | None:
    """Return the bucket of the first candidate that parses as a finite number."""
    for candidate in candidates:
        bucket = duration_bucket(candidate)
        if bucket is not None:
            return bucket
    return None


def identity_keys_for_item(item: dict, *, include_fingerprint: bool) -> list[tuple[str, str]]:
    """Return the ``(basis, key)`` identity pairs for ``item``.

    Keys are produced in this order, each only when its inputs are present:

    * ``recording_id`` - the metadata recording id.
    * ``release_track`` - release id, disc number and track number; all three
      must be truthy, so a missing or zero disc/track number yields no key.
    * ``text_duration`` - normalized title, normalized artist, duration
      rounded to whole seconds, and the preserved version tokens of the title.
    * ``fingerprint_duration`` - only when ``include_fingerprint`` is true and
      none of the keys above could be built.

    A duration that cannot be parsed as a finite number never contributes to
    a key.
    """
    metadata = item.get('matched_metadata_json') or {}
    keys: list[tuple[str, str]] = []

    recording_id = metadata.get('recording_id')
    if recording_id:
        keys.append(('recording_id', str(recording_id)))

    release_id = metadata.get('release_id')
    disc_number = parse_track_number(metadata.get('disc_number'))
    track_number = parse_track_number(metadata.get('track_number'))
    if release_id and disc_number and track_number:
        keys.append(('release_track', f'{release_id}|{disc_number}|{track_number}'))

    title = metadata.get('title')
    title_key = normalize_identity_text(title)
    artist_key = normalize_identity_text(
        first_non_empty(metadata.get('artist'), metadata.get('album_artist'))
    )
    # Use the first source that yields a usable duration, so a junk value in
    # one place neither produces a "None" bucket nor hides a good value.
    bucket = _first_duration_bucket(
        metadata.get('duration_seconds'),
        item.get('fingerprint_duration_seconds'),
        (item.get('audio_props_json') or {}).get('duration_seconds'),
    )
    if title_key and artist_key and bucket is not None:
        versions = ','.join(sorted(extract_preserved_version_tokens(title)))
        keys.append(('text_duration', f'{title_key}|{artist_key}|{bucket}|{versions}'))

    if include_fingerprint and not keys:
        fingerprint = item.get('acoustic_fingerprint')
        if fingerprint and bucket is not None:
            keys.append(('fingerprint_duration', f'{fingerprint}|{bucket}'))
    return keys


def choose_primary_identity(item: dict) -> tuple[str | None, str | None]:
    """Return the first identity pair of ``item``, or ``(None, None)``.

    Metadata-based keys are preferred; the fingerprint key is consulted only
    when no metadata-based key exists.
    """
    keys = identity_keys_for_item(item, include_fingerprint=False)
    if keys:
        return keys[0]
    fallback_keys = identity_keys_for_item(item, include_fingerprint=True)
    return fallback_keys[0] if fallback_keys else (None, None)


def format_timestamp(timestamp: float) -> str:
    """Format a POSIX timestamp as UTC ISO-8601 with whole seconds and a ``Z`` suffix."""
    return (
        datetime.fromtimestamp(timestamp, tz=timezone.utc)
        .replace(microsecond=0)
        .isoformat()
        .replace('+00:00', 'Z')
    )


def _is_identity_char(char: str) -> bool:
    """Return whether ``char`` is kept in normalized identity text.

    Letters and digits of any script are kept, as are combining marks so
    that scripts which spell vowels with marks are not split mid-word.
    """
    return char.isalnum() or unicodedata.category(char).startswith('M')


def normalize_identity_text(value: str | None) -> str:
    """Reduce ``value`` to a lowercase, space-separated comparison key.

    The text is NFKC-normalized and lowercased, the words ``feat``/``ft``/
    ``featuring`` and ``version`` are dropped, and every character that is
    not a letter, digit or combining mark becomes a separator. Returns ``''``
    for empty input.
    """
    if not value:
        return ''
    normalized = unicodedata.normalize('NFKC', str(value)).lower()
    normalized = re.sub(r'\b(feat|ft|featuring)\.?\b', ' ', normalized)
    normalized = re.sub(r'\bversion\b', ' ', normalized)
    # Unicode-aware filter: an ASCII-only one would erase non-Latin titles
    # entirely and make unrelated tracks look identical.
    normalized = ''.join(
        char if _is_identity_char(char) else ' ' for char in normalized
    )
    return ' '.join(normalized.split())


def extract_preserved_version_tokens(value: str | None) -> set[str]:
    """Return the words of ``value`` that appear in ``PRESERVED_VERSION_TOKENS``."""
    normalized = normalize_identity_text(value)
    return {token for token in normalized.split() if token in PRESERVED_VERSION_TOKENS}


def normalize_tag_key(value: str) -> str:
    """Lowercase ``value`` and strip everything except ``a-z`` and ``0-9``."""
    return re.sub(r'[^a-z0-9]+', '', value.lower())


def coerce_tag_value(value) -> str | None:
    """Convert a raw tag value to text, or ``None`` when it is empty.

    Lists are represented by their first element, bytes are decoded as UTF-8
    (undecodable bytes are dropped), and anything else goes through ``str``.
    """
    if isinstance(value, list):
        # Recurse so that a bytes element is decoded instead of being
        # rendered as its "b'...'" repr.
        return coerce_tag_value(value[0]) if value else None
    if isinstance(value, bytes):
        value = value.decode('utf-8', errors='ignore')
    return str(value) if value not in (None, '') else None


def parse_track_number(value) -> int | None:
    """Return the first run of digits in ``value`` as an int (``'3/12'`` -> 3), else ``None``."""
    if value in (None, ''):
        return None
    match = re.search(r'\d+', str(value))
    return int(match.group(0)) if match else None


def extract_year(value) -> int | None:
    """Return the first four consecutive digits in ``value`` as an int, else ``None``."""
    if value in (None, ''):
        return None
    match = re.search(r'(\d{4})', str(value))
    return int(match.group(1)) if match else None


def duration_bucket(value) -> int | None:
    """Round ``value`` to whole seconds; ``None`` when it is not a finite number."""
    duration = safe_float(value)
    return int(round(duration)) if duration is not None else None


def safe_float(value) -> float | None:
    """Convert ``value`` to a finite float, or return ``None``.

    ``None``, ``''``, unparseable values, NaN and infinities all give
    ``None``.
    """
    if value in (None, ''):
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    # NaN/inf parse successfully but cannot be rounded to an int downstream.
    return number if math.isfinite(number) else None


def first_non_empty(*values):
    """Return the first argument that is neither ``None`` nor ``''``, else ``None``."""
    for value in values:
        if value not in (None, ''):
            return value
    return None


def split_artists(value: str | None) -> list[str]:
    """Split an artist string on ``,``, ``/``, ``&`` and ``;`` into trimmed, non-empty names."""
    if not value:
        return []
    return [part.strip() for part in re.split(r'[,/&;]+', str(value)) if part.strip()]