From fd234c9e557bd2165d62b7e0b25303aeeb443399 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith" Date: Fri, 3 Jul 2026 18:20:37 +0000 Subject: [PATCH] gh-119592: Fix ProcessPoolExecutor stranding submitted work when a max_tasks_per_child worker exits Worker replacement went through the executor object: the manager thread read executor attributes that shutdown(wait=False) clears concurrently, and could not replace workers at all once the executor was garbage collected. A worker exiting at its max_tasks_per_child limit in those states left the remaining submitted work permanently unexecuted and hung interpreter exit; the racing case could crash the manager thread. Replace workers from the executor manager thread using its own state plus configuration read through the live executor weakref, which shutdown() never clears: - After shutdown(wait=False) with the executor still referenced, a replacement is spawned and the remaining work is executed as documented. - Once the executor has been garbage collected (gh-152967), or a replacement worker cannot be started and no workers remain, the remaining futures now fail with BrokenProcessPool instead of never resolving. - A new _force_shutting_down flag stops both spawn paths from starting workers that would escape terminate_workers()/kill_workers(). Co-authored-by: Claude Fable 5 --- Lib/concurrent/futures/process.py | 167 +++++++++++++----- .../test_process_pool.py | 130 +++++++++++++- ...-07-03-18-30-00.gh-issue-119592.mQr3Vx.rst | 11 ++ 3 files changed, 262 insertions(+), 46 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2026-07-03-18-30-00.gh-issue-119592.mQr3Vx.rst diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 8f200fc1c82613f..e3b6c4a5305615a 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -269,6 +269,20 @@ def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=N return +def _spawn_worker(mp_context, call_queue, result_queue, initializer, + initargs, max_tasks_per_child, processes): + """Start one worker process and record it in *processes* by pid.""" + p = mp_context.Process( + target=_process_worker, + args=(call_queue, + result_queue, + initializer, + initargs, + max_tasks_per_child)) + p.start() + processes[p.pid] = p + + class _ExecutorManagerThread(threading.Thread): """Manages the communication between this process and the worker processes. @@ -321,6 +335,15 @@ def weakref_cb(_, # exiting safely self.max_tasks_per_child = executor._max_tasks_per_child + # gh-119592: Needed to size worker replacement, and immutable, so + # keep a copy rather than reading it back through the executor + # weakref. The rest of the spawn configuration is deliberately NOT + # copied here: holding user-provided objects (initializer, + # initargs, mp_context) in this always-reachable running thread + # could keep the executor itself reachable through them, breaking + # garbage-collection-triggered shutdown. + self.max_workers = executor._max_workers + # A dict mapping work ids to _WorkItems e.g. # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} self.pending_work_items = executor._pending_work_items @@ -357,12 +380,14 @@ def run(self): # while waiting on new results. del result_item - if executor := self.executor_reference(): - if process_exited: - with self.shutdown_lock: - executor._replace_dead_worker() - else: - executor._idle_worker_semaphore.release() + if process_exited: + with self.shutdown_lock: + broken = self._replace_dead_worker() + if broken is not None: + self.terminate_broken(*broken) + return + elif executor := self.executor_reference(): + executor._idle_worker_semaphore.release() del executor if self.is_shutting_down(): @@ -379,6 +404,71 @@ def run(self): self.join_executor_internals() return + def _replace_dead_worker(self): + """Spawn a replacement for a worker that exited at its + max_tasks_per_child limit. Called under self.shutdown_lock. + + Returns None while the pool can still make progress, otherwise a + (cause, message) tuple describing why the remaining work items can + never run, so that run() can fail their futures. + """ + assert self.shutdown_lock.locked() + cause = None + message = None + executor = self.executor_reference() + if executor is None: + # gh-152967: The executor was garbage collected; nothing can + # spawn a replacement worker for it anymore. + message = ("The ProcessPoolExecutor was garbage collected with " + "work pending after its last worker process exited " + "upon reaching max_tasks_per_child; the pending work " + "can never be run.") + elif executor._force_shutting_down: + # terminate_workers()/kill_workers() is tearing the pool down; + # a replacement worker would escape the kill and run work + # items that were enqueued before it. + message = ("A worker process exited while the pool was being " + "forcefully shut down; work that was still enqueued " + "will not be run.") + elif self.pending_work_items or not self.is_shutting_down(): + # gh-115634: Do not consult the executor's + # _idle_worker_semaphore here: it counts task completions, not + # idle workers, so it can hold a stale token released by the + # now-dead worker. Trusting such a token would leave the pool + # a worker short, deadlocking once all workers reach their + # task limit. Spawning from this (manager) thread is safe + # despite gh-90622 because max_tasks_per_child is rejected for + # the "fork" start method. + if len(self.processes) < self.max_workers: + # gh-119592: Spawn using state owned by this thread and + # configuration read through the live weakref (which + # shutdown() never clears), not the executor state that + # shutdown(wait=False) clears concurrently. + try: + _spawn_worker(executor._mp_context, self.call_queue, + self.result_queue, executor._initializer, + executor._initargs, + self.max_tasks_per_child, self.processes) + except Exception as exc: + # While other workers remain the pool has merely lost + # capacity and they keep draining the queue; with none + # left the failure is reported below. + cause = format_exception(exc) + message = ("A replacement worker process could not be " + "started, leaving the pool without workers " + "to run the remaining work.") + del executor + + if not self.processes and (self.pending_work_items + or cause is not None): + # No worker processes remain and no replacement can be + # spawned: any remaining work items can never run. A spawn + # failure breaks the pool even with nothing pending; leaving + # a zero-worker pool alive would hang a later submit() on a + # stale _idle_worker_semaphore token instead of raising. + return (cause, message) + return None + def add_call_item_to_queue(self): # Fills call_queue with _WorkItems from pending_work_items. # This function never blocks. @@ -455,10 +545,11 @@ def is_shutting_down(self): return (_global_shutdown or executor is None or executor._shutdown_thread) - def _terminate_broken(self, cause): + def _terminate_broken(self, cause, bpe_message=None): # Terminate the executor because it is in a broken state. The cause # argument can be used to display more information on the error that - # lead the executor into becoming broken. + # lead the executor into becoming broken. bpe_message overrides the + # default message on the BrokenProcessPool set on pending futures. # Mark the process pool broken so that submits fail right now. executor = self.executor_reference() @@ -489,11 +580,12 @@ def _terminate_broken(self, cause): cause_str = "\n".join(errors) cause_tb = f"\n'''\n{cause_str}'''" if cause_str else None + if bpe_message is None: + bpe_message = ("A process in the process pool was terminated " + "abruptly while the future was running or pending.") # Mark pending tasks as failed. for work_id, work_item in self.pending_work_items.items(): - bpe = BrokenProcessPool("A process in the process pool was " - "terminated abruptly while the future was " - "running or pending.") + bpe = BrokenProcessPool(bpe_message) if cause_tb is not None: bpe.__cause__ = _RemoteTraceback(cause_tb) try: @@ -518,9 +610,9 @@ def _terminate_broken(self, cause): # clean up resources self._join_executor_internals(broken=True) - def terminate_broken(self, cause): + def terminate_broken(self, cause, bpe_message=None): with self.shutdown_lock: - self._terminate_broken(cause) + self._terminate_broken(cause, bpe_message) def flag_executor_shutting_down(self): # Flag the executor as shutting down and cancel remaining tasks if @@ -733,6 +825,7 @@ def __init__(self, max_workers=None, mp_context=None, self._queue_count = 0 self._pending_work_items = {} self._cancel_pending_futures = False + self._force_shutting_down = False # _ThreadWakeup is a communication channel used to interrupt the wait # of the main loop of executor_manager_thread from another thread (e.g. @@ -772,34 +865,15 @@ def _start_executor_manager_thread(self): _threads_wakeups[self._executor_manager_thread] = \ self._executor_manager_thread_wakeup - def _replace_dead_worker(self): + def _adjust_process_count(self): # gh-132969: avoid error when state is reset and executor is still running, # which will happen when shutdown(wait=False) is called. if self._processes is None: return - # A replacement is pointless when shutting down with nothing left - # to run. Both attributes are read under _shutdown_lock, which - # shutdown() holds while setting _shutdown_thread. - assert self._shutdown_lock.locked() - if self._shutdown_thread and not self._pending_work_items: - return - - # gh-115634: A worker exited after reaching max_tasks_per_child and - # has been removed from self._processes. Do not consult - # _idle_worker_semaphore here: it counts task completions, not idle - # workers, so it can hold a stale token released by the now-dead - # worker. Trusting such a token would leave the pool a worker short, - # deadlocking once all workers reach their task limit. Spawning is - # safe from this (manager) thread despite gh-90622 because - # max_tasks_per_child is rejected for the "fork" start method. - if len(self._processes) < self._max_workers: - self._spawn_process() - - def _adjust_process_count(self): - # gh-132969: avoid error when state is reset and executor is still running, - # which will happen when shutdown(wait=False) is called. - if self._processes is None: + # gh-152967: A forceful shutdown is in progress; a worker spawned + # here could escape its process snapshot and keep running work. + if self._force_shutting_down: return # if there's an idle process, we don't need to spawn a new one. @@ -825,15 +899,10 @@ def _launch_processes(self): self._spawn_process() def _spawn_process(self): - p = self._mp_context.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._initializer, - self._initargs, - self._max_tasks_per_child)) - p.start() - self._processes[p.pid] = p + _spawn_worker(self._mp_context, self._call_queue, + self._result_queue, self._initializer, + self._initargs, self._max_tasks_per_child, + self._processes) def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: @@ -930,6 +999,14 @@ def _force_shutdown(self, operation): if operation not in _SHUTDOWN_CALLBACK_OPERATION: raise ValueError(f"Unsupported operation: {operation!r}") + # gh-152967: Stop the manager thread from spawning replacement + # workers before we copy the processes to signal: a worker spawned + # after the copy would survive the loop below and run enqueued + # work items. Taking the lock orders this against the manager's + # worker replacement, which runs under the same lock. + with self._shutdown_lock: + self._force_shutting_down = True + processes = {} if self._processes: processes = self._processes.copy() diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 205662c91c2558d..dafbda862c51c24 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -6,11 +6,12 @@ import traceback import unittest import unittest.mock +import weakref from concurrent import futures from concurrent.futures.process import BrokenProcessPool from test import support -from test.support import hashlib_helper, warnings_helper +from test.support import hashlib_helper, threading_helper, warnings_helper from test.test_importlib.metadata.fixtures import parameterize from .executor import ExecutorTest, mul @@ -42,6 +43,13 @@ def _put_wait_put(queue, event): queue.put('finished') +def _report_wait_return(queue, event, value): + """ Used as part of _run_stranded_worker_exit_test """ + queue.put(value) + event.wait() + return value + + class ProcessPoolExecutorTest(ExecutorTest): @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit') @@ -268,6 +276,126 @@ def test_max_tasks_per_child_pending_tasks_gh115634(self): finally: executor.shutdown(wait=True, cancel_futures=True) + def _run_stranded_worker_exit_test(self, *, shutdown, drop_reference): + # A worker exits upon reaching its max_tasks_per_child limit while + # more submitted work is queued. While the executor object is + # alive a replacement worker must be spawned and the remaining + # work executed; once it has been garbage collected no replacement + # is possible and the remaining futures must fail promptly instead + # of never resolving. + context = self.get_context() + if context.get_start_method(allow_none=False) == "fork": + raise unittest.SkipTest("Incompatible with the fork start method.") + manager = context.Manager() + self.addCleanup(manager.join) + self.addCleanup(manager.shutdown) + started = manager.Queue() + gate = manager.Event() + + executor = self.executor_type( + 1, mp_context=context, max_tasks_per_child=1) + futs = [executor.submit(_report_wait_return, started, gate, i) + for i in range(3)] + self.addCleanup(threading_helper.join_thread, + executor._executor_manager_thread) + # Wait until the worker is inside the first task so that it exits + # at its task limit only after the executor has been shut down + # and/or garbage collected below. + self.assertEqual(started.get(timeout=support.SHORT_TIMEOUT), 0) + if shutdown: + executor.shutdown(wait=False) + if drop_reference: + executor_ref = weakref.ref(executor) + executor = None + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + support.gc_collect() + if executor_ref() is None: + break + gate.set() + + self.assertEqual(futs[0].result(timeout=support.SHORT_TIMEOUT), 0) + if drop_reference: + for fut in futs[1:]: + with self.assertRaisesRegex(BrokenProcessPool, + "garbage collected"): + fut.result(timeout=support.SHORT_TIMEOUT) + else: + results = [f.result(timeout=support.SHORT_TIMEOUT) + for f in futs[1:]] + self.assertEqual(results, [1, 2]) + + def test_shutdown_no_wait_max_tasks_gh119592(self): + # gh-119592: shutdown(wait=False) used to clear executor state that + # worker replacement relied on. A worker exiting at its + # max_tasks_per_child limit afterwards could not be replaced, so + # the remaining submitted work never ran, and a racing worker exit + # could crash the manager thread on the partially cleared state. + for drop_reference in (False, True): + with self.subTest(drop_reference=drop_reference): + self._run_stranded_worker_exit_test( + shutdown=True, drop_reference=drop_reference) + + def test_gc_during_max_tasks_worker_exit_gh152967(self): + # gh-152967: If the executor was garbage collected without + # shutdown() while its last worker exited at its + # max_tasks_per_child limit, no replacement worker could be + # spawned and the remaining futures were never resolved. + self._run_stranded_worker_exit_test( + shutdown=False, drop_reference=True) + + def _run_unreplaceable_worker_exit_test(self, *, error_regex, + force_shutting_down=False, + failing_spawn=False): + # Drive a max_tasks_per_child worker exit while worker + # replacement is impossible; the queued futures must fail + # promptly with a BrokenProcessPool explaining why. + context = self.get_context() + if context.get_start_method(allow_none=False) == "fork": + raise unittest.SkipTest("Incompatible with the fork start method.") + manager = context.Manager() + self.addCleanup(manager.join) + self.addCleanup(manager.shutdown) + started = manager.Queue() + gate = manager.Event() + + executor = self.executor_type( + 1, mp_context=context, max_tasks_per_child=1) + futs = [executor.submit(_report_wait_return, started, gate, i) + for i in range(3)] + self.addCleanup(threading_helper.join_thread, + executor._executor_manager_thread) + self.assertEqual(started.get(timeout=support.SHORT_TIMEOUT), 0) + if force_shutting_down: + with executor._shutdown_lock: + executor._force_shutting_down = True + if failing_spawn: + spawn_patch = unittest.mock.patch( + "concurrent.futures.process._spawn_worker", + side_effect=OSError("spawn failed")) + spawn_patch.start() + self.addCleanup(spawn_patch.stop) + gate.set() + + self.assertEqual(futs[0].result(timeout=support.SHORT_TIMEOUT), 0) + for fut in futs[1:]: + with self.assertRaisesRegex(BrokenProcessPool, error_regex): + fut.result(timeout=support.SHORT_TIMEOUT) + + def test_force_shutdown_during_max_tasks_worker_exit(self): + # A worker exiting at its max_tasks_per_child limit during + # terminate_workers()/kill_workers() must not be replaced (the + # replacement would escape the kill); queued futures fail instead. + self._run_unreplaceable_worker_exit_test( + force_shutting_down=True, + error_regex="forcefully shut down") + + def test_failed_worker_replacement_breaks_pool(self): + # If no replacement worker can be started and no workers remain, + # the pool must break rather than strand the queued futures. + self._run_unreplaceable_worker_exit_test( + failing_spawn=True, + error_regex="could not be started") + def test_max_tasks_early_shutdown(self): context = self.get_context() if context.get_start_method(allow_none=False) == "fork": diff --git a/Misc/NEWS.d/next/Library/2026-07-03-18-30-00.gh-issue-119592.mQr3Vx.rst b/Misc/NEWS.d/next/Library/2026-07-03-18-30-00.gh-issue-119592.mQr3Vx.rst new file mode 100644 index 000000000000000..f718c877c8ed22b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-07-03-18-30-00.gh-issue-119592.mQr3Vx.rst @@ -0,0 +1,11 @@ +Fix :class:`concurrent.futures.ProcessPoolExecutor` stranding submitted +work forever when a worker process exited upon reaching its +*max_tasks_per_child* limit after +:meth:`~concurrent.futures.Executor.shutdown` was called with +``wait=False``: a replacement worker is now spawned and the remaining work +executed as documented. If the executor has instead been garbage collected +without ``shutdown()`` (:gh:`152967`), or a replacement worker cannot be +started, the remaining futures now fail with +:exc:`~concurrent.futures.process.BrokenProcessPool` instead of never +resolving. A worker exit racing ``shutdown(wait=False)`` can also no +longer crash the executor management thread.