Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/pytest-slow
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
set -euo pipefail

. .venv/bin/activate
exec pytest "$@" -m 'not (tool or mujoco)' dimos
exec pytest "$@" -m 'not (tool or mujoco or self_hosted_large)' dimos
2 changes: 1 addition & 1 deletion dimos/agents/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def on_message(msg: BaseMessage) -> None:
AgentTestRunner.blueprint(messages=messages),
)

global_config.update(viewer="none")
global_config.update(viewer="none", transport="lcm") # fixture uses pLCMTransport sidecars

nonlocal coordinator
coordinator = ModuleCoordinator.build(blueprint)
Expand Down
2 changes: 1 addition & 1 deletion dimos/agents/mcp/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def on_message(msg: BaseMessage) -> None:
AgentTestRunner.blueprint(messages=messages),
)

global_config.update(viewer="none")
global_config.update(viewer="none", transport="lcm") # fixture uses pLCMTransport sidecars

nonlocal coordinator
coordinator = ModuleCoordinator.build(blueprint)
Expand Down
7 changes: 3 additions & 4 deletions dimos/agents/mcp/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from dimos.core.core import rpc
from dimos.core.module import Module
from dimos.core.rpc_client import RpcCall, RPCClient
from dimos.core.transport_factory import make_transport
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
Expand Down Expand Up @@ -419,13 +420,11 @@ def list_modules(self) -> str:

@skill
def agent_send(self, message: str) -> str:
"""Send a message to the running DimOS agent via LCM."""
"""Send a message to the running DimOS agent over the active transport."""
if not message:
raise ValueError("Message cannot be empty")

from dimos.core.transport import pLCMTransport

transport: pLCMTransport[str] = pLCMTransport("/human_input")
transport = make_transport("/human_input")
try:
transport.start()
transport.publish(message)
Expand Down
12 changes: 6 additions & 6 deletions dimos/agents/mcp/test_tool_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,14 @@ def set_context(value):

@pytest.fixture()
def stream_with_transport_mock(mocker, skill_context):
"""ToolStream wired to a mock pLCMTransport so unit tests can inspect publishes.
"""ToolStream wired to a mock transport so unit tests can inspect publishes.

Constructs the stream inside a simulated `@skill` context with no progress
token, so the strict-construction rule is satisfied and the stream takes the
`notifications/message` fallback path.
"""
mock_transport = mocker.MagicMock()
mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport)
mocker.patch("dimos.agents.mcp.tool_stream.make_transport", return_value=mock_transport)
stream = ToolStream("test_tool")
return stream, mock_transport

Expand Down Expand Up @@ -374,7 +374,7 @@ def test_stop_frame_carries_acquire_token(mocker, skill_context) -> None:
McpServer can release the hold for that specific invocation."""
skill_context({"acquire_token": "tok-1"})
mock_transport = mocker.MagicMock()
mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport)
mocker.patch("dimos.agents.mcp.tool_stream.make_transport", return_value=mock_transport)
stream = ToolStream("follow_person")
stream.stop()
frame = mock_transport.publish.call_args.args[0]
Expand Down Expand Up @@ -403,7 +403,7 @@ def test_make_progress_notification_shape() -> None:
def stream_with_progress_context(mocker, skill_context):
"""ToolStream constructed with a skill-context progress_token set."""
mock_transport = mocker.MagicMock()
mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport)
mocker.patch("dimos.agents.mcp.tool_stream.make_transport", return_value=mock_transport)
skill_context({"progress_token": "pt-unit-1"})
stream = ToolStream("progress_tool")
return stream, mock_transport
Expand Down Expand Up @@ -518,7 +518,7 @@ def tool_helper_module(mocker):
start_tool/tool_update/stop_tool helpers.
"""
mock_transport = mocker.MagicMock()
mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport)
mocker.patch("dimos.agents.mcp.tool_stream.make_transport", return_value=mock_transport)
mocker.patch("dimos.core.module.get_loop", return_value=(None, None))
mocker.patch.object(LCMRPC, "__init__", return_value=None)
mocker.patch.object(LCMRPC, "serve_module_rpc")
Expand Down Expand Up @@ -571,7 +571,7 @@ def test_rebind_acquire_token_noops_when_closed_or_no_context(mocker, skill_cont
"""`rebind_acquire_token` updates the live token, but is a no-op outside a
skill context or once the stream is closed."""
mock_transport = mocker.MagicMock()
mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport)
mocker.patch("dimos.agents.mcp.tool_stream.make_transport", return_value=mock_transport)

skill_context({"acquire_token": "T1"})
stream = ToolStream("job")
Expand Down
28 changes: 15 additions & 13 deletions dimos/agents/mcp/tool_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
still running.

Transport: each `ToolStream.send` publishes a ready-made JSON-RPC
`notifications/message` frame on the shared `/tool_streams` LCM topic. Skill
`notifications/message` frame on the shared `/tool_streams` topic via the active
transport backend (LCM or Zenoh; see `dimos.core.transport_factory`). Skill
workers and the `McpServer` process typically live in different workers, so we
lean on LCM's local-multicast bus to cross that boundary. `McpServer` subscribes
to the topic once, forwards each frame to every connected `GET /mcp` SSE client,
and drops frames when nobody is listening.
lean on the transport's local pub/sub bus to cross that boundary. `McpServer`
subscribes to the topic once, forwards each frame to every connected `GET /mcp`
SSE client, and drops frames when nobody is listening.

Each `ToolStream` instance owns its own `pLCMTransport`, created lazily on the
first `send` and torn down by `stop`. There is no module-level or process-level
state. The stream's lifetime is exactly the owning skill's lifetime.
Each `ToolStream` instance owns its own transport, created lazily on the first
`send` and torn down by `stop`. There is no module-level or process-level state.
The stream's lifetime is exactly the owning skill's lifetime.
"""

from __future__ import annotations
Expand All @@ -38,7 +39,8 @@
import uuid

from dimos.agents.annotation import current_skill_context
from dimos.core.transport import pLCMTransport
from dimos.core.transport import PubSubTransport
from dimos.core.transport_factory import make_transport
from dimos.utils.logging_config import setup_logger

logger = setup_logger()
Expand Down Expand Up @@ -94,8 +96,8 @@ def make_stopped_notification(tool_name: str, token: str | None = None) -> dict[


def subscribe(callback: ToolStreamCallback) -> Callable[[], None]:
"""Subscribe to the tool-stream LCM topic and return a cleanup callable."""
transport: pLCMTransport[dict[str, Any]] = pLCMTransport(TOOL_STREAM_TOPIC)
"""Subscribe to the tool-stream topic and return a cleanup callable."""
transport: PubSubTransport[dict[str, Any]] = make_transport(TOOL_STREAM_TOPIC)
transport.start()
unsubscribe = transport.subscribe(callback)

Expand Down Expand Up @@ -139,7 +141,7 @@ def __init__(self, tool_name: str) -> None:
self.id: str = str(uuid.uuid4())
self._closed: threading.Event = threading.Event()
self._lock = threading.Lock()
self._transport: pLCMTransport[dict[str, Any]] | None = None
self._transport: PubSubTransport[dict[str, Any]] | None = None
context = current_skill_context()
if context is None:
raise RuntimeError(
Expand Down Expand Up @@ -177,7 +179,7 @@ def send(self, message: str) -> None:
logger.warning("send on closed ToolStream", stream_id=self.id)
return
if self._transport is None:
self._transport = pLCMTransport(TOOL_STREAM_TOPIC)
self._transport = make_transport(TOOL_STREAM_TOPIC)
self._transport.start()
self._progress += 1
progress = self._progress
Expand All @@ -203,7 +205,7 @@ def stop(self) -> None:
# If no `send()` ever happened we spin up a transport here so the
# lifecycle signal isn't lost.
if transport is None:
transport = pLCMTransport(TOOL_STREAM_TOPIC)
transport = make_transport(TOOL_STREAM_TOPIC)
transport.start()
try:
transport.publish(make_stopped_notification(self.tool_name, self._acquire_token))
Expand Down
9 changes: 5 additions & 4 deletions dimos/agents/web_human_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT
from dimos.core.core import rpc
from dimos.core.module import Module
from dimos.core.transport import pLCMTransport
from dimos.core.transport import PubSubTransport
from dimos.core.transport_factory import make_transport
from dimos.stream.audio.node_normalizer import AudioNormalizer
from dimos.utils.logging_config import setup_logger
from dimos.web.robot_web_interface import RobotWebInterface
Expand All @@ -35,13 +36,13 @@
class WebInput(Module):
_web_interface: RobotWebInterface | None = None
_thread: Thread | None = None
_human_transport: pLCMTransport[str] | None = None
_human_transport: PubSubTransport[str] | None = None

@rpc
def start(self) -> None:
super().start()

self._human_transport = pLCMTransport("/human_input")
self._human_transport = make_transport("/human_input")

audio_subject: rx.subject.Subject[AudioEvent] = rx.subject.Subject()

Expand Down Expand Up @@ -83,5 +84,5 @@ def stop(self) -> None:
if self._thread:
self._thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT)
if self._human_transport:
self._human_transport.lcm.stop()
self._human_transport.stop()
super().stop()
13 changes: 13 additions & 0 deletions dimos/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

from dimos.core.coordination.module_coordinator import ModuleCoordinator
from dimos.core.coordination.process_lifecycle import spawn_watchdog
from dimos.utils.testing.waiting import retry_until as _retry_until, wait_until as _wait_until

load_dotenv()

Expand Down Expand Up @@ -134,6 +135,18 @@ def lcm_url() -> str:
return os.environ.get("LCM_DEFAULT_URL", "udpm://239.255.76.67:7667?ttl=0")


@pytest.fixture
def wait_until():
"""Poll a predicate until it's true or a timeout elapses. See dimos.utils.testing.waiting."""
return _wait_until


@pytest.fixture
def retry_until():
"""Retry an action until a threading.Event fires. See dimos.utils.testing.waiting."""
return _retry_until


@pytest.hookimpl(tryfirst=True)
def pytest_collection_modifyitems(config, items):
_skipif_markers = {
Expand Down
25 changes: 14 additions & 11 deletions dimos/core/coordination/coordinator_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,39 @@

from typing import TYPE_CHECKING, Any

from dimos.protocol.rpc.pubsubrpc import LCMRPC
from dimos.core.global_config import global_config
from dimos.core.transport_factory import rpc_backend
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from dimos.protocol.rpc.spec import RPCInspectable
from dimos.protocol.rpc.spec import RPCInspectable, RPCSpec

logger = setup_logger()


class CoordinatorRPC:
"""Owns the LCM RPC connection to the singleton Coordinator service."""
"""Owns the RPC connection to the singleton Coordinator service."""

NAME = "Coordinator"

def __init__(self, rpc: LCMRPC) -> None:
def __init__(self, rpc: RPCSpec) -> None:
self._rpc = rpc

@classmethod
def serve(cls, coordinator: RPCInspectable) -> CoordinatorRPC:
"""Publish `coordinator`'s @rpc methods under the `Coordinator/` prefix."""
cls._ensure_no_existing_service()
rpc = LCMRPC()
rpc.serve_module_rpc(coordinator, name=cls.NAME)
rpc = rpc_backend()()
# start() before serve_module_rpc(): Zenoh's subscribe needs an open
# session (acquired in start()), whereas LCM tolerates either order.
rpc.start()
rpc.serve_module_rpc(coordinator, name=cls.NAME)
return cls(rpc)

@classmethod
def connect(cls, *, timeout: float) -> CoordinatorRPC:
"""Attach to a running Coordinator, raising `TimeoutError` if none answers."""
rpc = LCMRPC()
rpc = rpc_backend()()
rpc.start()
client = cls(rpc)
try:
Expand All @@ -65,7 +68,7 @@ def call(self, method: str, *args: Any, rpc_timeout: float | None = None, **kwar
return result

@property
def rpc(self) -> LCMRPC:
def rpc(self) -> RPCSpec:
return self._rpc

def stop(self) -> None:
Expand All @@ -76,16 +79,16 @@ def stop(self) -> None:

@classmethod
def _ensure_no_existing_service(cls) -> None:
probe = LCMRPC()
probe = rpc_backend()()
probe.start()
try:
try:
probe.call_sync(f"{cls.NAME}/ping", ([], {}), rpc_timeout=0.5)
except TimeoutError:
return
raise RuntimeError(
f"another {cls.NAME} service is already running on this LCM bus. "
"Run `dimos stop` first."
f"another {cls.NAME} service is already running on the "
f"{global_config.transport} bus. Run `dimos stop` first."
)
finally:
probe.stop()
46 changes: 38 additions & 8 deletions dimos/core/coordination/module_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,14 @@
from dimos.core.global_config import GlobalConfig, global_config
from dimos.core.module import ModuleBase, ModuleSpec
from dimos.core.resource import Resource
from dimos.core.transport import LCMTransport, PubSubTransport, pLCMTransport
from dimos.core.transport import (
LCMTransport,
PubSubTransport,
ZenohTransport,
pLCMTransport,
pZenohTransport,
)
from dimos.core.transport_factory import make_transport
from dimos.spec.utils import is_spec, spec_annotation_compliance, spec_structural_compliance
from dimos.utils.generic import short_id
from dimos.utils.logging_config import setup_logger
Expand Down Expand Up @@ -572,15 +579,37 @@ def _is_name_unique(blueprint: Blueprint, name: str) -> bool:


def _get_transport_for(blueprint: Blueprint, name: str, stream_type: type) -> PubSubTransport[Any]:
transport = blueprint.transport_map.get((name, stream_type), None)
if transport:
return transport
mapped = blueprint.transport_map.get((name, stream_type), None)
if mapped is not None:
return _coerce_transport_to_backend(mapped)

use_pickled = getattr(stream_type, "lcm_encode", None) is None
topic = f"/{name}" if _is_name_unique(blueprint, name) else f"/{short_id()}"
transport = pLCMTransport(topic) if use_pickled else LCMTransport(topic, stream_type)
return make_transport(topic, stream_type)


def _coerce_transport_to_backend(transport: PubSubTransport[Any]) -> PubSubTransport[Any]:
"""Rebuild an explicitly-mapped LCM/Zenoh transport for the active backend.

Blueprints pin specific channels in their `transport_map` with e.g. `LCMTransport("/cmd_vel",
Twist)`. So that the global transport switch reaches those channels too, a plain LCM or Zenoh
transport is rebuilt via the factory when it doesn't match `global_config.transport`. Deliberate
non-default choices (`JpegLcmTransport`, SHM, ROS, DDS, ...) fail the exact-type check and are
returned untouched.
"""
want = global_config.transport
is_pickled = type(transport) in (pLCMTransport, pZenohTransport)
is_lcm = type(transport) in (LCMTransport, pLCMTransport)
is_zenoh = type(transport) in (ZenohTransport, pZenohTransport)
if not ((want == "zenoh" and is_lcm) or (want == "lcm" and is_zenoh)):
return transport

return transport
if is_pickled:
raw, msg_type = transport.topic, None
else:
raw, msg_type = transport.topic.topic, transport.topic.lcm_type
# Strip the Zenoh 'dimos/' namespace (if present) back to the logical name.
# The factory re-applies the right prefix for the target backend.
logical = raw[len("dimos/") :] if raw.startswith("dimos/") else raw
return make_transport(logical, msg_type)


def _verify_no_name_conflicts(blueprint: Blueprint) -> None:
Expand Down Expand Up @@ -648,7 +677,8 @@ def _run_configurators(blueprint: Blueprint) -> None:
from dimos.protocol.service.system_configurator.base import configure_system
from dimos.protocol.service.system_configurator.lcm_config import lcm_configurators

configurators = [*lcm_configurators(), *blueprint.configurator_checks]
lcm_checks = lcm_configurators() if global_config.transport == "lcm" else []
configurators = [*lcm_checks, *blueprint.configurator_checks]

try:
configure_system(configurators)
Expand Down
8 changes: 8 additions & 0 deletions dimos/core/coordination/python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,15 @@ def _worker_entrypoint(conn: Connection, worker_id: int) -> None:
def _handle_request(request: Any, state: _WorkerState) -> WorkerResponse:
match request:
case DeployModuleRequest(module_id=module_id, module_class=module_class, kwargs=kwargs):
# Always use the same transport and QoS rules as the host.
host_config = kwargs.get("g")
if host_config is not None:
global_config.update(
transport=host_config.transport, zenoh_qos=host_config.zenoh_qos
)

state.instances[module_id] = module_class(**kwargs)

return WorkerResponse(result=module_id)

case SetRefRequest(module_id=module_id, ref=ref):
Expand Down
Loading
Loading