diff --git a/bin/pytest-slow b/bin/pytest-slow index 9f9d5ae611..70b9a53a88 100755 --- a/bin/pytest-slow +++ b/bin/pytest-slow @@ -3,4 +3,4 @@ set -euo pipefail . .venv/bin/activate -exec pytest "$@" -m 'not (tool or mujoco)' dimos +exec pytest "$@" -m 'not (tool or mujoco or self_hosted_large)' dimos diff --git a/dimos/agents/conftest.py b/dimos/agents/conftest.py index e0953cb1b3..7fe9fecee1 100644 --- a/dimos/agents/conftest.py +++ b/dimos/agents/conftest.py @@ -84,7 +84,7 @@ def on_message(msg: BaseMessage) -> None: AgentTestRunner.blueprint(messages=messages), ) - global_config.update(viewer="none") + global_config.update(viewer="none", transport="lcm") # fixture uses pLCMTransport sidecars nonlocal coordinator coordinator = ModuleCoordinator.build(blueprint) diff --git a/dimos/agents/mcp/conftest.py b/dimos/agents/mcp/conftest.py index 8b36fa03f3..a06224bd8b 100644 --- a/dimos/agents/mcp/conftest.py +++ b/dimos/agents/mcp/conftest.py @@ -81,7 +81,7 @@ def on_message(msg: BaseMessage) -> None: AgentTestRunner.blueprint(messages=messages), ) - global_config.update(viewer="none") + global_config.update(viewer="none", transport="lcm") # fixture uses pLCMTransport sidecars nonlocal coordinator coordinator = ModuleCoordinator.build(blueprint) diff --git a/dimos/agents/mcp/mcp_server.py b/dimos/agents/mcp/mcp_server.py index 2b792ef59a..61d7572af8 100644 --- a/dimos/agents/mcp/mcp_server.py +++ b/dimos/agents/mcp/mcp_server.py @@ -35,6 +35,7 @@ from dimos.core.core import rpc from dimos.core.module import Module from dimos.core.rpc_client import RpcCall, RPCClient +from dimos.core.transport_factory import make_transport from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: @@ -419,13 +420,11 @@ def list_modules(self) -> str: @skill def agent_send(self, message: str) -> str: - """Send a message to the running DimOS agent via LCM.""" + """Send a message to the running DimOS agent over the active transport.""" if not message: raise ValueError("Message cannot be empty") - from dimos.core.transport import pLCMTransport - - transport: pLCMTransport[str] = pLCMTransport("/human_input") + transport = make_transport("/human_input") try: transport.start() transport.publish(message) diff --git a/dimos/agents/mcp/test_tool_stream.py b/dimos/agents/mcp/test_tool_stream.py index 0979163335..992bce8694 100644 --- a/dimos/agents/mcp/test_tool_stream.py +++ b/dimos/agents/mcp/test_tool_stream.py @@ -262,14 +262,14 @@ def set_context(value): @pytest.fixture() def stream_with_transport_mock(mocker, skill_context): - """ToolStream wired to a mock pLCMTransport so unit tests can inspect publishes. + """ToolStream wired to a mock transport so unit tests can inspect publishes. Constructs the stream inside a simulated `@skill` context with no progress token, so the strict-construction rule is satisfied and the stream takes the `notifications/message` fallback path. """ mock_transport = mocker.MagicMock() - mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport) + mocker.patch("dimos.agents.mcp.tool_stream.make_transport", return_value=mock_transport) stream = ToolStream("test_tool") return stream, mock_transport @@ -374,7 +374,7 @@ def test_stop_frame_carries_acquire_token(mocker, skill_context) -> None: McpServer can release the hold for that specific invocation.""" skill_context({"acquire_token": "tok-1"}) mock_transport = mocker.MagicMock() - mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport) + mocker.patch("dimos.agents.mcp.tool_stream.make_transport", return_value=mock_transport) stream = ToolStream("follow_person") stream.stop() frame = mock_transport.publish.call_args.args[0] @@ -403,7 +403,7 @@ def test_make_progress_notification_shape() -> None: def stream_with_progress_context(mocker, skill_context): """ToolStream constructed with a skill-context progress_token set.""" mock_transport = mocker.MagicMock() - mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport) + mocker.patch("dimos.agents.mcp.tool_stream.make_transport", return_value=mock_transport) skill_context({"progress_token": "pt-unit-1"}) stream = ToolStream("progress_tool") return stream, mock_transport @@ -518,7 +518,7 @@ def tool_helper_module(mocker): start_tool/tool_update/stop_tool helpers. """ mock_transport = mocker.MagicMock() - mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport) + mocker.patch("dimos.agents.mcp.tool_stream.make_transport", return_value=mock_transport) mocker.patch("dimos.core.module.get_loop", return_value=(None, None)) mocker.patch.object(LCMRPC, "__init__", return_value=None) mocker.patch.object(LCMRPC, "serve_module_rpc") @@ -571,7 +571,7 @@ def test_rebind_acquire_token_noops_when_closed_or_no_context(mocker, skill_cont """`rebind_acquire_token` updates the live token, but is a no-op outside a skill context or once the stream is closed.""" mock_transport = mocker.MagicMock() - mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport) + mocker.patch("dimos.agents.mcp.tool_stream.make_transport", return_value=mock_transport) skill_context({"acquire_token": "T1"}) stream = ToolStream("job") diff --git a/dimos/agents/mcp/tool_stream.py b/dimos/agents/mcp/tool_stream.py index 403539f93a..7a8d0882ed 100644 --- a/dimos/agents/mcp/tool_stream.py +++ b/dimos/agents/mcp/tool_stream.py @@ -19,15 +19,16 @@ still running. Transport: each `ToolStream.send` publishes a ready-made JSON-RPC -`notifications/message` frame on the shared `/tool_streams` LCM topic. Skill +`notifications/message` frame on the shared `/tool_streams` topic via the active +transport backend (LCM or Zenoh; see `dimos.core.transport_factory`). Skill workers and the `McpServer` process typically live in different workers, so we -lean on LCM's local-multicast bus to cross that boundary. `McpServer` subscribes -to the topic once, forwards each frame to every connected `GET /mcp` SSE client, -and drops frames when nobody is listening. +lean on the transport's local pub/sub bus to cross that boundary. `McpServer` +subscribes to the topic once, forwards each frame to every connected `GET /mcp` +SSE client, and drops frames when nobody is listening. -Each `ToolStream` instance owns its own `pLCMTransport`, created lazily on the -first `send` and torn down by `stop`. There is no module-level or process-level -state. The stream's lifetime is exactly the owning skill's lifetime. +Each `ToolStream` instance owns its own transport, created lazily on the first +`send` and torn down by `stop`. There is no module-level or process-level state. +The stream's lifetime is exactly the owning skill's lifetime. """ from __future__ import annotations @@ -38,7 +39,8 @@ import uuid from dimos.agents.annotation import current_skill_context -from dimos.core.transport import pLCMTransport +from dimos.core.transport import PubSubTransport +from dimos.core.transport_factory import make_transport from dimos.utils.logging_config import setup_logger logger = setup_logger() @@ -94,8 +96,8 @@ def make_stopped_notification(tool_name: str, token: str | None = None) -> dict[ def subscribe(callback: ToolStreamCallback) -> Callable[[], None]: - """Subscribe to the tool-stream LCM topic and return a cleanup callable.""" - transport: pLCMTransport[dict[str, Any]] = pLCMTransport(TOOL_STREAM_TOPIC) + """Subscribe to the tool-stream topic and return a cleanup callable.""" + transport: PubSubTransport[dict[str, Any]] = make_transport(TOOL_STREAM_TOPIC) transport.start() unsubscribe = transport.subscribe(callback) @@ -139,7 +141,7 @@ def __init__(self, tool_name: str) -> None: self.id: str = str(uuid.uuid4()) self._closed: threading.Event = threading.Event() self._lock = threading.Lock() - self._transport: pLCMTransport[dict[str, Any]] | None = None + self._transport: PubSubTransport[dict[str, Any]] | None = None context = current_skill_context() if context is None: raise RuntimeError( @@ -177,7 +179,7 @@ def send(self, message: str) -> None: logger.warning("send on closed ToolStream", stream_id=self.id) return if self._transport is None: - self._transport = pLCMTransport(TOOL_STREAM_TOPIC) + self._transport = make_transport(TOOL_STREAM_TOPIC) self._transport.start() self._progress += 1 progress = self._progress @@ -203,7 +205,7 @@ def stop(self) -> None: # If no `send()` ever happened we spin up a transport here so the # lifecycle signal isn't lost. if transport is None: - transport = pLCMTransport(TOOL_STREAM_TOPIC) + transport = make_transport(TOOL_STREAM_TOPIC) transport.start() try: transport.publish(make_stopped_notification(self.tool_name, self._acquire_token)) diff --git a/dimos/agents/web_human_input.py b/dimos/agents/web_human_input.py index 0a4fe7c3f3..720887cb38 100644 --- a/dimos/agents/web_human_input.py +++ b/dimos/agents/web_human_input.py @@ -21,7 +21,8 @@ from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT from dimos.core.core import rpc from dimos.core.module import Module -from dimos.core.transport import pLCMTransport +from dimos.core.transport import PubSubTransport +from dimos.core.transport_factory import make_transport from dimos.stream.audio.node_normalizer import AudioNormalizer from dimos.utils.logging_config import setup_logger from dimos.web.robot_web_interface import RobotWebInterface @@ -35,13 +36,13 @@ class WebInput(Module): _web_interface: RobotWebInterface | None = None _thread: Thread | None = None - _human_transport: pLCMTransport[str] | None = None + _human_transport: PubSubTransport[str] | None = None @rpc def start(self) -> None: super().start() - self._human_transport = pLCMTransport("/human_input") + self._human_transport = make_transport("/human_input") audio_subject: rx.subject.Subject[AudioEvent] = rx.subject.Subject() @@ -83,5 +84,5 @@ def stop(self) -> None: if self._thread: self._thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT) if self._human_transport: - self._human_transport.lcm.stop() + self._human_transport.stop() super().stop() diff --git a/dimos/conftest.py b/dimos/conftest.py index e3e35fdddf..b0cc98f6b5 100644 --- a/dimos/conftest.py +++ b/dimos/conftest.py @@ -68,6 +68,7 @@ from dimos.core.coordination.module_coordinator import ModuleCoordinator from dimos.core.coordination.process_lifecycle import spawn_watchdog +from dimos.utils.testing.waiting import retry_until as _retry_until, wait_until as _wait_until load_dotenv() @@ -134,6 +135,18 @@ def lcm_url() -> str: return os.environ.get("LCM_DEFAULT_URL", "udpm://239.255.76.67:7667?ttl=0") +@pytest.fixture +def wait_until(): + """Poll a predicate until it's true or a timeout elapses. See dimos.utils.testing.waiting.""" + return _wait_until + + +@pytest.fixture +def retry_until(): + """Retry an action until a threading.Event fires. See dimos.utils.testing.waiting.""" + return _retry_until + + @pytest.hookimpl(tryfirst=True) def pytest_collection_modifyitems(config, items): _skipif_markers = { diff --git a/dimos/core/coordination/coordinator_rpc.py b/dimos/core/coordination/coordinator_rpc.py index e0ea6f28a9..fd65b182ca 100644 --- a/dimos/core/coordination/coordinator_rpc.py +++ b/dimos/core/coordination/coordinator_rpc.py @@ -16,36 +16,39 @@ from typing import TYPE_CHECKING, Any -from dimos.protocol.rpc.pubsubrpc import LCMRPC +from dimos.core.global_config import global_config +from dimos.core.transport_factory import rpc_backend from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: - from dimos.protocol.rpc.spec import RPCInspectable + from dimos.protocol.rpc.spec import RPCInspectable, RPCSpec logger = setup_logger() class CoordinatorRPC: - """Owns the LCM RPC connection to the singleton Coordinator service.""" + """Owns the RPC connection to the singleton Coordinator service.""" NAME = "Coordinator" - def __init__(self, rpc: LCMRPC) -> None: + def __init__(self, rpc: RPCSpec) -> None: self._rpc = rpc @classmethod def serve(cls, coordinator: RPCInspectable) -> CoordinatorRPC: """Publish `coordinator`'s @rpc methods under the `Coordinator/` prefix.""" cls._ensure_no_existing_service() - rpc = LCMRPC() - rpc.serve_module_rpc(coordinator, name=cls.NAME) + rpc = rpc_backend()() + # start() before serve_module_rpc(): Zenoh's subscribe needs an open + # session (acquired in start()), whereas LCM tolerates either order. rpc.start() + rpc.serve_module_rpc(coordinator, name=cls.NAME) return cls(rpc) @classmethod def connect(cls, *, timeout: float) -> CoordinatorRPC: """Attach to a running Coordinator, raising `TimeoutError` if none answers.""" - rpc = LCMRPC() + rpc = rpc_backend()() rpc.start() client = cls(rpc) try: @@ -65,7 +68,7 @@ def call(self, method: str, *args: Any, rpc_timeout: float | None = None, **kwar return result @property - def rpc(self) -> LCMRPC: + def rpc(self) -> RPCSpec: return self._rpc def stop(self) -> None: @@ -76,7 +79,7 @@ def stop(self) -> None: @classmethod def _ensure_no_existing_service(cls) -> None: - probe = LCMRPC() + probe = rpc_backend()() probe.start() try: try: @@ -84,8 +87,8 @@ def _ensure_no_existing_service(cls) -> None: except TimeoutError: return raise RuntimeError( - f"another {cls.NAME} service is already running on this LCM bus. " - "Run `dimos stop` first." + f"another {cls.NAME} service is already running on the " + f"{global_config.transport} bus. Run `dimos stop` first." ) finally: probe.stop() diff --git a/dimos/core/coordination/module_coordinator.py b/dimos/core/coordination/module_coordinator.py index a560152133..849c928681 100644 --- a/dimos/core/coordination/module_coordinator.py +++ b/dimos/core/coordination/module_coordinator.py @@ -29,7 +29,14 @@ from dimos.core.global_config import GlobalConfig, global_config from dimos.core.module import ModuleBase, ModuleSpec from dimos.core.resource import Resource -from dimos.core.transport import LCMTransport, PubSubTransport, pLCMTransport +from dimos.core.transport import ( + LCMTransport, + PubSubTransport, + ZenohTransport, + pLCMTransport, + pZenohTransport, +) +from dimos.core.transport_factory import make_transport from dimos.spec.utils import is_spec, spec_annotation_compliance, spec_structural_compliance from dimos.utils.generic import short_id from dimos.utils.logging_config import setup_logger @@ -572,15 +579,37 @@ def _is_name_unique(blueprint: Blueprint, name: str) -> bool: def _get_transport_for(blueprint: Blueprint, name: str, stream_type: type) -> PubSubTransport[Any]: - transport = blueprint.transport_map.get((name, stream_type), None) - if transport: - return transport + mapped = blueprint.transport_map.get((name, stream_type), None) + if mapped is not None: + return _coerce_transport_to_backend(mapped) - use_pickled = getattr(stream_type, "lcm_encode", None) is None topic = f"/{name}" if _is_name_unique(blueprint, name) else f"/{short_id()}" - transport = pLCMTransport(topic) if use_pickled else LCMTransport(topic, stream_type) + return make_transport(topic, stream_type) + + +def _coerce_transport_to_backend(transport: PubSubTransport[Any]) -> PubSubTransport[Any]: + """Rebuild an explicitly-mapped LCM/Zenoh transport for the active backend. + + Blueprints pin specific channels in their `transport_map` with e.g. `LCMTransport("/cmd_vel", + Twist)`. So the global transport switch reaches those too, rebuild the plain LCM<->Zenoh pair + via the factory when it doesn't match `global_config.transport`. Deliberate non-default choices + (`JpegLcmTransport`, SHM, ROS, DDS, ...) are exact-type-checked out and left untouched. + """ + want = global_config.transport + is_pickled = type(transport) in (pLCMTransport, pZenohTransport) + is_lcm = type(transport) in (LCMTransport, pLCMTransport) + is_zenoh = type(transport) in (ZenohTransport, pZenohTransport) + if not ((want == "zenoh" and is_lcm) or (want == "lcm" and is_zenoh)): + return transport - return transport + if is_pickled: + raw, msg_type = transport.topic, None + else: + raw, msg_type = transport.topic.topic, transport.topic.lcm_type + # Strip the Zenoh 'dimos/' namespace (if present) back to the logical name. + # The factory re-applies the right prefix for the target backend. + logical = raw[len("dimos/") :] if raw.startswith("dimos/") else raw + return make_transport(logical, msg_type) def _verify_no_name_conflicts(blueprint: Blueprint) -> None: @@ -648,7 +677,8 @@ def _run_configurators(blueprint: Blueprint) -> None: from dimos.protocol.service.system_configurator.base import configure_system from dimos.protocol.service.system_configurator.lcm_config import lcm_configurators - configurators = [*lcm_configurators(), *blueprint.configurator_checks] + lcm_checks = lcm_configurators() if global_config.transport == "lcm" else [] + configurators = [*lcm_checks, *blueprint.configurator_checks] try: configure_system(configurators) diff --git a/dimos/core/coordination/python_worker.py b/dimos/core/coordination/python_worker.py index c0d5d8c9c4..14ad46fe43 100644 --- a/dimos/core/coordination/python_worker.py +++ b/dimos/core/coordination/python_worker.py @@ -371,7 +371,15 @@ def _worker_entrypoint(conn: Connection, worker_id: int) -> None: def _handle_request(request: Any, state: _WorkerState) -> WorkerResponse: match request: case DeployModuleRequest(module_id=module_id, module_class=module_class, kwargs=kwargs): + # Always use the same transport and QoS rules as the host. + host_config = kwargs.get("g") + if host_config is not None: + global_config.update( + transport=host_config.transport, zenoh_qos=host_config.zenoh_qos + ) + state.instances[module_id] = module_class(**kwargs) + return WorkerResponse(result=module_id) case SetRefRequest(module_id=module_id, ref=ref): diff --git a/dimos/core/coordination/test_module_reloading.py b/dimos/core/coordination/test_module_reloading.py index 46cbd3f181..b16b32afcb 100644 --- a/dimos/core/coordination/test_module_reloading.py +++ b/dimos/core/coordination/test_module_reloading.py @@ -95,8 +95,10 @@ def test_module_file(): def test_module_reloading(repl, greeting, response, test_module_file): repl.stdin.write(""" +from dimos.core.global_config import global_config from dimos.core.coordination.module_coordinator import ModuleCoordinator from dimos.core.coordination import _test_module +global_config.update(transport="lcm") mc = ModuleCoordinator.build(_test_module.AliceModule.blueprint()) """) repl.stdin.flush() diff --git a/dimos/core/global_config.py b/dimos/core/global_config.py index c75ea6fd18..48f3d2887d 100644 --- a/dimos/core/global_config.py +++ b/dimos/core/global_config.py @@ -12,12 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import platform import re +from typing import Literal, TypeAlias +from pydantic import AliasChoices, Field from pydantic_settings import BaseSettings, SettingsConfigDict from dimos.constants import DEFAULT_BUILD_NATIVE from dimos.models.vl.types import VlModelName +from dimos.protocol.pubsub.impl.zenohqos import DEFAULT_ZENOH_QOS, ZenohQoS from dimos.visualization.rerun.constants import ( RERUN_ENABLE_WEB, RERUN_OPEN_DEFAULT, @@ -25,11 +29,19 @@ ViewerBackend, ) +TransportBackend: TypeAlias = Literal["lcm", "zenoh"] + def _get_all_numbers(s: str) -> list[float]: return [float(x) for x in re.findall(r"-?\d+\.?\d*", s)] +def _default_transport() -> TransportBackend: + if platform.system() == "Darwin": + return "zenoh" + return "lcm" + + class GlobalConfig(BaseSettings): robot_ip: str | None = None robot_ips: str | None = None @@ -65,6 +77,19 @@ class GlobalConfig(BaseSettings): nerf_speed: float = 1.0 planner_robot_speed: float | None = None mcp_port: int = 9990 + # `DIMOS_TRANSPORT` (or `.env`) is the single switch read by every process + # (dimos, humancli, agentspy, dtop). The `transport` alias keeps the bare + # env name and the `--transport` CLI flag (which sets the field by name) working. + transport: TransportBackend = Field( + default_factory=_default_transport, + validation_alias=AliasChoices("DIMOS_TRANSPORT", "transport"), + ) + # Per-key-expr Zenoh publisher QoS rules; first matching rule wins. + # Env override is JSON: DIMOS_ZENOH_QOS='[{"key":"dimos/foo","reliability":"best_effort"}]' + zenoh_qos: tuple[ZenohQoS, ...] = Field( + default=DEFAULT_ZENOH_QOS, + validation_alias=AliasChoices("DIMOS_ZENOH_QOS", "zenoh_qos"), + ) build_native: bool = DEFAULT_BUILD_NATIVE dtop: bool = False obstacle_avoidance: bool = True @@ -77,6 +102,7 @@ class GlobalConfig(BaseSettings): env_file=".env", env_file_encoding="utf-8", extra="ignore", + validate_assignment=True, ) def update(self, **kwargs: object) -> None: diff --git a/dimos/core/module.py b/dimos/core/module.py index 26a2b6f893..4f7aa9f964 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -40,10 +40,10 @@ from dimos.core.resource import CompositeResource from dimos.core.rpc_client import RpcCall from dimos.core.stream import In, Out, RemoteOut, Transport -from dimos.protocol.rpc.pubsubrpc import LCMRPC +from dimos.core.transport_factory import rpc_backend, tf_backend from dimos.protocol.rpc.spec import DEFAULT_RPC_TIMEOUT, DEFAULT_RPC_TIMEOUTS, RPCSpec from dimos.protocol.service.spec import BaseConfig, Configurable -from dimos.protocol.tf.tf import LCMTF, TFSpec +from dimos.protocol.tf.tf import TFSpec from dimos.utils import colors from dimos.utils.generic import classproperty from dimos.utils.logging_config import setup_logger @@ -103,10 +103,10 @@ def get_loop() -> tuple[asyncio.AbstractEventLoop, threading.Thread | None]: class ModuleConfig(BaseConfig): - rpc_transport: type[RPCSpec] = LCMRPC + rpc_transport: type[RPCSpec] = Field(default_factory=rpc_backend) default_rpc_timeout: float = DEFAULT_RPC_TIMEOUT rpc_timeouts: dict[str, float] = Field(default_factory=lambda: dict(DEFAULT_RPC_TIMEOUTS)) - tf_transport: type[TFSpec] = LCMTF # type: ignore[type-arg] + tf_transport: type[TFSpec] = Field(default_factory=tf_backend) # type: ignore[type-arg] frame_id_prefix: str | None = None frame_id: str | None = None g: GlobalConfig = global_config @@ -154,8 +154,10 @@ def __init__(self, config_args: dict[str, Any]) -> None: rpc_timeouts=self.config.rpc_timeouts, default_rpc_timeout=self.config.default_rpc_timeout, ) - self.rpc.serve_module_rpc(self) + # start() before serve_module_rpc(): Zenoh's subscribe needs an open + # session (acquired in start()), whereas LCM tolerates either order. self.rpc.start() # type: ignore[attr-defined] + self.rpc.serve_module_rpc(self) except ValueError: ... @@ -269,8 +271,7 @@ def __setstate__(self, state) -> None: # type: ignore[no-untyped-def] @property def tf(self): # type: ignore[no-untyped-def] if self._tf is None: - # self._tf = self.config.tf_transport() - self._tf = LCMTF() + self._tf = self.config.tf_transport() return self._tf @tf.setter diff --git a/dimos/core/resource_monitor/logger.py b/dimos/core/resource_monitor/logger.py index b4668f7198..ad1afec54d 100644 --- a/dimos/core/resource_monitor/logger.py +++ b/dimos/core/resource_monitor/logger.py @@ -21,6 +21,7 @@ if TYPE_CHECKING: from dimos.core.resource_monitor.stats import ProcessStats, WorkerStats + from dimos.core.transport import PubSubTransport logger = setup_logger() @@ -63,12 +64,12 @@ def log_stats(self, coordinator: ProcessStats, workers: list[WorkerStats]) -> No class LCMResourceLogger: - """Publishes resource stats as dicts over a pickle LCM channel.""" + """Publishes resource stats as dicts over the active transport's pubsub channel.""" - def __init__(self, topic: str = "/dimos/resource_stats") -> None: - from dimos.core.transport import pLCMTransport + def __init__(self, topic: str = "/resource_stats") -> None: + from dimos.core.transport_factory import make_transport - self._transport: pLCMTransport[dict[str, Any]] = pLCMTransport(topic) + self._transport: PubSubTransport[dict[str, Any]] = make_transport(topic) def stop(self) -> None: self._transport.stop() diff --git a/dimos/core/rpc_client.py b/dimos/core/rpc_client.py index 56dcb4e200..cfdcf3dfa0 100644 --- a/dimos/core/rpc_client.py +++ b/dimos/core/rpc_client.py @@ -20,7 +20,7 @@ from dimos.core.coordination.python_worker import Actor, MethodCallProxy from dimos.core.stream import RemoteStream -from dimos.protocol.rpc.pubsubrpc import LCMRPC +from dimos.core.transport_factory import rpc_backend from dimos.protocol.rpc.spec import RPCSpec from dimos.utils.logging_config import setup_logger @@ -105,10 +105,10 @@ def __init__( actor_instance: Actor | None, actor_class: type[ModuleBase], *, - rpc: LCMRPC | None = None, + rpc: RPCSpec | None = None, ) -> None: if rpc is None: - self.rpc = LCMRPC() + self.rpc = rpc_backend()() self._owns_rpc = True self.rpc.start() else: @@ -121,7 +121,7 @@ def __init__( self._unsub_fns: list = [] # type: ignore[type-arg] @classmethod - def remote(cls, actor_class: type[ModuleBase], *, rpc: LCMRPC | None = None) -> RPCClient: + def remote(cls, actor_class: type[ModuleBase], *, rpc: RPCSpec | None = None) -> RPCClient: """Build an RPCClient with no parent-side Actor (cross-process clients).""" return cls(None, actor_class, rpc=rpc) diff --git a/dimos/core/stream.py b/dimos/core/stream.py index c988f7428e..f62fc74676 100644 --- a/dimos/core/stream.py +++ b/dimos/core/stream.py @@ -80,8 +80,8 @@ class State(enum.Enum): class Transport(Resource, ObservableMixin[T]): - # used by local Output - def broadcast(self, selfstream: Out[T], value: T) -> None: + # used by local Output; selfstream is None when publishing without a source stream + def broadcast(self, selfstream: Out[T] | None, value: T) -> None: raise NotImplementedError # used by local Input @@ -91,7 +91,7 @@ def subscribe( raise NotImplementedError def publish(self, msg: T) -> None: - self.broadcast(None, msg) # type: ignore[arg-type] + self.broadcast(None, msg) class Stream(Generic[T]): diff --git a/dimos/core/test_global_config.py b/dimos/core/test_global_config.py index d42d004bd2..d3ab105cbe 100644 --- a/dimos/core/test_global_config.py +++ b/dimos/core/test_global_config.py @@ -12,16 +12,31 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Tests for GlobalConfig security defaults.""" +from dimos.core.global_config import GlobalConfig +from dimos.protocol.pubsub.impl.zenohqos import DEFAULT_ZENOH_QOS, ZenohQoS class TestGlobalConfigSecurityDefaults: """Network services must bind to localhost by default (not 0.0.0.0).""" def test_listen_host_defaults_to_localhost(self) -> None: - from dimos.core.global_config import GlobalConfig - config = GlobalConfig() assert config.listen_host == "127.0.0.1", ( f"listen_host must default to 127.0.0.1, got {config.listen_host}" ) + + +class TestZenohQoSField: + def test_default_is_rule_table(self) -> None: + assert GlobalConfig().zenoh_qos == DEFAULT_ZENOH_QOS + + def test_env_json_override(self, monkeypatch) -> None: + monkeypatch.setenv("DIMOS_ZENOH_QOS", '[{"key": "dimos/x", "reliability": "best_effort"}]') + config = GlobalConfig() + assert config.zenoh_qos == (ZenohQoS(key="dimos/x", reliability="best_effort"),) + + def test_update_coerces_dicts(self) -> None: + # Blueprint overrides arrive as plain dicts via global_config.update(). + config = GlobalConfig() + config.update(zenoh_qos=({"key": "dimos/x", "congestion_control": "block"},)) + assert config.zenoh_qos == (ZenohQoS(key="dimos/x", congestion_control="block"),) diff --git a/dimos/core/test_transport_factory.py b/dimos/core/test_transport_factory.py new file mode 100644 index 0000000000..a479cce235 --- /dev/null +++ b/dimos/core/test_transport_factory.py @@ -0,0 +1,125 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import pytest + +from dimos.core.global_config import GlobalConfig +from dimos.core.transport import ( + LCMTransport, + ZenohTransport, + pLCMTransport, + pZenohTransport, +) +from dimos.core.transport_factory import ( + apply_transport_arg, + make_transport, + rpc_backend, + tf_backend, + transport_topic, +) +from dimos.msgs.sensor_msgs.Image import Image +from dimos.protocol.rpc.pubsubrpc import LCMRPC, ZenohRPC +from dimos.protocol.tf.tf import LCMTF, ZenohTF + +LCM = GlobalConfig(transport="lcm") +ZENOH = GlobalConfig(transport="zenoh") + + +def test_transport_topic_lcm() -> None: + # LCM channels are leading-slash; either input form normalizes the same. + assert transport_topic("/human_input", LCM) == "/human_input" + assert transport_topic("human_input", LCM) == "/human_input" + + +def test_transport_topic_zenoh() -> None: + # Zenoh keyexprs can't start with '/'; namespaced under 'dimos/'. + assert transport_topic("/human_input", ZENOH) == "dimos/human_input" + assert transport_topic("human_input", ZENOH) == "dimos/human_input" + assert transport_topic("/coordinator/joint_state", ZENOH) == "dimos/coordinator/joint_state" + + +def test_make_transport_lcm_typed() -> None: + t = make_transport("/camera/color", Image, g=LCM) + assert type(t) is LCMTransport + assert t.topic.topic == "/camera/color" + + +def test_make_transport_lcm_pickled() -> None: + t = make_transport("/human_input", g=LCM) + assert type(t) is pLCMTransport + assert t.topic == "/human_input" + + +def test_make_transport_zenoh_typed() -> None: + t = make_transport("/camera/color", Image, g=ZENOH) + assert type(t) is ZenohTransport + assert t.topic.topic == "dimos/camera/color" + + +def test_make_transport_zenoh_pickled() -> None: + t = make_transport("/human_input", g=ZENOH) + assert type(t) is pZenohTransport + assert t.topic == "dimos/human_input" + + +def test_rpc_backend_resolves_per_transport() -> None: + assert rpc_backend(LCM) is LCMRPC + assert rpc_backend(ZENOH) is ZenohRPC + + +def test_tf_backend_resolves_per_transport() -> None: + assert tf_backend(LCM) is LCMTF + assert tf_backend(ZENOH) is ZenohTF + + +def test_zenoh_tf_config_topic_and_pubsub() -> None: + from dimos.protocol.pubsub.impl.zenohpubsub import Zenoh + from dimos.protocol.tf.tf import ZenohPubsubConfig + + cfg = ZenohPubsubConfig() + assert cfg.topic.topic == "dimos/tf" + assert cfg.pubsub is Zenoh + + +def test_zenoh_rpc_topicgen_has_no_leading_slash() -> None: + assert ZenohRPC().topicgen("Hello/say", req_or_res=True).topic == "dimos/rpc/Hello/say/res" + + +def test_apply_transport_arg() -> None: + g = GlobalConfig(transport="lcm") + apply_transport_arg(["prog", "--transport", "zenoh"], g=g) + assert g.transport == "zenoh" + apply_transport_arg(["prog", "--transport=lcm"], g=g) + assert g.transport == "lcm" + apply_transport_arg(["prog", "--other", "x"], g=g) # no flag -> unchanged + assert g.transport == "lcm" + + +@pytest.mark.parametrize( + "argv", + [ + ["prog", "--transport", "zeno"], # typo + ["prog", "--transport=zeno"], + ["prog", "--transport"], # missing value + ["prog", "--transport", "--web"], # next flag must not be consumed as value + ], +) +def test_apply_transport_arg_bad_value_exits(argv: list[str]) -> None: + g = GlobalConfig(transport="lcm") + with pytest.raises(SystemExit) as exc: + apply_transport_arg(argv, g=g) + assert exc.value.code == 2 + assert g.transport == "lcm" # config left untouched diff --git a/dimos/core/test_zenoh_transport.py b/dimos/core/test_zenoh_transport.py new file mode 100644 index 0000000000..aba4e833aa --- /dev/null +++ b/dimos/core/test_zenoh_transport.py @@ -0,0 +1,249 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import threading +from types import SimpleNamespace +from typing import cast + +import numpy as np +from pydantic import ValidationError +import pytest + +from dimos.core.coordination.blueprints import autoconnect +from dimos.core.coordination.module_coordinator import ( + _coerce_transport_to_backend, + _get_transport_for, + _run_configurators, +) +from dimos.core.global_config import GlobalConfig, global_config +from dimos.core.module import Module +from dimos.core.stream import In, Out +from dimos.core.transport import ( + JpegLcmTransport, + LCMTransport, + ZenohTransport, + pLCMTransport, + pZenohTransport, +) +from dimos.msgs.sensor_msgs.Image import Image +from dimos.protocol.service.zenohservice import ZenohSessionPool + + +class TypedMsg: + """A fake typed message with lcm_encode for testing.""" + + @staticmethod + def lcm_encode() -> bytes: + return b"" + + +class UntypedMsg: + """A message without lcm_encode. Triggers pickle transport.""" + + pass + + +class ProducerModule(Module): + typed_data: Out[TypedMsg] + untyped_data: Out[UntypedMsg] + + +class ConsumerModule(Module): + typed_data: In[TypedMsg] + untyped_data: In[UntypedMsg] + + +@pytest.fixture() +def use_lcm(mocker): + mocker.patch.object(global_config, "transport", "lcm") + + +@pytest.fixture() +def use_zenoh(mocker): + mocker.patch.object(global_config, "transport", "zenoh") + + +@pytest.fixture() +def blueprint(): + return autoconnect(ProducerModule.blueprint(), ConsumerModule.blueprint()) + + +@pytest.fixture() +def lcm_config_mock(mocker): + """Stub out LCM system configuration; return the lcm_configurators mock for assertions.""" + mock = mocker.patch( + "dimos.protocol.service.system_configurator.lcm_config.lcm_configurators", + return_value=[], + ) + mocker.patch("dimos.protocol.service.system_configurator.base.configure_system") + return mock + + +@pytest.fixture() +def session_pool(): + pool = ZenohSessionPool() + yield pool + pool.close_all() + + +@pytest.fixture() +def collector(): + """Collect subscription messages; `event` fires when the first one arrives.""" + received = [] + event = threading.Event() + + def callback(msg): + received.append(msg) + event.set() + + return SimpleNamespace(received=received, event=event, callback=callback) + + +def test_default_transport_is_lcm_on_linux(mocker) -> None: + mocker.patch("dimos.core.global_config.platform.system", return_value="Linux") + + config = GlobalConfig() + assert config.transport == "lcm" + + +def test_default_transport_is_zenoh_on_macos(mocker) -> None: + mocker.patch("dimos.core.global_config.platform.system", return_value="Darwin") + + config = GlobalConfig() + assert config.transport == "zenoh" + + +def test_transport_can_be_set_to_zenoh() -> None: + config = GlobalConfig() + config.update(transport="zenoh") + assert config.transport == "zenoh" + + +def test_invalid_transport_is_rejected_at_init() -> None: + with pytest.raises(ValidationError, match="transport"): + GlobalConfig(transport=cast("object", "invalid")) + + +def test_invalid_transport_is_rejected_on_update() -> None: + config = GlobalConfig() + with pytest.raises(ValidationError, match="transport"): + config.update(transport=cast("object", "invalid")) + + +def test_lcm_transport_returned_when_transport_is_lcm(use_lcm, blueprint) -> None: + transport = _get_transport_for(blueprint, "typed_data", TypedMsg) + assert isinstance(transport, LCMTransport) + + +def test_lcm_pickle_transport_returned_for_untyped_when_lcm(use_lcm, blueprint) -> None: + transport = _get_transport_for(blueprint, "untyped_data", UntypedMsg) + assert isinstance(transport, pLCMTransport) + + +def test_zenoh_transport_returned_when_transport_is_zenoh(use_zenoh, blueprint) -> None: + transport = _get_transport_for(blueprint, "typed_data", TypedMsg) + assert isinstance(transport, ZenohTransport) + + +def test_zenoh_pickle_transport_returned_for_untyped_when_zenoh(use_zenoh, blueprint) -> None: + transport = _get_transport_for(blueprint, "untyped_data", UntypedMsg) + assert isinstance(transport, pZenohTransport) + + +def test_zenoh_topic_uses_dimos_prefix(use_zenoh, blueprint) -> None: + transport = _get_transport_for(blueprint, "untyped_data", UntypedMsg) + assert isinstance(transport, pZenohTransport) + assert "dimos/" in transport.topic + + +def test_lcm_configurators_run_when_transport_is_lcm(use_lcm, blueprint, lcm_config_mock) -> None: + _run_configurators(blueprint) + lcm_config_mock.assert_called_once() + + +def test_lcm_configurators_skipped_when_transport_is_zenoh( + use_zenoh, blueprint, lcm_config_mock +) -> None: + _run_configurators(blueprint) + lcm_config_mock.assert_not_called() + + +def test_zenoh_transport_broadcast_and_subscribe(retry_until, session_pool, collector) -> None: + t = ZenohTransport("dimos/test/transport", Image, session_pool=session_pool) + t.start() + t.subscribe(collector.callback) + + test_img = Image(np.zeros((2, 2, 3), dtype=np.uint8)) + retry_until(collector.event, lambda: t.broadcast(None, test_img)) + assert isinstance(collector.received[0], Image) + t.stop() + + +def test_pzenoh_transport_broadcast_and_subscribe(retry_until, session_pool, collector) -> None: + t = pZenohTransport("dimos/test/pickle_transport", session_pool=session_pool) + t.start() + t.subscribe(collector.callback) + + retry_until(collector.event, lambda: t.broadcast(None, {"key": "value"})) + assert collector.received[0] == {"key": "value"} + t.stop() + + +def test_auto_start_on_broadcast(session_pool) -> None: + t = pZenohTransport("dimos/test/autostart", session_pool=session_pool) + # Don't call start(); broadcast should auto-start + t.broadcast(None, "test") + assert t._started + t.stop() + + +def test_stop_and_restart(session_pool) -> None: + t = pZenohTransport("dimos/test/restart", session_pool=session_pool) + t.start() + assert t._started + t.stop() + assert not t._started + t.start() + assert t._started + t.stop() + + +def test_coerce_lcm_to_zenoh_typed(use_zenoh) -> None: + t = _coerce_transport_to_backend(LCMTransport("/cmd_vel", Image)) + assert type(t) is ZenohTransport + assert t.topic.topic == "dimos/cmd_vel" + + +def test_coerce_pickled_lcm_to_zenoh(use_zenoh) -> None: + t = _coerce_transport_to_backend(pLCMTransport("/human_input")) + assert type(t) is pZenohTransport + assert t.topic == "dimos/human_input" + + +def test_coerce_zenoh_to_lcm_typed(use_lcm) -> None: + t = _coerce_transport_to_backend(ZenohTransport("dimos/cmd_vel", Image)) + assert type(t) is LCMTransport + assert t.topic.topic == "/cmd_vel" + + +def test_coerce_identity_when_backend_matches(use_lcm) -> None: + orig = LCMTransport("/cmd_vel", Image) + assert _coerce_transport_to_backend(orig) is orig + + +def test_coerce_leaves_deliberate_jpeg_untouched(use_zenoh) -> None: + jpeg = JpegLcmTransport("/color_image", Image) + assert _coerce_transport_to_backend(jpeg) is jpeg diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 6435003758..e1088cd0a9 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -19,6 +19,7 @@ TYPE_CHECKING, Any, TypeVar, + cast, ) from dimos.core.stream import In, Out, Stream, Transport @@ -35,6 +36,11 @@ from dimos.protocol.pubsub.impl.lcmpubsub import LCM, PickleLCM, Topic as LCMTopic from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic from dimos.protocol.pubsub.impl.shmpubsub import BytesSharedMemory, PickleSharedMemory +from dimos.protocol.pubsub.impl.zenohpubsub import ( + PickleZenoh, + Topic as ZenohTopic, + Zenoh, +) if TYPE_CHECKING: from collections.abc import Callable @@ -267,7 +273,7 @@ def __init__(self, topic: str, msg_type: type[DimosMsg], **kwargs: Any) -> None: def __reduce__(self) -> tuple[Any, ...]: return (ROSTransport, (self.topic.topic, self.topic.msg_type)) - def broadcast(self, _: Out[DimosMsg], msg: DimosMsg) -> None: + def broadcast(self, _: Out[DimosMsg] | None, msg: DimosMsg) -> None: if self._ros is None: self.start() assert self._ros is not None # for type narrowing @@ -315,18 +321,90 @@ def stop(self) -> None: self._started = False def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] - with self._start_lock: - if not self._started: - self.start() - self.dds.publish(self.topic, msg) + if not self._started: + self.start() + self.dds.publish(self.topic, msg) def subscribe( self, callback: Callable[[T], None], selfstream: Stream[T] | None = None ) -> Callable[[], None]: - with self._start_lock: - if not self._started: - self.start() - return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg)) + if not self._started: + self.start() + return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg)) + +class ZenohTransport(PubSubTransport[T]): + """Zenoh transport with LCM encoding for typed DimosMsg.""" + + _started: bool = False + + def __init__(self, topic: str, type: type, **kwargs: Any) -> None: + super().__init__(LCMTopic(topic, type)) + self.zenoh = Zenoh(**kwargs) + self._start_lock = threading.RLock() + + def __reduce__(self) -> tuple[Any, ...]: + return (ZenohTransport, (self.topic.topic, self.topic.lcm_type)) -class ZenohTransport(PubSubTransport[T]): ... + def start(self) -> None: + with self._start_lock: + if not self._started: + self.zenoh.start() + self._started = True + + def stop(self) -> None: + with self._start_lock: + if self._started: + self.zenoh.stop() + self._started = False + + def broadcast(self, _: Out[T] | None, msg: T) -> None: + if not self._started: + self.start() + self.zenoh.publish(self.topic, cast("DimosMsg", msg)) + + def subscribe( + self, callback: Callable[[T], None], selfstream: Stream[T] | None = None + ) -> Callable[[], None]: + if not self._started: + self.start() + return self.zenoh.subscribe(self.topic, lambda msg, topic: callback(cast("T", msg))) + + +class pZenohTransport(PubSubTransport[T]): + """Zenoh transport with pickle encoding for arbitrary Python objects.""" + + _started: bool = False + + def __init__(self, topic: str, **kwargs: Any) -> None: + super().__init__(topic) + self.zenoh = PickleZenoh(**kwargs) + self._zenoh_topic = ZenohTopic(topic) + self._start_lock = threading.RLock() + + def __reduce__(self) -> tuple[Any, ...]: + return (pZenohTransport, (self.topic,)) + + def start(self) -> None: + with self._start_lock: + if not self._started: + self.zenoh.start() + self._started = True + + def stop(self) -> None: + with self._start_lock: + if self._started: + self.zenoh.stop() + self._started = False + + def broadcast(self, _: Out[T] | None, msg: T) -> None: + if not self._started: + self.start() + self.zenoh.publish(self._zenoh_topic, msg) + + def subscribe( + self, callback: Callable[[T], None], selfstream: Stream[T] | None = None + ) -> Callable[[], None]: + if not self._started: + self.start() + return self.zenoh.subscribe(self._zenoh_topic, lambda msg, topic: callback(msg)) diff --git a/dimos/core/transport_factory.py b/dimos/core/transport_factory.py new file mode 100644 index 0000000000..fdfe8001f0 --- /dev/null +++ b/dimos/core/transport_factory.py @@ -0,0 +1,119 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Backend-agnostic transport construction.""" + +from __future__ import annotations + +import os +import sys +from typing import TYPE_CHECKING, Any, NoReturn, get_args + +from dimos.core.global_config import GlobalConfig, TransportBackend, global_config +from dimos.core.transport import ( + LCMTransport, + ZenohTransport, + pLCMTransport, + pZenohTransport, +) +from dimos.protocol.rpc.pubsubrpc import LCMRPC, ZenohRPC +from dimos.protocol.tf.tf import LCMTF, ZenohTF + +if TYPE_CHECKING: + from dimos.core.transport import PubSubTransport + from dimos.protocol.rpc.spec import RPCSpec + from dimos.protocol.tf.tf import TFSpec + + +def transport_topic(name: str, g: GlobalConfig = global_config) -> str: + """Map a logical channel name to the active backend's topic string. + + LCM channels are leading-slash paths (`/foo`). + + Zenoh key expressions can't start with `/` and are namespaced under `dimos`. + """ + if g.transport == "zenoh": + return "dimos/" + name.lstrip("/") + return name if name.startswith("/") else "/" + name + + +def make_transport( + name: str, msg_type: type | None = None, *, g: GlobalConfig = global_config +) -> PubSubTransport[Any]: + """Construct the active-backend pub/sub transport for a logical channel. + + A pickled (self-describing) transport is used when no `msg_type` is given or + the type has no `lcm_encode`. Otherwise a typed transport is used. + + The factory covers pub/sub backends only; per-backend channel tuning lives + in `GlobalConfig` overlay fields (`zenoh_qos` maps key-expr patterns to + Zenoh publisher QoS) rather than per-call-site parameters. A future + non-pubsub backend (e.g. TCP) would add its own overlay fields the same way. + """ + + use_pickled = msg_type is None or getattr(msg_type, "lcm_encode", None) is None + topic = transport_topic(name, g) + if use_pickled: + return pZenohTransport(topic) if g.transport == "zenoh" else pLCMTransport(topic) + assert msg_type is not None # not use_pickled implies a typed msg_type + return ( + ZenohTransport(topic, msg_type) if g.transport == "zenoh" else LCMTransport(topic, msg_type) + ) + + +def _transport_arg_error(argv: list[str], message: str) -> NoReturn: + """Print an argparse-style CLI error for `--transport` and exit(2).""" + prog = os.path.basename(argv[0]) if argv else "dimos" + print(f"{prog}: error: {message}", file=sys.stderr) + sys.exit(2) + + +def apply_transport_arg(argv: list[str], *, g: GlobalConfig = global_config) -> None: + """Apply a `--transport ` / `--transport=...` override from argv. + + Lets standalone CLIs (`humancli`, `agentspy`, `dtop`) flip the backend + explicitly. Without it they follow `DIMOS_TRANSPORT` / `.env` via the + global config, which is the single switch shared with the `dimos` process. + + A missing or invalid value exits(2) with a CLI-style message rather than + letting the assignment raise a raw pydantic ValidationError (the field is + validated on assignment). + """ + choices = get_args(TransportBackend) + for i, arg in enumerate(argv): + if arg.startswith("--transport="): + value = arg.split("=", 1)[1] + elif arg == "--transport": + if i + 1 >= len(argv) or argv[i + 1].startswith("-"): + _transport_arg_error(argv, "argument --transport: expected one argument") + value = argv[i + 1] + else: + continue + if value not in choices: + _transport_arg_error( + argv, + f"argument --transport: invalid choice: {value!r} " + f"(choose from {', '.join(map(repr, choices))})", + ) + g.update(transport=value) + + +def rpc_backend(g: GlobalConfig = global_config) -> type[RPCSpec]: + """Return the RPC class (`LCMRPC` or `ZenohRPC`) for the active backend.""" + return ZenohRPC if g.transport == "zenoh" else LCMRPC + + +def tf_backend(g: GlobalConfig = global_config) -> type[TFSpec]: + """Return the TF class (`LCMTF` or `ZenohTF`) for the active backend.""" + return ZenohTF if g.transport == "zenoh" else LCMTF diff --git a/dimos/e2e_tests/lcm_spy.py b/dimos/e2e_tests/lcm_spy.py index 2692d04f7d..cec34324f3 100644 --- a/dimos/e2e_tests/lcm_spy.py +++ b/dimos/e2e_tests/lcm_spy.py @@ -17,7 +17,6 @@ import math import pickle import threading -import time from typing import Any import lcm @@ -25,6 +24,7 @@ from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.protocol import DimosMsg from dimos.protocol.service.lcmservice import LCMService +from dimos.utils.testing.waiting import wait_until class LcmSpy(LCMService): @@ -89,30 +89,15 @@ def topic_listener(self, topic: str, listener: Callable[[bytes], None]) -> Itera finally: self.unregister_topic_listener(topic, listener) - def wait_until( - self, - *, - condition: Callable[[], bool], - timeout: float, - error_message: str, - poll_interval: float = 0.1, - ) -> None: - start_time = time.time() - while time.time() - start_time < timeout: - if condition(): - return - time.sleep(poll_interval) - raise TimeoutError(error_message) - def wait_for_saved_topic(self, topic: str, timeout: float = 30.0) -> None: def condition() -> bool: with self._messages_lock: return topic in self.messages - self.wait_until( - condition=condition, + wait_until( + condition, timeout=timeout, - error_message=f"Timeout waiting for topic {topic}", + message=f"Timeout waiting for topic {topic}", ) def wait_for_saved_topic_content( @@ -122,10 +107,10 @@ def condition() -> bool: with self._messages_lock: return any(content_contains in msg for msg in self.messages.get(topic, [])) - self.wait_until( - condition=condition, + wait_until( + condition, timeout=timeout, - error_message=f"Timeout waiting for '{topic}' to contain '{content_contains!r}'", + message=f"Timeout waiting for '{topic}' to contain '{content_contains!r}'", ) def wait_for_message_pickle_result( @@ -143,10 +128,10 @@ def listener(msg: bytes) -> None: event.set() with self.topic_listener(topic, listener): - self.wait_until( - condition=event.is_set, + wait_until( + event.is_set, timeout=timeout, - error_message=fail_message, + message=fail_message, ) def wait_for_message_result( @@ -165,10 +150,10 @@ def listener(msg: bytes) -> None: event.set() with self.topic_listener(topic, listener): - self.wait_until( - condition=event.is_set, + wait_until( + event.is_set, timeout=timeout, - error_message=fail_message, + message=fail_message, ) def wait_until_odom_position( diff --git a/dimos/hardware/sensors/camera/realsense/camera.py b/dimos/hardware/sensors/camera/realsense/camera.py index 6257720608..dae71e908f 100644 --- a/dimos/hardware/sensors/camera/realsense/camera.py +++ b/dimos/hardware/sensors/camera/realsense/camera.py @@ -30,7 +30,7 @@ from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig from dimos.core.stream import Out -from dimos.core.transport import LCMTransport +from dimos.core.transport_factory import make_transport from dimos.hardware.sensors.camera.spec import ( OPTICAL_ROTATION, DepthCameraConfig, @@ -448,11 +448,11 @@ def main() -> None: dimos.start() camera = dimos.deploy(RealSenseCamera, enable_pointcloud=True, pointcloud_fps=5.0) - camera.color_image.transport = LCMTransport("/camera/color", Image) - camera.depth_image.transport = LCMTransport("/camera/depth", Image) - camera.pointcloud.transport = LCMTransport("/camera/pointcloud", PointCloud2) - camera.camera_info.transport = LCMTransport("/camera/color_info", CameraInfo) - camera.depth_camera_info.transport = LCMTransport("/camera/depth_info", CameraInfo) + camera.color_image.transport = make_transport("/camera/color", Image) + camera.depth_image.transport = make_transport("/camera/depth", Image) + camera.pointcloud.transport = make_transport("/camera/pointcloud", PointCloud2) + camera.camera_info.transport = make_transport("/camera/color_info", CameraInfo) + camera.depth_camera_info.transport = make_transport("/camera/depth_info", CameraInfo) def cleanup() -> None: try: diff --git a/dimos/hardware/sensors/camera/zed/camera.py b/dimos/hardware/sensors/camera/zed/camera.py index 5203fa9242..84492c28f3 100644 --- a/dimos/hardware/sensors/camera/zed/camera.py +++ b/dimos/hardware/sensors/camera/zed/camera.py @@ -28,7 +28,7 @@ from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig from dimos.core.stream import Out -from dimos.core.transport import LCMTransport +from dimos.core.transport_factory import make_transport from dimos.hardware.sensors.camera.spec import ( OPTICAL_ROTATION, DepthCameraConfig, @@ -494,11 +494,11 @@ def main() -> None: dimos.start() camera = dimos.deploy(ZEDCamera, enable_pointcloud=True, pointcloud_fps=5.0) - camera.color_image.transport = LCMTransport("/camera/color", Image) - camera.depth_image.transport = LCMTransport("/camera/depth", Image) - camera.pointcloud.transport = LCMTransport("/camera/pointcloud", PointCloud2) - camera.camera_info.transport = LCMTransport("/camera/color_info", CameraInfo) - camera.depth_camera_info.transport = LCMTransport("/camera/depth_info", CameraInfo) + camera.color_image.transport = make_transport("/camera/color", Image) + camera.depth_image.transport = make_transport("/camera/depth", Image) + camera.pointcloud.transport = make_transport("/camera/pointcloud", PointCloud2) + camera.camera_info.transport = make_transport("/camera/color_info", CameraInfo) + camera.depth_camera_info.transport = make_transport("/camera/depth_info", CameraInfo) def cleanup() -> None: try: diff --git a/dimos/perception/detection/module2D.py b/dimos/perception/detection/module2D.py index 5531293098..7d147d41a6 100644 --- a/dimos/perception/detection/module2D.py +++ b/dimos/perception/detection/module2D.py @@ -156,16 +156,16 @@ def deploy( # type: ignore[no-untyped-def] prefix: str = "/detector2d", **kwargs, ) -> Detection2DModule: - from dimos.core.transport import LCMTransport + from dimos.core.transport_factory import make_transport detector = Detection2DModule(**kwargs) detector.color_image.connect(camera.color_image) - detector.detections.transport = LCMTransport(f"{prefix}/detections", Detection2DArray) + detector.detections.transport = make_transport(f"{prefix}/detections", Detection2DArray) - detector.detected_image_0.transport = LCMTransport(f"{prefix}/image/0", Image) - detector.detected_image_1.transport = LCMTransport(f"{prefix}/image/1", Image) - detector.detected_image_2.transport = LCMTransport(f"{prefix}/image/2", Image) + detector.detected_image_0.transport = make_transport(f"{prefix}/image/0", Image) + detector.detected_image_1.transport = make_transport(f"{prefix}/image/1", Image) + detector.detected_image_2.transport = make_transport(f"{prefix}/image/2", Image) detector.start() return detector diff --git a/dimos/perception/detection/module3D.py b/dimos/perception/detection/module3D.py index fc57782bb2..393a217207 100644 --- a/dimos/perception/detection/module3D.py +++ b/dimos/perception/detection/module3D.py @@ -22,7 +22,7 @@ from dimos.core.coordination.module_coordinator import ModuleCoordinator from dimos.core.core import rpc from dimos.core.stream import In, Out -from dimos.core.transport import LCMTransport +from dimos.core.transport_factory import make_transport from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Quaternion import Quaternion from dimos.msgs.geometry_msgs.Transform import Transform @@ -208,15 +208,15 @@ def deploy( # type: ignore[no-untyped-def] detector.image.connect(camera.color_image) detector.pointcloud.connect(lidar.pointcloud) - detector.detections.transport = LCMTransport(f"{prefix}/detections", Detection2DArray) + detector.detections.transport = make_transport(f"{prefix}/detections", Detection2DArray) - detector.detected_image_0.transport = LCMTransport(f"{prefix}/image/0", Image) - detector.detected_image_1.transport = LCMTransport(f"{prefix}/image/1", Image) - detector.detected_image_2.transport = LCMTransport(f"{prefix}/image/2", Image) + detector.detected_image_0.transport = make_transport(f"{prefix}/image/0", Image) + detector.detected_image_1.transport = make_transport(f"{prefix}/image/1", Image) + detector.detected_image_2.transport = make_transport(f"{prefix}/image/2", Image) - detector.detected_pointcloud_0.transport = LCMTransport(f"{prefix}/pointcloud/0", PointCloud2) - detector.detected_pointcloud_1.transport = LCMTransport(f"{prefix}/pointcloud/1", PointCloud2) - detector.detected_pointcloud_2.transport = LCMTransport(f"{prefix}/pointcloud/2", PointCloud2) + detector.detected_pointcloud_0.transport = make_transport(f"{prefix}/pointcloud/0", PointCloud2) + detector.detected_pointcloud_1.transport = make_transport(f"{prefix}/pointcloud/1", PointCloud2) + detector.detected_pointcloud_2.transport = make_transport(f"{prefix}/pointcloud/2", PointCloud2) detector.start() diff --git a/dimos/porcelain/remote_module_source.py b/dimos/porcelain/remote_module_source.py index c3044933cc..e87aec2284 100644 --- a/dimos/porcelain/remote_module_source.py +++ b/dimos/porcelain/remote_module_source.py @@ -26,7 +26,7 @@ if TYPE_CHECKING: from dimos.core.coordination.blueprints import Blueprint - from dimos.protocol.rpc.pubsubrpc import LCMRPC + from dimos.protocol.rpc.spec import RPCSpec logger = setup_logger() @@ -38,7 +38,7 @@ class _RemoteProxy: attribute access raises `AttributeError`. """ - def __init__(self, rpc: LCMRPC, remote_name: str, rpc_names: set[str]) -> None: + def __init__(self, rpc: RPCSpec, remote_name: str, rpc_names: set[str]) -> None: self._rpc = rpc self._remote_name = remote_name self._rpc_names = rpc_names diff --git a/dimos/protocol/pubsub/benchmark/testdata.py b/dimos/protocol/pubsub/benchmark/testdata.py index a5c59dc00e..129d4bff41 100644 --- a/dimos/protocol/pubsub/benchmark/testdata.py +++ b/dimos/protocol/pubsub/benchmark/testdata.py @@ -21,6 +21,8 @@ from dimos.msgs.sensor_msgs.Image import Image, ImageFormat from dimos.protocol.pubsub.benchmark.type import Case +from dimos.protocol.pubsub.impl.zenohpubsub import Topic as ZenohTopic, Zenoh +from dimos.protocol.service.zenohservice import ZenohSessionPool try: import cyclonedds as _cyclonedds # noqa: F401 @@ -272,6 +274,28 @@ def redis_msggen(size: int) -> tuple[str, Any]: print("Redis not available") +@contextmanager +def zenoh_pubsub_channel() -> Generator[Zenoh, None, None]: + pool = ZenohSessionPool() + zenoh_pubsub = Zenoh(session_pool=pool) + zenoh_pubsub.start() + yield zenoh_pubsub + zenoh_pubsub.stop() + pool.close_all() + + +def zenoh_msggen(size: int) -> tuple[ZenohTopic, Image]: + return (ZenohTopic("dimos/benchmark/zenoh", Image), make_data_image(size)) + + +testcases.append( + Case( + pubsub_context=zenoh_pubsub_channel, + msg_gen=zenoh_msggen, + ) +) + + from dimos.protocol.pubsub.impl.rospubsub import ( ROS_AVAILABLE, DimosROS, diff --git a/dimos/protocol/pubsub/impl/test_zenohpubsub.py b/dimos/protocol/pubsub/impl/test_zenohpubsub.py new file mode 100644 index 0000000000..6f9636d264 --- /dev/null +++ b/dimos/protocol/pubsub/impl/test_zenohpubsub.py @@ -0,0 +1,290 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for ZenohPubSubBase — raw bytes pub/sub over Zenoh.""" + +from __future__ import annotations + +import threading + +import pytest + +pytest.importorskip("zenoh") + +import zenoh + +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.msgs.sensor_msgs.Image import Image +from dimos.protocol.pubsub.impl.lcmpubsub import Topic +from dimos.protocol.pubsub.impl.zenohpubsub import ( + ZenohPubSubBase, + _key_expr_to_topic, + _topic_to_key_expr, +) +from dimos.protocol.pubsub.impl.zenohqos import ZenohQoS +from dimos.protocol.service.zenohservice import ZenohSessionPool + + +@pytest.fixture() +def make_pubsub(): + """Build started ZenohPubSubBase instances on isolated pools, clean up after.""" + created = [] + + def _make(**kwargs): + pool = ZenohSessionPool() + ps = ZenohPubSubBase(session_pool=pool, **kwargs) + ps.start() + created.append((ps, pool)) + return ps + + yield _make + for ps, pool in created: + ps.stop() + # Close the pool so Zenoh's internal threads are joined + pool.close_all() + + +@pytest.fixture() +def pubsub(make_pubsub): + """Create and start a ZenohPubSubBase instance on an isolated pool, clean up after.""" + return make_pubsub() + + +class TestZenohPubSubBase: + def test_publish_and_subscribe(self, pubsub, retry_until) -> None: + received = [] + event = threading.Event() + topic = Topic("dimos/test/basic") + + def callback(msg: bytes, t: Topic) -> None: + received.append(msg) + event.set() + + pubsub.subscribe(topic, callback) + retry_until(event, lambda: pubsub.publish(topic, b"hello zenoh")) + assert received[0] == b"hello zenoh" + + def test_multiple_subscribers(self, pubsub, retry_until) -> None: + received_a: list[bytes] = [] + received_b: list[bytes] = [] + both_received = threading.Event() + countdown = threading.Barrier(2, action=both_received.set) + topic = Topic("dimos/test/multi") + + def callback_a(msg: bytes, t: Topic) -> None: + received_a.append(msg) + countdown.wait() + + def callback_b(msg: bytes, t: Topic) -> None: + received_b.append(msg) + countdown.wait() + + pubsub.subscribe(topic, callback_a) + pubsub.subscribe(topic, callback_b) + retry_until(both_received, lambda: pubsub.publish(topic, b"broadcast")) + assert received_a[-1:] == [b"broadcast"] + assert received_b[-1:] == [b"broadcast"] + + def test_unsubscribe(self, pubsub, retry_until) -> None: + received: list[bytes] = [] + event = threading.Event() + topic = Topic("dimos/test/unsub") + + def callback(msg: bytes, t: Topic) -> None: + received.append(msg) + event.set() + + unsub = pubsub.subscribe(topic, callback) + retry_until(event, lambda: pubsub.publish(topic, b"before")) + assert received == [b"before"] + + # Unsubscribe and verify no more messages arrive + unsub() + received.clear() + event.clear() + pubsub.publish(topic, b"after") + + # We can't prove a negative with an event, so use a short timeout + assert not event.wait(timeout=0.2), "Received message after unsubscribe" + assert received == [] + + def test_unsubscribe_is_idempotent(self, pubsub) -> None: + topic = Topic("dimos/test/idempotent") + unsub = pubsub.subscribe(topic, lambda msg, t: None) + unsub() + unsub() # should not raise + + def test_concurrent_unsubscribe(self, pubsub) -> None: + topic = Topic("dimos/test/concurrent_unsub") + unsub = pubsub.subscribe(topic, lambda msg, t: None) + + n_threads = 8 + barrier = threading.Barrier(n_threads) + errors: list[Exception] = [] + + def race() -> None: + barrier.wait() + try: + unsub() + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=race) for _ in range(n_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + assert errors == [] + assert len(pubsub._subscribers) == 0 + + def test_concrete_subscription_passes_topic_through(self, pubsub, retry_until) -> None: + received: list[Topic] = [] + event = threading.Event() + topic = Topic("dimos/test/passthrough", lcm_type=Twist) + + def callback(msg: bytes, t: Topic) -> None: + received.append(t) + event.set() + + pubsub.subscribe(topic, callback) + retry_until(event, lambda: pubsub.publish(topic, b"data")) + assert received[0] is topic + + def test_untyped_topic_with_dotted_segment_round_trips(self, pubsub, retry_until) -> None: + # The last segment resolves as a type name, but a concrete subscription + # must not re-parse it into base+type on receive. + received: list[Topic] = [] + event = threading.Event() + topic = Topic("dimos/test/geometry_msgs.Twist") + + def callback(msg: bytes, t: Topic) -> None: + received.append(t) + event.set() + + pubsub.subscribe(topic, callback) + retry_until(event, lambda: pubsub.publish(topic, b"data")) + assert received[0] is topic + + def test_publish_before_subscriber_does_not_error(self, pubsub) -> None: + topic = Topic("dimos/test/no_sub") + pubsub.publish(topic, b"orphan message") # should not raise + + def test_stop_cleans_up_publishers_and_subscribers(self, pubsub) -> None: + topic = Topic("dimos/test/cleanup") + pubsub.subscribe(topic, lambda msg, t: None) + pubsub.publish(topic, b"test") + pubsub.stop() + assert len(pubsub._publishers) == 0 + assert len(pubsub._subscribers) == 0 + + def test_subscribe_all(self, pubsub, retry_until) -> None: + received: list[bytes] = [] + event = threading.Event() + topic = Topic("dimos/test/any/topic") + + def callback(msg: bytes, t: Topic) -> None: + received.append(msg) + event.set() + + pubsub.subscribe_all(callback) + retry_until(event, lambda: pubsub.publish(topic, b"wildcard")) + assert received[-1] == b"wildcard" + + def test_subscribe_after_stop_does_not_track(self, pubsub) -> None: + # Models the declare/stop race: once stopped, a newly declared subscriber + # must undeclare itself rather than be tracked (and leak past shutdown). + pubsub.stop() + unsub = pubsub.subscribe(Topic("dimos/test/after_stop"), lambda msg, t: None) + assert pubsub._subscribers == [] + unsub() # no-op, must not raise + + def test_subscribe_all_after_stop_is_noop(self, pubsub) -> None: + pubsub.stop() + unsub = pubsub.subscribe_all(lambda msg, t: None) + assert pubsub._drain_stops == [] + unsub() # no-op, must not raise + + +class TestPublisherQoS: + """Publisher QoS is resolved from the config's rules at declare time.""" + + def test_publisher_qos_from_config(self, make_pubsub) -> None: + rule = ZenohQoS( + key="dimos/test/qos/**", reliability="best_effort", congestion_control="drop" + ) + ps = make_pubsub(qos=(rule,)) + pub = ps._get_publisher("dimos/test/qos/stream") + assert pub.reliability == zenoh.Reliability.BEST_EFFORT + assert pub.congestion_control == zenoh.CongestionControl.DROP + + def test_publisher_default_qos_without_rules(self, make_pubsub) -> None: + # Pins zenoh's publisher defaults: reliable, drop under congestion. + ps = make_pubsub(qos=()) + pub = ps._get_publisher("dimos/test/qos/defaults") + assert pub.reliability == zenoh.Reliability.RELIABLE + assert pub.congestion_control == zenoh.CongestionControl.DROP + + def test_publisher_follows_global_rules_when_qos_unset(self, make_pubsub) -> None: + ps = make_pubsub() + pub = ps._get_publisher("dimos/rpc/test_qos/req") + assert pub.congestion_control == zenoh.CongestionControl.BLOCK + + +class TestTopicKeyExprConversion: + """Tests for _topic_to_key_expr and _key_expr_to_topic round-trip.""" + + def test_typed_topic_to_key_expr(self) -> None: + topic = Topic("dimos/cmd_vel", lcm_type=Twist) + key = _topic_to_key_expr(topic) + assert key == "dimos/cmd_vel/geometry_msgs.Twist" + + def test_untyped_topic_to_key_expr(self) -> None: + topic = Topic("dimos/data") + key = _topic_to_key_expr(topic) + assert key == "dimos/data" + + def test_key_expr_to_topic_with_known_type(self) -> None: + topic = _key_expr_to_topic("dimos/cmd_vel/geometry_msgs.Twist") + assert topic.topic == "dimos/cmd_vel" + assert topic.lcm_type is Twist + + def test_key_expr_to_topic_with_unknown_type(self) -> None: + topic = _key_expr_to_topic("dimos/data/unknown.FooBar") + # Last segment doesn't resolve — entire string becomes the topic + assert topic.topic == "dimos/data/unknown.FooBar" + assert topic.lcm_type is None + + def test_key_expr_to_topic_with_no_slash(self) -> None: + topic = _key_expr_to_topic("simple_topic") + assert topic.topic == "simple_topic" + assert topic.lcm_type is None + + def test_key_expr_to_topic_uses_default_type(self) -> None: + topic = _key_expr_to_topic("dimos/data", default_lcm_type=Twist) + assert topic.topic == "dimos/data" + assert topic.lcm_type is Twist + + def test_round_trip_typed(self) -> None: + original = Topic("dimos/color_image", lcm_type=Image) + key = _topic_to_key_expr(original) + reconstructed = _key_expr_to_topic(key) + assert reconstructed.topic == original.topic + assert reconstructed.lcm_type is original.lcm_type + + def test_round_trip_untyped(self) -> None: + original = Topic("dimos/gps_location") + key = _topic_to_key_expr(original) + reconstructed = _key_expr_to_topic(key) + assert reconstructed.topic == original.topic + assert reconstructed.lcm_type is None diff --git a/dimos/protocol/pubsub/impl/test_zenohqos.py b/dimos/protocol/pubsub/impl/test_zenohqos.py new file mode 100644 index 0000000000..4ad7d7d4aa --- /dev/null +++ b/dimos/protocol/pubsub/impl/test_zenohqos.py @@ -0,0 +1,66 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import pytest +import zenoh + +from dimos.protocol.pubsub.impl.zenohpubsub import resolve_qos +from dimos.protocol.pubsub.impl.zenohqos import DEFAULT_ZENOH_QOS, ZenohQoS + + +def test_first_match_wins() -> None: + rules = ( + ZenohQoS(key="dimos/a/**", reliability="best_effort"), + ZenohQoS(key="dimos/**", reliability="reliable"), + ) + assert resolve_qos("dimos/a/b", rules) == {"reliability": zenoh.Reliability.BEST_EFFORT} + + +def test_default_rpc_is_reliable_and_blocks() -> None: + assert resolve_qos("dimos/rpc/Hello/say/req", DEFAULT_ZENOH_QOS) == { + "reliability": zenoh.Reliability.RELIABLE, + "congestion_control": zenoh.CongestionControl.BLOCK, + } + + +def test_default_image_type_suffix_drops() -> None: + assert resolve_qos("dimos/camera/color/sensor_msgs.Image", DEFAULT_ZENOH_QOS) == { + "reliability": zenoh.Reliability.BEST_EFFORT, + "congestion_control": zenoh.CongestionControl.DROP, + } + + +def test_default_agent_channels_are_exact_keys() -> None: + qos = resolve_qos("dimos/agent", DEFAULT_ZENOH_QOS) + assert qos["congestion_control"] == zenoh.CongestionControl.BLOCK + # A sibling channel does not inherit the agent rule. + assert resolve_qos("dimos/agents", DEFAULT_ZENOH_QOS) == {} + + +def test_no_match_returns_empty() -> None: + assert resolve_qos("dimos/odom/geometry_msgs.PoseStamped", DEFAULT_ZENOH_QOS) == {} + + +def test_partial_rule_omits_unset_fields() -> None: + rules = (ZenohQoS(key="dimos/x", reliability="best_effort"),) + assert resolve_qos("dimos/x", rules) == {"reliability": zenoh.Reliability.BEST_EFFORT} + + +def test_invalid_rule_key_raises() -> None: + # Leading slashes are invalid zenoh key expressions; publish() logs this. + rules = (ZenohQoS(key="/leading/slash"),) + with pytest.raises(zenoh.ZError): + resolve_qos("dimos/x", rules) diff --git a/dimos/protocol/pubsub/impl/zenohpubsub.py b/dimos/protocol/pubsub/impl/zenohpubsub.py new file mode 100644 index 0000000000..01b99060b9 --- /dev/null +++ b/dimos/protocol/pubsub/impl/zenohpubsub.py @@ -0,0 +1,289 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from collections.abc import Callable, Iterable +from functools import lru_cache +import threading +from typing import Any + +import zenoh + +from dimos.msgs.helpers import resolve_msg_type +from dimos.protocol.pubsub.encoders import LCMEncoderMixin, PickleEncoderMixin +from dimos.protocol.pubsub.impl.lcmpubsub import Topic +from dimos.protocol.pubsub.impl.zenohqos import ZenohQoS +from dimos.protocol.pubsub.spec import AllPubSub +from dimos.protocol.service.zenohservice import ZenohService +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +_RELIABILITY = { + "reliable": zenoh.Reliability.RELIABLE, + "best_effort": zenoh.Reliability.BEST_EFFORT, +} +_CONGESTION_CONTROL = { + "drop": zenoh.CongestionControl.DROP, + "block": zenoh.CongestionControl.BLOCK, +} + + +def resolve_qos(key_expr: str, rules: Iterable[ZenohQoS]) -> dict[str, Any]: + """`declare_publisher` kwargs for a key expression: first intersecting rule wins. + + Unset fields in the winning rule (and unmatched keys) fall back to zenoh's + publisher defaults (reliable + drop under congestion). + """ + target = zenoh.KeyExpr(key_expr) + for rule in rules: + if zenoh.KeyExpr(rule.key).intersects(target): + kwargs: dict[str, Any] = {} + if rule.reliability is not None: + kwargs["reliability"] = _RELIABILITY[rule.reliability] + if rule.congestion_control is not None: + kwargs["congestion_control"] = _CONGESTION_CONTROL[rule.congestion_control] + return kwargs + return {} + + +def _topic_to_key_expr(topic: Topic) -> str: + """Convert a Topic to a Zenoh key expression. + + Embeds the lcm_type in the key using '/' instead of '#' (what LCM does). + + Examples: + Topic("dimos/cmd_vel", Twist) -> "dimos/cmd_vel/geometry_msgs.Twist" + Topic("dimos/data") -> "dimos/data" + """ + base = topic.topic if isinstance(topic.topic, str) else topic.pattern + if topic.lcm_type is not None: + return f"{base}/{topic.lcm_type.msg_name}" + return base + + +@lru_cache(maxsize=1024) +def _key_expr_to_topic(key_expr: str, default_lcm_type: type | None = None) -> Topic: + """Reconstruct a Topic from a Zenoh key expression. + + Parses the last '/' segment and attempts to resolve it as a DimosMsg + type via resolve_msg_type(). If resolution succeeds, the segment is + treated as the type suffix and the remainder as the base topic. + + Results are cached; callers must treat the returned Topic as immutable. + + Examples: + "dimos/cmd_vel/geometry_msgs.Twist" -> Topic("dimos/cmd_vel", Twist) + "dimos/data" -> Topic("dimos/data", default_lcm_type) + "dimos/data/unknown.Foo" -> Topic("dimos/data/unknown.Foo", default_lcm_type) + """ + # Try to resolve the last segment as a message type + parts = key_expr.rsplit("/", 1) + if len(parts) == 2: + base, maybe_type = parts + lcm_type = resolve_msg_type(maybe_type) + if lcm_type is not None: + return Topic(topic=base, lcm_type=lcm_type) + return Topic(topic=key_expr, lcm_type=default_lcm_type) + + +class ZenohPubSubBase(ZenohService, AllPubSub[Topic, bytes]): + """Raw bytes pub/sub over Zenoh. + + Publishers are cached per-topic to avoid re-declaring on every publish. + Subscribers are tracked for cleanup on stop(). + """ + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._publishers: dict[str, zenoh.Publisher] = {} + self._publisher_lock = threading.Lock() + self._subscribers: list[zenoh.Subscriber[Any]] = [] + self._drain_stops: list[Callable[[], None]] = [] + self._subscriber_lock = threading.Lock() + self._stopped = False + + def _qos_rules(self) -> tuple[ZenohQoS, ...]: + if self.config.qos is not None: + return self.config.qos + # Deferred import: protocol/ stays free of core/ at import time (same + # pattern as pubsub/registry.py). Reads the worker-synced singleton. + from dimos.core.global_config import global_config + + return global_config.zenoh_qos + + def _get_publisher(self, key_expr: str) -> zenoh.Publisher: + """Get or declare the cached publisher for a key expression. + + QoS is resolved from the active rules once per key at declare time; + later rule changes only affect publishers declared afterwards. + """ + with self._publisher_lock: + if key_expr not in self._publishers: + qos = resolve_qos(key_expr, self._qos_rules()) + self._publishers[key_expr] = self.session.declare_publisher(key_expr, **qos) + return self._publishers[key_expr] + + def publish(self, topic: Topic, message: bytes) -> None: + """Publish bytes to a Zenoh key expression. + + Transport-level errors (session closed, invalid key expression) are + logged but not raised. Delivery guarantees are handled by Zenoh's + reliability protocol (RELIABLE mode retransmits at each hop). + """ + key_expr = _topic_to_key_expr(topic) + try: + publisher = self._get_publisher(key_expr) + publisher.put(message) + except Exception: + logger.error(f"Error publishing to {key_expr}", exc_info=True) + + def subscribe( + self, topic: Topic, callback: Callable[[bytes, Topic], None] + ) -> Callable[[], None]: + """Subscribe to a Zenoh key expression.""" + key_expr = _topic_to_key_expr(topic) + + def on_sample(sample: zenoh.Sample) -> None: + try: + data = sample.payload.to_bytes() + except Exception: + logger.error(f"Error reading payload from {key_expr}", exc_info=True) + return + # Concrete subscriptions only ever receive their own key, so the + # subscribed topic can be passed through without re-parsing. + sample_key = str(sample.key_expr) + if sample_key == key_expr: + recv_topic = topic + else: + recv_topic = _key_expr_to_topic(sample_key, topic.lcm_type) + callback(data, recv_topic) + + sub = self.session.declare_subscriber(key_expr, on_sample) + with self._subscriber_lock: + if self._stopped: + sub.undeclare() + return lambda: None + self._subscribers.append(sub) + + def unsubscribe() -> None: + with self._subscriber_lock: + if sub not in self._subscribers: + return # Already removed by stop() or a concurrent unsubscribe + self._subscribers.remove(sub) + sub.undeclare() + + return unsubscribe + + def subscribe_all(self, callback: Callable[[bytes, Topic], Any]) -> Callable[[], None]: + """Subscribe to all dimos topics, delivering only the latest per topic. + + Unlike `subscribe`, this is best effort. If it's done otherwise, rerun lags behind. + """ + latest: dict[str, tuple[bytes, Topic]] = {} + lock = threading.Lock() + wake = threading.Event() + stop = threading.Event() + + def collect(msg: bytes, topic: Topic) -> None: + # Fast path on the Zenoh delivery thread: keep only the newest per topic. + with lock: + latest[str(topic)] = (msg, topic) + wake.set() + + def drain() -> None: + while not stop.is_set(): + wake.wait() + wake.clear() + with lock: + batch = list(latest.values()) + latest.clear() + for msg, topic in batch: + try: + callback(msg, topic) + except Exception: + logger.error("Error in subscribe_all callback", exc_info=True) + + thread = threading.Thread(target=drain, name="zenoh-subscribe-all", daemon=True) + + def stop_drain() -> None: + stop.set() + wake.set() # unblock the drain so it observes the stop flag + thread.join(timeout=2.0) + + # Register the stop callback and launch the drain thread under the lock so + # stop() can't run between them and miss the thread. If stop() already ran, + # bail without starting anything. + with self._subscriber_lock: + if self._stopped: + return lambda: None + self._drain_stops.append(stop_drain) + thread.start() + + inner_unsub = self.subscribe(Topic("dimos/**"), collect) + + def unsubscribe() -> None: + with self._subscriber_lock: + if stop_drain not in self._drain_stops: + return # Already removed by stop() or a concurrent unsubscribe + self._drain_stops.remove(stop_drain) + inner_unsub() + stop_drain() + + return unsubscribe + + def stop(self) -> None: + with self._subscriber_lock: + self._stopped = True + drain_stops = list(self._drain_stops) + self._drain_stops.clear() + for stop_drain in drain_stops: + stop_drain() + with self._subscriber_lock: + for subscriber in self._subscribers: + subscriber.undeclare() + self._subscribers.clear() + with self._publisher_lock: + for publisher in self._publishers.values(): + publisher.undeclare() + self._publishers.clear() + super().stop() + + +class Zenoh( # type: ignore[misc] + LCMEncoderMixin, + ZenohPubSubBase, +): + """Zenoh pub/sub with LCM encoding for typed DimosMsg.""" + + ... + + +class PickleZenoh( + PickleEncoderMixin, # type: ignore[type-arg] + ZenohPubSubBase, +): + """Zenoh pub/sub with pickle encoding for arbitrary Python objects.""" + + ... + + +__all__ = [ + "PickleZenoh", + "Topic", + "Zenoh", + "ZenohPubSubBase", + "resolve_qos", +] diff --git a/dimos/protocol/pubsub/impl/zenohqos.py b/dimos/protocol/pubsub/impl/zenohqos.py new file mode 100644 index 0000000000..3651c0e669 --- /dev/null +++ b/dimos/protocol/pubsub/impl/zenohqos.py @@ -0,0 +1,49 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Per-key-expression Zenoh publisher QoS rules. + +This module must stay free of zenoh/dimos imports so `dimos.core.global_config` +can import it from lightweight entry-points (see `visualization/rerun/constants.py`). +""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel + + +class ZenohQoS(BaseModel): + """One publisher-QoS rule: applies to key expressions intersecting `key`.""" + + key: str # zenoh key-expr pattern, e.g. "dimos/rpc/**" + reliability: Literal["reliable", "best_effort"] | None = None # None = zenoh default + congestion_control: Literal["drop", "block"] | None = None # None = zenoh default + + +DEFAULT_ZENOH_QOS: tuple[ZenohQoS, ...] = ( + # RPC requests/responses are one-shot: never drop under congestion. + ZenohQoS(key="dimos/rpc/**", reliability="reliable", congestion_control="block"), + # Agent/human channels: low-rate, loss unacceptable. + ZenohQoS(key="dimos/human_input", reliability="reliable", congestion_control="block"), + ZenohQoS(key="dimos/agent", reliability="reliable", congestion_control="block"), + ZenohQoS(key="dimos/agent_idle", reliability="reliable", congestion_control="block"), + # High-rate sensor streams (typed transports embed the message type in the + # key): drop stale frames under congestion rather than block publishers. + ZenohQoS(key="**/sensor_msgs.Image", reliability="best_effort", congestion_control="drop"), + ZenohQoS( + key="**/sensor_msgs.PointCloud2", reliability="best_effort", congestion_control="drop" + ), +) diff --git a/dimos/protocol/pubsub/test_spec.py b/dimos/protocol/pubsub/test_spec.py index a0579ca199..e414bde540 100644 --- a/dimos/protocol/pubsub/test_spec.py +++ b/dimos/protocol/pubsub/test_spec.py @@ -25,6 +25,8 @@ from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic from dimos.protocol.pubsub.impl.memory import Memory +from dimos.protocol.pubsub.impl.zenohpubsub import PickleZenoh, Zenoh +from dimos.protocol.service.zenohservice import ZenohSessionPool from dimos.utils.testing.collector import CallbackCollector @@ -145,6 +147,44 @@ def shared_memory_cpu_context() -> Generator[PickleSharedMemory, None, None]: ) +@contextmanager +def zenoh_lcm_context() -> Generator[Zenoh, None, None]: + pool = ZenohSessionPool() + zenoh_pubsub = Zenoh(session_pool=pool) + zenoh_pubsub.start() + yield zenoh_pubsub + zenoh_pubsub.stop() + pool.close_all() + + +testdata.append( + ( + zenoh_lcm_context, + Topic(topic="dimos/test/spec", lcm_type=Vector3), + [Vector3(1, 2, 3), Vector3(4, 5, 6), Vector3(7, 8, 9)], + ) +) + + +@contextmanager +def zenoh_pickle_context() -> Generator[PickleZenoh, None, None]: + pool = ZenohSessionPool() + zenoh_pubsub = PickleZenoh(session_pool=pool) + zenoh_pubsub.start() + yield zenoh_pubsub + zenoh_pubsub.stop() + pool.close_all() + + +testdata.append( + ( + zenoh_pickle_context, + Topic("dimos/test/spec/pickle"), + [{"key": "value1"}, {"key": "value2"}, {"key": "value3"}], + ) +) + + @pytest.mark.parametrize("pubsub_context, topic, values", testdata) def test_store(pubsub_context: Callable[[], Any], topic: Any, values: list[Any]) -> None: with pubsub_context() as x: diff --git a/dimos/protocol/rpc/pubsubrpc.py b/dimos/protocol/rpc/pubsubrpc.py index abfd521666..17354ec27a 100644 --- a/dimos/protocol/rpc/pubsubrpc.py +++ b/dimos/protocol/rpc/pubsubrpc.py @@ -30,6 +30,7 @@ from dimos.constants import LCM_MAX_CHANNEL_NAME_LENGTH from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM, Topic from dimos.protocol.pubsub.impl.shmpubsub import PickleSharedMemory +from dimos.protocol.pubsub.impl.zenohpubsub import PickleZenoh from dimos.protocol.pubsub.spec import PubSub from dimos.protocol.rpc.rpc_utils import deserialize_exception, serialize_exception from dimos.protocol.rpc.spec import DEFAULT_RPC_TIMEOUT, DEFAULT_RPC_TIMEOUTS, Args, RPCSpec @@ -321,6 +322,27 @@ def topicgen(self, name: str, req_or_res: bool) -> Topic: return Topic(topic=topic) +class ZenohRPC(PubSubRPCMixin[Topic, Any], PickleZenoh): + def __init__( + self, + rpc_timeouts: dict[str, float] | None = None, + default_rpc_timeout: float = DEFAULT_RPC_TIMEOUT, + **kwargs: Any, + ) -> None: + if rpc_timeouts is None: + rpc_timeouts = dict(DEFAULT_RPC_TIMEOUTS) + PickleZenoh.__init__(self, **kwargs) + PubSubRPCMixin.__init__( + self, rpc_timeouts=rpc_timeouts, default_rpc_timeout=default_rpc_timeout, **kwargs + ) + + def topicgen(self, name: str, req_or_res: bool) -> Topic: + # Zenoh key expressions can't start with '/' and have no LCM-style + # channel-name length cap, so namespace under 'dimos/' with no fallback. + suffix = "res" if req_or_res else "req" + return Topic(topic=f"dimos/rpc/{name}/{suffix}") + + class ShmRPC(PubSubRPCMixin[str, Any], PickleSharedMemory): def __init__( self, diff --git a/dimos/protocol/rpc/spec.py b/dimos/protocol/rpc/spec.py index 69537c3abe..edbd069435 100644 --- a/dimos/protocol/rpc/spec.py +++ b/dimos/protocol/rpc/spec.py @@ -121,4 +121,10 @@ def override_f(*args, fname=fname, **kwargs): # type: ignore[no-untyped-def] class RPCSpec(RPCServer, RPCClient): - pass + def start(self) -> None: + if hasattr(super(), "start"): + super().start() # type: ignore[misc] + + def stop(self) -> None: + if hasattr(super(), "stop"): + super().stop() # type: ignore[misc] diff --git a/dimos/protocol/rpc/test_spec.py b/dimos/protocol/rpc/test_spec.py index 785dc3904f..ef233a0507 100644 --- a/dimos/protocol/rpc/test_spec.py +++ b/dimos/protocol/rpc/test_spec.py @@ -25,9 +25,10 @@ import pytest -from dimos.protocol.rpc.pubsubrpc import LCMRPC, ShmRPC +from dimos.protocol.rpc.pubsubrpc import LCMRPC, ShmRPC, ZenohRPC from dimos.protocol.rpc.rpc_utils import RemoteError from dimos.protocol.rpc.spec import DEFAULT_RPC_TIMEOUT +from dimos.protocol.service.zenohservice import ZenohSessionPool class CustomTestError(Exception): @@ -80,6 +81,31 @@ def shm_rpc_context(): testdata.append((shm_rpc_context, "shm")) + +@contextmanager +def zenoh_rpc_context(): + """Context manager for ZenohRPC implementation. + + Server and client share one session pool (one Zenoh session) -- the same + single-session self-delivery the Zenoh transport tests rely on -- so the grid + runs without cross-session discovery latency. + """ + pool = ZenohSessionPool() + server = ZenohRPC(rpc_timeouts={}, default_rpc_timeout=DEFAULT_RPC_TIMEOUT, session_pool=pool) + client = ZenohRPC(rpc_timeouts={}, default_rpc_timeout=DEFAULT_RPC_TIMEOUT, session_pool=pool) + server.start() + client.start() + + try: + yield server, client + finally: + server.stop() + client.stop() + pool.close_all() + + +testdata.append((zenoh_rpc_context, "zenoh")) + # Try to add RedisRPC if available try: from dimos.protocol.rpc.redisrpc import RedisRPC diff --git a/dimos/protocol/service/ddsservice.py b/dimos/protocol/service/ddsservice.py index 71df162707..fe86b2843c 100644 --- a/dimos/protocol/service/ddsservice.py +++ b/dimos/protocol/service/ddsservice.py @@ -15,7 +15,6 @@ from __future__ import annotations import threading -from typing import TYPE_CHECKING try: from cyclonedds.domain import DomainParticipant @@ -28,9 +27,6 @@ from dimos.protocol.service.spec import BaseConfig, Service from dimos.utils.logging_config import setup_logger -if TYPE_CHECKING: - from cyclonedds.qos import Qos - logger = setup_logger() _participants: dict[int, DomainParticipant] = {} @@ -41,7 +37,6 @@ class DDSConfig(BaseConfig): """Configuration for DDS service.""" domain_id: int = 0 - qos: Qos | None = None class DDSService(Service): diff --git a/dimos/protocol/service/test_zenohservice.py b/dimos/protocol/service/test_zenohservice.py new file mode 100644 index 0000000000..ca5623b72c --- /dev/null +++ b/dimos/protocol/service/test_zenohservice.py @@ -0,0 +1,79 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import pytest + +from dimos.protocol.pubsub.impl.zenohqos import ZenohQoS +from dimos.protocol.service.zenohservice import ZenohConfig, ZenohService, ZenohSessionPool + + +@pytest.fixture() +def session_pool(): + """Provide a fresh, isolated session pool and close it after the test.""" + pool = ZenohSessionPool() + yield pool + pool.close_all() + + +def test_different_modes_produce_different_keys() -> None: + peer = ZenohConfig(mode="peer") + client = ZenohConfig(mode="client") + assert peer.session_key != client.session_key + + +def test_qos_does_not_change_session_key() -> None: + # Sessions are shared across QoS configs; QoS applies per publisher. + with_qos = ZenohConfig(qos=(ZenohQoS(key="dimos/x", congestion_control="block"),)) + assert with_qos.session_key == ZenohConfig().session_key + + +def test_start_creates_session(session_pool) -> None: + svc = ZenohService(session_pool=session_pool) + svc.start() + assert svc.session is not None + + +def test_two_services_share_session(session_pool) -> None: + svc1 = ZenohService(session_pool=session_pool) + svc2 = ZenohService(session_pool=session_pool) + svc1.start() + svc2.start() + assert svc1.session is svc2.session + + +def test_stop_does_not_close_shared_session(session_pool) -> None: + svc1 = ZenohService(session_pool=session_pool) + svc2 = ZenohService(session_pool=session_pool) + svc1.start() + svc2.start() + svc1.stop() + # svc2's session should still be valid + assert svc2.session is not None + + +def test_session_before_start_raises(session_pool) -> None: + svc = ZenohService(session_pool=session_pool) + with pytest.raises(RuntimeError, match="not initialized"): + _ = svc.session + + +def test_start_is_idempotent(session_pool) -> None: + svc = ZenohService(session_pool=session_pool) + svc.start() + session1 = svc.session + svc.start() + session2 = svc.session + assert session1 is session2 diff --git a/dimos/protocol/service/zenohservice.py b/dimos/protocol/service/zenohservice.py new file mode 100644 index 0000000000..a9fc87228e --- /dev/null +++ b/dimos/protocol/service/zenohservice.py @@ -0,0 +1,95 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import json +import threading +from typing import Any + +import zenoh + +from dimos.protocol.pubsub.impl.zenohqos import ZenohQoS +from dimos.protocol.service.spec import BaseConfig, Service +from dimos.utils.logging_config import setup_logger + +zenoh.init_log_from_env_or("warn") + +logger = setup_logger() + + +class ZenohConfig(BaseConfig): + mode: str = "peer" + connect: list[str] = [] + listen: list[str] = [] + # Per-publisher QoS rules; None = follow global_config.zenoh_qos. + # Excluded from session_key: sessions are shared, QoS is per-publisher. + qos: tuple[ZenohQoS, ...] | None = None + + @property + def session_key(self) -> str: + return f"{self.mode}|{json.dumps(sorted(self.connect))}|{json.dumps(sorted(self.listen))}" + + +class ZenohSessionPool: + def __init__(self) -> None: + self._sessions: dict[str, zenoh.Session] = {} + self._lock = threading.Lock() + + def acquire(self, config: ZenohConfig) -> zenoh.Session: + """Open a session for this config, or return the existing shared one.""" + key = config.session_key + with self._lock: + if key not in self._sessions: + zconfig = zenoh.Config() + zconfig.insert_json5("mode", json.dumps(config.mode)) + if config.connect: + zconfig.insert_json5("connect/endpoints", json.dumps(config.connect)) + if config.listen: + zconfig.insert_json5("listen/endpoints", json.dumps(config.listen)) + self._sessions[key] = zenoh.open(zconfig) + logger.debug(f"Zenoh session opened in {config.mode} mode") + return self._sessions[key] + + def close_all(self) -> None: + """Close every pooled session and empty the pool.""" + with self._lock: + for session in self._sessions.values(): + session.close() + self._sessions.clear() + + +# Process-default pool used by production code. Constructing it opens no sessions. +default_session_pool = ZenohSessionPool() + + +class ZenohService(Service): + config: ZenohConfig + + def __init__(self, *, session_pool: ZenohSessionPool | None = None, **kwargs: Any) -> None: + # session_pool is keyword-only so it never reaches the pydantic config + # (which is extra="forbid"). It rides the same **kwargs path as mode/connect/listen. + super().__init__(**kwargs) + self._session_pool = session_pool or default_session_pool + self._session: zenoh.Session | None = None + + def start(self) -> None: + self._session = self._session_pool.acquire(self.config) + super().start() + + @property + def session(self) -> zenoh.Session: + if self._session is None: + raise RuntimeError("Zenoh session not initialized. Call start() first.") + return self._session diff --git a/dimos/protocol/tf/tf.py b/dimos/protocol/tf/tf.py index eb3d72b470..71790f99d0 100644 --- a/dimos/protocol/tf/tf.py +++ b/dimos/protocol/tf/tf.py @@ -25,6 +25,7 @@ from dimos.msgs.geometry_msgs.Transform import Transform from dimos.msgs.tf2_msgs.TFMessage import TFMessage from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic +from dimos.protocol.pubsub.impl.zenohpubsub import Zenoh from dimos.protocol.pubsub.spec import PubSub from dimos.protocol.service.spec import BaseConfig, Service from dimos.types.timestamped import to_human_readable @@ -422,4 +423,15 @@ class LCMTF(PubSubTF): config: LCMPubsubConfig +class ZenohPubsubConfig(PubSubTFConfig): + # Zenoh key expressions can't start with '/'; namespace under 'dimos'. + topic: Topic = field(default_factory=lambda: Topic("dimos/tf", TFMessage)) + pubsub: type[PubSub] | PubSub | None = Zenoh # type: ignore[type-arg] + autostart: bool = True + + +class ZenohTF(PubSubTF): + config: ZenohPubsubConfig + + TF = LCMTF diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 3f94a6be4e..a91baca809 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -93,6 +93,11 @@ def create_dynamic_callback(): # type: ignore[no-untyped-def] for field_name, field_info in fields.items(): field_type = field_info.annotation + # Container generics (e.g. `tuple[ZenohQoS, ...]`) have no single-flag CLI + # representation; they're configured via env/JSON. Skip like arg_help does. + if isinstance(field_type, types.GenericAlias): + continue + # Handle Optional types # Check for Optional/Union with None if get_origin(field_type) is type(str | None): @@ -137,7 +142,11 @@ def create_dynamic_callback(): # type: ignore[no-untyped-def] def callback(**kwargs) -> None: # type: ignore[no-untyped-def] ctx = kwargs.pop("ctx") - ctx.obj = {k: v for k, v in kwargs.items() if v is not None} + overrides = {k: v for k, v in kwargs.items() if v is not None} + ctx.obj = overrides + # Apply overrides (e.g. --transport, --viewer) to the process-global config + # up front so every subcommand honors flags given before the subcommand name. + global_config.update(**overrides) callback.__signature__ = inspect.Signature(params) # type: ignore[attr-defined] @@ -250,10 +259,6 @@ def run( cli_config_overrides: dict[str, Any] = ctx.obj - # this is a workaround until we have a proper way to have delayed-module-choice in blueprints - # ex: vis_module(viewer=global_config.viewer) is wrong (viewer will always be default value) without this patch - global_config.update(**cli_config_overrides) - # Clean stale registry entries stale = cleanup_stale() if stale: @@ -595,12 +600,8 @@ def restart( @main.command() -def show_config(ctx: typer.Context) -> None: +def show_config() -> None: """Show current config settings and their values.""" - - cli_config_overrides: dict[str, Any] = ctx.obj - global_config.update(**cli_config_overrides) - for field_name, value in global_config.model_dump().items(): typer.echo(f"{field_name}: {value}") diff --git a/dimos/robot/cli/test_dimos.py b/dimos/robot/cli/test_dimos.py index b925d4ebd9..5f9e8330a2 100644 --- a/dimos/robot/cli/test_dimos.py +++ b/dimos/robot/cli/test_dimos.py @@ -15,10 +15,12 @@ from typing import Literal import pytest +from typer.testing import CliRunner from dimos.core.coordination.blueprints import autoconnect +from dimos.core.global_config import global_config from dimos.core.module import Module, ModuleConfig -from dimos.robot.cli.dimos import _normalize_simulation_argv, arg_help +from dimos.robot.cli.dimos import _normalize_simulation_argv, arg_help, main @pytest.mark.parametrize( @@ -45,6 +47,20 @@ def test_normalize_simulation_argv(argv: list[str], expected: list[str]): assert _normalize_simulation_argv(argv) == expected +def test_global_config_flag_applies_before_subcommand(): + """A GlobalConfig flag placed before the subcommand (e.g. --transport) must be + applied by the root callback so every subcommand sees it -- not just + run/show-config, which no longer apply it themselves.""" + runner = CliRunner() + original = global_config.transport + try: + result = runner.invoke(main, ["--transport", "zenoh", "show-config"]) + assert result.exit_code == 0, result.output + assert "transport: zenoh" in result.output + finally: + global_config.update(transport=original) + + def test_blueprint_arg_help(): class ConfigA(ModuleConfig): min_interval_sec: float = 0.1 diff --git a/dimos/robot/cli/topic.py b/dimos/robot/cli/topic.py index e4dce1f139..e674546cfd 100644 --- a/dimos/robot/cli/topic.py +++ b/dimos/robot/cli/topic.py @@ -14,14 +14,18 @@ from __future__ import annotations +from collections.abc import Callable import importlib import re import time import typer -from dimos.core.transport import LCMTransport, pLCMTransport -from dimos.protocol.pubsub.impl.lcmpubsub import LCMPubSubBase +from dimos.core.global_config import global_config +from dimos.core.transport import PubSubTransport +from dimos.core.transport_factory import make_transport, transport_topic +from dimos.protocol.pubsub.impl.lcmpubsub import LCMPubSubBase, Topic +from dimos.protocol.pubsub.impl.zenohpubsub import Zenoh _modules_to_try = [ "dimos.msgs.geometry_msgs", @@ -55,33 +59,41 @@ def _decode_typed_lcm_message(channel: str, data: bytes) -> object: return cls.lcm_decode(data) +def _listen_forever(listening_msg: str, on_stop: Callable[[], None] = lambda: None) -> None: + """Print the banner and block until Ctrl+C, then run on_stop.""" + typer.echo(listening_msg) + try: + while True: + time.sleep(0.1) + except KeyboardInterrupt: + on_stop() + typer.echo("\nStopped.") + + def topic_echo(topic: str, type_name: str | None) -> None: - # Explicit mode (legacy): unchanged. + # Explicit mode (legacy): backend chosen by make_transport from global_config. if type_name is not None: msg_type = _resolve_type(type_name) - use_pickled = getattr(msg_type, "lcm_encode", None) is None - transport: pLCMTransport[object] | LCMTransport[object] = ( - pLCMTransport(topic) if use_pickled else LCMTransport(topic, msg_type) - ) + transport: PubSubTransport[object] = make_transport(topic, msg_type) + transport.subscribe(lambda msg: print(msg)) + _listen_forever(f"Listening on {topic} for {type_name} messages... (Ctrl+C to stop)") + return - def _on_message(msg: object) -> None: - print(msg) + # Inferred typed mode: decode each message from the type embedded in its + # channel/key. The wire format is backend-specific, so dispatch on it. + if global_config.transport == "zenoh": + _topic_echo_inferred_zenoh(topic) + else: + _topic_echo_inferred_lcm(topic) - transport.subscribe(_on_message) - typer.echo(f"Listening on {topic} for {type_name} messages... (Ctrl+C to stop)") - try: - while True: - time.sleep(0.1) - except KeyboardInterrupt: - typer.echo("\nStopped.") - return +def _topic_echo_inferred_lcm(topic: str) -> None: # Warn about missing system config for standalone CLI usage. from dimos.protocol.service.lcmservice import autoconf autoconf(check_only=True) - # Inferred typed mode: listen on /topic#pkg.Msg and decode from the msg_name suffix. + # Listen on /topic#pkg.Msg and decode from the msg_name suffix. bus = LCMPubSubBase() bus.start() # starts threaded handle loop @@ -93,17 +105,29 @@ def on_msg(channel: str, data: bytes) -> None: assert bus.l is not None bus.l.subscribe(typed_pattern, on_msg) - typer.echo( + _listen_forever( f"Listening on {topic} (inferring from typed LCM channels like '{topic}#pkg.Msg')... " - "(Ctrl+C to stop)" + "(Ctrl+C to stop)", + bus.stop, ) - try: - while True: - time.sleep(0.1) - except KeyboardInterrupt: - bus.stop() - typer.echo("\nStopped.") + +def _topic_echo_inferred_zenoh(topic: str) -> None: + key = transport_topic(topic) + bus = Zenoh() + bus.start() + + # Typed Zenoh keys embed the type as a trailing segment ("dimos/topic/pkg.Msg"); + # a wildcard subscription decodes each message from that suffix. Untyped keys + # don't resolve to a type and are skipped by the encoder. The ignore reflects the + # pattern Topic vs the encoder's concrete-topic protocol (see lcmpubsub.py). + bus.subscribe(Topic(f"{key}/**"), lambda msg, _topic: print(msg)) # type: ignore[arg-type] + + _listen_forever( + f"Listening on {topic} (inferring from typed Zenoh keys like '{key}/pkg.Msg')... " + "(Ctrl+C to stop)", + bus.stop, + ) def topic_send(topic: str, message_expr: str) -> None: @@ -135,10 +159,7 @@ def topic_send(topic: str, message_expr: str) -> None: raise typer.Exit(1) msg_type = type(message) - use_pickled = getattr(msg_type, "lcm_encode", None) is None - transport: pLCMTransport[object] | LCMTransport[object] = ( - pLCMTransport(topic) if use_pickled else LCMTransport(topic, msg_type) - ) + transport: PubSubTransport[object] = make_transport(topic, msg_type) transport.broadcast(None, message) typer.echo(f"Sent to {topic}: {message}") diff --git a/dimos/robot/unitree/b1/unitree_b1.py b/dimos/robot/unitree/b1/unitree_b1.py index 166a622ade..46924f3edc 100644 --- a/dimos/robot/unitree/b1/unitree_b1.py +++ b/dimos/robot/unitree/b1/unitree_b1.py @@ -25,7 +25,8 @@ from dimos.core.coordination.module_coordinator import ModuleCoordinator from dimos.core.resource import Resource -from dimos.core.transport import LCMTransport, ROSTransport +from dimos.core.transport import ROSTransport +from dimos.core.transport_factory import make_transport from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.TwistStamped import TwistStamped from dimos.msgs.nav_msgs.Odometry import Odometry @@ -97,11 +98,11 @@ def start(self) -> None: else: self.connection = self._dimos.deploy(B1ConnectionModule, ip=self.ip, port=self.port) # type: ignore[assignment] - # Configure LCM transports for connection (matching G1 pattern) - self.connection.cmd_vel.transport = LCMTransport("/cmd_vel", TwistStamped) # type: ignore[attr-defined] - self.connection.mode_cmd.transport = LCMTransport("/b1/mode", Int32) # type: ignore[attr-defined] - self.connection.odom_in.transport = LCMTransport("/state_estimation", Odometry) # type: ignore[attr-defined] - self.connection.odom_pose.transport = LCMTransport("/odom", PoseStamped) # type: ignore[attr-defined] + # Configure transports for connection (matching G1 pattern) + self.connection.cmd_vel.transport = make_transport("/cmd_vel", TwistStamped) # type: ignore[attr-defined] + self.connection.mode_cmd.transport = make_transport("/b1/mode", Int32) # type: ignore[attr-defined] + self.connection.odom_in.transport = make_transport("/state_estimation", Odometry) # type: ignore[attr-defined] + self.connection.odom_pose.transport = make_transport("/odom", PoseStamped) # type: ignore[attr-defined] # Configure ROS transports for connection self.connection.ros_cmd_vel.transport = ROSTransport("/cmd_vel", TwistStamped) # type: ignore[attr-defined] @@ -113,8 +114,8 @@ def start(self) -> None: from dimos.robot.unitree.b1.joystick_module import JoystickModule self.joystick = self._dimos.deploy(JoystickModule) # type: ignore[assignment] - self.joystick.twist_out.transport = LCMTransport("/cmd_vel", TwistStamped) # type: ignore[attr-defined] - self.joystick.mode_out.transport = LCMTransport("/b1/mode", Int32) # type: ignore[attr-defined] + self.joystick.twist_out.transport = make_transport("/cmd_vel", TwistStamped) # type: ignore[attr-defined] + self.joystick.mode_out.transport = make_transport("/b1/mode", Int32) # type: ignore[attr-defined] logger.info("Joystick module deployed - pygame window will open") self._dimos.start_all_modules() diff --git a/dimos/robot/unitree/dimsim_connection.py b/dimos/robot/unitree/dimsim_connection.py index 5afcd1fda7..9f9abe6fb1 100644 --- a/dimos/robot/unitree/dimsim_connection.py +++ b/dimos/robot/unitree/dimsim_connection.py @@ -19,7 +19,8 @@ from reactivex import Observable, Subject from dimos.core.global_config import GlobalConfig -from dimos.core.transport import LCMTransport +from dimos.core.transport import PubSubTransport +from dimos.core.transport_factory import make_transport, tf_backend from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Quaternion import Quaternion from dimos.msgs.geometry_msgs.Transform import Transform @@ -28,7 +29,6 @@ from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo from dimos.msgs.sensor_msgs.Image import Image from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 -from dimos.protocol.tf.tf import LCMTF from dimos.simulation.dimsim.dimsim_process import DimSimProcess from dimos.utils.logging_config import setup_logger @@ -50,9 +50,9 @@ class DimSimConnection: def __init__(self, global_config: GlobalConfig) -> None: self._dimsim_process: DimSimProcess = DimSimProcess(global_config) - self._odom_transport: LCMTransport[PoseStamped] = LCMTransport("/odom", PoseStamped) + self._odom_transport: PubSubTransport[PoseStamped] = make_transport("/odom", PoseStamped) self._unsubscribe_odom: Callable[[], None] | None = None - self._tf = LCMTF() + self._tf = tf_backend()() def start(self) -> None: self._dimsim_process.start() diff --git a/dimos/robot/unitree/go2/connection.py b/dimos/robot/unitree/go2/connection.py index 185546217b..44db3cbdf2 100644 --- a/dimos/robot/unitree/go2/connection.py +++ b/dimos/robot/unitree/go2/connection.py @@ -32,7 +32,8 @@ from dimos.core.module import Module, ModuleConfig from dimos.core.resource import CompositeResource from dimos.core.stream import In, Out -from dimos.core.transport import LCMTransport, pSHMTransport +from dimos.core.transport import pSHMTransport +from dimos.core.transport_factory import make_transport from dimos.spec.perception import Camera, Pointcloud from dimos.utils.logging_config import setup_logger @@ -382,9 +383,9 @@ def deploy(dimos: ModuleCoordinator, ip: str, prefix: str = "") -> "ModuleProxy" f"{prefix}/image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE ) - connection.cmd_vel.transport = LCMTransport(f"{prefix}/cmd_vel", Twist) + connection.cmd_vel.transport = make_transport(f"{prefix}/cmd_vel", Twist) - connection.camera_info.transport = LCMTransport(f"{prefix}/camera_info", CameraInfo) + connection.camera_info.transport = make_transport(f"{prefix}/camera_info", CameraInfo) connection.start() return connection diff --git a/dimos/robot/unitree/type/map.py b/dimos/robot/unitree/type/map.py index f98eba2c3c..4eb82eadb1 100644 --- a/dimos/robot/unitree/type/map.py +++ b/dimos/robot/unitree/type/map.py @@ -24,7 +24,7 @@ from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig from dimos.core.stream import In, Out -from dimos.core.transport import LCMTransport +from dimos.core.transport_factory import make_transport from dimos.mapping.pointclouds.accumulators.general import GeneralPointCloudAccumulator from dimos.mapping.pointclouds.accumulators.protocol import PointCloudAccumulator from dimos.mapping.pointclouds.occupancy import general_occupancy @@ -113,8 +113,8 @@ def _publish(self, _: Any) -> None: def deploy(dimos: ModuleCoordinator, connection: Go2ConnectionProtocol): # type: ignore[no-untyped-def] mapper = dimos.deploy(Map, global_publish_interval=1.0) - mapper.global_map.transport = LCMTransport("/global_map", PointCloud2) - mapper.global_costmap.transport = LCMTransport("/global_costmap", OccupancyGrid) + mapper.global_map.transport = make_transport("/global_map", PointCloud2) + mapper.global_costmap.transport = make_transport("/global_costmap", OccupancyGrid) mapper.lidar.connect(connection.pointcloud) # type: ignore[attr-defined] mapper.start() return mapper diff --git a/dimos/simulation/mujoco/person_on_track.py b/dimos/simulation/mujoco/person_on_track.py index f19b49e4c6..280569ca00 100644 --- a/dimos/simulation/mujoco/person_on_track.py +++ b/dimos/simulation/mujoco/person_on_track.py @@ -18,7 +18,8 @@ import numpy as np from numpy.typing import NDArray -from dimos.core.transport import LCMTransport +from dimos.core.transport import PubSubTransport +from dimos.core.transport_factory import make_transport from dimos.msgs.geometry_msgs.Pose import Pose @@ -29,7 +30,7 @@ def __init__(self, model: mujoco.MjModel) -> None: person_body_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, "person") self._person_mocap_id = model.body_mocapid[person_body_id] self._latest_pose: Pose | None = None - self._transport: LCMTransport[Pose] = LCMTransport("/person_pose", Pose) + self._transport: PubSubTransport[Pose] = make_transport("/person_pose", Pose) self._transport.subscribe(self._on_pose) def _on_pose(self, pose: Pose) -> None: @@ -65,7 +66,7 @@ def __init__(self, track: list[tuple[float, float]]) -> None: self._current_waypoint_idx = 0 self._initialized = False self._current_pos = np.array([0.0, 0.0]) - self._transport: LCMTransport[Pose] = LCMTransport("/person_pose", Pose) + self._transport: PubSubTransport[Pose] = make_transport("/person_pose", Pose) def _get_segment_heading(self, from_idx: int, to_idx: int) -> float: """Get heading angle for traveling from one waypoint to another.""" diff --git a/dimos/utils/cli/agentspy/agentspy.py b/dimos/utils/cli/agentspy/agentspy.py index 685e5c0b43..14b173e4f6 100644 --- a/dimos/utils/cli/agentspy/agentspy.py +++ b/dimos/utils/cli/agentspy/agentspy.py @@ -16,6 +16,8 @@ from collections import deque from dataclasses import dataclass +import os +import sys import time from typing import Any, Union @@ -28,8 +30,9 @@ from textual.app import App, ComposeResult from textual.binding import Binding from textual.widgets import Footer, RichLog +from textual_serve.server import Server # type: ignore[import-not-found] -from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM, Topic +from dimos.core.transport_factory import apply_transport_arg, make_transport from dimos.utils.cli import theme # Type alias for all message types we might receive @@ -56,21 +59,20 @@ def __init__(self, topic: str = "/agent", max_messages: int = 1000) -> None: self.topic = topic self.max_messages = max_messages self.messages: deque[MessageEntry] = deque(maxlen=max_messages) - self.transport = PickleLCM() + self.transport = make_transport(self.topic) self.transport.start() self.callbacks: list[callable] = [] # type: ignore[valid-type] pass def start(self) -> None: """Start monitoring messages.""" - self.transport.subscribe(Topic(self.topic), self._handle_message) + self.transport.subscribe(self._handle_message) def stop(self) -> None: """Stop monitoring.""" - # PickleLCM doesn't have explicit stop method - pass + self.transport.stop() - def _handle_message(self, msg: Any, topic: Topic) -> None: + def _handle_message(self, msg: Any) -> None: """Handle incoming messages.""" # Check if it's one of the message types we care about if isinstance(msg, SystemMessage | ToolMessage | AIMessage | HumanMessage): @@ -219,14 +221,9 @@ def action_clear(self) -> None: def main() -> None: - """Main entry point for agentspy.""" - import sys + apply_transport_arg(sys.argv) if len(sys.argv) > 1 and sys.argv[1] == "web": - import os - - from textual_serve.server import Server # type: ignore[import-not-found] - server = Server(f"python {os.path.abspath(__file__)}") server.serve() else: diff --git a/dimos/utils/cli/dtop.py b/dimos/utils/cli/dtop.py index 4507f115c6..9090233743 100644 --- a/dimos/utils/cli/dtop.py +++ b/dimos/utils/cli/dtop.py @@ -15,7 +15,7 @@ """dtop — Live TUI for per-worker resource stats over LCM. Usage: - uv run python -m dimos.utils.cli.dtop [--topic /dimos/resource_stats] + uv run python -m dimos.utils.cli.dtop [--topic /resource_stats] """ from __future__ import annotations @@ -34,7 +34,8 @@ from textual.containers import VerticalScroll from textual.widgets import Static -from dimos.protocol.pubsub.impl.lcmpubsub import PickleLCM, Topic +from dimos.core.global_config import global_config +from dimos.core.transport_factory import make_transport from dimos.utils.cli import theme if TYPE_CHECKING: @@ -203,9 +204,7 @@ class ResourceSpyApp(App[None]): BINDINGS = [("q", "quit"), ("ctrl+c", "quit")] - def __init__( - self, topic_name: str = "/dimos/resource_stats", db_path: str | None = None - ) -> None: + def __init__(self, topic_name: str = "/resource_stats", db_path: str | None = None) -> None: super().__init__() self._topic_name = topic_name # Warn about missing system config before entering TUI raw mode. @@ -222,8 +221,8 @@ def __init__( self._store = None self._mem_streams: dict[str, Any] = {} - self._lcm = PickleLCM() - self._lcm.subscribe(Topic(self._topic_name), self._on_msg) + self._lcm = make_transport(self._topic_name) + self._lcm.subscribe(self._on_msg) self._lcm.start() self._lock = threading.Lock() self._latest: dict[str, Any] | None = None @@ -243,7 +242,7 @@ async def on_unmount(self) -> None: if self._store is not None: self._store.stop() - def _on_msg(self, msg: dict[str, Any], _topic: str) -> None: + def _on_msg(self, msg: dict[str, Any]) -> None: with self._lock: self._latest = msg self._last_msg_time = time.monotonic() @@ -557,8 +556,11 @@ def main() -> None: parser = argparse.ArgumentParser( prog="dtop", description="Live TUI for per-worker resource stats." ) + parser.add_argument("--topic", default="/resource_stats", help="Topic to subscribe to.") parser.add_argument( - "--topic", default="/dimos/resource_stats", help="LCM topic to subscribe to." + "--transport", + choices=["lcm", "zenoh"], + help="Transport backend (defaults to DIMOS_TRANSPORT / .env).", ) parser.add_argument( "--log", @@ -567,6 +569,9 @@ def main() -> None: ) args = parser.parse_args() + if args.transport is not None: + global_config.update(transport=args.transport) + db_path = f"dtop_{time.strftime('%Y%m%d_%H%M%S')}.ignore.db" if args.log else None if db_path: print(f"Logging to {db_path}") diff --git a/dimos/utils/cli/human/humancli.py b/dimos/utils/cli/human/humancli.py index 45faef630f..fa107a41af 100644 --- a/dimos/utils/cli/human/humancli.py +++ b/dimos/utils/cli/human/humancli.py @@ -17,6 +17,8 @@ from collections import deque from datetime import datetime, timedelta import json +import os +import sys import textwrap import threading from typing import TYPE_CHECKING, Any @@ -32,9 +34,10 @@ from textual.geometry import Size from textual.strip import Strip from textual.widgets import Input, RichLog, Static +from textual_serve.server import Server # type: ignore[import-not-found] from dimos.agents.mcp import tool_stream -from dimos.core.transport import pLCMTransport +from dimos.core.transport_factory import apply_transport_arg, make_transport from dimos.utils.cli import theme from dimos.utils.generic import truncate_display_string @@ -266,9 +269,9 @@ class HumanCLIApp(App): # type: ignore[type-arg] def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*args, **kwargs) - self._human_transport = pLCMTransport("/human_input") # type: ignore[var-annotated] - self._agent_transport = pLCMTransport("/agent") # type: ignore[var-annotated] - self._agent_idle = pLCMTransport("/agent_idle") # type: ignore[var-annotated] + self._human_transport = make_transport("/human_input") + self._agent_transport = make_transport("/agent") + self._agent_idle = make_transport("/agent_idle") self.chat_log: RichLog | None = None self.input_widget: Input | None = None self._subscription_thread: threading.Thread | None = None @@ -655,15 +658,9 @@ def action_quit(self) -> None: # type: ignore[override] def main() -> None: - """Main entry point for the human CLI.""" - import sys + apply_transport_arg(sys.argv) if len(sys.argv) > 1 and sys.argv[1] == "web": - # Support for textual-serve web mode - import os - - from textual_serve.server import Server # type: ignore[import-not-found] - server = Server(f"python {os.path.abspath(__file__)}") server.serve() else: diff --git a/dimos/utils/testing/waiting.py b/dimos/utils/testing/waiting.py new file mode 100644 index 0000000000..f83eba488e --- /dev/null +++ b/dimos/utils/testing/waiting.py @@ -0,0 +1,54 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from collections.abc import Callable +import threading +import time + + +def wait_until( + predicate: Callable[[], bool], + *, + timeout: float, + interval: float = 0.1, + message: str | None = None, +) -> None: + """Poll ``predicate`` until it returns truthy or ``timeout`` elapses.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if predicate(): + return + time.sleep(interval) + raise TimeoutError(message or f"Timed out after {timeout}s waiting for condition") + + +def retry_until( + event: threading.Event, + action: Callable[[], None], + timeout: float = 2.0, + interval: float = 0.01, +) -> None: + """Retry an action until an Event fires.""" + deadline = threading.Event() + timer = threading.Timer(timeout, deadline.set) + timer.start() + try: + while not event.is_set() and not deadline.is_set(): + action() + event.wait(interval) + finally: + timer.cancel() + assert event.is_set(), f"Timed out after {timeout}s waiting for event" diff --git a/dimos/visualization/rerun/bridge.py b/dimos/visualization/rerun/bridge.py index 2f5fb1efa9..5d13dd01f1 100644 --- a/dimos/visualization/rerun/bridge.py +++ b/dimos/visualization/rerun/bridge.py @@ -42,8 +42,10 @@ from toolz import pipe # type: ignore[import-untyped] from dimos.core.core import rpc +from dimos.core.global_config import global_config from dimos.core.module import Module, ModuleConfig from dimos.protocol.pubsub.impl.lcmpubsub import LCM +from dimos.protocol.pubsub.impl.zenohpubsub import Zenoh from dimos.protocol.pubsub.patterns import Glob, pattern_matches from dimos.protocol.pubsub.spec import SubscribeAllCapable from dimos.protocol.service.lcmservice import autoconf @@ -163,7 +165,47 @@ def _default_blueprint() -> Blueprint: ) +def _default_pubsubs(config: Any = None) -> list[SubscribeAllCapable[Any, Any]]: + """Select the pubsub backend matching the active transport. + + All channels including TF flow over the active transport, so the bridge + listens only on that backend. To also bridge external LCM publishers while + running Zenoh, pass an explicit ``pubsubs=[Zenoh(), LCM()]``. + """ + transport = getattr(config, "transport", None) or global_config.transport + if transport == "zenoh": + return [Zenoh()] + return [LCM()] + + +def _resolve_pubsubs(config: Any) -> list[SubscribeAllCapable[Any, Any]]: + """Return explicit pubsubs when truly overridden, else transport defaults. + + Older blueprints commonly passed ``pubsubs=[LCM()]`` as the effective + default. Preserve the newer transport-driven behavior for that legacy + value, but honor explicit non-default overrides such as custom backends. + """ + fields_set: set[str] = cast("set[str]", getattr(config, "model_fields_set", set())) + pubsubs = cast( + "list[SubscribeAllCapable[Any, Any]] | None", + getattr(config, "pubsubs", None), + ) + if "pubsubs" in fields_set and pubsubs is not None: + is_legacy_default = len(pubsubs) == 1 and isinstance(pubsubs[0], LCM) + if not is_legacy_default: + return pubsubs + return _default_pubsubs(getattr(config, "g", config)) + + class Config(ModuleConfig): + """Configuration for RerunBridgeModule. + + The pubsubs field is accepted for backwards compatibility. The legacy + ``[LCM()]`` value is treated as the old default and replaced by the + transport-driven runtime default. Explicit non-default overrides are still + honored. + """ + pubsubs: list[SubscribeAllCapable[Any, Any]] = field(default_factory=lambda: [LCM()]) visual_override: dict[Glob | str, Callable[[Any], Archetype] | None] = field( @@ -269,7 +311,15 @@ def _get_entity_path(self, topic: Any) -> str: return self.config.topic_to_entity(topic) topic_str = getattr(topic, "name", None) or str(topic) - topic_str = topic_str.split("#")[0] # strip LCM topic suffix + # Strip type suffix: LCM uses '#type', Zenoh embeds type as '/type' in key expr + # but _key_expr_to_topic already parsed it into topic.topic, so use that. + raw = getattr(topic, "topic", topic_str) + if isinstance(raw, str): + topic_str = raw + topic_str = topic_str.split("#")[0] + # Strip Zenoh key prefix (dimos/) to match LCM entity paths + if topic_str.startswith("dimos/"): + topic_str = "/" + topic_str.removeprefix("dimos/") return f"{self.config.entity_prefix}{topic_str}" def _on_message(self, msg: Any, topic: Any) -> None: @@ -390,14 +440,21 @@ def start(self) -> None: if self.config.blueprint: rr.send_blueprint(_with_graph_tab(self.config.blueprint())) - for pubsub in self.config.pubsubs: + # Resolve pubsubs lazily — the module-level global_config singleton in worker + # processes doesn't have CLI overrides. Use self.config.g which is the parent's + # updated config, passed via the worker kwargs. + pubsubs = _resolve_pubsubs(self.config) + + # Start pubsubs and subscribe to all messages + for pubsub in pubsubs: logger.info(f"bridge listening on {pubsub.__class__.__name__}") if hasattr(pubsub, "start"): pubsub.start() unsub = pubsub.subscribe_all(self._on_message) self.register_disposable(Disposable(unsub)) - for pubsub in self.config.pubsubs: + # Add pubsub stop as disposable + for pubsub in pubsubs: if hasattr(pubsub, "stop"): self.register_disposable(Disposable(pubsub.stop)) # type: ignore[union-attr] @@ -523,7 +580,6 @@ def run_bridge( memory_limit=memory_limit, rerun_open=rerun_open, rerun_web=rerun_web, - pubsubs=[LCM()], ) bridge.start() diff --git a/dimos/visualization/rerun/test_viewer_integration.py b/dimos/visualization/rerun/test_viewer_integration.py index a8eb0602c3..d6a0ba1e95 100644 --- a/dimos/visualization/rerun/test_viewer_integration.py +++ b/dimos/visualization/rerun/test_viewer_integration.py @@ -27,6 +27,10 @@ import inspect import shutil +from dimos.core.global_config import GlobalConfig +from dimos.protocol.pubsub.impl.lcmpubsub import LCM +from dimos.visualization.rerun.bridge import Config, _resolve_pubsubs + class TestViewerBinaryInstallation: """Verify dimos-viewer binary is installed and functional.""" @@ -116,3 +120,24 @@ def test_bridge_has_fallback(self): "bridge.py start() has no fallback for missing dimos-viewer. " "Users without dimos-viewer will crash." ) + + +class ExplicitPubSubOverride: + def subscribe_all(self, callback): + return lambda: None + + +class TestBridgePubsubResolution: + def test_legacy_lcm_pubsubs_defers_to_transport_default(self): + config = Config(pubsubs=[LCM()], g=GlobalConfig(transport="lcm")) + pubsubs = _resolve_pubsubs(config) + + assert len(pubsubs) == 1 + assert isinstance(pubsubs[0], LCM) + + def test_explicit_custom_pubsubs_override_is_honored(self): + custom = ExplicitPubSubOverride() + config = Config(pubsubs=[custom], g=GlobalConfig(transport="lcm")) + pubsubs = _resolve_pubsubs(config) + + assert pubsubs == [custom] diff --git a/docs/development/testing.md b/docs/development/testing.md index 40e429797c..58b919423e 100644 --- a/docs/development/testing.md +++ b/docs/development/testing.md @@ -57,6 +57,8 @@ The default `addopts` in `pyproject.toml` includes a `-m` filter that excludes ` (Shortcut for `pytest -m 'not (tool or mujoco)' dimos` — runs the default suite *and* self-hosted tests, but not `tool` or `mujoco`.) +This includes slow agent and MCP-style integration tests in addition to slower transport and module tests. If one of those paths is broken, a failure can take close to a minute to surface because the harness waits for the agent flow to finish before timing out. + When writing or debugging a specific self-hosted test, override `-m` yourself to run it: ```bash diff --git a/docs/installation/osx.md b/docs/installation/osx.md index 8f190c1c5e..25909eb529 100644 --- a/docs/installation/osx.md +++ b/docs/installation/osx.md @@ -41,3 +41,19 @@ uv run mypy dimos # tests (around a minute to run) uv run pytest --numprocesses=auto dimos ``` + +## Transport note for macOS + +LCM over UDP can be unreliable on macOS for large or high-rate replay workloads. DimOS defaults the global stream transport to **Zenoh** on macOS, so you usually do not need `--transport=zenoh`. Use `--transport=lcm` if you need to force the legacy multicast path. + +See the [Zenoh quickstart](/docs/usage/transports/index.md#zenoh-quickstart) for install, Linux versus macOS defaults, and `DIMOS_TRANSPORT`. + +```sh +dimos --dtop --replay --replay-db=go2_bigoffice run unitree-go2 +``` + +If you are developing on the repository, prefer syncing the full environment with the checked-in lockfile: + +```sh +uv sync --extra all --frozen +``` diff --git a/docs/usage/cli.md b/docs/usage/cli.md index 7abd1fa7bb..58fd2a0ac0 100644 --- a/docs/usage/cli.md +++ b/docs/usage/cli.md @@ -25,6 +25,7 @@ dimos [GLOBAL OPTIONS] COMMAND [ARGS] | `--memory-limit` | TEXT | `auto` | Rerun viewer memory limit | | `--mcp-port` | INT | `9990` | MCP server port | | `--mcp-host` | TEXT | `127.0.0.1` | MCP server bind address | +| `--transport` | `lcm\|zenoh` | platform-dependent | Transport backend for streams, RPC, and TF. Defaults to `zenoh` on macOS, otherwise `lcm`. Set `DIMOS_TRANSPORT` (env var or `.env`) to switch every process at once. Standalone CLIs like `humancli`, `agentspy`, and `dtop`, which also accept `--transport`. | | `--dtop` / `--no-dtop` | bool | `False` | Enable live resource monitor overlay | | `--obstacle-avoidance` / `--no-obstacle-avoidance` | bool | `True` | Enable obstacle avoidance | | `--detection-model` | `qwen\|moondream` | `moondream` | Vision model for object detection | @@ -83,6 +84,9 @@ dimos run unitree-go2-agentic --daemon # Replay with Rerun viewer dimos --replay --viewer rerun run unitree-go2 +# Replay Big Office (on Linux use --transport=zenoh; on macOS Zenoh is default when installed) +dimos --transport=zenoh --dtop --replay --replay-db=go2_bigoffice run unitree-go2 + # Real robot dimos run unitree-go2-agentic --robot-ip 192.168.123.161 @@ -93,6 +97,8 @@ dimos run unitree-go2 keyboard-teleop dimos run unitree-go2-agentic --disable OsmSkill WebInput ``` +On macOS, heavy replay workloads can be unreliable over LCM UDP, so the default transport resolves to `zenoh`; you can still force either path explicitly with `--transport=lcm` or `--transport=zenoh`. + When `--daemon` is used, the process: 1. Builds and starts all modules (foreground — you see errors) 2. Runs a health check (polls worker PIDs) diff --git a/docs/usage/transports/index.md b/docs/usage/transports/index.md index 5415aab1d0..af5bd8eb18 100644 --- a/docs/usage/transports/index.md +++ b/docs/usage/transports/index.md @@ -27,7 +27,52 @@ A transport is responsible for the mechanics of delivery (IPC, sockets, Redis, R So: treat the API as uniform, but pick a backend whose semantics match the task. ---- +## Choosing a backend + +For most users, the important choice is between `lcm`, `zenoh`, and shared memory overrides: + +* `lcm`: current legacy default on most platforms. Fast and simple, but UDP multicast is best-effort. +* `zenoh`: network transport with reliable delivery semantics and the same typed message model through `LCMEncoderMixin`. +* shared memory (`pSHMTransport`, etc.): best for large local streams on a single machine. + +At the CLI level, you can select the stream transport globally with: + +```bash +dimos --transport=lcm run unitree-go2 +dimos --transport=zenoh run unitree-go2 +``` + +On macOS, large replay workloads can be unreliable over LCM UDP, so DimOS defaults the global stream transport to `zenoh` there. Other platforms default to `lcm`. + +## Zenoh quickstart + +Zenoh ships with DimOS by default (`eclipse-zenoh` is a base dependency), so there is nothing extra to install. + +**Default global stream transport** (only applies when you do not pass `--transport` or set `DIMOS_TRANSPORT`): + +| Situation | Default | +|-----------|---------| +| macOS | `zenoh` | +| Any other platform | `lcm` | + +**Two ways to override for one run or for your shell:** + +1. **CLI:** `dimos --transport=zenoh ...` or `dimos --transport=lcm ...` (see [CLI](/docs/usage/cli.md) for precedence with `.env` and blueprints). +2. **Environment:** `DIMOS_TRANSPORT=zenoh` or `DIMOS_TRANSPORT=lcm`. + +Typical **replay on macOS** (default is already Zenoh, so no transport flag is required): + +```bash +dimos --dtop --replay --replay-db=go2_bigoffice run unitree-go2 +``` + +The same workload on **Linux** (default remains `lcm` until you opt in): + +```bash +dimos --transport=zenoh --dtop --replay --replay-db=go2_bigoffice run unitree-go2 +``` + +Architecture notes (Rerun bridge, TF still on LCM) live under [Zenoh](#zenoh) in PubSub transports below. ## Benchmarks @@ -39,8 +84,6 @@ python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_b ![Benchmark results](../assets/pubsub_benchmark.png) ---- - ## Abstraction layers
@@ -85,8 +128,6 @@ text "pub/sub API" at P.s + (0, -0.2in) We’ll go through these layers top-down. ---- - ## Using transports with blueprints See [Blueprints](/docs/usage/blueprints.md) for the blueprint API. @@ -114,8 +155,6 @@ ros = nav.transports( ) ``` ---- - ## Using transports with modules Each **stream** on a module can use a different transport. Set `.transport` on the stream **before starting** modules. @@ -207,8 +246,6 @@ Received: (480, 640, 3) See [Modules](/docs/usage/modules.md) for more on module architecture. ---- - ## Inspecting LCM traffic (CLI) `lcmspy` shows topic frequency/bandwidth stats: @@ -222,8 +259,6 @@ Listening on /camera/rgb (inferring from typed LCM channels like '/camera/rgb#pk Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59) ``` ---- - ## Implementing a transport At the stream layer, a transport is implemented by subclassing `Transport` (see [`core/stream.py`](/dimos/core/stream.py#L83)) and implementing: @@ -244,8 +279,6 @@ Encoding is an implementation detail, but we encourage using LCM-compatible mess Many of our message types provide `lcm_encode` / `lcm_decode` for compact, language-agnostic binary encoding (often faster than pickle). For details, see [LCM](/docs/usage/lcm.md). ---- - ## PubSub transports Even though transport can be anything (TCP connection, unix socket) for now all our transport backends implement the `PubSub` interface. @@ -306,6 +339,42 @@ lcm.stop() Received velocity: x=1.0, y=0.0, z=0.5 ``` +### Zenoh + +Zenoh provides network pubsub without relying on UDP multicast for the user-facing stream transport. In DimOS it carries the same typed messages by encoding them with `LCMEncoderMixin`, so existing `dimos.msgs.*` types still work. + +Use Zenoh when: + +* you want a transport that behaves better than UDP multicast on macOS +* you are replaying large or high-rate data and want a more reliable network path +* you want to keep the DimOS typed stream model while changing the transport backend + +At the stream level, the transport wrappers are `ZenohTransport` and `pZenohTransport`. Install, defaults, and CLI versus environment overrides are in the [Zenoh quickstart](#zenoh-quickstart) above. + +The Rerun bridge also follows the global transport. When `transport=zenoh`, the bridge listens on Zenoh and on LCM for TF data. + +#### Per-topic QoS + +Zenoh publisher QoS is set per key expression by `global_config.zenoh_qos`, an ordered rule table (first match wins, defined in [`zenohqos.py`](/dimos/protocol/pubsub/impl/zenohqos.py)). Defaults: + +* `dimos/rpc/**` and the agent channels (`dimos/human_input`, `dimos/agent`, `dimos/agent_idle`): reliable, block under congestion (never drop). +* `Image`/`PointCloud2` streams (matched by the type suffix in the key): best-effort, drop under congestion. +* Everything else: zenoh defaults (reliable, drop under congestion). + +Override via env (JSON) or blueprint; rules are full-replace, so prepend to keep the defaults: + +```bash skip +DIMOS_ZENOH_QOS='[{"key": "dimos/foo", "reliability": "best_effort"}]' +``` + +```python skip +blueprint.global_config( + zenoh_qos=({"key": "dimos/foo/**", "congestion_control": "drop"}, *DEFAULT_ZENOH_QOS), +) +``` + +QoS is resolved once per key when the publisher is first declared; rule changes don't affect already-declared publishers. LCM has no per-topic settings, so the rules only apply when `transport=zenoh`. + ### Shared memory (IPC) Shared memory is highest performance, but only works on the **same machine**. @@ -364,8 +433,6 @@ dds.stop() ```results Received: [SensorReading(value=22.5)] ``` ---- - ## A minimal transport: `Memory` The simplest toy backend is `Memory` (single process). Start from there when implementing a new pubsub backend. @@ -396,8 +463,6 @@ Received 2 messages: See [`pubsub/impl/memory.py`](/dimos/protocol/pubsub/impl/memory.py) for the complete source. ---- - ## Encode/decode mixins Transports often need to serialize messages before sending and deserialize after receiving. @@ -448,8 +513,6 @@ class MyPicklePubSub(PickleEncoderMixin, Memory): pass ``` ---- - ## Testing and benchmarks ### Spec tests @@ -464,8 +527,6 @@ Add your backend to benchmarks to compare in context: python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.py ``` ---- - # Available transports | Transport | Use case | Cross-process | Network | Notes | @@ -473,6 +534,7 @@ python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_b | `Memory` | Testing only, single process | No | No | Minimal reference impl | | `SharedMemory` | Multi-process on same machine | Yes | No | Highest throughput (IPC) | | `LCM` | Robot LAN broadcast (UDP multicast) | Yes | Yes | Best-effort; can drop packets on LAN | +| `Zenoh` | Reliable network stream transport | Yes | Yes | Recommended on macOS for heavy replay | | `Redis` | Network pubsub via Redis server | Yes | Yes | Central broker; adds hop | | `ROS` | ROS 2 topic communication | Yes | Yes | Integrates with RViz/ROS tools | | `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | WIP | diff --git a/pyproject.toml b/pyproject.toml index 7435473562..b3709ecec9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -94,6 +94,7 @@ classifiers = [ dependencies = [ # Transport Protocols "dimos-lcm>=0.1.3", + "eclipse-zenoh>=1.0.0,<2.0", "PyTurboJPEG==1.8.2", # Core "numpy>=1.26.4", @@ -491,6 +492,7 @@ python_version = "3.12" incremental = true strict = true warn_unused_ignores = false +untyped_calls_exclude = ["zenoh"] explicit_package_bases = true exclude = "^dimos/models/Detic(/|$)|.*/test_.|.*/conftest.py*" diff --git a/uv.lock b/uv.lock index 49703d472d..c2d8cb7241 100644 --- a/uv.lock +++ b/uv.lock @@ -1954,6 +1954,7 @@ dependencies = [ { name = "cryptography" }, { name = "dimos-lcm" }, { name = "dimos-viewer" }, + { name = "eclipse-zenoh" }, { name = "llvmlite" }, { name = "lz4" }, { name = "numba" }, @@ -2405,6 +2406,7 @@ requires-dist = [ { name = "dimos-viewer", marker = "extra == 'visualization'", specifier = "==0.32.0a1" }, { name = "drake", marker = "platform_machine != 'aarch64' and sys_platform == 'darwin' and extra == 'manipulation'", specifier = "==1.45.0" }, { name = "drake", marker = "platform_machine != 'aarch64' and sys_platform != 'darwin' and extra == 'manipulation'", specifier = ">=1.40.0" }, + { name = "eclipse-zenoh", specifier = ">=1.0.0,<2.0" }, { name = "edgetam-dimos", marker = "extra == 'misc'" }, { name = "einops", marker = "extra == 'perception'", specifier = ">=0.8.1" }, { name = "fastapi", marker = "extra == 'web'", specifier = ">=0.115.6" }, @@ -2801,6 +2803,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b0/0d/9feae160378a3553fa9a339b0e9c1a048e147a4127210e286ef18b730f03/durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286", size = 3922, upload-time = "2025-05-17T13:52:36.463Z" }, ] +[[package]] +name = "eclipse-zenoh" +version = "1.9.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d9/42/c8502d0e77f74b9cf4c192a01e620b3d15273d371464485796807d202d9d/eclipse_zenoh-1.9.0.tar.gz", hash = "sha256:b0477ab431132ebfe1096eccac13ea0066d50d1528d726c8872c00e0345070d1", size = 164557, upload-time = "2026-04-10T13:23:35.883Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e7/3b/22b9104b0a022bd2b1627b4866876831585eda2eacb9ca1f3b4b8e847945/eclipse_zenoh-1.9.0-cp39-abi3-linux_armv6l.whl", hash = "sha256:15b6f37c407617ea4de32d32835cbcab4d1a116b892477490fc6c10a7d27c73b", size = 10664168, upload-time = "2026-04-10T13:23:15.008Z" }, + { url = "https://files.pythonhosted.org/packages/05/c5/ee0815c7ec49c5a29307cd935478305159bb3f0b2489f8c54fc6db3fdf36/eclipse_zenoh-1.9.0-cp39-abi3-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:6f66059b12e1ec53c70bc25192b0e74502751759064726dbb153ed6dd8f4dc8b", size = 19942168, upload-time = "2026-04-10T13:23:17.785Z" }, + { url = "https://files.pythonhosted.org/packages/7b/6a/42b83b4e8c262ebbb3bcae702394478326c807f54b3162130b0a603e1a01/eclipse_zenoh-1.9.0-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:180dd2a6da3b86b52e87f5e470a1f8a86db03c519978b22ffb1dc7c11f98ef3b", size = 10225694, upload-time = "2026-04-10T13:23:20.244Z" }, + { url = "https://files.pythonhosted.org/packages/27/57/28e66893801b63df36fea355a64b6fc22637e1148a952ee11e3039ae955e/eclipse_zenoh-1.9.0-cp39-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:949d82851bc9e3ad646fd1307ee544ed23359dcfd18d4065075fc592f6ab6fa7", size = 10517069, upload-time = "2026-04-10T13:23:23.053Z" }, + { url = "https://files.pythonhosted.org/packages/f0/2f/be614f1f7f4e046da2764cd36227d19db3655839219744ce7a12e6e2dae6/eclipse_zenoh-1.9.0-cp39-abi3-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3a1fe847225cda21e3e74677cfd4ddfd2e72600d5a56968d4229d981c67f78d4", size = 11580068, upload-time = "2026-04-10T13:23:25.594Z" }, + { url = "https://files.pythonhosted.org/packages/58/1b/2a074d4f4595bd37c3d12f1b2ad49bceef5c8cd0962cbfd97d1d39f32e1f/eclipse_zenoh-1.9.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:43299593891cfd648bca4b2aa00f3dca916508a49a0c9e6960902e6e867b247e", size = 10537556, upload-time = "2026-04-10T13:23:28.414Z" }, + { url = "https://files.pythonhosted.org/packages/ab/33/c3116f1bf7647ee0ea8972efbe0fe5710ae75ea7226440a8fda7f04a4cbc/eclipse_zenoh-1.9.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:8c139a43706c8ff3c94fa625008af8667687c161a8395ad1fa3faff29c16fae4", size = 10721249, upload-time = "2026-04-10T13:23:30.843Z" }, + { url = "https://files.pythonhosted.org/packages/26/16/a94c4f37e3a088faadf4b5fbc64e5f69dea1023dc7efc49b3be0e0ecc953/eclipse_zenoh-1.9.0-cp39-abi3-win_amd64.whl", hash = "sha256:5dfb352eca4585b85edbbc84c6db58906008e202823ca280496c0b867f9719f0", size = 9124510, upload-time = "2026-04-10T13:23:34.119Z" }, +] + [[package]] name = "edgetam-dimos" version = "1.0"