Fix parallel-coordinator hang when summary pipe saturates #248
Conversation
The coordinator drained the 'summary' queue only after joining all worker processes. With enough queued data (or a single large testsFailed dict), the summary-pipe buffer (~64 KiB on Linux) saturates and worker feeder threads block in pipe_write, both inside on_timeout's join_thread() and during Python's end-of-process queue finalization. This in turn hangs the coordinator's p.join() indefinitely. Introduce a module-level helper _join_workers_with_summary_drain that joins workers while continuously draining 'summary' from a background thread, and use it in execute(). Also correct the stale comment in the on_timeout closure to describe the actual watcher-thread os._exit(1) flow.
SimpleQueue.put is synchronous (no feeder thread, no internal buffer), so a successful put() implies the bytes are already in the kernel pipe. That removes the need for the summary.close() + summary.join_thread() dance in on_timeout before the watcher's os._exit(1), and the comment that explained it. The coordinator-side drain thread is updated to a blocking get() driven by a sentinel on shutdown, eliminating its busy-loop timeout too. results stays a Queue because the progressbar liveness loop relies on get(timeout=...), which SimpleQueue does not expose publicly.
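The stdlib behavior this commit relies on can be shown directly; the message strings below are illustrative, and only the `Queue` vs `SimpleQueue` contrast is the point:

```python
import multiprocessing as mp

def child(q, sq):
    # Queue.put() returns once the item is handed to the feeder thread;
    # the bytes may still sit in a process-local buffer, so a clean exit
    # path flushes them with close() + join_thread().
    q.put("via feeder thread")
    q.close()
    q.join_thread()
    # SimpleQueue.put() is synchronous: when it returns, the pickled
    # bytes are already in the pipe, so an abrupt os._exit(1) in a
    # watcher thread cannot lose them.
    sq.put("already in the pipe")

if __name__ == "__main__":
    q, sq = mp.Queue(), mp.SimpleQueue()
    p = mp.Process(target=child, args=(q, sq))
    p.start()
    print(q.get(), "|", sq.get())
    p.join()
```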
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
In the on_timeout closure, put 'results' before 'summary'. The summary queue is a SimpleQueue with a synchronous put(), and the coordinator only starts draining it after every result is in. Putting summary first risked blocking on a full summary pipe while the coordinator was still waiting on this worker's result, which would have stalled the whole results-collection loop. Putting results first guarantees the worker's output reaches the coordinator unconditionally; the subsequent summary put may briefly block but always unblocks once the coordinator moves to the drain phase. Also drop the stale 'feeder threads' wording near the call site: the summary queue no longer has a feeder thread.
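A sketch of the ordering this comment asks for. The function signature and payload keys are hypothetical stand-ins for what RLTest's closure actually captures, and `queue.Queue` stands in for the real inter-process queues:

```python
import queue

def on_timeout(results, summary, test_name, output, done_count, failures):
    """Sketch of the reordered timeout path (illustrative, not RLTest code)."""
    # results first: the coordinator's live loop is still reading this
    # queue, so the put always has a reader and cannot deadlock.
    results.put({'test_name': test_name, 'output': output, 'done': 1})
    # summary second: the synchronous SimpleQueue.put() may briefly block
    # on a full pipe, but it unblocks once the coordinator finishes the
    # results loop and begins the summary drain phase.
    summary.put({'done': done_count, 'failures': failures})
    # The real closure's watcher thread calls os._exit(1) after these puts.

results, summary = queue.Queue(), queue.Queue()
on_timeout(results, summary, 'test_foo', 'timed out\n', 3, {'test_foo': ['timeout']})
print(results.get()['test_name'])  # test_foo
```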
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
The parallel coordinator used two queues: 'results' for per-test output
(read by the progressbar loop) and 'summary' for per-worker aggregates
('done' count and the worker's full testsFailed dict, read after the
loop). Workers' summary.put could block on a full pipe because the
coordinator only drained summary in a second phase, after every results
message had been received.
Collapse to a single 'results' queue carrying one self-contained message
per test: { test_name, output, done, failures }. The worker resets
self.testsFailed = {} before each test so addFailure() writes into a
fresh dict that ships verbatim; the worker keeps no cumulative state.
The coordinator owns the canonical testsFailed via update() per message.
This eliminates the deadlock by construction: the only queue is drained
continuously by the coordinator's progressbar loop for the entire
lifetime of the workers, so worker put()s can never block on a full
pipe. Removes the SimpleQueue import, the _SUMMARY_DRAIN_STOP sentinel,
and the _join_workers_with_summary_drain helper. on_timeout shrinks to
a single put + close + join_thread.
The unit test is updated to exercise the new pattern: workers push many
large per-test messages while the main thread drains them live.
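The message shape described above can be sketched like this. It is an illustrative model, with `threading`/`queue.Queue` standing in for worker processes and the `multiprocessing` queue; `run_tests` and the payload values are hypothetical:

```python
import threading
import queue

def run_tests(results, tests):
    """Worker-side sketch of the single-queue pattern: one self-contained
    message per test, with a fresh failures dict each time."""
    for name, passed in tests:
        tests_failed = {}  # reset per test, like self.testsFailed = {}
        if not passed:
            tests_failed[name] = ['assertion failed']  # addFailure() stand-in
        results.put({'test_name': name, 'output': '.', 'done': 1,
                     'failures': tests_failed})

# Coordinator side: drain continuously while the "worker" runs, so a
# put() can never wait on a saturated pipe.
results = queue.Queue()
tests = [('test_a', True), ('test_b', False), ('test_c', True)]
t = threading.Thread(target=run_tests, args=(results, tests))
t.start()
done, tests_failed = 0, {}
for _ in range(len(tests)):
    msg = results.get()
    done += msg['done']
    tests_failed.update(msg['failures'])  # coordinator owns canonical state
t.join()
print(done, tests_failed)  # 3 {'test_b': ['assertion failed']}
```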
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
- `on_timeout` now ships both the per-test result (with `shutdown=False`) and the shutdown sentinel before the watcher thread calls `os._exit(1)`, keeping the coordinator's bounded count of `n_jobs + parallelism` accurate when a worker dies on timeout. Also fixes a `KeyError` on the missing `'shutdown'` key in the timeout payload.
- Grammar: "no more processors is alive" -> "no more processors are alive".
- `test_parallel_drain`: use `time.monotonic()` for elapsed measurement.
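The bounded-count drain these fixes protect can be sketched as follows; worker processes are simulated with threads, and all names other than the `'shutdown'` key and the `n_jobs + parallelism` count are hypothetical:

```python
import threading
import queue

def run_worker(results, tests):
    """Sketch: per-test messages plus exactly one shutdown message per
    worker (payloads illustrative; keys follow the PR description)."""
    for name in tests:
        results.put({'test_name': name, 'done': 1, 'failures': {},
                     'shutdown': False})
    results.put({'test_name': '<worker shutdown>', 'done': 0,
                 'failures': {}, 'shutdown': True})  # after teardown

parallelism, per_worker = 2, 3
n_jobs = parallelism * per_worker
results = queue.Queue()
workers = [threading.Thread(target=run_worker,
                            args=(results, ['test_%d_%d' % (w, i) for i in range(per_worker)]))
           for w in range(parallelism)]
for w in workers:
    w.start()
ticks = 0
for _ in range(n_jobs + parallelism):  # bounded-count drain
    msg = results.get()
    if not msg['shutdown']:
        ticks += 1                     # progressbar ticks only on tests
for w in workers:
    w.join()
print(ticks)  # 6
```

Because the count is exact (one shutdown message per worker, one message per test), the loop terminates regardless of interleaving between fast and slow workers.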
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
```python
if res['output']:
    print('%s' % res['output'], end="")
done += res['done']
self.testsFailed.update(res['failures'])
```
self.testsFailed.update(res['failures']) can overwrite existing failure lists when the incoming dict contains a key that already exists (e.g., the shutdown message can carry failures keyed by self.currEnv.testName, which may match a test that already failed earlier). This risks losing earlier failure details in the final summary. Consider merging per-key lists (extend) instead of dict.update(), or ensure shutdown-phase failures are reported under a dedicated key that cannot collide with real test names.
Suggested change:

```diff
-self.testsFailed.update(res['failures'])
+for test_name, failures in res['failures'].items():
+    if test_name not in self.testsFailed:
+        self.testsFailed[test_name] = list(failures)
+    else:
+        self.testsFailed[test_name].extend(failures)
```
Only relevant for reused envs, will be handled in the future if needed
## Problem

When running `RLTest` with `--parallelism > 1`, the coordinator could hang indefinitely on `p.join()` after worker processes produced enough data to saturate the inter-process pipe buffer (~64 KiB on Linux).

The parallel coordinator used two queues: `results` for per-test output (drained live by the progressbar loop) and `summary` for per-worker aggregates (`done` counts and the worker's full `testsFailed` dict, drained only after the loop). With enough queued data (or a single large `testsFailed` dict), workers blocked in `pipe_write` on the `summary` pipe (either inside `on_timeout`'s `summary.join_thread()` or during Python's end-of-process queue finalization) while the coordinator sat in `p.join()` waiting for them to exit. A classic pipe-buffer deadlock.

Observed in practice on a RediSearch run that hung for 3h53m with 16 workers all stuck in `pipe_write` and the coordinator parked in `do_wait`.
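The blocking mechanics can be demonstrated in a few lines (not RLTest code; assumes a default pipe buffer well under 1 MiB, as on stock Linux or macOS). The child blocks in its synchronous put until the parent reads, so joining before draining would hang:

```python
import multiprocessing as mp
import time

def producer(q):
    # One large message: far more than the ~64 KiB pipe buffer, so the
    # synchronous put() blocks in pipe_write until the other side reads.
    q.put(b'x' * (1024 * 1024))

if __name__ == "__main__":
    q = mp.SimpleQueue()
    p = mp.Process(target=producer, args=(q,))
    p.start()
    time.sleep(0.5)
    print(p.is_alive())  # True: the child is stuck writing to the pipe
    # Joining here, before draining, is exactly the reported hang.
    # Draining first unblocks the writer:
    q.get()
    p.join(timeout=5)
    print(p.is_alive())  # False
```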
## Fix

The PR is structured as five bisect-friendly commits, building from a minimal patch to a structural refactor. The final design (commits 4 and 5) is the recommended end state.

### 4a8a452 — Concurrent summary drain (minimal patch)

A `_join_workers_with_summary_drain` helper spawns a background thread to drain `summary` while the main thread joins workers. Breaks the deadlock by ensuring writers can always make progress. Also corrects a stale comment in `on_timeout`.

### c8df5bc — `summary` → `SimpleQueue`

`SimpleQueue.put()` is synchronous (no feeder thread, no internal buffer), so `on_timeout` no longer needs `close()` + `join_thread()` to flush before the watcher's `os._exit(1)`. The drainer thread also simplifies to a blocking `get()` driven by a sentinel. `results` stays a `Queue` because the progressbar loop relies on `get(timeout=…)`.

### c0cb696 — Address review feedback

Reorder puts in `on_timeout` so the coordinator-visible result is delivered before the (potentially blocking) summary put.

### dfc9114 — Collapse `results` + `summary` into a single queue

The two queues existed for historical/semantic reasons, not technical ones. Collapse to a single `results` queue carrying one self-contained message per test: `{ test_name, output, done, failures }`. The worker resets `self.testsFailed = {}` before each test so `addFailure()` writes into a fresh dict that ships verbatim; the worker keeps no cumulative state. The coordinator owns the canonical `testsFailed` via `update()` per message.

This eliminates the deadlock by construction: the only queue is drained continuously by the coordinator's progressbar loop for the entire lifetime of the workers, so worker `put()`s can never block on a full pipe. Removes `SimpleQueue`, the `_SUMMARY_DRAIN_STOP` sentinel, the `_join_workers_with_summary_drain` helper, and the threading import. `on_timeout` shrinks to a single put + flush.

### 3a50d5a — Add per-worker shutdown message + bounded-count drain

Failures raised during the worker's final `takeEnvDown(fullShutDown=True)` (e.g. "redis did not exit cleanly" when `env_reuse=True`) need to reach the coordinator too. Each worker now ships exactly one extra `'<worker shutdown>'` message after `takeEnvDown`, tagged `'shutdown': True`.

The coordinator reads a known total of `n_jobs + parallelism` messages in a single bounded loop, ticking the progressbar only on per-test messages. Order-independent: the single shared queue does not preserve per-worker ordering, so a fast worker's shutdown may arrive before a slow worker's last test; the conditional bar tick handles both cases. The `has_live_processor` liveness guard turns a worker crash before it ships its shutdown message into a clean error instead of an indefinite hang.

## Final architecture
- Workers keep no cumulative state (a fresh per-test `testsFailed`); the coordinator is the sole owner of aggregated state.
- No `SimpleQueue`, no helper functions, no time-based polling outside `results.get(timeout=1)`.

## Testing

- `tests/unit/test_parallel_drain.py` reproduces the saturation scenario: 8 worker processes each push many ~32 KiB messages (well over the 64 KiB pipe buffer) and we assert the coordinator drains them and joins all workers within a bounded time.
- Full test suite (`RLTEST=$(realpath ../RLTest) make pytest`): 1744 passed, 0 failed.

Pull Request opened by Augment Code with guidance from the PR author