# Source: MusicWorkshop/backend/app/library_index.py
# Snapshot: 2026-04-30 14:34:28 +08:00 - 331 lines - 9.5 KiB - Python

import hashlib
import importlib
import re
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
from .scanner import ALLOWED_AUDIO_EXTENSIONS
# File extensions this module labels as lossless audio.
LOSSLESS_EXTENSIONS = {'.aiff', '.alac', '.ape', '.flac', '.wav'}

# Title words that stay part of a track's text identity: they are appended to
# the 'text_duration' identity key so that e.g. a live take and a studio take
# of the same song do not collapse into one identity.
PRESERVED_VERSION_TOKENS = set(
    ('cover', 'demo', 'instrumental', 'karaoke', 'live', 'remix')
)
def build_library_index(
    output_dir: str,
    *,
    probe_audio,
    read_tags
) -> dict:
    """Scan ``output_dir`` and index the found tracks by identity key.

    Args:
        output_dir: Root directory of the library to scan.
        probe_audio: Callable handed through to ``scan_library_items``.
        read_tags: Callable handed through to ``scan_library_items``.

    Returns:
        A dict with ``count`` (number of items), ``items`` (the scanned item
        dicts) and ``by_basis``: a mapping ``basis -> identity key -> items``.
        The ``recording_id``, ``release_track`` and ``text_duration`` bases are
        always present, even when empty.
    """
    library_items = scan_library_items(
        output_dir,
        probe_audio=probe_audio,
        read_tags=read_tags
    )
    index = {
        basis: {}
        for basis in ('recording_id', 'release_track', 'text_duration')
    }
    for entry in library_items:
        # Fingerprint keys are deliberately left out of the index.
        entry_keys = identity_keys_for_item(entry, include_fingerprint=False)
        for basis, identity_key in entry_keys:
            bucket = index.setdefault(basis, {})
            bucket.setdefault(identity_key, []).append(entry)
    return {
        'count': len(library_items),
        'items': library_items,
        'by_basis': index
    }
def scan_library_items(
    output_dir: str,
    *,
    probe_audio,
    read_tags
) -> list[dict]:
    """Walk ``output_dir`` recursively and describe every audio file found.

    Only regular files whose lower-cased suffix is in
    ``ALLOWED_AUDIO_EXTENSIONS`` are kept. Files are visited in sorted path
    order. A file that disappears or cannot be stat'ed while the scan is
    running is skipped instead of aborting the whole scan.

    Args:
        output_dir: Library root; ``~`` is expanded and the path is resolved.
        probe_audio: Callable taking the absolute file path; called through
            ``safe_probe_audio`` so failures yield ``{}``.
        read_tags: Callable taking the absolute file path; called through
            ``safe_read_tags`` so failures yield ``{}``.

    Returns:
        One dict per audio file. An empty list when ``output_dir`` is not an
        existing directory.
    """
    output_root = Path(output_dir).expanduser().resolve(strict=False)
    items: list[dict] = []
    # is_dir() is False for a missing path, so it covers the exists() check.
    if not output_root.is_dir():
        return items
    for file_path in sorted(output_root.rglob('*')):
        extension = file_path.suffix.lower()
        # Cheap string test first: avoids a filesystem call for non-audio entries.
        if extension not in ALLOWED_AUDIO_EXTENSIONS:
            continue
        try:
            if not file_path.is_file():
                continue
            stat = file_path.stat()
        except OSError:
            # The file vanished or became unreadable between listing and
            # stat(); one bad entry must not abort the whole library scan.
            continue
        modified_at = format_timestamp(stat.st_mtime)
        relative_path = file_path.relative_to(output_root).as_posix()
        absolute_path = str(file_path.resolve(strict=False))
        audio_props = safe_probe_audio(probe_audio, absolute_path)
        tags = safe_read_tags(read_tags, absolute_path)
        items.append(
            {
                'track_id': build_track_id(relative_path, stat.st_size, modified_at),
                # NOTE(review): 'id' / 'created_at' presumably mirror a stored
                # record shape used elsewhere - confirm against callers.
                'id': None,
                'current_file_path': absolute_path,
                'file_path': absolute_path,
                'library_file_path': absolute_path,
                'relative_path': relative_path,
                'library_relative_path': relative_path,
                'filename': file_path.name,
                'extension': extension,
                'size_bytes': stat.st_size,
                'modified_at': modified_at,
                'audio_props_json': audio_props,
                'match_confidence': None,
                'match_is_authoritative': True,
                'matched_metadata_json': build_library_metadata(tags, audio_props, file_path),
                'acoustic_fingerprint': None,
                'fingerprint_duration_seconds': audio_props.get('duration_seconds'),
                'created_at': None
            }
        )
    return items
def count_suspected_duplicates(items: list[dict]) -> int:
    """Count the items that are redundant copies within their identity group.

    Every group produced by ``group_items_by_identity`` contributes its size
    minus one, i.e. one member per group is treated as the original.
    """
    return sum(
        len(group) - 1
        for group in group_items_by_identity(items)
        if len(group) > 1
    )
def group_items_by_identity(items: list[dict]) -> list[list[dict]]:
    """Partition ``items`` into groups that share at least one identity key.

    Items without any identity key (fingerprints excluded) are left out of the
    result. Groups are returned in order of first appearance, and items keep
    their input order inside a group.

    NOTE(review): an item joins the group of the *first* of its keys that is
    already known, and known keys are never re-pointed. Two existing groups
    that a later item links together are therefore not merged.
    """
    result: list[list[dict]] = []
    group_by_key: dict[tuple[str, str], list[dict]] = {}
    for entry in items:
        entry_keys = identity_keys_for_item(entry, include_fingerprint=False)
        if not entry_keys:
            continue
        # First key (in priority order) that already belongs to a group wins.
        home = next(
            (group_by_key[key] for key in entry_keys if key in group_by_key),
            None,
        )
        if home is None:
            home = [entry]
            result.append(home)
        else:
            home.append(entry)
        # Register keys that are still unknown; existing mappings are kept.
        for key in entry_keys:
            group_by_key.setdefault(key, home)
    return result
def build_track_id(relative_path: str, size_bytes: int | None, modified_at: str | None) -> str:
digest = hashlib.sha1(
f'{relative_path}|{size_bytes or 0}|{modified_at or ""}'.encode('utf-8')
).hexdigest()
return digest
def safe_probe_audio(probe_audio, file_path: str) -> dict:
    """Call ``probe_audio(file_path)`` on a best-effort basis.

    Returns the probe result, or an empty dict when the probe raises or
    returns a falsy value.
    """
    try:
        props = probe_audio(file_path)
        if props:
            return props
    except Exception:
        # Deliberate best-effort: a failing probe must not stop the scan.
        pass
    return {}
def default_read_library_tags(file_path: str) -> dict:
    """Read the tags of ``file_path`` with mutagen into a flat dict.

    Keys are passed through ``normalize_tag_key`` and values through
    ``coerce_tag_value``. Returns ``{}`` when mutagen does not recognise the
    file or the file has no tags. mutagen is imported lazily at call time.

    NOTE(review): with ``easy=False`` the key names are format-specific;
    presumably ID3 files expose frame ids rather than names like ``title`` -
    confirm against the mutagen documentation.
    """
    mutagen = importlib.import_module('mutagen')
    audio = mutagen.File(file_path, easy=False)
    raw_tags = None if audio is None else getattr(audio, 'tags', None)
    if not raw_tags:
        return {}
    return {
        normalize_tag_key(name): coerce_tag_value(raw)
        for name, raw in raw_tags.items()
    }
def safe_read_tags(read_tags, file_path: str) -> dict:
    """Call ``read_tags(file_path)`` on a best-effort basis.

    Returns the reader's result, or an empty dict when the reader raises or
    returns a falsy value.
    """
    try:
        tags = read_tags(file_path)
        if tags:
            return tags
    except Exception:
        # Deliberate best-effort: unreadable tags must not stop the scan.
        pass
    return {}
def build_library_metadata(tags: dict, audio_props: dict, file_path: Path) -> dict:
    """Assemble the metadata dict for one library file from its tags.

    Args:
        tags: Tag dict with normalised keys (e.g. ``title``, ``albumartist``,
            ``musicbrainzalbumid``).
        audio_props: Probe result; only ``duration_seconds`` is read.
        file_path: Path of the file; its stem is the fallback title.

    Returns:
        A dict with title/artist/album fields, track and disc numbers, release
        date and year, duration, MusicBrainz ids, and ``source_ids`` holding
        only the MusicBrainz ids that are actually set.
    """
    artist_tag = tags.get('artist')
    album_artist_tag = tags.get('albumartist')
    release_date = first_non_empty(tags.get('date'), tags.get('year'))
    recording_id = first_non_empty(
        tags.get('musicbrainzrecordingid'),
        tags.get('musicbrainztrackid')
    )
    release_id = first_non_empty(
        tags.get('musicbrainzalbumid'),
        tags.get('musicbrainzreleaseid')
    )
    release_group_id = tags.get('musicbrainzreleasegroupid')

    # Only ids that are present (truthy) are exposed under 'source_ids'.
    source_ids = {}
    for source_name, source_value in (
        ('musicbrainz_recording_id', recording_id),
        ('musicbrainz_release_id', release_id),
        ('musicbrainz_release_group_id', release_group_id),
    ):
        if source_value:
            source_ids[source_name] = source_value

    return {
        'title': first_non_empty(tags.get('title'), file_path.stem),
        'artist': first_non_empty(artist_tag, album_artist_tag),
        'artists': split_artists(artist_tag),
        'album': tags.get('album'),
        'album_artist': first_non_empty(album_artist_tag, artist_tag),
        'track_number': parse_track_number(tags.get('tracknumber')),
        'disc_number': parse_track_number(tags.get('discnumber')),
        'release_date': release_date,
        'year': extract_year(release_date),
        'duration_seconds': audio_props.get('duration_seconds'),
        'recording_id': recording_id,
        'release_id': release_id,
        'release_group_id': release_group_id,
        'source_ids': source_ids
    }
def identity_keys_for_item(item: dict, *, include_fingerprint: bool) -> list[tuple[str, str]]:
    """Derive the ``(basis, key)`` identity pairs for one item.

    Keys are emitted in priority order:

    1. ``recording_id`` - when the metadata carries a recording id.
    2. ``release_track`` - release id plus disc and track number, all required.
    3. ``text_duration`` - normalised title and artist, the rounded duration
       and any preserved version tokens found in the title.
    4. ``fingerprint_duration`` - only when ``include_fingerprint`` is true and
       none of the keys above could be built.

    Metadata is read from ``item['matched_metadata_json']``.
    """
    meta = item.get('matched_metadata_json') or {}
    found: list[tuple[str, str]] = []

    recording_id = meta.get('recording_id')
    if recording_id:
        found.append(('recording_id', str(recording_id)))

    release_id = meta.get('release_id')
    disc_no = parse_track_number(meta.get('disc_number'))
    track_no = parse_track_number(meta.get('track_number'))
    if all((release_id, disc_no, track_no)):
        found.append(('release_track', f'{release_id}|{disc_no}|{track_no}'))

    raw_title = meta.get('title')
    title_key = normalize_identity_text(raw_title)
    artist_key = normalize_identity_text(
        first_non_empty(meta.get('artist'), meta.get('album_artist'))
    )
    # Duration fallbacks: metadata, then fingerprint duration, then probe result.
    probe_props = item.get('audio_props_json') or {}
    duration = first_non_empty(
        meta.get('duration_seconds'),
        item.get('fingerprint_duration_seconds'),
        probe_props.get('duration_seconds')
    )

    if title_key and artist_key and duration not in (None, ''):
        version_tokens = extract_preserved_version_tokens(raw_title)
        found.append((
            'text_duration',
            f'{title_key}|{artist_key}|{duration_bucket(duration)}|{",".join(sorted(version_tokens))}'
        ))

    # The fingerprint is a last resort, used only when nothing else identified the item.
    if include_fingerprint and not found:
        fingerprint = item.get('acoustic_fingerprint')
        if fingerprint and duration not in (None, ''):
            found.append(('fingerprint_duration', f'{fingerprint}|{duration_bucket(duration)}'))

    return found
def choose_primary_identity(item: dict) -> tuple[str | None, str | None]:
    """Return the highest-priority ``(basis, key)`` pair for ``item``.

    Metadata-based keys are tried first; the fingerprint-based key is only
    consulted when none exist. Returns ``(None, None)`` when the item cannot
    be identified at all.
    """
    for with_fingerprint in (False, True):
        candidates = identity_keys_for_item(item, include_fingerprint=with_fingerprint)
        if candidates:
            return candidates[0]
    return (None, None)
def format_timestamp(timestamp: float) -> str:
    """Format a POSIX timestamp as a UTC ISO-8601 string ending in ``Z``.

    Sub-second precision is dropped.
    """
    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    whole_second = moment.replace(microsecond=0)
    iso_text = whole_second.isoformat()
    return iso_text.replace('+00:00', 'Z')
def normalize_identity_text(value: str | None) -> str:
if not value:
return ''
normalized = unicodedata.normalize('NFKC', str(value)).lower()
normalized = re.sub(r'\b(feat|ft|featuring)\.?\b', ' ', normalized)
normalized = re.sub(r'\bversion\b', ' ', normalized)
normalized = re.sub(r'[^a-z0-9]+', ' ', normalized)
return ' '.join(normalized.split())
def extract_preserved_version_tokens(value: str | None) -> set[str]:
    """Return the version markers (``live``, ``remix``, ...) present in ``value``.

    The text is normalised with ``normalize_identity_text`` first, so matching
    is case-insensitive and works on whole words only.
    """
    words = set(normalize_identity_text(value).split())
    return words & PRESERVED_VERSION_TOKENS
def normalize_tag_key(value: str) -> str:
    """Lower-case a tag name and strip everything except ``a-z`` and ``0-9``."""
    lowered = value.lower()
    non_key_chars = re.compile(r'[^a-z0-9]+')
    return non_key_chars.sub('', lowered)
def coerce_tag_value(value) -> str | None:
if isinstance(value, list):
if not value:
return None
return str(value[0])
if isinstance(value, bytes):
return value.decode('utf-8', errors='ignore')
return str(value) if value not in (None, '') else None
def parse_track_number(value) -> int | None:
if value in (None, ''):
return None
match = re.search(r'\d+', str(value))
return int(match.group(0)) if match else None
def extract_year(value) -> int | None:
if value in (None, ''):
return None
match = re.search(r'(\d{4})', str(value))
return int(match.group(1)) if match else None
def duration_bucket(value) -> int | None:
    """Round a duration in seconds to the nearest whole second.

    Args:
        value: Anything ``safe_float`` accepts (number or numeric string).

    Returns:
        The rounded duration, or ``None`` when ``value`` is missing, is not
        numeric, or is not a finite number.
    """
    duration = safe_float(value)
    if duration is None:
        return None
    try:
        return int(round(duration))
    except (ValueError, OverflowError):
        # float() accepts 'nan' and 'inf', but round() raises ValueError for
        # NaN and OverflowError for infinity. One bad probe value must not
        # abort building the index.
        return None
def safe_float(value) -> float | None:
if value in (None, ''):
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def first_non_empty(*values):
    """Return the first argument that is neither ``None`` nor ``''``.

    Other falsy values such as ``0`` count as present. Returns ``None`` when
    every argument is empty or no arguments are given.
    """
    return next(
        (candidate for candidate in values if candidate not in (None, '')),
        None,
    )
def split_artists(value: str | None) -> list[str]:
if not value:
return []
return [part.strip() for part in re.split(r'[,/&;]+', str(value)) if part.strip()]