Files
MusicWorkshop/backend/app/matcher.py
T
2026-04-30 14:34:28 +08:00

1484 lines
47 KiB
Python

import base64
import json
import re
import time
from pathlib import Path
from urllib import error, parse, request
# User-Agent header value sent with every MusicBrainz request.
MUSICBRAINZ_USER_AGENT = 'MusicWorkshop/0.1 (+https://example.invalid/musicworkshop)'
# Minimum spacing, in seconds, enforced between MusicBrainz requests.
MUSICBRAINZ_THROTTLE_SECONDS = 1.0
# Default per-request timeout (seconds) for MatchHttpClient.request_json.
HTTP_TIMEOUT_SECONDS = 5
# Default number of retries performed after the first failed attempt.
HTTP_RETRY_COUNT = 1
# Upper bound on the candidates requested from, and kept per, provider.
MAX_MATCH_CANDIDATES = 5
# Words that mark an alternate version of a track.
# NOTE(review): not referenced in the visible part of this file; presumably
# consumed by the version-penalty scoring helper - confirm.
VERSION_TOKENS = {
    'live',
    'remix',
    'demo',
    'karaoke',
    'instrumental',
    'cover',
    'acoustic',
    'edit',
    'version'
}
# Token endpoint used by SpotifyProvider to obtain a client-credentials token.
SPOTIFY_TOKEN_URL = 'https://accounts.spotify.com/api/token'
class MatchProviderError(Exception):
    """Failure raised when a metadata provider request cannot be completed.

    Attributes:
        provider: Name of the provider the failure is attributed to.
    """

    def __init__(self, provider: str, message: str):
        # Record the provider first so it is available even while the base
        # exception is being initialised.
        self.provider = provider
        super().__init__(message)
class MatchHttpClient:
    """Small JSON-over-HTTP client with retries and per-key throttling."""

    def __init__(self):
        # Monotonic timestamp of the most recent request per throttle key.
        self._last_request_at: dict[str, float] = {}

    def request_json(
        self,
        provider: str,
        url: str,
        *,
        params: dict | None = None,
        headers: dict[str, str] | None = None,
        method: str = 'GET',
        data: bytes | None = None,
        timeout: int = HTTP_TIMEOUT_SECONDS,
        retries: int = HTTP_RETRY_COUNT,
        throttle_key: str | None = None,
        throttle_seconds: float = 0.0
    ) -> dict:
        """Send one HTTP request and return the decoded JSON body.

        Args:
            provider: Provider name used in error messages and on the
                raised ``MatchProviderError``.
            url: Target URL; ``params`` are appended as a query string.
            params: Query parameters; ``None`` and empty-string values are
                dropped by ``_append_query_params``.
            headers: Extra request headers.
            method: HTTP method.
            data: Raw request body, if any.
            timeout: Timeout in seconds passed to ``urlopen``.
            retries: Number of additional attempts after the first one.
            throttle_key: When set together with a positive
                ``throttle_seconds``, requests sharing this key are spaced
                at least ``throttle_seconds`` apart.
            throttle_seconds: Minimum spacing between throttled requests.

        Returns:
            The parsed JSON payload, or ``{}`` when the body is empty.

        Raises:
            MatchProviderError: On HTTP errors, connection failures,
                timeouts, undecodable bodies or invalid JSON.
        """
        request_headers = dict(headers or {})
        final_url = _append_query_params(url, params)
        for attempt in range(retries + 1):
            # Throttle every attempt, not only the first one: a retry is a
            # real request and must respect the provider's rate limit too.
            if throttle_key and throttle_seconds > 0:
                self._throttle(throttle_key, throttle_seconds)
            try:
                response = self._open(
                    request.Request(
                        final_url,
                        data=data,
                        headers=request_headers,
                        method=method
                    ),
                    timeout
                )
                with response:
                    charset = response.headers.get_content_charset() or 'utf-8'
                    payload = response.read().decode(charset)
                return json.loads(payload) if payload else {}
            except error.HTTPError as exc:
                response_body = exc.read().decode('utf-8', errors='ignore')
                # Only server-side failures are worth retrying.
                if exc.code >= 500 and attempt < retries:
                    self._backoff(attempt)
                    continue
                raise MatchProviderError(
                    provider,
                    f'{provider} 请求失败 (HTTP {exc.code}) {response_body[:160]}'.strip()
                ) from exc
            except error.URLError as exc:
                if attempt < retries:
                    self._backoff(attempt)
                    continue
                raise MatchProviderError(
                    provider,
                    f'{provider} 无法连接: {getattr(exc, "reason", exc)}'
                ) from exc
            except TimeoutError as exc:
                if attempt < retries:
                    self._backoff(attempt)
                    continue
                raise MatchProviderError(provider, f'{provider} 请求超时') from exc
            except json.JSONDecodeError as exc:
                raise MatchProviderError(provider, f'{provider} 返回了无效 JSON') from exc
            except (UnicodeDecodeError, LookupError) as exc:
                # Body bytes do not match the declared charset, or the
                # declared charset is unknown to Python.
                raise MatchProviderError(provider, f'{provider} 返回了无法解码的响应') from exc
        # Only reachable when ``retries`` is negative and no attempt ran.
        raise MatchProviderError(provider, f'{provider} 请求失败')

    def _open(self, req: request.Request, timeout: int):
        """Open ``req`` with ``urlopen``; kept separate as the single I/O seam."""
        return request.urlopen(req, timeout=timeout)

    def _backoff(self, attempt: int) -> None:
        """Sleep before the next retry; the delay grows linearly per attempt."""
        time.sleep(0.2 * (attempt + 1))

    def _throttle(self, throttle_key: str, throttle_seconds: float):
        """Block until ``throttle_seconds`` have passed since the last request for the key."""
        now = time.monotonic()
        previous = self._last_request_at.get(throttle_key)
        if previous is not None:
            sleep_seconds = throttle_seconds - (now - previous)
            if sleep_seconds > 0:
                time.sleep(sleep_seconds)
        # Stamp after sleeping so the next caller measures from the real send time.
        self._last_request_at[throttle_key] = time.monotonic()
class MusicBrainzProvider:
    """Authoritative candidate provider backed by a MusicBrainz web service.

    Every ``config`` argument accepted by this class is the metadata settings
    mapping itself: the base URL is read from ``config.get('musicbrainz')``.
    """

    def __init__(self, http_client: MatchHttpClient):
        self.http_client = http_client
        # Lookup responses cached by (base_url, MBID) for the lifetime of
        # this provider instance.
        self._recording_cache: dict[tuple[str, str], dict] = {}
        self._release_cache: dict[tuple[str, str], dict] = {}

    def search_text(
        self,
        item_metadata: dict,
        config: dict,
        *,
        provider_name: str = 'musicbrainz'
    ) -> list[dict]:
        """Search recordings by free text and expand each hit into candidates.

        Args:
            item_metadata: Mapping whose title/artist/album build the query.
            config: Metadata settings mapping (see class docstring).
            provider_name: Provider label stamped on returned candidates.

        Returns:
            De-duplicated candidates; ``[]`` when the base URL or the query
            is empty.

        Raises:
            MatchProviderError: When a MusicBrainz request fails.
        """
        base_url = _normalize_base_url(config.get('musicbrainz'))
        query = _build_text_query(item_metadata)
        if not base_url or not query:
            return []
        response = self._request_json(
            provider_name,
            f'{base_url}/recording',
            params={
                'query': query,
                'limit': MAX_MATCH_CANDIDATES,
                'fmt': 'json'
            }
        )
        candidates: list[dict] = []
        for entry in response.get('recordings') or []:
            recording_id = entry.get('id')
            if not recording_id:
                continue
            # The search score is divided by 100 before being used as a confidence.
            score = _to_float(entry.get('score') or entry.get('ext:score')) / 100.0
            release_ids = [
                release.get('id')
                for release in entry.get('releases') or []
                if release.get('id')
            ]
            candidates.extend(
                self.lookup_recording_seed(
                    provider_name,
                    config,
                    recording_id=recording_id,
                    release_ids=release_ids,
                    search_confidence=score
                )
            )
        return _dedupe_candidates(candidates)

    def lookup_recording_seed(
        self,
        provider_name: str,
        config: dict,
        *,
        recording_id: str,
        release_ids: list[str] | None = None,
        search_confidence: float | None = None,
        fingerprint_confidence: float | None = None,
        extra_source_ids: dict | None = None
    ) -> list[dict]:
        """Expand a recording MBID into release-level candidates.

        At most the first two release ids (caller-supplied ones first, then
        those listed on the recording) are looked up. When none yields a
        candidate, a single recording-only candidate is returned instead.

        Returns:
            Candidate dicts; ``[]`` when MusicBrainz is not configured or
            the recording has no id.

        Raises:
            MatchProviderError: When a MusicBrainz request fails.
        """
        # Without a base URL the lookup would build a scheme-less URL and
        # urllib would raise a ValueError that no caller handles. Treat an
        # unconfigured service like ``search_text`` does: no candidates.
        if not _normalize_base_url(config.get('musicbrainz')):
            return []
        recording = self._get_recording(config, recording_id)
        candidate_release_ids = _unique_non_empty(
            [*(release_ids or []), *[
                release.get('id')
                for release in recording.get('releases') or []
                if release.get('id')
            ]]
        )
        candidates: list[dict] = []
        for release_id in candidate_release_ids[:2]:
            release = self._get_release(config, release_id)
            candidate = self._build_release_candidate(
                provider_name,
                recording,
                release,
                search_confidence=search_confidence,
                fingerprint_confidence=fingerprint_confidence,
                extra_source_ids=extra_source_ids
            )
            if candidate:
                candidates.append(candidate)
        if candidates:
            return _dedupe_candidates(candidates)
        fallback = self._build_recording_candidate(
            provider_name,
            recording,
            search_confidence=search_confidence,
            fingerprint_confidence=fingerprint_confidence,
            extra_source_ids=extra_source_ids
        )
        return [fallback] if fallback else []

    def align_candidate(self, candidate: dict, config: dict) -> dict | None:
        """Re-search MusicBrainz using a candidate's own metadata.

        Returns:
            The first candidate from ``search_text``, or ``None`` when the
            search yields nothing.
        """
        item_metadata = {
            'title': candidate.get('title'),
            'artist': candidate.get('artist'),
            'album': candidate.get('album'),
            'duration_seconds': candidate.get('duration_seconds'),
            'track_number': candidate.get('track_number'),
            'disc_number': candidate.get('disc_number')
        }
        aligned_candidates = self.search_text(item_metadata, config, provider_name='musicbrainz')
        return aligned_candidates[0] if aligned_candidates else None

    def _get_recording(self, config: dict, recording_id: str) -> dict:
        """Fetch a recording (with artists and releases), caching the response."""
        base_url = _normalize_base_url(config.get('musicbrainz'))
        cache_key = (base_url, recording_id)
        if cache_key not in self._recording_cache:
            self._recording_cache[cache_key] = self._request_json(
                'musicbrainz',
                f'{base_url}/recording/{recording_id}',
                params={
                    'inc': 'artists+releases',
                    'fmt': 'json'
                }
            )
        return self._recording_cache[cache_key]

    def _get_release(self, config: dict, release_id: str) -> dict:
        """Fetch a release (artists, recordings, release group), caching the response."""
        base_url = _normalize_base_url(config.get('musicbrainz'))
        cache_key = (base_url, release_id)
        if cache_key not in self._release_cache:
            self._release_cache[cache_key] = self._request_json(
                'musicbrainz',
                f'{base_url}/release/{release_id}',
                params={
                    'inc': 'artists+recordings+release-groups',
                    'fmt': 'json'
                }
            )
        return self._release_cache[cache_key]

    def _request_json(self, provider: str, url: str, *, params: dict | None = None) -> dict:
        """Issue a throttled request carrying the MusicBrainz User-Agent header."""
        return self.http_client.request_json(
            provider,
            url,
            params=params,
            headers={'User-Agent': MUSICBRAINZ_USER_AGENT},
            # A single shared key throttles every MusicBrainz call together,
            # regardless of which provider label triggered it.
            throttle_key='musicbrainz',
            throttle_seconds=MUSICBRAINZ_THROTTLE_SECONDS
        )

    def _build_recording_candidate(
        self,
        provider_name: str,
        recording: dict,
        *,
        search_confidence: float | None,
        fingerprint_confidence: float | None,
        extra_source_ids: dict | None
    ) -> dict | None:
        """Build a candidate from a recording alone, with no release context.

        Returns:
            The candidate dict, or ``None`` when the recording has no id.
        """
        recording_id = recording.get('id')
        if not recording_id:
            return None
        title = recording.get('title')
        artists = _extract_artist_names(recording.get('artist-credit') or [])
        duration_seconds = _milliseconds_to_seconds(recording.get('length'))
        # No release is attached here, so there is no release date to report.
        release_date = None
        return {
            'provider': provider_name,
            'is_authoritative': True,
            'title': title,
            'artist': _join_artists(artists),
            'artists': artists,
            'album': None,
            'album_artist': _join_artists(artists),
            'track_number': None,
            'disc_number': None,
            'release_date': release_date,
            'year': _extract_year(release_date),
            'duration_seconds': duration_seconds,
            'recording_id': recording_id,
            'release_id': None,
            'release_group_id': None,
            'source_ids': {
                **(extra_source_ids or {}),
                'musicbrainz_recording_id': recording_id
            },
            'fingerprint_confidence': fingerprint_confidence,
            'search_confidence': search_confidence,
            'release_tracklist': []
        }

    def _build_release_candidate(
        self,
        provider_name: str,
        recording: dict,
        release: dict,
        *,
        search_confidence: float | None,
        fingerprint_confidence: float | None,
        extra_source_ids: dict | None
    ) -> dict | None:
        """Build a candidate for ``recording`` as it appears on ``release``.

        Returns:
            The candidate dict, or ``None`` when either id is missing.
        """
        recording_id = recording.get('id')
        release_id = release.get('id')
        if not recording_id or not release_id:
            return None
        # Tolerate a missing track entry and a null ``recording`` value so a
        # partial release payload degrades to recording-level data.
        track_info = _find_release_track(release, recording_id) or {}
        track_recording = track_info.get('recording') or {}
        track_title = track_info.get('title') or track_recording.get('title')
        track_artist_credit = (
            track_recording.get('artist-credit')
            or recording.get('artist-credit')
            or release.get('artist-credit')
            or []
        )
        artists = _extract_artist_names(track_artist_credit)
        album_artists = _extract_artist_names(release.get('artist-credit') or [])
        release_date = release.get('date')
        duration_seconds = _milliseconds_to_seconds(
            track_info.get('length') or recording.get('length')
        )
        release_group_id = (release.get('release-group') or {}).get('id')
        return {
            'provider': provider_name,
            'is_authoritative': True,
            'title': track_title or recording.get('title'),
            'artist': _join_artists(artists or album_artists),
            'artists': artists or album_artists,
            'album': release.get('title'),
            'album_artist': _join_artists(album_artists or artists),
            'track_number': _parse_track_number(track_info.get('position') or track_info.get('number')),
            'disc_number': _parse_track_number(track_info.get('disc_number')),
            'release_date': release_date,
            'year': _extract_year(release_date),
            'duration_seconds': duration_seconds,
            'recording_id': recording_id,
            'release_id': release_id,
            'release_group_id': release_group_id,
            'source_ids': {
                **(extra_source_ids or {}),
                'musicbrainz_recording_id': recording_id,
                'musicbrainz_release_id': release_id,
                'musicbrainz_release_group_id': release_group_id
            },
            'fingerprint_confidence': fingerprint_confidence,
            'search_confidence': search_confidence,
            'release_tracklist': _build_release_tracklist(release)
        }
class AcoustIdProvider:
    """Fingerprint lookup provider that resolves AcoustID hits via MusicBrainz."""

    def __init__(self, http_client: MatchHttpClient, musicbrainz_provider: MusicBrainzProvider):
        self.http_client = http_client
        self.musicbrainz_provider = musicbrainz_provider

    def search(self, item: dict, config: dict) -> list[dict]:
        """Look up the item's acoustic fingerprint and return candidates.

        Args:
            item: Mapping providing ``acoustic_fingerprint`` and
                ``fingerprint_duration_seconds``.
            config: Full configuration; settings are read from its
                ``metadata`` section.

        Returns:
            De-duplicated candidates labelled ``acoustid``; ``[]`` when the
            fingerprint, duration, client key or base URL is missing.

        Raises:
            MatchProviderError: When the AcoustID or a MusicBrainz request fails.
        """
        settings = config.get('metadata') or {}
        fingerprint = item.get('acoustic_fingerprint')
        duration_seconds = item.get('fingerprint_duration_seconds')
        client_key = (settings.get('acoustidClientKey') or '').strip()
        base_url = _normalize_base_url(settings.get('acoustidUrl'))
        if not (fingerprint and duration_seconds and client_key and base_url):
            return []

        lookup_params = {
            'client': client_key,
            'duration': int(round(duration_seconds)),
            'fingerprint': fingerprint,
            'meta': 'recordings releasegroups'
        }
        response = self.http_client.request_json(
            'acoustid',
            f'{base_url}/lookup',
            params=lookup_params
        )

        found: list[dict] = []
        for hit in response.get('results') or []:
            acoustid_id = hit.get('id')
            hit_score = _to_float(hit.get('score'))
            for recording in hit.get('recordings') or []:
                recording_id = recording.get('id')
                if not recording_id:
                    continue
                release_ids = []
                for release in recording.get('releases') or []:
                    if release.get('id'):
                        release_ids.append(release.get('id'))
                # The MusicBrainz provider receives the metadata section
                # directly, not the full configuration.
                seeded = self.musicbrainz_provider.lookup_recording_seed(
                    'acoustid',
                    settings,
                    recording_id=recording_id,
                    release_ids=release_ids,
                    fingerprint_confidence=hit_score,
                    extra_source_ids={'acoustid_id': acoustid_id}
                )
                found.extend(seeded)
        return _dedupe_candidates(found)
class TextSearchProvider:
    """Base class for providers queried with a free-text search.

    Subclasses supply ``base_url_key`` and ``search_path`` (both read by
    ``search``) and implement ``_parse_search_payload``.
    """

    provider_name = ''
    # Names of metadata settings that must all be non-blank before searching.
    credentials: tuple[str, ...] = ()

    def __init__(self, http_client: MatchHttpClient):
        self.http_client = http_client

    def search(self, item_metadata: dict, config: dict) -> list[dict]:
        """Run the text search and return the parsed candidates.

        Returns:
            Parsed candidates; ``[]`` when the base URL, the query or any
            required credential is missing.
        """
        query = _build_text_query(item_metadata)
        settings = config.get('metadata') or {}
        base_url = _normalize_base_url(settings.get(self.base_url_key))
        if not base_url or not query:
            return []
        # Every declared credential must be present and non-blank.
        for field_name in self.credentials:
            if not (settings.get(field_name) or '').strip():
                return []
        endpoint = f'{base_url}{self.search_path}'
        payload = self.http_client.request_json(
            self.provider_name,
            endpoint,
            params=self.build_params(query)
        )
        return self._parse_search_payload(payload)

    def build_params(self, query: str) -> dict:
        """Return the query-string parameters for ``query``."""
        return {'keywords': query, 'limit': MAX_MATCH_CANDIDATES}

    def _parse_search_payload(self, payload: dict) -> list[dict]:
        """Convert a provider response into candidate dicts (subclass hook)."""
        raise NotImplementedError
class NeteaseProvider(TextSearchProvider):
    """Non-authoritative text search against a NetEase Cloud Music API."""

    provider_name = 'netease'
    base_url_key = 'netease'
    search_path = '/search'

    def build_params(self, query: str) -> dict:
        """Return search parameters; ``type`` 1 is sent with every query."""
        return {
            'keywords': query,
            'type': 1,
            'limit': MAX_MATCH_CANDIDATES
        }

    def _parse_search_payload(self, payload: dict) -> list[dict]:
        """Turn ``result.songs`` into candidate dicts.

        Both the abbreviated song shape (``ar`` / ``al`` / ``dt``) and the
        long-form one (``artists`` / ``album`` / ``duration``) are accepted;
        the abbreviated keys win when both are present.
        """
        songs = ((payload.get('result') or {}).get('songs')) or []
        candidates = []
        for song in songs[:MAX_MATCH_CANDIDATES]:
            artist_entries = song.get('ar') or song.get('artists') or []
            artists = [artist.get('name') for artist in artist_entries if artist.get('name')]
            album = song.get('al') or song.get('album') or {}
            release_date = _format_timestamp_date(album.get('publishTime'))
            # NOTE(review): assumes both duration fields are in milliseconds - confirm.
            duration_ms = song.get('dt') or song.get('duration')
            candidates.append(
                {
                    'provider': 'netease',
                    'is_authoritative': False,
                    'title': song.get('name'),
                    'artist': _join_artists(artists),
                    'artists': artists,
                    'album': album.get('name'),
                    'album_artist': _join_artists(artists),
                    'track_number': None,
                    'disc_number': None,
                    'release_date': release_date,
                    'year': _extract_year(release_date),
                    'duration_seconds': _milliseconds_to_seconds(duration_ms),
                    'recording_id': None,
                    'release_id': None,
                    'release_group_id': None,
                    'source_ids': {
                        'netease_song_id': song.get('id'),
                        'netease_album_id': album.get('id')
                    },
                    'cover_url': album.get('picUrl'),
                    # Fixed confidence assigned to every hit from this provider.
                    'search_confidence': 0.88
                }
            )
        return candidates
class QQProvider(TextSearchProvider):
    """Non-authoritative text search against a QQ Music API."""

    provider_name = 'qq'
    base_url_key = 'qq'
    search_path = '/search'

    def _parse_search_payload(self, payload: dict) -> list[dict]:
        """Turn the song list of a search response into candidate dicts.

        The list is read from ``data.song.list`` or, failing that, from
        ``result.list``.
        """
        song_list = (
            ((payload.get('data') or {}).get('song') or {}).get('list')
            or ((payload.get('result') or {}).get('list') or [])
        )
        candidates = []
        for song in song_list[:MAX_MATCH_CANDIDATES]:
            singers = song.get('singer') or song.get('singers') or []
            artists = [artist.get('name') for artist in singers if artist.get('name')]
            album = song.get('album') or {}
            release_date = song.get('time_public') or album.get('time_public')
            # ``interval`` may be present but null; ``or 0`` keeps the
            # multiplication from raising TypeError in that case.
            interval_seconds = song.get('interval') or 0
            candidates.append(
                {
                    'provider': 'qq',
                    'is_authoritative': False,
                    'title': song.get('title') or song.get('name'),
                    'artist': _join_artists(artists),
                    'artists': artists,
                    'album': album.get('title') or album.get('name'),
                    'album_artist': _join_artists(artists),
                    'track_number': None,
                    'disc_number': None,
                    'release_date': release_date,
                    'year': _extract_year(release_date),
                    'duration_seconds': _milliseconds_to_seconds(interval_seconds * 1000),
                    'recording_id': None,
                    'release_id': None,
                    'release_group_id': None,
                    'source_ids': {
                        'qq_song_mid': song.get('mid') or song.get('songmid'),
                        'qq_album_mid': album.get('mid')
                    },
                    # Fixed confidence assigned to every hit from this provider.
                    'search_confidence': 0.88
                }
            )
        return candidates
class SpotifyProvider(TextSearchProvider):
    """Non-authoritative track search against the Spotify Web API."""

    provider_name = 'spotify'
    base_url_key = 'spotifyUrl'
    search_path = '/search'
    credentials = ('spotifyClientId', 'spotifySecret')

    def __init__(self, http_client: MatchHttpClient):
        super().__init__(http_client)
        # Access tokens cached per "client_id:client_secret" pair.
        self._token_cache: dict[str, dict] = {}

    def search(self, item_metadata: dict, config: dict) -> list[dict]:
        """Search Spotify tracks and return candidate dicts.

        Returns:
            Up to ``MAX_MATCH_CANDIDATES`` candidates; ``[]`` when the base
            URL, the query or either credential is missing.

        Raises:
            MatchProviderError: When the token or search request fails, or
                the token response carries no access token.
        """
        query = _build_text_query(item_metadata)
        metadata_config = config.get('metadata') or {}
        base_url = _normalize_base_url(metadata_config.get(self.base_url_key))
        client_id = (metadata_config.get('spotifyClientId') or '').strip()
        client_secret = (metadata_config.get('spotifySecret') or '').strip()
        if not base_url or not query or not client_id or not client_secret:
            return []
        token = self._get_access_token(client_id, client_secret)
        payload = self.http_client.request_json(
            'spotify',
            f'{base_url}{self.search_path}',
            params={
                'q': query,
                'type': 'track',
                'limit': MAX_MATCH_CANDIDATES
            },
            headers={'Authorization': f'Bearer {token}'}
        )
        candidates = []
        tracks = ((payload.get('tracks') or {}).get('items')) or []
        for track in tracks[:MAX_MATCH_CANDIDATES]:
            artists = [artist.get('name') for artist in track.get('artists') or [] if artist.get('name')]
            album = track.get('album') or {}
            images = album.get('images') or []
            candidates.append(
                {
                    'provider': 'spotify',
                    'is_authoritative': False,
                    'title': track.get('name'),
                    'artist': _join_artists(artists),
                    'artists': artists,
                    'album': album.get('name'),
                    'album_artist': _join_artists(artists),
                    'track_number': track.get('track_number'),
                    'disc_number': track.get('disc_number'),
                    'release_date': album.get('release_date'),
                    'year': _extract_year(album.get('release_date')),
                    'duration_seconds': _milliseconds_to_seconds(track.get('duration_ms')),
                    'recording_id': None,
                    'release_id': None,
                    'release_group_id': None,
                    'source_ids': {
                        'spotify_track_id': track.get('id'),
                        'spotify_album_id': album.get('id')
                    },
                    # The first listed image is used as the cover.
                    'cover_url': images[0].get('url') if images else None,
                    # Fixed confidence assigned to every hit from this provider.
                    'search_confidence': 0.9
                }
            )
        return candidates

    def _get_access_token(self, client_id: str, client_secret: str) -> str:
        """Return a cached or freshly requested client-credentials token.

        Raises:
            MatchProviderError: When the token request fails or its
                response contains no ``access_token``.
        """
        cache_key = f'{client_id}:{client_secret}'
        cached_token = self._token_cache.get(cache_key)
        if cached_token and cached_token['expires_at'] > time.time():
            return cached_token['access_token']
        basic_token = base64.b64encode(f'{client_id}:{client_secret}'.encode()).decode()
        payload = self.http_client.request_json(
            'spotify',
            SPOTIFY_TOKEN_URL,
            method='POST',
            data=b'grant_type=client_credentials',
            headers={
                'Authorization': f'Basic {basic_token}',
                'Content-Type': 'application/x-www-form-urlencoded'
            }
        )
        access_token = payload.get('access_token')
        if not access_token:
            # Fail here instead of caching None and later sending
            # "Bearer None" with every search request.
            raise MatchProviderError('spotify', 'spotify 未返回 access_token')
        expires_in = int(payload.get('expires_in') or 3600)
        self._token_cache[cache_key] = {
            'access_token': access_token,
            # Expire 30 s early, but keep the token for at least a minute.
            'expires_at': time.time() + max(60, expires_in - 30)
        }
        return access_token
class DiscogsProvider:
    """Enrichment provider reading cover art, genres and styles from Discogs."""

    def __init__(self, http_client: MatchHttpClient):
        self.http_client = http_client

    def enrich(self, metadata: dict, config: dict) -> dict | None:
        """Search the Discogs database and describe the first result.

        Returns:
            An enrichment dict built from the first search result, or
            ``None`` when Discogs is not configured or nothing was found.

        Raises:
            MatchProviderError: When the request fails.
        """
        settings = config.get('metadata') or {}
        base_url = _normalize_base_url(settings.get('discogsUrl'))
        token = (settings.get('discogsToken') or '').strip()
        if not (base_url and token):
            return None

        search_params = {
            'track': metadata.get('title') or '',
            'artist': metadata.get('artist') or '',
            'release_title': metadata.get('album') or '',
            'per_page': 3,
            'token': token
        }
        payload = self.http_client.request_json(
            'discogs',
            f'{base_url}/database/search',
            params=search_params
        )

        results = payload.get('results') or [None]
        first_hit = results[0]
        if not first_hit:
            return None
        cover_image = first_hit.get('cover_image')
        return {
            'provider': 'discogs',
            'cover_url': cover_image,
            'genres': first_hit.get('genre') or [],
            'tags': first_hit.get('style') or [],
            # A result that carries a cover image is rated higher.
            'quality': 0.9 if cover_image else 0.5,
            'source_id': first_hit.get('id')
        }
class LastFmProvider:
    """Enrichment provider reading cover art and top tags from Last.fm."""

    def __init__(self, http_client: MatchHttpClient):
        self.http_client = http_client

    def enrich(self, metadata: dict, config: dict) -> dict | None:
        """Call ``track.getInfo`` and collect cover and tag information.

        Returns:
            An enrichment dict, or ``None`` when Last.fm is not configured.
            A dict is returned even when the response has no track data.

        Raises:
            MatchProviderError: When the request fails.
        """
        settings = config.get('metadata') or {}
        base_url = _normalize_base_url(settings.get('lastfmUrl'))
        api_key = (settings.get('lastfmKey') or '').strip()
        if not (base_url and api_key):
            return None

        payload = self.http_client.request_json(
            'lastfm',
            base_url,
            params={
                'method': 'track.getInfo',
                'api_key': api_key,
                'artist': metadata.get('artist') or '',
                'track': metadata.get('title') or '',
                'autocorrect': 1,
                'format': 'json'
            }
        )
        track = payload.get('track') or {}
        images = (track.get('album') or {}).get('image') or []
        top_tags = ((track.get('toptags') or {}).get('tag')) or []

        # Walk the image list backwards and keep the first non-empty URL.
        cover_url = None
        for image in reversed(images):
            if image.get('#text'):
                cover_url = image.get('#text')
                break

        tag_names = [tag.get('name') for tag in top_tags if tag.get('name')]
        return {
            'provider': 'lastfm',
            'cover_url': cover_url,
            # Top tags are reported as both genres and tags; each key gets
            # its own list object.
            'genres': tag_names,
            'tags': list(tag_names),
            'quality': 0.75 if images else 0.6,
            'source_id': track.get('mbid') or metadata.get('recording_id')
        }
class GeniusProvider:
    """Enrichment provider that looks up a lyrics page URL on Genius."""

    def __init__(self, http_client: MatchHttpClient):
        self.http_client = http_client

    def enrich(self, metadata: dict, config: dict) -> dict | None:
        """Search Genius and describe the first hit.

        Returns:
            An enrichment dict for the first search hit, or ``None`` when
            Genius is not configured, the query is empty or nothing matched.

        Raises:
            MatchProviderError: When the request fails.
        """
        settings = config.get('metadata') or {}
        base_url = _normalize_base_url(settings.get('geniusUrl'))
        token = (settings.get('geniusToken') or '').strip()
        query = _build_text_query(metadata)
        if not (base_url and token and query):
            return None

        payload = self.http_client.request_json(
            'genius',
            f'{base_url}/search',
            params={'q': query},
            headers={'Authorization': f'Bearer {token}'}
        )
        hits = ((payload.get('response') or {}).get('hits')) or [None]
        first_hit = hits[0]
        if not first_hit:
            return None

        result = first_hit.get('result') or {}
        lyrics_url = result.get('url')
        return {
            'provider': 'genius',
            'lyrics_url': lyrics_url,
            # A hit without a URL carries no usable lyrics reference.
            'quality': 0.8 if lyrics_url else 0.0,
            'source_id': result.get('id')
        }
class Matcher:
    """Coordinates the providers, scores their candidates and picks a match."""

    def __init__(
        self,
        *,
        http_client: MatchHttpClient | None = None,
        musicbrainz_provider: MusicBrainzProvider | None = None,
        acoustid_provider: AcoustIdProvider | None = None,
        netease_provider: NeteaseProvider | None = None,
        qq_provider: QQProvider | None = None,
        spotify_provider: SpotifyProvider | None = None,
        discogs_provider: DiscogsProvider | None = None,
        lastfm_provider: LastFmProvider | None = None,
        genius_provider: GeniusProvider | None = None
    ):
        """Wire up providers; any not supplied is built on the shared HTTP client."""
        self.http_client = http_client or MatchHttpClient()
        self.musicbrainz_provider = musicbrainz_provider or MusicBrainzProvider(self.http_client)
        self.acoustid_provider = acoustid_provider or AcoustIdProvider(
            self.http_client,
            self.musicbrainz_provider
        )
        self.netease_provider = netease_provider or NeteaseProvider(self.http_client)
        self.qq_provider = qq_provider or QQProvider(self.http_client)
        self.spotify_provider = spotify_provider or SpotifyProvider(self.http_client)
        self.discogs_provider = discogs_provider or DiscogsProvider(self.http_client)
        self.lastfm_provider = lastfm_provider or LastFmProvider(self.http_client)
        self.genius_provider = genius_provider or GeniusProvider(self.http_client)

    def match_item(self, item: dict, album_group: list[dict], config: dict) -> dict:
        """Match one library item against the configured providers.

        Authoritative providers (AcoustID, MusicBrainz) are queried first.
        Fallback providers (NetEase, QQ, Spotify) are queried only when
        fallback is enabled and no authoritative candidate scores 85 or more.

        Args:
            item: Library item; its tags and audio properties form the
                input metadata.
            album_group: Sibling items passed through to the scoring helpers.
            config: Full configuration. ``repair_provider_scope`` optionally
                restricts which providers are used; an empty scope enables all.

        Returns:
            A result dict whose ``status`` is ``matched``,
            ``matched_fallback``, ``low_score`` or ``not_found``. Provider
            failures are reported in ``provider_warnings``, not raised.
        """
        item_metadata = _build_input_metadata(item)
        provider_warnings: list[dict] = []
        candidates: list[dict] = []
        provider_scope = set(config.get('repair_provider_scope') or [])
        use_all_providers = not provider_scope

        def provider_enabled(name: str) -> bool:
            return use_all_providers or name in provider_scope

        if provider_enabled('acoustid'):
            candidates.extend(
                self._collect_provider_candidates(
                    'acoustid',
                    self.acoustid_provider.search,
                    provider_warnings,
                    item,
                    config
                )
            )
        if provider_enabled('musicbrainz'):
            candidates.extend(
                self._collect_provider_candidates(
                    'musicbrainz',
                    self.musicbrainz_provider.search_text,
                    provider_warnings,
                    item_metadata,
                    # The MusicBrainz provider takes the metadata section directly.
                    config.get('metadata') or {}
                )
            )
        candidates = self._score_candidates(item_metadata, album_group, candidates)
        top_authoritative = candidates[0] if candidates else None
        fallback_enabled = bool((config.get('advancedStrategy') or {}).get('metadataFallback', True))
        if fallback_enabled and (top_authoritative is None or top_authoritative['score'] < 85):
            fallback_candidates = []
            for name, provider in (
                ('netease', self.netease_provider),
                ('qq', self.qq_provider),
                ('spotify', self.spotify_provider)
            ):
                if provider_enabled(name):
                    fallback_candidates.extend(
                        self._collect_provider_candidates(
                            name,
                            provider.search,
                            provider_warnings,
                            item_metadata,
                            config
                        )
                    )
            candidates = self._score_candidates(
                item_metadata,
                album_group,
                [*candidates, *fallback_candidates]
            )
        if not candidates:
            return {
                'status': 'not_found',
                'reason': 'no_candidates',
                'message': '未找到任何匹配候选',
                'source': None,
                'confidence': None,
                'is_authoritative': False,
                'matched_metadata_json': None,
                'match_candidates_json': [],
                'match_enrichment_json': None,
                'provider_warnings': provider_warnings
            }
        top_candidate = candidates[0]
        if not top_candidate['is_authoritative'] and provider_enabled('musicbrainz'):
            # Try to promote the best fallback hit to an authoritative one
            # by re-searching MusicBrainz with the fallback's own metadata.
            aligned_candidate = self._align_candidate_with_warnings(
                top_candidate,
                config.get('metadata') or {},
                provider_warnings
            )
            if aligned_candidate:
                top_candidate = self._score_candidates(
                    item_metadata,
                    album_group,
                    [self._merge_aligned_candidate(top_candidate, aligned_candidate)]
                )[0]
                # NOTE(review): this re-sort may place another candidate
                # ahead of the aligned one, while ``top_candidate`` keeps
                # pointing at the aligned one - confirm this is intended.
                candidates = self._score_candidates(
                    item_metadata,
                    album_group,
                    [top_candidate, *candidates[1:]]
                )
        runner_up = candidates[1] if len(candidates) > 1 else None
        score_gap = top_candidate['score'] - (runner_up['score'] if runner_up else 0)
        candidates_json = [_serialize_candidate(candidate) for candidate in candidates[:MAX_MATCH_CANDIDATES]]
        enrichment = self._build_enrichment(top_candidate, config, provider_warnings)
        if top_candidate['is_authoritative']:
            if top_candidate['score'] >= 85 and score_gap >= 8:
                return self._build_match_result(
                    'matched',
                    'authoritative_auto_match',
                    f'权威候选自动匹配成功,得分 {top_candidate["score"]:.1f}',
                    top_candidate,
                    candidates_json,
                    enrichment,
                    provider_warnings
                )
        else:
            if top_candidate['score'] >= 80 and score_gap >= 8:
                return self._build_match_result(
                    'matched_fallback',
                    'fallback_auto_match',
                    f'Fallback 候选自动匹配成功,得分 {top_candidate["score"]:.1f}',
                    top_candidate,
                    candidates_json,
                    enrichment,
                    provider_warnings
                )
        reason = 'score_gap_too_small' if score_gap < 8 else 'score_below_threshold'
        message = (
            f'候选最高分 {top_candidate["score"]:.1f},与次高分差 {score_gap:.1f},需人工复核'
        )
        return self._build_match_result(
            'low_score',
            reason,
            message,
            top_candidate,
            candidates_json,
            enrichment,
            provider_warnings
        )

    def _collect_provider_candidates(
        self,
        provider_name: str,
        search_provider,
        provider_warnings: list[dict],
        *args,
        **kwargs
    ) -> list[dict]:
        """Call ``search_provider`` and turn a provider failure into a warning."""
        try:
            return search_provider(*args, **kwargs)
        except MatchProviderError as exc:
            self._append_provider_warning(provider_name, exc, provider_warnings)
            return []

    def _align_candidate_with_warnings(
        self,
        candidate: dict,
        metadata_config: dict,
        provider_warnings: list[dict]
    ) -> dict | None:
        """Align ``candidate`` via MusicBrainz; a failure becomes a warning."""
        try:
            return self.musicbrainz_provider.align_candidate(candidate, metadata_config)
        except MatchProviderError as exc:
            self._append_provider_warning('musicbrainz', exc, provider_warnings)
            return None

    def _append_provider_warning(
        self,
        provider_name: str,
        error: MatchProviderError,
        provider_warnings: list[dict]
    ):
        """Append a ``{'provider', 'message'}`` entry describing ``error``."""
        provider_warnings.append(
            {
                # Prefer the provider recorded on the error itself.
                'provider': getattr(error, 'provider', None) or provider_name,
                'message': str(error)
            }
        )

    def _score_candidates(
        self,
        item_metadata: dict,
        album_group: list[dict],
        candidates: list[dict]
    ) -> list[dict]:
        """De-duplicate, score and sort candidates, best first.

        Each returned candidate is a copy carrying ``score`` (0-100) and
        ``score_breakdown``. Ties are broken by authoritativeness, then by
        provider rank.
        """
        scored_candidates = []
        for candidate in _dedupe_candidates(candidates):
            # Fingerprint confidence wins over search confidence; the
            # value is clamped to [0, 1].
            identity_confidence = min(
                1.0,
                max(
                    0.0,
                    candidate.get('fingerprint_confidence')
                    or candidate.get('search_confidence')
                    or 0.0
                )
            )
            score_breakdown = {
                'fingerprint': round(30 * identity_confidence, 2),
                'title': round(20 * _text_similarity(item_metadata.get('title'), candidate.get('title')), 2),
                'artist': round(15 * _artist_similarity(item_metadata, candidate), 2),
                'album': round(10 * _text_similarity(item_metadata.get('album'), candidate.get('album')), 2),
                'duration': round(10 * _duration_similarity(
                    item_metadata.get('duration_seconds'),
                    candidate.get('duration_seconds')
                ), 2),
                'track_disc': round(5 * _track_disc_similarity(item_metadata, candidate), 2),
                'album_context': round(10 * _album_context_similarity(album_group, candidate), 2),
                'version_penalty': round(_version_penalty(item_metadata, candidate), 2)
            }
            # The penalty is subtracted from the sum of all other parts.
            positive_total = sum(
                value
                for key, value in score_breakdown.items()
                if key != 'version_penalty'
            )
            total_score = round(
                max(0.0, min(100.0, positive_total - score_breakdown['version_penalty'])),
                2
            )
            scored_candidates.append(
                {
                    **candidate,
                    'score': total_score,
                    'score_breakdown': score_breakdown
                }
            )
        return sorted(
            scored_candidates,
            key=lambda candidate: (
                candidate['score'],
                1 if candidate.get('is_authoritative') else 0,
                _provider_rank(candidate.get('provider'))
            ),
            reverse=True
        )

    def _merge_aligned_candidate(self, fallback_candidate: dict, aligned_candidate: dict) -> dict:
        """Overlay the aligned MusicBrainz data on a fallback candidate.

        Aligned fields win, source ids are unioned, and the fallback's cover
        and lyrics URLs are preferred when it has them.
        """
        merged_source_ids = {
            **(fallback_candidate.get('source_ids') or {}),
            **(aligned_candidate.get('source_ids') or {})
        }
        return {
            **fallback_candidate,
            **aligned_candidate,
            'provider': 'musicbrainz',
            'is_authoritative': True,
            'source_ids': merged_source_ids,
            'cover_url': fallback_candidate.get('cover_url') or aligned_candidate.get('cover_url'),
            'lyrics_url': fallback_candidate.get('lyrics_url') or aligned_candidate.get('lyrics_url')
        }

    def _build_enrichment(
        self,
        candidate: dict,
        config: dict,
        provider_warnings: list[dict] | None = None
    ) -> dict:
        """Collect cover, lyrics, genre and tag candidates for the match.

        Args:
            candidate: Winning candidate; a falsy value yields an empty
                enrichment structure.
            config: Full configuration; ``advancedStrategy.downloadAssets``
                (default on) controls whether enrichment providers are called.
            provider_warnings: Optional list that receives a warning for
                each enrichment provider that fails.

        Returns:
            A mapping from ``cover`` / ``lyrics`` / ``genres`` / ``tags`` to
            ``{'selected_source', 'candidates'}``.
        """
        enrichment_candidates: dict[str, list[dict]] = {
            'cover': [],
            'lyrics': [],
            'genres': [],
            'tags': []
        }
        if not candidate:
            return {
                key: {'selected_source': None, 'candidates': []}
                for key in enrichment_candidates
            }
        source_ids = candidate.get('source_ids') or {}
        if candidate.get('cover_url'):
            enrichment_candidates['cover'].append(
                {
                    'provider': candidate['provider'],
                    'value': candidate['cover_url'],
                    'quality': 0.6,
                    'source_id': source_ids.get(f'{candidate["provider"]}_album_id')
                }
            )
        if candidate.get('lyrics_url'):
            enrichment_candidates['lyrics'].append(
                {
                    'provider': candidate['provider'],
                    'value': candidate['lyrics_url'],
                    'quality': 0.6,
                    'source_id': source_ids.get(f'{candidate["provider"]}_song_id')
                }
            )
        if (config.get('advancedStrategy') or {}).get('downloadAssets', True):
            for provider_name, enrich in (
                ('discogs', self.discogs_provider.enrich),
                ('lastfm', self.lastfm_provider.enrich),
                ('genius', self.genius_provider.enrich)
            ):
                # Enrichment is optional: a failing provider must not
                # discard an otherwise successful match, so it degrades to
                # a warning exactly like the search providers do.
                try:
                    provider = enrich(candidate, config)
                except MatchProviderError as exc:
                    if provider_warnings is not None:
                        self._append_provider_warning(provider_name, exc, provider_warnings)
                    continue
                if not provider:
                    continue
                for kind, field_name in (
                    ('cover', 'cover_url'),
                    ('lyrics', 'lyrics_url'),
                    ('genres', 'genres'),
                    ('tags', 'tags')
                ):
                    if provider.get(field_name):
                        enrichment_candidates[kind].append(
                            {
                                'provider': provider['provider'],
                                'value': provider[field_name],
                                'quality': provider.get('quality', 0.5),
                                'source_id': provider.get('source_id')
                            }
                        )
        return {
            key: {
                'selected_source': _pick_best_candidate(value_candidates),
                'candidates': value_candidates
            }
            for key, value_candidates in enrichment_candidates.items()
        }

    def _build_match_result(
        self,
        status: str,
        reason: str,
        message: str,
        candidate: dict,
        candidates_json: list[dict],
        enrichment: dict,
        provider_warnings: list[dict]
    ) -> dict:
        """Assemble the result dict returned by ``match_item`` for ``candidate``."""
        return {
            'status': status,
            'reason': reason,
            'message': message,
            'source': candidate.get('provider'),
            'confidence': candidate.get('score'),
            'is_authoritative': bool(candidate.get('is_authoritative')),
            'matched_metadata_json': _serialize_metadata(candidate),
            'match_candidates_json': candidates_json,
            'match_enrichment_json': enrichment,
            'provider_warnings': provider_warnings
        }
def _append_query_params(url: str, params: dict | None) -> str:
if not params:
return url
query = parse.urlencode(
{
key: value
for key, value in params.items()
if value is not None and value != ''
},
doseq=True
)
separator = '&' if parse.urlparse(url).query else '?'
return f'{url}{separator}{query}' if query else url
def _normalize_base_url(value: str | None) -> str:
if not value:
return ''
return value.rstrip('/')
def _build_text_query(item_metadata: dict) -> str:
query_parts = [
item_metadata.get('title'),
item_metadata.get('artist'),
item_metadata.get('album')
]
return ' '.join(part.strip() for part in query_parts if isinstance(part, str) and part.strip())
def _build_input_metadata(item: dict) -> dict:
    """Derive the metadata used for matching from a library item.

    Values come from ``original_tags_json``. The title falls back to the
    stem of the item's relative path or filename, and the duration prefers
    the fingerprint duration over the audio properties.
    """
    tags = item.get('original_tags_json') or {}
    audio_props = item.get('audio_props_json') or {}
    source_name = item.get('relative_path') or item.get('filename') or ''
    inferred_title = Path(source_name).stem
    release_date = tags.get('date') or tags.get('year')
    raw_track = tags.get('track_number') or tags.get('track')
    raw_disc = tags.get('disc_number') or tags.get('disc')
    duration_seconds = (
        item.get('fingerprint_duration_seconds')
        or audio_props.get('duration_seconds')
    )
    return {
        'title': _first_non_empty(tags.get('title'), inferred_title),
        'artist': _first_non_empty(tags.get('artist'), tags.get('album_artist')),
        'artists': _split_artists(tags.get('artist')),
        'album': tags.get('album'),
        'album_artist': tags.get('album_artist'),
        'track_number': _parse_track_number(raw_track),
        'disc_number': _parse_track_number(raw_disc),
        'duration_seconds': duration_seconds,
        'release_date': release_date,
        'year': _extract_year(release_date)
    }
def _serialize_metadata(candidate: dict) -> dict:
return {
'title': candidate.get('title'),
'artist': candidate.get('artist'),
'artists': candidate.get('artists') or [],
'album': candidate.get('album'),
'album_artist': candidate.get('album_artist'),
'track_number': candidate.get('track_number'),
'disc_number': candidate.get('disc_number'),
'release_date': candidate.get('release_date'),
'year': candidate.get('year'),
'duration_seconds': candidate.get('duration_seconds'),
'recording_id': candidate.get('recording_id'),
'release_id': candidate.get('release_id'),
'release_group_id': candidate.get('release_group_id'),
'source_ids': candidate.get('source_ids') or {}
}
def _serialize_candidate(candidate: dict) -> dict:
return {
'provider': candidate.get('provider'),
'score': candidate.get('score'),
'score_breakdown': candidate.get('score_breakdown') or {},
'is_authoritative': bool(candidate.get('is_authoritative')),
'title': candidate.get('title'),
'artist': candidate.get('artist'),
'album': candidate.get('album'),
'recording_id': candidate.get('recording_id'),
'release_id': candidate.get('release_id'),
'release_group_id': candidate.get('release_group_id'),
'source_ids': candidate.get('source_ids') or {}
}
def _dedupe_candidates(candidates: list[dict]) -> list[dict]:
    """Collapse duplicate candidates, keeping the highest-scoring one per key.

    Two candidates are duplicates when provider, recording id, release id and
    the normalized title, artist and album all agree. A missing score counts
    as 0, and on equal scores the candidate seen first is kept.
    """
    best_by_key: dict[str, dict] = {}
    for entry in candidates:
        key_parts = (
            entry.get('provider') or '',
            entry.get('recording_id') or '',
            entry.get('release_id') or '',
            _normalize_text(entry.get('title')),
            _normalize_text(entry.get('artist')),
            _normalize_text(entry.get('album'))
        )
        key = '|'.join(key_parts)
        existing = best_by_key.get(key)
        if existing is not None and not ((entry.get('score') or 0) > (existing.get('score') or 0)):
            continue
        best_by_key[key] = entry
    return list(best_by_key.values())
def _extract_artist_names(artist_credit: list[dict]) -> list[str]:
names = []
for artist in artist_credit:
if artist.get('name'):
names.append(artist['name'])
continue
nested_artist = artist.get('artist') or {}
if nested_artist.get('name'):
names.append(nested_artist['name'])
return names
def _join_artists(artists: list[str]) -> str | None:
if not artists:
return None
return ', '.join(artists)
def _milliseconds_to_seconds(value) -> float | None:
    """Convert a millisecond value to seconds, rounded to two decimals.

    Returns ``None`` when ``value`` is ``None`` or an empty string.
    """
    if value in (None, ''):
        return None
    seconds = _to_float(value) / 1000.0
    return round(seconds, 2)
def _find_release_track(release: dict, recording_id: str) -> dict:
    """Find the track in ``release`` whose nested recording has ``recording_id``.

    Returns a copy of the first matching track with ``disc_number`` set from
    the enclosing medium's ``position``, or an empty dict when nothing
    matches.
    """
    for medium in release.get('media') or []:
        disc = _parse_track_number(medium.get('position'))
        for entry in medium.get('tracks') or []:
            recording = entry.get('recording') or {}
            if recording.get('id') != recording_id:
                continue
            found = dict(entry)
            found['disc_number'] = disc
            return found
    return {}
def _build_release_tracklist(release: dict) -> list[dict]:
    """Flatten every medium of ``release`` into one list of track summaries.

    Each summary carries the title (falling back to the nested recording's
    title), the track number parsed from ``position`` or ``number``, the disc
    number parsed from the medium's ``position``, and the duration in seconds
    converted from the track's ``length``.
    """
    rows = []
    for medium in release.get('media') or []:
        disc = _parse_track_number(medium.get('position'))
        rows.extend(
            {
                'title': entry.get('title') or (entry.get('recording') or {}).get('title'),
                'track_number': _parse_track_number(entry.get('position') or entry.get('number')),
                'disc_number': disc,
                'duration_seconds': _milliseconds_to_seconds(entry.get('length'))
            }
            for entry in medium.get('tracks') or []
        )
    return rows
def _text_similarity(left: str | None, right: str | None) -> float:
    """Score how similar two strings are, from 0.0 to 1.0.

    Both sides are normalized first. An empty side scores 0.0 and identical
    normalized strings score 1.0; otherwise the score is the number of shared
    distinct tokens divided by the size of the larger token set.
    """
    first = _normalize_text(left)
    second = _normalize_text(right)
    if not (first and second):
        return 0.0
    if first == second:
        return 1.0
    first_tokens, second_tokens = set(first.split()), set(second.split())
    shared = first_tokens.intersection(second_tokens)
    larger_size = max(len(first_tokens), len(second_tokens), 1)
    return len(shared) / larger_size
def _artist_similarity(item_metadata: dict, candidate: dict) -> float:
    """Return the best text similarity between the item's artist and the candidate.

    The item's ``artist`` is compared against the candidate's ``artist``, its
    joined ``artists`` list and its ``album_artist``; the highest score wins.
    """
    wanted_artist = item_metadata.get('artist')
    candidate_names = (
        candidate.get('artist'),
        _join_artists(candidate.get('artists') or []),
        candidate.get('album_artist')
    )
    scores = [_text_similarity(wanted_artist, name) for name in candidate_names]
    return max(scores, default=0.0)
def _duration_similarity(source_duration, candidate_duration) -> float:
    """Score how close two durations are, using fixed tolerance bands.

    A difference of at most 1 scores 1.0, at most 3 scores 0.8, at most 5
    scores 0.6 and at most 10 scores 0.3. Anything larger, or a duration that
    is ``None`` or an empty string, scores 0.0.
    """
    if source_duration in (None, '') or candidate_duration in (None, ''):
        return 0.0
    gap = abs(_to_float(source_duration) - _to_float(candidate_duration))
    # Bands are ordered from tightest to loosest; the first one that fits wins.
    for limit, band_score in ((1, 1.0), (3, 0.8), (5, 0.6), (10, 0.3)):
        if gap <= limit:
            return band_score
    return 0.0
def _track_disc_similarity(item_metadata: dict, candidate: dict) -> float:
score = 0.0
if item_metadata.get('track_number') and candidate.get('track_number'):
if item_metadata['track_number'] == candidate['track_number']:
score += 0.6
if item_metadata.get('disc_number') and candidate.get('disc_number'):
if item_metadata['disc_number'] == candidate['disc_number']:
score += 0.4
return score
def _album_context_similarity(album_group: list[dict], candidate: dict) -> float:
    """Return the fraction of album-group items that line up with the candidate's tracklist.

    Only items with a track number are counted. An item matches when the
    candidate's ``release_tracklist`` has a track with the same track number
    (and the same disc number, when both sides have one) whose title
    similarity is at least 0.7 or whose duration similarity is at least 0.6.
    Returns 0.0 when the group has fewer than two items, the candidate has no
    tracklist, or no item has a track number.
    """
    tracklist = candidate.get('release_tracklist') or []
    if len(album_group) < 2 or not tracklist:
        return 0.0

    comparable = 0
    matched = 0
    for member in album_group:
        metadata = _build_input_metadata(member)
        wanted_track = metadata.get('track_number')
        if not wanted_track:
            continue
        comparable += 1
        wanted_disc = metadata.get('disc_number')

        # First tracklist entry with the same track number; the disc number
        # only disqualifies an entry when both sides have one and they differ.
        release_track = None
        for entry in tracklist:
            if entry.get('track_number') == wanted_track and (
                not wanted_disc
                or not entry.get('disc_number')
                or entry.get('disc_number') == wanted_disc
            ):
                release_track = entry
                break
        if not release_track:
            continue

        title_score = _text_similarity(metadata.get('title'), release_track.get('title'))
        duration_score = _duration_similarity(
            metadata.get('duration_seconds'),
            release_track.get('duration_seconds')
        )
        if title_score >= 0.7 or duration_score >= 0.6:
            matched += 1

    if comparable == 0:
        return 0.0
    return matched / comparable
def _version_penalty(item_metadata: dict, candidate: dict) -> float:
    """Return 8.0 when the two titles carry different version tokens, else 0.0.

    Titles with the same set of version tokens, including both having none,
    are not penalized.
    """
    wanted_tokens = _extract_version_tokens(item_metadata.get('title'))
    found_tokens = _extract_version_tokens(candidate.get('title'))
    # Two empty sets compare equal, so "neither has tokens" is covered here too.
    return 0.0 if wanted_tokens == found_tokens else 8.0
def _extract_version_tokens(value: str | None) -> set[str]:
    """Return the normalized tokens of ``value`` that appear in ``VERSION_TOKENS``."""
    # Empty or missing input normalizes to '', which splits into no tokens.
    words = set(_normalize_text(value).split())
    return words & VERSION_TOKENS
def _parse_track_number(value) -> int | None:
if value in (None, ''):
return None
match = re.search(r'\d+', str(value))
return int(match.group(0)) if match else None
def _extract_year(value: str | None) -> int | None:
if not value:
return None
match = re.search(r'(\d{4})', str(value))
return int(match.group(1)) if match else None
def _normalize_text(value: str | None) -> str:
if not value:
return ''
cleaned = re.sub(r'[^a-z0-9]+', ' ', str(value).lower())
return ' '.join(cleaned.split())
def _first_non_empty(*values):
for value in values:
if isinstance(value, str) and value.strip():
return value.strip()
return None
def _split_artists(value: str | None) -> list[str]:
if not value:
return []
return [part.strip() for part in re.split(r'[,/&]| feat\. ', value) if part.strip()]
def _provider_rank(provider: str | None) -> int:
provider_order = {
'acoustid': 6,
'musicbrainz': 5,
'netease': 4,
'qq': 3,
'spotify': 2
}
return provider_order.get(provider or '', 0)
def _pick_best_candidate(candidates: list[dict]) -> dict | None:
if not candidates:
return None
return max(candidates, key=lambda candidate: candidate.get('quality', 0))
def _unique_non_empty(values: list[str]) -> list[str]:
unique_values = []
seen_values: set[str] = set()
for value in values:
if not value or value in seen_values:
continue
seen_values.add(value)
unique_values.append(value)
return unique_values
def _format_timestamp_date(value) -> str | None:
if value in (None, ''):
return None
if isinstance(value, (int, float)) and value > 1000:
return time.strftime('%Y-%m-%d', time.gmtime(value / 1000))
return str(value)
def _to_float(value) -> float:
try:
return float(value)
except (TypeError, ValueError):
return 0.0