diff --git a/dimos/agents/conftest.py b/dimos/agents/conftest.py index e0953cb1b3..9ab20ed408 100644 --- a/dimos/agents/conftest.py +++ b/dimos/agents/conftest.py @@ -27,6 +27,7 @@ from dimos.core.coordination.module_coordinator import ModuleCoordinator from dimos.core.global_config import global_config from dimos.core.transport import pLCMTransport +from dimos.utils.testing.agent_teardown import teardown_agent_setup load_dotenv() @@ -96,11 +97,4 @@ def on_message(msg: BaseMessage) -> None: yield fn - if coordinator is not None: - coordinator.stop() - - for transport in transports: - transport.stop() - - for unsub in unsubs: - unsub() + teardown_agent_setup(coordinator, transports, unsubs) diff --git a/dimos/agents/mcp/conftest.py b/dimos/agents/mcp/conftest.py index 8b36fa03f3..63522970cf 100644 --- a/dimos/agents/mcp/conftest.py +++ b/dimos/agents/mcp/conftest.py @@ -27,6 +27,7 @@ from dimos.core.coordination.module_coordinator import ModuleCoordinator from dimos.core.global_config import global_config from dimos.core.transport import pLCMTransport +from dimos.utils.testing.agent_teardown import teardown_agent_setup load_dotenv() @@ -93,11 +94,4 @@ def on_message(msg: BaseMessage) -> None: yield fn - if coordinator is not None: - coordinator.stop() - - for transport in transports: - transport.stop() - - for unsub in unsubs: - unsub() + teardown_agent_setup(coordinator, transports, unsubs) diff --git a/dimos/core/test_core.py b/dimos/core/test_core.py index 22cbca31b3..895e33d816 100644 --- a/dimos/core/test_core.py +++ b/dimos/core/test_core.py @@ -116,7 +116,16 @@ def test_basic_deployment(dimos) -> None: robot.start() nav.start() - time.sleep(1) + # Messages flow end-to-end at ~10 Hz, but under heavy parallel load + # (e.g. `pytest -n auto`) the publisher and LCM transport threads get + # starved, so the count after a fixed 1s window is unreliable. Poll up to a + # generous deadline instead — this still verifies the pipeline delivers, it + # just doesn't assume a wall-clock throughput. + deadline = time.perf_counter() + 15 + while time.perf_counter() < deadline: + if robot.mov_msg_count >= 8 and nav.odom_msg_count >= 8 and nav.lidar_msg_count >= 8: + break + time.sleep(0.1) assert robot.mov_msg_count >= 8 assert nav.odom_msg_count >= 8 diff --git a/dimos/robot/drone/blueprints/basic/drone_basic.py b/dimos/robot/drone/blueprints/basic/drone_basic.py index aaf82f6355..c97f7b1976 100644 --- a/dimos/robot/drone/blueprints/basic/drone_basic.py +++ b/dimos/robot/drone/blueprints/basic/drone_basic.py @@ -24,6 +24,10 @@ from dimos.robot.drone.connection_module import DroneConnectionModule from dimos.visualization.vis_module import vis_module +# Camera intrinsics [fx, fy, cx, cy], shared by the camera module and the +# static Rerun pinhole below so the two can't drift. +_CAMERA_INTRINSICS = [1000.0, 1000.0, 960.0, 540.0] + def _static_drone_body(rr: Any) -> list[Any]: """Static visualization of drone body.""" @@ -36,6 +40,25 @@ def _static_drone_body(rr: Any) -> list[Any]: ] +def _static_camera_pinhole(rr: Any) -> list[Any]: + """Pinhole at the camera-image entity. + + The camera feed is logged at ``world/video``, which falls under the 3D + view's ``world`` origin. A 2D image needs a pinhole ancestor to be placed + in a 3D view, so without this Rerun warns ("2D visualizers require a + pinhole ancestor to be shown in a 3D view") and the feed only renders in + the dedicated 2D pane. Logging the pinhole here lets it also show as a + camera frustum in 3D. + """ + fx, fy, cx, cy = _CAMERA_INTRINSICS + return [ + rr.Pinhole( + image_from_camera=[[fx, 0.0, cx], [0.0, fy, cy], [0.0, 0.0, 1.0]], + resolution=[2.0 * cx, 2.0 * cy], + ), + ] + + def _drone_rerun_blueprint() -> Any: """Split layout: camera feed + 3D world view side by side.""" import rerun as rr @@ -61,6 +84,7 @@ def _drone_rerun_blueprint() -> Any: "blueprint": _drone_rerun_blueprint, "static": { "world/tf/base_link": _static_drone_body, + "world/video": _static_camera_pinhole, }, } @@ -79,7 +103,7 @@ def _drone_rerun_blueprint() -> Any: video_port=video_port, outdoor=False, ), - DroneCameraModule.blueprint(camera_intrinsics=[1000.0, 1000.0, 960.0, 540.0]), + DroneCameraModule.blueprint(camera_intrinsics=_CAMERA_INTRINSICS), ) __all__ = [ diff --git a/dimos/utils/testing/agent_teardown.py b/dimos/utils/testing/agent_teardown.py new file mode 100644 index 0000000000..7515045224 --- /dev/null +++ b/dimos/utils/testing/agent_teardown.py @@ -0,0 +1,56 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Best-effort teardown for the ``agent_setup`` test fixtures. + +When an agent test fails mid-run — e.g. the LLM is slow under heavy parallel +load and ``finished_event.wait`` times out — the fixture still has to release +everything it created: the coordinator's worker processes, the LCM transports, +and their subscriptions. If one cleanup step raises, the remaining steps must +still run; otherwise threads leak and the autouse ``monitor_threads`` fixture +turns a plain test failure into a failure *and* a teardown error. So each step +is isolated and its failure logged rather than propagated. +""" + +from collections.abc import Callable, Iterable +from typing import Any + +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +def teardown_agent_setup( + coordinator: Any | None, + transports: Iterable[Any], + unsubs: Iterable[Callable[[], Any]], +) -> None: + """Tear down an ``agent_setup`` run, attempting every step regardless of failures.""" + + def _safe(action: Callable[[], Any], step: str) -> None: + try: + action() + except Exception: + # Cleanup is best-effort: log and keep going so a single failing + # step never leaks the rest or masks the test's own result. + logger.error("agent_setup teardown step failed", step=step, exc_info=True) + + if coordinator is not None: + _safe(coordinator.stop, "coordinator.stop") + + for transport in transports: + _safe(transport.stop, f"{type(transport).__name__}.stop") + + for i, unsub in enumerate(unsubs): + _safe(unsub, f"unsubscribe[{i}]")