Skip to content

Commit 0317b98

Browse files
dprodgerclaude
andcommitted
Add Spotify duration-mismatch rematch sweep (issue #100, phase 2)
Adds a third Spotify job type, ('spotify', 'rematch_duration_mismatches'), target_type='song'. The handler is a thin wrapper around the existing SpotifyMatcher with duration_mismatch_threshold set — same code path match_spotify_tracks.py --duration-mismatches has used historically. For each song the matcher walks only the releases whose linked Spotify track's duration differs from the recording's canonical duration by more than the threshold, swapping the link to a better track if it finds one. No auto-unlinking. Stubborn mismatches stay in place and remain visible in /admin/duration-mismatches for human review — that's a bigger trust call we can revisit later. Threshold defaults to 60 seconds (60_000 ms) — matches the admin review page and the matcher CLI default. Configurable per-job via the payload and per-sweep via --threshold-seconds, since the obvious-mismatch passes are likely to want a tighter threshold once the loud cases are cleaned up. The job is registered as a separate job_type from match_song so a bulk cleanup sweep doesn't collide on the (source, job_type, target_type, target_id) unique index with a user-triggered rematch on the same song. Different priorities, different metrics, different filter facet on the admin research dashboard. Producer: core/spotify_rematch_mismatches.py — sweep enqueuer that walks get_songs_with_duration_mismatches() and enqueues one job per song at priority 110 (behind user 50 and routine 100). CLI: scripts/rematch_spotify_duration_mismatches.py — production path. The in-process match_spotify_tracks.py --duration-mismatches flag stays for ad-hoc debugging. Tests: 13 new in test_spotify_rematch_mismatches.py covering threshold defaulting, payload override, success/permanent/no-op/retryable error mapping, and sweep behaviour (threshold pass-through, limit, error counting). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 07614b5 commit 0317b98

4 files changed

Lines changed: 555 additions & 1 deletion

File tree

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
"""
2+
Spotify duration-mismatch rematch sweep (issue #100, second phase).
3+
4+
Walks songs where at least one Spotify streaming link's duration_ms
5+
differs from the linked recording's canonical duration_ms by more than
6+
a threshold, and enqueues one ('spotify', 'rematch_duration_mismatches')
7+
job per song onto the durable research queue.
8+
9+
The handler — see research_worker/handlers/spotify.py — wraps the same
10+
SpotifyMatcher path the `match_spotify_tracks.py --duration-mismatches`
11+
CLI has used historically. It re-runs matching narrowly on the
12+
mismatched releases; if it finds a better track, it swaps the link, and
13+
otherwise leaves the existing match in place.
14+
15+
Threshold defaults to 60s (60_000 ms) — same as the
16+
/admin/duration-mismatches review page and the matcher CLI. 60s is wide
17+
enough to ignore Spotify's usual 1-2s duration drift, narrow enough to
18+
catch wrong-track matches (e.g. a 4-min recording linked to a 2-min
19+
track).
20+
21+
Per-song was chosen over per-link for the same reasons as the backfill
22+
sweep: matches the existing match_song job shape, dedups cleanly, and
23+
the matcher itself is per-song-shaped under the hood.
24+
"""
25+
26+
from __future__ import annotations
27+
28+
import logging
29+
from typing import Optional
30+
31+
from core import research_jobs
32+
from integrations.spotify.db import get_songs_with_duration_mismatches
33+
34+
35+
logger = logging.getLogger(__name__)
36+
37+
38+
DEFAULT_THRESHOLD_MS = 60_000
39+
40+
41+
def find_candidate_song_ids(
42+
threshold_ms: int = DEFAULT_THRESHOLD_MS,
43+
limit: Optional[int] = None,
44+
) -> list[str]:
45+
"""Return song UUIDs that own one or more Spotify streaming links
46+
whose duration_ms differs from the linked recording's duration_ms by
47+
more than `threshold_ms`. Pass `limit` to cap the result set."""
48+
songs = get_songs_with_duration_mismatches(threshold_ms=threshold_ms)
49+
song_ids = [str(row['id']) for row in songs]
50+
if limit is not None:
51+
song_ids = song_ids[:limit]
52+
return song_ids
53+
54+
55+
def enqueue_sweep(
56+
threshold_ms: int = DEFAULT_THRESHOLD_MS,
57+
limit: Optional[int] = None,
58+
priority: int = 110,
59+
) -> dict[str, int]:
60+
"""Find candidate songs and enqueue one rematch job per song.
61+
62+
The default priority (110) sits behind user-initiated jobs (50) and
63+
plain-vanilla research (100) so a bulk cleanup pass doesn't starve
64+
normal traffic on the worker thread.
65+
66+
The threshold is passed through to the handler via payload so each
67+
job re-checks against the same value the sweep used to enqueue it.
68+
Otherwise a bulk run at threshold=60_000 could be silently widened by
69+
the handler reading a different default later.
70+
71+
Returns:
72+
{'candidates': N, 'enqueued': M, 'errors': E, 'threshold_ms': T}
73+
where `enqueued` counts successful enqueue() returns. The unique
74+
index in research_jobs collapses a re-sweep down to the existing
75+
(queued|running) job for the same song, so calling this twice in
76+
a row at the same threshold is safe and idempotent.
77+
"""
78+
song_ids = find_candidate_song_ids(threshold_ms=threshold_ms, limit=limit)
79+
base_stats = {
80+
'candidates': len(song_ids),
81+
'enqueued': 0,
82+
'errors': 0,
83+
'threshold_ms': threshold_ms,
84+
}
85+
if not song_ids:
86+
return base_stats
87+
88+
enqueued = 0
89+
errors = 0
90+
for song_id in song_ids:
91+
try:
92+
job_id = research_jobs.enqueue(
93+
source=research_jobs.SOURCE_SPOTIFY,
94+
job_type='rematch_duration_mismatches',
95+
target_type=research_jobs.TARGET_SONG,
96+
target_id=song_id,
97+
payload={'threshold_ms': threshold_ms},
98+
priority=priority,
99+
)
100+
except Exception:
101+
logger.exception(
102+
"spotify_rematch_mismatches: failed to enqueue song %s",
103+
song_id,
104+
)
105+
errors += 1
106+
continue
107+
108+
if job_id is None:
109+
errors += 1
110+
continue
111+
112+
enqueued += 1
113+
114+
logger.info(
115+
"spotify_rematch_mismatches: threshold_ms=%d candidates=%d "
116+
"enqueued=%d errors=%d",
117+
threshold_ms, len(song_ids), enqueued, errors,
118+
)
119+
base_stats['enqueued'] = enqueued
120+
base_stats['errors'] = errors
121+
return base_stats

backend/research_worker/handlers/spotify.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""
22
Spotify handlers on the durable queue.
33
4-
Two job types are registered here:
4+
Three job types are registered here:
55
66
1. ('spotify', 'match_song'), target_type='song'
77
Wraps integrations/spotify/matcher.SpotifyMatcher.match_releases — a
@@ -16,6 +16,16 @@
1616
backfill historic rows that were inserted before the matcher started
1717
capturing duration. Issue #100.
1818
19+
3. ('spotify', 'rematch_duration_mismatches'), target_type='song'
20+
For one song, re-runs the SpotifyMatcher against only the releases
21+
whose linked Spotify track's duration_ms differs from the recording's
22+
canonical duration_ms by more than `threshold_ms` (default 60_000 — 60s).
23+
The matcher decides whether to swap the link to a better track or
24+
leave the existing match alone. No auto-unlinking — leftover bad
25+
matches stay visible in the /admin/duration-mismatches review page.
26+
Separate job_type from match_song so a bulk-cleanup sweep doesn't
27+
collide on the unique index with a user-triggered rematch. Issue #100.
28+
1929
Quota accounting: skipped. Spotify uses HTTP 429 rate limits, not a daily
2030
budget like YouTube. The SpotifyClient already retries 429s internally
2131
with exponential backoff. If those retries exhaust, we surface the
@@ -206,3 +216,73 @@ def backfill_durations(payload: dict[str, Any], ctx) -> dict[str, Any]:
206216
)
207217

208218
return stats
219+
220+
221+
# ---------------------------------------------------------------------------
222+
# rematch_duration_mismatches — issue #100, second phase
223+
# ---------------------------------------------------------------------------
224+
225+
# Default threshold matches the /admin/duration-mismatches review page and
226+
# the matcher's existing CLI default. 60s is wide enough to ignore Spotify's
227+
# usual 1-2s duration drift, narrow enough to catch wrong-track matches
228+
# (e.g. a 4-min recording linked to a 2-min track).
229+
_DEFAULT_DURATION_MISMATCH_THRESHOLD_MS = 60_000
230+
231+
232+
@handler('spotify', 'rematch_duration_mismatches')
233+
def rematch_duration_mismatches(payload: dict[str, Any], ctx) -> dict[str, Any]:
234+
"""Re-run the Spotify matcher on this song's mismatched releases.
235+
236+
Constructs `SpotifyMatcher(duration_mismatch_threshold=...)` and
237+
delegates to `match_releases(song_id)` — same code path the
238+
`match_spotify_tracks.py --duration-mismatches` CLI uses. The matcher
239+
will only walk releases whose existing Spotify link's duration_ms
240+
differs from the recording's canonical duration_ms by more than the
241+
threshold, swapping in a better track if one is found.
242+
243+
payload may include:
244+
threshold_ms: int — override the 60_000 ms default. Useful for
245+
running tighter sweeps (e.g. 30s) once the obvious cases are
246+
cleaned up.
247+
"""
248+
song_id = ctx.target_id
249+
threshold_ms = int(
250+
payload.get('threshold_ms', _DEFAULT_DURATION_MISMATCH_THRESHOLD_MS)
251+
)
252+
253+
matcher = SpotifyMatcher(
254+
duration_mismatch_threshold=threshold_ms,
255+
logger=ctx.log,
256+
)
257+
result = matcher.match_releases(song_id)
258+
259+
if result.get('success'):
260+
stats = result.get('stats') or {}
261+
return {
262+
'threshold_ms': threshold_ms,
263+
'releases_processed': stats.get('releases_processed', 0),
264+
'releases_updated': stats.get('releases_updated', 0),
265+
'releases_no_match': stats.get('releases_no_match', 0),
266+
'tracks_matched': stats.get('tracks_matched', 0),
267+
'tracks_had_previous': stats.get('tracks_had_previous', 0),
268+
'cache_hits': stats.get('cache_hits', 0),
269+
'api_calls': stats.get('api_calls', 0),
270+
'rate_limit_hits': stats.get('rate_limit_hits', 0),
271+
}
272+
273+
error = result.get('error') or 'unknown error'
274+
error_lower = error.lower()
275+
276+
if any(marker in error_lower for marker in _PERMANENT_ERROR_MARKERS):
277+
raise PermanentError(f"Spotify rematch: {error}")
278+
279+
if any(marker in error_lower for marker in _NO_OP_ERROR_MARKERS):
280+
# Song existed but had no mismatched releases above threshold by the
281+
# time the worker got to it — fine, treat as a clean no-op.
282+
return {
283+
'threshold_ms': threshold_ms,
284+
'reason': 'no_mismatched_releases',
285+
'releases_processed': 0,
286+
}
287+
288+
raise RetryableError(f"Spotify rematch failed: {error}")
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Rematch Spotify Duration Mismatches — issue #100, second phase.
4+
5+
Enqueues one ('spotify', 'rematch_duration_mismatches') job per song that
6+
owns at least one Spotify streaming link whose duration differs from the
7+
linked recording's canonical duration by more than a threshold (default
8+
60s). The handler — see research_worker/handlers/spotify.py — re-runs
9+
the SpotifyMatcher narrowly on the mismatched releases.
10+
11+
The matcher swaps the link to a better track if one is found; otherwise
12+
the existing match stays in place for human review via
13+
/admin/duration-mismatches. There is no auto-unlinking.
14+
15+
For ad-hoc one-shot work the in-process
16+
`match_spotify_tracks.py --duration-mismatches <seconds>` flag still
17+
exists; this CLI is the production path that hands the work to the
18+
durable worker queue.
19+
20+
Usage:
21+
python rematch_spotify_duration_mismatches.py
22+
python rematch_spotify_duration_mismatches.py --threshold-seconds 30
23+
python rematch_spotify_duration_mismatches.py --limit 100
24+
python rematch_spotify_duration_mismatches.py --dry-run
25+
"""
26+
27+
from script_base import ScriptBase, run_script
28+
from core.spotify_rematch_mismatches import (
29+
DEFAULT_THRESHOLD_MS,
30+
enqueue_sweep,
31+
find_candidate_song_ids,
32+
)
33+
34+
35+
def main():
36+
script = ScriptBase(
37+
name="rematch_spotify_duration_mismatches",
38+
description=(
39+
"Enqueue per-song Spotify rematch jobs for releases whose "
40+
"linked Spotify track duration differs from the recording's "
41+
"canonical duration by more than a threshold."
42+
),
43+
epilog="""
44+
Examples:
45+
python rematch_spotify_duration_mismatches.py
46+
python rematch_spotify_duration_mismatches.py --threshold-seconds 30
47+
python rematch_spotify_duration_mismatches.py --limit 100
48+
python rematch_spotify_duration_mismatches.py --dry-run
49+
"""
50+
)
51+
52+
script.add_dry_run_arg()
53+
script.add_debug_arg()
54+
script.add_limit_arg(default=None)
55+
56+
# Threshold knob — seconds-keyed to match the existing match_spotify_tracks
57+
# CLI's --duration-mismatches flag and the admin review page UX.
58+
script.parser.add_argument(
59+
'--threshold-seconds',
60+
type=int,
61+
default=DEFAULT_THRESHOLD_MS // 1000,
62+
help=(
63+
f'Mismatch threshold in seconds (default: '
64+
f'{DEFAULT_THRESHOLD_MS // 1000}). Songs whose Spotify track '
65+
f'duration differs from the recording\'s duration by more '
66+
f'than this are eligible.'
67+
),
68+
)
69+
70+
args = script.parse_args()
71+
72+
threshold_ms = int(args.threshold_seconds) * 1000
73+
74+
script.print_header({
75+
"DRY RUN": args.dry_run,
76+
"THRESHOLD": f"{args.threshold_seconds}s",
77+
"LIMIT": args.limit if args.limit is not None else 'all candidates',
78+
})
79+
80+
if args.dry_run:
81+
song_ids = find_candidate_song_ids(
82+
threshold_ms=threshold_ms, limit=args.limit,
83+
)
84+
script.logger.info(
85+
"Would enqueue %d song(s) for Spotify duration-mismatch rematch "
86+
"(threshold %ds)",
87+
len(song_ids), args.threshold_seconds,
88+
)
89+
for sid in song_ids[:25]:
90+
script.logger.debug(" candidate: %s", sid)
91+
if len(song_ids) > 25:
92+
script.logger.debug(" ... %d more", len(song_ids) - 25)
93+
script.print_summary({
94+
'candidates': len(song_ids),
95+
'enqueued': 0,
96+
'errors': 0,
97+
'threshold_ms': threshold_ms,
98+
})
99+
return True
100+
101+
stats = enqueue_sweep(threshold_ms=threshold_ms, limit=args.limit)
102+
script.print_summary(stats)
103+
return stats['errors'] == 0
104+
105+
106+
if __name__ == "__main__":
107+
run_script(main)

0 commit comments

Comments
 (0)