diff --git a/dimos/control/extensions.py b/dimos/control/extensions.py new file mode 100644 index 0000000000..43a320fe16 --- /dev/null +++ b/dimos/control/extensions.py @@ -0,0 +1,86 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Public extension surface for ControlCoordinator sideloading.""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import TYPE_CHECKING, cast + +from dimos.control.components import HardwareType +from dimos.control.tasks.registry import control_task_registry +from dimos.control.tasks.registry_utils import normalize_task_name, validate_task_factory_path +from dimos.hardware.registry_utils import normalize_adapter_name + +if TYPE_CHECKING: + from dimos.hardware.drive_trains.spec import TwistBaseAdapter + from dimos.hardware.manipulators.spec import ManipulatorAdapter + from dimos.hardware.whole_body.spec import WholeBodyAdapter + + +def register_hardware_adapter( + hardware_type: HardwareType, + adapter_type: str, + factory: Callable[..., object], +) -> None: + """Register a hardware adapter factory for external packages. + + The adapter is registered into the existing DimOS registry that matches + ``hardware_type``. Re-registering the same adapter name with the same + factory object is idempotent; registering a different factory for an + existing name raises from the target registry. + """ + adapter_name = normalize_adapter_name(adapter_type) + if not callable(factory): + raise TypeError("Hardware adapter factory must be callable") + + match hardware_type: + case HardwareType.MANIPULATOR: + from dimos.hardware.manipulators.registry import adapter_registry + + adapter_registry.register( + adapter_name, cast("Callable[..., ManipulatorAdapter]", factory) + ) + case HardwareType.BASE: + from dimos.hardware.drive_trains.registry import twist_base_adapter_registry + + twist_base_adapter_registry.register( + adapter_name, + cast("Callable[..., TwistBaseAdapter]", factory), + ) + case HardwareType.WHOLE_BODY: + from dimos.hardware.whole_body.registry import whole_body_adapter_registry + + whole_body_adapter_registry.register( + adapter_name, + cast("Callable[..., WholeBodyAdapter]", factory), + ) + case _: + raise ValueError(f"Unsupported hardware type: {hardware_type!r}") + + +def register_control_task(task_type: str, factory_path: str) -> None: + """Register a lazy control task factory path for external packages. + + The target factory module is not imported during registration. It is + resolved later by the control task registry when a coordinator creates a + matching ``TaskConfig``. + """ + task_name = normalize_task_name(task_type) + validate_task_factory_path(factory_path, label="control task factory path") + control_task_registry.register_path(task_name, factory_path) + + +__all__ = ["register_control_task", "register_hardware_adapter"] diff --git a/dimos/control/tasks/registry.py b/dimos/control/tasks/registry.py index aab4d98731..590022db0e 100644 --- a/dimos/control/tasks/registry.py +++ b/dimos/control/tasks/registry.py @@ -33,6 +33,8 @@ import os from typing import TYPE_CHECKING, cast +from dimos.control.tasks.registry_utils import normalize_task_name, validate_task_factory_path + if TYPE_CHECKING: from dimos.control.coordinator import TaskConfig from dimos.control.hardware_interface import ConnectedHardware, ConnectedWholeBody @@ -78,9 +80,8 @@ def discover(self) -> None: def register_path(self, name: str, factory_path: str) -> None: """Register a lazy task factory import path.""" - if ":" not in factory_path: - raise ValueError(f"Invalid task factory path: {factory_path!r}") - key = name.lower() + key = normalize_task_name(name) + validate_task_factory_path(factory_path) existing = self._factory_paths.get(key) if existing is not None and existing != factory_path: raise ValueError(f"Duplicate task type {key!r}: {existing!r} vs {factory_path!r}") @@ -103,7 +104,7 @@ def create( adapter resolve it from their typed params; pass ``None`` only if no task in this registry needs hardware. """ - key = name.lower() + key = normalize_task_name(name) factory = self._resolve_factory(key) return factory(cfg=cfg, hardware=hardware or {}) diff --git a/dimos/control/tasks/registry_utils.py b/dimos/control/tasks/registry_utils.py new file mode 100644 index 0000000000..84142ad637 --- /dev/null +++ b/dimos/control/tasks/registry_utils.py @@ -0,0 +1,39 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared helpers for control task registries and extension registration.""" + +from __future__ import annotations + + +def normalize_task_name(name: str) -> str: + """Normalize task type names consistently for registration and lookup.""" + key = name.strip().lower() + if not key: + raise ValueError("Task type must be non-empty") + return key + + +def validate_task_factory_path( + factory_path: str, + *, + label: str = "task factory path", +) -> None: + """Validate a lazy factory path of the form ``module:function``.""" + module_name, separator, attr = factory_path.partition(":") + if not factory_path.strip() or separator != ":" or not module_name or not attr: + raise ValueError(f"Invalid {label}: {factory_path!r}") + + +__all__ = ["normalize_task_name", "validate_task_factory_path"] diff --git a/dimos/control/test_extensions.py b/dimos/control/test_extensions.py new file mode 100644 index 0000000000..674b6b4100 --- /dev/null +++ b/dimos/control/test_extensions.py @@ -0,0 +1,266 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for ControlCoordinator extension sideloading.""" + +from __future__ import annotations + +from collections.abc import Generator +import sys +from types import ModuleType + +import pytest + +from dimos.control.components import HardwareComponent, HardwareType, make_twist_base_joints +from dimos.control.coordinator import ControlCoordinator, TaskConfig +from dimos.control.extensions import register_control_task, register_hardware_adapter +from dimos.control.task import BaseControlTask, CoordinatorState, JointCommandOutput, ResourceClaim +from dimos.control.tasks.registry import control_task_registry +from dimos.hardware.drive_trains.registry import twist_base_adapter_registry +from dimos.hardware.manipulators.registry import adapter_registry +from dimos.hardware.whole_body.registry import whole_body_adapter_registry + + +class ExternalBaseAdapter: + def __init__(self, dof: int = 3, **kwargs: object) -> None: + self._dof = dof + self._connected = False + self._enabled = False + + def connect(self) -> bool: + self._connected = True + return True + + def disconnect(self) -> None: + self._connected = False + + def is_connected(self) -> bool: + return self._connected + + def get_dof(self) -> int: + return self._dof + + def read_velocities(self) -> list[float]: + return [0.0] * self._dof + + def read_odometry(self) -> list[float] | None: + return [0.0] * self._dof + + def write_velocities(self, velocities: list[float]) -> bool: + return True + + def write_stop(self) -> bool: + return True + + def write_enable(self, enable: bool) -> bool: + self._enabled = enable + return True + + def read_enabled(self) -> bool: + return self._enabled + + +class ExternalControlTask(BaseControlTask): + def __init__(self, cfg: TaskConfig, hardware: object) -> None: + self._name = cfg.name + self.hardware = hardware + + @property + def name(self) -> str: + return self._name + + def claim(self) -> ResourceClaim: + return ResourceClaim(joints=frozenset()) + + def is_active(self) -> bool: + return False + + def compute(self, state: CoordinatorState) -> JointCommandOutput | None: + return None + + +@pytest.fixture(autouse=True) +def restore_registries() -> Generator[None, None, None]: + manipulator_adapters = adapter_registry._adapters.copy() + base_adapters = twist_base_adapter_registry._adapters.copy() + whole_body_adapters = whole_body_adapter_registry._adapters.copy() + task_paths = control_task_registry._factory_paths.copy() + task_factories = control_task_registry._factories.copy() + try: + yield + finally: + adapter_registry._adapters = manipulator_adapters + twist_base_adapter_registry._adapters = base_adapters + whole_body_adapter_registry._adapters = whole_body_adapters + control_task_registry._factory_paths = task_paths + control_task_registry._factories = task_factories + + +def test_register_hardware_adapter_supports_all_hardware_types() -> None: + def manipulator_factory(**kwargs: object) -> object: + return object() + + def base_factory(**kwargs: object) -> object: + return object() + + def whole_body_factory(**kwargs: object) -> object: + return object() + + register_hardware_adapter(HardwareType.MANIPULATOR, "Ext_Manipulator", manipulator_factory) + register_hardware_adapter(HardwareType.BASE, "Ext_Base", base_factory) + register_hardware_adapter(HardwareType.WHOLE_BODY, "Ext_Whole_Body", whole_body_factory) + + assert adapter_registry._adapters["ext_manipulator"] is manipulator_factory + assert twist_base_adapter_registry._adapters["ext_base"] is base_factory + assert whole_body_adapter_registry._adapters["ext_whole_body"] is whole_body_factory + + +def test_hardware_adapter_create_uses_same_normalization_as_register() -> None: + class PaddedAdapter: + pass + + register_hardware_adapter(HardwareType.MANIPULATOR, " Padded_Manipulator ", PaddedAdapter) + register_hardware_adapter(HardwareType.BASE, " Padded_Base ", PaddedAdapter) + register_hardware_adapter(HardwareType.WHOLE_BODY, " Padded_Whole_Body ", PaddedAdapter) + + assert isinstance(adapter_registry.create(" padded_manipulator "), PaddedAdapter) + assert isinstance(twist_base_adapter_registry.create(" padded_base "), PaddedAdapter) + assert isinstance(whole_body_adapter_registry.create(" padded_whole_body "), PaddedAdapter) + + +@pytest.mark.parametrize( + ("hardware_type", "registry_name"), + [ + (HardwareType.MANIPULATOR, "manipulator"), + (HardwareType.BASE, "base"), + (HardwareType.WHOLE_BODY, "whole_body"), + ], +) +def test_register_hardware_adapter_duplicate_policy_keeps_original( + hardware_type: HardwareType, + registry_name: str, +) -> None: + def original_factory(**kwargs: object) -> object: + return object() + + def different_factory(**kwargs: object) -> object: + return object() + + adapter_name = f"external_duplicate_{hardware_type.value}" + + register_hardware_adapter(hardware_type, adapter_name, original_factory) + register_hardware_adapter(hardware_type, adapter_name, original_factory) + + with pytest.raises(ValueError): + register_hardware_adapter(hardware_type, adapter_name, different_factory) + + registries = { + "manipulator": adapter_registry._adapters, + "base": twist_base_adapter_registry._adapters, + "whole_body": whole_body_adapter_registry._adapters, + } + assert registries[registry_name][adapter_name] is original_factory + + +def test_register_control_task_registers_lazy_path_without_importing_target(mocker) -> None: + import_module = mocker.patch("dimos.control.tasks.registry.importlib.import_module") + + register_control_task("External_Task", "external_pkg.tasks:make_task") + + assert control_task_registry._factory_paths["external_task"] == "external_pkg.tasks:make_task" + import_module.assert_not_called() + + +def test_control_task_create_uses_same_normalization_as_register( + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "external_padded_task_module" + module = ModuleType(module_name) + module.__dict__["make_task"] = ExternalControlTask + monkeypatch.setitem(sys.modules, module_name, module) + + control_task_registry.register_path(" External_Padded_Task ", f"{module_name}:make_task") + + task = control_task_registry.create( + " external_padded_task ", + TaskConfig(name="external_padded_task", type="external_padded_task"), + ) + + assert isinstance(task, ExternalControlTask) + + +@pytest.mark.parametrize( + "factory_path", ["", "external_pkg.tasks", ":make_task", "external_pkg.tasks:"] +) +def test_register_control_task_validates_path_format_only(factory_path: str) -> None: + with pytest.raises(ValueError): + register_control_task("external_invalid_path", factory_path) + + +def test_register_control_task_duplicate_policy_keeps_original() -> None: + register_control_task("external_duplicate_task", "external_pkg.tasks:make_task") + register_control_task("external_duplicate_task", "external_pkg.tasks:make_task") + + with pytest.raises(ValueError): + register_control_task("external_duplicate_task", "external_pkg.other:make_task") + + assert ( + control_task_registry._factory_paths["external_duplicate_task"] + == "external_pkg.tasks:make_task" + ) + + +def test_control_coordinator_resolves_external_base_adapter_without_manifest() -> None: + adapter_name = "external_base_no_manifest" + register_hardware_adapter(HardwareType.BASE, adapter_name, ExternalBaseAdapter) + coordinator = ControlCoordinator( + publish_joint_state=False, + hardware=[ + HardwareComponent( + hardware_id="external_base", + hardware_type=HardwareType.BASE, + joints=make_twist_base_joints("external_base"), + adapter_type=adapter_name, + ) + ], + ) + + try: + coordinator.start() + connected = coordinator._hardware["external_base"] + assert isinstance(connected.adapter, ExternalBaseAdapter) + assert "external_base/vx" in coordinator._joint_to_hardware + finally: + coordinator.stop() + + +def test_control_coordinator_resolves_external_task_from_lazy_registry_without_manifest( + monkeypatch: pytest.MonkeyPatch, +) -> None: + module_name = "external_control_extension_task_module" + task_type = "external_task_no_manifest" + module = ModuleType(module_name) + module.__dict__["make_task"] = ExternalControlTask + monkeypatch.setitem(sys.modules, module_name, module) + register_control_task(task_type, f"{module_name}:make_task") + coordinator = ControlCoordinator( + publish_joint_state=False, + tasks=[TaskConfig(name="external_task", type=task_type)], + ) + + try: + coordinator.start() + assert isinstance(coordinator.get_task("external_task"), ExternalControlTask) + finally: + coordinator.stop() diff --git a/dimos/control/test_external_control_extension_example.py b/dimos/control/test_external_control_extension_example.py new file mode 100644 index 0000000000..e866f0cfe7 --- /dev/null +++ b/dimos/control/test_external_control_extension_example.py @@ -0,0 +1,77 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the logged external ControlCoordinator extension example.""" + +from __future__ import annotations + +from collections.abc import Generator +import importlib +import logging +from pathlib import Path +import sys + +import pytest + +from dimos.control.tasks.registry import control_task_registry +from dimos.hardware.drive_trains.registry import twist_base_adapter_registry + +EXAMPLE_ROOT = Path(__file__).parents[2] / "examples" / "external_control_extension" +EXAMPLE_PACKAGE = "dimos_external_control_extension" + + +@pytest.fixture(autouse=True) +def restore_external_example_state() -> Generator[None, None, None]: + base_adapters = twist_base_adapter_registry._adapters.copy() + task_paths = control_task_registry._factory_paths.copy() + task_factories = control_task_registry._factories.copy() + loaded_modules = { + name: module + for name, module in sys.modules.items() + if name == EXAMPLE_PACKAGE or name.startswith(f"{EXAMPLE_PACKAGE}.") + } + try: + yield + finally: + twist_base_adapter_registry._adapters = base_adapters + control_task_registry._factory_paths = task_paths + control_task_registry._factories = task_factories + for name in list(sys.modules): + if name == EXAMPLE_PACKAGE or name.startswith(f"{EXAMPLE_PACKAGE}."): + del sys.modules[name] + sys.modules.update(loaded_modules) + + +def test_external_control_extension_demo_logs_registration_and_runtime( + caplog: pytest.LogCaptureFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.syspath_prepend(str(EXAMPLE_ROOT)) + caplog.set_level(logging.INFO, logger=EXAMPLE_PACKAGE) + blueprints = importlib.import_module(f"{EXAMPLE_PACKAGE}.blueprints") + + blueprints.run_demo(target_writes=2, timeout=2.0) + + messages = "\n".join(caplog.messages) + assert "[external_test_robot] registered hardware adapter: BASE/external_test_base" in messages + assert "[external_test_robot] registered control task: external_test_drive" in messages + assert "[external_test_robot] ExternalTestBaseAdapter.__init__" in messages + assert "[external_test_robot] ExternalTestBaseAdapter.connect()" in messages + assert "[external_test_robot] ExternalTestDriveTask created for task=drive_demo" in messages + assert "[external_test_robot] ExternalTestDriveTask.compute(tick=1" in messages + assert ( + "[external_test_robot] ExternalTestBaseAdapter.write_velocities([0.1, 0.0, 0.0])" + in messages + ) + assert "[external_test_robot] demo observed" in messages diff --git a/dimos/hardware/drive_trains/registry.py b/dimos/hardware/drive_trains/registry.py index a4c49b183c..190f9d8581 100644 --- a/dimos/hardware/drive_trains/registry.py +++ b/dimos/hardware/drive_trains/registry.py @@ -35,6 +35,7 @@ import os from typing import TYPE_CHECKING, Any +from dimos.hardware.registry_utils import normalize_adapter_name from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: @@ -50,9 +51,15 @@ def __init__(self) -> None: self._adapters: dict[str, type[TwistBaseAdapter] | Callable[..., TwistBaseAdapter]] = {} def register( - self, name: str, cls: type[TwistBaseAdapter] | Callable[..., TwistBaseAdapter] + self, name: str, factory: type[TwistBaseAdapter] | Callable[..., TwistBaseAdapter] ) -> None: - self._adapters[name.lower()] = cls + key = normalize_adapter_name(name) + existing = self._adapters.get(key) + if existing is factory: + return + if existing is not None: + raise ValueError(f"Duplicate twist base adapter {key!r}") + self._adapters[key] = factory def create(self, name: str, **kwargs: Any) -> TwistBaseAdapter: """Create an adapter instance by name. @@ -67,7 +74,7 @@ def create(self, name: str, **kwargs: Any) -> TwistBaseAdapter: Raises: KeyError: If adapter name is not found """ - key = name.lower() + key = normalize_adapter_name(name) if key not in self._adapters: raise KeyError(f"Unknown twist base adapter: {name}. Available: {self.available()}") diff --git a/dimos/hardware/manipulators/registry.py b/dimos/hardware/manipulators/registry.py index a8c87c149c..770ba90543 100644 --- a/dimos/hardware/manipulators/registry.py +++ b/dimos/hardware/manipulators/registry.py @@ -31,9 +31,11 @@ from __future__ import annotations +from collections.abc import Callable import importlib from typing import TYPE_CHECKING, Any +from dimos.hardware.registry_utils import normalize_adapter_name from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: @@ -46,11 +48,23 @@ class AdapterRegistry: """Registry for manipulator adapters with auto-discovery.""" def __init__(self) -> None: - self._adapters: dict[str, type[ManipulatorAdapter]] = {} + self._adapters: dict[str, Callable[..., ManipulatorAdapter]] = {} - def register(self, name: str, cls: type[ManipulatorAdapter]) -> None: - """Register an adapter class.""" - self._adapters[name.lower()] = cls + def register(self, name: str, factory: Callable[..., ManipulatorAdapter]) -> None: + """Register an adapter factory. + + Re-registering the same name with the same factory object is + idempotent. Re-registering the same name with a different factory is a + configuration error because it would otherwise silently change which + adapter a blueprint resolves. + """ + key = normalize_adapter_name(name) + existing = self._adapters.get(key) + if existing is factory: + return + if existing is not None: + raise ValueError(f"Duplicate manipulator adapter {key!r}") + self._adapters[key] = factory def create(self, name: str, **kwargs: Any) -> ManipulatorAdapter: """Create an adapter instance by name. @@ -65,7 +79,7 @@ def create(self, name: str, **kwargs: Any) -> ManipulatorAdapter: Raises: KeyError: If adapter name is not found """ - key = name.lower() + key = normalize_adapter_name(name) if key not in self._adapters: raise KeyError(f"Unknown adapter: {name}. Available: {self.available()}") diff --git a/dimos/hardware/registry_utils.py b/dimos/hardware/registry_utils.py new file mode 100644 index 0000000000..e50c167900 --- /dev/null +++ b/dimos/hardware/registry_utils.py @@ -0,0 +1,28 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared helpers for hardware registries.""" + +from __future__ import annotations + + +def normalize_adapter_name(name: str) -> str: + """Normalize adapter names consistently for registration and lookup.""" + key = name.strip().lower() + if not key: + raise ValueError("Adapter name must be non-empty") + return key + + +__all__ = ["normalize_adapter_name"] diff --git a/dimos/hardware/whole_body/registry.py b/dimos/hardware/whole_body/registry.py index 5e6291b946..847d8b84fa 100644 --- a/dimos/hardware/whole_body/registry.py +++ b/dimos/hardware/whole_body/registry.py @@ -38,6 +38,7 @@ import os from typing import TYPE_CHECKING, Any +from dimos.hardware.registry_utils import normalize_adapter_name from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: @@ -57,11 +58,17 @@ def __init__(self) -> None: def register(self, name: str, cls: Callable[..., WholeBodyAdapter]) -> None: """Register an adapter factory (class or callable).""" - self._adapters[name.lower()] = cls + key = normalize_adapter_name(name) + existing = self._adapters.get(key) + if existing is cls: + return + if existing is not None: + raise ValueError(f"Duplicate whole-body adapter {key!r}") + self._adapters[key] = cls def create(self, name: str, **kwargs: Any) -> WholeBodyAdapter: """Create an adapter instance by name.""" - key = name.lower() + key = normalize_adapter_name(name) if key not in self._adapters: raise KeyError(f"Unknown whole-body adapter: {name}. Available: {self.available()}") return self._adapters[key](**kwargs) diff --git a/docs/capabilities/external_robot_packages.md b/docs/capabilities/external_robot_packages.md new file mode 100644 index 0000000000..e0fa486424 --- /dev/null +++ b/docs/capabilities/external_robot_packages.md @@ -0,0 +1,202 @@ +# External Robot Packages + +This guide shows how an installed package can provide the ControlCoordinator pieces its blueprints reference without putting adapter or task files inside the DimOS source tree. + +Use this path when you are building a robot package outside the DimOS repository, such as `dimos_mydog`, `dimos_myarm`, or `dimos_myhumanoid`. + +## What this supports + +This guide covers explicit registration for: + +- hardware adapters used by `HardwareComponent(adapter_type=...)`; +- control tasks used by `TaskConfig(type=...)`. + +It does not add package auto-discovery, Python entry point discovery, blueprint discovery changes, template generation, or `available_*` diagnostics helpers. Your package still needs to be imported by user code or by a separate blueprint-discovery mechanism before its registered names can be used. + +## Package layout + +A small external robot package usually has this shape: + +```text +dimos_mydog/ +├── adapters.py +├── blueprints.py # calls register_extensions() +├── tasks/ +│ └── gait.py # lazy control task factory +├── skills.py # optional +└── prompts.py # optional +``` + +The important part is that the module defining the blueprint also performs extension registration before the blueprint is built. + +For a runnable no-hardware version of this pattern, see `examples/external_control_extension/`: + +```bash +uv run python examples/external_control_extension/demo_external_control.py +``` + +The demo logs registration, adapter construction, task creation, task ticks, and adapter writes so you can see that the externally registered pieces are actually running. + +## Supported embodiments + +Register hardware adapters for the same hardware categories that `ControlCoordinator` already understands: + +| Embodiment | Hardware type | Example adapter name | +|------------|---------------|----------------------| +| External arm | `HardwareType.MANIPULATOR` | `"myarm"` | +| Robot dog or mobile base | `HardwareType.BASE` | `"mydog"` | +| Humanoid or whole-body robot | `HardwareType.WHOLE_BODY` | `"myhumanoid"` | + +Each registration routes to the matching built-in registry, but external packages should use only the public facade in `dimos.control.extensions`. + +## Full example: robot dog / mobile base + +In `dimos_mydog/adapters.py`, define a base adapter that implements the twist-base adapter protocol used by `HardwareType.BASE`: + +```python skip +class MyDogAdapter: + def __init__( + self, + dof: int, + address: str | None = None, + hardware_id: str = "base", + **_: object, + ) -> None: + self._dof = dof + self._address = address + self._hardware_id = hardware_id + self._connected = False + self._enabled = False + + def connect(self) -> bool: + self._connected = True + return True + + def disconnect(self) -> None: + self._connected = False + + def is_connected(self) -> bool: + return self._connected + + def get_dof(self) -> int: + return self._dof + + def read_velocities(self) -> list[float]: + return [0.0] * self._dof + + def read_odometry(self) -> list[float] | None: + return None + + def write_velocities(self, velocities: list[float]) -> bool: + return True + + def write_stop(self) -> bool: + return True + + def write_enable(self, enable: bool) -> bool: + self._enabled = enable + return True + + def read_enabled(self) -> bool: + return self._enabled +``` + +In `dimos_mydog/tasks/gait.py`, define the task factory. The task module is referenced by import path, so it is imported only when a coordinator actually creates this task: + +```python skip +from dimos.control.task import BaseControlTask, CoordinatorState, JointCommandOutput, ResourceClaim + + +class MyDogGaitTask(BaseControlTask): + def __init__(self, name: str) -> None: + self._name = name + + @property + def name(self) -> str: + return self._name + + def claim(self) -> ResourceClaim: + return ResourceClaim(joints=frozenset()) + + def is_active(self) -> bool: + return False + + def compute(self, state: CoordinatorState) -> JointCommandOutput | None: + return None + + def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: + return None + + +def create_task(cfg, hardware): + return MyDogGaitTask(cfg.name) +``` + +In `dimos_mydog/blueprints.py`, group registration in `register_extensions()` and call it at module import time: + +```python skip +from dimos.control.components import HardwareComponent, HardwareType, make_twist_base_joints +from dimos.control.coordinator import ControlCoordinator, TaskConfig +from dimos.control.extensions import register_control_task, register_hardware_adapter +from dimos.core.coordination.blueprints import autoconnect + +from dimos_mydog.adapters import MyDogAdapter + + +def register_extensions() -> None: + register_hardware_adapter(HardwareType.BASE, "mydog", MyDogAdapter) + register_control_task("mydog_gait", "dimos_mydog.tasks.gait:create_task") + + +register_extensions() + +mydog = autoconnect( + ControlCoordinator.blueprint( + hardware=[ + HardwareComponent( + hardware_id="base", + hardware_type=HardwareType.BASE, + joints=make_twist_base_joints("base"), + adapter_type="mydog", + ), + ], + tasks=[ + TaskConfig( + name="gait", + type="mydog_gait", + joint_names=[], + auto_start=True, + ), + ], + ), +) +``` + +The blueprint still uses the normal DimOS configuration objects: + +```python skip +HardwareComponent(hardware_type=HardwareType.BASE, adapter_type="mydog") +TaskConfig(type="mydog_gait") +``` + +The only new requirement is that `register_extensions()` runs before the coordinator resolves those names. + +## Other embodiments + +The same pattern works for arms and whole-body robots: + +```python skip +from dimos.control.components import HardwareType +from dimos.control.extensions import register_hardware_adapter + +register_hardware_adapter(HardwareType.MANIPULATOR, "myarm", MyArmAdapter) +register_hardware_adapter(HardwareType.BASE, "mydog", MyDogAdapter) +register_hardware_adapter(HardwareType.WHOLE_BODY, "myhumanoid", MyHumanoidAdapter) +``` + +Choose the hardware type that matches the protocol your adapter implements: + +- `MANIPULATOR`: arm-style joint, Cartesian, and gripper IO. +- `BASE`: velocity-commanded platforms that consume twist-like virtual joints. +- `WHOLE_BODY`: joint-level whole-body motor IO. + diff --git a/docs/capabilities/manipulation/adding_a_custom_arm.md b/docs/capabilities/manipulation/adding_a_custom_arm.md index 3d33f8877a..a663367df2 100644 --- a/docs/capabilities/manipulation/adding_a_custom_arm.md +++ b/docs/capabilities/manipulation/adding_a_custom_arm.md @@ -2,6 +2,8 @@ This guide walks through integrating a new robot arm with DimOS, from writing the hardware adapter to creating blueprints for planning and control. +If you are packaging an arm outside the DimOS repository, start with [External Robot Packages](/docs/capabilities/external_robot_packages.md). That guide shows how to register adapters and control tasks from an installed package without placing files under `dimos/hardware/` or `dimos/robot/`. + ## Architecture Overview DimOS uses a **Protocol-based adapter pattern** — no base class inheritance required. Your adapter wraps the vendor SDK and exposes a standard interface that the rest of the system consumes: diff --git a/examples/external_control_extension/README.md b/examples/external_control_extension/README.md new file mode 100644 index 0000000000..f080946cf9 --- /dev/null +++ b/examples/external_control_extension/README.md @@ -0,0 +1,23 @@ +# External Control Extension Example + +This example is shaped like a tiny package outside `dimos/`. It proves that an +external package can register a hardware adapter and a control task, then run +them through the normal `ControlCoordinator` path. + +Run the demo and watch the logs: + +```bash +uv run python examples/external_control_extension/demo_external_control.py +``` + +Expected log events include: + +- registering `BASE/external_test_base` +- registering task type `external_test_drive` +- constructing and connecting `ExternalTestBaseAdapter` +- creating `ExternalTestDriveTask` +- ticking `ExternalTestDriveTask.compute(...)` +- sending commands through `ExternalTestBaseAdapter.write_velocities(...)` + +This example performs no robot I/O. The adapter records commands in memory so +the demo is safe to run locally and in CI. diff --git a/examples/external_control_extension/demo_external_control.py b/examples/external_control_extension/demo_external_control.py new file mode 100644 index 0000000000..1eca27dca4 --- /dev/null +++ b/examples/external_control_extension/demo_external_control.py @@ -0,0 +1,30 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Run the external ControlCoordinator extension example.""" + +from __future__ import annotations + +import logging + +from dimos_external_control_extension.blueprints import run_demo + + +def main() -> None: + logging.basicConfig(level=logging.INFO, format="%(message)s") + run_demo(target_writes=3, timeout=2.0) + + +if __name__ == "__main__": + main() diff --git a/examples/external_control_extension/dimos_external_control_extension/__init__.py b/examples/external_control_extension/dimos_external_control_extension/__init__.py new file mode 100644 index 0000000000..a8f52f3050 --- /dev/null +++ b/examples/external_control_extension/dimos_external_control_extension/__init__.py @@ -0,0 +1,23 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Sample external package for ControlCoordinator extension sideloading.""" + +from dimos_external_control_extension.blueprints import ( + build_demo_coordinator, + register_extensions, + run_demo, +) + +__all__ = ["build_demo_coordinator", "register_extensions", "run_demo"] diff --git a/examples/external_control_extension/dimos_external_control_extension/adapters.py b/examples/external_control_extension/dimos_external_control_extension/adapters.py new file mode 100644 index 0000000000..8fb24f2200 --- /dev/null +++ b/examples/external_control_extension/dimos_external_control_extension/adapters.py @@ -0,0 +1,102 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Hardware adapter implemented by an external package.""" + +from __future__ import annotations + +import logging +from pathlib import Path +import threading + +LOGGER = logging.getLogger("dimos_external_control_extension") + + +class ExternalTestBaseAdapter: + """In-memory twist-base adapter used by the external package demo.""" + + def __init__( + self, + dof: int = 3, + address: str | Path | None = None, + hardware_id: str = "external_test_base", + **adapter_kwargs: object, + ) -> None: + self._dof = dof + self._hardware_id = hardware_id + self._connected = False + self._enabled = False + self._last_velocities = [0.0] * dof + self._write_count = 0 + self.command_event = threading.Event() + LOGGER.info( + "[external_test_robot] ExternalTestBaseAdapter.__init__(dof=%s, hardware_id=%s, address=%s, kwargs=%s)", + dof, + hardware_id, + address, + adapter_kwargs, + ) + + @property + def write_count(self) -> int: + return self._write_count + + @property + def last_velocities(self) -> list[float]: + return list(self._last_velocities) + + def connect(self) -> bool: + self._connected = True + LOGGER.info("[external_test_robot] ExternalTestBaseAdapter.connect()") + return True + + def disconnect(self) -> None: + self._connected = False + LOGGER.info("[external_test_robot] ExternalTestBaseAdapter.disconnect()") + + def is_connected(self) -> bool: + return self._connected + + def get_dof(self) -> int: + return self._dof + + def read_velocities(self) -> list[float]: + return list(self._last_velocities) + + def read_odometry(self) -> list[float] | None: + return [0.0] * self._dof + + def write_velocities(self, velocities: list[float]) -> bool: + self._last_velocities = list(velocities) + self._write_count += 1 + self.command_event.set() + LOGGER.info( + "[external_test_robot] ExternalTestBaseAdapter.write_velocities(%s)", + velocities, + ) + return True + + def write_stop(self) -> bool: + return self.write_velocities([0.0] * self._dof) + + def write_enable(self, enable: bool) -> bool: + self._enabled = enable + LOGGER.info("[external_test_robot] ExternalTestBaseAdapter.write_enable(%s)", enable) + return True + + def read_enabled(self) -> bool: + return self._enabled + + +__all__ = ["ExternalTestBaseAdapter"] diff --git a/examples/external_control_extension/dimos_external_control_extension/blueprints.py b/examples/external_control_extension/dimos_external_control_extension/blueprints.py new file mode 100644 index 0000000000..14b5f06562 --- /dev/null +++ b/examples/external_control_extension/dimos_external_control_extension/blueprints.py @@ -0,0 +1,112 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Blueprint-style entry points for the external control extension example.""" + +from __future__ import annotations + +import logging +import time + +from dimos.control.components import HardwareComponent, HardwareType, make_twist_base_joints +from dimos.control.coordinator import ControlCoordinator, TaskConfig +from dimos.control.extensions import register_control_task, register_hardware_adapter +from dimos_external_control_extension.adapters import ExternalTestBaseAdapter + +LOGGER = logging.getLogger("dimos_external_control_extension") + +ADAPTER_TYPE = "external_test_base" +TASK_TYPE = "external_test_drive" +TASK_FACTORY_PATH = "dimos_external_control_extension.tasks:create_task" +HARDWARE_ID = "external_test_base" +TASK_NAME = "drive_demo" + + +def register_extensions() -> None: + """Register this package's external ControlCoordinator extensions.""" + register_hardware_adapter(HardwareType.BASE, ADAPTER_TYPE, ExternalTestBaseAdapter) + LOGGER.info( + "[external_test_robot] registered hardware adapter: BASE/%s", + ADAPTER_TYPE, + ) + register_control_task(TASK_TYPE, TASK_FACTORY_PATH) + LOGGER.info("[external_test_robot] registered control task: %s", TASK_TYPE) + + +def build_demo_coordinator() -> ControlCoordinator: + """Build a coordinator using only externally registered names.""" + register_extensions() + return ControlCoordinator( + tick_rate=20.0, + publish_joint_state=False, + log_ticks=True, + hardware=[ + HardwareComponent( + hardware_id=HARDWARE_ID, + hardware_type=HardwareType.BASE, + joints=make_twist_base_joints(HARDWARE_ID), + adapter_type=ADAPTER_TYPE, + ) + ], + tasks=[ + TaskConfig( + name=TASK_NAME, + type=TASK_TYPE, + joint_names=make_twist_base_joints(HARDWARE_ID), + priority=10, + params={"velocity": 0.1}, + ) + ], + ) + + +def run_demo(target_writes: int = 3, timeout: float = 2.0) -> ControlCoordinator: + """Start the demo coordinator until the external adapter receives commands.""" + coordinator = build_demo_coordinator() + coordinator.start() + try: + adapter = coordinator._hardware[HARDWARE_ID].adapter + if not isinstance(adapter, ExternalTestBaseAdapter): + raise TypeError(f"Expected ExternalTestBaseAdapter, got {type(adapter).__name__}") + + deadline = time.monotonic() + timeout + while adapter.write_count < target_writes: + remaining = deadline - time.monotonic() + if remaining <= 0: + raise TimeoutError( + f"External adapter received {adapter.write_count} writes; " + f"expected {target_writes}" + ) + adapter.command_event.wait(timeout=remaining) + adapter.command_event.clear() + LOGGER.info( + "[external_test_robot] demo observed %s external adapter writes", + adapter.write_count, + ) + return coordinator + finally: + coordinator.stop() + + +register_extensions() + +__all__ = [ + "ADAPTER_TYPE", + "HARDWARE_ID", + "TASK_NAME", + "TASK_TYPE", + "build_demo_coordinator", + "register_extensions", + "run_demo", +] diff --git a/examples/external_control_extension/dimos_external_control_extension/tasks.py b/examples/external_control_extension/dimos_external_control_extension/tasks.py new file mode 100644 index 0000000000..e5c09f3e52 --- /dev/null +++ b/examples/external_control_extension/dimos_external_control_extension/tasks.py @@ -0,0 +1,98 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Control task implemented by an external package.""" + +from __future__ import annotations + +from collections.abc import Mapping +import logging + +from dimos.control.coordinator import TaskConfig +from dimos.control.hardware_interface import ConnectedHardware, ConnectedWholeBody +from dimos.control.task import BaseControlTask, CoordinatorState, JointCommandOutput, ResourceClaim +from dimos.hardware.manipulators.spec import ControlMode + +LOGGER = logging.getLogger("dimos_external_control_extension") + + +class ExternalTestDriveTask(BaseControlTask): + """Always-active demo task that commands a tiny base velocity.""" + + def __init__( + self, + cfg: TaskConfig, + hardware: Mapping[str, ConnectedHardware | ConnectedWholeBody], + ) -> None: + self._name = cfg.name + self._joint_names = list(cfg.joint_names) + self._priority = cfg.priority + velocity = cfg.params.get("velocity", 0.1) + self._velocity = float(velocity) + self._tick_count = 0 + self._hardware_ids = sorted(hardware.keys()) + LOGGER.info( + "[external_test_robot] ExternalTestDriveTask created for task=%s hardware=%s", + cfg.name, + self._hardware_ids, + ) + + @property + def name(self) -> str: + return self._name + + @property + def tick_count(self) -> int: + return self._tick_count + + def claim(self) -> ResourceClaim: + return ResourceClaim( + joints=frozenset(self._joint_names), + priority=self._priority, + mode=ControlMode.VELOCITY, + ) + + def is_active(self) -> bool: + return True + + def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: + LOGGER.info( + "[external_test_robot] ExternalTestDriveTask.on_preempted(by_task=%s, joints=%s)", + by_task, + sorted(joints), + ) + + def compute(self, state: CoordinatorState) -> JointCommandOutput | None: + self._tick_count += 1 + velocities = [self._velocity, 0.0, 0.0][: len(self._joint_names)] + LOGGER.info( + "[external_test_robot] ExternalTestDriveTask.compute(tick=%s, dt=%.4f)", + self._tick_count, + state.dt, + ) + return JointCommandOutput( + joint_names=self._joint_names, + velocities=velocities, + mode=ControlMode.VELOCITY, + ) + + +def create_task( + cfg: TaskConfig, + hardware: Mapping[str, ConnectedHardware | ConnectedWholeBody], +) -> ExternalTestDriveTask: + return ExternalTestDriveTask(cfg=cfg, hardware=hardware) + + +__all__ = ["ExternalTestDriveTask", "create_task"]