331 lines
9.5 KiB
Python
331 lines
9.5 KiB
Python
import hashlib
|
|
import importlib
|
|
import re
|
|
import unicodedata
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from .scanner import ALLOWED_AUDIO_EXTENSIONS
|
|
|
|
|
|
# Lower-case file suffixes (with leading dot) of the formats this module
# labels as lossless. NOTE(review): not referenced in the visible part of the
# file; presumably consumed by other modules -- confirm before changing.
LOSSLESS_EXTENSIONS = {
    '.flac',
    '.wav',
    '.ape',
    '.aiff',
    '.alac',
}

# Title words that are kept as part of the text-based identity key (see
# extract_preserved_version_tokens / identity_keys_for_item), so that titles
# differing only by one of these words do not share a key.
PRESERVED_VERSION_TOKENS = {
    'cover',
    'demo',
    'instrumental',
    'karaoke',
    'live',
    'remix',
}
|
|
|
|
|
|
def build_library_index(
    output_dir: str,
    *,
    probe_audio,
    read_tags
) -> dict:
    """Scan ``output_dir`` and index the found items by identity key.

    Args:
        output_dir: Root directory handed to ``scan_library_items``.
        probe_audio: Callable forwarded to ``scan_library_items``.
        read_tags: Callable forwarded to ``scan_library_items``.

    Returns:
        A dict with ``count`` (number of items), ``items`` (the scanned item
        dicts) and ``by_basis``: a mapping ``basis -> identity key -> list of
        items`` built from ``identity_keys_for_item`` without the fingerprint
        fallback. The three metadata bases are always present, even if empty.
    """
    library_items = scan_library_items(
        output_dir,
        probe_audio=probe_audio,
        read_tags=read_tags
    )

    index: dict = {
        basis: {}
        for basis in ('recording_id', 'release_track', 'text_duration')
    }
    for entry in library_items:
        entry_keys = identity_keys_for_item(entry, include_fingerprint=False)
        for basis, identity_key in entry_keys:
            per_basis = index.setdefault(basis, {})
            per_basis.setdefault(identity_key, []).append(entry)

    return {
        'count': len(library_items),
        'items': library_items,
        'by_basis': index
    }
|
|
|
|
|
|
def scan_library_items(
    output_dir: str,
    *,
    probe_audio,
    read_tags
) -> list[dict]:
    """Walk ``output_dir`` recursively and describe every allowed audio file.

    Args:
        output_dir: Library root; ``~`` is expanded and the path is resolved.
        probe_audio: Callable taking an absolute path; invoked through
            ``safe_probe_audio`` so failures yield ``{}``.
        read_tags: Callable taking an absolute path; invoked through
            ``safe_read_tags`` so failures yield ``{}``.

    Returns:
        One dict per regular file whose lower-cased suffix is in
        ``ALLOWED_AUDIO_EXTENSIONS``, in sorted path order. An empty list is
        returned when the root does not exist or is not a directory.
    """
    library_root = Path(output_dir).expanduser().resolve(strict=False)
    collected: list[dict] = []

    if not (library_root.exists() and library_root.is_dir()):
        return collected

    for candidate in sorted(library_root.rglob('*')):
        if not candidate.is_file():
            continue
        suffix = candidate.suffix.lower()
        if suffix not in ALLOWED_AUDIO_EXTENSIONS:
            continue

        file_stat = candidate.stat()
        modified_at = format_timestamp(file_stat.st_mtime)
        # Path relative to the library root, always with forward slashes.
        relative_path = candidate.relative_to(library_root).as_posix()
        absolute_path = str(candidate.resolve(strict=False))
        audio_props = safe_probe_audio(probe_audio, absolute_path)
        tags = safe_read_tags(read_tags, absolute_path)

        record = {
            'track_id': build_track_id(relative_path, file_stat.st_size, modified_at),
            'id': None,
            # The same absolute path is exposed under three keys.
            'current_file_path': absolute_path,
            'file_path': absolute_path,
            'library_file_path': absolute_path,
            'relative_path': relative_path,
            'library_relative_path': relative_path,
            'filename': candidate.name,
            'extension': suffix,
            'size_bytes': file_stat.st_size,
            'modified_at': modified_at,
            'audio_props_json': audio_props,
            'match_confidence': None,
            'match_is_authoritative': True,
            'matched_metadata_json': build_library_metadata(tags, audio_props, candidate),
            'acoustic_fingerprint': None,
            'fingerprint_duration_seconds': audio_props.get('duration_seconds'),
            'created_at': None
        }
        collected.append(record)

    return collected
|
|
|
|
|
|
def count_suspected_duplicates(items: list[dict]) -> int:
    """Count the surplus items across all identity groups.

    Every group produced by ``group_items_by_identity`` with more than one
    member contributes ``len(group) - 1`` to the total.
    """
    return sum(
        len(members) - 1
        for members in group_items_by_identity(items)
        if len(members) > 1
    )
|
|
|
|
|
|
def group_items_by_identity(items: list[dict]) -> list[list[dict]]:
    """Partition ``items`` into groups that share at least one identity key.

    Items are processed in order. An item joins the group registered for the
    first of its keys that is already known; otherwise it starts a new group.
    Afterwards each of its keys that is not yet registered is pointed at that
    group. Items without any identity key are left out of the result.

    Note that two already existing groups are never merged, even when a later
    item carries keys belonging to both of them.

    Returns:
        The groups in order of first appearance.
    """
    group_for_key: dict[tuple[str, str], list[dict]] = {}
    ordered_groups: list[list[dict]] = []

    for entry in items:
        entry_keys = identity_keys_for_item(entry, include_fingerprint=False)
        if not entry_keys:
            continue

        # First key that already maps to a group decides the membership.
        home = next(
            (group_for_key[key] for key in entry_keys if key in group_for_key),
            None,
        )
        if home is not None:
            home.append(entry)
        else:
            home = [entry]
            ordered_groups.append(home)

        for key in entry_keys:
            group_for_key.setdefault(key, home)

    return ordered_groups
|
|
|
|
|
|
def build_track_id(relative_path: str, size_bytes: int | None, modified_at: str | None) -> str:
|
|
digest = hashlib.sha1(
|
|
f'{relative_path}|{size_bytes or 0}|{modified_at or ""}'.encode('utf-8')
|
|
).hexdigest()
|
|
return digest
|
|
|
|
|
|
def safe_probe_audio(probe_audio, file_path: str) -> dict:
    """Call ``probe_audio(file_path)`` on a best-effort basis.

    Returns the callable's result when it is truthy; an empty dict when the
    result is falsy or the call raises any ``Exception``.
    """
    try:
        probed = probe_audio(file_path)
        return probed if probed else {}
    except Exception:
        # Deliberate best-effort: one unreadable file must not abort a scan.
        return {}
|
|
|
|
|
|
def default_read_library_tags(file_path: str) -> dict:
    """Read the tags of ``file_path`` with mutagen and normalise them.

    ``mutagen`` is imported lazily via ``importlib``. Each tag key is passed
    through ``normalize_tag_key`` and each value through ``coerce_tag_value``.
    When two raw keys normalise to the same name, the later one wins.

    Returns:
        The normalised tag mapping, or ``{}`` when mutagen returns no file
        object or the file has no (or empty) tags.
    """
    mutagen_module = importlib.import_module('mutagen')
    audio_file = mutagen_module.File(file_path, easy=False)
    if audio_file is None:
        return {}

    raw_tags = getattr(audio_file, 'tags', None)
    if not raw_tags:
        return {}

    return {
        normalize_tag_key(raw_key): coerce_tag_value(raw_value)
        for raw_key, raw_value in raw_tags.items()
    }
|
|
|
|
|
|
def safe_read_tags(read_tags, file_path: str) -> dict:
    """Call ``read_tags(file_path)`` on a best-effort basis.

    Returns the callable's result when it is truthy; an empty dict when the
    result is falsy or the call raises any ``Exception``.
    """
    try:
        loaded = read_tags(file_path)
        return loaded if loaded else {}
    except Exception:
        # Deliberate best-effort: an untaggable file is treated as tagless.
        return {}
|
|
|
|
|
|
def build_library_metadata(tags: dict, audio_props: dict, file_path: Path) -> dict:
    """Assemble the metadata dict for one library file from its tags.

    Args:
        tags: Normalised tag mapping (keys such as ``title``, ``artist``,
            ``albumartist``, ``tracknumber``, ``musicbrainzrecordingid``).
        audio_props: Probe result; only ``duration_seconds`` is read.
        file_path: Path of the file; its stem is the fallback title.

    Returns:
        A dict with title/artist/album fields, parsed track and disc numbers,
        release date and year, duration, the MusicBrainz ids, and
        ``source_ids`` holding only those MusicBrainz ids that are truthy.
    """
    artist_tag = tags.get('artist')
    album_artist_tag = tags.get('albumartist')
    release_date = first_non_empty(tags.get('date'), tags.get('year'))

    # Each id is looked up once and reused for both the flat field and
    # the ``source_ids`` entry.
    recording_id = first_non_empty(
        tags.get('musicbrainzrecordingid'),
        tags.get('musicbrainztrackid')
    )
    release_id = first_non_empty(
        tags.get('musicbrainzalbumid'),
        tags.get('musicbrainzreleaseid')
    )
    release_group_id = tags.get('musicbrainzreleasegroupid')

    candidate_source_ids = {
        'musicbrainz_recording_id': recording_id,
        'musicbrainz_release_id': release_id,
        'musicbrainz_release_group_id': release_group_id
    }
    source_ids = {
        name: identifier
        for name, identifier in candidate_source_ids.items()
        if identifier
    }

    return {
        'title': first_non_empty(tags.get('title'), file_path.stem),
        'artist': first_non_empty(artist_tag, album_artist_tag),
        'artists': split_artists(artist_tag),
        'album': tags.get('album'),
        'album_artist': first_non_empty(album_artist_tag, artist_tag),
        'track_number': parse_track_number(tags.get('tracknumber')),
        'disc_number': parse_track_number(tags.get('discnumber')),
        'release_date': release_date,
        'year': extract_year(release_date),
        'duration_seconds': audio_props.get('duration_seconds'),
        'recording_id': recording_id,
        'release_id': release_id,
        'release_group_id': release_group_id,
        'source_ids': source_ids
    }
|
|
|
|
|
|
def identity_keys_for_item(item: dict, *, include_fingerprint: bool) -> list[tuple[str, str]]:
    """Return the ``(basis, key)`` identity pairs that can be derived for ``item``.

    Keys are emitted in this order, each only when its inputs are present:

    * ``recording_id``: the metadata recording id.
    * ``release_track``: ``release_id|disc|track`` (all three must be truthy).
    * ``text_duration``: normalised title, normalised artist, the duration
      rounded to whole seconds, and the sorted preserved version tokens
      found in the title.
    * ``fingerprint_duration``: only when ``include_fingerprint`` is true
      and none of the keys above could be built.

    A duration that is present but cannot be converted to a number is treated
    like a missing duration. Previously the text "None" was formatted into
    the key, which made unrelated tracks with unknown duration share a key.

    Args:
        item: Library item dict; reads ``matched_metadata_json``,
            ``fingerprint_duration_seconds``, ``audio_props_json`` and
            ``acoustic_fingerprint``.
        include_fingerprint: Enable the fingerprint fallback key.

    Returns:
        A possibly empty list of ``(basis, key)`` tuples.
    """
    metadata = item.get('matched_metadata_json') or {}
    keys: list[tuple[str, str]] = []

    recording_id = metadata.get('recording_id')
    if recording_id:
        keys.append(('recording_id', str(recording_id)))

    release_id = metadata.get('release_id')
    disc_number = parse_track_number(metadata.get('disc_number'))
    track_number = parse_track_number(metadata.get('track_number'))
    if release_id and disc_number and track_number:
        keys.append(('release_track', f'{release_id}|{disc_number}|{track_number}'))

    title_key = normalize_identity_text(metadata.get('title'))
    artist_key = normalize_identity_text(
        first_non_empty(metadata.get('artist'), metadata.get('album_artist'))
    )
    duration_seconds = first_non_empty(
        metadata.get('duration_seconds'),
        item.get('fingerprint_duration_seconds'),
        (item.get('audio_props_json') or {}).get('duration_seconds')
    )
    has_duration = duration_seconds not in (None, '')

    if title_key and artist_key and has_duration:
        bucket = duration_bucket(duration_seconds)
        # Skip the key when the duration is not numeric instead of
        # embedding the string "None" in it.
        if bucket is not None:
            title_versions = extract_preserved_version_tokens(metadata.get('title'))
            keys.append((
                'text_duration',
                f'{title_key}|{artist_key}|{bucket}|{",".join(sorted(title_versions))}'
            ))

    if include_fingerprint and not keys:
        fingerprint = item.get('acoustic_fingerprint')
        if fingerprint and has_duration:
            bucket = duration_bucket(duration_seconds)
            if bucket is not None:
                keys.append(('fingerprint_duration', f'{fingerprint}|{bucket}'))

    return keys
|
|
|
|
|
|
def choose_primary_identity(item: dict) -> tuple[str | None, str | None]:
    """Pick the highest-priority identity key of ``item``.

    The metadata-based keys are tried first; the fingerprint fallback is
    consulted only when they yield nothing.

    Returns:
        The first ``(basis, key)`` pair, or ``(None, None)`` when no key can
        be derived at all.
    """
    for with_fingerprint in (False, True):
        candidates = identity_keys_for_item(item, include_fingerprint=with_fingerprint)
        if candidates:
            return candidates[0]
    return (None, None)
|
|
|
|
|
|
def format_timestamp(timestamp: float) -> str:
    """Format a POSIX timestamp as an ISO-8601 UTC string ending in ``Z``.

    Sub-second precision is discarded (truncated, not rounded).
    """
    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    whole_second = moment.replace(microsecond=0)
    iso_text = whole_second.isoformat()
    # isoformat() renders UTC as "+00:00"; emit the compact "Z" form instead.
    return iso_text.replace('+00:00', 'Z')
|
|
|
|
|
|
def normalize_identity_text(value: str | None) -> str:
|
|
if not value:
|
|
return ''
|
|
normalized = unicodedata.normalize('NFKC', str(value)).lower()
|
|
normalized = re.sub(r'\b(feat|ft|featuring)\.?\b', ' ', normalized)
|
|
normalized = re.sub(r'\bversion\b', ' ', normalized)
|
|
normalized = re.sub(r'[^a-z0-9]+', ' ', normalized)
|
|
return ' '.join(normalized.split())
|
|
|
|
|
|
def extract_preserved_version_tokens(value: str | None) -> set[str]:
    """Return the ``PRESERVED_VERSION_TOKENS`` that occur as words in ``value``.

    The text is first run through ``normalize_identity_text`` and split on
    whitespace, so matching is case-insensitive and whole-word only.
    """
    words = set(normalize_identity_text(value).split())
    return {word for word in words if word in PRESERVED_VERSION_TOKENS}
|
|
|
|
|
|
def normalize_tag_key(value: str) -> str:
    """Lower-case a tag name and strip everything except ``a-z`` and ``0-9``."""
    lowered = value.lower()
    non_key_chars = re.compile(r'[^a-z0-9]+')
    return non_key_chars.sub('', lowered)
|
|
|
|
|
|
def coerce_tag_value(value) -> str | None:
|
|
if isinstance(value, list):
|
|
if not value:
|
|
return None
|
|
return str(value[0])
|
|
if isinstance(value, bytes):
|
|
return value.decode('utf-8', errors='ignore')
|
|
return str(value) if value not in (None, '') else None
|
|
|
|
|
|
def parse_track_number(value) -> int | None:
|
|
if value in (None, ''):
|
|
return None
|
|
match = re.search(r'\d+', str(value))
|
|
return int(match.group(0)) if match else None
|
|
|
|
|
|
def extract_year(value) -> int | None:
|
|
if value in (None, ''):
|
|
return None
|
|
match = re.search(r'(\d{4})', str(value))
|
|
return int(match.group(1)) if match else None
|
|
|
|
|
|
def duration_bucket(value) -> int | None:
|
|
duration = safe_float(value)
|
|
return int(round(duration)) if duration is not None else None
|
|
|
|
|
|
def safe_float(value) -> float | None:
|
|
if value in (None, ''):
|
|
return None
|
|
try:
|
|
return float(value)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def first_non_empty(*values):
    """Return the first argument that is neither ``None`` nor ``''``.

    Other falsy values such as ``0`` or ``[]`` count as non-empty. Returns
    ``None`` when every argument is empty or no argument is given.
    """
    return next(
        (candidate for candidate in values if candidate not in (None, '')),
        None,
    )
|
|
|
|
|
|
def split_artists(value: str | None) -> list[str]:
|
|
if not value:
|
|
return []
|
|
return [part.strip() for part in re.split(r'[,/&;]+', str(value)) if part.strip()]
|