From 0ed5b6c72d4434e61f1e6274b3c69d4aaa8bb641 Mon Sep 17 00:00:00 2001 From: Mike Rosseel Date: Wed, 17 Jun 2026 11:38:13 +0200 Subject: [PATCH] fix(state): graceful worker shutdown when shared-state manager dies When the multiprocessing Manager that owns SharedStateObj dies, worker proxy calls raise BrokenPipeError/ConnectionResetError/EOFError. The solver's outer restart loop turned this into an infinite ~2s spin that flooded the log (70k+ "Broken pipe" lines), and the integrator/camera loops handled it inconsistently. Add a shared helper in state_utils (DEAD_MANAGER_EXCEPTIONS, the SharedStateLost signal raised by sleep_for_framerate, and is_dead_manager_error) and apply it in the solver, integrator and camera worker loops so a dead manager is logged once and the loop exits cleanly instead of retrying forever. Behaviour is unchanged while the manager is healthy. Co-Authored-By: Claude Opus 4.8 --- python/PiFinder/camera_interface.py | 6 ++- python/PiFinder/integrator.py | 6 ++- python/PiFinder/solver.py | 15 +++--- python/PiFinder/state_utils.py | 30 ++++++++++- python/tests/test_state_utils.py | 77 +++++++++++++++++++++++++++++ 5 files changed, 121 insertions(+), 13 deletions(-) create mode 100644 python/tests/test_state_utils.py diff --git a/python/PiFinder/camera_interface.py b/python/PiFinder/camera_interface.py index 58a7fb865..d817f0cae 100644 --- a/python/PiFinder/camera_interface.py +++ b/python/PiFinder/camera_interface.py @@ -674,5 +674,9 @@ def get_image_loop( logger.info( f"CameraInterface: Camera loop exited with command: '{command}'" ) - except (BrokenPipeError, EOFError, FileNotFoundError): + except FileNotFoundError: logger.exception("Error in Camera Loop") + except Exception as e: + if not state_utils.is_dead_manager_error(e): + raise + logger.error("Shared-state manager gone; stopping camera loop: %s", e) diff --git a/python/PiFinder/integrator.py b/python/PiFinder/integrator.py index 5e3cbd68a..7ae908c97 100644 --- a/python/PiFinder/integrator.py +++ b/python/PiFinder/integrator.py @@ -200,8 +200,10 @@ def integrator( telemetry.flush() - except EOFError: - logger.error("Main no longer running for integrator") + except Exception as e: + if not state_utils.is_dead_manager_error(e): + raise + logger.error("Shared-state manager gone; stopping integrator: %s", e) finally: if telemetry is not None: telemetry.stop() diff --git a/python/PiFinder/solver.py b/python/PiFinder/solver.py index c3eb27f7d..fb1c51be3 100644 --- a/python/PiFinder/solver.py +++ b/python/PiFinder/solver.py @@ -434,9 +434,9 @@ def solver( # which might be from the IMU try: last_image_metadata = shared_state.last_image_metadata() - except (BrokenPipeError, ConnectionResetError) as e: - logger.error(f"Lost connection to shared state manager: {e}") - continue + except state_utils.DEAD_MANAGER_EXCEPTIONS as e: + logger.error("Shared-state manager gone; stopping Solver: %s", e) + return # Check if we should process this image is_new_image = last_image_metadata["exposure_end"] > last_solve_attempt @@ -592,13 +592,10 @@ def solver( t_extract_ms=0.0, ) ) - except EOFError as eof: - logger.error(f"Main process no longer running for solver: {eof}") - logger.exception(eof) - logger.error( - f"Last solve attempt: {last_solve_attempt}, last success: {last_solve_success}" - ) except Exception as e: + if state_utils.is_dead_manager_error(e): + logger.error("Shared-state manager gone; stopping Solver: %s", e) + return logger.error(f"Exception in Solver: {e.__class__.__name__}: {str(e)}") logger.exception(e) logger.error(f"Current process ID: {os.getpid()}") diff --git a/python/PiFinder/state_utils.py b/python/PiFinder/state_utils.py index 5e9a1599b..ff5031224 100644 --- a/python/PiFinder/state_utils.py +++ b/python/PiFinder/state_utils.py @@ -7,11 +7,39 @@ _TARGET_PERIOD = 1.0 / 30.0 _last_wake: Optional[float] = None +# Exceptions raised on a SharedStateObj proxy call when the multiprocessing +# Manager process that owns the shared state has died. The proxy connection is +# then permanently broken, so a worker that sees one of these can never recover +# by retrying -- it should log once and stop its loop instead of spinning. +DEAD_MANAGER_EXCEPTIONS = (BrokenPipeError, ConnectionResetError, EOFError) + + +class SharedStateLost(RuntimeError): + """The shared-state Manager process is gone; the worker should stop. + + Raised by :func:`sleep_for_framerate` so the documented spin site surfaces + a single, intentional signal rather than a raw connection error. + """ + + +def is_dead_manager_error(exc: BaseException) -> bool: + """Return True when *exc* signals the shared-state Manager process is gone. + + Worker loops should treat this as terminal: log once and exit the loop + cleanly instead of retrying forever (which floods the logs). + """ + return isinstance(exc, DEAD_MANAGER_EXCEPTIONS + (SharedStateLost,)) + def sleep_for_framerate(shared_state: SharedStateObj, limit_framerate=True) -> bool: global _last_wake - if shared_state.power_state() <= 0: + try: + powered = shared_state.power_state() > 0 + except DEAD_MANAGER_EXCEPTIONS as e: + raise SharedStateLost(str(e)) from e + + if not powered: time.sleep(0.5) _last_wake = time.monotonic() return True diff --git a/python/tests/test_state_utils.py b/python/tests/test_state_utils.py new file mode 100644 index 000000000..5ba7ba32b --- /dev/null +++ b/python/tests/test_state_utils.py @@ -0,0 +1,77 @@ +"""Tests for shared-state dead-manager detection and graceful shutdown. + +When the multiprocessing Manager that owns SharedStateObj dies, worker +proxy calls raise BrokenPipeError / ConnectionResetError / EOFError. Workers +must treat this as terminal (log once, stop) instead of retrying forever. +""" + +import pytest + +from PiFinder import state_utils + + +@pytest.mark.unit +@pytest.mark.parametrize( + "exc", + [ + BrokenPipeError(32, "Broken pipe"), + ConnectionResetError(104, "Connection reset by peer"), + EOFError(), + state_utils.SharedStateLost("manager gone"), + ], +) +def test_dead_manager_errors_detected(exc): + assert state_utils.is_dead_manager_error(exc) is True + + +@pytest.mark.unit +@pytest.mark.parametrize("exc", [ValueError(), RuntimeError(), KeyError(), OSError()]) +def test_live_manager_errors_not_flagged(exc): + assert state_utils.is_dead_manager_error(exc) is False + + +class _Healthy: + def power_state(self): + return 1 + + +class _PoweredOff: + def power_state(self): + return 0 + + +class _DeadManager: + def __init__(self, exc): + self._exc = exc + + def power_state(self): + raise self._exc + + +@pytest.mark.unit +def test_sleep_for_framerate_awake_when_powered(): + state_utils._last_wake = None + assert state_utils.sleep_for_framerate(_Healthy(), limit_framerate=False) is False + + +@pytest.mark.unit +def test_sleep_for_framerate_sleeps_when_powered_off(): + assert state_utils.sleep_for_framerate(_PoweredOff()) is True + + +@pytest.mark.unit +@pytest.mark.parametrize( + "exc", + [ + BrokenPipeError(32, "Broken pipe"), + ConnectionResetError(104, "reset"), + EOFError(), + ], +) +def test_sleep_for_framerate_translates_dead_manager(exc): + with pytest.raises(state_utils.SharedStateLost) as info: + state_utils.sleep_for_framerate(_DeadManager(exc)) + # The original connection error is preserved as the cause and is still + # recognised by the shared detector. + assert info.value.__cause__ is exc + assert state_utils.is_dead_manager_error(info.value) is True