import base64 import json import re import time from pathlib import Path from urllib import error, parse, request MUSICBRAINZ_USER_AGENT = 'MusicWorkshop/0.1 (+https://example.invalid/musicworkshop)' MUSICBRAINZ_THROTTLE_SECONDS = 1.0 HTTP_TIMEOUT_SECONDS = 5 HTTP_RETRY_COUNT = 1 MAX_MATCH_CANDIDATES = 5 VERSION_TOKENS = { 'live', 'remix', 'demo', 'karaoke', 'instrumental', 'cover', 'acoustic', 'edit', 'version' } SPOTIFY_TOKEN_URL = 'https://accounts.spotify.com/api/token' class MatchProviderError(Exception): def __init__(self, provider: str, message: str): super().__init__(message) self.provider = provider class MatchHttpClient: def __init__(self): self._last_request_at: dict[str, float] = {} def request_json( self, provider: str, url: str, *, params: dict | None = None, headers: dict[str, str] | None = None, method: str = 'GET', data: bytes | None = None, timeout: int = HTTP_TIMEOUT_SECONDS, retries: int = HTTP_RETRY_COUNT, throttle_key: str | None = None, throttle_seconds: float = 0.0 ) -> dict: request_headers = dict(headers or {}) final_url = _append_query_params(url, params) if throttle_key and throttle_seconds > 0: self._throttle(throttle_key, throttle_seconds) for attempt in range(retries + 1): try: response = self._open( request.Request( final_url, data=data, headers=request_headers, method=method ), timeout ) with response: charset = response.headers.get_content_charset() or 'utf-8' payload = response.read().decode(charset) return json.loads(payload) if payload else {} except error.HTTPError as exc: response_body = exc.read().decode('utf-8', errors='ignore') if exc.code >= 500 and attempt < retries: time.sleep(0.2 * (attempt + 1)) continue raise MatchProviderError( provider, f'{provider} 请求失败 (HTTP {exc.code}) {response_body[:160]}'.strip() ) from exc except error.URLError as exc: if attempt < retries: time.sleep(0.2 * (attempt + 1)) continue raise MatchProviderError( provider, f'{provider} 无法连接: {getattr(exc, "reason", exc)}' ) from exc except TimeoutError as exc: if attempt < retries: time.sleep(0.2 * (attempt + 1)) continue raise MatchProviderError(provider, f'{provider} 请求超时') from exc except json.JSONDecodeError as exc: raise MatchProviderError(provider, f'{provider} 返回了无效 JSON') from exc raise MatchProviderError(provider, f'{provider} 请求失败') def _open(self, req: request.Request, timeout: int): return request.urlopen(req, timeout=timeout) def _throttle(self, throttle_key: str, throttle_seconds: float): now = time.monotonic() previous = self._last_request_at.get(throttle_key) if previous is not None: sleep_seconds = throttle_seconds - (now - previous) if sleep_seconds > 0: time.sleep(sleep_seconds) self._last_request_at[throttle_key] = time.monotonic() class MusicBrainzProvider: def __init__(self, http_client: MatchHttpClient): self.http_client = http_client self._recording_cache: dict[tuple[str, str], dict] = {} self._release_cache: dict[tuple[str, str], dict] = {} def search_text( self, item_metadata: dict, config: dict, *, provider_name: str = 'musicbrainz' ) -> list[dict]: base_url = _normalize_base_url(config.get('musicbrainz')) query = _build_text_query(item_metadata) if not base_url or not query: return [] response = self._request_json( provider_name, f'{base_url}/recording', params={ 'query': query, 'limit': MAX_MATCH_CANDIDATES, 'fmt': 'json' } ) candidates: list[dict] = [] for entry in response.get('recordings') or []: recording_id = entry.get('id') if not recording_id: continue score = _to_float(entry.get('score') or entry.get('ext:score')) / 100.0 release_ids = [ release.get('id') for release in entry.get('releases') or [] if release.get('id') ] candidates.extend( self.lookup_recording_seed( provider_name, config, recording_id=recording_id, release_ids=release_ids, search_confidence=score ) ) return _dedupe_candidates(candidates) def lookup_recording_seed( self, provider_name: str, config: dict, *, recording_id: str, release_ids: list[str] | None = None, search_confidence: float | None = None, fingerprint_confidence: float | None = None, extra_source_ids: dict | None = None ) -> list[dict]: recording = self._get_recording(config, recording_id) candidate_release_ids = _unique_non_empty( [*(release_ids or []), *[ release.get('id') for release in recording.get('releases') or [] if release.get('id') ]] ) candidates: list[dict] = [] for release_id in candidate_release_ids[:2]: release = self._get_release(config, release_id) candidate = self._build_release_candidate( provider_name, recording, release, search_confidence=search_confidence, fingerprint_confidence=fingerprint_confidence, extra_source_ids=extra_source_ids ) if candidate: candidates.append(candidate) if candidates: return _dedupe_candidates(candidates) fallback = self._build_recording_candidate( provider_name, recording, search_confidence=search_confidence, fingerprint_confidence=fingerprint_confidence, extra_source_ids=extra_source_ids ) return [fallback] if fallback else [] def align_candidate(self, candidate: dict, config: dict) -> dict | None: item_metadata = { 'title': candidate.get('title'), 'artist': candidate.get('artist'), 'album': candidate.get('album'), 'duration_seconds': candidate.get('duration_seconds'), 'track_number': candidate.get('track_number'), 'disc_number': candidate.get('disc_number') } aligned_candidates = self.search_text(item_metadata, config, provider_name='musicbrainz') return aligned_candidates[0] if aligned_candidates else None def _get_recording(self, config: dict, recording_id: str) -> dict: base_url = _normalize_base_url(config.get('musicbrainz')) cache_key = (base_url, recording_id) if cache_key not in self._recording_cache: self._recording_cache[cache_key] = self._request_json( 'musicbrainz', f'{base_url}/recording/{recording_id}', params={ 'inc': 'artists+releases', 'fmt': 'json' } ) return self._recording_cache[cache_key] def _get_release(self, config: dict, release_id: str) -> dict: base_url = _normalize_base_url(config.get('musicbrainz')) cache_key = (base_url, release_id) if cache_key not in self._release_cache: self._release_cache[cache_key] = self._request_json( 'musicbrainz', f'{base_url}/release/{release_id}', params={ 'inc': 'artists+recordings+release-groups', 'fmt': 'json' } ) return self._release_cache[cache_key] def _request_json(self, provider: str, url: str, *, params: dict | None = None) -> dict: return self.http_client.request_json( provider, url, params=params, headers={'User-Agent': MUSICBRAINZ_USER_AGENT}, throttle_key='musicbrainz', throttle_seconds=MUSICBRAINZ_THROTTLE_SECONDS ) def _build_recording_candidate( self, provider_name: str, recording: dict, *, search_confidence: float | None, fingerprint_confidence: float | None, extra_source_ids: dict | None ) -> dict | None: recording_id = recording.get('id') if not recording_id: return None title = recording.get('title') artists = _extract_artist_names(recording.get('artist-credit') or []) duration_seconds = _milliseconds_to_seconds(recording.get('length')) release_date = None return { 'provider': provider_name, 'is_authoritative': True, 'title': title, 'artist': _join_artists(artists), 'artists': artists, 'album': None, 'album_artist': _join_artists(artists), 'track_number': None, 'disc_number': None, 'release_date': release_date, 'year': _extract_year(release_date), 'duration_seconds': duration_seconds, 'recording_id': recording_id, 'release_id': None, 'release_group_id': None, 'source_ids': { **(extra_source_ids or {}), 'musicbrainz_recording_id': recording_id }, 'fingerprint_confidence': fingerprint_confidence, 'search_confidence': search_confidence, 'release_tracklist': [] } def _build_release_candidate( self, provider_name: str, recording: dict, release: dict, *, search_confidence: float | None, fingerprint_confidence: float | None, extra_source_ids: dict | None ) -> dict | None: recording_id = recording.get('id') release_id = release.get('id') if not recording_id or not release_id: return None track_info = _find_release_track(release, recording_id) track_title = track_info.get('title') or track_info.get('recording', {}).get('title') track_artist_credit = ( track_info.get('recording', {}).get('artist-credit') or recording.get('artist-credit') or release.get('artist-credit') or [] ) artists = _extract_artist_names(track_artist_credit) album_artists = _extract_artist_names(release.get('artist-credit') or []) release_date = release.get('date') duration_seconds = _milliseconds_to_seconds( track_info.get('length') or recording.get('length') ) return { 'provider': provider_name, 'is_authoritative': True, 'title': track_title or recording.get('title'), 'artist': _join_artists(artists or album_artists), 'artists': artists or album_artists, 'album': release.get('title'), 'album_artist': _join_artists(album_artists or artists), 'track_number': _parse_track_number(track_info.get('position') or track_info.get('number')), 'disc_number': _parse_track_number(track_info.get('disc_number')), 'release_date': release_date, 'year': _extract_year(release_date), 'duration_seconds': duration_seconds, 'recording_id': recording_id, 'release_id': release_id, 'release_group_id': (release.get('release-group') or {}).get('id'), 'source_ids': { **(extra_source_ids or {}), 'musicbrainz_recording_id': recording_id, 'musicbrainz_release_id': release_id, 'musicbrainz_release_group_id': (release.get('release-group') or {}).get('id') }, 'fingerprint_confidence': fingerprint_confidence, 'search_confidence': search_confidence, 'release_tracklist': _build_release_tracklist(release) } class AcoustIdProvider: def __init__(self, http_client: MatchHttpClient, musicbrainz_provider: MusicBrainzProvider): self.http_client = http_client self.musicbrainz_provider = musicbrainz_provider def search(self, item: dict, config: dict) -> list[dict]: metadata_config = config.get('metadata') or {} fingerprint = item.get('acoustic_fingerprint') duration_seconds = item.get('fingerprint_duration_seconds') client_key = (metadata_config.get('acoustidClientKey') or '').strip() base_url = _normalize_base_url(metadata_config.get('acoustidUrl')) if not fingerprint or not duration_seconds or not client_key or not base_url: return [] response = self.http_client.request_json( 'acoustid', f'{base_url}/lookup', params={ 'client': client_key, 'duration': int(round(duration_seconds)), 'fingerprint': fingerprint, 'meta': 'recordings releasegroups' } ) candidates: list[dict] = [] for result in response.get('results') or []: result_id = result.get('id') fingerprint_confidence = _to_float(result.get('score')) for recording in result.get('recordings') or []: recording_id = recording.get('id') if not recording_id: continue release_ids = [ release.get('id') for release in recording.get('releases') or [] if release.get('id') ] candidates.extend( self.musicbrainz_provider.lookup_recording_seed( 'acoustid', metadata_config, recording_id=recording_id, release_ids=release_ids, fingerprint_confidence=fingerprint_confidence, extra_source_ids={'acoustid_id': result_id} ) ) return _dedupe_candidates(candidates) class TextSearchProvider: provider_name = '' credentials: tuple[str, ...] = () def __init__(self, http_client: MatchHttpClient): self.http_client = http_client def search(self, item_metadata: dict, config: dict) -> list[dict]: query = _build_text_query(item_metadata) metadata_config = config.get('metadata') or {} base_url = _normalize_base_url(metadata_config.get(self.base_url_key)) if not base_url or not query: return [] if self.credentials and not all( (metadata_config.get(field_name) or '').strip() for field_name in self.credentials ): return [] return self._parse_search_payload( self.http_client.request_json( self.provider_name, f'{base_url}{self.search_path}', params=self.build_params(query) ) ) def build_params(self, query: str) -> dict: return {'keywords': query, 'limit': MAX_MATCH_CANDIDATES} def _parse_search_payload(self, payload: dict) -> list[dict]: raise NotImplementedError class NeteaseProvider(TextSearchProvider): provider_name = 'netease' base_url_key = 'netease' search_path = '/search' def build_params(self, query: str) -> dict: return { 'keywords': query, 'type': 1, 'limit': MAX_MATCH_CANDIDATES } def _parse_search_payload(self, payload: dict) -> list[dict]: songs = ((payload.get('result') or {}).get('songs')) or [] candidates = [] for song in songs[:MAX_MATCH_CANDIDATES]: artists = [artist.get('name') for artist in song.get('ar') or [] if artist.get('name')] album = song.get('al') or {} release_date = _format_timestamp_date(album.get('publishTime')) candidates.append( { 'provider': 'netease', 'is_authoritative': False, 'title': song.get('name'), 'artist': _join_artists(artists), 'artists': artists, 'album': album.get('name'), 'album_artist': _join_artists(artists), 'track_number': None, 'disc_number': None, 'release_date': release_date, 'year': _extract_year(release_date), 'duration_seconds': _milliseconds_to_seconds(song.get('dt')), 'recording_id': None, 'release_id': None, 'release_group_id': None, 'source_ids': { 'netease_song_id': song.get('id'), 'netease_album_id': album.get('id') }, 'cover_url': album.get('picUrl'), 'search_confidence': 0.88 } ) return candidates class QQProvider(TextSearchProvider): provider_name = 'qq' base_url_key = 'qq' search_path = '/search' def _parse_search_payload(self, payload: dict) -> list[dict]: song_list = ( ((payload.get('data') or {}).get('song') or {}).get('list') or ((payload.get('result') or {}).get('list') or []) ) candidates = [] for song in song_list[:MAX_MATCH_CANDIDATES]: singers = song.get('singer') or song.get('singers') or [] artists = [artist.get('name') for artist in singers if artist.get('name')] album = song.get('album') or {} release_date = song.get('time_public') or album.get('time_public') candidates.append( { 'provider': 'qq', 'is_authoritative': False, 'title': song.get('title') or song.get('name'), 'artist': _join_artists(artists), 'artists': artists, 'album': album.get('title') or album.get('name'), 'album_artist': _join_artists(artists), 'track_number': None, 'disc_number': None, 'release_date': release_date, 'year': _extract_year(release_date), 'duration_seconds': _milliseconds_to_seconds(song.get('interval', 0) * 1000), 'recording_id': None, 'release_id': None, 'release_group_id': None, 'source_ids': { 'qq_song_mid': song.get('mid') or song.get('songmid'), 'qq_album_mid': album.get('mid') }, 'search_confidence': 0.88 } ) return candidates class SpotifyProvider(TextSearchProvider): provider_name = 'spotify' base_url_key = 'spotifyUrl' search_path = '/search' credentials = ('spotifyClientId', 'spotifySecret') def __init__(self, http_client: MatchHttpClient): super().__init__(http_client) self._token_cache: dict[str, dict] = {} def search(self, item_metadata: dict, config: dict) -> list[dict]: query = _build_text_query(item_metadata) metadata_config = config.get('metadata') or {} base_url = _normalize_base_url(metadata_config.get(self.base_url_key)) client_id = (metadata_config.get('spotifyClientId') or '').strip() client_secret = (metadata_config.get('spotifySecret') or '').strip() if not base_url or not query or not client_id or not client_secret: return [] token = self._get_access_token(client_id, client_secret) payload = self.http_client.request_json( 'spotify', f'{base_url}{self.search_path}', params={ 'q': query, 'type': 'track', 'limit': MAX_MATCH_CANDIDATES }, headers={'Authorization': f'Bearer {token}'} ) candidates = [] tracks = ((payload.get('tracks') or {}).get('items')) or [] for track in tracks[:MAX_MATCH_CANDIDATES]: artists = [artist.get('name') for artist in track.get('artists') or [] if artist.get('name')] album = track.get('album') or {} images = album.get('images') or [] candidates.append( { 'provider': 'spotify', 'is_authoritative': False, 'title': track.get('name'), 'artist': _join_artists(artists), 'artists': artists, 'album': album.get('name'), 'album_artist': _join_artists(artists), 'track_number': track.get('track_number'), 'disc_number': track.get('disc_number'), 'release_date': album.get('release_date'), 'year': _extract_year(album.get('release_date')), 'duration_seconds': _milliseconds_to_seconds(track.get('duration_ms')), 'recording_id': None, 'release_id': None, 'release_group_id': None, 'source_ids': { 'spotify_track_id': track.get('id'), 'spotify_album_id': album.get('id') }, 'cover_url': images[0].get('url') if images else None, 'search_confidence': 0.9 } ) return candidates def _get_access_token(self, client_id: str, client_secret: str) -> str: cache_key = f'{client_id}:{client_secret}' cached_token = self._token_cache.get(cache_key) if cached_token and cached_token['expires_at'] > time.time(): return cached_token['access_token'] basic_token = base64.b64encode(f'{client_id}:{client_secret}'.encode()).decode() payload = self.http_client.request_json( 'spotify', SPOTIFY_TOKEN_URL, method='POST', data=b'grant_type=client_credentials', headers={ 'Authorization': f'Basic {basic_token}', 'Content-Type': 'application/x-www-form-urlencoded' } ) access_token = payload.get('access_token') expires_in = int(payload.get('expires_in') or 3600) self._token_cache[cache_key] = { 'access_token': access_token, 'expires_at': time.time() + max(60, expires_in - 30) } return access_token class DiscogsProvider: def __init__(self, http_client: MatchHttpClient): self.http_client = http_client def enrich(self, metadata: dict, config: dict) -> dict | None: metadata_config = config.get('metadata') or {} base_url = _normalize_base_url(metadata_config.get('discogsUrl')) token = (metadata_config.get('discogsToken') or '').strip() if not base_url or not token: return None payload = self.http_client.request_json( 'discogs', f'{base_url}/database/search', params={ 'track': metadata.get('title') or '', 'artist': metadata.get('artist') or '', 'release_title': metadata.get('album') or '', 'per_page': 3, 'token': token } ) result = (payload.get('results') or [None])[0] if not result: return None return { 'provider': 'discogs', 'cover_url': result.get('cover_image'), 'genres': result.get('genre') or [], 'tags': result.get('style') or [], 'quality': 0.9 if result.get('cover_image') else 0.5, 'source_id': result.get('id') } class LastFmProvider: def __init__(self, http_client: MatchHttpClient): self.http_client = http_client def enrich(self, metadata: dict, config: dict) -> dict | None: metadata_config = config.get('metadata') or {} base_url = _normalize_base_url(metadata_config.get('lastfmUrl')) api_key = (metadata_config.get('lastfmKey') or '').strip() if not base_url or not api_key: return None payload = self.http_client.request_json( 'lastfm', base_url, params={ 'method': 'track.getInfo', 'api_key': api_key, 'artist': metadata.get('artist') or '', 'track': metadata.get('title') or '', 'autocorrect': 1, 'format': 'json' } ) track = payload.get('track') or {} album = track.get('album') or {} images = album.get('image') or [] top_tags = ((track.get('toptags') or {}).get('tag')) or [] return { 'provider': 'lastfm', 'cover_url': next( (image.get('#text') for image in reversed(images) if image.get('#text')), None ), 'genres': [tag.get('name') for tag in top_tags if tag.get('name')], 'tags': [tag.get('name') for tag in top_tags if tag.get('name')], 'quality': 0.75 if images else 0.6, 'source_id': track.get('mbid') or metadata.get('recording_id') } class GeniusProvider: def __init__(self, http_client: MatchHttpClient): self.http_client = http_client def enrich(self, metadata: dict, config: dict) -> dict | None: metadata_config = config.get('metadata') or {} base_url = _normalize_base_url(metadata_config.get('geniusUrl')) token = (metadata_config.get('geniusToken') or '').strip() query = _build_text_query(metadata) if not base_url or not token or not query: return None payload = self.http_client.request_json( 'genius', f'{base_url}/search', params={'q': query}, headers={'Authorization': f'Bearer {token}'} ) hit = (((payload.get('response') or {}).get('hits')) or [None])[0] if not hit: return None result = hit.get('result') or {} return { 'provider': 'genius', 'lyrics_url': result.get('url'), 'quality': 0.8 if result.get('url') else 0.0, 'source_id': result.get('id') } class Matcher: def __init__( self, *, http_client: MatchHttpClient | None = None, musicbrainz_provider: MusicBrainzProvider | None = None, acoustid_provider: AcoustIdProvider | None = None, netease_provider: NeteaseProvider | None = None, qq_provider: QQProvider | None = None, spotify_provider: SpotifyProvider | None = None, discogs_provider: DiscogsProvider | None = None, lastfm_provider: LastFmProvider | None = None, genius_provider: GeniusProvider | None = None ): self.http_client = http_client or MatchHttpClient() self.musicbrainz_provider = musicbrainz_provider or MusicBrainzProvider(self.http_client) self.acoustid_provider = acoustid_provider or AcoustIdProvider( self.http_client, self.musicbrainz_provider ) self.netease_provider = netease_provider or NeteaseProvider(self.http_client) self.qq_provider = qq_provider or QQProvider(self.http_client) self.spotify_provider = spotify_provider or SpotifyProvider(self.http_client) self.discogs_provider = discogs_provider or DiscogsProvider(self.http_client) self.lastfm_provider = lastfm_provider or LastFmProvider(self.http_client) self.genius_provider = genius_provider or GeniusProvider(self.http_client) def match_item(self, item: dict, album_group: list[dict], config: dict) -> dict: item_metadata = _build_input_metadata(item) provider_warnings: list[dict] = [] candidates: list[dict] = [] provider_scope = set(config.get('repair_provider_scope') or []) use_all_providers = not provider_scope def provider_enabled(name: str) -> bool: return use_all_providers or name in provider_scope if provider_enabled('acoustid'): candidates.extend( self._collect_provider_candidates( 'acoustid', self.acoustid_provider.search, provider_warnings, item, config ) ) if provider_enabled('musicbrainz'): candidates.extend( self._collect_provider_candidates( 'musicbrainz', self.musicbrainz_provider.search_text, provider_warnings, item_metadata, config.get('metadata') or {} ) ) candidates = self._score_candidates(item_metadata, album_group, candidates) top_authoritative = candidates[0] if candidates else None fallback_enabled = bool((config.get('advancedStrategy') or {}).get('metadataFallback', True)) if fallback_enabled and (top_authoritative is None or top_authoritative['score'] < 85): fallback_candidates = [] if provider_enabled('netease'): fallback_candidates.extend( self._collect_provider_candidates( 'netease', self.netease_provider.search, provider_warnings, item_metadata, config ) ) if provider_enabled('qq'): fallback_candidates.extend( self._collect_provider_candidates( 'qq', self.qq_provider.search, provider_warnings, item_metadata, config ) ) if provider_enabled('spotify'): fallback_candidates.extend( self._collect_provider_candidates( 'spotify', self.spotify_provider.search, provider_warnings, item_metadata, config ) ) candidates = self._score_candidates( item_metadata, album_group, [*candidates, *fallback_candidates] ) if not candidates: return { 'status': 'not_found', 'reason': 'no_candidates', 'message': '未找到任何匹配候选', 'source': None, 'confidence': None, 'is_authoritative': False, 'matched_metadata_json': None, 'match_candidates_json': [], 'match_enrichment_json': None, 'provider_warnings': provider_warnings } top_candidate = candidates[0] if not top_candidate['is_authoritative'] and provider_enabled('musicbrainz'): aligned_candidate = self._align_candidate_with_warnings( top_candidate, config.get('metadata') or {}, provider_warnings ) if aligned_candidate: top_candidate = self._score_candidates( item_metadata, album_group, [self._merge_aligned_candidate(top_candidate, aligned_candidate)] )[0] candidates = self._score_candidates( item_metadata, album_group, [top_candidate, *candidates[1:]] ) runner_up = candidates[1] if len(candidates) > 1 else None score_gap = top_candidate['score'] - (runner_up['score'] if runner_up else 0) candidates_json = [_serialize_candidate(candidate) for candidate in candidates[:MAX_MATCH_CANDIDATES]] enrichment = self._build_enrichment(top_candidate, config) if top_candidate['is_authoritative']: if top_candidate['score'] >= 85 and score_gap >= 8: return self._build_match_result( 'matched', 'authoritative_auto_match', f'权威候选自动匹配成功,得分 {top_candidate["score"]:.1f}', top_candidate, candidates_json, enrichment, provider_warnings ) else: if top_candidate['score'] >= 80 and score_gap >= 8: return self._build_match_result( 'matched_fallback', 'fallback_auto_match', f'Fallback 候选自动匹配成功,得分 {top_candidate["score"]:.1f}', top_candidate, candidates_json, enrichment, provider_warnings ) reason = 'score_gap_too_small' if score_gap < 8 else 'score_below_threshold' message = ( f'候选最高分 {top_candidate["score"]:.1f},与次高分差 {score_gap:.1f},需人工复核' ) return self._build_match_result( 'low_score', reason, message, top_candidate, candidates_json, enrichment, provider_warnings ) def _collect_provider_candidates( self, provider_name: str, search_provider, provider_warnings: list[dict], *args, **kwargs ) -> list[dict]: try: return search_provider(*args, **kwargs) except MatchProviderError as error: self._append_provider_warning(provider_name, error, provider_warnings) return [] def _align_candidate_with_warnings( self, candidate: dict, metadata_config: dict, provider_warnings: list[dict] ) -> dict | None: try: return self.musicbrainz_provider.align_candidate(candidate, metadata_config) except MatchProviderError as error: self._append_provider_warning('musicbrainz', error, provider_warnings) return None def _append_provider_warning( self, provider_name: str, error: MatchProviderError, provider_warnings: list[dict] ): provider_warnings.append( { 'provider': getattr(error, 'provider', None) or provider_name, 'message': str(error) } ) def _score_candidates( self, item_metadata: dict, album_group: list[dict], candidates: list[dict] ) -> list[dict]: scored_candidates = [] for candidate in _dedupe_candidates(candidates): identity_confidence = min( 1.0, max( 0.0, candidate.get('fingerprint_confidence') or candidate.get('search_confidence') or 0.0 ) ) score_breakdown = { 'fingerprint': round(30 * identity_confidence, 2), 'title': round(20 * _text_similarity(item_metadata.get('title'), candidate.get('title')), 2), 'artist': round(15 * _artist_similarity(item_metadata, candidate), 2), 'album': round(10 * _text_similarity(item_metadata.get('album'), candidate.get('album')), 2), 'duration': round(10 * _duration_similarity( item_metadata.get('duration_seconds'), candidate.get('duration_seconds') ), 2), 'track_disc': round(5 * _track_disc_similarity(item_metadata, candidate), 2), 'album_context': round(10 * _album_context_similarity(album_group, candidate), 2), 'version_penalty': round(_version_penalty(item_metadata, candidate), 2) } total_score = round( max( 0.0, min( 100.0, sum( value for key, value in score_breakdown.items() if key != 'version_penalty' ) - score_breakdown['version_penalty'] ) ), 2 ) scored_candidates.append( { **candidate, 'score': total_score, 'score_breakdown': score_breakdown } ) return sorted( scored_candidates, key=lambda candidate: ( candidate['score'], 1 if candidate.get('is_authoritative') else 0, _provider_rank(candidate.get('provider')) ), reverse=True ) def _merge_aligned_candidate(self, fallback_candidate: dict, aligned_candidate: dict) -> dict: merged_source_ids = { **(fallback_candidate.get('source_ids') or {}), **(aligned_candidate.get('source_ids') or {}) } return { **fallback_candidate, **aligned_candidate, 'provider': 'musicbrainz', 'is_authoritative': True, 'source_ids': merged_source_ids, 'cover_url': fallback_candidate.get('cover_url') or aligned_candidate.get('cover_url'), 'lyrics_url': fallback_candidate.get('lyrics_url') or aligned_candidate.get('lyrics_url') } def _build_enrichment(self, candidate: dict, config: dict) -> dict: if not candidate: return { 'cover': {'selected_source': None, 'candidates': []}, 'lyrics': {'selected_source': None, 'candidates': []}, 'genres': {'selected_source': None, 'candidates': []}, 'tags': {'selected_source': None, 'candidates': []} } enrichment_candidates = { 'cover': [], 'lyrics': [], 'genres': [], 'tags': [] } if candidate.get('cover_url'): enrichment_candidates['cover'].append( { 'provider': candidate['provider'], 'value': candidate['cover_url'], 'quality': 0.6, 'source_id': (candidate.get('source_ids') or {}).get(f'{candidate["provider"]}_album_id') } ) if candidate.get('lyrics_url'): enrichment_candidates['lyrics'].append( { 'provider': candidate['provider'], 'value': candidate['lyrics_url'], 'quality': 0.6, 'source_id': (candidate.get('source_ids') or {}).get(f'{candidate["provider"]}_song_id') } ) if (config.get('advancedStrategy') or {}).get('downloadAssets', True): for provider in ( self.discogs_provider.enrich(candidate, config), self.lastfm_provider.enrich(candidate, config), self.genius_provider.enrich(candidate, config) ): if not provider: continue if provider.get('cover_url'): enrichment_candidates['cover'].append( { 'provider': provider['provider'], 'value': provider['cover_url'], 'quality': provider.get('quality', 0.5), 'source_id': provider.get('source_id') } ) if provider.get('lyrics_url'): enrichment_candidates['lyrics'].append( { 'provider': provider['provider'], 'value': provider['lyrics_url'], 'quality': provider.get('quality', 0.5), 'source_id': provider.get('source_id') } ) if provider.get('genres'): enrichment_candidates['genres'].append( { 'provider': provider['provider'], 'value': provider['genres'], 'quality': provider.get('quality', 0.5), 'source_id': provider.get('source_id') } ) if provider.get('tags'): enrichment_candidates['tags'].append( { 'provider': provider['provider'], 'value': provider['tags'], 'quality': provider.get('quality', 0.5), 'source_id': provider.get('source_id') } ) return { key: { 'selected_source': _pick_best_candidate(value_candidates), 'candidates': value_candidates } for key, value_candidates in enrichment_candidates.items() } def _build_match_result( self, status: str, reason: str, message: str, candidate: dict, candidates_json: list[dict], enrichment: dict, provider_warnings: list[dict] ) -> dict: return { 'status': status, 'reason': reason, 'message': message, 'source': candidate.get('provider'), 'confidence': candidate.get('score'), 'is_authoritative': bool(candidate.get('is_authoritative')), 'matched_metadata_json': _serialize_metadata(candidate), 'match_candidates_json': candidates_json, 'match_enrichment_json': enrichment, 'provider_warnings': provider_warnings } def _append_query_params(url: str, params: dict | None) -> str: if not params: return url query = parse.urlencode( { key: value for key, value in params.items() if value is not None and value != '' }, doseq=True ) separator = '&' if parse.urlparse(url).query else '?' return f'{url}{separator}{query}' if query else url def _normalize_base_url(value: str | None) -> str: if not value: return '' return value.rstrip('/') def _build_text_query(item_metadata: dict) -> str: query_parts = [ item_metadata.get('title'), item_metadata.get('artist'), item_metadata.get('album') ] return ' '.join(part.strip() for part in query_parts if isinstance(part, str) and part.strip()) def _build_input_metadata(item: dict) -> dict: tags = item.get('original_tags_json') or {} audio_props = item.get('audio_props_json') or {} inferred_title = Path(item.get('relative_path') or item.get('filename') or '').stem release_date = tags.get('date') or tags.get('year') return { 'title': _first_non_empty(tags.get('title'), inferred_title), 'artist': _first_non_empty(tags.get('artist'), tags.get('album_artist')), 'artists': _split_artists(tags.get('artist')), 'album': tags.get('album'), 'album_artist': tags.get('album_artist'), 'track_number': _parse_track_number(tags.get('track_number') or tags.get('track')), 'disc_number': _parse_track_number(tags.get('disc_number') or tags.get('disc')), 'duration_seconds': ( item.get('fingerprint_duration_seconds') or audio_props.get('duration_seconds') ), 'release_date': release_date, 'year': _extract_year(release_date) } def _serialize_metadata(candidate: dict) -> dict: return { 'title': candidate.get('title'), 'artist': candidate.get('artist'), 'artists': candidate.get('artists') or [], 'album': candidate.get('album'), 'album_artist': candidate.get('album_artist'), 'track_number': candidate.get('track_number'), 'disc_number': candidate.get('disc_number'), 'release_date': candidate.get('release_date'), 'year': candidate.get('year'), 'duration_seconds': candidate.get('duration_seconds'), 'recording_id': candidate.get('recording_id'), 'release_id': candidate.get('release_id'), 'release_group_id': candidate.get('release_group_id'), 'source_ids': candidate.get('source_ids') or {} } def _serialize_candidate(candidate: dict) -> dict: return { 'provider': candidate.get('provider'), 'score': candidate.get('score'), 'score_breakdown': candidate.get('score_breakdown') or {}, 'is_authoritative': bool(candidate.get('is_authoritative')), 'title': candidate.get('title'), 'artist': candidate.get('artist'), 'album': candidate.get('album'), 'recording_id': candidate.get('recording_id'), 'release_id': candidate.get('release_id'), 'release_group_id': candidate.get('release_group_id'), 'source_ids': candidate.get('source_ids') or {} } def _dedupe_candidates(candidates: list[dict]) -> list[dict]: deduped: dict[str, dict] = {} for candidate in candidates: dedupe_key = '|'.join( [ candidate.get('provider') or '', candidate.get('recording_id') or '', candidate.get('release_id') or '', _normalize_text(candidate.get('title')), _normalize_text(candidate.get('artist')), _normalize_text(candidate.get('album')) ] ) current = deduped.get(dedupe_key) if current is None or (candidate.get('score') or 0) > (current.get('score') or 0): deduped[dedupe_key] = candidate return list(deduped.values()) def _extract_artist_names(artist_credit: list[dict]) -> list[str]: names = [] for artist in artist_credit: if artist.get('name'): names.append(artist['name']) continue nested_artist = artist.get('artist') or {} if nested_artist.get('name'): names.append(nested_artist['name']) return names def _join_artists(artists: list[str]) -> str | None: if not artists: return None return ', '.join(artists) def _milliseconds_to_seconds(value) -> float | None: if value in (None, ''): return None return round(_to_float(value) / 1000.0, 2) def _find_release_track(release: dict, recording_id: str) -> dict: for medium in release.get('media') or []: disc_number = _parse_track_number(medium.get('position')) for track in medium.get('tracks') or []: nested_recording = track.get('recording') or {} if nested_recording.get('id') == recording_id: return { **track, 'disc_number': disc_number } return {} def _build_release_tracklist(release: dict) -> list[dict]: tracklist = [] for medium in release.get('media') or []: disc_number = _parse_track_number(medium.get('position')) for track in medium.get('tracks') or []: nested_recording = track.get('recording') or {} tracklist.append( { 'title': track.get('title') or nested_recording.get('title'), 'track_number': _parse_track_number(track.get('position') or track.get('number')), 'disc_number': disc_number, 'duration_seconds': _milliseconds_to_seconds(track.get('length')) } ) return tracklist def _text_similarity(left: str | None, right: str | None) -> float: normalized_left = _normalize_text(left) normalized_right = _normalize_text(right) if not normalized_left or not normalized_right: return 0.0 if normalized_left == normalized_right: return 1.0 left_tokens = set(normalized_left.split()) right_tokens = set(normalized_right.split()) overlap = len(left_tokens & right_tokens) return overlap / max(len(left_tokens), len(right_tokens), 1) def _artist_similarity(item_metadata: dict, candidate: dict) -> float: artist_candidates = [ candidate.get('artist'), _join_artists(candidate.get('artists') or []), candidate.get('album_artist') ] return max( (_text_similarity(item_metadata.get('artist'), artist_name) for artist_name in artist_candidates), default=0.0 ) def _duration_similarity(source_duration, candidate_duration) -> float: if source_duration in (None, '') or candidate_duration in (None, ''): return 0.0 delta = abs(_to_float(source_duration) - _to_float(candidate_duration)) if delta <= 1: return 1.0 if delta <= 3: return 0.8 if delta <= 5: return 0.6 if delta <= 10: return 0.3 return 0.0 def _track_disc_similarity(item_metadata: dict, candidate: dict) -> float: score = 0.0 if item_metadata.get('track_number') and candidate.get('track_number'): if item_metadata['track_number'] == candidate['track_number']: score += 0.6 if item_metadata.get('disc_number') and candidate.get('disc_number'): if item_metadata['disc_number'] == candidate['disc_number']: score += 0.4 return score def _album_context_similarity(album_group: list[dict], candidate: dict) -> float: release_tracklist = candidate.get('release_tracklist') or [] if len(album_group) < 2 or not release_tracklist: return 0.0 comparable_items = 0 matched_items = 0 for item in album_group: item_metadata = _build_input_metadata(item) track_number = item_metadata.get('track_number') if not track_number: continue comparable_items += 1 track_match = next( ( track for track in release_tracklist if track.get('track_number') == track_number and ( not item_metadata.get('disc_number') or not track.get('disc_number') or track.get('disc_number') == item_metadata.get('disc_number') ) ), None ) if not track_match: continue title_ok = _text_similarity(item_metadata.get('title'), track_match.get('title')) >= 0.7 duration_ok = _duration_similarity( item_metadata.get('duration_seconds'), track_match.get('duration_seconds') ) >= 0.6 if title_ok or duration_ok: matched_items += 1 if comparable_items == 0: return 0.0 return matched_items / comparable_items def _version_penalty(item_metadata: dict, candidate: dict) -> float: item_tokens = _extract_version_tokens(item_metadata.get('title')) candidate_tokens = _extract_version_tokens(candidate.get('title')) if not item_tokens and not candidate_tokens: return 0.0 if item_tokens == candidate_tokens: return 0.0 return 8.0 def _extract_version_tokens(value: str | None) -> set[str]: normalized = _normalize_text(value) if not normalized: return set() return {token for token in normalized.split() if token in VERSION_TOKENS} def _parse_track_number(value) -> int | None: if value in (None, ''): return None match = re.search(r'\d+', str(value)) return int(match.group(0)) if match else None def _extract_year(value: str | None) -> int | None: if not value: return None match = re.search(r'(\d{4})', str(value)) return int(match.group(1)) if match else None def _normalize_text(value: str | None) -> str: if not value: return '' cleaned = re.sub(r'[^a-z0-9]+', ' ', str(value).lower()) return ' '.join(cleaned.split()) def _first_non_empty(*values): for value in values: if isinstance(value, str) and value.strip(): return value.strip() return None def _split_artists(value: str | None) -> list[str]: if not value: return [] return [part.strip() for part in re.split(r'[,/&]| feat\. ', value) if part.strip()] def _provider_rank(provider: str | None) -> int: provider_order = { 'acoustid': 6, 'musicbrainz': 5, 'netease': 4, 'qq': 3, 'spotify': 2 } return provider_order.get(provider or '', 0) def _pick_best_candidate(candidates: list[dict]) -> dict | None: if not candidates: return None return max(candidates, key=lambda candidate: candidate.get('quality', 0)) def _unique_non_empty(values: list[str]) -> list[str]: unique_values = [] seen_values: set[str] = set() for value in values: if not value or value in seen_values: continue seen_values.add(value) unique_values.append(value) return unique_values def _format_timestamp_date(value) -> str | None: if value in (None, ''): return None if isinstance(value, (int, float)) and value > 1000: return time.strftime('%Y-%m-%d', time.gmtime(value / 1000)) return str(value) def _to_float(value) -> float: try: return float(value) except (TypeError, ValueError): return 0.0