Add MusicWorkshop application
This commit is contained in:
@@ -0,0 +1,330 @@
|
||||
import hashlib
|
||||
import importlib
|
||||
import re
|
||||
import unicodedata
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from .scanner import ALLOWED_AUDIO_EXTENSIONS
|
||||
|
||||
|
||||
# Audio file suffixes (lower-case, with leading dot) classified as lossless.
LOSSLESS_EXTENSIONS = {
    '.flac',
    '.wav',
    '.ape',
    '.aiff',
    '.alac',
}

# Title words that mark a distinct version of a song. They are extracted from
# the normalized title and appended to the text-based identity key so that,
# for example, a live take is not keyed the same as a title without the token.
PRESERVED_VERSION_TOKENS = {
    'live', 'remix', 'demo',
    'karaoke', 'instrumental', 'cover',
}
|
||||
|
||||
|
||||
def build_library_index(
    output_dir: str,
    *,
    probe_audio,
    read_tags
) -> dict:
    """Scan ``output_dir`` and index every audio item by its identity keys.

    Args:
        output_dir: Directory handed to ``scan_library_items``.
        probe_audio: Callable forwarded to the scanner to read audio properties.
        read_tags: Callable forwarded to the scanner to read tags.

    Returns:
        A dict with ``count`` (number of items), ``items`` (the scanned item
        dicts) and ``by_basis`` (``{basis: {identity_key: [items]}}``).
        Fingerprint-based keys are never requested here.
    """
    library_items = scan_library_items(
        output_dir,
        probe_audio=probe_audio,
        read_tags=read_tags,
    )

    # The three known bases are always present, even when empty.
    index = {'recording_id': {}, 'release_track': {}, 'text_duration': {}}

    for entry in library_items:
        entry_keys = identity_keys_for_item(entry, include_fingerprint=False)
        for basis, identity_key in entry_keys:
            bucket = index.setdefault(basis, {})
            bucket.setdefault(identity_key, []).append(entry)

    return {
        'count': len(library_items),
        'items': library_items,
        'by_basis': index,
    }
|
||||
|
||||
|
||||
def scan_library_items(
    output_dir: str,
    *,
    probe_audio,
    read_tags
) -> list[dict]:
    """Walk ``output_dir`` recursively and describe every allowed audio file.

    Args:
        output_dir: Root directory of the library; ``~`` is expanded.
        probe_audio: Callable taking an absolute path; invoked through
            ``safe_probe_audio`` so failures yield ``{}``.
        read_tags: Callable taking an absolute path; invoked through
            ``safe_read_tags`` so failures yield ``{}``.

    Returns:
        One dict per audio file, in sorted path order. An empty list is
        returned when the root does not exist or is not a directory.
    """
    root = Path(output_dir).expanduser().resolve(strict=False)
    if not (root.exists() and root.is_dir()):
        return []

    records: list[dict] = []
    for candidate in sorted(root.rglob('*')):
        if not candidate.is_file():
            continue
        suffix = candidate.suffix.lower()
        if suffix not in ALLOWED_AUDIO_EXTENSIONS:
            continue

        file_stat = candidate.stat()
        size_bytes = file_stat.st_size
        modified_at = format_timestamp(file_stat.st_mtime)
        relative_path = candidate.relative_to(root).as_posix()
        absolute_path = str(candidate.resolve(strict=False))

        # Audio properties are probed before tags, as in the original flow.
        audio_props = safe_probe_audio(probe_audio, absolute_path)
        tags = safe_read_tags(read_tags, absolute_path)

        record = {
            'track_id': build_track_id(relative_path, size_bytes, modified_at),
            'id': None,
            # The same absolute path is exposed under three keys.
            'current_file_path': absolute_path,
            'file_path': absolute_path,
            'library_file_path': absolute_path,
            'relative_path': relative_path,
            'library_relative_path': relative_path,
            'filename': candidate.name,
            'extension': suffix,
            'size_bytes': size_bytes,
            'modified_at': modified_at,
            'audio_props_json': audio_props,
            'match_confidence': None,
            'match_is_authoritative': True,
            'matched_metadata_json': build_library_metadata(tags, audio_props, candidate),
            'acoustic_fingerprint': None,
            'fingerprint_duration_seconds': audio_props.get('duration_seconds'),
            'created_at': None,
        }
        records.append(record)

    return records
|
||||
|
||||
|
||||
def count_suspected_duplicates(items: list[dict]) -> int:
    """Count the items that share an identity group with an earlier item.

    Every group of size ``k > 1`` contributes ``k - 1`` duplicates; items
    without any identity key are not grouped and therefore never counted.
    """
    return sum(
        len(group) - 1
        for group in group_items_by_identity(items)
        if len(group) > 1
    )
|
||||
|
||||
|
||||
def group_items_by_identity(items: list[dict]) -> list[list[dict]]:
    """Cluster items that share at least one identity key.

    Items are processed in order. An item joins the group registered for the
    first of its keys that is already known; otherwise it starts a new group.
    All of its keys that are still unregistered are then pointed at that group.

    Note that existing groups are never merged: if an item's keys match two
    different groups, it joins only the first one found.

    Args:
        items: Item dicts as understood by ``identity_keys_for_item``.

    Returns:
        The groups in creation order. Items without identity keys are omitted.
    """
    clusters: list[list[dict]] = []
    cluster_by_key: dict[tuple[str, str], list[dict]] = {}

    for entry in items:
        keys = identity_keys_for_item(entry, include_fingerprint=False)
        if not keys:
            continue

        # First already-registered key wins; registered values are never None.
        cluster = next(
            (cluster_by_key[key] for key in keys if key in cluster_by_key),
            None,
        )

        if cluster is not None:
            cluster.append(entry)
        else:
            cluster = [entry]
            clusters.append(cluster)

        for key in keys:
            # setdefault keeps any earlier registration for this key intact.
            cluster_by_key.setdefault(key, cluster)

    return clusters
|
||||
|
||||
|
||||
def build_track_id(relative_path: str, size_bytes: int | None, modified_at: str | None) -> str:
|
||||
digest = hashlib.sha1(
|
||||
f'{relative_path}|{size_bytes or 0}|{modified_at or ""}'.encode('utf-8')
|
||||
).hexdigest()
|
||||
return digest
|
||||
|
||||
|
||||
def safe_probe_audio(probe_audio, file_path: str) -> dict:
    """Call ``probe_audio(file_path)`` on a best-effort basis.

    Returns the callable's result when it is truthy; a falsy result or any
    ``Exception`` raised by the call yields an empty dict instead.
    """
    try:
        probed = probe_audio(file_path)
        return probed if probed else {}
    except Exception:
        # Deliberate best-effort: one unreadable file must not abort a scan.
        return {}
|
||||
|
||||
|
||||
def default_read_library_tags(file_path: str) -> dict:
    """Read the tags of ``file_path`` with mutagen and normalize them.

    mutagen is imported lazily via ``importlib`` at call time. Tag keys are
    passed through ``normalize_tag_key`` and values through
    ``coerce_tag_value``; if two raw keys normalize to the same name, the one
    iterated last wins.

    Returns:
        The normalized tag dict, or ``{}`` when mutagen returns no file
        object or the file has no (or empty) tags.
    """
    # NOTE(review): with easy=False the keys are whatever the container's tag
    # format exposes (presumably raw frame IDs for ID3) -- confirm that the
    # lookups in build_library_metadata match for every supported format.
    mutagen = importlib.import_module('mutagen')
    audio = mutagen.File(file_path, easy=False)

    raw_tags = None if audio is None else getattr(audio, 'tags', None)
    if not raw_tags:
        return {}

    return {
        normalize_tag_key(raw_key): coerce_tag_value(raw_value)
        for raw_key, raw_value in raw_tags.items()
    }
|
||||
|
||||
|
||||
def safe_read_tags(read_tags, file_path: str) -> dict:
    """Call ``read_tags(file_path)`` on a best-effort basis.

    Returns the callable's result when it is truthy; a falsy result or any
    ``Exception`` raised by the call yields an empty dict instead.
    """
    try:
        loaded = read_tags(file_path)
        return loaded if loaded else {}
    except Exception:
        # Deliberate best-effort: untagged or corrupt files are treated alike.
        return {}
|
||||
|
||||
|
||||
def build_library_metadata(tags: dict, audio_props: dict, file_path: Path) -> dict:
    """Assemble the metadata dict for one library file from its tags.

    Args:
        tags: Tag dict with normalized keys (e.g. ``title``, ``albumartist``).
        audio_props: Probed audio properties; only ``duration_seconds`` is read.
        file_path: Path of the file; its stem is the fallback title.

    Returns:
        A dict with title/artist/album fields, parsed track and disc numbers,
        release date and year, duration, MusicBrainz ids, and ``source_ids``
        containing only the MusicBrainz ids that are truthy.
    """
    artist_tag = tags.get('artist')
    album_artist_tag = tags.get('albumartist')
    release_date = first_non_empty(tags.get('date'), tags.get('year'))

    # Each id is looked up under two alternative tag names.
    recording_id = first_non_empty(
        tags.get('musicbrainzrecordingid'),
        tags.get('musicbrainztrackid'),
    )
    release_id = first_non_empty(
        tags.get('musicbrainzalbumid'),
        tags.get('musicbrainzreleaseid'),
    )
    release_group_id = tags.get('musicbrainzreleasegroupid')

    source_ids = {}
    if recording_id:
        source_ids['musicbrainz_recording_id'] = recording_id
    if release_id:
        source_ids['musicbrainz_release_id'] = release_id
    if release_group_id:
        source_ids['musicbrainz_release_group_id'] = release_group_id

    return {
        'title': first_non_empty(tags.get('title'), file_path.stem),
        'artist': first_non_empty(artist_tag, album_artist_tag),
        # Only the track artist tag is split; album artist is not a fallback.
        'artists': split_artists(artist_tag),
        'album': tags.get('album'),
        'album_artist': first_non_empty(album_artist_tag, artist_tag),
        'track_number': parse_track_number(tags.get('tracknumber')),
        'disc_number': parse_track_number(tags.get('discnumber')),
        'release_date': release_date,
        'year': extract_year(release_date),
        'duration_seconds': audio_props.get('duration_seconds'),
        'recording_id': recording_id,
        'release_id': release_id,
        'release_group_id': release_group_id,
        'source_ids': source_ids,
    }
|
||||
|
||||
|
||||
def identity_keys_for_item(item: dict, *, include_fingerprint: bool) -> list[tuple[str, str]]:
    """Derive the ``(basis, key)`` identity pairs for one item.

    Keys are emitted in this order, each only when its inputs are present:

    1. ``recording_id`` -- the metadata recording id.
    2. ``release_track`` -- release id plus truthy disc and track numbers.
    3. ``text_duration`` -- normalized title and artist, the rounded
       duration, and the sorted preserved version tokens of the title.
    4. ``fingerprint_duration`` -- only when ``include_fingerprint`` is true
       and none of the keys above could be built.

    Args:
        item: Item dict; reads ``matched_metadata_json``,
            ``fingerprint_duration_seconds``, ``audio_props_json`` and
            ``acoustic_fingerprint``.
        include_fingerprint: Allow the fingerprint fallback key.

    Returns:
        The list of identity pairs, possibly empty.
    """
    metadata = item.get('matched_metadata_json') or {}
    audio_props = item.get('audio_props_json') or {}
    found: list[tuple[str, str]] = []

    recording_id = metadata.get('recording_id')
    if recording_id:
        found.append(('recording_id', str(recording_id)))

    release_id = metadata.get('release_id')
    disc = parse_track_number(metadata.get('disc_number'))
    track = parse_track_number(metadata.get('track_number'))
    if release_id and disc and track:
        found.append(('release_track', f'{release_id}|{disc}|{track}'))

    # Duration falls back from metadata to the item to the probed properties.
    duration = first_non_empty(
        metadata.get('duration_seconds'),
        item.get('fingerprint_duration_seconds'),
        audio_props.get('duration_seconds'),
    )
    has_duration = duration not in (None, '')

    raw_title = metadata.get('title')
    title_key = normalize_identity_text(raw_title)
    artist_source = first_non_empty(metadata.get('artist'), metadata.get('album_artist'))
    artist_key = normalize_identity_text(artist_source)

    if title_key and artist_key and has_duration:
        version_part = ",".join(sorted(extract_preserved_version_tokens(raw_title)))
        bucket = duration_bucket(duration)
        found.append((
            'text_duration',
            f'{title_key}|{artist_key}|{bucket}|{version_part}',
        ))

    if include_fingerprint and not found:
        fingerprint = item.get('acoustic_fingerprint')
        if fingerprint and has_duration:
            found.append((
                'fingerprint_duration',
                f'{fingerprint}|{duration_bucket(duration)}',
            ))

    return found
|
||||
|
||||
|
||||
def choose_primary_identity(item: dict) -> tuple[str | None, str | None]:
    """Return the first identity pair of ``item``.

    Keys are first computed without the fingerprint fallback; only when that
    yields nothing are they recomputed with the fallback allowed. Returns
    ``(None, None)`` when the item has no identity at all.
    """
    for allow_fingerprint in (False, True):
        candidates = identity_keys_for_item(item, include_fingerprint=allow_fingerprint)
        if candidates:
            return candidates[0]
    return (None, None)
|
||||
|
||||
|
||||
def format_timestamp(timestamp: float) -> str:
    """Format a POSIX timestamp as a UTC ISO-8601 string with a ``Z`` suffix.

    Microseconds are dropped (not rounded), e.g. ``0`` becomes
    ``'1970-01-01T00:00:00Z'``.
    """
    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    whole_second = moment.replace(microsecond=0)
    iso_text = whole_second.isoformat()
    return iso_text.replace('+00:00', 'Z')
|
||||
|
||||
|
||||
def normalize_identity_text(value: str | None) -> str:
|
||||
if not value:
|
||||
return ''
|
||||
normalized = unicodedata.normalize('NFKC', str(value)).lower()
|
||||
normalized = re.sub(r'\b(feat|ft|featuring)\.?\b', ' ', normalized)
|
||||
normalized = re.sub(r'\bversion\b', ' ', normalized)
|
||||
normalized = re.sub(r'[^a-z0-9]+', ' ', normalized)
|
||||
return ' '.join(normalized.split())
|
||||
|
||||
|
||||
def extract_preserved_version_tokens(value: str | None) -> set[str]:
    """Return the version-marking words that occur in ``value``.

    The text is normalized with ``normalize_identity_text`` and split on
    whitespace; the result is the subset of those words that are listed in
    ``PRESERVED_VERSION_TOKENS``.
    """
    words = set(normalize_identity_text(value).split())
    return words & PRESERVED_VERSION_TOKENS
|
||||
|
||||
|
||||
def normalize_tag_key(value: str) -> str:
    """Lower-case a tag key and keep only its ``a-z`` / ``0-9`` characters."""
    kept_runs = re.findall(r'[a-z0-9]+', value.lower())
    return ''.join(kept_runs)
|
||||
|
||||
|
||||
def coerce_tag_value(value) -> str | None:
|
||||
if isinstance(value, list):
|
||||
if not value:
|
||||
return None
|
||||
return str(value[0])
|
||||
if isinstance(value, bytes):
|
||||
return value.decode('utf-8', errors='ignore')
|
||||
return str(value) if value not in (None, '') else None
|
||||
|
||||
|
||||
def parse_track_number(value) -> int | None:
|
||||
if value in (None, ''):
|
||||
return None
|
||||
match = re.search(r'\d+', str(value))
|
||||
return int(match.group(0)) if match else None
|
||||
|
||||
|
||||
def extract_year(value) -> int | None:
|
||||
if value in (None, ''):
|
||||
return None
|
||||
match = re.search(r'(\d{4})', str(value))
|
||||
return int(match.group(1)) if match else None
|
||||
|
||||
|
||||
def duration_bucket(value) -> int | None:
|
||||
duration = safe_float(value)
|
||||
return int(round(duration)) if duration is not None else None
|
||||
|
||||
|
||||
def safe_float(value) -> float | None:
|
||||
if value in (None, ''):
|
||||
return None
|
||||
try:
|
||||
return float(value)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def first_non_empty(*values):
    """Return the first argument that is neither ``None`` nor ``''``.

    Other falsy values such as ``0`` count as non-empty. Returns ``None``
    when every argument is empty or no arguments are given.
    """
    return next(
        (candidate for candidate in values if candidate not in (None, '')),
        None,
    )
|
||||
|
||||
|
||||
def split_artists(value: str | None) -> list[str]:
|
||||
if not value:
|
||||
return []
|
||||
return [part.strip() for part in re.split(r'[,/&;]+', str(value)) if part.strip()]
|
||||
Reference in New Issue
Block a user