Skip to content

Commit fd234c9

Browse files
gpshead and claude committed
gh-119592: Fix ProcessPoolExecutor stranding submitted work when a max_tasks_per_child worker exits
Worker replacement went through the executor object: the manager thread read executor attributes that shutdown(wait=False) clears concurrently, and could not replace workers at all once the executor was garbage collected. A worker exiting at its max_tasks_per_child limit in those states left the remaining submitted work permanently unexecuted and hung interpreter exit; the racing case could crash the manager thread.

Replace workers from the executor manager thread using its own state plus configuration read through the live executor weakref, which shutdown() never clears:

- After shutdown(wait=False) with the executor still referenced, a replacement is spawned and the remaining work is executed as documented.
- Once the executor has been garbage collected (gh-152967), or a replacement worker cannot be started and no workers remain, the remaining futures now fail with BrokenProcessPool instead of never resolving.
- A new _force_shutting_down flag stops both spawn paths from starting workers that would escape terminate_workers()/kill_workers().

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
1 parent 1b4135a commit fd234c9

3 files changed

Lines changed: 262 additions & 46 deletions

File tree

Lib/concurrent/futures/process.py

Lines changed: 122 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -269,6 +269,20 @@ def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=N
269269
return
270270

271271

272+
def _spawn_worker(mp_context, call_queue, result_queue, initializer,
                  initargs, max_tasks_per_child, processes):
    """Launch a single worker process and register it in *processes*.

    The process is created through *mp_context*, targets _process_worker
    with the given queues, initializer, initargs and max_tasks_per_child,
    and is stored in the *processes* mapping under its pid once started.
    """
    worker_args = (
        call_queue,
        result_queue,
        initializer,
        initargs,
        max_tasks_per_child,
    )
    worker = mp_context.Process(target=_process_worker, args=worker_args)
    worker.start()
    # The pid is only available after start(), so register afterwards.
    processes[worker.pid] = worker
284+
285+
272286
class _ExecutorManagerThread(threading.Thread):
273287
"""Manages the communication between this process and the worker processes.
274288
@@ -321,6 +335,15 @@ def weakref_cb(_,
321335
# exiting safely
322336
self.max_tasks_per_child = executor._max_tasks_per_child
323337

338+
# gh-119592: Needed to size worker replacement, and immutable, so
339+
# keep a copy rather than reading it back through the executor
340+
# weakref. The rest of the spawn configuration is deliberately NOT
341+
# copied here: holding user-provided objects (initializer,
342+
# initargs, mp_context) in this always-reachable running thread
343+
# could keep the executor itself reachable through them, breaking
344+
# garbage-collection-triggered shutdown.
345+
self.max_workers = executor._max_workers
346+
324347
# A dict mapping work ids to _WorkItems e.g.
325348
# {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
326349
self.pending_work_items = executor._pending_work_items
@@ -357,12 +380,14 @@ def run(self):
357380
# while waiting on new results.
358381
del result_item
359382

360-
if executor := self.executor_reference():
361-
if process_exited:
362-
with self.shutdown_lock:
363-
executor._replace_dead_worker()
364-
else:
365-
executor._idle_worker_semaphore.release()
383+
if process_exited:
384+
with self.shutdown_lock:
385+
broken = self._replace_dead_worker()
386+
if broken is not None:
387+
self.terminate_broken(*broken)
388+
return
389+
elif executor := self.executor_reference():
390+
executor._idle_worker_semaphore.release()
366391
del executor
367392

368393
if self.is_shutting_down():
@@ -379,6 +404,71 @@ def run(self):
379404
self.join_executor_internals()
380405
return
381406

407+
def _replace_dead_worker(self):
    """Try to start a substitute for a worker that exited after reaching
    its max_tasks_per_child limit.

    The caller must hold self.shutdown_lock.

    Returns None when the pool can still make progress.  Otherwise
    returns a (cause, message) tuple explaining why the remaining work
    items can never run, so that run() can fail their futures.
    """
    assert self.shutdown_lock.locked()
    spawn_failure = None
    reason = None
    pool = self.executor_reference()
    if pool is None:
        # gh-152967: The executor has been garbage collected, so there
        # is nothing left that could spawn a replacement worker.
        reason = ("The ProcessPoolExecutor was garbage collected with "
                  "work pending after its last worker process exited "
                  "upon reaching max_tasks_per_child; the pending work "
                  "can never be run.")
    elif pool._force_shutting_down:
        # terminate_workers()/kill_workers() is tearing the pool down;
        # a replacement worker would escape the kill and run work items
        # that were enqueued before it.
        reason = ("A worker process exited while the pool was being "
                  "forcefully shut down; work that was still enqueued "
                  "will not be run.")
    elif self.pending_work_items or not self.is_shutting_down():
        # gh-115634: The executor's _idle_worker_semaphore is
        # deliberately not consulted: it counts task completions, not
        # idle workers, so it can hold a stale token released by the
        # now-dead worker.  Trusting such a token would leave the pool a
        # worker short, deadlocking once all workers reach their task
        # limit.  Spawning from this (manager) thread is safe despite
        # gh-90622 because max_tasks_per_child is rejected for the
        # "fork" start method.
        if len(self.processes) < self.max_workers:
            # gh-119592: Spawn from state owned by this thread plus
            # configuration read through the live weakref (which
            # shutdown() never clears), not from the executor state
            # that shutdown(wait=False) clears concurrently.
            try:
                _spawn_worker(
                    pool._mp_context,
                    self.call_queue,
                    self.result_queue,
                    pool._initializer,
                    pool._initargs,
                    self.max_tasks_per_child,
                    self.processes,
                )
            except Exception as exc:
                # While other workers remain, the pool has merely lost
                # capacity and they keep draining the queue; with none
                # left, the failure is reported below.
                spawn_failure = format_exception(exc)
                reason = ("A replacement worker process could not be "
                          "started, leaving the pool without workers "
                          "to run the remaining work.")
    # Release the strong reference obtained from the weakref.
    del pool

    # The pool can still make progress if any worker is alive, or if
    # nothing is pending and no spawn attempt failed.
    if self.processes or not (self.pending_work_items
                              or spawn_failure is not None):
        return None

    # No worker processes remain and no replacement can be spawned, so
    # any remaining work items can never run.  A spawn failure breaks
    # the pool even with nothing pending; leaving a zero-worker pool
    # alive would hang a later submit() on a stale
    # _idle_worker_semaphore token instead of raising.
    return (spawn_failure, reason)
471+
382472
def add_call_item_to_queue(self):
383473
# Fills call_queue with _WorkItems from pending_work_items.
384474
# This function never blocks.
@@ -455,10 +545,11 @@ def is_shutting_down(self):
455545
return (_global_shutdown or executor is None
456546
or executor._shutdown_thread)
457547

458-
def _terminate_broken(self, cause):
548+
def _terminate_broken(self, cause, bpe_message=None):
459549
# Terminate the executor because it is in a broken state. The cause
460550
# argument can be used to display more information on the error that
461-
# lead the executor into becoming broken.
551+
# lead the executor into becoming broken. bpe_message overrides the
552+
# default message on the BrokenProcessPool set on pending futures.
462553

463554
# Mark the process pool broken so that submits fail right now.
464555
executor = self.executor_reference()
@@ -489,11 +580,12 @@ def _terminate_broken(self, cause):
489580
cause_str = "\n".join(errors)
490581
cause_tb = f"\n'''\n{cause_str}'''" if cause_str else None
491582

583+
if bpe_message is None:
584+
bpe_message = ("A process in the process pool was terminated "
585+
"abruptly while the future was running or pending.")
492586
# Mark pending tasks as failed.
493587
for work_id, work_item in self.pending_work_items.items():
494-
bpe = BrokenProcessPool("A process in the process pool was "
495-
"terminated abruptly while the future was "
496-
"running or pending.")
588+
bpe = BrokenProcessPool(bpe_message)
497589
if cause_tb is not None:
498590
bpe.__cause__ = _RemoteTraceback(cause_tb)
499591
try:
@@ -518,9 +610,9 @@ def _terminate_broken(self, cause):
518610
# clean up resources
519611
self._join_executor_internals(broken=True)
520612

521-
def terminate_broken(self, cause):
613+
def terminate_broken(self, cause, bpe_message=None):
522614
with self.shutdown_lock:
523-
self._terminate_broken(cause)
615+
self._terminate_broken(cause, bpe_message)
524616

525617
def flag_executor_shutting_down(self):
526618
# Flag the executor as shutting down and cancel remaining tasks if
@@ -733,6 +825,7 @@ def __init__(self, max_workers=None, mp_context=None,
733825
self._queue_count = 0
734826
self._pending_work_items = {}
735827
self._cancel_pending_futures = False
828+
self._force_shutting_down = False
736829

737830
# _ThreadWakeup is a communication channel used to interrupt the wait
738831
# of the main loop of executor_manager_thread from another thread (e.g.
@@ -772,34 +865,15 @@ def _start_executor_manager_thread(self):
772865
_threads_wakeups[self._executor_manager_thread] = \
773866
self._executor_manager_thread_wakeup
774867

775-
def _replace_dead_worker(self):
868+
def _adjust_process_count(self):
776869
# gh-132969: avoid error when state is reset and executor is still running,
777870
# which will happen when shutdown(wait=False) is called.
778871
if self._processes is None:
779872
return
780873

781-
# A replacement is pointless when shutting down with nothing left
782-
# to run. Both attributes are read under _shutdown_lock, which
783-
# shutdown() holds while setting _shutdown_thread.
784-
assert self._shutdown_lock.locked()
785-
if self._shutdown_thread and not self._pending_work_items:
786-
return
787-
788-
# gh-115634: A worker exited after reaching max_tasks_per_child and
789-
# has been removed from self._processes. Do not consult
790-
# _idle_worker_semaphore here: it counts task completions, not idle
791-
# workers, so it can hold a stale token released by the now-dead
792-
# worker. Trusting such a token would leave the pool a worker short,
793-
# deadlocking once all workers reach their task limit. Spawning is
794-
# safe from this (manager) thread despite gh-90622 because
795-
# max_tasks_per_child is rejected for the "fork" start method.
796-
if len(self._processes) < self._max_workers:
797-
self._spawn_process()
798-
799-
def _adjust_process_count(self):
800-
# gh-132969: avoid error when state is reset and executor is still running,
801-
# which will happen when shutdown(wait=False) is called.
802-
if self._processes is None:
874+
# gh-152967: A forceful shutdown is in progress; a worker spawned
875+
# here could escape its process snapshot and keep running work.
876+
if self._force_shutting_down:
803877
return
804878

805879
# if there's an idle process, we don't need to spawn a new one.
@@ -825,15 +899,10 @@ def _launch_processes(self):
825899
self._spawn_process()
826900

827901
def _spawn_process(self):
828-
p = self._mp_context.Process(
829-
target=_process_worker,
830-
args=(self._call_queue,
831-
self._result_queue,
832-
self._initializer,
833-
self._initargs,
834-
self._max_tasks_per_child))
835-
p.start()
836-
self._processes[p.pid] = p
902+
_spawn_worker(self._mp_context, self._call_queue,
903+
self._result_queue, self._initializer,
904+
self._initargs, self._max_tasks_per_child,
905+
self._processes)
837906

838907
def submit(self, fn, /, *args, **kwargs):
839908
with self._shutdown_lock:
@@ -930,6 +999,14 @@ def _force_shutdown(self, operation):
930999
if operation not in _SHUTDOWN_CALLBACK_OPERATION:
9311000
raise ValueError(f"Unsupported operation: {operation!r}")
9321001

1002+
# gh-152967: Stop the manager thread from spawning replacement
1003+
# workers before we copy the processes to signal: a worker spawned
1004+
# after the copy would survive the loop below and run enqueued
1005+
# work items. Taking the lock orders this against the manager's
1006+
# worker replacement, which runs under the same lock.
1007+
with self._shutdown_lock:
1008+
self._force_shutting_down = True
1009+
9331010
processes = {}
9341011
if self._processes:
9351012
processes = self._processes.copy()

0 commit comments

Comments
 (0)