From a1e5c7b87598ef891e9682d3b35e4f46958746d7 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Sat, 20 Jun 2026 22:33:25 +0000 Subject: [PATCH 1/4] reintroduced hardwre component factory --- dimos/control/blueprints/_hardware.py | 227 ++++++++++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 dimos/control/blueprints/_hardware.py diff --git a/dimos/control/blueprints/_hardware.py b/dimos/control/blueprints/_hardware.py new file mode 100644 index 0000000000..5bb079c654 --- /dev/null +++ b/dimos/control/blueprints/_hardware.py @@ -0,0 +1,227 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Hardware component helpers for coordinator blueprints.""" + +from __future__ import annotations + +from dimos.control.components import ( + HardwareComponent, + HardwareType, + make_joints, + make_twist_base_joints, +) +from dimos.core.global_config import global_config +from dimos.utils.data import LfsPath + +PIPER_FK_MODEL = LfsPath("piper_description/mujoco_model/piper_no_gripper_description.xml") +XARM6_FK_MODEL = LfsPath("xarm_description/urdf/xarm6/xarm6.urdf") +XARM7_FK_MODEL = LfsPath("xarm_description/urdf/xarm7/xarm7.urdf") +A750_FK_MODEL = LfsPath("a750_description/urdf/a750_rev1_no_gripper.urdf") + +XARM7_SIM_PATH = LfsPath("xarm7/scene.xml") +XARM6_SIM_PATH = LfsPath("xarm6/scene.xml") +PIPER_SIM_PATH = LfsPath("piper/scene.xml") + + +def _adapter_kwargs(home_joints: list[float] | None = None) -> dict[str, object]: + if home_joints is None: + return {} + return {"initial_positions": home_joints} + + +def manipulator( + hw_id: str, + dof: int, + *, + adapter_type: str = "mock", + address: str | None = None, + gripper: bool = False, + gripper_suffix: str = "gripper", + auto_enable: bool = True, + adapter_kwargs: dict[str, object] | None = None, + home_joints: list[float] | None = None, +) -> HardwareComponent: + """Create a manipulator hardware component with DimOS joint names.""" + kwargs = _adapter_kwargs(home_joints) + if adapter_kwargs: + kwargs.update(adapter_kwargs) + return HardwareComponent( + hardware_id=hw_id, + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints(hw_id, dof), + adapter_type=adapter_type, + address=address, + auto_enable=auto_enable, + gripper_joints=[f"{hw_id}/{gripper_suffix}"] if gripper else [], + adapter_kwargs=kwargs, + ) + + +def mock_arm( + hw_id: str = "arm", + dof: int = 7, + *, + gripper: bool = False, + gripper_suffix: str = "gripper", + home_joints: list[float] | None = None, +) -> HardwareComponent: + """Mock manipulator with no real hardware.""" + return manipulator( + hw_id, + dof, + gripper=gripper, + gripper_suffix=gripper_suffix, + home_joints=home_joints, + ) + + +def xarm7( + hw_id: str = "arm", + *, + gripper: bool = False, + mock_without_address: bool = False, + home_joints: list[float] | None = None, +) -> HardwareComponent: + """xArm7 hardware, MuJoCo when --simulation is set, or mock if requested.""" + if global_config.simulation: + return manipulator( + hw_id, + 7, + adapter_type="sim_mujoco", + address=str(XARM7_SIM_PATH), + gripper=gripper, + home_joints=home_joints, + ) + address = global_config.xarm7_ip + if mock_without_address and not address: + return mock_arm(hw_id, 7, gripper=gripper, home_joints=home_joints) + return manipulator( + hw_id, + 7, + adapter_type="xarm", + address=address, + gripper=gripper, + home_joints=home_joints, + ) + + +def xarm6( + hw_id: str = "arm", + *, + gripper: bool = False, + mock_without_address: bool = False, + home_joints: list[float] | None = None, +) -> HardwareComponent: + """xArm6 hardware, MuJoCo when --simulation is set, or mock if requested.""" + if global_config.simulation: + return manipulator( + hw_id, + 6, + adapter_type="sim_mujoco", + address=str(XARM6_SIM_PATH), + gripper=gripper, + home_joints=home_joints, + ) + address = global_config.xarm6_ip + if mock_without_address and not address: + return mock_arm(hw_id, 6, gripper=gripper, home_joints=home_joints) + return manipulator( + hw_id, + 6, + adapter_type="xarm", + address=address, + gripper=gripper, + home_joints=home_joints, + ) + + +def piper( + hw_id: str = "arm", + *, + gripper: bool = True, + mock_without_address: bool = False, + home_joints: list[float] | None = None, +) -> HardwareComponent: + """Piper hardware, MuJoCo when --simulation is set, or mock if requested.""" + if global_config.simulation: + return manipulator( + hw_id, + 6, + adapter_type="sim_mujoco", + address=str(PIPER_SIM_PATH), + gripper=gripper, + home_joints=home_joints, + ) + address = global_config.can_port or "can0" + if mock_without_address and not global_config.can_port: + return mock_arm(hw_id, 6, gripper=gripper, home_joints=home_joints) + return manipulator( + hw_id, + 6, + adapter_type="piper", + address=address, + gripper=gripper, + home_joints=home_joints, + ) + + +def a750(hw_id: str = "arm", *, mock_without_address: bool = False) -> HardwareComponent: + """A-750 hardware or mock when no device path is configured.""" + home_joints = [0.0, 0.0, -1.5707963267948966, 0.0, 0.0, 0.0] + if mock_without_address and not global_config.device_path: + return mock_arm( + hw_id, + 6, + gripper=True, + gripper_suffix="finger", + home_joints=home_joints, + ) + return manipulator( + hw_id, + 6, + adapter_type="a750", + address=global_config.device_path or "/dev/ttyACM0", + gripper=True, + gripper_suffix="finger", + home_joints=home_joints, + ) + + +def mock_twist_base(hw_id: str = "base") -> HardwareComponent: + """Mock holonomic twist base (3-DOF: vx, vy, wz).""" + return HardwareComponent( + hardware_id=hw_id, + hardware_type=HardwareType.BASE, + joints=make_twist_base_joints(hw_id), + adapter_type="mock_twist_base", + ) + + +__all__ = [ + "A750_FK_MODEL", + "PIPER_FK_MODEL", + "PIPER_SIM_PATH", + "XARM6_FK_MODEL", + "XARM6_SIM_PATH", + "XARM7_FK_MODEL", + "XARM7_SIM_PATH", + "a750", + "manipulator", + "mock_arm", + "mock_twist_base", + "piper", + "xarm6", + "xarm7", +] From 2e4c1a821f70c785f3375f49118e8e4ca1079d13 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Sat, 20 Jun 2026 22:39:03 +0000 Subject: [PATCH 2/4] deprecated robot catalogue directory --- dimos/control/blueprints/basic.py | 86 +++-- dimos/control/blueprints/dual.py | 66 ++-- dimos/control/blueprints/mobile.py | 17 +- dimos/control/blueprints/teleop.py | 237 ++++++++------ dimos/manipulation/blueprints.py | 218 +++++++++---- dimos/robot/catalog/a750.py | 101 ------ dimos/robot/catalog/openarm.py | 120 ------- dimos/robot/catalog/piper.py | 89 ------ dimos/robot/catalog/ufactory.py | 164 ---------- dimos/robot/config.py | 299 ------------------ dimos/robot/diy/alfred/config.py | 17 +- dimos/robot/manipulators/a750/blueprints.py | 91 ++++-- .../robot/manipulators/openarm/blueprints.py | 246 ++++++++++---- dimos/robot/manipulators/piper/blueprints.py | 71 ++++- dimos/robot/manipulators/xarm/blueprints.py | 97 ++++-- dimos/robot/unitree/g1/config.py | 17 +- docs/capabilities/manipulation/a750.md | 7 +- .../manipulation/openarm_integration.md | 7 +- 18 files changed, 827 insertions(+), 1123 deletions(-) delete mode 100644 dimos/robot/catalog/a750.py delete mode 100644 dimos/robot/catalog/openarm.py delete mode 100644 dimos/robot/catalog/piper.py delete mode 100644 dimos/robot/catalog/ufactory.py delete mode 100644 dimos/robot/config.py diff --git a/dimos/control/blueprints/basic.py b/dimos/control/blueprints/basic.py index c570dd6eef..165f707a85 100644 --- a/dimos/control/blueprints/basic.py +++ b/dimos/control/blueprints/basic.py @@ -28,16 +28,18 @@ from __future__ import annotations -from dimos.control.coordinator import ControlCoordinator -from dimos.core.coordination.blueprints import Blueprint, autoconnect -from dimos.core.global_config import global_config -from dimos.robot.catalog.piper import PIPER_SIM_PATH, piper as _catalog_piper -from dimos.robot.catalog.ufactory import ( +from dimos.control.blueprints._hardware import ( + PIPER_SIM_PATH, XARM6_SIM_PATH, XARM7_SIM_PATH, - xarm6 as _catalog_xarm6, - xarm7 as _catalog_xarm7, + mock_arm, + piper, + xarm6, + xarm7, ) +from dimos.control.coordinator import ControlCoordinator, TaskConfig +from dimos.core.coordination.blueprints import Blueprint, autoconnect +from dimos.core.global_config import global_config from dimos.simulation.engines.mujoco_sim_module import MujocoSimModule _is_sim = global_config.simulation @@ -57,56 +59,72 @@ def _mujoco_if_sim(sim_path: str, dof: int) -> tuple[Blueprint, ...]: ) # Mock 7-DOF arm (for testing) -_mock_cfg = _catalog_xarm7(name="arm") +_mock_hw = mock_arm("arm", 7) coordinator_mock = ControlCoordinator.blueprint( - hardware=[_mock_cfg.to_hardware_component()], - tasks=[_mock_cfg.to_task_config()], + hardware=[_mock_hw], + tasks=[ + TaskConfig( + name="traj_arm", + type="trajectory", + joint_names=_mock_hw.joints, + priority=10, + ) + ], ) # XArm7 (real, or MuJoCo with --simulation) -_xarm7_cfg = _catalog_xarm7( - name="arm", - adapter_type="sim_mujoco" if _is_sim else "xarm", - address=str(XARM7_SIM_PATH) if _is_sim else global_config.xarm7_ip, -) +_xarm7_hw = xarm7("arm") coordinator_xarm7 = autoconnect( ControlCoordinator.blueprint( - hardware=[_xarm7_cfg.to_hardware_component()], - tasks=[_xarm7_cfg.to_task_config()], + hardware=[_xarm7_hw], + tasks=[ + TaskConfig( + name="traj_arm", + type="trajectory", + joint_names=_xarm7_hw.joints, + priority=10, + ) + ], ), - *_mujoco_if_sim(str(XARM7_SIM_PATH), _xarm7_cfg.dof), + *_mujoco_if_sim(str(XARM7_SIM_PATH), len(_xarm7_hw.joints)), ) # XArm6 (real, or MuJoCo with --simulation) -_xarm6_cfg = _catalog_xarm6( - name="arm", - adapter_type="sim_mujoco" if _is_sim else "xarm", - address=str(XARM6_SIM_PATH) if _is_sim else global_config.xarm6_ip, -) +_xarm6_hw = xarm6("arm", gripper=True) coordinator_xarm6 = autoconnect( ControlCoordinator.blueprint( - hardware=[_xarm6_cfg.to_hardware_component()], - tasks=[_xarm6_cfg.to_task_config(task_name="traj_xarm")], + hardware=[_xarm6_hw], + tasks=[ + TaskConfig( + name="traj_xarm", + type="trajectory", + joint_names=_xarm6_hw.joints, + priority=10, + ) + ], ), - *_mujoco_if_sim(str(XARM6_SIM_PATH), _xarm6_cfg.dof), + *_mujoco_if_sim(str(XARM6_SIM_PATH), len(_xarm6_hw.joints)), ) # Piper 6-DOF (CAN bus, or MuJoCo with --simulation) -_piper_cfg = _catalog_piper( - name="arm", - adapter_type="sim_mujoco" if _is_sim else "piper", - address=str(PIPER_SIM_PATH) if _is_sim else (global_config.can_port or "can0"), -) +_piper_hw = piper("arm") coordinator_piper = autoconnect( ControlCoordinator.blueprint( - hardware=[_piper_cfg.to_hardware_component()], - tasks=[_piper_cfg.to_task_config(task_name="traj_piper")], + hardware=[_piper_hw], + tasks=[ + TaskConfig( + name="traj_piper", + type="trajectory", + joint_names=_piper_hw.joints, + priority=10, + ) + ], ), - *_mujoco_if_sim(str(PIPER_SIM_PATH), _piper_cfg.dof), + *_mujoco_if_sim(str(PIPER_SIM_PATH), len(_piper_hw.joints)), ) diff --git a/dimos/control/blueprints/dual.py b/dimos/control/blueprints/dual.py index d721f88877..5b6cabae26 100644 --- a/dimos/control/blueprints/dual.py +++ b/dimos/control/blueprints/dual.py @@ -22,50 +22,74 @@ from __future__ import annotations -from dimos.control.coordinator import ControlCoordinator +from dimos.control.blueprints._hardware import manipulator, mock_arm +from dimos.control.coordinator import ControlCoordinator, TaskConfig from dimos.core.global_config import global_config -from dimos.robot.catalog.piper import piper as _catalog_piper -from dimos.robot.catalog.ufactory import xarm6 as _catalog_xarm6, xarm7 as _catalog_xarm7 # Dual mock arms (7-DOF left, 6-DOF right) -_mock_left = _catalog_xarm7(name="left_arm") -_mock_right = _catalog_xarm6(name="right_arm", add_gripper=False) +_mock_left = mock_arm("left_arm", 7) +_mock_right = mock_arm("right_arm", 6) coordinator_dual_mock = ControlCoordinator.blueprint( - hardware=[_mock_left.to_hardware_component(), _mock_right.to_hardware_component()], + hardware=[_mock_left, _mock_right], tasks=[ - _mock_left.to_task_config(task_name="traj_left"), - _mock_right.to_task_config(task_name="traj_right"), + TaskConfig(name="traj_left", type="trajectory", joint_names=_mock_left.joints, priority=10), + TaskConfig( + name="traj_right", type="trajectory", joint_names=_mock_right.joints, priority=10 + ), ], ) # Dual XArm (XArm7 left, XArm6 right) -_xarm7_left = _catalog_xarm7(name="left_arm", adapter_type="xarm", address=global_config.xarm7_ip) -_xarm6_right = _catalog_xarm6( - name="right_arm", adapter_type="xarm", address=global_config.xarm6_ip, add_gripper=False +_xarm7_left = manipulator( + "left_arm", + 7, + adapter_type="xarm", + address=global_config.xarm7_ip, +) +_xarm6_right = manipulator( + "right_arm", + 6, + adapter_type="xarm", + address=global_config.xarm6_ip, ) coordinator_dual_xarm = ControlCoordinator.blueprint( - hardware=[_xarm7_left.to_hardware_component(), _xarm6_right.to_hardware_component()], + hardware=[_xarm7_left, _xarm6_right], tasks=[ - _xarm7_left.to_task_config(task_name="traj_left"), - _xarm6_right.to_task_config(task_name="traj_right"), + TaskConfig( + name="traj_left", type="trajectory", joint_names=_xarm7_left.joints, priority=10 + ), + TaskConfig( + name="traj_right", type="trajectory", joint_names=_xarm6_right.joints, priority=10 + ), ], ) # Dual arm (XArm6 + Piper) -_xarm6_dual = _catalog_xarm6( - name="xarm_arm", adapter_type="xarm", address=global_config.xarm6_ip, add_gripper=False +_xarm6_dual = manipulator( + "xarm_arm", + 6, + adapter_type="xarm", + address=global_config.xarm6_ip, ) -_piper_dual = _catalog_piper( - name="piper_arm", adapter_type="piper", address=global_config.can_port or "can0" +_piper_dual = manipulator( + "piper_arm", + 6, + adapter_type="piper", + address=global_config.can_port or "can0", + gripper=True, ) coordinator_piper_xarm = ControlCoordinator.blueprint( - hardware=[_xarm6_dual.to_hardware_component(), _piper_dual.to_hardware_component()], + hardware=[_xarm6_dual, _piper_dual], tasks=[ - _xarm6_dual.to_task_config(task_name="traj_xarm"), - _piper_dual.to_task_config(task_name="traj_piper"), + TaskConfig( + name="traj_xarm", type="trajectory", joint_names=_xarm6_dual.joints, priority=10 + ), + TaskConfig( + name="traj_piper", type="trajectory", joint_names=_piper_dual.joints, priority=10 + ), ], ) diff --git a/dimos/control/blueprints/mobile.py b/dimos/control/blueprints/mobile.py index 8bd63264e5..d6f1b5a514 100644 --- a/dimos/control/blueprints/mobile.py +++ b/dimos/control/blueprints/mobile.py @@ -39,7 +39,6 @@ from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.navigation.movement_manager.movement_manager import MovementManager from dimos.navigation.nav_stack.main import create_nav_stack, nav_stack_rerun_config -from dimos.robot.catalog.ufactory import xarm7 as _catalog_xarm7 from dimos.robot.unitree.g1.config import G1_LOCAL_PLANNER_PRECOMPUTED_PATHS from dimos.robot.unitree.keyboard_teleop import KeyboardTeleop from dimos.visualization.rerun.bridge import RerunBridgeModule @@ -195,12 +194,22 @@ def _flowbase_twist_base( # Mock arm (7-DOF) + mock holonomic base (3-DOF) -_mock_arm_cfg = _catalog_xarm7(name="arm") +_mock_arm_hw = HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=[f"arm/joint{i}" for i in range(1, 8)], + adapter_type="mock", +) coordinator_mobile_manip_mock = ControlCoordinator.blueprint( - hardware=[_mock_arm_cfg.to_hardware_component(), _mock_twist_base()], + hardware=[_mock_arm_hw, _mock_twist_base()], tasks=[ - _mock_arm_cfg.to_task_config(task_name="traj_arm"), + TaskConfig( + name="traj_arm", + type="trajectory", + joint_names=_mock_arm_hw.joints, + priority=10, + ), TaskConfig( name="vel_base", type="velocity", diff --git a/dimos/control/blueprints/teleop.py b/dimos/control/blueprints/teleop.py index 9ad677f95d..83c5b77fcf 100644 --- a/dimos/control/blueprints/teleop.py +++ b/dimos/control/blueprints/teleop.py @@ -33,19 +33,23 @@ from __future__ import annotations -from dimos.control.components import make_gripper_joints -from dimos.control.coordinator import ControlCoordinator -from dimos.core.coordination.blueprints import Blueprint, autoconnect -from dimos.core.global_config import global_config -from dimos.robot.catalog.piper import PIPER_FK_MODEL, PIPER_SIM_PATH, piper as _catalog_piper -from dimos.robot.catalog.ufactory import ( +from dimos.control.blueprints._hardware import ( + PIPER_FK_MODEL, + PIPER_SIM_PATH, XARM6_FK_MODEL, XARM6_SIM_PATH, XARM7_FK_MODEL, XARM7_SIM_PATH, - xarm6 as _catalog_xarm6, - xarm7 as _catalog_xarm7, + manipulator, + mock_arm, + piper, + xarm6, + xarm7, ) +from dimos.control.components import make_gripper_joints +from dimos.control.coordinator import ControlCoordinator, TaskConfig +from dimos.core.coordination.blueprints import Blueprint, autoconnect +from dimos.core.global_config import global_config from dimos.simulation.engines.mujoco_sim_module import MujocoSimModule _is_sim = global_config.simulation @@ -57,77 +61,96 @@ def _mujoco_if_sim(sim_path: str, dof: int) -> tuple[Blueprint, ...]: return (MujocoSimModule.blueprint(address=sim_path, headless=False, dof=dof),) -_xarm6_cfg = _catalog_xarm6(name="arm", adapter_type="xarm", address=global_config.xarm6_ip) -_piper_cfg = _catalog_piper( - name="arm", adapter_type="piper", address=global_config.can_port or "can0" -) - -_xarm7_teleop_cfg = _catalog_xarm7( - name="arm", - adapter_type="sim_mujoco" if _is_sim else "xarm", - address=str(XARM7_SIM_PATH) if _is_sim else global_config.xarm7_ip, - add_gripper=True, -) -_xarm6_teleop_cfg = _catalog_xarm6( - name="arm", - adapter_type="sim_mujoco" if _is_sim else "xarm", - address=str(XARM6_SIM_PATH) if _is_sim else global_config.xarm6_ip, +_xarm6_hw = manipulator( + "arm", + 6, + adapter_type="xarm", + address=global_config.xarm6_ip, + gripper=True, ) -_piper_teleop_cfg = _catalog_piper( - name="arm", - adapter_type="sim_mujoco" if _is_sim else "piper", - address=str(PIPER_SIM_PATH) if _is_sim else (global_config.can_port or "can0"), +_piper_hw = manipulator( + "arm", + 6, + adapter_type="piper", + address=global_config.can_port or "can0", + gripper=True, ) +_xarm7_teleop_hw = xarm7("arm", gripper=True) +_xarm6_teleop_hw = xarm6("arm", gripper=True) +_piper_teleop_hw = piper("arm") + # XArm6 servo - streaming position control coordinator_servo_xarm6 = ControlCoordinator.blueprint( - hardware=[_xarm6_cfg.to_hardware_component()], + hardware=[_xarm6_hw], tasks=[ - _xarm6_cfg.to_task_config(task_type="servo", task_name="servo_arm"), + TaskConfig( + name="servo_arm", + type="servo", + joint_names=_xarm6_hw.joints, + priority=10, + ), ], ) # XArm6 velocity control - streaming velocity for joystick coordinator_velocity_xarm6 = ControlCoordinator.blueprint( - hardware=[_xarm6_cfg.to_hardware_component()], + hardware=[_xarm6_hw], tasks=[ - _xarm6_cfg.to_task_config(task_type="velocity", task_name="velocity_arm"), + TaskConfig( + name="velocity_arm", + type="velocity", + joint_names=_xarm6_hw.joints, + priority=10, + ), ], ) # XArm6 combined (servo + velocity tasks) coordinator_combined_xarm6 = ControlCoordinator.blueprint( - hardware=[_xarm6_cfg.to_hardware_component()], + hardware=[_xarm6_hw], tasks=[ - _xarm6_cfg.to_task_config(task_type="servo", task_name="servo_arm"), - _xarm6_cfg.to_task_config(task_type="velocity", task_name="velocity_arm"), + TaskConfig( + name="servo_arm", + type="servo", + joint_names=_xarm6_hw.joints, + priority=10, + ), + TaskConfig( + name="velocity_arm", + type="velocity", + joint_names=_xarm6_hw.joints, + priority=10, + ), ], ) # Mock 6-DOF arm with CartesianIK -_mock_6dof_cfg = _catalog_piper(name="arm") +_mock_6dof_hw = mock_arm("arm", 6) coordinator_cartesian_ik_mock = ControlCoordinator.blueprint( - hardware=[_mock_6dof_cfg.to_hardware_component()], + hardware=[_mock_6dof_hw], tasks=[ - _mock_6dof_cfg.to_task_config( - task_type="cartesian_ik", - task_name="cartesian_ik_arm", - model_path=PIPER_FK_MODEL, - ee_joint_id=_mock_6dof_cfg.dof, + TaskConfig( + name="cartesian_ik_arm", + type="cartesian_ik", + joint_names=_mock_6dof_hw.joints, + priority=10, + params={"model_path": PIPER_FK_MODEL, "ee_joint_id": 6}, ), ], ) # Piper arm with CartesianIK coordinator_cartesian_ik_piper = ControlCoordinator.blueprint( - hardware=[_piper_cfg.to_hardware_component()], + hardware=[_piper_hw], tasks=[ - _piper_cfg.to_task_config( - task_type="cartesian_ik", - task_name="cartesian_ik_arm", - model_path=PIPER_FK_MODEL, - ee_joint_id=_piper_cfg.dof, + TaskConfig( + name="cartesian_ik_arm", + type="cartesian_ik", + joint_names=_piper_hw.joints, + priority=10, + params={"model_path": PIPER_FK_MODEL, "ee_joint_id": 6}, ), ], ) @@ -135,87 +158,107 @@ def _mujoco_if_sim(sim_path: str, dof: int) -> tuple[Blueprint, ...]: # XArm7 with TeleopIK (real, or MuJoCo with --simulation) coordinator_teleop_xarm7 = autoconnect( ControlCoordinator.blueprint( - hardware=[_xarm7_teleop_cfg.to_hardware_component()], + hardware=[_xarm7_teleop_hw], tasks=[ - _xarm7_teleop_cfg.to_task_config( - task_type="teleop_ik", - task_name="teleop_xarm", - model_path=XARM7_FK_MODEL, - ee_joint_id=_xarm7_teleop_cfg.dof, - hand="right", - gripper_joint=make_gripper_joints("arm")[0], - gripper_open_pos=0.85, - gripper_closed_pos=0.0, + TaskConfig( + name="teleop_xarm", + type="teleop_ik", + joint_names=_xarm7_teleop_hw.joints, + priority=10, + params={ + "model_path": XARM7_FK_MODEL, + "ee_joint_id": 7, + "hand": "right", + "gripper_joint": make_gripper_joints("arm")[0], + "gripper_open_pos": 0.85, + "gripper_closed_pos": 0.0, + }, ), ], ), - *_mujoco_if_sim(str(XARM7_SIM_PATH), _xarm7_teleop_cfg.dof), + *_mujoco_if_sim(str(XARM7_SIM_PATH), len(_xarm7_teleop_hw.joints)), ) # XArm6 with TeleopIK (real, or MuJoCo with --simulation) coordinator_teleop_xarm6 = autoconnect( ControlCoordinator.blueprint( - hardware=[_xarm6_teleop_cfg.to_hardware_component()], + hardware=[_xarm6_teleop_hw], tasks=[ - _xarm6_teleop_cfg.to_task_config( - task_type="teleop_ik", - task_name="teleop_xarm", - model_path=XARM6_FK_MODEL, - ee_joint_id=_xarm6_teleop_cfg.dof, - hand="right", - gripper_joint=make_gripper_joints("arm")[0], - gripper_open_pos=0.85, - gripper_closed_pos=0.0, + TaskConfig( + name="teleop_xarm", + type="teleop_ik", + joint_names=_xarm6_teleop_hw.joints, + priority=10, + params={ + "model_path": XARM6_FK_MODEL, + "ee_joint_id": 6, + "hand": "right", + "gripper_joint": make_gripper_joints("arm")[0], + "gripper_open_pos": 0.85, + "gripper_closed_pos": 0.0, + }, ), ], ), - *_mujoco_if_sim(str(XARM6_SIM_PATH), _xarm6_teleop_cfg.dof), + *_mujoco_if_sim(str(XARM6_SIM_PATH), len(_xarm6_teleop_hw.joints)), ) # Piper with TeleopIK (real, or MuJoCo with --simulation) coordinator_teleop_piper = autoconnect( ControlCoordinator.blueprint( - hardware=[_piper_teleop_cfg.to_hardware_component()], + hardware=[_piper_teleop_hw], tasks=[ - _piper_teleop_cfg.to_task_config( - task_type="teleop_ik", - task_name="teleop_piper", - model_path=PIPER_FK_MODEL, - ee_joint_id=_piper_teleop_cfg.dof, - hand="left", - gripper_joint=make_gripper_joints("arm")[0], - gripper_open_pos=0.0, - gripper_closed_pos=0.035, + TaskConfig( + name="teleop_piper", + type="teleop_ik", + joint_names=_piper_teleop_hw.joints, + priority=10, + params={ + "model_path": PIPER_FK_MODEL, + "ee_joint_id": 6, + "hand": "left", + "gripper_joint": make_gripper_joints("arm")[0], + "gripper_open_pos": 0.0, + "gripper_closed_pos": 0.035, + }, ), ], ), - *_mujoco_if_sim(str(PIPER_SIM_PATH), _piper_teleop_cfg.dof), + *_mujoco_if_sim(str(PIPER_SIM_PATH), len(_piper_teleop_hw.joints)), ) # Dual arm teleop: XArm6 + Piper with TeleopIK (real-only) -_xarm6_dual_cfg = _catalog_xarm6( - name="xarm_arm", adapter_type="xarm", address=global_config.xarm6_ip +_xarm6_dual_hw = manipulator( + "xarm_arm", + 6, + adapter_type="xarm", + address=global_config.xarm6_ip, + gripper=True, ) -_piper_dual_cfg = _catalog_piper( - name="piper_arm", adapter_type="piper", address=global_config.can_port +_piper_dual_hw = manipulator( + "piper_arm", + 6, + adapter_type="piper", + address=global_config.can_port, + gripper=True, ) coordinator_teleop_dual = ControlCoordinator.blueprint( - hardware=[_xarm6_dual_cfg.to_hardware_component(), _piper_dual_cfg.to_hardware_component()], + hardware=[_xarm6_dual_hw, _piper_dual_hw], tasks=[ - _xarm6_dual_cfg.to_task_config( - task_type="teleop_ik", - task_name="teleop_xarm", - model_path=XARM6_FK_MODEL, - ee_joint_id=_xarm6_dual_cfg.dof, - hand="left", + TaskConfig( + name="teleop_xarm", + type="teleop_ik", + joint_names=_xarm6_dual_hw.joints, + priority=10, + params={"model_path": XARM6_FK_MODEL, "ee_joint_id": 6, "hand": "left"}, ), - _piper_dual_cfg.to_task_config( - task_type="teleop_ik", - task_name="teleop_piper", - model_path=PIPER_FK_MODEL, - ee_joint_id=_piper_dual_cfg.dof, - hand="right", + TaskConfig( + name="teleop_piper", + type="teleop_ik", + joint_names=_piper_dual_hw.joints, + priority=10, + params={"model_path": PIPER_FK_MODEL, "ee_joint_id": 6, "hand": "right"}, ), ], ) diff --git a/dimos/manipulation/blueprints.py b/dimos/manipulation/blueprints.py index 3653ecc9c9..61df8004c6 100644 --- a/dimos/manipulation/blueprints.py +++ b/dimos/manipulation/blueprints.py @@ -28,54 +28,142 @@ """ import math +from typing import Any from dimos.agents.mcp.mcp_client import McpClient from dimos.agents.mcp.mcp_server import McpServer -from dimos.control.coordinator import ControlCoordinator +from dimos.control.blueprints._hardware import XARM7_SIM_PATH, manipulator +from dimos.control.coordinator import ControlCoordinator, TaskConfig from dimos.core.coordination.blueprints import autoconnect from dimos.core.global_config import global_config from dimos.hardware.sensors.camera.realsense.camera import RealSenseCamera from dimos.manipulation.manipulation_module import ManipulationModule from dimos.manipulation.pick_and_place_module import PickAndPlaceModule +from dimos.manipulation.planning.spec.config import RobotModelConfig +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Quaternion import Quaternion from dimos.msgs.geometry_msgs.Transform import Transform from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.perception.object_scene_registration import ObjectSceneRegistrationModule -from dimos.robot.catalog.ufactory import xarm6 as _catalog_xarm6, xarm7 as _catalog_xarm7 +from dimos.simulation.engines.mujoco_sim_module import MujocoSimModule +from dimos.utils.data import LfsPath +from dimos.visualization.rerun.bridge import RerunBridgeModule + +XARM_GRIPPER_COLLISION_EXCLUSIONS: list[tuple[str, str]] = [ + ("right_inner_knuckle", "right_outer_knuckle"), + ("left_inner_knuckle", "left_outer_knuckle"), + ("right_inner_knuckle", "right_finger"), + ("left_inner_knuckle", "left_finger"), + ("left_finger", "right_finger"), + ("left_outer_knuckle", "right_outer_knuckle"), + ("left_inner_knuckle", "right_inner_knuckle"), + ("left_outer_knuckle", "right_finger"), + ("right_outer_knuckle", "left_finger"), + ("xarm_gripper_base_link", "left_inner_knuckle"), + ("xarm_gripper_base_link", "right_inner_knuckle"), + ("xarm_gripper_base_link", "left_finger"), + ("xarm_gripper_base_link", "right_finger"), + ("link6", "xarm_gripper_base_link"), + ("link6", "left_outer_knuckle"), + ("link6", "right_outer_knuckle"), +] + +_XARM_MODEL_PATH = LfsPath("xarm_description") / "urdf/xarm_device.urdf.xacro" +_XARM_PACKAGE_PATHS = {"xarm_description": LfsPath("xarm_description")} + + +def _base_pose(x: float = 0.0, y: float = 0.0, z: float = 0.0) -> PoseStamped: + return PoseStamped( + position=Vector3(x=x, y=y, z=z), + orientation=Quaternion(0.0, 0.0, 0.0, 1.0), + ) + + +def _coordinator_joint_mapping( + name: str, + dof: int, + *, + joint_prefix: str | None = None, +) -> dict[str, str]: + prefix = f"{name}/" if joint_prefix is None else joint_prefix + if not prefix: + return {} + return {f"{prefix}joint{i}": f"joint{i}" for i in range(1, dof + 1)} + + +def _make_xarm_model_config( + name: str, + dof: int, + *, + add_gripper: bool = True, + x_offset: float = 0.0, + y_offset: float = 0.0, + z_offset: float = 0.0, + pitch: float = 0.0, + joint_prefix: str | None = None, + coordinator_task_name: str | None = None, + tf_extra_links: list[str] | None = None, + home_joints: list[float] | None = None, + pre_grasp_offset: float = 0.10, +) -> RobotModelConfig: + xacro_args = { + "dof": str(dof), + "limited": "true", + "attach_xyz": f"{x_offset} {y_offset} {z_offset}", + "attach_rpy": f"0 {pitch} 0", + } + if add_gripper: + xacro_args["add_gripper"] = "true" + + return RobotModelConfig( + name=name, + model_path=_XARM_MODEL_PATH, + base_pose=_base_pose(x_offset, y_offset, z_offset), + joint_names=[f"joint{i}" for i in range(1, dof + 1)], + end_effector_link="link_tcp" if add_gripper else f"link{dof}", + base_link="link_base", + package_paths=_XARM_PACKAGE_PATHS, + xacro_args=xacro_args, + auto_convert_meshes=True, + collision_exclusion_pairs=(XARM_GRIPPER_COLLISION_EXCLUSIONS if add_gripper else []), + joint_name_mapping=_coordinator_joint_mapping( + name, + dof, + joint_prefix=joint_prefix, + ), + coordinator_task_name=coordinator_task_name or f"traj_{name}", + gripper_hardware_id=name if add_gripper else None, + tf_extra_links=tf_extra_links or [], + home_joints=home_joints or [0.0] * dof, + pre_grasp_offset=pre_grasp_offset, + ) + + +def _make_xarm6_model_config( + name: str = "arm", + **kwargs: Any, +) -> RobotModelConfig: + return _make_xarm_model_config(name, 6, **kwargs) + + +def _make_xarm7_model_config( + name: str = "arm", + **kwargs: Any, +) -> RobotModelConfig: + return _make_xarm_model_config(name, 7, **kwargs) -# Single XArm6 planner (standalone, no coordinator) -_xarm6_planner_cfg = _catalog_xarm6( - name="arm", - adapter_type="xarm" if global_config.xarm6_ip else "mock", - address=global_config.xarm6_ip, -) xarm6_planner_only = ManipulationModule.blueprint( - robots=[_xarm6_planner_cfg.to_robot_model_config()], + robots=[_make_xarm6_model_config(name="arm")], planning_timeout=10.0, visualization={"backend": "meshcat"}, ) -# Dual XArm6 planner with coordinator integration -# Usage: Start with coordinator_dual_mock, then plan/execute via RPC -_left_arm_cfg = _catalog_xarm6( - name="left_arm", - adapter_type="xarm" if global_config.xarm6_ip else "mock", - address=global_config.xarm6_ip, - y_offset=0.5, -) -_right_arm_cfg = _catalog_xarm6( - name="right_arm", - adapter_type="xarm" if global_config.xarm6_ip else "mock", - address=global_config.xarm6_ip, - y_offset=-0.5, -) - dual_xarm6_planner = ManipulationModule.blueprint( robots=[ - _left_arm_cfg.to_robot_model_config(), - _right_arm_cfg.to_robot_model_config(), + _make_xarm6_model_config(name="left_arm", y_offset=0.5), + _make_xarm6_model_config(name="right_arm", y_offset=-0.5), ], planning_timeout=10.0, visualization={"backend": "meshcat"}, @@ -84,16 +172,17 @@ # Single XArm7 planner + coordinator (uses real hardware when XARM7_IP is set) # Usage: XARM7_IP= dimos run xarm7-planner-coordinator -_xarm7_cfg = _catalog_xarm7( - name="arm", +_xarm7_hw = manipulator( + "arm", + 7, adapter_type="xarm" if global_config.xarm7_ip else "mock", address=global_config.xarm7_ip, - add_gripper=True, + gripper=True, ) xarm7_planner_coordinator = autoconnect( ManipulationModule.blueprint( - robots=[_xarm7_cfg.to_robot_model_config()], + robots=[_make_xarm7_model_config(name="arm", add_gripper=True)], planning_timeout=10.0, visualization={"backend": "meshcat"}, ), @@ -101,8 +190,15 @@ tick_rate=100.0, publish_joint_state=True, joint_state_frame_id="coordinator", - hardware=[_xarm7_cfg.to_hardware_component()], - tasks=[_xarm7_cfg.to_task_config()], + hardware=[_xarm7_hw], + tasks=[ + TaskConfig( + name="traj_arm", + type="trajectory", + joint_names=_xarm7_hw.joints, + priority=10, + ), + ], ), ) @@ -153,18 +249,16 @@ rotation=Quaternion(0.70513398, 0.00535696, 0.70897578, -0.01052180), # xyzw ) -_xarm7_perception_cfg = _catalog_xarm7( - name="arm", - adapter_type="xarm" if global_config.xarm7_ip else "mock", - address=global_config.xarm7_ip, - pitch=math.radians(45), - add_gripper=True, - tf_extra_links=["link7"], -) - xarm_perception = autoconnect( PickAndPlaceModule.blueprint( - robots=[_xarm7_perception_cfg.to_robot_model_config()], + robots=[ + _make_xarm7_model_config( + name="arm", + add_gripper=True, + pitch=math.radians(45), + tf_extra_links=["link7"], + ) + ], planning_timeout=10.0, visualization={"backend": "meshcat"}, floor_z=-0.02, @@ -263,25 +357,28 @@ # Sim perception: MujocoSimModule owns the MujocoEngine and publishes both # camera streams and joint state via shared memory. # ShmMujocoAdapter attaches to the same SHM buffers by MJCF path. - -from dimos.robot.catalog.ufactory import XARM7_SIM_PATH -from dimos.simulation.engines.mujoco_sim_module import MujocoSimModule -from dimos.visualization.rerun.bridge import RerunBridgeModule - -_xarm7_sim_cfg = _catalog_xarm7( - name="arm", +_xarm7_sim_home = [0.0, 0.0, 0.0, 0.0, 0.0, -0.7, 0.0] +_xarm7_sim_hw = manipulator( + "arm", + 7, adapter_type="sim_mujoco", address=str(XARM7_SIM_PATH), - add_gripper=True, - pitch=math.radians(45), - tf_extra_links=["link7"], - home_joints=[0.0, 0.0, 0.0, 0.0, 0.0, -0.7, 0.0], - pre_grasp_offset=0.05, + gripper=True, + home_joints=_xarm7_sim_home, ) xarm_perception_sim = autoconnect( PickAndPlaceModule.blueprint( - robots=[_xarm7_sim_cfg.to_robot_model_config()], + robots=[ + _make_xarm7_model_config( + name="arm", + add_gripper=True, + pitch=math.radians(45), + tf_extra_links=["link7"], + home_joints=_xarm7_sim_home, + pre_grasp_offset=0.05, + ) + ], planning_timeout=10.0, visualization={"backend": "meshcat"}, ), @@ -297,8 +394,15 @@ tick_rate=100.0, publish_joint_state=True, joint_state_frame_id="coordinator", - hardware=[_xarm7_sim_cfg.to_hardware_component()], - tasks=[_xarm7_sim_cfg.to_task_config()], + hardware=[_xarm7_sim_hw], + tasks=[ + TaskConfig( + name="traj_arm", + type="trajectory", + joint_names=_xarm7_sim_hw.joints, + priority=10, + ), + ], ), RerunBridgeModule.blueprint(), ) diff --git a/dimos/robot/catalog/a750.py b/dimos/robot/catalog/a750.py deleted file mode 100644 index 94288d128f..0000000000 --- a/dimos/robot/catalog/a750.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright 2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Agilex A-750 robot configuration.""" - -from __future__ import annotations - -import math -from typing import Any - -from dimos.robot.config import GripperConfig, RobotConfig -from dimos.utils.data import LfsPath - -# Pre-built MJCF for Pinocchio FK (xacro not supported by Pinocchio) -A750_FK_MODEL = LfsPath("a750_description/urdf/a750_rev1_no_gripper.urdf") - -# A-750 gripper collision exclusions (parallel jaw gripper) -# The gripper fingers (link7, link8) can touch each other and gripper_base -# from a750_moveit_config/config/a750-rev1.srdf -A750_GRIPPER_COLLISION_EXCLUSIONS: list[tuple[str, str]] = [ - ("base_link", "link1"), - ("base_link", "link2"), - ("left_finger_link", "link3"), - ("left_finger_link", "link4"), - ("left_finger_link", "link5"), - ("left_finger_link", "link6"), - ("left_finger_link", "right_finger_link"), - ("link1", "link2"), - ("link2", "link3"), - ("link2", "link4"), - ("link3", "link4"), - ("link3", "link5"), - ("link3", "right_finger_link"), - ("link4", "link5"), - ("link4", "link6"), - ("link4", "right_finger_link"), - ("link5", "link6"), - ("link5", "right_finger_link"), - ("link6", "right_finger_link"), -] - - -def a750( - name: str = "a750", - *, - adapter_type: str = "mock", - device_path: str | None = None, - **overrides: Any, -) -> RobotConfig: - """Create an A-750 robot configuration. - - A-750 has 6 revolute joints (joint1-joint6) for the arm and 2 prismatic - joints (joint7, joint8) for the parallel jaw gripper. - - Args: - name: Robot identifier. - adapter_type: Hardware adapter ("mock", "a750"). - device_path: Device path (e.g., "/dev/ttyACM0"). - **overrides: Override any RobotConfig field. - """ - defaults: dict[str, Any] = { - "name": name, - "model_path": LfsPath("a750_description") / "urdf/a750_rev1.urdf", - "end_effector_link": "gripper_base", - "adapter_type": adapter_type, - "address": device_path, - "joint_names": [f"joint{i}" for i in range(1, 7)], - "base_link": "base_link", - "home_joints": [0.0, 0.0, -math.radians(90), 0.0, 0.0, 0.0], - "base_pose": [0, 0, 0, 0, 0, 0, 1], # base_pose is where the robot sits in the world - "package_paths": { - "a750_description": LfsPath("a750_description"), - "a750_gazebo": LfsPath("a750_description"), - }, - "xacro_args": {}, - "auto_convert_meshes": True, - "collision_exclusion_pairs": A750_GRIPPER_COLLISION_EXCLUSIONS, - "gripper": GripperConfig( - type="a750", - joints=["finger"], - collision_exclusions=A750_GRIPPER_COLLISION_EXCLUSIONS, - open_position=0.06, - close_position=0.02, - ), - } - defaults.update(overrides) - return RobotConfig(**defaults) - - -__all__ = ["A750_FK_MODEL", "A750_GRIPPER_COLLISION_EXCLUSIONS", "a750"] diff --git a/dimos/robot/catalog/openarm.py b/dimos/robot/catalog/openarm.py deleted file mode 100644 index b6e1238cf2..0000000000 --- a/dimos/robot/catalog/openarm.py +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""OpenArm v10 robot configurations.""" - -from __future__ import annotations - -from typing import Any - -from dimos.robot.config import RobotConfig -from dimos.utils.data import LfsPath - -# Collision exclusion pairs — structural mesh overlaps in the OpenArm URDF. -# link5 and link7 collision meshes overlap by ~3mm at zero pose (and every -# other pose) — same pattern as R1 Pro's non-adjacent link overlap. -OPENARM_COLLISION_EXCLUSIONS: list[tuple[str, str]] = [ - ("openarm_left_link5", "openarm_left_link7"), - ("openarm_right_link5", "openarm_right_link7"), -] - -# LFS-backed: data/.lfs/openarm_description.tar.gz extracts to data/openarm_description/ -_OPENARM_PKG = LfsPath("openarm_description") -_OPENARM_MODEL_PATH = _OPENARM_PKG / "urdf/robot/openarm_v10_bimanual.urdf" -# Per-side URDFs: extracted from bimanual expansion, only one arm + torso each. -# Avoids phantom-arm collisions when Drake loads both sides into one world. -_OPENARM_LEFT_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_left.urdf" -_OPENARM_RIGHT_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_right.urdf" - -# Pre-expanded single-arm URDF for Pinocchio FK (keyboard teleop, IK, etc.) -OPENARM_V10_FK_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_single.urdf" - - -def openarm_arm( - side: str = "left", - name: str | None = None, - *, - adapter_type: str = "mock", - address: str | None = None, - **overrides: Any, -) -> RobotConfig: - """OpenArm v10 config for one side. Uses per-side URDF (arm + torso only).""" - if side not in ("left", "right"): - raise ValueError(f"side must be 'left' or 'right', got {side!r}") - - resolved_name = name or f"{side}_arm" - # Pre-expanded bimanual URDF uses openarm_{side}_* naming. - joint_names = [f"openarm_{side}_joint{i}" for i in range(1, 8)] - ee_link = f"openarm_{side}_link7" - - defaults: dict[str, Any] = { - "name": resolved_name, - "model_path": _OPENARM_LEFT_MODEL if side == "left" else _OPENARM_RIGHT_MODEL, - "end_effector_link": ee_link, - "adapter_type": adapter_type, - "address": address, - "joint_names": joint_names, - # URDF already prefixes joints with "left_"/"right_" in bimanual mode, - # so suppress RobotConfig's automatic "{name}_" prefix. - "joint_prefix": "", - "base_link": "openarm_body_link0", - "home_joints": [0.0] * 7, - "base_pose": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0], - "package_paths": {"openarm_description": _OPENARM_PKG}, - "collision_exclusion_pairs": OPENARM_COLLISION_EXCLUSIONS, - "auto_convert_meshes": True, - "max_velocity": 0.5, - "max_acceleration": 1.0, - "adapter_kwargs": {"side": side}, - } - # Merge adapter_kwargs rather than replace, so callers can add keys - # (e.g. auto_set_mit_mode) without clobbering the catalog's "side". - if "adapter_kwargs" in overrides: - defaults["adapter_kwargs"] = { - **defaults["adapter_kwargs"], - **overrides.pop("adapter_kwargs"), - } - defaults.update(overrides) - return RobotConfig(**defaults) - - -def openarm_single( - name: str = "arm", - *, - adapter_type: str = "mock", - address: str | None = None, - **overrides: Any, -) -> RobotConfig: - """Single-arm config (keyboard teleop, cartesian IK). Use openarm_arm() for bimanual.""" - defaults: dict[str, Any] = { - "name": name, - "model_path": OPENARM_V10_FK_MODEL, - "end_effector_link": "openarm_left_link7", - "adapter_type": adapter_type, - "address": address, - "joint_names": [f"openarm_left_joint{i}" for i in range(1, 8)], - "joint_prefix": "", - "base_link": "openarm_body_link0", - "home_joints": [0.0] * 7, - "package_paths": {"openarm_description": _OPENARM_PKG}, - "auto_convert_meshes": True, - "max_velocity": 0.5, - "max_acceleration": 1.0, - "adapter_kwargs": {"side": "left"}, - } - defaults.update(overrides) - return RobotConfig(**defaults) - - -__all__ = ["OPENARM_V10_FK_MODEL", "openarm_arm", "openarm_single"] diff --git a/dimos/robot/catalog/piper.py b/dimos/robot/catalog/piper.py deleted file mode 100644 index 8c9191c68e..0000000000 --- a/dimos/robot/catalog/piper.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Agilex Piper robot configuration.""" - -from __future__ import annotations - -from typing import Any - -from dimos.robot.config import GripperConfig, RobotConfig -from dimos.utils.data import LfsPath - -# Pre-built MJCF for Pinocchio FK (xacro not supported by Pinocchio) -PIPER_FK_MODEL = LfsPath("piper_description/mujoco_model/piper_no_gripper_description.xml") - -# Simulation model path (MJCF) -PIPER_SIM_PATH = LfsPath("piper/scene.xml") - -# Piper gripper collision exclusions (parallel jaw gripper) -# The gripper fingers (link7, link8) can touch each other and gripper_base -PIPER_GRIPPER_COLLISION_EXCLUSIONS: list[tuple[str, str]] = [ - ("gripper_base", "link7"), - ("gripper_base", "link8"), - ("link7", "link8"), - ("link6", "gripper_base"), -] - - -def piper( - name: str = "piper", - *, - adapter_type: str = "mock", - address: str | None = None, - y_offset: float = 0.0, - **overrides: Any, -) -> RobotConfig: - """Create a Piper robot configuration. - - Piper has 6 revolute joints (joint1-joint6) for the arm and 2 prismatic - joints (joint7, joint8) for the parallel jaw gripper. - - Args: - name: Robot identifier. - adapter_type: Hardware adapter ("mock", "piper"). - address: CAN port (e.g., "can0"). - y_offset: Y-axis offset for base pose (multi-arm setups). - **overrides: Override any RobotConfig field. - """ - defaults: dict[str, Any] = { - "name": name, - "model_path": LfsPath("piper_description") / "urdf/piper_description.xacro", - "end_effector_link": "gripper_base", - "adapter_type": adapter_type, - "address": address, - "joint_names": [f"joint{i}" for i in range(1, 7)], - "base_link": "base_link", - "home_joints": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - "base_pose": [0, y_offset, 0, 0, 0, 0, 1], - "package_paths": { - "piper_description": LfsPath("piper_description"), - "piper_gazebo": LfsPath("piper_description"), - }, - "xacro_args": {}, - "auto_convert_meshes": True, - "collision_exclusion_pairs": PIPER_GRIPPER_COLLISION_EXCLUSIONS, - "gripper": GripperConfig( - type="piper", - joints=["gripper"], - collision_exclusions=PIPER_GRIPPER_COLLISION_EXCLUSIONS, - open_position=0.08, - close_position=0.0, - ), - } - defaults.update(overrides) - return RobotConfig(**defaults) - - -__all__ = ["PIPER_FK_MODEL", "PIPER_GRIPPER_COLLISION_EXCLUSIONS", "piper"] diff --git a/dimos/robot/catalog/ufactory.py b/dimos/robot/catalog/ufactory.py deleted file mode 100644 index ddf93185dc..0000000000 --- a/dimos/robot/catalog/ufactory.py +++ /dev/null @@ -1,164 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""UFactory xArm robot configurations.""" - -from __future__ import annotations - -from typing import Any - -from dimos.robot.config import GripperConfig, RobotConfig -from dimos.utils.data import LfsPath - -# Pre-built URDFs for Pinocchio FK (xacro not supported by Pinocchio) -XARM6_FK_MODEL = LfsPath("xarm_description/urdf/xarm6/xarm6.urdf") -XARM7_FK_MODEL = LfsPath("xarm_description/urdf/xarm7/xarm7.urdf") - -# Simulation model paths (MJCF) -XARM7_SIM_PATH = LfsPath("xarm7/scene.xml") -XARM6_SIM_PATH = LfsPath("xarm6/scene.xml") - -# XArm gripper collision exclusions (parallel linkage mechanism) -# The gripper uses mimic joints where non-adjacent links can overlap legitimately -XARM_GRIPPER_COLLISION_EXCLUSIONS: list[tuple[str, str]] = [ - ("right_inner_knuckle", "right_outer_knuckle"), - ("left_inner_knuckle", "left_outer_knuckle"), - ("right_inner_knuckle", "right_finger"), - ("left_inner_knuckle", "left_finger"), - ("left_finger", "right_finger"), - ("left_outer_knuckle", "right_outer_knuckle"), - ("left_inner_knuckle", "right_inner_knuckle"), - ("left_outer_knuckle", "right_finger"), - ("right_outer_knuckle", "left_finger"), - ("xarm_gripper_base_link", "left_inner_knuckle"), - ("xarm_gripper_base_link", "right_inner_knuckle"), - ("xarm_gripper_base_link", "left_finger"), - ("xarm_gripper_base_link", "right_finger"), - ("link6", "xarm_gripper_base_link"), - ("link6", "left_outer_knuckle"), - ("link6", "right_outer_knuckle"), -] - - -def xarm7( - name: str = "arm", - *, - adapter_type: str = "mock", - address: str | None = None, - add_gripper: bool = False, - x_offset: float = 0.0, - y_offset: float = 0.0, - z_offset: float = 0.0, - pitch: float = 0.0, - tf_extra_links: list[str] | None = None, - **overrides: Any, -) -> RobotConfig: - """Create an xArm7 robot configuration.""" - xacro_args: dict[str, str] = { - "dof": "7", - "limited": "true", - "attach_xyz": f"{x_offset} {y_offset} {z_offset}", - "attach_rpy": f"0 {pitch} 0", - } - if add_gripper: - xacro_args["add_gripper"] = "true" - - defaults: dict[str, Any] = { - "name": name, - "model_path": LfsPath("xarm_description") / "urdf/xarm_device.urdf.xacro", - "end_effector_link": "link_tcp" if add_gripper else "link7", - "adapter_type": adapter_type, - "address": address, - "joint_names": [f"joint{i}" for i in range(1, 8)], - "base_link": "link_base", - "home_joints": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - "base_pose": [x_offset, y_offset, z_offset, 0, 0, 0, 1], - "package_paths": {"xarm_description": LfsPath("xarm_description")}, - "xacro_args": xacro_args, - "auto_convert_meshes": True, - "collision_exclusion_pairs": XARM_GRIPPER_COLLISION_EXCLUSIONS if add_gripper else [], - "tf_extra_links": tf_extra_links or [], - "gripper": GripperConfig( - type="xarm", - joints=["gripper"], - collision_exclusions=XARM_GRIPPER_COLLISION_EXCLUSIONS, - open_position=0.85, - close_position=0.0, - ) - if add_gripper - else None, - } - defaults.update(overrides) - return RobotConfig(**defaults) - - -def xarm6( - name: str = "arm", - *, - adapter_type: str = "mock", - address: str | None = None, - add_gripper: bool = True, - x_offset: float = 0.0, - y_offset: float = 0.0, - z_offset: float = 0.0, - pitch: float = 0.0, - tf_extra_links: list[str] | None = None, - **overrides: Any, -) -> RobotConfig: - """Create an xArm6 robot configuration.""" - xacro_args: dict[str, str] = { - "dof": "6", - "limited": "true", - "attach_xyz": f"{x_offset} {y_offset} {z_offset}", - "attach_rpy": f"0 {pitch} 0", - } - if add_gripper: - xacro_args["add_gripper"] = "true" - - defaults: dict[str, Any] = { - "name": name, - "model_path": LfsPath("xarm_description") / "urdf/xarm_device.urdf.xacro", - "end_effector_link": "link_tcp" if add_gripper else "link6", - "adapter_type": adapter_type, - "address": address, - "joint_names": [f"joint{i}" for i in range(1, 7)], - "base_link": "link_base", - "home_joints": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - "base_pose": [x_offset, y_offset, z_offset, 0, 0, 0, 1], - "package_paths": {"xarm_description": LfsPath("xarm_description")}, - "xacro_args": xacro_args, - "auto_convert_meshes": True, - "collision_exclusion_pairs": XARM_GRIPPER_COLLISION_EXCLUSIONS if add_gripper else [], - "tf_extra_links": tf_extra_links or [], - "gripper": GripperConfig( - type="xarm", - joints=["gripper"], - collision_exclusions=XARM_GRIPPER_COLLISION_EXCLUSIONS, - open_position=0.85, - close_position=0.0, - ) - if add_gripper - else None, - } - defaults.update(overrides) - return RobotConfig(**defaults) - - -__all__ = [ - "XARM6_FK_MODEL", - "XARM7_FK_MODEL", - "XARM_GRIPPER_COLLISION_EXCLUSIONS", - "xarm6", - "xarm7", -] diff --git a/dimos/robot/config.py b/dimos/robot/config.py deleted file mode 100644 index d0922cb842..0000000000 --- a/dimos/robot/config.py +++ /dev/null @@ -1,299 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Unified robot configuration. - -Single source of truth for a robot. The URDF/MJCF model file is the -ground truth — joint names, DOF, limits, and link hierarchy are parsed -automatically. Generates RobotModelConfig, HardwareComponent, and TaskConfig. -""" - -from __future__ import annotations - -from pathlib import Path -from typing import Any - -from pydantic import BaseModel, Field, PrivateAttr - -from dimos.control.components import HardwareComponent, HardwareType -from dimos.control.coordinator import TaskConfig -from dimos.manipulation.planning.spec.config import RobotModelConfig -from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped -from dimos.msgs.geometry_msgs.Quaternion import Quaternion -from dimos.msgs.geometry_msgs.Vector3 import Vector3 -from dimos.robot.model_parser import ModelDescription, parse_model - - -class GripperConfig(BaseModel): - """Gripper configuration.""" - - type: str - joints: list[str] = Field(default_factory=list) - collision_exclusions: list[tuple[str, str]] = Field(default_factory=list) - open_position: float = 1.0 - close_position: float = 0.0 - - -class RobotConfig(BaseModel): - """Unified robot configuration — URDF/MJCF is the ground truth. - - Model parsing is lazy to avoid LFS downloads at import time. - """ - - # Required fields - name: str - model_path: Path | None = None - end_effector_link: str | None = None - - # Physical dimensions (meters) - height_clearance: float | None = None # max height - width_clearance: float | None = None # max width - - # These offsets are applied so that odometry at 0,0,0 corresponds roughly with the floor - # Note: these cannot (easily) be calculated from the URDF because - # the URDF doesn't always have an initial robot pose/stance - # This is a quality of life offset, not exact - # The key names should match keys in the urdf - internal_odom_offsets: dict[str, Any] = Field(default_factory=dict) - - # Hardware connection - adapter_type: str = "mock" - address: str | None = None - adapter_kwargs: dict[str, Any] = Field(default_factory=dict) - auto_enable: bool = True - - # Optional overrides (derived from model if not set) - joint_names: list[str] | None = None - base_link: str | None = None - home_joints: list[float] | None = None - - # Multi-robot / coordinator - joint_prefix: str | None = None # defaults to "{name}_" - base_pose: list[float] = Field(default_factory=lambda: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]) - - # Planning - max_velocity: float = 1.0 - max_acceleration: float = 2.0 - pre_grasp_offset: float = 0.10 - - # Gripper - gripper: GripperConfig | None = None - - # Model loading - package_paths: dict[str, Path] = Field(default_factory=dict) - xacro_args: dict[str, str] = Field(default_factory=dict) - auto_convert_meshes: bool = True - - # TF publishing - tf_extra_links: list[str] = Field(default_factory=list) - - # Task defaults - task_type: str = "trajectory" - task_priority: int = 10 - - # Collision exclusion pairs (gripper-specific, cannot be parsed from model) - collision_exclusion_pairs: list[tuple[str, str]] = Field(default_factory=list) - - _parsed: ModelDescription | None = PrivateAttr(default=None) - - def _ensure_prefix(self) -> None: - """Ensure joint_prefix is set (no model parsing needed).""" - if self.joint_prefix is None: - self.joint_prefix = f"{self.name}/" - - def _ensure_parsed(self) -> ModelDescription: - """Parse model lazily on first access.""" - if self._parsed is None: - if self.model_path is None: - raise ValueError( - f"RobotConfig '{self.name}' has no model_path — " - "joint/link info is unavailable. Set model_path to a URDF/MJCF." - ) - self._parsed = parse_model(self.model_path, self.package_paths, self.xacro_args) - self._ensure_prefix() - if self.joint_names is None: - self.joint_names = self._parsed.actuated_joint_names - if self.base_link is None: - self.base_link = self._parsed.root_link - if self.home_joints is None: - self.home_joints = self._compute_default_home() - return self._parsed - - def _compute_default_home(self) -> list[float]: - assert self._parsed is not None - home = [] - for joint_name in self.resolved_joint_names: - joint = self._parsed.get_joint(joint_name) - if ( - joint is not None - and joint.lower_limit is not None - and joint.upper_limit is not None - ): - home.append((joint.lower_limit + joint.upper_limit) / 2.0) - else: - home.append(0.0) - return home - - # -- Derived properties --------------------------------------------------- - - @property - def model_description(self) -> ModelDescription: - return self._ensure_parsed() - - @property - def resolved_joint_names(self) -> list[str]: - self._ensure_parsed() - assert self.joint_names is not None - return self.joint_names - - @property - def resolved_base_link(self) -> str: - self._ensure_parsed() - assert self.base_link is not None - return self.base_link - - @property - def dof(self) -> int: - if self.joint_names is not None: - return len(self.joint_names) - return len(self.resolved_joint_names) - - @property - def coordinator_joint_names(self) -> list[str]: - self._ensure_prefix() - names = self.joint_names if self.joint_names is not None else self.resolved_joint_names - if not self.joint_prefix: - return list(names) - return [f"{self.joint_prefix}{j}" for j in names] - - @property - def joint_name_mapping(self) -> dict[str, str]: - self._ensure_prefix() - names = self.joint_names if self.joint_names is not None else self.resolved_joint_names - if not self.joint_prefix: - return {} - return {f"{self.joint_prefix}{j}": j for j in names} - - @property - def coordinator_task_name(self) -> str: - return f"traj_{self.name}" - - # -- Converter methods ---------------------------------------------------- - - def to_robot_model_config(self) -> RobotModelConfig: - """Generate RobotModelConfig for ManipulationModule.""" - if self.end_effector_link is None: - raise ValueError( - f"RobotConfig '{self.name}' has no end_effector_link — " - "cannot generate RobotModelConfig for manipulation." - ) - if self.model_path is None: - raise ValueError( - f"RobotConfig '{self.name}' has no model_path — " - "cannot generate RobotModelConfig for manipulation." - ) - bp = self.base_pose - base_pose = PoseStamped( - position=Vector3(x=bp[0], y=bp[1], z=bp[2]), - orientation=Quaternion(bp[3], bp[4], bp[5], bp[6]), - ) - - exclusions = list(self.collision_exclusion_pairs) - if self.gripper: - exclusions.extend(self.gripper.collision_exclusions) - - # Use direct fields when available to avoid triggering model parsing at import time - joint_names = ( - self.joint_names if self.joint_names is not None else self.resolved_joint_names - ) - base_link = self.base_link if self.base_link is not None else self.resolved_base_link - - return RobotModelConfig( - name=self.name, - model_path=self.model_path, - base_pose=base_pose, - joint_names=joint_names, - end_effector_link=self.end_effector_link, - base_link=base_link, - package_paths=self.package_paths, - xacro_args=self.xacro_args, - collision_exclusion_pairs=exclusions, - auto_convert_meshes=self.auto_convert_meshes, - max_velocity=self.max_velocity, - max_acceleration=self.max_acceleration, - joint_name_mapping=self.joint_name_mapping, - coordinator_task_name=self.coordinator_task_name, - gripper_hardware_id=self.name if self.gripper else None, - tf_extra_links=self.tf_extra_links, - home_joints=self.home_joints, - pre_grasp_offset=self.pre_grasp_offset, - ) - - def to_hardware_component(self) -> HardwareComponent: - """Generate HardwareComponent for ControlCoordinator.""" - self._ensure_prefix() - gripper_joints: list[str] = [] - if self.gripper and self.gripper.joints: - gripper_joints = [f"{self.joint_prefix}{j}" for j in self.gripper.joints] - - adapter_kwargs = dict(self.adapter_kwargs) - if self.home_joints is not None: - adapter_kwargs.setdefault("initial_positions", self.home_joints) - - return HardwareComponent( - hardware_id=self.name, - hardware_type=HardwareType.MANIPULATOR, - joints=self.coordinator_joint_names, - adapter_type=self.adapter_type, - address=self.address, - auto_enable=self.auto_enable, - gripper_joints=gripper_joints, - adapter_kwargs=adapter_kwargs, - ) - - def to_task_config( - self, - task_type: str | None = None, - task_name: str | None = None, - priority: int | None = None, - auto_start: bool = False, - **task_kwargs: Any, - ) -> TaskConfig: - """Generate TaskConfig for ControlCoordinator. - - Args: - task_type: Override task type (default: self.task_type). - task_name: Override task name (default: self.coordinator_task_name). - priority: Override priority (default: self.task_priority). - auto_start: Whether the coordinator should start this task on startup. - **task_kwargs: Task-specific params (e.g., model_path, - ee_joint_id, hand, gripper_joint, gripper_open_pos, gripper_closed_pos). - """ - params = dict(task_kwargs.pop("params", {})) - params.update(task_kwargs) - - return TaskConfig( - name=task_name if task_name is not None else self.coordinator_task_name, - type=task_type if task_type is not None else self.task_type, - joint_names=self.coordinator_joint_names, - priority=priority if priority is not None else self.task_priority, - auto_start=auto_start, - params=params, - ) - - -__all__ = [ - "GripperConfig", - "RobotConfig", -] diff --git a/dimos/robot/diy/alfred/config.py b/dimos/robot/diy/alfred/config.py index c3410f786b..163e14ad4b 100644 --- a/dimos/robot/diy/alfred/config.py +++ b/dimos/robot/diy/alfred/config.py @@ -14,10 +14,12 @@ from __future__ import annotations +from dataclasses import dataclass, field +from typing import Any + from dimos.msgs.geometry_msgs.Pose import Pose from dimos.msgs.geometry_msgs.Quaternion import Quaternion from dimos.msgs.geometry_msgs.Vector3 import Vector3 -from dimos.robot.config import RobotConfig from dimos.robot.unitree.g1.config import G1_LOCAL_PLANNER_PRECOMPUTED_PATHS DEFAULT_ADDRESS = "172.6.2.20:11323" @@ -25,7 +27,18 @@ # just as a starting point. May re-compute these later. In principle robot-specific LOCAL_PLANNER_PRECOMPUTED_PATHS = G1_LOCAL_PLANNER_PRECOMPUTED_PATHS -ALFRED = RobotConfig( + +@dataclass(frozen=True) +class AlfredConfig: + """Physical metadata used by Alfred navigation and sensor blueprints.""" + + name: str + height_clearance: float + width_clearance: float + internal_odom_offsets: dict[str, Any] = field(default_factory=dict) + + +ALFRED = AlfredConfig( name="alfred", height_clearance=2.0, # meters width_clearance=1.0, diff --git a/dimos/robot/manipulators/a750/blueprints.py b/dimos/robot/manipulators/a750/blueprints.py index 5d86f9497a..cb2155b756 100644 --- a/dimos/robot/manipulators/a750/blueprints.py +++ b/dimos/robot/manipulators/a750/blueprints.py @@ -22,42 +22,97 @@ dimos run keyboard-teleop-a750 """ -from dimos.control.coordinator import ControlCoordinator +import math + +from dimos.control.blueprints._hardware import A750_FK_MODEL, a750 +from dimos.control.coordinator import ControlCoordinator, TaskConfig from dimos.core.coordination.blueprints import autoconnect -from dimos.core.global_config import global_config from dimos.manipulation.manipulation_module import ManipulationModule -from dimos.robot.catalog.a750 import A750_FK_MODEL, a750 as _catalog_a750 +from dimos.manipulation.planning.spec.config import RobotModelConfig +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.teleop.keyboard.keyboard_teleop_module import KeyboardTeleopModule +from dimos.utils.data import LfsPath -_a750_cfg = _catalog_a750( - name="arm", - adapter_type="a750" if global_config.device_path else "mock", - device_path=global_config.device_path or "/dev/ttyACM0", -) +A750_GRIPPER_COLLISION_EXCLUSIONS: list[tuple[str, str]] = [ + ("base_link", "link1"), + ("base_link", "link2"), + ("left_finger_link", "link3"), + ("left_finger_link", "link4"), + ("left_finger_link", "link5"), + ("left_finger_link", "link6"), + ("left_finger_link", "right_finger_link"), + ("link1", "link2"), + ("link2", "link3"), + ("link2", "link4"), + ("link3", "link4"), + ("link3", "link5"), + ("link3", "right_finger_link"), + ("link4", "link5"), + ("link4", "link6"), + ("link4", "right_finger_link"), + ("link5", "link6"), + ("link5", "right_finger_link"), + ("link6", "right_finger_link"), +] + +_A750_MODEL_PATH = LfsPath("a750_description") / "urdf/a750_rev1.urdf" +_A750_HOME_JOINTS = [0.0, 0.0, -math.radians(90), 0.0, 0.0, 0.0] +_A750_PACKAGE_PATHS = { + "a750_description": LfsPath("a750_description"), + "a750_gazebo": LfsPath("a750_description"), +} + + +def _a750_model_config() -> RobotModelConfig: + return RobotModelConfig( + name="arm", + model_path=_A750_MODEL_PATH, + base_pose=PoseStamped( + position=Vector3(x=0.0, y=0.0, z=0.0), + orientation=Quaternion(0.0, 0.0, 0.0, 1.0), + ), + joint_names=[f"joint{i}" for i in range(1, 7)], + end_effector_link="gripper_base", + base_link="base_link", + package_paths=_A750_PACKAGE_PATHS, + auto_convert_meshes=True, + collision_exclusion_pairs=A750_GRIPPER_COLLISION_EXCLUSIONS, + joint_name_mapping={f"arm/joint{i}": f"joint{i}" for i in range(1, 7)}, + coordinator_task_name="traj_arm", + gripper_hardware_id="arm", + home_joints=_A750_HOME_JOINTS, + ) + + +_a750_hw = a750("arm", mock_without_address=True) -# Piper 6-DOF mock sim + keyboard teleop + Drake visualization +# A-750 6-DOF mock sim + keyboard teleop + Drake visualization keyboard_teleop_a750 = autoconnect( KeyboardTeleopModule.blueprint( model_path=A750_FK_MODEL, - ee_joint_id=_a750_cfg.dof, - home_joints=_a750_cfg.home_joints, + ee_joint_id=6, + home_joints=_A750_HOME_JOINTS, + joint_names=_a750_hw.joints, ), ControlCoordinator.blueprint( tick_rate=100.0, publish_joint_state=True, joint_state_frame_id="coordinator", - hardware=[_a750_cfg.to_hardware_component()], + hardware=[_a750_hw], tasks=[ - _a750_cfg.to_task_config( - task_type="cartesian_ik", - task_name="cartesian_ik_arm", - model_path=A750_FK_MODEL, - ee_joint_id=_a750_cfg.dof, + TaskConfig( + name="cartesian_ik_arm", + type="cartesian_ik", + joint_names=_a750_hw.joints, + priority=10, + params={"model_path": A750_FK_MODEL, "ee_joint_id": 6}, ), ], ), ManipulationModule.blueprint( - robots=[_a750_cfg.to_robot_model_config()], + robots=[_a750_model_config()], visualization={"backend": "meshcat"}, ), ) diff --git a/dimos/robot/manipulators/openarm/blueprints.py b/dimos/robot/manipulators/openarm/blueprints.py index 53388102fb..4b1d9846f7 100644 --- a/dimos/robot/manipulators/openarm/blueprints.py +++ b/dimos/robot/manipulators/openarm/blueprints.py @@ -16,49 +16,158 @@ from __future__ import annotations -from dimos.control.coordinator import ControlCoordinator +from typing import Any + +from dimos.control.components import HardwareComponent, HardwareType +from dimos.control.coordinator import ControlCoordinator, TaskConfig from dimos.core.coordination.blueprints import autoconnect from dimos.manipulation.manipulation_module import ManipulationModule -from dimos.robot.catalog.openarm import ( - OPENARM_V10_FK_MODEL, - openarm_arm as _openarm, - openarm_single as _openarm_single, -) +from dimos.manipulation.planning.spec.config import RobotModelConfig +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.teleop.keyboard.keyboard_teleop_module import KeyboardTeleopModule +from dimos.utils.data import LfsPath + +OPENARM_COLLISION_EXCLUSIONS: list[tuple[str, str]] = [ + ("openarm_left_link5", "openarm_left_link7"), + ("openarm_right_link5", "openarm_right_link7"), +] + +_OPENARM_PKG = LfsPath("openarm_description") +_OPENARM_LEFT_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_left.urdf" +_OPENARM_RIGHT_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_right.urdf" +OPENARM_V10_FK_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_single.urdf" + + +def _base_pose() -> PoseStamped: + return PoseStamped( + position=Vector3(x=0.0, y=0.0, z=0.0), + orientation=Quaternion(0.0, 0.0, 0.0, 1.0), + ) + + +def _validate_side(side: str) -> None: + if side not in ("left", "right"): + raise ValueError(f"side must be 'left' or 'right', got {side!r}") + + +def _openarm_joints(side: str) -> list[str]: + _validate_side(side) + return [f"openarm_{side}_joint{i}" for i in range(1, 8)] + + +def _openarm_hardware( + side: str, + name: str | None = None, + *, + adapter_type: str = "mock", + address: str | None = None, + adapter_kwargs: dict[str, Any] | None = None, +) -> HardwareComponent: + _validate_side(side) + kwargs = {"side": side} + if adapter_kwargs: + kwargs.update(adapter_kwargs) + return HardwareComponent( + hardware_id=name or f"{side}_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=_openarm_joints(side), + adapter_type=adapter_type, + address=address, + adapter_kwargs=kwargs, + ) -# ── Mock bimanual: no hardware, great for verifying wiring ───────────── -_mock_left = _openarm(side="left") -_mock_right = _openarm(side="right") + +def _openarm_model_config(side: str, name: str | None = None) -> RobotModelConfig: + _validate_side(side) + resolved_name = name or f"{side}_arm" + return RobotModelConfig( + name=resolved_name, + model_path=_OPENARM_LEFT_MODEL if side == "left" else _OPENARM_RIGHT_MODEL, + base_pose=_base_pose(), + joint_names=_openarm_joints(side), + end_effector_link=f"openarm_{side}_link7", + base_link="openarm_body_link0", + package_paths={"openarm_description": _OPENARM_PKG}, + collision_exclusion_pairs=OPENARM_COLLISION_EXCLUSIONS, + auto_convert_meshes=True, + max_velocity=0.5, + max_acceleration=1.0, + coordinator_task_name=f"traj_{resolved_name}", + home_joints=[0.0] * 7, + ) + + +def _openarm_task(hw: HardwareComponent, name: str | None = None) -> TaskConfig: + return TaskConfig( + name=name or f"traj_{hw.hardware_id}", + type="trajectory", + joint_names=hw.joints, + priority=10, + ) + + +def _openarm_single_hardware( + *, + adapter_type: str = "mock", + address: str | None = None, +) -> HardwareComponent: + return _openarm_hardware( + "left", + name="arm", + adapter_type=adapter_type, + address=address, + ) + + +def _openarm_single_model_config() -> RobotModelConfig: + return RobotModelConfig( + name="arm", + model_path=OPENARM_V10_FK_MODEL, + base_pose=_base_pose(), + joint_names=_openarm_joints("left"), + end_effector_link="openarm_left_link7", + base_link="openarm_body_link0", + package_paths={"openarm_description": _OPENARM_PKG}, + auto_convert_meshes=True, + max_velocity=0.5, + max_acceleration=1.0, + coordinator_task_name="traj_arm", + home_joints=[0.0] * 7, + ) + + +# Mock bimanual: no hardware, great for verifying wiring. +_mock_left = _openarm_hardware(side="left") +_mock_right = _openarm_hardware(side="right") coordinator_openarm_mock = ControlCoordinator.blueprint( - hardware=[_mock_left.to_hardware_component(), _mock_right.to_hardware_component()], + hardware=[_mock_left, _mock_right], tasks=[ - _mock_left.to_task_config(), - _mock_right.to_task_config(), + _openarm_task(_mock_left), + _openarm_task(_mock_right), ], ) -# ── Single-arm hardware blueprints (first real bring-up targets) ─────── +# Single-arm hardware blueprints (first real bring-up targets). # CAN interface each physical arm is on. Linux assigns can0/can1 in USB -# enumeration order which isn't guaranteed stable — if the arms come up -# swapped, flip these two values. +# enumeration order which is not guaranteed stable; if swapped, flip these. LEFT_CAN = "can1" RIGHT_CAN = "can0" -# Flip to False to skip the CTRL_MODE=MIT write at connect-time — useful for -# verifying the setting persists across power cycles. Leave True for normal -# operation (idempotent; ensures motors work even if they were reflashed / -# replaced / factory-reset). +# Flip to False to skip the CTRL_MODE=MIT write at connect-time. Leave True for +# normal operation; it is idempotent and ensures motors are in the expected mode. AUTO_SET_MIT_MODE = True _ADAPTER_KWARGS = {"auto_set_mit_mode": AUTO_SET_MIT_MODE} -_left_hw = _openarm( +_left_hw = _openarm_hardware( side="left", address=LEFT_CAN, adapter_type="openarm", adapter_kwargs=_ADAPTER_KWARGS, ) -_right_hw = _openarm( +_right_hw = _openarm_hardware( side="right", address=RIGHT_CAN, adapter_type="openarm", @@ -66,102 +175,113 @@ ) coordinator_openarm_left = ControlCoordinator.blueprint( - hardware=[_left_hw.to_hardware_component()], - tasks=[_left_hw.to_task_config()], + hardware=[_left_hw], + tasks=[_openarm_task(_left_hw)], ) coordinator_openarm_right = ControlCoordinator.blueprint( - hardware=[_right_hw.to_hardware_component()], - tasks=[_right_hw.to_task_config()], + hardware=[_right_hw], + tasks=[_openarm_task(_right_hw)], ) -# ── Bimanual hardware blueprint ──────────────────────────────────────── coordinator_openarm_bimanual = ControlCoordinator.blueprint( - hardware=[_left_hw.to_hardware_component(), _right_hw.to_hardware_component()], + hardware=[_left_hw, _right_hw], tasks=[ - _left_hw.to_task_config(), - _right_hw.to_task_config(), + _openarm_task(_left_hw), + _openarm_task(_right_hw), ], ) -# ── Planner + coordinator (mock): Drake plans, mock adapters execute ──── -# Great for visualizing motions in Meshcat with no hardware. +# Planner + coordinator (mock): Drake plans, mock adapters execute. openarm_mock_planner_coordinator = autoconnect( ManipulationModule.blueprint( - robots=[_mock_left.to_robot_model_config(), _mock_right.to_robot_model_config()], + robots=[ + _openarm_model_config("left"), + _openarm_model_config("right"), + ], planning_timeout=10.0, visualization={"backend": "meshcat"}, ), ControlCoordinator.blueprint( - hardware=[_mock_left.to_hardware_component(), _mock_right.to_hardware_component()], + hardware=[_mock_left, _mock_right], tasks=[ - _mock_left.to_task_config(), - _mock_right.to_task_config(), + _openarm_task(_mock_left), + _openarm_task(_mock_right), ], ), ) -# ── Planner + coordinator (real hw): plan & execute on both arms ──────── +# Planner + coordinator (real hw): plan and execute on both arms. openarm_planner_coordinator = autoconnect( ManipulationModule.blueprint( - robots=[_left_hw.to_robot_model_config(), _right_hw.to_robot_model_config()], + robots=[ + _openarm_model_config("left"), + _openarm_model_config("right"), + ], planning_timeout=10.0, visualization={"backend": "meshcat"}, ), ControlCoordinator.blueprint( - hardware=[_left_hw.to_hardware_component(), _right_hw.to_hardware_component()], + hardware=[_left_hw, _right_hw], tasks=[ - _left_hw.to_task_config(), - _right_hw.to_task_config(), + _openarm_task(_left_hw), + _openarm_task(_right_hw), ], ), ) -# ── Keyboard teleop (single arm, mock) ────────────────────────────────── -# pygame keyboard UI → Cartesian IK (Drake) → mock coordinator execution, -# with Drake/Meshcat visualization. Good for testing the single-arm URDF -# and IK without touching hardware. -_teleop_cfg = _openarm_single(name="arm") +# Keyboard teleop (single arm, mock). +_teleop_hw = _openarm_single_hardware() keyboard_teleop_openarm_mock = autoconnect( - KeyboardTeleopModule.blueprint(model_path=OPENARM_V10_FK_MODEL, ee_joint_id=_teleop_cfg.dof), + KeyboardTeleopModule.blueprint( + model_path=OPENARM_V10_FK_MODEL, + ee_joint_id=7, + joint_names=_teleop_hw.joints, + ), ControlCoordinator.blueprint( - hardware=[_teleop_cfg.to_hardware_component()], + hardware=[_teleop_hw], tasks=[ - _teleop_cfg.to_task_config( - task_type="cartesian_ik", - task_name="cartesian_ik_arm", - model_path=OPENARM_V10_FK_MODEL, - ee_joint_id=_teleop_cfg.dof, + TaskConfig( + name="cartesian_ik_arm", + type="cartesian_ik", + joint_names=_teleop_hw.joints, + priority=10, + params={"model_path": OPENARM_V10_FK_MODEL, "ee_joint_id": 7}, ), ], ), ManipulationModule.blueprint( - robots=[_teleop_cfg.to_robot_model_config()], + robots=[_openarm_single_model_config()], visualization={"backend": "meshcat"}, ), ) -# ── Keyboard teleop (single arm, real hw on can0) ─────────────────────── -_teleop_hw_cfg = _openarm_single(name="arm", adapter_type="openarm", address=LEFT_CAN) +# Keyboard teleop (single arm, real hw on can0). +_teleop_real_hw = _openarm_single_hardware(adapter_type="openarm", address=LEFT_CAN) keyboard_teleop_openarm = autoconnect( - KeyboardTeleopModule.blueprint(model_path=OPENARM_V10_FK_MODEL, ee_joint_id=_teleop_hw_cfg.dof), + KeyboardTeleopModule.blueprint( + model_path=OPENARM_V10_FK_MODEL, + ee_joint_id=7, + joint_names=_teleop_real_hw.joints, + ), ControlCoordinator.blueprint( - hardware=[_teleop_hw_cfg.to_hardware_component()], + hardware=[_teleop_real_hw], tasks=[ - _teleop_hw_cfg.to_task_config( - task_type="cartesian_ik", - task_name="cartesian_ik_arm", - model_path=OPENARM_V10_FK_MODEL, - ee_joint_id=_teleop_hw_cfg.dof, + TaskConfig( + name="cartesian_ik_arm", + type="cartesian_ik", + joint_names=_teleop_real_hw.joints, + priority=10, + params={"model_path": OPENARM_V10_FK_MODEL, "ee_joint_id": 7}, ), ], ), ManipulationModule.blueprint( - robots=[_teleop_hw_cfg.to_robot_model_config()], + robots=[_openarm_single_model_config()], visualization={"backend": "meshcat"}, ), ) diff --git a/dimos/robot/manipulators/piper/blueprints.py b/dimos/robot/manipulators/piper/blueprints.py index 000472753b..ec6925d6ff 100644 --- a/dimos/robot/manipulators/piper/blueprints.py +++ b/dimos/robot/manipulators/piper/blueprints.py @@ -22,38 +22,85 @@ dimos run keyboard-teleop-piper """ -from dimos.control.coordinator import ControlCoordinator +from dimos.control.blueprints._hardware import PIPER_FK_MODEL, manipulator +from dimos.control.coordinator import ControlCoordinator, TaskConfig from dimos.core.coordination.blueprints import autoconnect from dimos.core.global_config import global_config from dimos.manipulation.manipulation_module import ManipulationModule -from dimos.robot.catalog.piper import PIPER_FK_MODEL, piper as _catalog_piper +from dimos.manipulation.planning.spec.config import RobotModelConfig +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.teleop.keyboard.keyboard_teleop_module import KeyboardTeleopModule +from dimos.utils.data import LfsPath -_piper_cfg = _catalog_piper( - name="arm", +PIPER_GRIPPER_COLLISION_EXCLUSIONS: list[tuple[str, str]] = [ + ("gripper_base", "link7"), + ("gripper_base", "link8"), + ("link7", "link8"), + ("link6", "gripper_base"), +] + +_PIPER_MODEL_PATH = LfsPath("piper_description") / "urdf/piper_description.xacro" +_PIPER_PACKAGE_PATHS = { + "piper_description": LfsPath("piper_description"), + "piper_gazebo": LfsPath("piper_description"), +} + + +def _piper_model_config() -> RobotModelConfig: + return RobotModelConfig( + name="arm", + model_path=_PIPER_MODEL_PATH, + base_pose=PoseStamped( + position=Vector3(x=0.0, y=0.0, z=0.0), + orientation=Quaternion(0.0, 0.0, 0.0, 1.0), + ), + joint_names=[f"joint{i}" for i in range(1, 7)], + end_effector_link="gripper_base", + base_link="base_link", + package_paths=_PIPER_PACKAGE_PATHS, + auto_convert_meshes=True, + collision_exclusion_pairs=PIPER_GRIPPER_COLLISION_EXCLUSIONS, + joint_name_mapping={f"arm/joint{i}": f"joint{i}" for i in range(1, 7)}, + coordinator_task_name="traj_arm", + gripper_hardware_id="arm", + home_joints=[0.0] * 6, + ) + + +_piper_hw = manipulator( + "arm", + 6, adapter_type="piper" if global_config.can_port else "mock", address=global_config.can_port or "can0", + gripper=True, ) # Piper 6-DOF mock sim + keyboard teleop + Drake visualization keyboard_teleop_piper = autoconnect( - KeyboardTeleopModule.blueprint(model_path=PIPER_FK_MODEL, ee_joint_id=_piper_cfg.dof), + KeyboardTeleopModule.blueprint( + model_path=PIPER_FK_MODEL, + ee_joint_id=6, + joint_names=_piper_hw.joints, + ), ControlCoordinator.blueprint( tick_rate=100.0, publish_joint_state=True, joint_state_frame_id="coordinator", - hardware=[_piper_cfg.to_hardware_component()], + hardware=[_piper_hw], tasks=[ - _piper_cfg.to_task_config( - task_type="cartesian_ik", - task_name="cartesian_ik_arm", - model_path=PIPER_FK_MODEL, - ee_joint_id=_piper_cfg.dof, + TaskConfig( + name="cartesian_ik_arm", + type="cartesian_ik", + joint_names=_piper_hw.joints, + priority=10, + params={"model_path": PIPER_FK_MODEL, "ee_joint_id": 6}, ), ], ), ManipulationModule.blueprint( - robots=[_piper_cfg.to_robot_model_config()], + robots=[_piper_model_config()], visualization={"backend": "meshcat"}, ), ) diff --git a/dimos/robot/manipulators/xarm/blueprints.py b/dimos/robot/manipulators/xarm/blueprints.py index 88e4bfbe30..67828b0086 100644 --- a/dimos/robot/manipulators/xarm/blueprints.py +++ b/dimos/robot/manipulators/xarm/blueprints.py @@ -23,54 +23,84 @@ dimos run keyboard-teleop-xarm7 """ -from dimos.control.coordinator import ControlCoordinator +from dimos.control.blueprints._hardware import XARM6_FK_MODEL, XARM7_FK_MODEL, manipulator +from dimos.control.coordinator import ControlCoordinator, TaskConfig from dimos.core.coordination.blueprints import autoconnect from dimos.core.global_config import global_config from dimos.manipulation.manipulation_module import ManipulationModule -from dimos.robot.catalog.ufactory import ( - XARM6_FK_MODEL, - XARM7_FK_MODEL, - xarm6 as _catalog_xarm6, - xarm7 as _catalog_xarm7, -) +from dimos.manipulation.planning.spec.config import RobotModelConfig +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.teleop.keyboard.keyboard_teleop_module import KeyboardTeleopModule +from dimos.utils.data import LfsPath + +_XARM_MODEL_PATH = LfsPath("xarm_description") / "urdf/xarm_device.urdf.xacro" +_XARM_PACKAGE_PATHS = {"xarm_description": LfsPath("xarm_description")} + + +def _xarm_model_config(dof: int) -> RobotModelConfig: + return RobotModelConfig( + name="arm", + model_path=_XARM_MODEL_PATH, + base_pose=PoseStamped( + position=Vector3(x=0.0, y=0.0, z=0.0), + orientation=Quaternion(0.0, 0.0, 0.0, 1.0), + ), + joint_names=[f"joint{i}" for i in range(1, dof + 1)], + end_effector_link=f"link{dof}", + base_link="link_base", + package_paths=_XARM_PACKAGE_PATHS, + xacro_args={ + "dof": str(dof), + "limited": "true", + "attach_xyz": "0.0 0.0 0.0", + "attach_rpy": "0 0.0 0", + }, + auto_convert_meshes=True, + joint_name_mapping={f"arm/joint{i}": f"joint{i}" for i in range(1, dof + 1)}, + coordinator_task_name="traj_arm", + home_joints=[0.0] * dof, + ) + -_xarm6_cfg = _catalog_xarm6( - name="arm", - add_gripper=False, +_xarm6_hw = manipulator( + "arm", + 6, adapter_type="xarm" if global_config.xarm6_ip else "mock", - address=global_config.xarm6_ip or None, + address=global_config.xarm6_ip, ) -_xarm7_cfg = _catalog_xarm7( - name="arm", - add_gripper=False, +_xarm7_hw = manipulator( + "arm", + 7, adapter_type="xarm" if global_config.xarm7_ip else "mock", - address=global_config.xarm7_ip or None, + address=global_config.xarm7_ip, ) # XArm6 mock sim + keyboard teleop + Drake visualization keyboard_teleop_xarm6 = autoconnect( KeyboardTeleopModule.blueprint( model_path=XARM6_FK_MODEL, - ee_joint_id=_xarm6_cfg.dof, - joint_names=_xarm6_cfg.coordinator_joint_names, + ee_joint_id=6, + joint_names=_xarm6_hw.joints, ), ControlCoordinator.blueprint( tick_rate=100.0, publish_joint_state=True, joint_state_frame_id="coordinator", - hardware=[_xarm6_cfg.to_hardware_component()], + hardware=[_xarm6_hw], tasks=[ - _xarm6_cfg.to_task_config( - task_type="cartesian_ik", - task_name="cartesian_ik_arm", - model_path=XARM6_FK_MODEL, - ee_joint_id=_xarm6_cfg.dof, + TaskConfig( + name="cartesian_ik_arm", + type="cartesian_ik", + joint_names=_xarm6_hw.joints, + priority=10, + params={"model_path": XARM6_FK_MODEL, "ee_joint_id": 6}, ), ], ), ManipulationModule.blueprint( - robots=[_xarm6_cfg.to_robot_model_config()], + robots=[_xarm_model_config(6)], visualization={"backend": "meshcat"}, ), ) @@ -79,25 +109,26 @@ keyboard_teleop_xarm7 = autoconnect( KeyboardTeleopModule.blueprint( model_path=XARM7_FK_MODEL, - ee_joint_id=_xarm7_cfg.dof, - joint_names=_xarm7_cfg.coordinator_joint_names, + ee_joint_id=7, + joint_names=_xarm7_hw.joints, ), ControlCoordinator.blueprint( tick_rate=100.0, publish_joint_state=True, joint_state_frame_id="coordinator", - hardware=[_xarm7_cfg.to_hardware_component()], + hardware=[_xarm7_hw], tasks=[ - _xarm7_cfg.to_task_config( - task_type="cartesian_ik", - task_name="cartesian_ik_arm", - model_path=XARM7_FK_MODEL, - ee_joint_id=_xarm7_cfg.dof, + TaskConfig( + name="cartesian_ik_arm", + type="cartesian_ik", + joint_names=_xarm7_hw.joints, + priority=10, + params={"model_path": XARM7_FK_MODEL, "ee_joint_id": 7}, ), ], ), ManipulationModule.blueprint( - robots=[_xarm7_cfg.to_robot_model_config()], + robots=[_xarm_model_config(7)], visualization={"backend": "meshcat"}, ), ) diff --git a/dimos/robot/unitree/g1/config.py b/dimos/robot/unitree/g1/config.py index 7a5fd42bd7..24e85466be 100644 --- a/dimos/robot/unitree/g1/config.py +++ b/dimos/robot/unitree/g1/config.py @@ -16,12 +16,13 @@ from __future__ import annotations +from dataclasses import dataclass, field from pathlib import Path +from typing import Any from dimos.msgs.geometry_msgs.Pose import Pose from dimos.msgs.geometry_msgs.Quaternion import Quaternion from dimos.msgs.geometry_msgs.Vector3 import Vector3 -from dimos.robot.config import RobotConfig from dimos.utils.data import LfsPath # this is robot-specific, but only needed for the local_planner module @@ -29,7 +30,19 @@ # probably only needs to be regenerated on robots that are notably different than the g1 (the go2 in rage mode probably needs different local planning paths) G1_LOCAL_PLANNER_PRECOMPUTED_PATHS = LfsPath("unitree_g1_local_planner_precomputed_paths") -G1 = RobotConfig( + +@dataclass(frozen=True) +class G1Config: + """Physical metadata used by G1 navigation and sensor blueprints.""" + + name: str + model_path: Path + height_clearance: float + width_clearance: float + internal_odom_offsets: dict[str, Any] = field(default_factory=dict) + + +G1 = G1Config( name="unitree_g1", model_path=Path(__file__).parent / "g1.urdf", height_clearance=1.2, diff --git a/docs/capabilities/manipulation/a750.md b/docs/capabilities/manipulation/a750.md index 51b78f8205..8f4402ead0 100644 --- a/docs/capabilities/manipulation/a750.md +++ b/docs/capabilities/manipulation/a750.md @@ -49,13 +49,14 @@ DEVICE_PATH=/dev/ttyACM0 dimos run keyboard-teleop-a750 ## Robot Model -The robot catalog entry is defined in [`dimos/robot/catalog/a750.py`](/dimos/robot/catalog/a750.py). +The robot model, hardware, and task config used by the runnable stack are +defined in [`dimos/robot/manipulators/a750/blueprints.py`](/dimos/robot/manipulators/a750/blueprints.py). | Field | Value | |-------|-------| | Arm joints | `joint1` through `joint6` | | Arm DOF | 6 | -| Gripper joints | `joint7`, `joint8` in the URDF; exposed as `finger` in the gripper config | +| Gripper joints | `joint7`, `joint8` in the URDF; exposed as `arm/finger` in coordinator hardware | | Base link | `base_link` | | End-effector link | `gripper_base` | | Home joints | `[0, 0, -90 deg, 0, 0, 0]` | @@ -66,7 +67,7 @@ The no-gripper model is used for Pinocchio FK in keyboard teleop because Pinocch ## Gripper -The catalog defines a parallel-jaw gripper: +The blueprint configures a parallel-jaw gripper: | Parameter | Value | |-----------|-------| diff --git a/docs/capabilities/manipulation/openarm_integration.md b/docs/capabilities/manipulation/openarm_integration.md index 6869864f5a..b70d9e9f55 100644 --- a/docs/capabilities/manipulation/openarm_integration.md +++ b/docs/capabilities/manipulation/openarm_integration.md @@ -40,9 +40,8 @@ dimos/hardware/manipulators/openarm/ ├── test_driver.py # 13 unit tests (virtual CAN loopback, no hardware) └── test_adapter.py # 11 unit tests (virtual CAN + mock state frames) -dimos/robot/catalog/openarm.py # openarm_arm() and openarm_single() config factories dimos/robot/manipulators/openarm/ -├── blueprints.py # coordinator-*, planner-*, keyboard-teleop-* blueprints +├── blueprints.py # coordinator-*, planner-*, keyboard-teleop-* blueprints and model config └── scripts/ # bring-up + diagnostic scripts (run manually by humans) ├── openarm_can_up.sh # bring SocketCAN interfaces up (needs sudo) ├── openarm_can_probe.py # enumerate & read state from all 8 motors @@ -336,7 +335,7 @@ Persistent across power cycles. - **`ip link ... fd on` → `Operation not supported`.** gs_usb firmware doesn't support CAN-FD. Use classical CAN @ 1 Mbit (our bringup script's default). - **Motors reply to probes but commands do nothing.** CTRL_MODE is not MIT. The adapter now writes MIT on connect, but if you disabled that and motors got reset, run `openarm_set_mit_mode.py`. -- **`COLLISION_AT_START` during planning.** `link5` and `link7` collision meshes overlap by 3 mm at every configuration. Handled by `OPENARM_COLLISION_EXCLUSIONS` in the catalog. If you see it anyway, the exclusion pairs may not be getting applied — check that the collision filter log line appears during world build. +- **`COLLISION_AT_START` during planning.** `link5` and `link7` collision meshes overlap by 3 mm at every configuration. Handled by `OPENARM_COLLISION_EXCLUSIONS` in the OpenArm blueprint module. If you see it anyway, the exclusion pairs may not be getting applied — check that the collision filter log line appears during world build. - **`INVALID_START` during planning.** Hardware encoder noise pushed a joint 1 mrad past a URDF limit. Joint4 used to be exactly `lower=0.0` which tripped this — it's now `-0.01` to give breathing room. If you see it on a different joint, widen that limit by ~10 mrad. - **"Transmit buffer full" (ENOBUFS) at 100 Hz.** Kernel TX queue too small. The bringup script sets `txqueuelen 1000`; the driver also retries on ENOBUFS. If you still see the error, check `ip -details link show canX | grep qlen`. - **Arms swap sides.** USB enumeration order flipped. Swap `LEFT_CAN` / `RIGHT_CAN` in [blueprints.py](/dimos/robot/manipulators/openarm/blueprints.py). @@ -350,7 +349,7 @@ Persistent across power cycles. - **Gravity compensation on by default.** Eliminates steady-state position error without needing high kp. Needs Pinocchio + the per-side URDFs. - **One adapter per CAN bus, keyed by `address`.** Matches the Piper adapter pattern. Bimanual = two adapters with different `address` values. - **Per-side URDFs for Drake planning.** Loading the full 14-DOF bimanual URDF twice (once per robot instance) creates phantom-arm collisions with the "other" arm frozen at zero. The per-side URDFs keep only one arm's links + the torso, avoiding the phantom collisions while matching the bimanual kinematics exactly. -- **URDF stays in-tree (`data/openarm_description/`) for now.** Can migrate to LFS later — only the path constant in the catalog changes. +- **URDF stays in-tree (`data/openarm_description/`) for now.** Can migrate to LFS later — only the path constants in the OpenArm blueprint module change. - **CAN bringup stays manual (`sudo`).** Auto-bringup from `connect()` would need sudo-in-a-library or a systemd unit; the explicit script is clearer and testable. For production, add a oneshot systemd unit that runs the script at boot. --- From 5696614edbc284ea4a496699910f8253629eb3c2 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Sat, 20 Jun 2026 22:49:58 +0000 Subject: [PATCH 3/4] changed the package path maps to explicit dict[str, Path --- dimos/manipulation/blueprints.py | 3 ++- dimos/robot/manipulators/a750/blueprints.py | 3 ++- dimos/robot/manipulators/openarm/blueprints.py | 6 ++++-- dimos/robot/manipulators/piper/blueprints.py | 4 +++- dimos/robot/manipulators/xarm/blueprints.py | 4 +++- 5 files changed, 14 insertions(+), 6 deletions(-) diff --git a/dimos/manipulation/blueprints.py b/dimos/manipulation/blueprints.py index 61df8004c6..e49a11417a 100644 --- a/dimos/manipulation/blueprints.py +++ b/dimos/manipulation/blueprints.py @@ -28,6 +28,7 @@ """ import math +from pathlib import Path from typing import Any from dimos.agents.mcp.mcp_client import McpClient @@ -69,7 +70,7 @@ ] _XARM_MODEL_PATH = LfsPath("xarm_description") / "urdf/xarm_device.urdf.xacro" -_XARM_PACKAGE_PATHS = {"xarm_description": LfsPath("xarm_description")} +_XARM_PACKAGE_PATHS: dict[str, Path] = {"xarm_description": LfsPath("xarm_description")} def _base_pose(x: float = 0.0, y: float = 0.0, z: float = 0.0) -> PoseStamped: diff --git a/dimos/robot/manipulators/a750/blueprints.py b/dimos/robot/manipulators/a750/blueprints.py index cb2155b756..d4fcc930a1 100644 --- a/dimos/robot/manipulators/a750/blueprints.py +++ b/dimos/robot/manipulators/a750/blueprints.py @@ -23,6 +23,7 @@ """ import math +from pathlib import Path from dimos.control.blueprints._hardware import A750_FK_MODEL, a750 from dimos.control.coordinator import ControlCoordinator, TaskConfig @@ -59,7 +60,7 @@ _A750_MODEL_PATH = LfsPath("a750_description") / "urdf/a750_rev1.urdf" _A750_HOME_JOINTS = [0.0, 0.0, -math.radians(90), 0.0, 0.0, 0.0] -_A750_PACKAGE_PATHS = { +_A750_PACKAGE_PATHS: dict[str, Path] = { "a750_description": LfsPath("a750_description"), "a750_gazebo": LfsPath("a750_description"), } diff --git a/dimos/robot/manipulators/openarm/blueprints.py b/dimos/robot/manipulators/openarm/blueprints.py index 4b1d9846f7..e7470a739f 100644 --- a/dimos/robot/manipulators/openarm/blueprints.py +++ b/dimos/robot/manipulators/openarm/blueprints.py @@ -16,6 +16,7 @@ from __future__ import annotations +from pathlib import Path from typing import Any from dimos.control.components import HardwareComponent, HardwareType @@ -38,6 +39,7 @@ _OPENARM_LEFT_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_left.urdf" _OPENARM_RIGHT_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_right.urdf" OPENARM_V10_FK_MODEL = _OPENARM_PKG / "urdf/robot/openarm_v10_single.urdf" +_OPENARM_PACKAGE_PATHS: dict[str, Path] = {"openarm_description": _OPENARM_PKG} def _base_pose() -> PoseStamped: @@ -89,7 +91,7 @@ def _openarm_model_config(side: str, name: str | None = None) -> RobotModelConfi joint_names=_openarm_joints(side), end_effector_link=f"openarm_{side}_link7", base_link="openarm_body_link0", - package_paths={"openarm_description": _OPENARM_PKG}, + package_paths=_OPENARM_PACKAGE_PATHS, collision_exclusion_pairs=OPENARM_COLLISION_EXCLUSIONS, auto_convert_meshes=True, max_velocity=0.5, @@ -129,7 +131,7 @@ def _openarm_single_model_config() -> RobotModelConfig: joint_names=_openarm_joints("left"), end_effector_link="openarm_left_link7", base_link="openarm_body_link0", - package_paths={"openarm_description": _OPENARM_PKG}, + package_paths=_OPENARM_PACKAGE_PATHS, auto_convert_meshes=True, max_velocity=0.5, max_acceleration=1.0, diff --git a/dimos/robot/manipulators/piper/blueprints.py b/dimos/robot/manipulators/piper/blueprints.py index ec6925d6ff..44225248d0 100644 --- a/dimos/robot/manipulators/piper/blueprints.py +++ b/dimos/robot/manipulators/piper/blueprints.py @@ -22,6 +22,8 @@ dimos run keyboard-teleop-piper """ +from pathlib import Path + from dimos.control.blueprints._hardware import PIPER_FK_MODEL, manipulator from dimos.control.coordinator import ControlCoordinator, TaskConfig from dimos.core.coordination.blueprints import autoconnect @@ -42,7 +44,7 @@ ] _PIPER_MODEL_PATH = LfsPath("piper_description") / "urdf/piper_description.xacro" -_PIPER_PACKAGE_PATHS = { +_PIPER_PACKAGE_PATHS: dict[str, Path] = { "piper_description": LfsPath("piper_description"), "piper_gazebo": LfsPath("piper_description"), } diff --git a/dimos/robot/manipulators/xarm/blueprints.py b/dimos/robot/manipulators/xarm/blueprints.py index 67828b0086..c23dd6c0d2 100644 --- a/dimos/robot/manipulators/xarm/blueprints.py +++ b/dimos/robot/manipulators/xarm/blueprints.py @@ -23,6 +23,8 @@ dimos run keyboard-teleop-xarm7 """ +from pathlib import Path + from dimos.control.blueprints._hardware import XARM6_FK_MODEL, XARM7_FK_MODEL, manipulator from dimos.control.coordinator import ControlCoordinator, TaskConfig from dimos.core.coordination.blueprints import autoconnect @@ -36,7 +38,7 @@ from dimos.utils.data import LfsPath _XARM_MODEL_PATH = LfsPath("xarm_description") / "urdf/xarm_device.urdf.xacro" -_XARM_PACKAGE_PATHS = {"xarm_description": LfsPath("xarm_description")} +_XARM_PACKAGE_PATHS: dict[str, Path] = {"xarm_description": LfsPath("xarm_description")} def _xarm_model_config(dof: int) -> RobotModelConfig: From fee6656b559a2d1b5e3f3972b6d624da73e0d66b Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Sat, 20 Jun 2026 23:28:28 +0000 Subject: [PATCH 4/4] address blueprint cleanup review comments --- dimos/control/blueprints/mobile.py | 8 +--- dimos/robot/manipulators/xarm/blueprints.py | 41 +++++++++++++++++---- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/dimos/control/blueprints/mobile.py b/dimos/control/blueprints/mobile.py index d6f1b5a514..66b2273e16 100644 --- a/dimos/control/blueprints/mobile.py +++ b/dimos/control/blueprints/mobile.py @@ -26,6 +26,7 @@ import os +from dimos.control.blueprints._hardware import mock_arm from dimos.control.components import ( HardwareComponent, HardwareType, @@ -194,12 +195,7 @@ def _flowbase_twist_base( # Mock arm (7-DOF) + mock holonomic base (3-DOF) -_mock_arm_hw = HardwareComponent( - hardware_id="arm", - hardware_type=HardwareType.MANIPULATOR, - joints=[f"arm/joint{i}" for i in range(1, 8)], - adapter_type="mock", -) +_mock_arm_hw = mock_arm("arm", 7) coordinator_mobile_manip_mock = ControlCoordinator.blueprint( hardware=[_mock_arm_hw, _mock_twist_base()], diff --git a/dimos/robot/manipulators/xarm/blueprints.py b/dimos/robot/manipulators/xarm/blueprints.py index c23dd6c0d2..7d48571f67 100644 --- a/dimos/robot/manipulators/xarm/blueprints.py +++ b/dimos/robot/manipulators/xarm/blueprints.py @@ -40,8 +40,36 @@ _XARM_MODEL_PATH = LfsPath("xarm_description") / "urdf/xarm_device.urdf.xacro" _XARM_PACKAGE_PATHS: dict[str, Path] = {"xarm_description": LfsPath("xarm_description")} +XARM_GRIPPER_COLLISION_EXCLUSIONS: list[tuple[str, str]] = [ + ("right_inner_knuckle", "right_outer_knuckle"), + ("left_inner_knuckle", "left_outer_knuckle"), + ("right_inner_knuckle", "right_finger"), + ("left_inner_knuckle", "left_finger"), + ("left_finger", "right_finger"), + ("left_outer_knuckle", "right_outer_knuckle"), + ("left_inner_knuckle", "right_inner_knuckle"), + ("left_outer_knuckle", "right_finger"), + ("right_outer_knuckle", "left_finger"), + ("xarm_gripper_base_link", "left_inner_knuckle"), + ("xarm_gripper_base_link", "right_inner_knuckle"), + ("xarm_gripper_base_link", "left_finger"), + ("xarm_gripper_base_link", "right_finger"), + ("link6", "xarm_gripper_base_link"), + ("link6", "left_outer_knuckle"), + ("link6", "right_outer_knuckle"), +] + + +def _xarm_model_config(dof: int, *, add_gripper: bool = False) -> RobotModelConfig: + xacro_args = { + "dof": str(dof), + "limited": "true", + "attach_xyz": "0.0 0.0 0.0", + "attach_rpy": "0 0.0 0", + } + if add_gripper: + xacro_args["add_gripper"] = "true" -def _xarm_model_config(dof: int) -> RobotModelConfig: return RobotModelConfig( name="arm", model_path=_XARM_MODEL_PATH, @@ -50,18 +78,15 @@ def _xarm_model_config(dof: int) -> RobotModelConfig: orientation=Quaternion(0.0, 0.0, 0.0, 1.0), ), joint_names=[f"joint{i}" for i in range(1, dof + 1)], - end_effector_link=f"link{dof}", + end_effector_link="link_tcp" if add_gripper else f"link{dof}", base_link="link_base", package_paths=_XARM_PACKAGE_PATHS, - xacro_args={ - "dof": str(dof), - "limited": "true", - "attach_xyz": "0.0 0.0 0.0", - "attach_rpy": "0 0.0 0", - }, + xacro_args=xacro_args, auto_convert_meshes=True, + collision_exclusion_pairs=(XARM_GRIPPER_COLLISION_EXCLUSIONS if add_gripper else []), joint_name_mapping={f"arm/joint{i}": f"joint{i}" for i in range(1, dof + 1)}, coordinator_task_name="traj_arm", + gripper_hardware_id="arm" if add_gripper else None, home_joints=[0.0] * dof, )