Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions dimos/agents/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from dimos.core.coordination.module_coordinator import ModuleCoordinator
from dimos.core.global_config import global_config
from dimos.core.transport import pLCMTransport
from dimos.utils.testing.agent_teardown import teardown_agent_setup

load_dotenv()

Expand Down Expand Up @@ -96,11 +97,4 @@ def on_message(msg: BaseMessage) -> None:

yield fn

if coordinator is not None:
coordinator.stop()

for transport in transports:
transport.stop()

for unsub in unsubs:
unsub()
teardown_agent_setup(coordinator, transports, unsubs)
10 changes: 2 additions & 8 deletions dimos/agents/mcp/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from dimos.core.coordination.module_coordinator import ModuleCoordinator
from dimos.core.global_config import global_config
from dimos.core.transport import pLCMTransport
from dimos.utils.testing.agent_teardown import teardown_agent_setup

load_dotenv()

Expand Down Expand Up @@ -93,11 +94,4 @@ def on_message(msg: BaseMessage) -> None:

yield fn

if coordinator is not None:
coordinator.stop()

for transport in transports:
transport.stop()

for unsub in unsubs:
unsub()
teardown_agent_setup(coordinator, transports, unsubs)
11 changes: 10 additions & 1 deletion dimos/core/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,16 @@ def test_basic_deployment(dimos) -> None:
robot.start()
nav.start()

time.sleep(1)
# Messages flow end-to-end at ~10 Hz, but under heavy parallel load
# (e.g. `pytest -n auto`) the publisher and LCM transport threads get
# starved, so the count after a fixed 1s window is unreliable. Poll up to a
# generous deadline instead — this still verifies the pipeline delivers, it
# just doesn't assume a wall-clock throughput.
deadline = time.perf_counter() + 15
while time.perf_counter() < deadline:
if robot.mov_msg_count >= 8 and nav.odom_msg_count >= 8 and nav.lidar_msg_count >= 8:
break
time.sleep(0.1)

assert robot.mov_msg_count >= 8
assert nav.odom_msg_count >= 8
Expand Down
26 changes: 25 additions & 1 deletion dimos/robot/drone/blueprints/basic/drone_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
from dimos.robot.drone.connection_module import DroneConnectionModule
from dimos.visualization.vis_module import vis_module

# Camera intrinsics [fx, fy, cx, cy], shared by the camera module and the
# static Rerun pinhole below so the two can't drift.
_CAMERA_INTRINSICS = [1000.0, 1000.0, 960.0, 540.0]


def _static_drone_body(rr: Any) -> list[Any]:
"""Static visualization of drone body."""
Expand All @@ -36,6 +40,25 @@ def _static_drone_body(rr: Any) -> list[Any]:
]


def _static_camera_pinhole(rr: Any) -> list[Any]:
"""Pinhole at the camera-image entity.

The camera feed is logged at ``world/video``, which falls under the 3D
view's ``world`` origin. A 2D image needs a pinhole ancestor to be placed
in a 3D view, so without this Rerun warns ("2D visualizers require a
pinhole ancestor to be shown in a 3D view") and the feed only renders in
the dedicated 2D pane. Logging the pinhole here lets it also show as a
camera frustum in 3D.
"""
fx, fy, cx, cy = _CAMERA_INTRINSICS
return [
rr.Pinhole(
image_from_camera=[[fx, 0.0, cx], [0.0, fy, cy], [0.0, 0.0, 1.0]],
resolution=[2.0 * cx, 2.0 * cy],
),
]


def _drone_rerun_blueprint() -> Any:
"""Split layout: camera feed + 3D world view side by side."""
import rerun as rr
Expand All @@ -61,6 +84,7 @@ def _drone_rerun_blueprint() -> Any:
"blueprint": _drone_rerun_blueprint,
"static": {
"world/tf/base_link": _static_drone_body,
"world/video": _static_camera_pinhole,
},
}

Expand All @@ -79,7 +103,7 @@ def _drone_rerun_blueprint() -> Any:
video_port=video_port,
outdoor=False,
),
DroneCameraModule.blueprint(camera_intrinsics=[1000.0, 1000.0, 960.0, 540.0]),
DroneCameraModule.blueprint(camera_intrinsics=_CAMERA_INTRINSICS),
)

__all__ = [
Expand Down
56 changes: 56 additions & 0 deletions dimos/utils/testing/agent_teardown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Best-effort teardown for the ``agent_setup`` test fixtures.

When an agent test fails mid-run — e.g. the LLM is slow under heavy parallel
load and ``finished_event.wait`` times out — the fixture still has to release
everything it created: the coordinator's worker processes, the LCM transports,
and their subscriptions. If one cleanup step raises, the remaining steps must
still run; otherwise threads leak and the autouse ``monitor_threads`` fixture
turns a plain test failure into a failure *and* a teardown error. So each step
is isolated and its failure logged rather than propagated.
"""

from collections.abc import Callable, Iterable
from typing import Any

from dimos.utils.logging_config import setup_logger

logger = setup_logger()


def teardown_agent_setup(
coordinator: Any | None,
transports: Iterable[Any],
unsubs: Iterable[Callable[[], Any]],
) -> None:
"""Tear down an ``agent_setup`` run, attempting every step regardless of failures."""

def _safe(action: Callable[[], Any], step: str) -> None:
try:
action()
except Exception:
# Cleanup is best-effort: log and keep going so a single failing
# step never leaks the rest or masks the test's own result.
logger.error("agent_setup teardown step failed", step=step, exc_info=True)

if coordinator is not None:
_safe(coordinator.stop, "coordinator.stop")

for transport in transports:
_safe(transport.stop, f"{type(transport).__name__}.stop")

for i, unsub in enumerate(unsubs):
_safe(unsub, f"unsubscribe[{i}]")
Loading